From bfc28ae71d74f4874bf01e082a555cdd89ed909d Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 2 May 2026 18:01:43 -0400 Subject: [PATCH] Further Columnar work --- differential-dataflow/examples/spines.rs | 19 ++++--- differential-dataflow/src/columnar/builder.rs | 49 +++++++++---------- differential-dataflow/src/columnar/updates.rs | 44 ++++++++++++++++- 3 files changed, 77 insertions(+), 35 deletions(-) diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs index c8df1db10..03851ebd1 100644 --- a/differential-dataflow/examples/spines.rs +++ b/differential-dataflow/examples/spines.rs @@ -29,10 +29,11 @@ trait Workload { struct RowWorkload { data_input: InputSession, keys_input: InputSession, + pad: usize, } impl Workload for RowWorkload { - fn insert_data(&mut self, val: usize) { self.data_input.insert(format!("{:?}", val)); } - fn insert_keys(&mut self, val: usize) { self.keys_input.insert(format!("{:?}", val)); } + fn insert_data(&mut self, val: usize) { self.data_input.insert(format!("{:0width$}", val, width = self.pad)); } + fn insert_keys(&mut self, val: usize) { self.keys_input.insert(format!("{:0width$}", val, width = self.pad)); } fn advance_to(&mut self, t: u64) { self.data_input.advance_to(t); self.data_input.flush(); self.keys_input.advance_to(t); self.keys_input.flush(); @@ -46,17 +47,18 @@ struct ColWorkload { data_input: InputHandle, keys_input: InputHandle, buf: String, + pad: usize, } impl Workload for ColWorkload { fn insert_data(&mut self, val: usize) { self.buf.clear(); - write!(&mut self.buf, "{:?}", val).unwrap(); + write!(&mut self.buf, "{:0width$}", val, width = self.pad).unwrap(); let t = *self.data_input.time(); self.data_input.send((self.buf.as_str(), (), t, 1i64)); } fn insert_keys(&mut self, val: usize) { self.buf.clear(); - write!(&mut self.buf, "{:?}", val).unwrap(); + write!(&mut self.buf, "{:0width$}", val, width = self.pad).unwrap(); let t = *self.keys_input.time(); self.keys_input.send((self.buf.as_str(), (), t, 1i64)); } @@ -72,8 +74,9 @@ fn main() { let size: usize = std::env::args().nth(2).unwrap().parse().unwrap(); let mode: String = std::env::args().nth(3).unwrap(); + let pad: usize = std::env::args().nth(4).and_then(|s| s.parse().ok()).unwrap_or(0); - println!("Running [{:?}] arrangement", mode); + println!("Running [{:?}] arrangement (pad={})", mode, pad); let timer1 = ::std::time::Instant::now(); let timer2 = timer1.clone(); @@ -94,7 +97,7 @@ fn main() { let keys = keys.arrange::, RcOrdKeyBuilder, OrdKeySpine>(); keys.join_core(data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); - Box::new(RowWorkload { data_input, keys_input }) as Box + Box::new(RowWorkload { data_input, keys_input, pad }) as Box }, "val" => { use differential_dataflow::trace::implementations::ord_neu::{OrdValBatcher, RcOrdValBuilder, OrdValSpine}; @@ -104,7 +107,7 @@ fn main() { let keys = keys.map(|x| (x, ())).arrange::, RcOrdValBuilder, OrdValSpine>(); keys.join_core(data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); - Box::new(RowWorkload { data_input, keys_input }) + Box::new(RowWorkload { data_input, keys_input, pad }) }, "col" => { use timely::dataflow::operators::Input as _; @@ -133,7 +136,7 @@ fn main() { keys.join_core(data, |_k, (), ()| Option::<()>::None) .probe_with(&mut probe); - Box::new(ColWorkload { data_input, keys_input, buf: String::new() }) + Box::new(ColWorkload { data_input, keys_input, buf: String::new(), pad }) }, _ => { panic!("unrecognized mode: {:?}", mode); diff --git a/differential-dataflow/src/columnar/builder.rs b/differential-dataflow/src/columnar/builder.rs index 9b616e26b..e9ad6becd 100644 --- a/differential-dataflow/src/columnar/builder.rs +++ b/differential-dataflow/src/columnar/builder.rs @@ -5,7 +5,7 @@ //! queues it. `finish` produces one final trie from any remaining tuples. use std::collections::VecDeque; -use columnar::{Columnar, Clear, Len, Push}; +use columnar::{Clear, Columnar, Len, Push}; use super::layout::ColumnarUpdate as Update; use super::updates::UpdatesTyped; @@ -23,19 +23,31 @@ pub struct ValBuilder { pending: VecDeque>, } +impl ValBuilder { + /// Wrap `self.current` as a stride-1 `UpdatesTyped`, consolidate it, and queue. + /// + /// Reclaims the column allocations: takes `self.current` by value, consolidates + /// (which builds a fresh trie via Push), then clears the now-stale columns and + /// returns them to `self.current` so capacity is retained across flushes. + fn flush(&mut self) { + if self.current.len() == 0 { return; } + let records = self.current.len(); + let raw: UpdatesTyped = std::mem::take(&mut self.current).into(); + let updates = raw.consolidate().into(); + // Reclaim raw's column allocations. + self.current = (raw.keys.values, raw.vals.values, raw.times.values, raw.diffs.values); + self.current.clear(); + self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); + } +} + use timely::container::PushInto; impl PushInto for ValBuilder where TupleContainer : Push { #[inline] fn push_into(&mut self, item: T) { self.current.push(item); if self.current.len() > crate::columnar::LINK_TARGET { - use columnar::{Borrow, Index}; - let records = self.current.len(); - let mut refs = self.current.borrow().into_index_iter().collect::>(); - refs.sort(); - let updates = UpdatesTyped::form(refs.into_iter()).into(); - self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); - self.current.clear(); + self.flush(); } } } @@ -56,27 +68,14 @@ impl ContainerBuilder for ValBuilder { #[inline] fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(container) = self.pending.pop_front() { - self.empty = Some(container); - self.empty.as_mut() - } else { - None - } + self.empty = self.pending.pop_front(); + self.empty.as_mut() } #[inline] fn finish(&mut self) -> Option<&mut Self::Container> { - if !self.current.is_empty() { - use columnar::{Borrow, Index}; - let records = self.current.len(); - let mut refs = self.current.borrow().into_index_iter().collect::>(); - refs.sort(); - let updates = UpdatesTyped::form(refs.into_iter()).into(); - self.pending.push_back(RecordedUpdates { updates, records, consolidated: true }); - self.current.clear(); - } - self.empty = self.pending.pop_front(); - self.empty.as_mut() + self.flush(); + self.extract() } } diff --git a/differential-dataflow/src/columnar/updates.rs b/differential-dataflow/src/columnar/updates.rs index a9d263dd0..5043a3d93 100644 --- a/differential-dataflow/src/columnar/updates.rs +++ b/differential-dataflow/src/columnar/updates.rs @@ -263,6 +263,24 @@ impl + Clone + 'static> Updates = (::Key, ::Val, ::Time, ::Diff); +/// Wrap a flat `Tuple` columnar container as a stride-1 [`UpdatesTyped`]. +/// +/// Each layer gets `Strides::new(1, n)` bounds, where `n` is the record count. +/// The result is unsorted and unconsolidated; pass it through `consolidate()` to +/// canonicalize. Allocation-free: the four column containers move in place. +impl From<(ContainerOf, ContainerOf, ContainerOf, ContainerOf)> for UpdatesTyped { + fn from(container: (ContainerOf, ContainerOf, ContainerOf, ContainerOf)) -> Self { + let (k_col, v_col, t_col, d_col) = container; + let n = k_col.len() as u64; + Self { + keys: Lists { values: k_col, bounds: Strides::new(1, n) }, + vals: Lists { values: v_col, bounds: Strides::new(1, n) }, + times: Lists { values: t_col, bounds: Strides::new(1, n) }, + diffs: Lists { values: d_col, bounds: Strides::new(1, n) }, + } + } +} + /// Returns the value-index range for list `i` given cumulative bounds. #[inline] pub fn child_range>(bounds: B, i: usize) -> std::ops::Range { @@ -339,7 +357,7 @@ impl UpdatesTyped { /// Forms a consolidated `UpdatesTyped` trie from unsorted `(key, val, time, diff)` refs. pub fn form_unsorted<'a>(unsorted: impl Iterator>>) -> Self { let mut data = unsorted.collect::>(); - data.sort(); + data.sort_by(|(ak, av, at, _), (bk, bv, bt, _)| (ak, av, at).cmp(&(bk, bv, bt))); Self::form(data.into_iter()) } @@ -402,7 +420,29 @@ impl UpdatesTyped { /// Consolidates into canonical trie form: /// single outer key list, all lists sorted and deduplicated, /// diff lists are singletons (or absent if cancelled). - pub fn consolidate(self) -> Self { Self::form_unsorted(self.iter()) } + pub fn consolidate(&self) -> Self { + // Fast path: stride-1 at every layer (flat tuple list, e.g. produced by + // `From`). Skip the four-level trie iter and walk the + // underlying value columns directly. + if self.keys.bounds.strided() == Some(1) + && self.vals.bounds.strided() == Some(1) + && self.times.bounds.strided() == Some(1) + && self.diffs.bounds.strided() == Some(1) + { + let n = Len::len(&self.keys.values); + // Tuple up borrows of the four columns — `Index::get(i)` on the + // tuple dispatches to each component, yielding a `Ref>` + // in a single call. + let view = ( + self.keys.values.borrow(), + self.vals.values.borrow(), + self.times.values.borrow(), + self.diffs.values.borrow(), + ); + return Self::form_unsorted((0..n).map(|i| view.get(i))); + } + Self::form_unsorted(self.iter()) + } /// Drop entries whose diff list is empty (cancelled), rebuilding the trie. pub fn filter_zero(self) -> Self { if self.diffs.bounds.strided() == Some(1) { self }