9 changes: 7 additions & 2 deletions diagnostics/index.html
@@ -485,8 +485,13 @@ <h2 id="detail-title"></h2>
     ws.addEventListener('message', event => {
       try {
         const data = JSON.parse(event.data);
-        if (Array.isArray(data)) {
-          applyBatch(data);
+        // Wire format is one Frame envelope per WebSocket message:
+        //   { type: "Frame", ts_us: <u64>, updates: [...] }
+        // The server only emits a Frame once the capture frontier has
+        // advanced past `ts_us`, so each Frame is a complete view at that
+        // closed logical timestamp.
+        if (data && data.type === 'Frame' && Array.isArray(data.updates)) {
+          applyBatch(data.updates);
         }
       } catch (e) {
         // ignore parse errors
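Since the comment above pins down the whole wire format, a native (non-browser) consumer could mirror it directly. A minimal Rust sketch, assuming serde (with the derive feature) and serde_json as dependencies; the `Frame` struct and `parse_frame` helper are illustrative, not part of this PR:

```rust
use serde::Deserialize;

// Mirror of the Frame envelope documented above. Field names must match
// the serde_json::json! literal the server emits.
#[derive(Deserialize)]
struct Frame {
    r#type: String,                  // always "Frame"
    ts_us: u64,                      // closed logical timestamp, in microseconds
    updates: Vec<serde_json::Value>, // one atomic batch of diagnostic updates
}

fn parse_frame(text: &str) -> Option<Frame> {
    let frame: Frame = serde_json::from_str(text).ok()?;
    (frame.r#type == "Frame").then_some(frame)
}
```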
82 changes: 70 additions & 12 deletions diagnostics/src/server.rs
@@ -14,7 +14,7 @@
 //! (e.g., `python3 -m http.server 8000`). A future improvement could embed
 //! static file serving here so only one port is needed.
 
-use std::collections::HashMap;
+use std::collections::{BTreeMap, HashMap};
 use std::net::TcpListener;
 use std::sync::mpsc;
 use std::thread;
@@ -155,6 +155,23 @@ fn run_server(port: u16, sink: SinkHandle) {
     let mut clients: HashMap<usize, tungstenite::WebSocket<std::net::TcpStream>> = HashMap::new();
     let mut next_client_id: usize = 0;
 
+    // Per-client buffer of records awaiting their timestamp to close, keyed
+    // by the inner `ts` of each record. A timestamp is "closed" when the
+    // capture stream's frontier (tracked in `progress_counts`) advances past
+    // it — at which point we flush the bucket as a single Frame to the client.
+    let mut pending: HashMap<usize, BTreeMap<Duration, Vec<serde_json::Value>>> =
+        HashMap::new();
+    // Running multiplicities from `Event::Progress` updates. The current
+    // frontier is the smallest key with a positive count; an entry at zero is
+    // removed. Anything strictly less than the smallest live key is closed.
+    //
+    // Timely's capture protocol sends progress as *deltas* relative to an
+    // assumed initial frontier of `{T::default(): 1}` — so we must seed the
+    // counter that way, otherwise the first event's `(0ns, -1)` retraction
+    // leaves us at `{0ns: -1}` and the frontier sticks at 0 forever.
+    let mut progress_counts: BTreeMap<Duration, i64> = BTreeMap::new();
+    progress_counts.insert(Duration::default(), 1);
+
     loop {
         // Accept pending connections.
         loop {
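The seeding subtlety described in that comment is worth seeing in isolation. A minimal sketch of the same counting logic, assuming progress deltas arrive as `(Duration, i64)` pairs exactly as in the capture stream; `frontier_after` is a hypothetical helper, not part of this PR:

```rust
use std::collections::BTreeMap;
use std::time::Duration;

// Fold progress deltas into running multiplicities; the frontier is the
// smallest surviving key. With the `{Duration::default(): 1}` seed, the
// first retraction of 0ns cancels cleanly; without it, a stray `-1`
// entry at 0ns would keep the frontier pinned there forever.
fn frontier_after(deltas: &[(Duration, i64)]) -> Option<Duration> {
    let mut counts: BTreeMap<Duration, i64> = BTreeMap::new();
    counts.insert(Duration::default(), 1); // assumed initial frontier
    for &(t, diff) in deltas {
        let entry = counts.entry(t).or_insert(0);
        *entry += diff;
        if *entry == 0 {
            counts.remove(&t);
        }
    }
    counts.keys().next().copied() // None: the stream is complete
}
```

For example, `frontier_after(&[(Duration::ZERO, -1), (Duration::from_micros(5), 1)])` returns `Some(5µs)`, so every bucket below 5µs may be flushed; dropping the seed line leaves `{0ns: -1}` in the map and the result stuck at `Some(0ns)`.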
@@ -183,17 +200,34 @@
             }
         }
 
-        // Drain diagnostic updates and group by client.
-        let mut batches_by_client: HashMap<usize, Vec<serde_json::Value>> = HashMap::new();
+        // Drain diagnostic updates: bucket records by their inner timestamp
+        // per client; absorb progress updates into the running frontier.
+        let mut frontier_changed = false;
         loop {
             match receiver.try_recv() {
-                Ok(Event::Messages(_time, data)) => {
-                    for ((client_id, update), _ts, diff) in data {
+                Ok(Event::Messages(_envelope_time, data)) => {
+                    // The capture envelope time is incidental; the meaningful
+                    // logical time is the per-record `ts`.
+                    for ((client_id, update), ts, diff) in data {
                         let json = update_to_json(&update, diff);
-                        batches_by_client.entry(client_id).or_default().push(json);
+                        pending
+                            .entry(client_id)
+                            .or_default()
+                            .entry(ts)
+                            .or_default()
+                            .push(json);
                     }
                 }
-                Ok(Event::Progress(_)) => {}
+                Ok(Event::Progress(updates)) => {
+                    for (t, diff) in updates {
+                        let entry = progress_counts.entry(t).or_insert(0);
+                        *entry += diff;
+                        if *entry == 0 {
+                            progress_counts.remove(&t);
+                        }
+                    }
+                    frontier_changed = true;
+                }
                 Err(mpsc::TryRecvError::Empty) => break,
                 Err(mpsc::TryRecvError::Disconnected) => {
                     eprintln!("Diagnostics output channel closed, shutting down server");
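For orientation, the `match` above is over timely's capture event type, which has roughly this shape (a sketch; details may vary by timely version, and `Update` stands in for this crate's diagnostic record type):

```rust
// Sketch of timely's capture event stream as consumed above.
enum Event<T, D> {
    // Frontier changes, as (timestamp, multiplicity delta) pairs.
    Progress(Vec<(T, i64)>),
    // A batch of records stamped with an envelope time. In this server,
    // D = ((usize, Update), Duration, i64): each record carries its own
    // logical `ts` and `diff`, which is why the envelope time is ignored.
    Messages(T, Vec<D>),
}
```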
@@ -206,14 +240,37 @@
                 }
             }
 
-        // Send batched updates to each client.
+        // If the frontier moved, flush every closed timestamp bucket. One
+        // Frame per closed `ts`, in timestamp order, so each Frame is one
+        // atomic transaction on the client.
         let mut disconnected = Vec::new();
-        for (client_id, updates) in &batches_by_client {
-            if let Some(ws) = clients.get_mut(client_id) {
-                if !updates.is_empty() {
-                    let payload = serde_json::to_string(updates).unwrap();
+        if frontier_changed {
+            // Any timestamp strictly below the smallest live key in
+            // `progress_counts` is closed. If `progress_counts` is empty,
+            // every buffered timestamp is closed.
+            let frontier: Option<Duration> = progress_counts.keys().next().copied();
+            for (client_id, buckets) in pending.iter_mut() {
+                let closed: Vec<Duration> = match frontier {
+                    Some(f) => buckets.range(..f).map(|(t, _)| *t).collect(),
+                    None => buckets.keys().copied().collect(),
+                };
+                for ts in closed {
+                    let updates = buckets.remove(&ts).unwrap_or_default();
+                    if updates.is_empty() {
+                        continue;
+                    }
+                    let Some(ws) = clients.get_mut(client_id) else {
+                        continue;
+                    };
+                    let frame = serde_json::json!({
+                        "type": "Frame",
+                        "ts_us": ts.as_micros() as u64,
+                        "updates": updates,
+                    });
+                    let payload = serde_json::to_string(&frame).unwrap();
                     if ws.send(Message::Text(payload.into())).is_err() {
                         disconnected.push(*client_id);
+                        break;
                     }
                 }
             }
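The flush walks `buckets.range(..f)` and then removes each closed key individually. An equivalent one-shot formulation (a hypothetical alternative, not what this PR does) would split the closed prefix off with `BTreeMap::split_off`:

```rust
use std::collections::BTreeMap;
use std::time::Duration;

// Split off everything at or after `frontier`; what remains behind in
// `buckets` is exactly the closed prefix (ts < frontier), returned here
// still in ascending timestamp order.
fn take_closed(
    buckets: &mut BTreeMap<Duration, Vec<serde_json::Value>>,
    frontier: Duration,
) -> BTreeMap<Duration, Vec<serde_json::Value>> {
    let open = buckets.split_off(&frontier);
    std::mem::replace(buckets, open)
}
```

Either way, Frames go out in ascending `ts` order; the range-based version in the diff also handles the empty-frontier case (flush everything) without needing a sentinel timestamp.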
@@ -229,6 +286,7 @@ fn run_server(port: u16, sink: SinkHandle) {
         }
         for client_id in disconnected {
             clients.remove(&client_id);
+            pending.remove(&client_id);
             client_input.disconnect(client_id, start.elapsed());
             eprintln!("Diagnostics client {client_id} disconnected");
         }