From aec6591a57e539100076a0c0200a8d8692d0be70 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 17:22:07 -0400 Subject: [PATCH 01/10] Introduce UpdatesView reader type --- differential-dataflow/src/columnar/updates.rs | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/differential-dataflow/src/columnar/updates.rs b/differential-dataflow/src/columnar/updates.rs index 14c71c16e..dd44107f5 100644 --- a/differential-dataflow/src/columnar/updates.rs +++ b/differential-dataflow/src/columnar/updates.rs @@ -94,6 +94,35 @@ impl Clone for Updates { } } +/// Borrowed view of an [`Updates`] with the same four-field shape. +/// +/// Reader code should consume an `Updates` through this view rather than reading +/// fields directly. This decouples readers from the storage representation: the +/// view's shape stays the same whether the underlying `Updates` holds owned +/// `Lists` or (later) `Stash`-backed columns that may be borrowed from wire bytes. +pub struct UpdatesView<'a, U: Update> { + /// Outer key list (one entry per group of keys at the trie root). + pub keys: > as Borrow>::Borrowed<'a>, + /// Per-key list of vals. + pub vals: > as Borrow>::Borrowed<'a>, + /// Per-val list of times. + pub times: > as Borrow>::Borrowed<'a>, + /// Per-time list of diffs. + pub diffs: > as Borrow>::Borrowed<'a>, +} + +impl Updates { + /// Borrow the four columns as a single `UpdatesView`. + pub fn view(&self) -> UpdatesView<'_, U> { + UpdatesView { + keys: self.keys.borrow(), + vals: self.vals.borrow(), + times: self.times.borrow(), + diffs: self.diffs.borrow(), + } + } +} + /// The flat `(key, val, time, diff)` tuple for an [`Update`]. pub type Tuple = (::Key, ::Val, ::Time, ::Diff); From da7fe81c8653dc83f20117bb74824a99d1158ec0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 17:31:03 -0400 Subject: [PATCH 02/10] Use UpdatesView reader --- .../src/columnar/arrangement/mod.rs | 12 ++-- .../src/columnar/arrangement/trie_merger.rs | 62 ++++++++++--------- .../src/columnar/exchange.rs | 5 +- 3 files changed, 42 insertions(+), 37 deletions(-) diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index 82bc1598c..13f2350f7 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -263,7 +263,7 @@ pub mod batcher { use super::super::updates::Updates; impl timely::container::SizableContainer for Updates { - fn at_capacity(&self) -> bool { self.diffs.values.len() >= crate::columnar::LINK_TARGET } + fn at_capacity(&self) -> bool { self.view().diffs.values.len() >= crate::columnar::LINK_TARGET } fn ensure_capacity(&mut self, _stash: &mut Option) { } } @@ -343,12 +343,12 @@ pub mod builder { // Meld sorted, consolidated chain entries in order. // Pre-allocate to avoid reallocations during meld. - use columnar::{Borrow, Container}; + use columnar::Container; let mut updates = Updates::::default(); - updates.keys.reserve_for(chain.iter().map(|c| c.keys.borrow())); - updates.vals.reserve_for(chain.iter().map(|c| c.vals.borrow())); - updates.times.reserve_for(chain.iter().map(|c| c.times.borrow())); - updates.diffs.reserve_for(chain.iter().map(|c| c.diffs.borrow())); + updates.keys.reserve_for(chain.iter().map(|c| c.view().keys)); + updates.vals.reserve_for(chain.iter().map(|c| c.view().vals)); + updates.times.reserve_for(chain.iter().map(|c| c.view().times)); + updates.diffs.reserve_for(chain.iter().map(|c| c.view().diffs)); let mut builder = super::super::updates::UpdatesBuilder::new_from(updates); for chunk in chain.iter() { builder.meld(chunk); diff --git a/differential-dataflow/src/columnar/arrangement/trie_merger.rs b/differential-dataflow/src/columnar/arrangement/trie_merger.rs index 8994a9ced..5f0701846 100644 --- a/differential-dataflow/src/columnar/arrangement/trie_merger.rs +++ b/differential-dataflow/src/columnar/arrangement/trie_merger.rs @@ -95,7 +95,7 @@ where kept: &mut Vec, _stash: &mut Vec, ) { - use columnar::{Borrow, Container, ContainerOf, Index, Push}; + use columnar::{Container, ContainerOf, Index, Push}; use columnar::primitive::offsets::Strides; use crate::columnar::updates::{Lists, retain_items}; @@ -104,7 +104,8 @@ where let mut bitmap = Vec::new(); // update should be kept. for chunk in merged.drain(..) { bitmap.clear(); - let times = chunk.times.values.borrow(); + let view = chunk.view(); + let times = view.times.values; for idx in 0 .. times.len() { Columnar::copy_from(&mut time_owned, times.get(idx)); if upper.less_equal(&time_owned) { @@ -117,10 +118,10 @@ where else if bitmap.iter().all(|x| !*x) { ship.push(chunk); } else { - let (times, temp) = retain_items::>(chunk.times.borrow(), &bitmap[..]); - let (vals, temp) = retain_items::>(chunk.vals.borrow(), &temp[..]); - let (keys, _temp) = retain_items::>(chunk.keys.borrow(), &temp[..]); - let d_borrow = chunk.diffs.borrow(); + let (times, temp) = retain_items::>(view.times, &bitmap[..]); + let (vals, temp) = retain_items::>(view.vals, &temp[..]); + let (keys, _temp) = retain_items::>(view.keys, &temp[..]); + let d_borrow = view.diffs; let mut diffs = > as Container>::with_capacity_for([d_borrow].into_iter()); for (index, bit) in bitmap.iter().enumerate() { if *bit { diffs.values.push(d_borrow.values.get(index)); } @@ -135,10 +136,10 @@ where for bit in bitmap.iter_mut() { *bit = !*bit; } - let (times, temp) = retain_items::>(chunk.times.borrow(), &bitmap[..]); - let (vals, temp) = retain_items::>(chunk.vals.borrow(), &temp[..]); - let (keys, _temp) = retain_items::>(chunk.keys.borrow(), &temp[..]); - let d_borrow = chunk.diffs.borrow(); + let (times, temp) = retain_items::>(view.times, &bitmap[..]); + let (vals, temp) = retain_items::>(view.vals, &temp[..]); + let (keys, _temp) = retain_items::>(view.keys, &temp[..]); + let d_borrow = view.diffs; let mut diffs = > as Container>::with_capacity_for([d_borrow].into_iter()); for (index, bit) in bitmap.iter().enumerate() { if *bit { diffs.values.push(d_borrow.values.get(index)); } @@ -228,13 +229,14 @@ where if let Some(((k,v,t),batch)) = cursor1 { let mut out_batch = stash.pop().unwrap_or_default(); let empty: Updates = Default::default(); + let view = batch.view(); write_from_surveys( &batch, &empty, &[Report::This(0, 1)], - &[Report::This(k, batch.keys.values.len())], - &[Report::This(v, batch.vals.values.len())], - &[Report::This(t, batch.times.values.len())], + &[Report::This(k, view.keys.values.len())], + &[Report::This(v, view.vals.values.len())], + &[Report::This(t, view.times.values.len())], &mut out_batch, ); builder.push(out_batch); @@ -242,13 +244,14 @@ where if let Some(((k,v,t),batch)) = cursor2 { let mut out_batch = stash.pop().unwrap_or_default(); let empty: Updates = Default::default(); + let view = batch.view(); write_from_surveys( &empty, &batch, &[Report::That(0, 1)], - &[Report::That(k, batch.keys.values.len())], - &[Report::That(v, batch.vals.values.len())], - &[Report::That(t, batch.times.values.len())], + &[Report::That(k, view.keys.values.len())], + &[Report::That(v, view.vals.values.len())], + &[Report::That(t, view.times.values.len())], &mut out_batch, ); builder.push(out_batch); @@ -289,13 +292,14 @@ where let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap(); let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap(); - use columnar::Borrow; - let keys0 = updates0.keys.borrow(); - let keys1 = updates1.keys.borrow(); - let vals0 = updates0.vals.borrow(); - let vals1 = updates1.vals.borrow(); - let times0 = updates0.times.borrow(); - let times1 = updates1.times.borrow(); + let view0 = updates0.view(); + let view1 = updates1.view(); + let keys0 = view0.keys; + let keys1 = view1.keys; + let vals0 = view0.vals; + let vals1 = view1.vals; + let times0 = view0.times; + let times1 = view1.times; // Survey the interleaving of the two inputs. let mut key_survey = survey::>(keys0, keys1, &[Report::Both(0,0)]); @@ -393,12 +397,12 @@ fn write_from_surveys( time_survey: &[Report], output: &mut Updates, ) { - use columnar::Borrow; - - write_layer(updates0.keys.borrow(), updates1.keys.borrow(), root_survey, key_survey, &mut output.keys); - write_layer(updates0.vals.borrow(), updates1.vals.borrow(), key_survey, val_survey, &mut output.vals); - write_layer(updates0.times.borrow(), updates1.times.borrow(), val_survey, time_survey, &mut output.times); - write_diffs::(updates0.diffs.borrow(), updates1.diffs.borrow(), time_survey, &mut output.diffs); + let view0 = updates0.view(); + let view1 = updates1.view(); + write_layer(view0.keys, view1.keys, root_survey, key_survey, &mut output.keys); + write_layer(view0.vals, view1.vals, key_survey, val_survey, &mut output.vals); + write_layer(view0.times, view1.times, val_survey, time_survey, &mut output.times); + write_diffs::(view0.diffs, view1.diffs, time_survey, &mut output.diffs); } /// From two sequences of interleaved lists, map out the interleaving of their values. diff --git a/differential-dataflow/src/columnar/exchange.rs b/differential-dataflow/src/columnar/exchange.rs index 01dc3bb08..8d784ae1a 100644 --- a/differential-dataflow/src/columnar/exchange.rs +++ b/differential-dataflow/src/columnar/exchange.rs @@ -5,7 +5,7 @@ use std::rc::Rc; -use columnar::{Borrow, Index, Len}; +use columnar::{Index, Len}; use timely::logging::TimelyLogger; use timely::dataflow::channels::pushers::{Exchange, exchange::Distributor}; use timely::dataflow::channels::Message; @@ -32,7 +32,8 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor>>>(&mut self, container: &mut RecordedUpdates, time: &T, pushers: &mut [P]) { use super::updates::child_range; - let keys_b = container.updates.keys.borrow(); + let view = container.updates.view(); + let keys_b = view.keys; let mut outputs: Vec> = (0..pushers.len()).map(|_| Updates::default()).collect(); // Each outer key group becomes a separate run in the destination. From 50bf9efc58f7829954baab4e0b37c8443df2cfb7 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 17:36:30 -0400 Subject: [PATCH 03/10] Introduce UpdatesOwned for writeable typed allocations --- .../examples/columnar/main.rs | 4 +- .../src/columnar/arrangement/mod.rs | 42 ++++----- .../src/columnar/arrangement/trie_merger.rs | 68 +++++++-------- differential-dataflow/src/columnar/builder.rs | 6 +- .../src/columnar/exchange.rs | 6 +- differential-dataflow/src/columnar/mod.rs | 20 ++--- differential-dataflow/src/columnar/updates.rs | 86 +++++++++---------- 7 files changed, 116 insertions(+), 116 deletions(-) diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index 919d0b99a..fdfe49c06 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -2,7 +2,7 @@ //! //! Demonstrates columnar-backed arrangements in an iterative scope, //! exercising Enter, Leave, Negate, ResultsIn on RecordedUpdates, -//! and Push on Updates for the reduce builder path. +//! and Push on UpdatesOwned for the reduce builder path. use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::InputHandle; @@ -87,7 +87,7 @@ fn main() { /// /// This module exercises the container traits needed for iterative columnar /// computation: Enter, Leave, Negate, ResultsIn on RecordedUpdates, and -/// Push on Updates for the reduce builder path. +/// Push on UpdatesOwned for the reduce builder path. mod reachability { use timely::order::Product; diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index 13f2350f7..d983c9f12 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -3,8 +3,8 @@ //! - Type aliases (`ValSpine`, `ValBatcher`, `ValBuilder`) glue columnar storage //! into DD's trace machinery. //! - `Coltainer` wraps a columnar `C::Container` as a DD `BatchContainer`. -//! - `TrieChunker` strips `RecordedUpdates` down to `Updates` for the merge batcher. -//! - `batcher` contains required trait stubs for `Updates`. +//! - `TrieChunker` strips `RecordedUpdates` down to `UpdatesOwned` for the merge batcher. +//! - `batcher` contains required trait stubs for `UpdatesOwned`. //! - `trie_merger` is the batch-at-a-time merging logic. //! - `builder::ValMirror` is the `trace::Builder` that seals melded chunks into //! an `OrdValBatch`. @@ -122,12 +122,12 @@ pub mod batch_container { } } -use super::updates::Updates; +use super::updates::UpdatesOwned; use super::RecordedUpdates; use crate::trace::implementations::merge_batcher::MergeBatcher; type ValBatcher2 = MergeBatcher, TrieChunker, trie_merger::TrieMerger>; -/// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher. +/// A chunker that unwraps `RecordedUpdates` into bare `UpdatesOwned` for the merge batcher. /// /// The intended behavior is to produce chunks whose size is within 1-2x `LINK_TARGET`. /// It ships large batches immediately, accumulates small batches, consolidates as they @@ -137,14 +137,14 @@ type ValBatcher2 = MergeBatcher, TrieChunker, trie_merg /// each of which is put in `self.stage` pub struct TrieChunker { /// Insufficiently large updates we haven't figured out how to ship yet. - blobs: Vec<(Updates, bool)>, + blobs: Vec<(UpdatesOwned, bool)>, /// Sum of `len()` across `blobs`. blob_records: usize, /// Ready-to-emit chunks. Each is sorted and consolidated; size ≥ `LINK_TARGET` /// (or smaller, only for the final chunk produced by `finish`). - ready: std::collections::VecDeque>, + ready: std::collections::VecDeque>, /// Staging area for the next pull call. - stage: Option>, + stage: Option>, } impl Default for TrieChunker { @@ -160,7 +160,7 @@ impl Default for TrieChunker { impl TrieChunker { /// Consolidate and empty `self.blobs`, into `self.ready` if large enough or else return. - fn consolidate_blobs(&mut self) -> Updates { + fn consolidate_blobs(&mut self) -> UpdatesOwned { // Single consolidated entry: pass through, no work. if self.blobs.len() == 1 && self.blobs[0].1 { let (result, _) = self.blobs.pop().unwrap(); @@ -169,14 +169,14 @@ impl TrieChunker { } // TODO: Improve consolidation through column-oriented sorts. - let result = Updates::::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter())); + let result = UpdatesOwned::::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter())); self.blobs.clear(); self.blob_records = 0; result } - /// Push a non-empty `Updates` into blobs and update accounting. - fn absorb(&mut self, updates: Updates, consolidated: bool) { + /// Push a non-empty `UpdatesOwned` into blobs and update accounting. + fn absorb(&mut self, updates: UpdatesOwned, consolidated: bool) { self.blob_records += updates.len(); self.blobs.push((updates, consolidated)); } @@ -237,7 +237,7 @@ impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut R } impl timely::container::ContainerBuilder for TrieChunker { - type Container = Updates; + type Container = UpdatesOwned; fn extract(&mut self) -> Option<&mut Self::Container> { self.stage = self.ready.pop_front(); self.stage.as_mut() @@ -253,16 +253,16 @@ impl timely::container::ContainerBuilder for T } pub mod batcher { - //! Batcher trait stubs required to plug `Updates` into DD's merge batcher. + //! Batcher trait stubs required to plug `UpdatesOwned` into DD's merge batcher. use columnar::Len; use timely::progress::frontier::{Antichain, AntichainRef}; use crate::trace::implementations::merge_batcher::container::InternalMerge; use super::super::layout::ColumnarUpdate as Update; - use super::super::updates::Updates; + use super::super::updates::UpdatesOwned; - impl timely::container::SizableContainer for Updates { + impl timely::container::SizableContainer for UpdatesOwned { fn at_capacity(&self) -> bool { self.view().diffs.values.len() >= crate::columnar::LINK_TARGET } fn ensure_capacity(&mut self, _stash: &mut Option) { } } @@ -270,7 +270,7 @@ pub mod batcher { /// Required by `reduce_abelian`'s bound `Builder::Input: InternalMerge`. /// Not called at runtime — our batcher uses `TrieMerger` instead. /// TODO: Relax the bound in DD's reduce to remove this requirement. - impl InternalMerge for Updates { + impl InternalMerge for UpdatesOwned { type TimeOwned = U::Time; fn len(&self) -> usize { unimplemented!() } fn clear(&mut self) { @@ -298,7 +298,7 @@ pub mod builder { use crate::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage}; use crate::trace::Description; - use super::super::updates::Updates; + use super::super::updates::UpdatesOwned; use super::super::layout::ColumnarUpdate as Update; use super::super::layout::ColumnarLayout as Layout; use super::Coltainer; @@ -316,14 +316,14 @@ pub mod builder { output } - /// Trace [`Builder`](crate::trace::Builder) that accumulates `Updates` + /// Trace [`Builder`](crate::trace::Builder) that accumulates `UpdatesOwned` /// chunks and seals them into a single [`OrdValBatch`]. pub struct ValMirror { - chunks: Vec>, + chunks: Vec>, } impl crate::trace::Builder for ValMirror { type Time = U::Time; - type Input = Updates; + type Input = UpdatesOwned; type Output = OrdValBatch>; fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { @@ -344,7 +344,7 @@ pub mod builder { // Meld sorted, consolidated chain entries in order. // Pre-allocate to avoid reallocations during meld. use columnar::Container; - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.keys.reserve_for(chain.iter().map(|c| c.view().keys)); updates.vals.reserve_for(chain.iter().map(|c| c.view().vals)); updates.times.reserve_for(chain.iter().map(|c| c.view().times)); diff --git a/differential-dataflow/src/columnar/arrangement/trie_merger.rs b/differential-dataflow/src/columnar/arrangement/trie_merger.rs index 5f0701846..0b761dff2 100644 --- a/differential-dataflow/src/columnar/arrangement/trie_merger.rs +++ b/differential-dataflow/src/columnar/arrangement/trie_merger.rs @@ -1,4 +1,4 @@ -//! Batch-at-a-time merging of sorted, consolidated `Updates` chains. +//! Batch-at-a-time merging of sorted, consolidated `UpdatesOwned` chains. //! //! The core is `TrieMerger::merge_batches`, which walks pairs of chunks via //! `merge_batch`, building a chain of merged outputs with `ChainBuilder`. @@ -11,9 +11,9 @@ use timely::progress::frontier::{Antichain, AntichainRef}; use crate::trace::implementations::merge_batcher::Merger; use super::super::layout::ColumnarUpdate as Update; -use super::super::updates::Updates; +use super::super::updates::UpdatesOwned; -/// Merge-batcher merger that melds sorted, consolidated `Updates` tries. +/// Merge-batcher merger that melds sorted, consolidated `UpdatesOwned` tries. pub struct TrieMerger { _marker: std::marker::PhantomData, } @@ -54,15 +54,15 @@ where } } -/// Build sorted `Updates` chunks from a sorted iterator of refs, -/// using `Updates::form` (which consolidates internally) on batches. +/// Build sorted `UpdatesOwned` chunks from a sorted iterator of refs, +/// using `UpdatesOwned::form` (which consolidates internally) on batches. fn form_chunks<'a, U: Update>( sorted: impl Iterator>>, - output: &mut Vec>, + output: &mut Vec>, ) { let mut sorted = sorted.peekable(); while sorted.peek().is_some() { - let chunk = Updates::::form((&mut sorted).take(crate::columnar::LINK_TARGET)); + let chunk = UpdatesOwned::::form((&mut sorted).take(crate::columnar::LINK_TARGET)); if chunk.len() > 0 { output.push(chunk); } @@ -73,15 +73,15 @@ impl Merger for TrieMerger where U::Time: 'static, { - type Chunk = Updates; + type Chunk = UpdatesOwned; type Time = U::Time; fn merge( &mut self, - list1: Vec>, - list2: Vec>, - output: &mut Vec>, - _stash: &mut Vec>, + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + _stash: &mut Vec>, ) { Self::merge_batches(list1, list2, output, _stash); } @@ -127,7 +127,7 @@ where if *bit { diffs.values.push(d_borrow.values.get(index)); } } diffs.bounds = Strides::new(1, times.values.len() as u64); - kept.push(Updates { + kept.push(UpdatesOwned { keys, vals, times, @@ -145,7 +145,7 @@ where if *bit { diffs.values.push(d_borrow.values.get(index)); } } diffs.bounds = Strides::new(1, times.values.len() as u64); - ship.push(Updates { + ship.push(UpdatesOwned { keys, vals, times, @@ -169,9 +169,9 @@ where /// Correct but slow — used as fallback. #[allow(dead_code)] fn merge_iterator( - list1: &[Updates], - list2: &[Updates], - output: &mut Vec>, + list1: &[UpdatesOwned], + list2: &[UpdatesOwned], + output: &mut Vec>, ) { let iter1 = list1.iter().flat_map(|chunk| chunk.iter()); let iter2 = list2.iter().flat_map(|chunk| chunk.iter()); @@ -187,10 +187,10 @@ where /// A merge implementation that operates batch-at-a-time. #[inline(never)] fn merge_batches( - list1: Vec>, - list2: Vec>, - output: &mut Vec>, - stash: &mut Vec>, + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + stash: &mut Vec>, ) { // The design for efficient "batch" merginging of chains of links is: @@ -228,7 +228,7 @@ where // TODO: create batch for the non-empty cursor. if let Some(((k,v,t),batch)) = cursor1 { let mut out_batch = stash.pop().unwrap_or_default(); - let empty: Updates = Default::default(); + let empty: UpdatesOwned = Default::default(); let view = batch.view(); write_from_surveys( &batch, @@ -243,7 +243,7 @@ where } if let Some(((k,v,t),batch)) = cursor2 { let mut out_batch = stash.pop().unwrap_or_default(); - let empty: Updates = Default::default(); + let empty: UpdatesOwned = Default::default(); let view = batch.view(); write_from_surveys( &empty, @@ -282,10 +282,10 @@ where /// cursors as part of the mapping. #[inline(never)] fn merge_batch( - batch1: &mut Option<((usize, usize, usize), Updates)>, - batch2: &mut Option<((usize, usize, usize), Updates)>, + batch1: &mut Option<((usize, usize, usize), UpdatesOwned)>, + batch2: &mut Option<((usize, usize, usize), UpdatesOwned)>, builder: &mut ChainBuilder, - stash: &mut Vec>, + stash: &mut Vec>, ) { // TODO: Optimization for one batch exceeding the other. @@ -389,13 +389,13 @@ where /// and times; `write_diffs` handles diff consolidation. #[inline(never)] fn write_from_surveys( - updates0: &Updates, - updates1: &Updates, + updates0: &UpdatesOwned, + updates1: &UpdatesOwned, root_survey: &[Report], key_survey: &[Report], val_survey: &[Report], time_survey: &[Report], - output: &mut Updates, + output: &mut UpdatesOwned, ) { let view0 = updates0.view(); let view1 = updates1.view(); @@ -642,14 +642,14 @@ pub enum Report { Both(usize, usize), } -/// Accumulates a sequence of `Updates` chunks, merging the tail when a new +/// Accumulates a sequence of `UpdatesOwned` chunks, merging the tail when a new /// chunk would extend the current run rather than start a new one. -pub struct ChainBuilder { updates: Vec> } +pub struct ChainBuilder { updates: Vec> } impl Default for ChainBuilder { fn default() -> Self { Self { updates: Default::default() } } } impl ChainBuilder { - fn push(&mut self, mut link: Updates) { + fn push(&mut self, mut link: UpdatesOwned) { link = link.filter_zero(); if link.len() > 0 { if let Some(last) = self.updates.last_mut() { @@ -664,6 +664,6 @@ impl ChainBuilder { else { self.updates.push(link); } } } - fn extend(&mut self, iter: impl IntoIterator>) { for link in iter { self.push(link); }} - fn done(self) -> Vec> { self.updates } + fn extend(&mut self, iter: impl IntoIterator>) { for link in iter { self.push(link); }} + fn done(self) -> Vec> { self.updates } } diff --git a/differential-dataflow/src/columnar/builder.rs b/differential-dataflow/src/columnar/builder.rs index 22bf48e44..c05e81bb4 100644 --- a/differential-dataflow/src/columnar/builder.rs +++ b/differential-dataflow/src/columnar/builder.rs @@ -8,7 +8,7 @@ use std::collections::VecDeque; use columnar::{Columnar, Clear, Len, Push}; use super::layout::ColumnarUpdate as Update; -use super::updates::Updates; +use super::updates::UpdatesOwned; use super::RecordedUpdates; type TupleContainer = <(::Key, ::Val, ::Time, ::Diff) as Columnar>::Container; @@ -33,7 +33,7 @@ impl PushInto for ValBuilder where TupleContainer : Push< let records = self.current.len(); let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); - let updates = Updates::form(refs.into_iter()); + let updates = UpdatesOwned::form(refs.into_iter()); self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } @@ -71,7 +71,7 @@ impl ContainerBuilder for ValBuilder { let records = self.current.len(); let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); - let updates = Updates::form(refs.into_iter()); + let updates = UpdatesOwned::form(refs.into_iter()); self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } diff --git a/differential-dataflow/src/columnar/exchange.rs b/differential-dataflow/src/columnar/exchange.rs index 8d784ae1a..861241aaa 100644 --- a/differential-dataflow/src/columnar/exchange.rs +++ b/differential-dataflow/src/columnar/exchange.rs @@ -14,7 +14,7 @@ use timely::progress::Timestamp; use timely::worker::Worker; use super::layout::ColumnarUpdate as Update; -use super::updates::Updates; +use super::updates::UpdatesOwned; use super::RecordedUpdates; /// Distributor that routes `RecordedUpdates` records to workers by hashing keys. @@ -25,7 +25,7 @@ pub struct ValDistributor { } impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> for ValDistributor { - // TODO: For unsorted Updates (stride-1 outer keys), each key is its own outer group, + // TODO: For unsorted UpdatesOwned (stride-1 outer keys), each key is its own outer group, // so the per-group pre_lens snapshot and seal check costs O(keys × workers). Should // either batch keys by destination first, or detect stride-1 outer bounds and use a // simpler single-pass partitioning that seals once at the end. @@ -34,7 +34,7 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> = (0..pushers.len()).map(|_| Updates::default()).collect(); + let mut outputs: Vec> = (0..pushers.len()).map(|_| UpdatesOwned::default()).collect(); // Each outer key group becomes a separate run in the destination. for outer in 0..Len::len(&keys_b) { diff --git a/differential-dataflow/src/columnar/mod.rs b/differential-dataflow/src/columnar/mod.rs index b32a4763c..dbf93fe96 100644 --- a/differential-dataflow/src/columnar/mod.rs +++ b/differential-dataflow/src/columnar/mod.rs @@ -4,7 +4,7 @@ //! changes; do not rely on stability across releases. //! //! Known rough edges: -//! - `ContainerBytes` for `RecordedUpdates` and `Updates` is `unimplemented!()`; +//! - `ContainerBytes` for `RecordedUpdates` and `UpdatesOwned` is `unimplemented!()`; //! multi-process dataflows that exchange these containers will panic. //! - `leave_dynamic` consolidates eagerly on each batch; the //! [`crate::dynamic`] counterpart defers consolidation. Same observable @@ -20,7 +20,7 @@ //! //! Module layout (bottom-up): //! - [`layout`] — `ColumnarUpdate` / `ColumnarLayout` / `OrdContainer`. -//! - [`updates`] — `Updates` trie, `Consolidating`, `UpdatesBuilder`. +//! - [`updates`] — `UpdatesOwned` trie, `Consolidating`, `UpdatesBuilder`. //! - [`builder`] — `ValColBuilder`: the input-side `ContainerBuilder`. //! - [`exchange`] — `ValPact` / `ValDistributor`: PACT for shuffling. //! - [`arrangement`] — type aliases + `Coltainer` + `TrieChunker` + @@ -36,7 +36,7 @@ pub mod builder; pub mod exchange; pub mod arrangement; -pub use updates::Updates; +pub use updates::UpdatesOwned; pub use builder::ValBuilder as ValColBuilder; pub use exchange::ValPact; pub use arrangement::{ValBatcher, ValBuilder, ValSpine}; @@ -44,12 +44,12 @@ pub use arrangement::{ValBatcher, ValBuilder, ValSpine}; /// Target size for update batches, in number of updates. pub const LINK_TARGET: usize = 64 * 1024; -/// A thin wrapper around `Updates` that tracks the pre-consolidation record count +/// A thin wrapper around `UpdatesOwned` that tracks the pre-consolidation record count /// for timely's exchange accounting. This wrapper is the stream container type; -/// the `TrieChunker` strips it, passing bare `Updates` into the merge batcher. +/// the `TrieChunker` strips it, passing bare `UpdatesOwned` into the merge batcher. pub struct RecordedUpdates { /// The trie of `(key, val, time, diff)` updates. - pub updates: Updates, + pub updates: UpdatesOwned, /// Number of records in `updates` before consolidation. pub records: usize, /// Whether `updates` is known to be sorted and consolidated @@ -83,7 +83,7 @@ mod container_impls { use crate::collection::containers::{Negate, Enter, Leave, ResultsIn}; use super::layout::ColumnarUpdate as Update; - use super::updates::Updates; + use super::updates::UpdatesOwned; use super::RecordedUpdates; impl> Negate for RecordedUpdates { @@ -122,7 +122,7 @@ mod container_impls { // TODO: Assumes Enter (to_inner) is order-preserving on times. RecordedUpdates { consolidated: self.consolidated, - updates: Updates { + updates: UpdatesOwned { keys: self.updates.keys, vals: self.updates.vals, times: super::updates::Lists { values: new_times, bounds: self.updates.times.bounds }, @@ -146,7 +146,7 @@ mod container_impls { // Flatten, convert times, and reconsolidate via consolidate. // Leave can collapse distinct T1 times to the same T2 time, // so the trie must be rebuilt with consolidation. - let mut flat = Updates::<(K, V, T2, R)>::default(); + let mut flat = UpdatesOwned::<(K, V, T2, R)>::default(); let mut t1_owned = T1::default(); for (k, v, t, d) in self.updates.iter() { Columnar::copy_from(&mut t1_owned, t); @@ -166,7 +166,7 @@ mod container_impls { use timely::progress::PathSummary; // Apply results_in to each time; drop updates whose time maps to None. // This must rebuild the trie since some entries may be removed. - let mut output = Updates::::default(); + let mut output = UpdatesOwned::::default(); let mut time_owned = U::Time::default(); for (k, v, t, d) in self.updates.iter() { Columnar::copy_from(&mut time_owned, t); diff --git a/differential-dataflow/src/columnar/updates.rs b/differential-dataflow/src/columnar/updates.rs index dd44107f5..467744f65 100644 --- a/differential-dataflow/src/columnar/updates.rs +++ b/differential-dataflow/src/columnar/updates.rs @@ -1,10 +1,10 @@ //! Trie-structured update storage. //! -//! `Updates` is the core trie: four nested `Lists` (keys, vals, times, diffs). +//! `UpdatesOwned` is the core trie: four nested `Lists` (keys, vals, times, diffs). //! `Consolidating` is a streaming consolidator over sorted `(k,v,t,d)` data. //! `UpdatesBuilder` melds sorted, consolidated chunks into a single trie. //! -//! NOTE: `Updates::iter` / `form` / `form_unsorted` / `consolidate` / `filter_zero` +//! NOTE: `UpdatesOwned::iter` / `form` / `form_unsorted` / `consolidate` / `filter_zero` //! are escape hatches that flatten the trie. Prefer trie-native operations where //! possible — flattening + rebuilding is a significant cost on hot paths. @@ -55,7 +55,7 @@ pub fn retain_items<'a, C: Container>(lists: as Borrow>::Borrowed<'a>, /// one val per key, one time per val, one diff per time). /// A fully consolidated trie has a single outer key list, all lists sorted /// and deduplicated, and singleton diff lists. -pub struct Updates { +pub struct UpdatesOwned { /// Outer key list (one entry per group of keys at the trie root). pub keys: Lists>, /// Per-key list of vals. @@ -66,7 +66,7 @@ pub struct Updates { pub diffs: Lists>, } -impl Default for Updates { +impl Default for UpdatesOwned { fn default() -> Self { Self { keys: Default::default(), @@ -77,13 +77,13 @@ impl Default for Updates { } } -impl std::fmt::Debug for Updates { +impl std::fmt::Debug for UpdatesOwned { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Updates").finish() + f.debug_struct("UpdatesOwned").finish() } } -impl Clone for Updates { +impl Clone for UpdatesOwned { fn clone(&self) -> Self { Self { keys: self.keys.clone(), @@ -94,11 +94,11 @@ impl Clone for Updates { } } -/// Borrowed view of an [`Updates`] with the same four-field shape. +/// Borrowed view of an [`UpdatesOwned`] with the same four-field shape. /// -/// Reader code should consume an `Updates` through this view rather than reading +/// Reader code should consume an `UpdatesOwned` through this view rather than reading /// fields directly. This decouples readers from the storage representation: the -/// view's shape stays the same whether the underlying `Updates` holds owned +/// view's shape stays the same whether the underlying `UpdatesOwned` holds owned /// `Lists` or (later) `Stash`-backed columns that may be borrowed from wire bytes. pub struct UpdatesView<'a, U: Update> { /// Outer key list (one entry per group of keys at the trie root). @@ -111,7 +111,7 @@ pub struct UpdatesView<'a, U: Update> { pub diffs: > as Borrow>::Borrowed<'a>, } -impl Updates { +impl UpdatesOwned { /// Borrow the four columns as a single `UpdatesView`. pub fn view(&self) -> UpdatesView<'_, U> { UpdatesView { @@ -187,7 +187,7 @@ where } } -impl Updates { +impl UpdatesOwned { /// Translate a key-range into the corresponding val-range via `vals.bounds`. pub fn vals_bounds(&self, key_range: std::ops::Range) -> std::ops::Range { @@ -218,14 +218,14 @@ impl Updates { self.diffs.extend_from_self(other.diffs.borrow(), time_range); } - /// Forms a consolidated `Updates` trie from unsorted `(key, val, time, diff)` refs. + /// Forms a consolidated `UpdatesOwned` trie from unsorted `(key, val, time, diff)` refs. pub fn form_unsorted<'a>(unsorted: impl Iterator>>) -> Self { let mut data = unsorted.collect::>(); data.sort(); Self::form(data.into_iter()) } - /// Forms a consolidated `Updates` trie from sorted `(key, val, time, diff)` refs. + /// Forms a consolidated `UpdatesOwned` trie from sorted `(key, val, time, diff)` refs. pub fn form<'a>(sorted: impl Iterator>>) -> Self { // Step 1: Streaming consolidation — accumulate diffs, drop zeros. @@ -300,7 +300,7 @@ impl Updates { let (times, keep) = retain_items(self.times.borrow(), &keep[..]); let (vals, keep) = retain_items(self.vals.borrow(), &keep[..]); let (keys, _keep) = retain_items(self.keys.borrow(), &keep[..]); - Updates { + UpdatesOwned { keys, vals, times, @@ -321,7 +321,7 @@ impl Updates { /// /// Each field is independently typed — columnar refs, `&Owned`, owned values, /// or any other type the column container accepts via its `Push` impl. -impl Push<(KP, VP, TP, DP)> for Updates +impl Push<(KP, VP, TP, DP)> for UpdatesOwned where ContainerOf: Push, ContainerOf: Push, @@ -341,13 +341,13 @@ where } /// PushInto for the `((K, V), T, R)` shape that reduce_trace uses. -impl timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for Updates { +impl timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for UpdatesOwned { fn push_into(&mut self, ((key, val), time, diff): ((U::Key, U::Val), U::Time, U::Diff)) { self.push((&key, &val, &time, &diff)); } } -impl Updates { +impl UpdatesOwned { /// Iterate all `(key, val, time, diff)` entries as refs. pub fn iter(&self) -> impl Iterator Updates { } } -impl timely::Accountable for Updates { +impl timely::Accountable for UpdatesOwned { #[inline] fn record_count(&self) -> i64 { Len::len(&self.diffs.values) as i64 } } -impl timely::dataflow::channels::ContainerBytes for Updates { +impl timely::dataflow::channels::ContainerBytes for UpdatesOwned { fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } fn length_in_bytes(&self) -> usize { unimplemented!() } fn into_bytes(&self, _writer: &mut W) { unimplemented!() } } -/// An incremental trie builder that accepts sorted, consolidated `Updates` chunks -/// and melds them into a single `Updates` trie. +/// An incremental trie builder that accepts sorted, consolidated `UpdatesOwned` chunks +/// and melds them into a single `UpdatesOwned` trie. /// -/// The internal `Updates` has open (unsealed) bounds at the keys, vals, and times +/// The internal `UpdatesOwned` has open (unsealed) bounds at the keys, vals, and times /// levels — the last group at each level has its values pushed but no corresponding /// bounds entry. `diffs.bounds` is always 1:1 with `times.values`. /// -/// `meld` accepts a consolidated `Updates` whose first `(key, val, time)` is +/// `meld` accepts a consolidated `UpdatesOwned` whose first `(key, val, time)` is /// strictly greater than the builder's last `(key, val, time)`. The key and val /// may equal the builder's current open key/val, as long as the time is greater. /// -/// `done` seals all open bounds and returns the completed `Updates`. +/// `done` seals all open bounds and returns the completed `UpdatesOwned`. pub struct UpdatesBuilder { /// Non-empty, consolidated updates. - updates: Updates, + updates: UpdatesOwned, } impl UpdatesBuilder { @@ -411,7 +411,7 @@ impl UpdatesBuilder { /// Unseals the last group at keys, vals, and times levels so that /// subsequent `meld` calls can extend the open groups. /// If the updates are not consolidated none of this works. - pub fn new_from(mut updates: Updates) -> Self { + pub fn new_from(mut updates: UpdatesOwned) -> Self { use columnar::Len; if Len::len(&updates.keys.values) > 0 { updates.keys.bounds.pop(); @@ -421,13 +421,13 @@ impl UpdatesBuilder { Self { updates } } - /// Meld a sorted, consolidated `Updates` chunk into this builder. + /// Meld a sorted, consolidated `UpdatesOwned` chunk into this builder. /// /// The chunk's first `(key, val, time)` must be strictly greater than /// the builder's last `(key, val, time)`. Keys and vals may overlap /// (continue the current group), but times must be strictly increasing /// within the same `(key, val)`. - pub fn meld(&mut self, chunk: &Updates) { + pub fn meld(&mut self, chunk: &UpdatesOwned) { use columnar::{Borrow, Index, Len}; if chunk.len() == 0 { return; } @@ -546,8 +546,8 @@ impl UpdatesBuilder { self.updates.diffs.extend_from_self(chunk.diffs.borrow(), 0..chunk_num_times); } - /// Seal all open bounds and return the completed `Updates`. - pub fn done(mut self) -> Updates { + /// Seal all open bounds and return the completed `UpdatesOwned`. + pub fn done(mut self) -> UpdatesOwned { use columnar::Len; if Len::len(&self.updates.keys.values) > 0 { // Seal the open time group. @@ -568,13 +568,13 @@ mod tests { type TestUpdate = (u64, u64, u64, i64); - fn collect(updates: &Updates) -> Vec<(u64, u64, u64, i64)> { + fn collect(updates: &UpdatesOwned) -> Vec<(u64, u64, u64, i64)> { updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect() } #[test] fn test_push_and_consolidate_basic() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &100, &2)); updates.push((&2, &20, &200, &5)); @@ -584,7 +584,7 @@ mod tests { #[test] fn test_cancellation() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &3)); updates.push((&1, &10, &100, &-3)); updates.push((&2, &20, &200, &1)); @@ -593,7 +593,7 @@ mod tests { #[test] fn test_multiple_vals_and_times() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &200, &2)); updates.push((&1, &20, &100, &3)); @@ -603,7 +603,7 @@ mod tests { #[test] fn test_val_cancellation_propagates() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &5)); updates.push((&1, &10, &100, &-5)); updates.push((&1, &20, &100, &1)); @@ -612,13 +612,13 @@ mod tests { #[test] fn test_empty() { - let updates = Updates::::default(); + let updates = UpdatesOwned::::default(); assert_eq!(collect(&updates.consolidate()), vec![]); } #[test] fn test_total_cancellation() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &100, &-1)); assert_eq!(collect(&updates.consolidate()), vec![]); @@ -626,7 +626,7 @@ mod tests { #[test] fn test_unsorted_input() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&3, &30, &300, &1)); updates.push((&1, &10, &100, &2)); updates.push((&2, &20, &200, &3)); @@ -635,7 +635,7 @@ mod tests { #[test] fn test_first_key_cancels() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &5)); updates.push((&1, &10, &100, &-5)); updates.push((&2, &20, &200, &3)); @@ -644,7 +644,7 @@ mod tests { #[test] fn test_middle_time_cancels() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &200, &2)); updates.push((&1, &10, &200, &-2)); @@ -654,7 +654,7 @@ mod tests { #[test] fn test_first_val_cancels() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &100, &-1)); updates.push((&1, &20, &100, &5)); @@ -663,7 +663,7 @@ mod tests { #[test] fn test_interleaved_cancellations() { - let mut updates = Updates::::default(); + let mut updates = UpdatesOwned::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &100, &-1)); updates.push((&2, &20, &200, &7)); From 1d2f6fe6d35c9fdd677c2bc56ab20bdecdcabd01 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 17:39:16 -0400 Subject: [PATCH 04/10] Re-introduce Updates as Stash-backed fields that can be serialized --- differential-dataflow/src/columnar/updates.rs | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/differential-dataflow/src/columnar/updates.rs b/differential-dataflow/src/columnar/updates.rs index 467744f65..ad4b31cab 100644 --- a/differential-dataflow/src/columnar/updates.rs +++ b/differential-dataflow/src/columnar/updates.rs @@ -123,6 +123,59 @@ impl UpdatesOwned { } } +/// `Stash`-backed update storage: each column may be typed (writable) or +/// borrowed from wire bytes (read-only, zero-copy). +/// +/// Construction sites work in [`UpdatesOwned`]; convert via `From` at the +/// boundary. Reader code uses [`UpdatesView`] via [`Updates::view`], which +/// produces the same shape regardless of whether the columns are typed or +/// borrowed. +pub struct Updates { + /// Outer key list (one entry per group of keys at the trie root). + pub keys: columnar::bytes::stash::Stash>, B>, + /// Per-key list of vals. + pub vals: columnar::bytes::stash::Stash>, B>, + /// Per-val list of times. + pub times: columnar::bytes::stash::Stash>, B>, + /// Per-time list of diffs. + pub diffs: columnar::bytes::stash::Stash>, B>, +} + +impl Default for Updates { + fn default() -> Self { + Self { + keys: Default::default(), + vals: Default::default(), + times: Default::default(), + diffs: Default::default(), + } + } +} + +impl From> for Updates { + fn from(owned: UpdatesOwned) -> Self { + use columnar::bytes::stash::Stash; + Self { + keys: Stash::Typed(owned.keys), + vals: Stash::Typed(owned.vals), + times: Stash::Typed(owned.times), + diffs: Stash::Typed(owned.diffs), + } + } +} + +impl + Clone + 'static> Updates { + /// Borrow the four columns as a single `UpdatesView`. + pub fn view(&self) -> UpdatesView<'_, U> { + UpdatesView { + keys: self.keys.borrow(), + vals: self.vals.borrow(), + times: self.times.borrow(), + diffs: self.diffs.borrow(), + } + } +} + /// The flat `(key, val, time, diff)` tuple for an [`Update`]. pub type Tuple = (::Key, ::Val, ::Time, ::Diff); From 659be49ac77eb745b18d6aeb21bbd7d8b1ea781c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 19:49:04 -0400 Subject: [PATCH 05/10] Pivot RecordedUpdates to Stash-backed Updates --- .../src/columnar/arrangement/mod.rs | 2 +- differential-dataflow/src/columnar/builder.rs | 4 +- .../src/columnar/exchange.rs | 9 +- differential-dataflow/src/columnar/mod.rs | 100 ++++++++++++------ differential-dataflow/src/columnar/updates.rs | 36 +++++++ 5 files changed, 110 insertions(+), 41 deletions(-) diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index d983c9f12..ff2b48e0e 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -194,7 +194,7 @@ impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut R // Into if small enough, as we can further consolidate, but if not we need to // consolidate and then either ship (if large) or hold (if small) the results. - let updates = std::mem::take(&mut container.updates); + let updates = std::mem::take(&mut container.updates).into_owned(); let consolidated = container.consolidated; let len = updates.len(); diff --git a/differential-dataflow/src/columnar/builder.rs b/differential-dataflow/src/columnar/builder.rs index c05e81bb4..548aeec02 100644 --- a/differential-dataflow/src/columnar/builder.rs +++ b/differential-dataflow/src/columnar/builder.rs @@ -33,7 +33,7 @@ impl PushInto for ValBuilder where TupleContainer : Push< let records = self.current.len(); let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); - let updates = UpdatesOwned::form(refs.into_iter()); + let updates = UpdatesOwned::form(refs.into_iter()).into(); self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } @@ -71,7 +71,7 @@ impl ContainerBuilder for ValBuilder { let records = self.current.len(); let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); - let updates = UpdatesOwned::form(refs.into_iter()); + let updates = UpdatesOwned::form(refs.into_iter()).into(); self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } diff --git a/differential-dataflow/src/columnar/exchange.rs b/differential-dataflow/src/columnar/exchange.rs index 861241aaa..9a54c963e 100644 --- a/differential-dataflow/src/columnar/exchange.rs +++ b/differential-dataflow/src/columnar/exchange.rs @@ -32,7 +32,8 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor>>>(&mut self, container: &mut RecordedUpdates, time: &T, pushers: &mut [P]) { use super::updates::child_range; - let view = container.updates.view(); + let owned = std::mem::take(&mut container.updates).into_owned(); + let view = owned.view(); let keys_b = view.keys; let mut outputs: Vec> = (0..pushers.len()).map(|_| UpdatesOwned::default()).collect(); @@ -46,7 +47,7 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor { /// The trie of `(key, val, time, diff)` updates. - pub updates: UpdatesOwned, + pub updates: updates::Updates, /// Number of records in `updates` before consolidation. pub records: usize, /// Whether `updates` is known to be sorted and consolidated @@ -77,7 +77,7 @@ impl timely::dataflow::channels::ContainerBytes for R // Container trait impls for RecordedUpdates, enabling iterative scopes. mod container_impls { - use columnar::{Borrow, Columnar, Index, Len, Push}; + use columnar::{Columnar, Index, Len, Push}; use timely::progress::{Timestamp, timestamp::Refines}; use crate::difference::Abelian; use crate::collection::containers::{Negate, Enter, Leave, ResultsIn}; @@ -87,17 +87,21 @@ mod container_impls { use super::RecordedUpdates; impl> Negate for RecordedUpdates { - fn negate(mut self) -> Self { - let len = self.updates.diffs.values.len(); - let mut new_diffs = <::Container as Default>::default(); + fn negate(self) -> Self { + use columnar::Container; + let RecordedUpdates { mut updates, records, consolidated } = self; + let view = updates.view(); + let old_diffs = view.diffs.values; + let mut new_diffs = <::Container as Container>::with_capacity_for([old_diffs].into_iter()); let mut owned = U::Diff::default(); - for i in 0..len { - columnar::Columnar::copy_from(&mut owned, self.updates.diffs.values.borrow().get(i)); + for i in 0..old_diffs.len() { + columnar::Columnar::copy_from(&mut owned, old_diffs.get(i)); owned.negate(); new_diffs.push(&owned); } - self.updates.diffs.values = new_diffs; - self + // TODO: avoid make_typed() call as we are overwriting. + updates.diffs.make_typed().values = new_diffs; + RecordedUpdates { updates, records, consolidated } } } @@ -111,24 +115,33 @@ mod container_impls { { type InnerContainer = RecordedUpdates<(K, V, T2, R)>; fn enter(self) -> Self::InnerContainer { - // Rebuild the time column; everything else moves as-is. + // Rebuild the time column from a borrowed view; keys/vals/diffs + // move untouched, preserving any Stash::Bytes backing. + use columnar::bytes::stash::Stash; + let RecordedUpdates { updates, records, consolidated } = self; + let times = updates.times.borrow(); + let times_values = times.values; let mut new_times = <::Container as Default>::default(); let mut t1_owned = T1::default(); - for i in 0..self.updates.times.values.len() { - Columnar::copy_from(&mut t1_owned, self.updates.times.values.borrow().get(i)); + for i in 0..times_values.len() { + Columnar::copy_from(&mut t1_owned, times_values.get(i)); let t2 = T2::to_inner(t1_owned.clone()); new_times.push(&t2); } // TODO: Assumes Enter (to_inner) is order-preserving on times. + // Deconstruct `updates` to reform with same parts but different time type. + let super::updates::Updates { keys, vals, mut times, diffs } = updates; + // TODO: Avoid make_typed() call, as we are overwriting. + times.make_typed(); + let Stash::Typed(times_lists) = times else { unreachable!() }; + let times = Stash::Typed(super::updates::Lists { + values: new_times, + bounds: times_lists.bounds, + }); RecordedUpdates { - consolidated: self.consolidated, - updates: UpdatesOwned { - keys: self.updates.keys, - vals: self.updates.vals, - times: super::updates::Lists { values: new_times, bounds: self.updates.times.bounds }, - diffs: self.updates.diffs, - }, - records: self.records, + updates: super::updates::Updates { keys, vals, times, diffs }, + records, + consolidated, } } } @@ -143,19 +156,33 @@ mod container_impls { { type OuterContainer = RecordedUpdates<(K, V, T2, R)>; fn leave(self) -> Self::OuterContainer { - // Flatten, convert times, and reconsolidate via consolidate. - // Leave can collapse distinct T1 times to the same T2 time, - // so the trie must be rebuilt with consolidation. - let mut flat = UpdatesOwned::<(K, V, T2, R)>::default(); + // Rebuild the time column from a borrowed view; keys/vals/diffs + // move untouched. Distinct T1 times can collapse to the same T2 + // time, so the result is consolidated. + use columnar::bytes::stash::Stash; + let RecordedUpdates { updates, records, consolidated: _ } = self; + let times = updates.times.borrow(); + let times_values = times.values; + let mut new_times = <::Container as Default>::default(); let mut t1_owned = T1::default(); - for (k, v, t, d) in self.updates.iter() { - Columnar::copy_from(&mut t1_owned, t); + for i in 0..times_values.len() { + Columnar::copy_from(&mut t1_owned, times_values.get(i)); let t2: T2 = t1_owned.clone().to_outer(); - flat.push((k, v, &t2, d)); + new_times.push(&t2); } + let super::updates::Updates { keys, vals, mut times, diffs } = updates; + // Extract `times` bounds via make_typed (one-column copy if Bytes-backed). + times.make_typed(); + let Stash::Typed(times_lists) = times else { unreachable!() }; + let times = Stash::Typed(super::updates::Lists { + values: new_times, + bounds: times_lists.bounds, + }); + let mid = super::updates::Updates { keys, vals, times, diffs }; + // Collapse adjacent (k,v,t2) duplicates created by `to_outer`. RecordedUpdates { - updates: flat.consolidate(), - records: self.records, + updates: mid.into_owned().consolidate().into(), + records, consolidated: true, } } @@ -166,9 +193,12 @@ mod container_impls { use timely::progress::PathSummary; // Apply results_in to each time; drop updates whose time maps to None. // This must rebuild the trie since some entries may be removed. + let RecordedUpdates { updates, records, consolidated: _ } = self; + let owned = updates.into_owned(); let mut output = UpdatesOwned::::default(); let mut time_owned = U::Time::default(); - for (k, v, t, d) in self.updates.iter() { + // TODO: Build all times first, and if no `None` outputs, can re-use k, v, d. + for (k, v, t, d) in owned.iter() { Columnar::copy_from(&mut time_owned, t); if let Some(new_time) = step.results_in(&time_owned) { output.push((k, v, &new_time, d)); @@ -176,7 +206,7 @@ mod container_impls { } // TODO: Time advancement may not be order preserving, but .. it could be. // TODO: Before this is consolidated the above would need to be `form`ed. - RecordedUpdates { updates: output, records: self.records, consolidated: false } + RecordedUpdates { updates: output.into(), records, consolidated: false } } } } @@ -215,7 +245,8 @@ where let mut d1o = U::Diff::default(); input.for_each(|time, data| { let mut session = output.session_with_builder(&time); - for (k1, v1, t1, d1) in data.updates.iter() { + let owned = std::mem::take(&mut data.updates).into_owned(); + for (k1, v1, t1, d1) in owned.iter() { Columnar::copy_from(&mut t1o, t1); Columnar::copy_from(&mut d1o, d1); for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) { @@ -299,7 +330,8 @@ where // TODO: The input trie is already sorted; a streaming form // that accepts pre-sorted, potentially-collapsing timestamps // could avoid the re-sort inside the builder. - for (k, v, t, d) in data.updates.iter() { + let owned = std::mem::take(&mut data.updates).into_owned(); + for (k, v, t, d) in owned.iter() { Columnar::copy_from(&mut time, t); let mut inner_vec = std::mem::take(&mut time.inner).into_inner(); inner_vec.truncate(level - 1); diff --git a/differential-dataflow/src/columnar/updates.rs b/differential-dataflow/src/columnar/updates.rs index ad4b31cab..db41b0de9 100644 --- a/differential-dataflow/src/columnar/updates.rs +++ b/differential-dataflow/src/columnar/updates.rs @@ -152,6 +152,17 @@ impl Default for Updates { } } +impl Clone for Updates { + fn clone(&self) -> Self { + Self { + keys: self.keys.clone(), + vals: self.vals.clone(), + times: self.times.clone(), + diffs: self.diffs.clone(), + } + } +} + impl From> for Updates { fn from(owned: UpdatesOwned) -> Self { use columnar::bytes::stash::Stash; @@ -174,6 +185,31 @@ impl + Clone + 'static> Updates usize { + self.view().diffs.values.len() + } + + /// Whether the trie is empty. + pub fn is_empty(&self) -> bool { self.len() == 0 } + + /// Convert to fully owned form, copying any `Stash::Bytes` columns into + /// typed `Lists`. Already-typed columns pass through with no copy. + /// + /// This method should be avoided unless typed containers are truly needed. + pub fn into_owned(mut self) -> UpdatesOwned { + use columnar::bytes::stash::Stash; + self.keys.make_typed(); + self.vals.make_typed(); + self.times.make_typed(); + self.diffs.make_typed(); + let Stash::Typed(keys) = self.keys else { unreachable!() }; + let Stash::Typed(vals) = self.vals else { unreachable!() }; + let Stash::Typed(times) = self.times else { unreachable!() }; + let Stash::Typed(diffs) = self.diffs else { unreachable!() }; + UpdatesOwned { keys, vals, times, diffs } + } } /// The flat `(key, val, time, diff)` tuple for an [`Update`]. From 6c3fb0694e6aa26040d5e59100b7bf0d2510f8fe Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 19:57:35 -0400 Subject: [PATCH 06/10] Migrate read uses to UpdatesView --- .../src/columnar/exchange.rs | 7 ++- differential-dataflow/src/columnar/updates.rs | 53 ++++++++++--------- 2 files changed, 32 insertions(+), 28 deletions(-) diff --git a/differential-dataflow/src/columnar/exchange.rs b/differential-dataflow/src/columnar/exchange.rs index 9a54c963e..6b65a7450 100644 --- a/differential-dataflow/src/columnar/exchange.rs +++ b/differential-dataflow/src/columnar/exchange.rs @@ -32,8 +32,7 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor>>>(&mut self, container: &mut RecordedUpdates, time: &T, pushers: &mut [P]) { use super::updates::child_range; - let owned = std::mem::take(&mut container.updates).into_owned(); - let view = owned.view(); + let view = container.updates.view(); let keys_b = view.keys; let mut outputs: Vec> = (0..pushers.len()).map(|_| UpdatesOwned::default()).collect(); @@ -47,7 +46,7 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor { pub diffs: > as Borrow>::Borrowed<'a>, } +impl<'a, U: Update> Copy for UpdatesView<'a, U> {} +impl<'a, U: Update> Clone for UpdatesView<'a, U> { fn clone(&self) -> Self { *self } } + +impl<'a, U: Update> UpdatesView<'a, U> { + /// Translate a key-range into the corresponding val-range via `vals.bounds`. + pub fn vals_bounds(self, key_range: std::ops::Range) -> std::ops::Range { + if !key_range.is_empty() { + let bounds = self.vals.bounds; + let lower = if key_range.start == 0 { 0 } else { bounds.index_as(key_range.start - 1) as usize }; + let upper = bounds.index_as(key_range.end - 1) as usize; + lower..upper + } else { key_range } + } + /// Translate a val-range into the corresponding time-range via `times.bounds`. + pub fn times_bounds(self, val_range: std::ops::Range) -> std::ops::Range { + if !val_range.is_empty() { + let bounds = self.times.bounds; + let lower = if val_range.start == 0 { 0 } else { bounds.index_as(val_range.start - 1) as usize }; + let upper = bounds.index_as(val_range.end - 1) as usize; + lower..upper + } else { val_range } + } +} + impl UpdatesOwned { /// Borrow the four columns as a single `UpdatesView`. pub fn view(&self) -> UpdatesView<'_, U> { @@ -278,33 +302,14 @@ where impl UpdatesOwned { - /// Translate a key-range into the corresponding val-range via `vals.bounds`. - pub fn vals_bounds(&self, key_range: std::ops::Range) -> std::ops::Range { - if !key_range.is_empty() { - let bounds = self.vals.bounds.borrow(); - let lower = if key_range.start == 0 { 0 } else { bounds.index_as(key_range.start - 1) as usize }; - let upper = bounds.index_as(key_range.end - 1) as usize; - lower..upper - } else { key_range } - } - /// Translate a val-range into the corresponding time-range via `times.bounds`. - pub fn times_bounds(&self, val_range: std::ops::Range) -> std::ops::Range { - if !val_range.is_empty() { - let bounds = self.times.bounds.borrow(); - let lower = if val_range.start == 0 { 0 } else { bounds.index_as(val_range.start - 1) as usize }; - let upper = bounds.index_as(val_range.end - 1) as usize; - lower..upper - } else { val_range } - } - /// Copies `other[key_range]` into self, keys and all. - pub fn extend_from_keys(&mut self, other: &Self, key_range: std::ops::Range) { - self.keys.values.extend_from_self(other.keys.values.borrow(), key_range.clone()); - self.vals.extend_from_self(other.vals.borrow(), key_range.clone()); + pub fn extend_from_keys(&mut self, other: UpdatesView<'_, U>, key_range: std::ops::Range) { + self.keys.values.extend_from_self(other.keys.values, key_range.clone()); + self.vals.extend_from_self(other.vals, key_range.clone()); let val_range = other.vals_bounds(key_range); - self.times.extend_from_self(other.times.borrow(), val_range.clone()); + self.times.extend_from_self(other.times, val_range.clone()); let time_range = other.times_bounds(val_range); - self.diffs.extend_from_self(other.diffs.borrow(), time_range); + self.diffs.extend_from_self(other.diffs, time_range); } /// Forms a consolidated `UpdatesOwned` trie from unsorted `(key, val, time, diff)` refs. From 7c0327c00aa7a42a2f678316589bf99c43988926 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 20:06:10 -0400 Subject: [PATCH 07/10] Move UpdatesOwned::iter() to UpdatesView::iter() --- differential-dataflow/src/columnar/mod.rs | 12 ++--- differential-dataflow/src/columnar/updates.rs | 44 +++++++++++-------- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/differential-dataflow/src/columnar/mod.rs b/differential-dataflow/src/columnar/mod.rs index 15691351c..a10356313 100644 --- a/differential-dataflow/src/columnar/mod.rs +++ b/differential-dataflow/src/columnar/mod.rs @@ -193,12 +193,10 @@ mod container_impls { use timely::progress::PathSummary; // Apply results_in to each time; drop updates whose time maps to None. // This must rebuild the trie since some entries may be removed. - let RecordedUpdates { updates, records, consolidated: _ } = self; - let owned = updates.into_owned(); let mut output = UpdatesOwned::::default(); let mut time_owned = U::Time::default(); // TODO: Build all times first, and if no `None` outputs, can re-use k, v, d. - for (k, v, t, d) in owned.iter() { + for (k, v, t, d) in self.updates.view().iter() { Columnar::copy_from(&mut time_owned, t); if let Some(new_time) = step.results_in(&time_owned) { output.push((k, v, &new_time, d)); @@ -206,7 +204,7 @@ mod container_impls { } // TODO: Time advancement may not be order preserving, but .. it could be. // TODO: Before this is consolidated the above would need to be `form`ed. - RecordedUpdates { updates: output.into(), records, consolidated: false } + RecordedUpdates { updates: output.into(), records: self.records, consolidated: false } } } } @@ -245,8 +243,7 @@ where let mut d1o = U::Diff::default(); input.for_each(|time, data| { let mut session = output.session_with_builder(&time); - let owned = std::mem::take(&mut data.updates).into_owned(); - for (k1, v1, t1, d1) in owned.iter() { + for (k1, v1, t1, d1) in data.updates.view().iter() { Columnar::copy_from(&mut t1o, t1); Columnar::copy_from(&mut d1o, d1); for (k2, v2, t2, d2) in logic(k1, v1, t1, d1) { @@ -330,8 +327,7 @@ where // TODO: The input trie is already sorted; a streaming form // that accepts pre-sorted, potentially-collapsing timestamps // could avoid the re-sort inside the builder. - let owned = std::mem::take(&mut data.updates).into_owned(); - for (k, v, t, d) in owned.iter() { + for (k, v, t, d) in data.updates.view().iter() { Columnar::copy_from(&mut time, t); let mut inner_vec = std::mem::take(&mut time.inner).into_inner(); inner_vec.truncate(level - 1); diff --git a/differential-dataflow/src/columnar/updates.rs b/differential-dataflow/src/columnar/updates.rs index 018d36d92..60106e2bf 100644 --- a/differential-dataflow/src/columnar/updates.rs +++ b/differential-dataflow/src/columnar/updates.rs @@ -115,6 +115,30 @@ impl<'a, U: Update> Copy for UpdatesView<'a, U> {} impl<'a, U: Update> Clone for UpdatesView<'a, U> { fn clone(&self) -> Self { *self } } impl<'a, U: Update> UpdatesView<'a, U> { + /// Iterate all `(key, val, time, diff)` entries as refs. + pub fn iter(self) -> impl Iterator, + columnar::Ref<'a, U::Val>, + columnar::Ref<'a, U::Time>, + columnar::Ref<'a, U::Diff>, + )> { + let UpdatesView { keys, vals, times, diffs } = self; + (0..Len::len(&keys)) + .flat_map(move |outer| child_range(keys.bounds, outer)) + .flat_map(move |k| { + let key = keys.values.get(k); + child_range(vals.bounds, k).map(move |v| (key, v)) + }) + .flat_map(move |(key, v)| { + let val = vals.values.get(v); + child_range(times.bounds, v).map(move |t| (key, val, t)) + }) + .flat_map(move |(key, val, t)| { + let time = times.values.get(t); + child_range(diffs.bounds, t).map(move |d| (key, val, time, diffs.values.get(d))) + }) + } + /// Translate a key-range into the corresponding val-range via `vals.bounds`. pub fn vals_bounds(self, key_range: std::ops::Range) -> std::ops::Range { if !key_range.is_empty() { @@ -450,25 +474,7 @@ impl UpdatesOwned { columnar::Ref<'_, U::Time>, columnar::Ref<'_, U::Diff>, )> { - let keys_b = self.keys.borrow(); - let vals_b = self.vals.borrow(); - let times_b = self.times.borrow(); - let diffs_b = self.diffs.borrow(); - - (0..Len::len(&keys_b)) - .flat_map(move |outer| child_range(keys_b.bounds, outer)) - .flat_map(move |k| { - let key = keys_b.values.get(k); - child_range(vals_b.bounds, k).map(move |v| (key, v)) - }) - .flat_map(move |(key, v)| { - let val = vals_b.values.get(v); - child_range(times_b.bounds, v).map(move |t| (key, val, t)) - }) - .flat_map(move |(key, val, t)| { - let time = times_b.values.get(t); - child_range(diffs_b.bounds, t).map(move |d| (key, val, time, diffs_b.values.get(d))) - }) + self.view().iter() } } From a2035991850a12aabe04b734dfc6aec09a971cbd Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 20:12:37 -0400 Subject: [PATCH 08/10] Rename UpdatesOwned to UpdatesTyped --- .../examples/columnar/main.rs | 4 +- .../src/columnar/arrangement/mod.rs | 44 ++++----- .../src/columnar/arrangement/trie_merger.rs | 68 ++++++------- differential-dataflow/src/columnar/builder.rs | 6 +- .../src/columnar/exchange.rs | 6 +- differential-dataflow/src/columnar/mod.rs | 14 +-- differential-dataflow/src/columnar/updates.rs | 96 +++++++++---------- 7 files changed, 119 insertions(+), 119 deletions(-) diff --git a/differential-dataflow/examples/columnar/main.rs b/differential-dataflow/examples/columnar/main.rs index fdfe49c06..582e5ce98 100644 --- a/differential-dataflow/examples/columnar/main.rs +++ b/differential-dataflow/examples/columnar/main.rs @@ -2,7 +2,7 @@ //! //! Demonstrates columnar-backed arrangements in an iterative scope, //! exercising Enter, Leave, Negate, ResultsIn on RecordedUpdates, -//! and Push on UpdatesOwned for the reduce builder path. +//! and Push on UpdatesTyped for the reduce builder path. use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::InputHandle; @@ -87,7 +87,7 @@ fn main() { /// /// This module exercises the container traits needed for iterative columnar /// computation: Enter, Leave, Negate, ResultsIn on RecordedUpdates, and -/// Push on UpdatesOwned for the reduce builder path. +/// Push on UpdatesTyped for the reduce builder path. mod reachability { use timely::order::Product; diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index ff2b48e0e..d998d37a6 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -3,8 +3,8 @@ //! - Type aliases (`ValSpine`, `ValBatcher`, `ValBuilder`) glue columnar storage //! into DD's trace machinery. //! - `Coltainer` wraps a columnar `C::Container` as a DD `BatchContainer`. -//! - `TrieChunker` strips `RecordedUpdates` down to `UpdatesOwned` for the merge batcher. -//! - `batcher` contains required trait stubs for `UpdatesOwned`. +//! - `TrieChunker` strips `RecordedUpdates` down to `UpdatesTyped` for the merge batcher. +//! - `batcher` contains required trait stubs for `UpdatesTyped`. //! - `trie_merger` is the batch-at-a-time merging logic. //! - `builder::ValMirror` is the `trace::Builder` that seals melded chunks into //! an `OrdValBatch`. @@ -122,12 +122,12 @@ pub mod batch_container { } } -use super::updates::UpdatesOwned; +use super::updates::UpdatesTyped; use super::RecordedUpdates; use crate::trace::implementations::merge_batcher::MergeBatcher; type ValBatcher2 = MergeBatcher, TrieChunker, trie_merger::TrieMerger>; -/// A chunker that unwraps `RecordedUpdates` into bare `UpdatesOwned` for the merge batcher. +/// A chunker that unwraps `RecordedUpdates` into bare `UpdatesTyped` for the merge batcher. /// /// The intended behavior is to produce chunks whose size is within 1-2x `LINK_TARGET`. /// It ships large batches immediately, accumulates small batches, consolidates as they @@ -137,14 +137,14 @@ type ValBatcher2 = MergeBatcher, TrieChunker, trie_merg /// each of which is put in `self.stage` pub struct TrieChunker { /// Insufficiently large updates we haven't figured out how to ship yet. - blobs: Vec<(UpdatesOwned, bool)>, + blobs: Vec<(UpdatesTyped, bool)>, /// Sum of `len()` across `blobs`. blob_records: usize, /// Ready-to-emit chunks. Each is sorted and consolidated; size ≥ `LINK_TARGET` /// (or smaller, only for the final chunk produced by `finish`). - ready: std::collections::VecDeque>, + ready: std::collections::VecDeque>, /// Staging area for the next pull call. - stage: Option>, + stage: Option>, } impl Default for TrieChunker { @@ -160,7 +160,7 @@ impl Default for TrieChunker { impl TrieChunker { /// Consolidate and empty `self.blobs`, into `self.ready` if large enough or else return. - fn consolidate_blobs(&mut self) -> UpdatesOwned { + fn consolidate_blobs(&mut self) -> UpdatesTyped { // Single consolidated entry: pass through, no work. if self.blobs.len() == 1 && self.blobs[0].1 { let (result, _) = self.blobs.pop().unwrap(); @@ -169,14 +169,14 @@ impl TrieChunker { } // TODO: Improve consolidation through column-oriented sorts. - let result = UpdatesOwned::::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter())); + let result = UpdatesTyped::::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter())); self.blobs.clear(); self.blob_records = 0; result } - /// Push a non-empty `UpdatesOwned` into blobs and update accounting. - fn absorb(&mut self, updates: UpdatesOwned, consolidated: bool) { + /// Push a non-empty `UpdatesTyped` into blobs and update accounting. + fn absorb(&mut self, updates: UpdatesTyped, consolidated: bool) { self.blob_records += updates.len(); self.blobs.push((updates, consolidated)); } @@ -194,7 +194,7 @@ impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut R // Into if small enough, as we can further consolidate, but if not we need to // consolidate and then either ship (if large) or hold (if small) the results. - let updates = std::mem::take(&mut container.updates).into_owned(); + let updates = std::mem::take(&mut container.updates).into_typed(); let consolidated = container.consolidated; let len = updates.len(); @@ -237,7 +237,7 @@ impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut R } impl timely::container::ContainerBuilder for TrieChunker { - type Container = UpdatesOwned; + type Container = UpdatesTyped; fn extract(&mut self) -> Option<&mut Self::Container> { self.stage = self.ready.pop_front(); self.stage.as_mut() @@ -253,16 +253,16 @@ impl timely::container::ContainerBuilder for T } pub mod batcher { - //! Batcher trait stubs required to plug `UpdatesOwned` into DD's merge batcher. + //! Batcher trait stubs required to plug `UpdatesTyped` into DD's merge batcher. use columnar::Len; use timely::progress::frontier::{Antichain, AntichainRef}; use crate::trace::implementations::merge_batcher::container::InternalMerge; use super::super::layout::ColumnarUpdate as Update; - use super::super::updates::UpdatesOwned; + use super::super::updates::UpdatesTyped; - impl timely::container::SizableContainer for UpdatesOwned { + impl timely::container::SizableContainer for UpdatesTyped { fn at_capacity(&self) -> bool { self.view().diffs.values.len() >= crate::columnar::LINK_TARGET } fn ensure_capacity(&mut self, _stash: &mut Option) { } } @@ -270,7 +270,7 @@ pub mod batcher { /// Required by `reduce_abelian`'s bound `Builder::Input: InternalMerge`. /// Not called at runtime — our batcher uses `TrieMerger` instead. /// TODO: Relax the bound in DD's reduce to remove this requirement. - impl InternalMerge for UpdatesOwned { + impl InternalMerge for UpdatesTyped { type TimeOwned = U::Time; fn len(&self) -> usize { unimplemented!() } fn clear(&mut self) { @@ -298,7 +298,7 @@ pub mod builder { use crate::trace::implementations::ord_neu::val_batch::{OrdValBatch, OrdValStorage}; use crate::trace::Description; - use super::super::updates::UpdatesOwned; + use super::super::updates::UpdatesTyped; use super::super::layout::ColumnarUpdate as Update; use super::super::layout::ColumnarLayout as Layout; use super::Coltainer; @@ -316,14 +316,14 @@ pub mod builder { output } - /// Trace [`Builder`](crate::trace::Builder) that accumulates `UpdatesOwned` + /// Trace [`Builder`](crate::trace::Builder) that accumulates `UpdatesTyped` /// chunks and seals them into a single [`OrdValBatch`]. pub struct ValMirror { - chunks: Vec>, + chunks: Vec>, } impl crate::trace::Builder for ValMirror { type Time = U::Time; - type Input = UpdatesOwned; + type Input = UpdatesTyped; type Output = OrdValBatch>; fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self { @@ -344,7 +344,7 @@ pub mod builder { // Meld sorted, consolidated chain entries in order. // Pre-allocate to avoid reallocations during meld. use columnar::Container; - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.keys.reserve_for(chain.iter().map(|c| c.view().keys)); updates.vals.reserve_for(chain.iter().map(|c| c.view().vals)); updates.times.reserve_for(chain.iter().map(|c| c.view().times)); diff --git a/differential-dataflow/src/columnar/arrangement/trie_merger.rs b/differential-dataflow/src/columnar/arrangement/trie_merger.rs index 0b761dff2..fb88140ca 100644 --- a/differential-dataflow/src/columnar/arrangement/trie_merger.rs +++ b/differential-dataflow/src/columnar/arrangement/trie_merger.rs @@ -1,4 +1,4 @@ -//! Batch-at-a-time merging of sorted, consolidated `UpdatesOwned` chains. +//! Batch-at-a-time merging of sorted, consolidated `UpdatesTyped` chains. //! //! The core is `TrieMerger::merge_batches`, which walks pairs of chunks via //! `merge_batch`, building a chain of merged outputs with `ChainBuilder`. @@ -11,9 +11,9 @@ use timely::progress::frontier::{Antichain, AntichainRef}; use crate::trace::implementations::merge_batcher::Merger; use super::super::layout::ColumnarUpdate as Update; -use super::super::updates::UpdatesOwned; +use super::super::updates::UpdatesTyped; -/// Merge-batcher merger that melds sorted, consolidated `UpdatesOwned` tries. +/// Merge-batcher merger that melds sorted, consolidated `UpdatesTyped` tries. pub struct TrieMerger { _marker: std::marker::PhantomData, } @@ -54,15 +54,15 @@ where } } -/// Build sorted `UpdatesOwned` chunks from a sorted iterator of refs, -/// using `UpdatesOwned::form` (which consolidates internally) on batches. +/// Build sorted `UpdatesTyped` chunks from a sorted iterator of refs, +/// using `UpdatesTyped::form` (which consolidates internally) on batches. fn form_chunks<'a, U: Update>( sorted: impl Iterator>>, - output: &mut Vec>, + output: &mut Vec>, ) { let mut sorted = sorted.peekable(); while sorted.peek().is_some() { - let chunk = UpdatesOwned::::form((&mut sorted).take(crate::columnar::LINK_TARGET)); + let chunk = UpdatesTyped::::form((&mut sorted).take(crate::columnar::LINK_TARGET)); if chunk.len() > 0 { output.push(chunk); } @@ -73,15 +73,15 @@ impl Merger for TrieMerger where U::Time: 'static, { - type Chunk = UpdatesOwned; + type Chunk = UpdatesTyped; type Time = U::Time; fn merge( &mut self, - list1: Vec>, - list2: Vec>, - output: &mut Vec>, - _stash: &mut Vec>, + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + _stash: &mut Vec>, ) { Self::merge_batches(list1, list2, output, _stash); } @@ -127,7 +127,7 @@ where if *bit { diffs.values.push(d_borrow.values.get(index)); } } diffs.bounds = Strides::new(1, times.values.len() as u64); - kept.push(UpdatesOwned { + kept.push(UpdatesTyped { keys, vals, times, @@ -145,7 +145,7 @@ where if *bit { diffs.values.push(d_borrow.values.get(index)); } } diffs.bounds = Strides::new(1, times.values.len() as u64); - ship.push(UpdatesOwned { + ship.push(UpdatesTyped { keys, vals, times, @@ -169,9 +169,9 @@ where /// Correct but slow — used as fallback. #[allow(dead_code)] fn merge_iterator( - list1: &[UpdatesOwned], - list2: &[UpdatesOwned], - output: &mut Vec>, + list1: &[UpdatesTyped], + list2: &[UpdatesTyped], + output: &mut Vec>, ) { let iter1 = list1.iter().flat_map(|chunk| chunk.iter()); let iter2 = list2.iter().flat_map(|chunk| chunk.iter()); @@ -187,10 +187,10 @@ where /// A merge implementation that operates batch-at-a-time. #[inline(never)] fn merge_batches( - list1: Vec>, - list2: Vec>, - output: &mut Vec>, - stash: &mut Vec>, + list1: Vec>, + list2: Vec>, + output: &mut Vec>, + stash: &mut Vec>, ) { // The design for efficient "batch" merginging of chains of links is: @@ -228,7 +228,7 @@ where // TODO: create batch for the non-empty cursor. if let Some(((k,v,t),batch)) = cursor1 { let mut out_batch = stash.pop().unwrap_or_default(); - let empty: UpdatesOwned = Default::default(); + let empty: UpdatesTyped = Default::default(); let view = batch.view(); write_from_surveys( &batch, @@ -243,7 +243,7 @@ where } if let Some(((k,v,t),batch)) = cursor2 { let mut out_batch = stash.pop().unwrap_or_default(); - let empty: UpdatesOwned = Default::default(); + let empty: UpdatesTyped = Default::default(); let view = batch.view(); write_from_surveys( &empty, @@ -282,10 +282,10 @@ where /// cursors as part of the mapping. #[inline(never)] fn merge_batch( - batch1: &mut Option<((usize, usize, usize), UpdatesOwned)>, - batch2: &mut Option<((usize, usize, usize), UpdatesOwned)>, + batch1: &mut Option<((usize, usize, usize), UpdatesTyped)>, + batch2: &mut Option<((usize, usize, usize), UpdatesTyped)>, builder: &mut ChainBuilder, - stash: &mut Vec>, + stash: &mut Vec>, ) { // TODO: Optimization for one batch exceeding the other. @@ -389,13 +389,13 @@ where /// and times; `write_diffs` handles diff consolidation. #[inline(never)] fn write_from_surveys( - updates0: &UpdatesOwned, - updates1: &UpdatesOwned, + updates0: &UpdatesTyped, + updates1: &UpdatesTyped, root_survey: &[Report], key_survey: &[Report], val_survey: &[Report], time_survey: &[Report], - output: &mut UpdatesOwned, + output: &mut UpdatesTyped, ) { let view0 = updates0.view(); let view1 = updates1.view(); @@ -642,14 +642,14 @@ pub enum Report { Both(usize, usize), } -/// Accumulates a sequence of `UpdatesOwned` chunks, merging the tail when a new +/// Accumulates a sequence of `UpdatesTyped` chunks, merging the tail when a new /// chunk would extend the current run rather than start a new one. -pub struct ChainBuilder { updates: Vec> } +pub struct ChainBuilder { updates: Vec> } impl Default for ChainBuilder { fn default() -> Self { Self { updates: Default::default() } } } impl ChainBuilder { - fn push(&mut self, mut link: UpdatesOwned) { + fn push(&mut self, mut link: UpdatesTyped) { link = link.filter_zero(); if link.len() > 0 { if let Some(last) = self.updates.last_mut() { @@ -664,6 +664,6 @@ impl ChainBuilder { else { self.updates.push(link); } } } - fn extend(&mut self, iter: impl IntoIterator>) { for link in iter { self.push(link); }} - fn done(self) -> Vec> { self.updates } + fn extend(&mut self, iter: impl IntoIterator>) { for link in iter { self.push(link); }} + fn done(self) -> Vec> { self.updates } } diff --git a/differential-dataflow/src/columnar/builder.rs b/differential-dataflow/src/columnar/builder.rs index 548aeec02..9b616e26b 100644 --- a/differential-dataflow/src/columnar/builder.rs +++ b/differential-dataflow/src/columnar/builder.rs @@ -8,7 +8,7 @@ use std::collections::VecDeque; use columnar::{Columnar, Clear, Len, Push}; use super::layout::ColumnarUpdate as Update; -use super::updates::UpdatesOwned; +use super::updates::UpdatesTyped; use super::RecordedUpdates; type TupleContainer = <(::Key, ::Val, ::Time, ::Diff) as Columnar>::Container; @@ -33,7 +33,7 @@ impl PushInto for ValBuilder where TupleContainer : Push< let records = self.current.len(); let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); - let updates = UpdatesOwned::form(refs.into_iter()).into(); + let updates = UpdatesTyped::form(refs.into_iter()).into(); self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } @@ -71,7 +71,7 @@ impl ContainerBuilder for ValBuilder { let records = self.current.len(); let mut refs = self.current.borrow().into_index_iter().collect::>(); refs.sort(); - let updates = UpdatesOwned::form(refs.into_iter()).into(); + let updates = UpdatesTyped::form(refs.into_iter()).into(); self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); self.current.clear(); } diff --git a/differential-dataflow/src/columnar/exchange.rs b/differential-dataflow/src/columnar/exchange.rs index 6b65a7450..6f9fa3b04 100644 --- a/differential-dataflow/src/columnar/exchange.rs +++ b/differential-dataflow/src/columnar/exchange.rs @@ -14,7 +14,7 @@ use timely::progress::Timestamp; use timely::worker::Worker; use super::layout::ColumnarUpdate as Update; -use super::updates::UpdatesOwned; +use super::updates::UpdatesTyped; use super::RecordedUpdates; /// Distributor that routes `RecordedUpdates` records to workers by hashing keys. @@ -25,7 +25,7 @@ pub struct ValDistributor { } impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> for ValDistributor { - // TODO: For unsorted UpdatesOwned (stride-1 outer keys), each key is its own outer group, + // TODO: For unsorted UpdatesTyped (stride-1 outer keys), each key is its own outer group, // so the per-group pre_lens snapshot and seal check costs O(keys × workers). Should // either batch keys by destination first, or detect stride-1 outer bounds and use a // simpler single-pass partitioning that seals once at the end. @@ -34,7 +34,7 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor> = (0..pushers.len()).map(|_| UpdatesOwned::default()).collect(); + let mut outputs: Vec> = (0..pushers.len()).map(|_| UpdatesTyped::default()).collect(); // Each outer key group becomes a separate run in the destination. for outer in 0..Len::len(&keys_b) { diff --git a/differential-dataflow/src/columnar/mod.rs b/differential-dataflow/src/columnar/mod.rs index a10356313..f5606634f 100644 --- a/differential-dataflow/src/columnar/mod.rs +++ b/differential-dataflow/src/columnar/mod.rs @@ -4,7 +4,7 @@ //! changes; do not rely on stability across releases. //! //! Known rough edges: -//! - `ContainerBytes` for `RecordedUpdates` and `UpdatesOwned` is `unimplemented!()`; +//! - `ContainerBytes` for `RecordedUpdates` and `UpdatesTyped` is `unimplemented!()`; //! multi-process dataflows that exchange these containers will panic. //! - `leave_dynamic` consolidates eagerly on each batch; the //! [`crate::dynamic`] counterpart defers consolidation. Same observable @@ -20,7 +20,7 @@ //! //! Module layout (bottom-up): //! - [`layout`] — `ColumnarUpdate` / `ColumnarLayout` / `OrdContainer`. -//! - [`updates`] — `UpdatesOwned` trie, `Consolidating`, `UpdatesBuilder`. +//! - [`updates`] — `UpdatesTyped` trie, `Consolidating`, `UpdatesBuilder`. //! - [`builder`] — `ValColBuilder`: the input-side `ContainerBuilder`. //! - [`exchange`] — `ValPact` / `ValDistributor`: PACT for shuffling. //! - [`arrangement`] — type aliases + `Coltainer` + `TrieChunker` + @@ -36,7 +36,7 @@ pub mod builder; pub mod exchange; pub mod arrangement; -pub use updates::UpdatesOwned; +pub use updates::UpdatesTyped; pub use builder::ValBuilder as ValColBuilder; pub use exchange::ValPact; pub use arrangement::{ValBatcher, ValBuilder, ValSpine}; @@ -46,7 +46,7 @@ pub const LINK_TARGET: usize = 64 * 1024; /// A thin wrapper around `Updates` that tracks the pre-consolidation record count /// for timely's exchange accounting. This wrapper is the stream container type; -/// the `TrieChunker` strips it, passing bare `UpdatesOwned` into the merge batcher. +/// the `TrieChunker` strips it, passing bare `UpdatesTyped` into the merge batcher. pub struct RecordedUpdates { /// The trie of `(key, val, time, diff)` updates. pub updates: updates::Updates, @@ -83,7 +83,7 @@ mod container_impls { use crate::collection::containers::{Negate, Enter, Leave, ResultsIn}; use super::layout::ColumnarUpdate as Update; - use super::updates::UpdatesOwned; + use super::updates::UpdatesTyped; use super::RecordedUpdates; impl> Negate for RecordedUpdates { @@ -181,7 +181,7 @@ mod container_impls { let mid = super::updates::Updates { keys, vals, times, diffs }; // Collapse adjacent (k,v,t2) duplicates created by `to_outer`. RecordedUpdates { - updates: mid.into_owned().consolidate().into(), + updates: mid.into_typed().consolidate().into(), records, consolidated: true, } @@ -193,7 +193,7 @@ mod container_impls { use timely::progress::PathSummary; // Apply results_in to each time; drop updates whose time maps to None. // This must rebuild the trie since some entries may be removed. - let mut output = UpdatesOwned::::default(); + let mut output = UpdatesTyped::::default(); let mut time_owned = U::Time::default(); // TODO: Build all times first, and if no `None` outputs, can re-use k, v, d. for (k, v, t, d) in self.updates.view().iter() { diff --git a/differential-dataflow/src/columnar/updates.rs b/differential-dataflow/src/columnar/updates.rs index 60106e2bf..a9d263dd0 100644 --- a/differential-dataflow/src/columnar/updates.rs +++ b/differential-dataflow/src/columnar/updates.rs @@ -1,10 +1,10 @@ //! Trie-structured update storage. //! -//! `UpdatesOwned` is the core trie: four nested `Lists` (keys, vals, times, diffs). +//! `UpdatesTyped` is the core trie: four nested `Lists` (keys, vals, times, diffs). //! `Consolidating` is a streaming consolidator over sorted `(k,v,t,d)` data. //! `UpdatesBuilder` melds sorted, consolidated chunks into a single trie. //! -//! NOTE: `UpdatesOwned::iter` / `form` / `form_unsorted` / `consolidate` / `filter_zero` +//! NOTE: `UpdatesTyped::iter` / `form` / `form_unsorted` / `consolidate` / `filter_zero` //! are escape hatches that flatten the trie. Prefer trie-native operations where //! possible — flattening + rebuilding is a significant cost on hot paths. @@ -55,7 +55,7 @@ pub fn retain_items<'a, C: Container>(lists: as Borrow>::Borrowed<'a>, /// one val per key, one time per val, one diff per time). /// A fully consolidated trie has a single outer key list, all lists sorted /// and deduplicated, and singleton diff lists. -pub struct UpdatesOwned { +pub struct UpdatesTyped { /// Outer key list (one entry per group of keys at the trie root). pub keys: Lists>, /// Per-key list of vals. @@ -66,7 +66,7 @@ pub struct UpdatesOwned { pub diffs: Lists>, } -impl Default for UpdatesOwned { +impl Default for UpdatesTyped { fn default() -> Self { Self { keys: Default::default(), @@ -77,13 +77,13 @@ impl Default for UpdatesOwned { } } -impl std::fmt::Debug for UpdatesOwned { +impl std::fmt::Debug for UpdatesTyped { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("UpdatesOwned").finish() + f.debug_struct("UpdatesTyped").finish() } } -impl Clone for UpdatesOwned { +impl Clone for UpdatesTyped { fn clone(&self) -> Self { Self { keys: self.keys.clone(), @@ -94,11 +94,11 @@ impl Clone for UpdatesOwned { } } -/// Borrowed view of an [`UpdatesOwned`] with the same four-field shape. +/// Borrowed view of an [`UpdatesTyped`] with the same four-field shape. /// -/// Reader code should consume an `UpdatesOwned` through this view rather than reading +/// Reader code should consume an `UpdatesTyped` through this view rather than reading /// fields directly. This decouples readers from the storage representation: the -/// view's shape stays the same whether the underlying `UpdatesOwned` holds owned +/// view's shape stays the same whether the underlying `UpdatesTyped` holds owned /// `Lists` or (later) `Stash`-backed columns that may be borrowed from wire bytes. pub struct UpdatesView<'a, U: Update> { /// Outer key list (one entry per group of keys at the trie root). @@ -159,7 +159,7 @@ impl<'a, U: Update> UpdatesView<'a, U> { } } -impl UpdatesOwned { +impl UpdatesTyped { /// Borrow the four columns as a single `UpdatesView`. pub fn view(&self) -> UpdatesView<'_, U> { UpdatesView { @@ -174,7 +174,7 @@ impl UpdatesOwned { /// `Stash`-backed update storage: each column may be typed (writable) or /// borrowed from wire bytes (read-only, zero-copy). /// -/// Construction sites work in [`UpdatesOwned`]; convert via `From` at the +/// Construction sites work in [`UpdatesTyped`]; convert via `From` at the /// boundary. Reader code uses [`UpdatesView`] via [`Updates::view`], which /// produces the same shape regardless of whether the columns are typed or /// borrowed. @@ -211,8 +211,8 @@ impl Clone for Updates { } } -impl From> for Updates { - fn from(owned: UpdatesOwned) -> Self { +impl From> for Updates { + fn from(owned: UpdatesTyped) -> Self { use columnar::bytes::stash::Stash; Self { keys: Stash::Typed(owned.keys), @@ -246,7 +246,7 @@ impl + Clone + 'static> Updates UpdatesOwned { + pub fn into_typed(mut self) -> UpdatesTyped { use columnar::bytes::stash::Stash; self.keys.make_typed(); self.vals.make_typed(); @@ -256,7 +256,7 @@ impl + Clone + 'static> Updates UpdatesOwned { +impl UpdatesTyped { /// Copies `other[key_range]` into self, keys and all. pub fn extend_from_keys(&mut self, other: UpdatesView<'_, U>, key_range: std::ops::Range) { @@ -336,14 +336,14 @@ impl UpdatesOwned { self.diffs.extend_from_self(other.diffs, time_range); } - /// Forms a consolidated `UpdatesOwned` trie from unsorted `(key, val, time, diff)` refs. + /// Forms a consolidated `UpdatesTyped` trie from unsorted `(key, val, time, diff)` refs. pub fn form_unsorted<'a>(unsorted: impl Iterator>>) -> Self { let mut data = unsorted.collect::>(); data.sort(); Self::form(data.into_iter()) } - /// Forms a consolidated `UpdatesOwned` trie from sorted `(key, val, time, diff)` refs. + /// Forms a consolidated `UpdatesTyped` trie from sorted `(key, val, time, diff)` refs. pub fn form<'a>(sorted: impl Iterator>>) -> Self { // Step 1: Streaming consolidation — accumulate diffs, drop zeros. @@ -418,7 +418,7 @@ impl UpdatesOwned { let (times, keep) = retain_items(self.times.borrow(), &keep[..]); let (vals, keep) = retain_items(self.vals.borrow(), &keep[..]); let (keys, _keep) = retain_items(self.keys.borrow(), &keep[..]); - UpdatesOwned { + UpdatesTyped { keys, vals, times, @@ -439,7 +439,7 @@ impl UpdatesOwned { /// /// Each field is independently typed — columnar refs, `&Owned`, owned values, /// or any other type the column container accepts via its `Push` impl. -impl Push<(KP, VP, TP, DP)> for UpdatesOwned +impl Push<(KP, VP, TP, DP)> for UpdatesTyped where ContainerOf: Push, ContainerOf: Push, @@ -459,13 +459,13 @@ where } /// PushInto for the `((K, V), T, R)` shape that reduce_trace uses. -impl timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for UpdatesOwned { +impl timely::container::PushInto<((U::Key, U::Val), U::Time, U::Diff)> for UpdatesTyped { fn push_into(&mut self, ((key, val), time, diff): ((U::Key, U::Val), U::Time, U::Diff)) { self.push((&key, &val, &time, &diff)); } } -impl UpdatesOwned { +impl UpdatesTyped { /// Iterate all `(key, val, time, diff)` entries as refs. pub fn iter(&self) -> impl Iterator UpdatesOwned { } } -impl timely::Accountable for UpdatesOwned { +impl timely::Accountable for UpdatesTyped { #[inline] fn record_count(&self) -> i64 { Len::len(&self.diffs.values) as i64 } } -impl timely::dataflow::channels::ContainerBytes for UpdatesOwned { +impl timely::dataflow::channels::ContainerBytes for UpdatesTyped { fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } fn length_in_bytes(&self) -> usize { unimplemented!() } fn into_bytes(&self, _writer: &mut W) { unimplemented!() } } -/// An incremental trie builder that accepts sorted, consolidated `UpdatesOwned` chunks -/// and melds them into a single `UpdatesOwned` trie. +/// An incremental trie builder that accepts sorted, consolidated `UpdatesTyped` chunks +/// and melds them into a single `UpdatesTyped` trie. /// -/// The internal `UpdatesOwned` has open (unsealed) bounds at the keys, vals, and times +/// The internal `UpdatesTyped` has open (unsealed) bounds at the keys, vals, and times /// levels — the last group at each level has its values pushed but no corresponding /// bounds entry. `diffs.bounds` is always 1:1 with `times.values`. /// -/// `meld` accepts a consolidated `UpdatesOwned` whose first `(key, val, time)` is +/// `meld` accepts a consolidated `UpdatesTyped` whose first `(key, val, time)` is /// strictly greater than the builder's last `(key, val, time)`. The key and val /// may equal the builder's current open key/val, as long as the time is greater. /// -/// `done` seals all open bounds and returns the completed `UpdatesOwned`. +/// `done` seals all open bounds and returns the completed `UpdatesTyped`. pub struct UpdatesBuilder { /// Non-empty, consolidated updates. - updates: UpdatesOwned, + updates: UpdatesTyped, } impl UpdatesBuilder { @@ -511,7 +511,7 @@ impl UpdatesBuilder { /// Unseals the last group at keys, vals, and times levels so that /// subsequent `meld` calls can extend the open groups. /// If the updates are not consolidated none of this works. - pub fn new_from(mut updates: UpdatesOwned) -> Self { + pub fn new_from(mut updates: UpdatesTyped) -> Self { use columnar::Len; if Len::len(&updates.keys.values) > 0 { updates.keys.bounds.pop(); @@ -521,13 +521,13 @@ impl UpdatesBuilder { Self { updates } } - /// Meld a sorted, consolidated `UpdatesOwned` chunk into this builder. + /// Meld a sorted, consolidated `UpdatesTyped` chunk into this builder. /// /// The chunk's first `(key, val, time)` must be strictly greater than /// the builder's last `(key, val, time)`. Keys and vals may overlap /// (continue the current group), but times must be strictly increasing /// within the same `(key, val)`. - pub fn meld(&mut self, chunk: &UpdatesOwned) { + pub fn meld(&mut self, chunk: &UpdatesTyped) { use columnar::{Borrow, Index, Len}; if chunk.len() == 0 { return; } @@ -646,8 +646,8 @@ impl UpdatesBuilder { self.updates.diffs.extend_from_self(chunk.diffs.borrow(), 0..chunk_num_times); } - /// Seal all open bounds and return the completed `UpdatesOwned`. - pub fn done(mut self) -> UpdatesOwned { + /// Seal all open bounds and return the completed `UpdatesTyped`. + pub fn done(mut self) -> UpdatesTyped { use columnar::Len; if Len::len(&self.updates.keys.values) > 0 { // Seal the open time group. @@ -668,13 +668,13 @@ mod tests { type TestUpdate = (u64, u64, u64, i64); - fn collect(updates: &UpdatesOwned) -> Vec<(u64, u64, u64, i64)> { + fn collect(updates: &UpdatesTyped) -> Vec<(u64, u64, u64, i64)> { updates.iter().map(|(k, v, t, d)| (*k, *v, *t, *d)).collect() } #[test] fn test_push_and_consolidate_basic() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &100, &2)); updates.push((&2, &20, &200, &5)); @@ -684,7 +684,7 @@ mod tests { #[test] fn test_cancellation() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &3)); updates.push((&1, &10, &100, &-3)); updates.push((&2, &20, &200, &1)); @@ -693,7 +693,7 @@ mod tests { #[test] fn test_multiple_vals_and_times() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &200, &2)); updates.push((&1, &20, &100, &3)); @@ -703,7 +703,7 @@ mod tests { #[test] fn test_val_cancellation_propagates() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &5)); updates.push((&1, &10, &100, &-5)); updates.push((&1, &20, &100, &1)); @@ -712,13 +712,13 @@ mod tests { #[test] fn test_empty() { - let updates = UpdatesOwned::::default(); + let updates = UpdatesTyped::::default(); assert_eq!(collect(&updates.consolidate()), vec![]); } #[test] fn test_total_cancellation() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &100, &-1)); assert_eq!(collect(&updates.consolidate()), vec![]); @@ -726,7 +726,7 @@ mod tests { #[test] fn test_unsorted_input() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&3, &30, &300, &1)); updates.push((&1, &10, &100, &2)); updates.push((&2, &20, &200, &3)); @@ -735,7 +735,7 @@ mod tests { #[test] fn test_first_key_cancels() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &5)); updates.push((&1, &10, &100, &-5)); updates.push((&2, &20, &200, &3)); @@ -744,7 +744,7 @@ mod tests { #[test] fn test_middle_time_cancels() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &200, &2)); updates.push((&1, &10, &200, &-2)); @@ -754,7 +754,7 @@ mod tests { #[test] fn test_first_val_cancels() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &100, &-1)); updates.push((&1, &20, &100, &5)); @@ -763,7 +763,7 @@ mod tests { #[test] fn test_interleaved_cancellations() { - let mut updates = UpdatesOwned::::default(); + let mut updates = UpdatesTyped::::default(); updates.push((&1, &10, &100, &1)); updates.push((&1, &10, &100, &-1)); updates.push((&2, &20, &200, &7)); From d7f0f2f61cf0c32f4caec3b0729c3792f46d1cbd Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 20:40:28 -0400 Subject: [PATCH 09/10] impl ContainerBytes for RecordedUpdates --- differential-dataflow/src/columnar/mod.rs | 59 +++++++++++++++++++++-- 1 file changed, 54 insertions(+), 5 deletions(-) diff --git a/differential-dataflow/src/columnar/mod.rs b/differential-dataflow/src/columnar/mod.rs index f5606634f..0a22398b9 100644 --- a/differential-dataflow/src/columnar/mod.rs +++ b/differential-dataflow/src/columnar/mod.rs @@ -4,8 +4,9 @@ //! changes; do not rely on stability across releases. //! //! Known rough edges: -//! - `ContainerBytes` for `RecordedUpdates` and `UpdatesTyped` is `unimplemented!()`; -//! multi-process dataflows that exchange these containers will panic. +//! - `ContainerBytes` for `UpdatesTyped` is `unimplemented!()`. The wire-side +//! container is `RecordedUpdates`, whose `ContainerBytes` is implemented; +//! `UpdatesTyped` is the input-builder type and isn't shipped over channels. //! - `leave_dynamic` consolidates eagerly on each batch; the //! [`crate::dynamic`] counterpart defers consolidation. Same observable //! semantics, different work distribution. @@ -70,9 +71,57 @@ impl timely::Accountable for RecordedUpdates { } impl timely::dataflow::channels::ContainerBytes for RecordedUpdates { - fn from_bytes(_bytes: timely::bytes::arc::Bytes) -> Self { unimplemented!() } - fn length_in_bytes(&self) -> usize { unimplemented!() } - fn into_bytes(&self, _writer: &mut W) { unimplemented!() } + fn from_bytes(mut bytes: timely::bytes::arc::Bytes) -> Self { + // Header: records, consolidated, and four column lengths (all u64). + let header = bytes.extract_to(48); + let records = u64::from_le_bytes(header[0..8].try_into().unwrap()) as usize; + let consolidated = u64::from_le_bytes(header[8..16].try_into().unwrap()) != 0; + let keys_len = u64::from_le_bytes(header[16..24].try_into().unwrap()) as usize; + let vals_len = u64::from_le_bytes(header[24..32].try_into().unwrap()) as usize; + let times_len = u64::from_le_bytes(header[32..40].try_into().unwrap()) as usize; + let diffs_len = u64::from_le_bytes(header[40..48].try_into().unwrap()) as usize; + // Slice the per-column byte ranges and wrap each as `Stash::Bytes`. + let keys_bytes = bytes.extract_to(keys_len); + let vals_bytes = bytes.extract_to(vals_len); + let times_bytes = bytes.extract_to(times_len); + let diffs_bytes = bytes.extract_to(diffs_len); + use columnar::bytes::stash::Stash; + let keys = Stash::try_from_bytes(keys_bytes).expect("keys decode failed"); + let vals = Stash::try_from_bytes(vals_bytes).expect("vals decode failed"); + let times = Stash::try_from_bytes(times_bytes).expect("times decode failed"); + let diffs = Stash::try_from_bytes(diffs_bytes).expect("diffs decode failed"); + RecordedUpdates { + updates: updates::Updates { keys, vals, times, diffs }, + records, + consolidated, + } + } + + fn length_in_bytes(&self) -> usize { + 48 + self.updates.keys.length_in_bytes() + + self.updates.vals.length_in_bytes() + + self.updates.times.length_in_bytes() + + self.updates.diffs.length_in_bytes() + } + + fn into_bytes(&self, writer: &mut W) { + let keys_len = self.updates.keys.length_in_bytes() as u64; + let vals_len = self.updates.vals.length_in_bytes() as u64; + let times_len = self.updates.times.length_in_bytes() as u64; + let diffs_len = self.updates.diffs.length_in_bytes() as u64; + // Header. + writer.write_all(&(self.records as u64).to_le_bytes()).unwrap(); + writer.write_all(&(self.consolidated as u64).to_le_bytes()).unwrap(); + writer.write_all(&keys_len.to_le_bytes()).unwrap(); + writer.write_all(&vals_len.to_le_bytes()).unwrap(); + writer.write_all(×_len.to_le_bytes()).unwrap(); + writer.write_all(&diffs_len.to_le_bytes()).unwrap(); + // Body: each Stash writes its own indexed encoding. + self.updates.keys.write_bytes(writer).unwrap(); + self.updates.vals.write_bytes(writer).unwrap(); + self.updates.times.write_bytes(writer).unwrap(); + self.updates.diffs.write_bytes(writer).unwrap(); + } } // Container trait impls for RecordedUpdates, enabling iterative scopes. From 4c04a3c5e235408f2e6c873d89632a8c4dcd792f Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 21:36:49 -0400 Subject: [PATCH 10/10] Fixup ddir_col example --- interactive/examples/ddir_col.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/interactive/examples/ddir_col.rs b/interactive/examples/ddir_col.rs index 6610bd199..33d50ed27 100644 --- a/interactive/examples/ddir_col.rs +++ b/interactive/examples/ddir_col.rs @@ -260,7 +260,7 @@ mod render { let label = label.clone(); nodes.insert(id, Rendered::Collection(col.inspect_container(move |event| { if let Ok((_time, container)) = event { - for (k, v, t, d) in container.updates.iter() { + for (k, v, t, d) in container.updates.view().iter() { eprintln!(" [{}] ({:?}, {:?}, {:?}, {:?})", label, ::into_owned(k), ::into_owned(v),