60 changes: 44 additions & 16 deletions tree/ntuple/doc/Merging.md
@@ -15,11 +15,13 @@ Please note that the RNTupleMerger is currently experimental and the content of

Currently there is no guarantee for the user about which mode will be used to generate the merged RNTuple.
At the moment, this is how it works:
- if both compression and encoding of the target column match those of the source column, L1 is used;
- otherwise, if compression matches but encoding doesn't, L2 is used;
- otherwise L3 is used.
- if the compression of the target column matches that of the source column, L1 is used;
- otherwise, L2 is used.

Note that L0 and L4 are currently never used.
L0, L3 and L4 are currently never used.

**NOTE**: prior to ROOT 6.42, if two columns had the same compression but different encoding they would undergo L3 merging (implying a recompression and resealing);
from 6.42 onwards the RNTupleMerger will instead attach a new column to the parent field as a new representation and L1-merge them.
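
As a rough illustration of the rule described above, the choice of level can be sketched as a function of the two compression settings only. This is a minimal sketch, not the RNTupleMerger implementation: `EMergeLevel` and `ChooseMergeLevel` are hypothetical names, and the compression settings are modelled as plain integers.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical names for illustration only; not part of the RNTupleMerger API.
enum class EMergeLevel { kL1, kL2 };

// Sketch of the rule stated in the text: matching compression -> L1, otherwise L2.
// The column encoding is deliberately not an input here: per the note above, an
// encoding mismatch is handled by attaching a new column representation instead
// of switching to a different merging level.
constexpr EMergeLevel ChooseMergeLevel(std::uint32_t srcCompression, std::uint32_t dstCompression)
{
   return srcCompression == dstCompression ? EMergeLevel::kL1 : EMergeLevel::kL2;
}

// Compile-time check of the two branches.
static_assert(ChooseMergeLevel(7, 7) == EMergeLevel::kL1, "same compression -> L1");
static_assert(ChooseMergeLevel(7, 0) == EMergeLevel::kL2, "different compression -> L2");
```

Keeping the encoding out of the decision is the point of the sketch: under the behaviour described for 6.42 onwards, only a compression mismatch moves a column away from L1.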

## Goal
The goal of the RNTuple merging process is producing one output RNTuple from *N* input RNTuples that can be used as if it were produced directly in the merged state. This means that:
@@ -44,15 +46,16 @@ Consequences of R3 and R4:
The following properties are currently true but they are subject to change:

* P1: all output pages have the **same compression** (which may be different from the input pages' compression);
* P2: all pages in the same output column have the **same encoding** (which may be different from the inputs' encoding);
* P3: the output clusters are **the same as the input clusters**;
* P4: the output RNTuple **always has 1 cluster group**
* P2: the output clusters are **the same as the input clusters**;
* P3: the output RNTuple **always has 1 cluster group**

Note that these properties influence and are influenced by the level of merging used.
E.g. P1 is currently true because we only support L1 merging of pages with identical compressions. This is a limitation that we intend to lift at some point (both for L1 and L0 if we ever support it).
P2 and P3 would not necessarily be true with L4 support (which might be desirable in some cases, e.g. to group pages into smaller/larger clusters).

Note that these properties influence and are influenced by the level of merging used.
E.g. P1 and P2 are currently true because we only support L1 merging of pages with identical compressions. This is a limitation that we intend to lift at some point (both for L1 and L0 if we ever support it).
P3 and P4 would not necessarily be true with L4 support (which might be desirable in some cases, e.g. to group pages into smaller/larger clusters).
Also note that the output pages coming from matching columns of a field may use mixed encodings.

Therefore we *will* want to drop these properties at some point, in order to improve the capabilities of the Merger.
Therefore we *will* want to drop at least some of these properties at some point, in order to improve the capabilities of the Merger.

## High-level description
The merging process requires at least 1 input, in the form of an `RPageSource`.
@@ -64,14 +67,15 @@ In `Union` mode only, we allow any subsequent input RNTuple to define new fields
## Descriptor compatibility and validation
Whenever a new input is processed, we compare its descriptor with the output descriptor to verify that merging is possible.

The comparison function does 3 main things:
The comparison function does 4 main things:
- collect all "extra destination fields" (i.e. fields that exist in the output but not in this input RNTuple)
- collect all "extra source fields" from the input RNTuple
- collect and validate all common fields.
- collect and validate all common fields
- collect all columns that need to be extended with additional representations.

If the Merging Mode is set to **Filter** we require the "extra destination fields" list to be empty.
If the Merging Mode is set to **Strict** we require both the "extra destination fields" and "extra source fields" lists to be empty.
If the Merging Mode is set to **Union**, the "extra source fields" list is used to late model extend the destination model.
If the merging mode is set to **Filter** we require the "extra destination fields" list to be empty.
If the merging mode is set to **Strict** we require both the "extra destination fields" and "extra source fields" lists to be empty.
If the merging mode is set to **Union**, the "extra source fields" list is used to extend the destination model via late model extension.
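
The per-mode checks above can be sketched as follows. This is only an illustration under stated assumptions: `EMode`, `FieldDiff` and `IsCompatible` are hypothetical names, fields are reduced to their names, and the sketch assumes that **Union** places no constraint on either list (the text only says that the extra source fields are used to extend the destination model).

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical types for illustration; the real merger works on descriptors.
enum class EMode { kFilter, kStrict, kUnion };

struct FieldDiff {
   std::vector<std::string> extraDst; // fields in the output but not in this input
   std::vector<std::string> extraSrc; // fields in this input but not in the output
};

// Returns true if, judging only by the two "extra fields" lists, the input
// may be merged under the given mode.
inline bool IsCompatible(EMode mode, const FieldDiff &diff)
{
   switch (mode) {
   case EMode::kFilter: return diff.extraDst.empty();
   case EMode::kStrict: return diff.extraDst.empty() && diff.extraSrc.empty();
   case EMode::kUnion: return true; // assumption: extra source fields extend the model
   }
   return false;
}
```

For example, an input that carries one field unknown to the output passes under `kFilter` and `kUnion` but is rejected under `kStrict`.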

As for common fields, they are matched by name and validated as follows:
- any field that is projected in the destination must be also projected in the source and must be projected to the same field;
@@ -90,3 +94,27 @@ As for common fields, they are matched by name and validated as follows:


<sup>1</sup>: these restrictions will likely not be required for L4 merging.

## Column representation extension
In all merging modes, we allow new column representations to be attached to the source fields. This is done to allow for L1 merging of columns with different encodings, which would otherwise require recompressing.
These new column representations are added to the output RNTuple's footer and become part of its Schema Extension section. Note that in general these columns will be added as deferred *and* suppressed.

**Technical note**: this is *not* done via the regular late model extension API, but uses internal functionality.

We add new (physical) column representations in the following cases:

- when one or more columns of a field have a different type than their matching counterparts in the destination RNTuple;
- when one or more columns of a field have the same type as their matching counterparts in the destination RNTuple but different metadata (e.g. in case of a Real32Quant column, a different bit width or value range).

Whenever we extend a physical column that is referred to by one or more alias columns in some projected fields, we also add a corresponding new alias column in those fields.

#### Example
Suppose we merge source RNTuples **S1** and **S2**, each with the following fields:

1. `foo` of type `int`
1. `fooProj` projecting onto field `foo`

Suppose that S1 is compressed and thus its `foo` field is represented by a column of type `kSplitInt32`, whereas S2 is uncompressed and its `foo` field is represented by a column of type `kInt32`.
When merging S1 and S2 we collate those two representations under the same field `foo`, so that it will now have representatives: `{kSplitInt32, kInt32}`.
At the same time, we add a second alias column to the field `fooProj`, which will now have its first column aliasing the `kSplitInt32` column (column 0 of field `foo`) and its second one aliasing the `kInt32` one (column 1 of field `foo`).
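
The bookkeeping in this example can be mimicked with a toy model. The sketch below is not the merger's code: `ToyField`, `ToyProjectedField` and `ExtendRepresentation` are hypothetical, each representation is reduced to a single column type, and alias columns are stored as indices into the target field's representation list.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy stand-ins for illustration only.
enum class EColType { kInt32, kSplitInt32 };

struct ToyField {
   std::vector<EColType> representations; // one single-column representation per entry
};

struct ToyProjectedField {
   std::vector<std::size_t> aliasOf; // aliasOf[i] = index of the aliased representation
};

// Adds `type` as a new representation of `field` unless it is already present,
// and mirrors the new column with an alias column in `proj`.
// Returns the index of the representation that holds `type`.
inline std::size_t ExtendRepresentation(ToyField &field, ToyProjectedField &proj, EColType type)
{
   for (std::size_t i = 0; i < field.representations.size(); ++i) {
      if (field.representations[i] == type)
         return i; // already known: nothing to extend
   }
   field.representations.push_back(type);
   const std::size_t idx = field.representations.size() - 1;
   proj.aliasOf.push_back(idx); // keep the projection in sync
   return idx;
}
```

Starting from `foo = {kSplitInt32}` with `fooProj` aliasing column 0, extending with `kInt32` yields `foo = {kSplitInt32, kInt32}` and `fooProj` aliasing columns 0 and 1, matching the description above.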

4 changes: 2 additions & 2 deletions tree/ntuple/inc/ROOT/RField/RFieldFundamental.hxx
@@ -400,11 +400,11 @@ protected:
fAvailableColumns.emplace_back(ROOT::Internal::RColumn::Create<T>(onDiskTypes[0], 0, representationIndex));
if (onDiskTypes[0] == ROOT::ENTupleColumnType::kReal32Trunc) {
const auto &fdesc = desc.GetFieldDescriptor(Base::GetOnDiskId());
const auto &coldesc = desc.GetColumnDescriptor(fdesc.GetLogicalColumnIds()[0]);
const auto &coldesc = desc.GetColumnDescriptor(fdesc.GetLogicalColumnIds()[representationIndex]);
column->SetBitsOnStorage(coldesc.GetBitsOnStorage());
} else if (onDiskTypes[0] == ROOT::ENTupleColumnType::kReal32Quant) {
const auto &fdesc = desc.GetFieldDescriptor(Base::GetOnDiskId());
const auto &coldesc = desc.GetColumnDescriptor(fdesc.GetLogicalColumnIds()[0]);
const auto &coldesc = desc.GetColumnDescriptor(fdesc.GetLogicalColumnIds()[representationIndex]);
assert(coldesc.GetValueRange().has_value());
const auto [valMin, valMax] = *coldesc.GetValueRange();
column->SetBitsOnStorage(coldesc.GetBitsOnStorage());
7 changes: 4 additions & 3 deletions tree/ntuple/inc/ROOT/RFieldBase.hxx
@@ -247,14 +247,15 @@ private:
func(target);
}

/// Translate an entry index to a column element index of the principal column and vice versa. These functions
/// take into account the role and number of repetitions on each level of the field hierarchy as follows:
/// Translate an entry index to a column element index of the principal column. This function
/// takes into account the role and number of repetitions on each level of the field hierarchy as follows:
/// - Top level fields: element index == entry index
/// - Record fields propagate their principal column index to the principal columns of direct descendant fields
/// - Collection and variant fields set the principal column index of their children to 0
///
/// The column element index also depends on the number of repetitions of each field in the hierarchy, e.g., given a
/// field with type `std::array<std::array<float, 4>, 2>`, this function returns 8 for the innermost field.
/// field with type `std::array<std::array<float, 4>, 2>`, this function called with `globalIndex == 1`
/// returns 8 for the innermost field.
ROOT::NTupleSize_t EntryToColumnElementIndex(ROOT::NTupleSize_t globalIndex) const;

/// Flushes data from active columns
6 changes: 6 additions & 0 deletions tree/ntuple/inc/ROOT/RNTupleDescriptor.hxx
@@ -519,6 +519,12 @@ public:
ROOT::NTupleSize_t GetFirstEntryIndex() const { return fFirstEntryIndex; }
ROOT::NTupleSize_t GetNEntries() const { return fNEntries; }
const RColumnRange &GetColumnRange(ROOT::DescriptorId_t physicalId) const { return fColumnRanges.at(physicalId); }
const RColumnRange *TryGetColumnRange(ROOT::DescriptorId_t physicalId) const
{
if (auto it = fColumnRanges.find(physicalId); it != fColumnRanges.end())
return &it->second;
return nullptr;
}
const RPageRange &GetPageRange(ROOT::DescriptorId_t physicalId) const { return fPageRanges.at(physicalId); }
/// Returns an iterator over pairs { columnId, columnRange }. The iteration order is unspecified.
RColumnRangeIterable GetColumnRangeIterable() const;
14 changes: 7 additions & 7 deletions tree/ntuple/inc/ROOT/RNTupleMerger.hxx
@@ -118,16 +118,16 @@ class RNTupleMerger final {
std::unique_ptr<ROOT::RNTupleModel> fModel;

[[nodiscard]]
ROOT::RResult<void> MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool,
const ROOT::RClusterDescriptor &clusterDesc,
std::span<const RColumnMergeInfo> commonColumns,
const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet,
std::size_t nCommonColumnsInCluster, RSealedPageMergeData &sealedPageData,
const RNTupleMergeData &mergeData, ROOT::Internal::RPageAllocator &pageAlloc);
ROOT::RResult<void>
MergeCommonColumns(ROOT::Internal::RClusterPool &clusterPool, const ROOT::RClusterDescriptor &clusterDesc,
std::span<RColumnMergeInfo> commonColumns,
const ROOT::Internal::RCluster::ColumnSet_t &commonColumnSet, std::size_t nCommonColumnsInCluster,
RSealedPageMergeData &sealedPageData, const RNTupleMergeData &mergeData,
ROOT::Internal::RPageAllocator &pageAlloc);

[[nodiscard]]
ROOT::RResult<void>
MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span<const RColumnMergeInfo> commonColumns,
MergeSourceClusters(ROOT::Internal::RPageSource &source, std::span<RColumnMergeInfo> commonColumns,
std::span<const RColumnMergeInfo> extraDstColumns, RNTupleMergeData &mergeData);

/// Creates a RNTupleMerger with the given destination.
15 changes: 15 additions & 0 deletions tree/ntuple/inc/ROOT/RPageStorage.hxx
@@ -544,6 +544,21 @@ public:
[[nodiscard]] std::unique_ptr<RNTupleModel>
InitFromDescriptor(const ROOT::RNTupleDescriptor &descriptor, bool copyClusters);

struct RColumnReprElement {
ENTupleColumnType fType = ENTupleColumnType::kUnknown;
// 0 means "use default". Only valid for fixed-bitwidth column types.
std::uint16_t fBitWidth = 0;
std::optional<RColumnDescriptor::RValueRange> fValueRange;
};
/// Adds a new column representation to the given field.
/// \return The physical id of the first newly added column.
ROOT::DescriptorId_t
AddColumnRepresentation(const ROOT::RFieldDescriptor &field, std::span<const RColumnReprElement> newRepresentation);

/// Adds a new alias column pointing to an existing column with the given physical id to the given field.
void AddAliasColumn(const ROOT::RNTupleDescriptor &desc, const ROOT::RFieldDescriptor &field,
ROOT::DescriptorId_t physicalId);

void CommitSuppressedColumn(ColumnHandle_t columnHandle) final;
void CommitPage(ColumnHandle_t columnHandle, const ROOT::Internal::RPage &page) final;
void CommitSealedPage(ROOT::DescriptorId_t physicalColumnId, const RPageStorage::RSealedPage &sealedPage) final;
9 changes: 3 additions & 6 deletions tree/ntuple/src/RFieldBase.cxx
@@ -668,14 +668,14 @@ void ROOT::RFieldBase::Attach(std::unique_ptr<ROOT::RFieldBase> child, std::stri

ROOT::NTupleSize_t ROOT::RFieldBase::EntryToColumnElementIndex(ROOT::NTupleSize_t globalIndex) const
{
std::size_t result = globalIndex;
ROOT::NTupleSize_t result = globalIndex;
for (auto f = this; f != nullptr; f = f->GetParent()) {
auto parent = f->GetParent();
if (parent && (parent->GetStructure() == ROOT::ENTupleStructure::kCollection ||
parent->GetStructure() == ROOT::ENTupleStructure::kVariant)) {
return 0U;
}
result *= std::max(f->GetNRepetitions(), std::size_t{1U});
result *= std::max<ROOT::NTupleSize_t>(f->GetNRepetitions(), ROOT::NTupleSize_t{1U});
}
return result;
}
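
To make the documented example (`std::array<std::array<float, 4>, 2>`, `globalIndex == 1`, result 8) concrete, here is a standalone toy version of the loop above. It is a sketch under assumptions: `ToyField` and `EStructure` are hypothetical stand-ins for `RFieldBase` and `ENTupleStructure`, and it assumes that each array field carries its length as the repetition count while the innermost `float` field has a repetition count of 0.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical, simplified field model for illustration only.
enum class EStructure { kPlain, kRecord, kCollection, kVariant };

struct ToyField {
   const ToyField *parent = nullptr;
   EStructure structure = EStructure::kPlain;
   std::uint64_t nRepetitions = 0; // 0 for fields that are not fixed-size arrays
};

// Same walk as in the hunk above: multiply by the repetitions on every level
// up to the root, and return 0 below a collection or variant field.
inline std::uint64_t EntryToColumnElementIndex(const ToyField &field, std::uint64_t globalIndex)
{
   std::uint64_t result = globalIndex;
   for (const ToyField *f = &field; f != nullptr; f = f->parent) {
      const ToyField *parent = f->parent;
      if (parent && (parent->structure == EStructure::kCollection || parent->structure == EStructure::kVariant))
         return 0;
      result *= std::max<std::uint64_t>(f->nRepetitions, 1);
   }
   return result;
}
```

With an outer field of 2 repetitions, an inner field of 4 and a leaf of 0, entry 1 maps to element 1 * 1 * 4 * 2 = 8 in the leaf's principal column.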
@@ -835,10 +835,7 @@ void ROOT::RFieldBase::SetColumnRepresentatives(const RColumnRepresentations::Se
if (itRepresentative == std::end(validTypes))
throw RException(R__FAIL("invalid column representative"));

// don't add a duplicate representation
if (std::find_if(fColumnRepresentatives.begin(), fColumnRepresentatives.end(),
[&r](const auto &rep) { return r == rep.get(); }) == fColumnRepresentatives.end())
fColumnRepresentatives.emplace_back(*itRepresentative);
fColumnRepresentatives.emplace_back(*itRepresentative);
}
}

20 changes: 18 additions & 2 deletions tree/ntuple/src/RNTupleDescriptor.cxx
@@ -961,8 +961,16 @@ ROOT::Internal::RClusterDescriptorBuilder::AddExtendedColumnRanges(const RNTuple
// `ROOT::RFieldBase::EntryToColumnElementIndex()`, i.e. it is a principal column reachable from the
// field zero excluding subfields of collection and variant fields.
if (c.IsDeferredColumn()) {
columnRange.SetFirstElementIndex(fCluster.GetFirstEntryIndex() * nRepetitions);
columnRange.SetNElements(fCluster.GetNEntries() * nRepetitions);
if (c.GetRepresentationIndex() == 0) {
columnRange.SetFirstElementIndex(fCluster.GetFirstEntryIndex() * nRepetitions);
columnRange.SetNElements(fCluster.GetNEntries() * nRepetitions);
} else {
const auto &field = desc.GetFieldDescriptor(fieldId);
const auto firstReprColumnId = field.GetLogicalColumnIds()[c.GetIndex()];
const auto &firstReprColumnRange = fCluster.fColumnRanges[firstReprColumnId];
columnRange.SetFirstElementIndex(firstReprColumnRange.GetFirstElementIndex());
columnRange.SetNElements(firstReprColumnRange.GetNElements());
}
if (!columnRange.IsSuppressed()) {
auto &pageRange = fCluster.fPageRanges[physicalId];
pageRange.fPhysicalColumnId = physicalId;
@@ -1380,6 +1388,14 @@ void ROOT::Internal::RNTupleDescriptorBuilder::ShiftAliasColumns(std::uint32_t o
R__ASSERT(fDescriptor.fColumnDescriptors.count(c.fLogicalColumnId) == 0);
fDescriptor.fColumnDescriptors.emplace(c.fLogicalColumnId, std::move(c));
}

// Patch up column ids in the header extension
if (auto &xHeader = fDescriptor.fHeaderExtension) {
for (auto &columnId : xHeader->fExtendedColumnRepresentations) {
if (columnId >= fDescriptor.GetNPhysicalColumns())
columnId += offset;
}
}
}

ROOT::RResult<void> ROOT::Internal::RNTupleDescriptorBuilder::AddCluster(RClusterDescriptor &&clusterDesc)