Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 186 additions & 0 deletions differential-dataflow/examples/spines.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
//! Quick spines benchmark over `val`, `key`, and `col` arrangements.
//!
//! `col` mode feeds the columnar batcher directly via `InputHandle` +
//! `ValColBuilder`, formatting integers into a reusable `String` buffer rather
//! than allocating per record. This way `col`'s measurements aren't polluted
//! by `String` allocation overhead, since the row-based `val`/`key` paths
//! genuinely need owned strings.

use std::fmt::Write as _;

use timely::dataflow::operators::probe::Handle;
use timely::dataflow::InputHandle;

use differential_dataflow::input::{Input, InputSession};

use mimalloc::MiMalloc;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

/// What each arm needs to feed the dataflow round-by-round.
trait Workload {
    /// Insert one record, rendered from `val`, into the data collection.
    fn insert_data(&mut self, val: usize);
    /// Insert one record, rendered from `val`, into the keys collection.
    fn insert_keys(&mut self, val: usize);
    /// Advance both inputs to time `t` and flush any pending records.
    fn advance_to(&mut self, t: u64);
}

/// Row-based mode (`val`, `key`): each insert allocates a `String`.
struct RowWorkload {
    /// Session feeding the "data" collection; one owned `String` per record.
    data_input: InputSession<u64, String, isize>,
    /// Session feeding the "keys" collection; one owned `String` per record.
    keys_input: InputSession<u64, String, isize>,
}
impl Workload for RowWorkload {
    /// Render `val` to an owned `String` and insert it into the data collection.
    fn insert_data(&mut self, val: usize) {
        let record = format!("{:?}", val);
        self.data_input.insert(record);
    }
    /// Render `val` to an owned `String` and insert it into the keys collection.
    fn insert_keys(&mut self, val: usize) {
        let record = format!("{:?}", val);
        self.keys_input.insert(record);
    }
    /// Advance both sessions to `t` and flush them (data first, then keys).
    fn advance_to(&mut self, t: u64) {
        for session in [&mut self.data_input, &mut self.keys_input] {
            session.advance_to(t);
            session.flush();
        }
    }
}

/// Columnar mode: format into a reusable buffer, push refs into the col builder.
/// Update tuple shape fed to the columnar builder: (key, val, time, diff).
type DataU = (String, (), u64, i64);
/// Container builder recording `DataU` updates in columnar form.
type ColBuilder = differential_dataflow::columnar::ValColBuilder<DataU>;
struct ColWorkload {
    /// Input handle feeding the "data" stream with columnar updates.
    data_input: InputHandle<u64, ColBuilder>,
    /// Input handle feeding the "keys" stream with columnar updates.
    keys_input: InputHandle<u64, ColBuilder>,
    /// Reusable formatting buffer, cleared before each insert to avoid
    /// per-record `String` allocation.
    buf: String,
}
/// Render `val` (via `Debug`) into `buf`, reusing its allocation.
fn render_into(buf: &mut String, val: usize) {
    buf.clear();
    write!(buf, "{:?}", val).unwrap();
}

impl Workload for ColWorkload {
    /// Send one `(key, (), time, 1)` update, borrowing the rendered key from `buf`.
    fn insert_data(&mut self, val: usize) {
        render_into(&mut self.buf, val);
        let time = *self.data_input.time();
        self.data_input.send((self.buf.as_str(), (), time, 1i64));
    }
    /// Send one `(key, (), time, 1)` update, borrowing the rendered key from `buf`.
    fn insert_keys(&mut self, val: usize) {
        render_into(&mut self.buf, val);
        let time = *self.keys_input.time();
        self.keys_input.send((self.buf.as_str(), (), time, 1i64));
    }
    /// Advance both handles to `t` and flush them (data first, then keys).
    fn advance_to(&mut self, t: u64) {
        for handle in [&mut self.data_input, &mut self.keys_input] {
            handle.advance_to(t);
            handle.flush();
        }
    }
}

