diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs new file mode 100644 index 000000000..c8df1db10 --- /dev/null +++ b/differential-dataflow/examples/spines.rs @@ -0,0 +1,186 @@ +//! Quick spines benchmark over `val`, `key`, and `col` arrangements. +//! +//! `col` mode feeds the columnar batcher directly via `InputHandle` + +//! `ValColBuilder`, formatting integers into a reusable `String` buffer rather +//! than allocating per record. This way `col`'s measurements aren't polluted +//! by `String` allocation overhead, since the row-based `val`/`key` paths +//! genuinely need owned strings. + +use std::fmt::Write as _; + +use timely::dataflow::operators::probe::Handle; +use timely::dataflow::InputHandle; + +use differential_dataflow::input::{Input, InputSession}; + +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + +/// What each arm needs to feed the dataflow round-by-round. +trait Workload { + fn insert_data(&mut self, val: usize); + fn insert_keys(&mut self, val: usize); + fn advance_to(&mut self, t: u64); +} + +/// Row-based mode (`val`, `key`): each insert allocates a `String`. +struct RowWorkload { + data_input: InputSession, + keys_input: InputSession, +} +impl Workload for RowWorkload { + fn insert_data(&mut self, val: usize) { self.data_input.insert(format!("{:?}", val)); } + fn insert_keys(&mut self, val: usize) { self.keys_input.insert(format!("{:?}", val)); } + fn advance_to(&mut self, t: u64) { + self.data_input.advance_to(t); self.data_input.flush(); + self.keys_input.advance_to(t); self.keys_input.flush(); + } +} + +/// Columnar mode: format into a reusable buffer, push refs into the col builder. 
+type DataU = (String, (), u64, i64); +type ColBuilder = differential_dataflow::columnar::ValColBuilder; +struct ColWorkload { + data_input: InputHandle, + keys_input: InputHandle, + buf: String, +} +impl Workload for ColWorkload { + fn insert_data(&mut self, val: usize) { + self.buf.clear(); + write!(&mut self.buf, "{:?}", val).unwrap(); + let t = *self.data_input.time(); + self.data_input.send((self.buf.as_str(), (), t, 1i64)); + } + fn insert_keys(&mut self, val: usize) { + self.buf.clear(); + write!(&mut self.buf, "{:?}", val).unwrap(); + let t = *self.keys_input.time(); + self.keys_input.send((self.buf.as_str(), (), t, 1i64)); + } + fn advance_to(&mut self, t: u64) { + self.data_input.advance_to(t); self.data_input.flush(); + self.keys_input.advance_to(t); self.keys_input.flush(); + } +} + +fn main() { + + let keys: usize = std::env::args().nth(1).unwrap().parse().unwrap(); + let size: usize = std::env::args().nth(2).unwrap().parse().unwrap(); + + let mode: String = std::env::args().nth(3).unwrap(); + + println!("Running [{:?}] arrangement", mode); + + let timer1 = ::std::time::Instant::now(); + let timer2 = timer1.clone(); + + timely::execute_from_args(std::env::args(), move |worker| { + + let mut probe = Handle::new(); + let mut workload: Box = worker.dataflow(|scope| { + + use differential_dataflow::operators::arrange::Arrange; + + match mode.as_str() { + "key" => { + use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; + let (data_input, data) = scope.new_collection::(); + let (keys_input, keys) = scope.new_collection::(); + let data = data.arrange::, RcOrdKeyBuilder, OrdKeySpine>(); + let keys = keys.arrange::, RcOrdKeyBuilder, OrdKeySpine>(); + keys.join_core(data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + Box::new(RowWorkload { data_input, keys_input }) as Box + }, + "val" => { + use differential_dataflow::trace::implementations::ord_neu::{OrdValBatcher, RcOrdValBuilder, 
OrdValSpine}; + let (data_input, data) = scope.new_collection::(); + let (keys_input, keys) = scope.new_collection::(); + let data = data.map(|x| (x, ())).arrange::, RcOrdValBuilder, OrdValSpine>(); + let keys = keys.map(|x| (x, ())).arrange::, RcOrdValBuilder, OrdValSpine>(); + keys.join_core(data, |_k, &(), &()| Option::<()>::None) + .probe_with(&mut probe); + Box::new(RowWorkload { data_input, keys_input }) + }, + "col" => { + use timely::dataflow::operators::Input as _; + use differential_dataflow::columnar::{ValBatcher, ValBuilder, ValPact, ValSpine}; + use differential_dataflow::operators::arrange::arrangement::arrange_core; + + fn string_hash(s: columnar::Ref<'_, String>) -> u64 { + use std::hash::{Hash, Hasher}; + let mut h = std::collections::hash_map::DefaultHasher::new(); + s.hash(&mut h); + h.finish() + } + + let mut data_input = >::new_with_builder(); + let mut keys_input = >::new_with_builder(); + + let data_stream = scope.input_from(&mut data_input); + let keys_stream = scope.input_from(&mut keys_input); + + let data = arrange_core::<_, ValBatcher, ValBuilder, ValSpine>( + data_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "DataArrange", + ); + let keys = arrange_core::<_, ValBatcher, ValBuilder, ValSpine>( + keys_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "KeysArrange", + ); + keys.join_core(data, |_k, (), ()| Option::<()>::None) + .probe_with(&mut probe); + + Box::new(ColWorkload { data_input, keys_input, buf: String::new() }) + }, + _ => { + panic!("unrecognized mode: {:?}", mode); + } + } + }); + + // Load up data in batches. 
+ let mut counter = 0; + let mut t: u64 = 1; + while counter < 10 * keys { + let mut i = worker.index(); + while i < size { + let val = (counter + i) % keys; + workload.insert_data(val); + i += worker.peers(); + } + counter += size; + workload.advance_to(t); + while probe.less_than(&t) { + worker.step(); + } + t += 1; + } + println!("{:?}\tloading complete", timer1.elapsed()); + + let mut queries = 0; + + while queries < 10 * keys { + let mut i = worker.index(); + while i < size { + let val = (queries + i) % keys; + workload.insert_keys(val); + i += worker.peers(); + } + queries += size; + workload.advance_to(t); + while probe.less_than(&t) { + worker.step(); + } + t += 1; + } + + println!("{:?}\tqueries complete", timer1.elapsed()); + + }).unwrap(); + + println!("{:?}\tshut down", timer2.elapsed()); + +} diff --git a/differential-dataflow/src/columnar/arrangement/mod.rs b/differential-dataflow/src/columnar/arrangement/mod.rs index fe8efa246..82bc1598c 100644 --- a/differential-dataflow/src/columnar/arrangement/mod.rs +++ b/differential-dataflow/src/columnar/arrangement/mod.rs @@ -128,44 +128,127 @@ use crate::trace::implementations::merge_batcher::MergeBatcher; type ValBatcher2 = MergeBatcher, TrieChunker, trie_merger::TrieMerger>; /// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher. -/// The `records` accounting is discarded here — it has served its purpose for exchange. /// -/// IMPORTANT: This chunker assumes the input `Updates` are sorted and consolidated -/// (as produced by `ValColBuilder::form`). The downstream `InternalMerge` relies on -/// this invariant. If `RecordedUpdates` could carry unsorted data (e.g. from a `map`), -/// we would need either a sorting chunker for that case, or a type-level distinction -/// (e.g. `RecordedUpdates` vs `RecordedUpdates`) to -/// route to the right chunker. +/// The intended behavior is to produce chunks whose size is within 1-2x `LINK_TARGET`. 
+/// It ships large batches immediately, accumulates small batches, consolidates as they +/// exceed 2xLINK_TARGET, and ships them unless they drop below 1xLINK_TARGET. +/// +/// The flow is into (or around) `self.blobs`, then consolidated blocks into `self.ready`, +/// each of which is put in `self.stage` when extracted. pub struct TrieChunker { + /// Insufficiently large updates we haven't figured out how to ship yet. + blobs: Vec<(Updates, bool)>, + /// Sum of `len()` across `blobs`. + blob_records: usize, + /// Ready-to-emit chunks. Each is sorted and consolidated; size ≥ `LINK_TARGET` + /// (or smaller, only for the final chunk produced by `finish`). ready: std::collections::VecDeque>, - empty: Option>, + /// Staging area for the next pull call. + stage: Option>, } impl Default for TrieChunker { - fn default() -> Self { Self { ready: Default::default(), empty: None } } + fn default() -> Self { + Self { + blobs: Default::default(), + blob_records: 0, + ready: Default::default(), + stage: None, + } + } +} + +impl TrieChunker { + /// Consolidate and empty `self.blobs`, returning the consolidated updates; the caller + /// decides whether to ship the result into `self.ready` or re-absorb it into `self.blobs`. + fn consolidate_blobs(&mut self) -> Updates { + // Single consolidated entry: pass through, no work. + if self.blobs.len() == 1 && self.blobs[0].1 { + let (result, _) = self.blobs.pop().unwrap(); + self.blob_records = 0; + return result; + } + + // TODO: Improve consolidation through column-oriented sorts. + let result = Updates::::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter())); + self.blobs.clear(); + self.blob_records = 0; + result + } + + /// Push a non-empty `Updates` into blobs and update accounting. 
+ fn absorb(&mut self, updates: Updates, consolidated: bool) { + self.blob_records += updates.len(); + self.blobs.push((updates, consolidated)); + } } impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates> for TrieChunker { fn push_into(&mut self, container: &'a mut RecordedUpdates) { - let mut updates = std::mem::take(&mut container.updates); - if !container.consolidated { updates = updates.consolidate(); } - if updates.len() > 0 { self.ready.push_back(updates); } + // Early return if an empty container (legit, for accountable progress tracking). + if container.updates.len() == 0 { return; } + + // Our main goal is to only ship links that are 1-2 x LINK_TARGET, using blobs + // to accumulate updates until they are ready to go or we are asked to finish. + // + // Informally, we are aiming to move `container` into or around `self.blobs`. + // Into if small enough, as we can further consolidate, but if not we need to + // consolidate and then either ship (if large) or hold (if small) the results. + + let updates = std::mem::take(&mut container.updates); + let consolidated = container.consolidated; + let len = updates.len(); + + // The input may be ready to ship on its own. + // This is ideal, if we've used an accumulating container builder elsewhere. + if consolidated && len >= crate::columnar::LINK_TARGET { self.ready.push_back(updates); } + // Can move into blobs if the combined length is not too large. + else if self.blob_records + len < 2 * crate::columnar::LINK_TARGET { self.absorb(updates, consolidated); } + // Otherwise, we'll need to manage `self.blobs`. + else { + // Together `updates` and `self.blobs` exceed 2 * LINK_TARGET. + // At least one, perhaps both of them, are LINK_TARGET in size. + // We'll consolidate any that are, and ship or merge the results. + // We'll end up with at most LINK_TARGET in `self.blobs`, retiring + // a constant factor of the pending work we started with. 
+ + // Consolidate and move to ready if large; stash otherwise. + let input_residual = if len >= crate::columnar::LINK_TARGET { + let cons = if consolidated { updates } else { updates.consolidate() }; + if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None } + else if cons.len() > 0 { Some((cons, true)) } + else { None } + } + else { Some((updates, consolidated)) }; + + // Consolidate and move to ready if large; stash otherwise. + let blobs_residual = if self.blob_records >= crate::columnar::LINK_TARGET { + let cons = self.consolidate_blobs(); + if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None } + else if cons.len() > 0 { Some((cons, true)) } + else { None } + } + else { None }; + + // Return un-shipped + if let Some((r, c)) = input_residual { self.absorb(r, c); } + if let Some((r, c)) = blobs_residual { self.absorb(r, c); } + } } } impl timely::container::ContainerBuilder for TrieChunker { type Container = Updates; fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(ready) = self.ready.pop_front() { - self.empty = Some(ready); - self.empty.as_mut() - } else { - None - } + self.stage = self.ready.pop_front(); + self.stage.as_mut() } fn finish(&mut self) -> Option<&mut Self::Container> { - self.empty = self.ready.pop_front(); - self.empty.as_mut() + // Drain whatever's left in blobs as a single (possibly small) final chunk. 
+ if !self.blobs.is_empty() { + let cons = self.consolidate_blobs(); + if cons.len() > 0 { self.ready.push_back(cons); } + } + self.extract() } } @@ -180,7 +263,7 @@ pub mod batcher { use super::super::updates::Updates; impl timely::container::SizableContainer for Updates { - fn at_capacity(&self) -> bool { self.diffs.values.len() >= 64 * 1024 } + fn at_capacity(&self) -> bool { self.diffs.values.len() >= crate::columnar::LINK_TARGET } fn ensure_capacity(&mut self, _stash: &mut Option) { } } diff --git a/differential-dataflow/src/columnar/arrangement/trie_merger.rs b/differential-dataflow/src/columnar/arrangement/trie_merger.rs index 964685196..8994a9ced 100644 --- a/differential-dataflow/src/columnar/arrangement/trie_merger.rs +++ b/differential-dataflow/src/columnar/arrangement/trie_merger.rs @@ -62,7 +62,7 @@ fn form_chunks<'a, U: Update>( ) { let mut sorted = sorted.peekable(); while sorted.peek().is_some() { - let chunk = Updates::::form((&mut sorted).take(64 * 1024)); + let chunk = Updates::::form((&mut sorted).take(crate::columnar::LINK_TARGET)); if chunk.len() > 0 { output.push(chunk); } @@ -152,28 +152,6 @@ where }); } } - - - // // Flatten the sorted, consolidated chain into refs. - // let all = merged.iter().flat_map(|chunk| chunk.iter()); - - // // Partition into two sorted streams by time. - // let mut time_owned = U::Time::default(); - // let mut keep_vec = Vec::new(); - // let mut ship_vec = Vec::new(); - // for (k, v, t, d) in all { - // Columnar::copy_from(&mut time_owned, t); - // if upper.less_equal(&time_owned) { - // frontier.insert_ref(&time_owned); - // keep_vec.push((k, v, t, d)); - // } else { - // ship_vec.push((k, v, t, d)); - // } - // } - - // // Build chunks via form (which consolidates internally). 
- // form_chunks::(keep_vec.into_iter(), kept); - // form_chunks::(ship_vec.into_iter(), ship); } fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { @@ -306,6 +284,8 @@ where builder: &mut ChainBuilder, stash: &mut Vec>, ) { + // TODO: Optimization for one batch exceeding the other. + let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap(); let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap(); @@ -669,7 +649,7 @@ impl ChainBuilder { link = link.filter_zero(); if link.len() > 0 { if let Some(last) = self.updates.last_mut() { - if last.len() + link.len() < 2 * 64 * 1024 { + if last.len() + link.len() < 2 * crate::columnar::LINK_TARGET { let mut build = super::super::updates::UpdatesBuilder::new_from(std::mem::take(last)); build.meld(&link); *last = build.done(); diff --git a/differential-dataflow/src/columnar/builder.rs b/differential-dataflow/src/columnar/builder.rs index 0612e7880..22bf48e44 100644 --- a/differential-dataflow/src/columnar/builder.rs +++ b/differential-dataflow/src/columnar/builder.rs @@ -28,7 +28,7 @@ impl PushInto for ValBuilder where TupleContainer : Push< #[inline] fn push_into(&mut self, item: T) { self.current.push(item); - if self.current.len() > 1024 * 1024 { + if self.current.len() > crate::columnar::LINK_TARGET { use columnar::{Borrow, Index}; let records = self.current.len(); let mut refs = self.current.borrow().into_index_iter().collect::>(); diff --git a/differential-dataflow/src/columnar/exchange.rs b/differential-dataflow/src/columnar/exchange.rs index 8693c22d6..01dc3bb08 100644 --- a/differential-dataflow/src/columnar/exchange.rs +++ b/differential-dataflow/src/columnar/exchange.rs @@ -39,10 +39,23 @@ impl FnMut(columnar::Ref<'a, U::Key>)->u64> Distributor pre { diff --git a/differential-dataflow/src/columnar/mod.rs b/differential-dataflow/src/columnar/mod.rs index 887774bf8..b32a4763c 100644 --- a/differential-dataflow/src/columnar/mod.rs +++ b/differential-dataflow/src/columnar/mod.rs @@ -41,6 
+41,9 @@ pub use builder::ValBuilder as ValColBuilder; pub use exchange::ValPact; pub use arrangement::{ValBatcher, ValBuilder, ValSpine}; +/// Target size for update batches, in number of updates. +pub const LINK_TARGET: usize = 64 * 1024; + /// A thin wrapper around `Updates` that tracks the pre-consolidation record count /// for timely's exchange accounting. This wrapper is the stream container type; /// the `TrieChunker` strips it, passing bare `Updates` into the merge batcher. diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index f76ab12f3..15829b605 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -370,8 +370,8 @@ where Ordering::Greater => batch.seek_key(batch_storage, trace_key), Ordering::Equal => { - thinker.history1.edits.load(trace, trace_storage, trace_key, Some(meet)); - thinker.history2.edits.load(batch, batch_storage, batch_key, None); + thinker.history1.edits.load(trace, trace_storage, Some(meet)); + thinker.history2.edits.load(batch, batch_storage, None); // populate `temp` with the results in the best way we know how. thinker.think(|v1,v2,t,r1,r2| { diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index 91a9896bf..4260c2b95 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -24,7 +24,7 @@ pub struct EditList { edits: Vec<(T, D)>, } -impl EditList { +impl EditList { /// Creates an empty list of edits. #[inline] fn new() -> Self { @@ -33,12 +33,26 @@ impl EditList { edits: Vec::new(), } } - /// Loads the contents of a cursor at `key`, advancing times by `meet` if supplied. - fn load<'a, C>(&mut self, cursor: &mut C, storage: &'a C::Storage, key: C::Key<'a>, meet: Option<&T>) + /// Walks the cursor's vals at the current key into `self`, advancing times by `meet` if supplied. 
+ /// + /// The cursor is assumed to be positioned at a key already; callers that need + /// to seek should use [`Cursor::populate_key`] (or [`ValueHistory::replay_key`]) + /// instead. This split avoids a redundant seek in the merge-join inner loop, + /// where the cursor is positioned by the upstream merge step. + fn load<'a, C>(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: Option<&T>) where C: Cursor = V, Time = T, Diff = D>, { - cursor.populate_key(storage, key, meet, self); + self.clear(); + while let Some(val) = cursor.get_val(storage) { + cursor.map_times(storage, |time, diff| { + let mut t = C::owned_time(time); + if let Some(m) = meet { t.join_assign(m); } + self.push(t, C::owned_diff(diff)); + }); + self.seal(val); + cursor.step_val(storage); + } } /// Clears the list of edits. #[inline]