From ab4974bf3bb09d12d7706848388baca4e606948f Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sun, 26 Apr 2026 14:50:31 -0400 Subject: [PATCH 1/5] Restore pre-#725 spines.rs and inline EditList::load MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Brings back the spines arrangement bake-off (deleted in #724 Spring cleaning, then RHH-dependent) with three modes: `key` (OrdKeySpine), `val` (OrdValSpine with Val=()), and `col` (columnar ValSpine via the columnar module added in #730). All three feed the same Vec-shaped input collections through one driver loop; `col` repacks via a small in-dataflow `unary` (`ToRecorded`) that builds `RecordedUpdates` containers before `arrange_core`. Bisecting against the example exposed a regression introduced in #725: EditList::load now delegates to populate_key, which seek_keys + checks + rewinds vals on every call. In the merge-join inner loop (join.rs Ordering::Equal arm), the cursor is already positioned by the upstream `match trace_key.cmp(&batch_key)` work, so the seek is redundant. Repeated 1M times in the spines query phase, this added ~3s (+40% queries time vs pre-#725 baseline). Restoring EditList::load to its pre-#725 division of labor — assume the cursor is positioned, walk vals inline — recovers performance. populate_key and replay_key keep the seek for callers that legitimately need it (reduce, ValueHistory). The Option-based meet API from #725 stays. 
Measurements (1M keys, 1000 size, key mode): - v0.23.0 baseline: 6.56s queries - pre-#725 (f4e75503): 7.16s queries - master HEAD before this commit: 10.12s queries - this commit: 7.00s queries Co-Authored-By: Claude Opus 4.7 (1M context) --- differential-dataflow/examples/spines.rs | 148 ++++++++++++++++++++ differential-dataflow/src/operators/join.rs | 4 +- differential-dataflow/src/operators/mod.rs | 22 ++- 3 files changed, 168 insertions(+), 6 deletions(-) create mode 100644 differential-dataflow/examples/spines.rs diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs new file mode 100644 index 000000000..e9ffb26e0 --- /dev/null +++ b/differential-dataflow/examples/spines.rs @@ -0,0 +1,148 @@ +use timely::dataflow::operators::probe::Handle; + +use differential_dataflow::input::Input; + +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +fn main() { + + let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let size: usize = std::env::args().nth(2).unwrap().parse().unwrap(); + + let mode: String = std::env::args().nth(3).unwrap(); + + println!("Running [{:?}] arrangement", mode); + + let timer1 = ::std::time::Instant::now(); + let timer2 = timer1.clone(); + + // define a new computational scope, in which to run BFS + timely::execute_from_args(std::env::args(), move |worker| { + + // define BFS dataflow; return handles to roots and edges inputs + let mut probe = Handle::new(); + let (mut data_input, mut keys_input) = worker.dataflow(|scope| { + + use differential_dataflow::operators::{arrange::Arrange}; + + let (data_input, data) = scope.new_collection::(); + let (keys_input, keys) = scope.new_collection::(); + + match mode.as_str() { + "key" => { + use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; + let data = data.arrange::, RcOrdKeyBuilder, OrdKeySpine>(); + let keys = keys.arrange::, RcOrdKeyBuilder, 
OrdKeySpine>(); + keys.join_core(data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + }, + "val" => { + use differential_dataflow::trace::implementations::ord_neu::{OrdValBatcher, RcOrdValBuilder, OrdValSpine}; + let data = data.map(|x| (x, ())).arrange::, RcOrdValBuilder, OrdValSpine>(); + let keys = keys.map(|x| (x, ())).arrange::, RcOrdValBuilder, OrdValSpine>(); + keys.join_core(data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + }, + "col" => { + use timely::dataflow::operators::generic::Operator; + use timely::dataflow::channels::pact::Pipeline; + use differential_dataflow::columnar::{ValBatcher, ValBuilder, ValColBuilder, ValPact, ValSpine}; + use differential_dataflow::operators::arrange::arrangement::arrange_core; + + type DataU = (String, (), u64, i64); + + fn string_hash(s: columnar::Ref<'_, String>) -> u64 { + use std::hash::{Hash, Hasher}; + let mut h = std::collections::hash_map::DefaultHasher::new(); + s.hash(&mut h); + h.finish() + } + + // Convert a Vec-container stream of `((K, ()), Time, Diff)` into + // a `RecordedUpdates`-container stream feeding the columnar batcher. + fn to_recorded<'scope>( + stream: timely::dataflow::Stream<'scope, u64, Vec<((String, ()), u64, isize)>>, + ) -> timely::dataflow::Stream<'scope, u64, differential_dataflow::columnar::RecordedUpdates> { + stream.unary::, _, _, _>(Pipeline, "ToRecorded", |_, _| { + move |input, output| { + input.for_each(|cap, batch| { + let mut session = output.session_with_builder(&cap); + for ((k, _), t, d) in batch.drain(..) 
{ + session.give((k.as_str(), (), t, d as i64)); + } + }); + } + }) + } + + let data_stream = to_recorded(data.map(|x| (x, ())).inner); + let keys_stream = to_recorded(keys.map(|x| (x, ())).inner); + + let data = arrange_core::<_, ValBatcher, ValBuilder, ValSpine>( + data_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "DataArrange", + ); + let keys = arrange_core::<_, ValBatcher, ValBuilder, ValSpine>( + keys_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "KeysArrange", + ); + keys.join_core(data, |_k, (), ()| Option::<()>::None) + .probe_with(&mut probe); + }, + _ => { + println!("unrecognized mode: {:?}", mode) + } + } + + (data_input, keys_input) + }); + + // Load up data in batches. + let mut counter = 0; + while counter < 10 * keys { + let mut i = worker.index(); + while i < size { + let val = (counter + i) % keys; + data_input.insert(format!("{:?}", val)); + i += worker.peers(); + } + counter += size; + data_input.advance_to(data_input.time() + 1); + data_input.flush(); + keys_input.advance_to(keys_input.time() + 1); + keys_input.flush(); + while probe.less_than(data_input.time()) { + worker.step(); + } + } + println!("{:?}\tloading complete", timer1.elapsed()); + + let mut queries = 0; + + while queries < 10 * keys { + let mut i = worker.index(); + while i < size { + let val = (queries + i) % keys; + keys_input.insert(format!("{:?}", val)); + i += worker.peers(); + } + queries += size; + data_input.advance_to(data_input.time() + 1); + data_input.flush(); + keys_input.advance_to(keys_input.time() + 1); + keys_input.flush(); + while probe.less_than(data_input.time()) { + worker.step(); + } + } + + println!("{:?}\tqueries complete", timer1.elapsed()); + + // loop { } + + }).unwrap(); + + println!("{:?}\tshut down", timer2.elapsed()); + +} diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index f76ab12f3..15829b605 100644 --- 
a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -370,8 +370,8 @@ where Ordering::Greater => batch.seek_key(batch_storage, trace_key), Ordering::Equal => { - thinker.history1.edits.load(trace, trace_storage, trace_key, Some(meet)); - thinker.history2.edits.load(batch, batch_storage, batch_key, None); + thinker.history1.edits.load(trace, trace_storage, Some(meet)); + thinker.history2.edits.load(batch, batch_storage, None); // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 91a9896bf..4260c2b95 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -24,7 +24,7 @@ pub struct EditList { edits: Vec<(T, D)>, } -impl EditList { +impl EditList { /// Creates an empty list of edits. #[inline] fn new() -> Self { @@ -33,12 +33,26 @@ impl EditList { edits: Vec::new(), } } - /// Loads the contents of a cursor at `key`, advancing times by `meet` if supplied. - fn load<'a, C>(&mut self, cursor: &mut C, storage: &'a C::Storage, key: C::Key<'a>, meet: Option<&T>) + /// Walks the cursor's vals at the current key into `self`, advancing times by `meet` if supplied. + /// + /// The cursor is assumed to be positioned at a key already; callers that need + /// to seek should use [`Cursor::populate_key`] (or [`ValueHistory::replay_key`]) + /// instead. This split avoids a redundant seek in the merge-join inner loop, + /// where the cursor is positioned by the upstream merge step. 
+ fn load<'a, C>(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: Option<&T>) where C: Cursor = V, Time = T, Diff = D>, { - cursor.populate_key(storage, key, meet, self); + self.clear(); + while let Some(val) = cursor.get_val(storage) { + cursor.map_times(storage, |time, diff| { + let mut t = C::owned_time(time); + if let Some(m) = meet { t.join_assign(m); } + self.push(t, C::owned_diff(diff)); + }); + self.seal(val); + cursor.step_val(storage); + } } /// Clears the list of edits. #[inline] From 30859c4a26bce788bfbc2708bba28becf1891cf8 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 27 Apr 2026 16:42:32 -0400 Subject: [PATCH 2/5] Tighten up spines examples --- differential-dataflow/examples/spines.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs index e9ffb26e0..630376ec7 100644 --- a/differential-dataflow/examples/spines.rs +++ b/differential-dataflow/examples/spines.rs @@ -68,10 +68,12 @@ fn main() { ) -> timely::dataflow::Stream<'scope, u64, differential_dataflow::columnar::RecordedUpdates> { stream.unary::, _, _, _>(Pipeline, "ToRecorded", |_, _| { move |input, output| { - input.for_each(|cap, batch| { + input.for_each_time(|cap, batches| { let mut session = output.session_with_builder(&cap); - for ((k, _), t, d) in batch.drain(..) { - session.give((k.as_str(), (), t, d as i64)); + for batch in batches { + for ((k, _), t, d) in batch.drain(..) 
{ + session.give((k.as_str(), (), t, d as i64)); + } } }); } @@ -91,7 +93,7 @@ fn main() { .probe_with(&mut probe); }, _ => { - println!("unrecognized mode: {:?}", mode) + panic!("unrecognized mode: {:?}", mode); } } From 125a1af6fe06d7d18bba25c3e1398d153e65bfca Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 27 Apr 2026 17:32:16 -0400 Subject: [PATCH 3/5] Extract common target columnar size --- .../src/columnar/arrangement/mod.rs | 2 +- .../src/columnar/arrangement/trie_merger.rs | 28 +++---------------- differential-dataflow/src/columnar/builder.rs | 2 +- differential-dataflow/src/columnar/mod.rs | 3 ++ 4 files changed, 9 insertions(+), 26 deletions(-) diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index fe8efa246..d9852961c 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -180,7 +180,7 @@ pub mod batcher { use super::super::updates::Updates; impl timely::container::SizableContainer for Updates { - fn at_capacity(&self) -> bool { self.diffs.values.len() >= 64 * 1024 } + fn at_capacity(&self) -> bool { self.diffs.values.len() >= crate::columnar::LINK_TARGET } fn ensure_capacity(&mut self, _stash: &mut Option) { } } diff --git a/differential-dataflow/src/columnar/arrangement/trie_merger.rs b/differential-dataflow/src/columnar/arrangement/trie_merger.rs index 964685196..8994a9ced 100644 --- a/differential-dataflow/src/columnar/arrangement/trie_merger.rs +++ b/differential-dataflow/src/columnar/arrangement/trie_merger.rs @@ -62,7 +62,7 @@ fn form_chunks<'a, U: Update>( ) { let mut sorted = sorted.peekable(); while sorted.peek().is_some() { - let chunk = Updates::::form((&mut sorted).take(64 * 1024)); + let chunk = Updates::::form((&mut sorted).take(crate::columnar::LINK_TARGET)); if chunk.len() > 0 { output.push(chunk); } @@ -152,28 +152,6 @@ where }); } } - - - // // Flatten the sorted, consolidated 
chain into refs. - // let all = merged.iter().flat_map(|chunk| chunk.iter()); - - // // Partition into two sorted streams by time. - // let mut time_owned = U::Time::default(); - // let mut keep_vec = Vec::new(); - // let mut ship_vec = Vec::new(); - // for (k, v, t, d) in all { - // Columnar::copy_from(&mut time_owned, t); - // if upper.less_equal(&time_owned) { - // frontier.insert_ref(&time_owned); - // keep_vec.push((k, v, t, d)); - // } else { - // ship_vec.push((k, v, t, d)); - // } - // } - - // // Build chunks via form (which consolidates internally). - // form_chunks::(keep_vec.into_iter(), kept); - // form_chunks::(ship_vec.into_iter(), ship); } fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { @@ -306,6 +284,8 @@ where builder: &mut ChainBuilder, stash: &mut Vec>, ) { + // TODO: Optimization for one batch exceeding the other. + let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap(); let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap(); @@ -669,7 +649,7 @@ impl ChainBuilder { link = link.filter_zero(); if link.len() > 0 { if let Some(last) = self.updates.last_mut() { - if last.len() + link.len() < 2 * 64 * 1024 { + if last.len() + link.len() < 2 * crate::columnar::LINK_TARGET { let mut build = super::super::updates::UpdatesBuilder::new_from(std::mem::take(last)); build.meld(&link); *last = build.done(); diff --git a/differential-dataflow/src/columnar/builder.rs b/differential-dataflow/src/columnar/builder.rs index 0612e7880..22bf48e44 100644 --- a/differential-dataflow/src/columnar/builder.rs +++ b/differential-dataflow/src/columnar/builder.rs @@ -28,7 +28,7 @@ impl PushInto for ValBuilder where TupleContainer : Push< #[inline] fn push_into(&mut self, item: T) { self.current.push(item); - if self.current.len() > 1024 * 1024 { + if self.current.len() > crate::columnar::LINK_TARGET { use columnar::{Borrow, Index}; let records = self.current.len(); let mut refs = self.current.borrow().into_index_iter().collect::>(); 
diff --git a/differential-dataflow/src/columnar/mod.rs b/differential-dataflow/src/columnar/mod.rs index 887774bf8..b32a4763c 100644 --- a/differential-dataflow/src/columnar/mod.rs +++ b/differential-dataflow/src/columnar/mod.rs @@ -41,6 +41,9 @@ pub use builder::ValBuilder as ValColBuilder; pub use exchange::ValPact; pub use arrangement::{ValBatcher, ValBuilder, ValSpine}; +/// Target size for update batches, in number of updates. +pub const LINK_TARGET: usize = 64 * 1024; + /// A thin wrapper around `Updates` that tracks the pre-consolidation record count /// for timely's exchange accounting. This wrapper is the stream container type; /// the `TrieChunker` strips it, passing bare `Updates` into the merge batcher. From 777eff0565c076ba0a0817b8aa682f879672fb9a Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 28 Apr 2026 20:58:58 -0400 Subject: [PATCH 4/5] TrieChunker work --- .../src/columnar/arrangement/mod.rs | 123 +++++++++++++++--- .../src/columnar/exchange.rs | 21 ++- 2 files changed, 120 insertions(+), 24 deletions(-) diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index d9852961c..82bc1598c 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -128,44 +128,127 @@ use crate::trace::implementations::merge_batcher::MergeBatcher; type ValBatcher2 = MergeBatcher, TrieChunker, trie_merger::TrieMerger>; /// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher. -/// The `records` accounting is discarded here — it has served its purpose for exchange. /// -/// IMPORTANT: This chunker assumes the input `Updates` are sorted and consolidated -/// (as produced by `ValColBuilder::form`). The downstream `InternalMerge` relies on -/// this invariant. If `RecordedUpdates` could carry unsorted data (e.g. 
from a `map`), -/// we would need either a sorting chunker for that case, or a type-level distinction -/// (e.g. `RecordedUpdates` vs `RecordedUpdates`) to -/// route to the right chunker. +/// The intended behavior is to produce chunks whose size is within 1-2x `LINK_TARGET`. +/// It ships large batches immediately, accumulates small batches, consolidates as they +/// exceed 2xLINK_TARGET, and ships them unless they drop below 1xLINK_TARGET. +/// +/// The flow is into (or around) `self.stage`, then consolidated blocks into `self.ready`, +/// each of which is put in `self.stage` pub struct TrieChunker { + /// Insufficiently large updates we haven't figured out how to ship yet. + blobs: Vec<(Updates, bool)>, + /// Sum of `len()` across `blobs`. + blob_records: usize, + /// Ready-to-emit chunks. Each is sorted and consolidated; size ≥ `LINK_TARGET` + /// (or smaller, only for the final chunk produced by `finish`). ready: std::collections::VecDeque>, - empty: Option>, + /// Staging area for the next pull call. + stage: Option>, } impl Default for TrieChunker { - fn default() -> Self { Self { ready: Default::default(), empty: None } } + fn default() -> Self { + Self { + blobs: Default::default(), + blob_records: 0, + ready: Default::default(), + stage: None, + } + } +} + +impl TrieChunker { + /// Consolidate and empty `self.blobs`, into `self.ready` if large enough or else return. + fn consolidate_blobs(&mut self) -> Updates { + // Single consolidated entry: pass through, no work. + if self.blobs.len() == 1 && self.blobs[0].1 { + let (result, _) = self.blobs.pop().unwrap(); + self.blob_records = 0; + return result; + } + + // TODO: Improve consolidation through column-oriented sorts. + let result = Updates::::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter())); + self.blobs.clear(); + self.blob_records = 0; + result + } + + /// Push a non-empty `Updates` into blobs and update accounting. 
+ fn absorb(&mut self, updates: Updates, consolidated: bool) { + self.blob_records += updates.len(); + self.blobs.push((updates, consolidated)); + } } impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates> for TrieChunker { fn push_into(&mut self, container: &'a mut RecordedUpdates) { - let mut updates = std::mem::take(&mut container.updates); - if !container.consolidated { updates = updates.consolidate(); } - if updates.len() > 0 { self.ready.push_back(updates); } + // Early return if an empty container (legit, for accountable progress tracking). + if container.updates.len() == 0 { return; } + + // Our main goal is to only ship links that are 1-2 x LINK_TARGET, using blobs + // to accumulate updates until they are ready to go or we are asked to finish. + // + // Informally, we are aiming to move `container` into or around `self.blobs`. + // Into if small enough, as we can further consolidate, but if not we need to + // consolidate and then either ship (if large) or hold (if small) the results. + + let updates = std::mem::take(&mut container.updates); + let consolidated = container.consolidated; + let len = updates.len(); + + // The input may be ready to ship on its own. + // This is ideal, if we've used an accumulating container builder elsewhere. + if consolidated && len >= crate::columnar::LINK_TARGET { self.ready.push_back(updates); } + // Can move into blobs if the combined length is not too large. + else if self.blob_records + len < 2 * crate::columnar::LINK_TARGET { self.absorb(updates, consolidated); } + // Otherwise, we'll need to manage `self.blobs`. + else { + // Together `updates` and `self.blobs` exceed 2 * LINK_TARGET. + // At least one, perhaps both of them, are LINK_TARGET in size. + // We'll consolidate any that are, and ship or merge the results. + // We'll end up with at most LINK_TARGET in `self.blobs`, retiring + // a constant factor of the pending work we started with. 
+ + // Consolidate and move to ready if large; stash otherwise. + let input_residual = if len >= crate::columnar::LINK_TARGET { + let cons = if consolidated { updates } else { updates.consolidate() }; + if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None } + else if cons.len() > 0 { Some((cons, true)) } + else { None } + } + else { Some((updates, consolidated)) }; + + // Consolidate and move to ready if large; stash otherwise. + let blobs_residual = if self.blob_records >= crate::columnar::LINK_TARGET { + let cons = self.consolidate_blobs(); + if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None } + else if cons.len() > 0 { Some((cons, true)) } + else { None } + } + else { None }; + + // Return un-shipped + if let Some((r, c)) = input_residual { self.absorb(r, c); } + if let Some((r, c)) = blobs_residual { self.absorb(r, c); } + } } } impl timely::container::ContainerBuilder for TrieChunker { type Container = Updates; fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(ready) = self.ready.pop_front() { - self.empty = Some(ready); - self.empty.as_mut() - } else { - None - } + self.stage = self.ready.pop_front(); + self.stage.as_mut() } fn finish(&mut self) -> Option<&mut Self::Container> { - self.empty = self.ready.pop_front(); - self.empty.as_mut() + // Drain whatever's left in blobs as a single (possibly small) final chunk. 
+ if !self.blobs.is_empty() { + let cons = self.consolidate_blobs(); + if cons.len() > 0 { self.ready.push_back(cons); } + } + self.extract() } } diff --git a/differential-dataflow/src/columnar/exchange.rs b/differential-dataflow/src/columnar/exchange.rs index 8693c22d6..01dc3bb08 100644 --- a/differential-dataflow/src/columnar/exchange.rs +++ b/differential-dataflow/src/columnar/exchange.rs @@ -39,10 +39,23 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor pre { From 3c7f594314384e1db2ccdadb974fe9510bd02da0 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 29 Apr 2026 14:06:16 -0400 Subject: [PATCH 5/5] De-penalize col in spines.rs --- differential-dataflow/examples/spines.rs | 134 ++++++++++++++--- 1 file changed, 85 insertions(+), 49 deletions(-) diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs index 630376ec7..c8df1db10 100644 --- a/differential-dataflow/examples/spines.rs +++ b/differential-dataflow/examples/spines.rs @@ -1,12 +1,71 @@ +//! Quick spines benchmark over `val`, `key`, and `col` arrangements. +//! +//! `col` mode feeds the columnar batcher directly via `InputHandle` + +//! `ValColBuilder`, formatting integers into a reusable `String` buffer rather +//! than allocating per record. This way `col`'s measurements aren't polluted +//! by `String` allocation overhead, since the row-based `val`/`key` paths +//! genuinely need owned strings. + +use std::fmt::Write as _; + use timely::dataflow::operators::probe::Handle; +use timely::dataflow::InputHandle; -use differential_dataflow::input::Input; +use differential_dataflow::input::{Input, InputSession}; use mimalloc::MiMalloc; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; +/// What each arm needs to feed the dataflow round-by-round. 
+trait Workload { + fn insert_data(&mut self, val: usize); + fn insert_keys(&mut self, val: usize); + fn advance_to(&mut self, t: u64); +} + +/// Row-based mode (`val`, `key`): each insert allocates a `String`. +struct RowWorkload { + data_input: InputSession, + keys_input: InputSession, +} +impl Workload for RowWorkload { + fn insert_data(&mut self, val: usize) { self.data_input.insert(format!("{:?}", val)); } + fn insert_keys(&mut self, val: usize) { self.keys_input.insert(format!("{:?}", val)); } + fn advance_to(&mut self, t: u64) { + self.data_input.advance_to(t); self.data_input.flush(); + self.keys_input.advance_to(t); self.keys_input.flush(); + } +} + +/// Columnar mode: format into a reusable buffer, push refs into the col builder. +type DataU = (String, (), u64, i64); +type ColBuilder = differential_dataflow::columnar::ValColBuilder; +struct ColWorkload { + data_input: InputHandle, + keys_input: InputHandle, + buf: String, +} +impl Workload for ColWorkload { + fn insert_data(&mut self, val: usize) { + self.buf.clear(); + write!(&mut self.buf, "{:?}", val).unwrap(); + let t = *self.data_input.time(); + self.data_input.send((self.buf.as_str(), (), t, 1i64)); + } + fn insert_keys(&mut self, val: usize) { + self.buf.clear(); + write!(&mut self.buf, "{:?}", val).unwrap(); + let t = *self.keys_input.time(); + self.keys_input.send((self.buf.as_str(), (), t, 1i64)); + } + fn advance_to(&mut self, t: u64) { + self.data_input.advance_to(t); self.data_input.flush(); + self.keys_input.advance_to(t); self.keys_input.flush(); + } +} + fn main() { let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap(); @@ -19,41 +78,39 @@ fn main() { let timer1 = ::std::time::Instant::now(); let timer2 = timer1.clone(); - // define a new computational scope, in which to run BFS timely::execute_from_args(std::env::args(), move |worker| { - // define BFS dataflow; return handles to roots and edges inputs let mut probe = Handle::new(); - let (mut data_input, mut keys_input) = 
worker.dataflow(|scope| { + let mut workload: Box = worker.dataflow(|scope| { - use differential_dataflow::operators::{arrange::Arrange}; - - let (data_input, data) = scope.new_collection::(); - let (keys_input, keys) = scope.new_collection::(); + use differential_dataflow::operators::arrange::Arrange; match mode.as_str() { "key" => { use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; + let (data_input, data) = scope.new_collection::(); + let (keys_input, keys) = scope.new_collection::(); let data = data.arrange::, RcOrdKeyBuilder, OrdKeySpine>(); let keys = keys.arrange::, RcOrdKeyBuilder, OrdKeySpine>(); keys.join_core(data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); + Box::new(RowWorkload { data_input, keys_input }) as Box }, "val" => { use differential_dataflow::trace::implementations::ord_neu::{OrdValBatcher, RcOrdValBuilder, OrdValSpine}; + let (data_input, data) = scope.new_collection::(); + let (keys_input, keys) = scope.new_collection::(); let data = data.map(|x| (x, ())).arrange::, RcOrdValBuilder, OrdValSpine>(); let keys = keys.map(|x| (x, ())).arrange::, RcOrdValBuilder, OrdValSpine>(); keys.join_core(data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); + Box::new(RowWorkload { data_input, keys_input }) }, "col" => { - use timely::dataflow::operators::generic::Operator; - use timely::dataflow::channels::pact::Pipeline; - use differential_dataflow::columnar::{ValBatcher, ValBuilder, ValColBuilder, ValPact, ValSpine}; + use timely::dataflow::operators::Input as _; + use differential_dataflow::columnar::{ValBatcher, ValBuilder, ValPact, ValSpine}; use differential_dataflow::operators::arrange::arrangement::arrange_core; - type DataU = (String, (), u64, i64); - fn string_hash(s: columnar::Ref<'_, String>) -> u64 { use std::hash::{Hash, Hasher}; let mut h = std::collections::hash_map::DefaultHasher::new(); @@ -61,27 +118,11 @@ fn main() { h.finish() } - // Convert a 
Vec-container stream of `((K, ()), Time, Diff)` into - // a `RecordedUpdates`-container stream feeding the columnar batcher. - fn to_recorded<'scope>( - stream: timely::dataflow::Stream<'scope, u64, Vec<((String, ()), u64, isize)>>, - ) -> timely::dataflow::Stream<'scope, u64, differential_dataflow::columnar::RecordedUpdates> { - stream.unary::, _, _, _>(Pipeline, "ToRecorded", |_, _| { - move |input, output| { - input.for_each_time(|cap, batches| { - let mut session = output.session_with_builder(&cap); - for batch in batches { - for ((k, _), t, d) in batch.drain(..) { - session.give((k.as_str(), (), t, d as i64)); - } - } - }); - } - }) - } + let mut data_input = >::new_with_builder(); + let mut keys_input = >::new_with_builder(); - let data_stream = to_recorded(data.map(|x| (x, ())).inner); - let keys_stream = to_recorded(keys.map(|x| (x, ())).inner); + let data_stream = scope.input_from(&mut data_input); + let keys_stream = scope.input_from(&mut keys_input); let data = arrange_core::<_, ValBatcher, ValBuilder, ValSpine>( data_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "DataArrange", @@ -91,32 +132,31 @@ fn main() { ); keys.join_core(data, |_k, (), ()| Option::<()>::None) .probe_with(&mut probe); + + Box::new(ColWorkload { data_input, keys_input, buf: String::new() }) }, _ => { panic!("unrecognized mode: {:?}", mode); } } - - (data_input, keys_input) }); // Load up data in batches. 
let mut counter = 0; + let mut t: u64 = 1; while counter < 10 * keys { let mut i = worker.index(); while i < size { let val = (counter + i) % keys; - data_input.insert(format!("{:?}", val)); + workload.insert_data(val); i += worker.peers(); } counter += size; - data_input.advance_to(data_input.time() + 1); - data_input.flush(); - keys_input.advance_to(keys_input.time() + 1); - keys_input.flush(); - while probe.less_than(data_input.time()) { + workload.advance_to(t); + while probe.less_than(&t) { worker.step(); } + t += 1; } println!("{:?}\tloading complete", timer1.elapsed()); @@ -126,23 +166,19 @@ fn main() { let mut i = worker.index(); while i < size { let val = (queries + i) % keys; - keys_input.insert(format!("{:?}", val)); + workload.insert_keys(val); i += worker.peers(); } queries += size; - data_input.advance_to(data_input.time() + 1); - data_input.flush(); - keys_input.advance_to(keys_input.time() + 1); - keys_input.flush(); - while probe.less_than(data_input.time()) { + workload.advance_to(t); + while probe.less_than(&t) { worker.step(); } + t += 1; } println!("{:?}\tqueries complete", timer1.elapsed()); - // loop { } - }).unwrap(); println!("{:?}\tshut down", timer2.elapsed());