fn main() {

    // Positional arguments: distinct key count, records per round, and mode.
    // Read them from a single iterator (the original built three separate
    // `env::args()` iterators) and fail with a usage message instead of a
    // bare `unwrap` panic on missing/garbled input.
    let usage = "usage: spines <keys> <size> <val|key|col>";
    let mut args = std::env::args().skip(1);
    let keys: usize = args.next().expect(usage).parse().expect("<keys> must be an unsigned integer");
    let size: usize = args.next().expect(usage).parse().expect("<size> must be an unsigned integer");
    let mode: String = args.next().expect(usage);

    println!("Running [{:?}] arrangement", mode);

    // `Instant` is `Copy`, so a plain second binding (not `.clone()`) suffices;
    // `timer2` survives the `move` closure for use after workers shut down.
    let timer1 = ::std::time::Instant::now();
    let timer2 = timer1;

    timely::execute_from_args(std::env::args(), move |worker| {

        let mut probe = Handle::new();
        // Build the dataflow for the requested mode. Each arm wires up the
        // arrangements and a result-discarding join (we only measure
        // arrangement maintenance), and returns a boxed `Workload` so the
        // driving loops below are mode-agnostic.
        let mut workload: Box<dyn Workload> = worker.dataflow(|scope| {

            use differential_dataflow::operators::arrange::Arrange;

            match mode.as_str() {
                "key" => {
                    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine};
                    let (data_input, data) = scope.new_collection::<String, isize>();
                    let (keys_input, keys) = scope.new_collection::<String, isize>();
                    let data = data.arrange::<OrdKeyBatcher<String,_,isize>, RcOrdKeyBuilder<String,_,isize>, OrdKeySpine<String,_,isize>>();
                    let keys = keys.arrange::<OrdKeyBatcher<String,_,isize>, RcOrdKeyBuilder<String,_,isize>, OrdKeySpine<String,_,isize>>();
                    keys.join_core(data, |_k, &(), &()| Option::<()>::None)
                        .probe_with(&mut probe);
                    Box::new(RowWorkload { data_input, keys_input }) as Box<dyn Workload>
                },
                "val" => {
                    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatcher, RcOrdValBuilder, OrdValSpine};
                    let (data_input, data) = scope.new_collection::<String, isize>();
                    let (keys_input, keys) = scope.new_collection::<String, isize>();
                    let data = data.map(|x| (x, ())).arrange::<OrdValBatcher<String,(),_,isize>, RcOrdValBuilder<String,(),_,isize>, OrdValSpine<String,(),_,isize>>();
                    let keys = keys.map(|x| (x, ())).arrange::<OrdValBatcher<String,(),_,isize>, RcOrdValBuilder<String,(),_,isize>, OrdValSpine<String,(),_,isize>>();
                    keys.join_core(data, |_k, &(), &()| Option::<()>::None)
                        .probe_with(&mut probe);
                    Box::new(RowWorkload { data_input, keys_input })
                },
                "col" => {
                    use timely::dataflow::operators::Input as _;
                    use differential_dataflow::columnar::{ValBatcher, ValBuilder, ValPact, ValSpine};
                    use differential_dataflow::operators::arrange::arrangement::arrange_core;

                    // Hash a borrowed columnar string ref for the exchange pact.
                    fn string_hash(s: columnar::Ref<'_, String>) -> u64 {
                        use std::hash::{Hash, Hasher};
                        let mut h = std::collections::hash_map::DefaultHasher::new();
                        s.hash(&mut h);
                        h.finish()
                    }

                    let mut data_input = <InputHandle<u64, ColBuilder>>::new_with_builder();
                    let mut keys_input = <InputHandle<u64, ColBuilder>>::new_with_builder();

                    let data_stream = scope.input_from(&mut data_input);
                    let keys_stream = scope.input_from(&mut keys_input);

                    let data = arrange_core::<_, ValBatcher<String,(),u64,i64>, ValBuilder<String,(),u64,i64>, ValSpine<String,(),u64,i64>>(
                        data_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "DataArrange",
                    );
                    let keys = arrange_core::<_, ValBatcher<String,(),u64,i64>, ValBuilder<String,(),u64,i64>, ValSpine<String,(),u64,i64>>(
                        keys_stream, ValPact { hashfunc: |k: columnar::Ref<'_, String>| string_hash(k) }, "KeysArrange",
                    );
                    keys.join_core(data, |_k, (), ()| Option::<()>::None)
                        .probe_with(&mut probe);

                    Box::new(ColWorkload { data_input, keys_input, buf: String::new() })
                },
                _ => {
                    panic!("unrecognized mode: {:?}", mode);
                }
            }
        });

        // Load up data in batches: `10 * keys` records total, `size` per round,
        // striped across workers by index.
        let mut counter = 0;
        let mut t: u64 = 1;
        while counter < 10 * keys {
            let mut i = worker.index();
            while i < size {
                let val = (counter + i) % keys;
                workload.insert_data(val);
                i += worker.peers();
            }
            counter += size;
            workload.advance_to(t);
            // Spin the worker until this round is fully processed.
            while probe.less_than(&t) {
                worker.step();
            }
            t += 1;
        }
        println!("{:?}\tloading complete", timer1.elapsed());

        let mut queries = 0;

        // Same cadence as loading, but feeding the keys side instead.
        while queries < 10 * keys {
            let mut i = worker.index();
            while i < size {
                let val = (queries + i) % keys;
                workload.insert_keys(val);
                i += worker.peers();
            }
            queries += size;
            workload.advance_to(t);
            while probe.less_than(&t) {
                worker.step();
            }
            t += 1;
        }

        println!("{:?}\tqueries complete", timer1.elapsed());

    }).unwrap();

    println!("{:?}\tshut down", timer2.elapsed());

}
125 changes: 104 additions & 21 deletions differential-dataflow/src/columnar/arrangement/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,44 +128,127 @@ use crate::trace::implementations::merge_batcher::MergeBatcher;
type ValBatcher2<U> = MergeBatcher<RecordedUpdates<U>, TrieChunker<U>, trie_merger::TrieMerger<U>>;

