Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions differential-dataflow/examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ trait Workload {
struct RowWorkload {
data_input: InputSession<u64, String, isize>,
keys_input: InputSession<u64, String, isize>,
pad: usize,
}
impl Workload for RowWorkload {
fn insert_data(&mut self, val: usize) { self.data_input.insert(format!("{:?}", val)); }
fn insert_keys(&mut self, val: usize) { self.keys_input.insert(format!("{:?}", val)); }
fn insert_data(&mut self, val: usize) { self.data_input.insert(format!("{:0width$}", val, width = self.pad)); }
fn insert_keys(&mut self, val: usize) { self.keys_input.insert(format!("{:0width$}", val, width = self.pad)); }
fn advance_to(&mut self, t: u64) {
self.data_input.advance_to(t); self.data_input.flush();
self.keys_input.advance_to(t); self.keys_input.flush();
Expand All @@ -46,17 +47,18 @@ struct ColWorkload {
data_input: InputHandle<u64, ColBuilder>,
keys_input: InputHandle<u64, ColBuilder>,
buf: String,
pad: usize,
}
impl Workload for ColWorkload {
fn insert_data(&mut self, val: usize) {
self.buf.clear();
write!(&mut self.buf, "{:?}", val).unwrap();
write!(&mut self.buf, "{:0width$}", val, width = self.pad).unwrap();
let t = *self.data_input.time();
self.data_input.send((self.buf.as_str(), (), t, 1i64));
}
fn insert_keys(&mut self, val: usize) {
self.buf.clear();
write!(&mut self.buf, "{:?}", val).unwrap();
write!(&mut self.buf, "{:0width$}", val, width = self.pad).unwrap();
let t = *self.keys_input.time();
self.keys_input.send((self.buf.as_str(), (), t, 1i64));
}
Expand All @@ -72,8 +74,9 @@ fn main() {
let size: usize = std::env::args().nth(2).unwrap().parse().unwrap();

let mode: String = std::env::args().nth(3).unwrap();
let pad: usize = std::env::args().nth(4).and_then(|s| s.parse().ok()).unwrap_or(0);

println!("Running [{:?}] arrangement", mode);
println!("Running [{:?}] arrangement (pad={})", mode, pad);

let timer1 = ::std::time::Instant::now();
let timer2 = timer1.clone();
Expand All @@ -94,7 +97,7 @@ fn main() {
let keys = keys.arrange::<OrdKeyBatcher<String,_,isize>, RcOrdKeyBuilder<String,_,isize>, OrdKeySpine<String,_,isize>>();
keys.join_core(data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
Box::new(RowWorkload { data_input, keys_input }) as Box<dyn Workload>
Box::new(RowWorkload { data_input, keys_input, pad }) as Box<dyn Workload>
},
"val" => {
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatcher, RcOrdValBuilder, OrdValSpine};
Expand All @@ -104,7 +107,7 @@ fn main() {
let keys = keys.map(|x| (x, ())).arrange::<OrdValBatcher<String,(),_,isize>, RcOrdValBuilder<String,(),_,isize>, OrdValSpine<String,(),_,isize>>();
keys.join_core(data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
Box::new(RowWorkload { data_input, keys_input })
Box::new(RowWorkload { data_input, keys_input, pad })
},
"col" => {
use timely::dataflow::operators::Input as _;
Expand Down Expand Up @@ -133,7 +136,7 @@ fn main() {
keys.join_core(data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);

Box::new(ColWorkload { data_input, keys_input, buf: String::new() })
Box::new(ColWorkload { data_input, keys_input, buf: String::new(), pad })
},
_ => {
panic!("unrecognized mode: {:?}", mode);
Expand Down
49 changes: 24 additions & 25 deletions differential-dataflow/src/columnar/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! queues it. `finish` produces one final trie from any remaining tuples.

use std::collections::VecDeque;
use columnar::{Columnar, Clear, Len, Push};
use columnar::{Clear, Columnar, Len, Push};

use super::layout::ColumnarUpdate as Update;
use super::updates::UpdatesTyped;
Expand All @@ -23,19 +23,31 @@ pub struct ValBuilder<U: Update> {
pending: VecDeque<RecordedUpdates<U>>,
}

impl<U: Update> ValBuilder<U> {
/// Wrap `self.current` as a stride-1 `UpdatesTyped`, consolidate it, and queue.
///
/// Reclaims the column allocations: takes `self.current` by value, consolidates
/// (which builds a fresh trie via Push), then clears the now-stale columns and
/// returns them to `self.current` so capacity is retained across flushes.
fn flush(&mut self) {
if self.current.len() == 0 { return; }
let records = self.current.len();
let raw: UpdatesTyped<U> = std::mem::take(&mut self.current).into();
let updates = raw.consolidate().into();
// Reclaim raw's column allocations.
self.current = (raw.keys.values, raw.vals.values, raw.times.values, raw.diffs.values);
self.current.clear();
self.pending.push_back(RecordedUpdates { updates, records, consolidated: true });
}
}

use timely::container::PushInto;
impl<T, U: Update> PushInto<T> for ValBuilder<U> where TupleContainer<U> : Push<T> {
#[inline]
fn push_into(&mut self, item: T) {
self.current.push(item);
if self.current.len() > crate::columnar::LINK_TARGET {
use columnar::{Borrow, Index};
let records = self.current.len();
let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
refs.sort();
let updates = UpdatesTyped::form(refs.into_iter()).into();
self.pending.push_back(RecordedUpdates { updates, records, consolidated: true });
self.current.clear();
self.flush();
}
}
}
Expand All @@ -56,27 +68,14 @@ impl<U: Update> ContainerBuilder for ValBuilder<U> {

#[inline]
fn extract(&mut self) -> Option<&mut Self::Container> {
if let Some(container) = self.pending.pop_front() {
self.empty = Some(container);
self.empty.as_mut()
} else {
None
}
self.empty = self.pending.pop_front();
self.empty.as_mut()
}

#[inline]
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.current.is_empty() {
use columnar::{Borrow, Index};
let records = self.current.len();
let mut refs = self.current.borrow().into_index_iter().collect::<Vec<_>>();
refs.sort();
let updates = UpdatesTyped::form(refs.into_iter()).into();
self.pending.push_back(RecordedUpdates { updates, records, consolidated: true });
self.current.clear();
}
self.empty = self.pending.pop_front();
self.empty.as_mut()
self.flush();
self.extract()
}
}

Expand Down
44 changes: 42 additions & 2 deletions differential-dataflow/src/columnar/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,24 @@ impl<U: Update, B: std::ops::Deref<Target = [u8]> + Clone + 'static> Updates<U,
/// The flat `(key, val, time, diff)` tuple for an [`Update`].
pub type Tuple<U> = (<U as Update>::Key, <U as Update>::Val, <U as Update>::Time, <U as Update>::Diff);

/// Wrap a flat `Tuple<U>` columnar container as a stride-1 [`UpdatesTyped<U>`].
///
/// Each layer gets `Strides::new(1, n)` bounds, where `n` is the record count.
/// The result is unsorted and unconsolidated; pass it through `consolidate()` to
/// canonicalize. Allocation-free: the four column containers move in place.
impl<U: Update> From<(ContainerOf<U::Key>, ContainerOf<U::Val>, ContainerOf<U::Time>, ContainerOf<U::Diff>)> for UpdatesTyped<U> {
fn from(container: (ContainerOf<U::Key>, ContainerOf<U::Val>, ContainerOf<U::Time>, ContainerOf<U::Diff>)) -> Self {
let (k_col, v_col, t_col, d_col) = container;
let n = k_col.len() as u64;
Self {
keys: Lists { values: k_col, bounds: Strides::new(1, n) },
vals: Lists { values: v_col, bounds: Strides::new(1, n) },
times: Lists { values: t_col, bounds: Strides::new(1, n) },
diffs: Lists { values: d_col, bounds: Strides::new(1, n) },
}
}
}

/// Returns the value-index range for list `i` given cumulative bounds.
#[inline]
pub fn child_range<B: IndexAs<u64>>(bounds: B, i: usize) -> std::ops::Range<usize> {
Expand Down Expand Up @@ -339,7 +357,7 @@ impl<U: Update> UpdatesTyped<U> {
/// Forms a consolidated `UpdatesTyped` trie from unsorted `(key, val, time, diff)` refs.
pub fn form_unsorted<'a>(unsorted: impl Iterator<Item = columnar::Ref<'a, Tuple<U>>>) -> Self {
let mut data = unsorted.collect::<Vec<_>>();
data.sort();
data.sort_by(|(ak, av, at, _), (bk, bv, bt, _)| (ak, av, at).cmp(&(bk, bv, bt)));
Self::form(data.into_iter())
}

Expand Down Expand Up @@ -402,7 +420,29 @@ impl<U: Update> UpdatesTyped<U> {
/// Consolidates into canonical trie form:
/// single outer key list, all lists sorted and deduplicated,
/// diff lists are singletons (or absent if cancelled).
pub fn consolidate(self) -> Self { Self::form_unsorted(self.iter()) }
pub fn consolidate(&self) -> Self {
// Fast path: stride-1 at every layer (flat tuple list, e.g. produced by
// `From<TupleContainer>`). Skip the four-level trie iter and walk the
// underlying value columns directly.
if self.keys.bounds.strided() == Some(1)
&& self.vals.bounds.strided() == Some(1)
&& self.times.bounds.strided() == Some(1)
&& self.diffs.bounds.strided() == Some(1)
{
let n = Len::len(&self.keys.values);
// Tuple up borrows of the four columns — `Index::get(i)` on the
// tuple dispatches to each component, yielding a `Ref<Tuple<U>>`
// in a single call.
let view = (
self.keys.values.borrow(),
self.vals.values.borrow(),
self.times.values.borrow(),
self.diffs.values.borrow(),
);
return Self::form_unsorted((0..n).map(|i| view.get(i)));
}
Self::form_unsorted(self.iter())
}
/// Drop entries whose diff list is empty (cancelled), rebuilding the trie.
pub fn filter_zero(self) -> Self {
if self.diffs.bounds.strided() == Some(1) { self }
Expand Down
Loading