Storage introspection mirrored in compute #35863
frankmcsherry wants to merge 4 commits into MaterializeInc:main
7baca63 to
ec858b9
Compare
antiguru
left a comment
The approach checks out; I left some comments around the implementation.
One thought I have is that we should feature-gate the functionality and have it turned off by default, to prevent downstream consumers from making assumptions about the operator IDs, but that is contrary to the goal of introspection. So we should at least document it as temporary behavior until we figure out our plans for cluster unification.
    // We use an approach similar to compute's logging: wrap the writer in
    // a BatchLogger that translates Logger callbacks into Event pushes,
    // then register the Logger with timely's log_register.
    let interval_ms = 1000u128; // 1 second batching interval
TODO, but low priority: Make the interval configurable.
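A minimal sketch of what a configurable interval could look like, assuming a small config struct is threaded to the point where the logger is built. `StorageLoggingConfig` and `interval_ms` are hypothetical names, not part of this PR; the default reproduces the hard-coded one second.

```rust
use std::time::Duration;

/// Hypothetical configuration for the storage logging bridge.
/// Not an existing Materialize type; shown only to illustrate the TODO.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StorageLoggingConfig {
    /// How often buffered log events are flushed as a batch.
    pub interval: Duration,
}

impl Default for StorageLoggingConfig {
    fn default() -> Self {
        // Matches the currently hard-coded one-second batching interval.
        Self {
            interval: Duration::from_secs(1),
        }
    }
}

impl StorageLoggingConfig {
    /// Interval in milliseconds, clamped to at least 1 ms so that a zero
    /// duration cannot yield a zero batching interval.
    pub fn interval_ms(&self) -> u128 {
        self.interval.as_millis().max(1)
    }
}
```

The call site would then read `let interval_ms = config.interval_ms();` in place of the literal.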
    None
    };
    let mut active = self.activate_compute().unwrap();
    active.storage_log_reader = storage_log_reader;
This is fragile, as it only works if the current command is CreateInstance and not in any other case. It is transient state, but it seems better to make it part of ComputeState so that unrelated parts of the code do not depend on implementation details.
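To illustrate the suggestion, here is a rough sketch in which the reader lives in the long-lived state and is handed out at most once, independent of which command is being processed. Both types are simplified stand-ins with hypothetical fields, not the actual `ComputeState` or reader types in the codebase.

```rust
/// Stand-in for the per-worker storage log reader (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct StorageLogReader(pub usize);

/// Simplified stand-in for compute's long-lived per-worker state.
pub struct ComputeState {
    /// Installed once at construction; consumed when logging is initialized.
    storage_log_reader: Option<StorageLogReader>,
}

impl ComputeState {
    /// Builds the state with an optional reader, so no caller has to
    /// patch the field in later while handling a specific command.
    pub fn new(storage_log_reader: Option<StorageLogReader>) -> Self {
        Self { storage_log_reader }
    }

    /// Hands out the reader at most once; later calls return `None`.
    pub fn take_storage_log_reader(&mut self) -> Option<StorageLogReader> {
        self.storage_log_reader.take()
    }
}
```

With this shape, the logging initialization would call `take_storage_log_reader()` itself rather than relying on a field set by the command loop.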
    /// Per-worker readers for storage timely logging events.
    // TODO: Consider using the timely config's `process` and `workers` fields to
    // deterministically assign readers to workers by local index, rather than pop().
    pub storage_log_readers: Arc<Mutex<Vec<StorageTimelyLogReader>>>,
The invariant in compute dataflows is that worker id
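The TODO in the snippet above could be approached roughly as follows: a sketch that assigns readers by local worker index instead of `pop()`. It assumes the global worker index equals `process * workers_per_process + local_index`; the reader type, `ReaderSlots`, and both helper functions are hypothetical.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for the per-worker storage log reader (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct StorageLogReader {
    pub local_index: usize,
}

/// One slot per local worker; a slot is emptied when its reader is claimed.
pub type ReaderSlots = Arc<Mutex<Vec<Option<StorageLogReader>>>>;

/// Derives the local index from the global worker index.
/// Assumption: global index = process * workers_per_process + local index.
pub fn local_index(
    worker_index: usize,
    process: usize,
    workers_per_process: usize,
) -> Option<usize> {
    let base = process.checked_mul(workers_per_process)?;
    let local = worker_index.checked_sub(base)?;
    if local < workers_per_process {
        Some(local)
    } else {
        None
    }
}

/// Claims the reader for `local`; returns `None` if the index is out of
/// range or the reader was already claimed.
pub fn claim_reader(slots: &ReaderSlots, local: usize) -> Option<StorageLogReader> {
    let mut guard = slots.lock().expect("reader slots mutex poisoned");
    guard.get_mut(local)?.take()
}
```

Unlike `pop()`, the pairing of reader to worker here does not depend on the order in which workers start up.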
    // Create per-worker bridges for forwarding storage timely logging events to compute.
    let num_workers = storage_timely_config.workers;
    let (storage_log_writers, storage_log_readers): (Vec<_>, Vec<_>) =
    (0..num_workers).map(|_| arc_event_link()).unzip();
This needs to be feature-flagged. It means a new parameter to clusterd that is set by envd depending on a feature flag.
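A sketch of the gating, assuming clusterd receives a boolean from envd. The flag name, the `StorageLogBridges` struct, and the use of `std::sync::mpsc` channels in place of `arc_event_link()` are all illustrative assumptions, not the PR's code.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Stand-in for a timely logging event (hypothetical; the PR forwards
/// real timely events through `arc_event_link()`).
pub type LogEvent = String;

/// Writer and reader halves of the per-worker bridges.
pub struct StorageLogBridges {
    pub writers: Vec<Sender<LogEvent>>,
    pub readers: Vec<Receiver<LogEvent>>,
}

/// Creates one bridge per worker only when the feature is enabled.
/// `enabled` stands in for a hypothetical clusterd parameter that envd
/// would set from a feature flag.
pub fn create_storage_log_bridges(
    enabled: bool,
    num_workers: usize,
) -> Option<StorageLogBridges> {
    if !enabled {
        // Flag off: no bridges exist, so storage events are never forwarded.
        return None;
    }
    let (writers, readers): (Vec<_>, Vec<_>) =
        (0..num_workers).map(|_| channel::<LogEvent>()).unzip();
    Some(StorageLogBridges { writers, readers })
}
```

Returning `Option` keeps the disabled path free of any allocation or registration, so turning the flag off restores the previous behavior.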
ec858b9 to
e87ceb1
Compare
e87ceb1 to
46e4c45
Compare
    /// Offset added to storage operator/channel IDs to avoid collisions with compute IDs.
    ///
    /// This is large enough that compute IDs (which start from 0 and grow) will never reach it,
    /// but small enough to be representable as a `u64` with room for many storage operators.
    const STORAGE_ID_OFFSET: usize = 1 << 48;

    /// Remaps operator, channel, and address IDs in a `TimelyEvent` originating from a storage
    /// timely instance so they don't collide with compute's IDs.
    fn remap_timely_event_ids(event: &mut TimelyEvent) {
Question for console folks: How does the UI decide which things show up in the clickable introspection graph thing? If a bunch of storage operators show up in the introspection tables with these gigantic IDs, will that clutter the existing introspection UI for compute?
@leedqin
f0b20ae to
620cdc0
Compare
Events from storage's timely infra are captured and replayed in the compute logging dataflows, with identifiers bumped up by 2^48. They are pretty large at that point, but shouldn't collide with the compute identifiers. The nudged identifiers should include both the timely operator ids and the dataflow identifiers (the lead coordinate of addresses).
Not much testing, other than observing the information flowing after creating an auction loadgen source.
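The remapping described above can be sketched on a simplified event type. The `Event` enum below is a hypothetical stand-in for timely's `TimelyEvent`, and `is_storage_id` is an illustrative helper that a consumer could use to tell storage entries from compute entries; neither is taken from the PR. The sketch assumes a 64-bit target, since `1 << 48` does not fit a 32-bit `usize`.

```rust
/// Offset added to storage IDs, as in the PR (2^48).
pub const STORAGE_ID_OFFSET: usize = 1 << 48;

/// Simplified stand-in for timely logging events (hypothetical type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// An operator with a global id and an address whose lead
    /// coordinate is the dataflow identifier.
    Operates { id: usize, addr: Vec<usize> },
    /// A channel with a global id, living in the scope at `scope_addr`.
    Channel { id: usize, scope_addr: Vec<usize> },
    /// A scheduling event that refers to an operator by id.
    Schedule { id: usize },
}

/// Bumps only the lead coordinate (the dataflow identifier) of an address.
fn remap_addr(addr: &mut [usize]) {
    if let Some(dataflow) = addr.first_mut() {
        *dataflow += STORAGE_ID_OFFSET;
    }
}

/// Shifts operator ids, channel ids, and dataflow ids by the offset.
pub fn remap_event_ids(event: &mut Event) {
    match event {
        Event::Operates { id, addr } => {
            *id += STORAGE_ID_OFFSET;
            remap_addr(addr);
        }
        Event::Channel { id, scope_addr } => {
            *id += STORAGE_ID_OFFSET;
            remap_addr(scope_addr);
        }
        Event::Schedule { id } => *id += STORAGE_ID_OFFSET,
    }
}

/// True if an id falls in the range reserved for storage.
pub fn is_storage_id(id: usize) -> bool {
    id >= STORAGE_ID_OFFSET
}
```

Only the first address coordinate moves; the remaining coordinates are positions within the dataflow and cannot collide once the dataflow identifier is distinct.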