/// A chunker that unwraps `RecordedUpdates` into bare `Updates` for the merge batcher.
/// The `records` accounting is discarded here — it has served its purpose for exchange.
///
/// IMPORTANT: This chunker assumes the input `Updates` are sorted and consolidated
/// (as produced by `ValColBuilder::form`). The downstream `InternalMerge` relies on
/// this invariant. If `RecordedUpdates` could carry unsorted data (e.g. from a `map`),
/// we would need either a sorting chunker for that case, or a type-level distinction
/// (e.g. `RecordedUpdates<U, Consolidated>` vs `RecordedUpdates<U, Unsorted>`) to
/// route to the right chunker.
/// The intended behavior is to produce chunks whose size is within 1-2x `LINK_TARGET`.
/// It ships large batches immediately, accumulates small batches, consolidates as they
/// exceed 2xLINK_TARGET, and ships them unless they drop below 1xLINK_TARGET.
///
/// The flow is into (or around) `self.blobs`, then consolidated blocks into
/// `self.ready`, each of which is moved to `self.stage` as it is extracted.
pub struct TrieChunker<U: super::layout::ColumnarUpdate> {
/// Insufficiently large updates we haven't figured out how to ship yet.
blobs: Vec<(Updates<U>, bool)>,
/// Sum of `len()` across `blobs`.
blob_records: usize,
/// Ready-to-emit chunks. Each is sorted and consolidated; size ≥ `LINK_TARGET`
/// (or smaller, only for the final chunk produced by `finish`).
ready: std::collections::VecDeque<Updates<U>>,
empty: Option<Updates<U>>,
/// Staging area for the next pull call.
stage: Option<Updates<U>>,
}

impl<U: super::layout::ColumnarUpdate> Default for TrieChunker<U> {
fn default() -> Self { Self { ready: Default::default(), empty: None } }
fn default() -> Self {
Self {
blobs: Default::default(),
blob_records: 0,
ready: Default::default(),
stage: None,
}
}
}

impl<U: super::layout::ColumnarUpdate> TrieChunker<U> {
    /// Drain `self.blobs` into a single consolidated `Updates`, resetting the
    /// record count. The result is returned to the caller, who decides whether
    /// it ships (into `ready`) or is re-absorbed; this method itself never
    /// touches `self.ready`.
    fn consolidate_blobs(&mut self) -> Updates<U> {
        // Single consolidated entry: pass through, no work.
        // (The `bool` in each blob records whether it is already consolidated.)
        if self.blobs.len() == 1 && self.blobs[0].1 {
            let (result, _) = self.blobs.pop().unwrap();
            self.blob_records = 0;
            return result;
        }

        // Re-form from all blob records, sorting/consolidating as one unit.
        // TODO: Improve consolidation through column-oriented sorts.
        let result = Updates::<U>::form_unsorted(self.blobs.iter().flat_map(|(u, _)| u.iter()));
        self.blobs.clear();
        self.blob_records = 0;
        result
    }

    /// Push a non-empty `Updates` into blobs and update accounting.
    /// `consolidated` marks whether `updates` is already sorted/consolidated,
    /// enabling the pass-through fast path in `consolidate_blobs`.
    fn absorb(&mut self, updates: Updates<U>, consolidated: bool) {
        self.blob_records += updates.len();
        self.blobs.push((updates, consolidated));
    }
}

impl<'a, U: super::layout::ColumnarUpdate> timely::container::PushInto<&'a mut RecordedUpdates<U>> for TrieChunker<U> {
fn push_into(&mut self, container: &'a mut RecordedUpdates<U>) {
let mut updates = std::mem::take(&mut container.updates);
if !container.consolidated { updates = updates.consolidate(); }
if updates.len() > 0 { self.ready.push_back(updates); }
// Early return if an empty container (legit, for accountable progress tracking).
if container.updates.len() == 0 { return; }

// Our main goal is to only ship links that are 1-2 x LINK_TARGET, using blobs
// to accumulate updates until they are ready to go or we are asked to finish.
//
// Informally, we are aiming to move `container` into or around `self.blobs`.
// Into if small enough, as we can further consolidate, but if not we need to
// consolidate and then either ship (if large) or hold (if small) the results.

let updates = std::mem::take(&mut container.updates);
let consolidated = container.consolidated;
let len = updates.len();

// The input may be ready to ship on its own.
// This is ideal, if we've used an accumulating container builder elsewhere.
if consolidated && len >= crate::columnar::LINK_TARGET { self.ready.push_back(updates); }
// Can move into blobs if the combined length is not too large.
else if self.blob_records + len < 2 * crate::columnar::LINK_TARGET { self.absorb(updates, consolidated); }
// Otherwise, we'll need to manage `self.blobs`.
else {
// Together `updates` and `self.blobs` exceed 2 * LINK_TARGET.
// At least one, perhaps both of them, are LINK_TARGET in size.
// We'll consolidate any that are, and ship or merge the results.
// We'll end up with at most LINK_TARGET in `self.blobs`, retiring
// a constant factor of the pending work we started with.

// Consolidate and move to ready if large; stash otherwise.
let input_residual = if len >= crate::columnar::LINK_TARGET {
let cons = if consolidated { updates } else { updates.consolidate() };
if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None }
else if cons.len() > 0 { Some((cons, true)) }
else { None }
}
else { Some((updates, consolidated)) };

// Consolidate and move to ready if large; stash otherwise.
let blobs_residual = if self.blob_records >= crate::columnar::LINK_TARGET {
let cons = self.consolidate_blobs();
if cons.len() >= crate::columnar::LINK_TARGET { self.ready.push_back(cons); None }
else if cons.len() > 0 { Some((cons, true)) }
else { None }
}
else { None };

// Return un-shipped
if let Some((r, c)) = input_residual { self.absorb(r, c); }
if let Some((r, c)) = blobs_residual { self.absorb(r, c); }
}
}
}

impl<U: super::layout::ColumnarUpdate> timely::container::ContainerBuilder for TrieChunker<U> {
type Container = Updates<U>;
fn extract(&mut self) -> Option<&mut Self::Container> {
if let Some(ready) = self.ready.pop_front() {
self.empty = Some(ready);
self.empty.as_mut()
} else {
None
}
self.stage = self.ready.pop_front();
self.stage.as_mut()
}
fn finish(&mut self) -> Option<&mut Self::Container> {
self.empty = self.ready.pop_front();
self.empty.as_mut()
// Drain whatever's left in blobs as a single (possibly small) final chunk.
if !self.blobs.is_empty() {
let cons = self.consolidate_blobs();
if cons.len() > 0 { self.ready.push_back(cons); }
}
self.extract()
}
}

Expand All @@ -180,7 +263,7 @@ pub mod batcher {
use super::super::updates::Updates;

impl<U: Update> timely::container::SizableContainer for Updates<U> {
fn at_capacity(&self) -> bool { self.diffs.values.len() >= 64 * 1024 }
fn at_capacity(&self) -> bool { self.diffs.values.len() >= crate::columnar::LINK_TARGET }
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
}

Expand Down
28 changes: 4 additions & 24 deletions differential-dataflow/src/columnar/arrangement/trie_merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn form_chunks<'a, U: Update>(
) {
let mut sorted = sorted.peekable();
while sorted.peek().is_some() {
let chunk = Updates::<U>::form((&mut sorted).take(64 * 1024));
let chunk = Updates::<U>::form((&mut sorted).take(crate::columnar::LINK_TARGET));
if chunk.len() > 0 {
output.push(chunk);
}
Expand Down Expand Up @@ -152,28 +152,6 @@ where
});
}
}


// // Flatten the sorted, consolidated chain into refs.
// let all = merged.iter().flat_map(|chunk| chunk.iter());

// // Partition into two sorted streams by time.
// let mut time_owned = U::Time::default();
// let mut keep_vec = Vec::new();
// let mut ship_vec = Vec::new();
// for (k, v, t, d) in all {
// Columnar::copy_from(&mut time_owned, t);
// if upper.less_equal(&time_owned) {
// frontier.insert_ref(&time_owned);
// keep_vec.push((k, v, t, d));
// } else {
// ship_vec.push((k, v, t, d));
// }
// }

// // Build chunks via form (which consolidates internally).
// form_chunks::<U>(keep_vec.into_iter(), kept);
// form_chunks::<U>(ship_vec.into_iter(), ship);
}

fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
Expand Down Expand Up @@ -306,6 +284,8 @@ where
builder: &mut ChainBuilder<U>,
stash: &mut Vec<Updates<U>>,
) {
// TODO: Optimization for one batch exceeding the other.

let ((k0_idx, v0_idx, t0_idx), updates0) = batch1.take().unwrap();
let ((k1_idx, v1_idx, t1_idx), updates1) = batch2.take().unwrap();

Expand Down Expand Up @@ -669,7 +649,7 @@ impl<U: super::super::layout::ColumnarUpdate> ChainBuilder<U> {
link = link.filter_zero();
if link.len() > 0 {
if let Some(last) = self.updates.last_mut() {
if last.len() + link.len() < 2 * 64 * 1024 {
if last.len() + link.len() < 2 * crate::columnar::LINK_TARGET {
let mut build = super::super::updates::UpdatesBuilder::new_from(std::mem::take(last));
build.meld(&link);
*last = build.done();
Expand Down
Loading
Loading