refactor(dataset-raw): simplify RawTableWriter logic #2089
Conversation
leoyvens
left a comment
The changes to RawTableWriter seem like an improvement, even if we're missing an integration-level test showing this situation is fixed.
The change to the higher level loop is something we need to further discuss.
    ?missing_dataset_ranges,
    "terminating infinite materialize loop"
);
break;
This break is very suspicious. This loop is meant to be infinite, breaking would entirely stop the extraction procedure. Are you entirely sure you need this?
I needed something to prevent busy loops in the ongoing Solana sync without manual intervention on my part. But I agree, it is probably too much of a hack. What do you think about this (at least until a better solution, perhaps the one you suggested): d78db51
async fn write(&mut self, table_rows: &TableRows) -> Result<(), ParquetError> {
    assert_eq!(&self.range.network, &table_rows.range.network);
    self.file.write(&table_rows.rows).await?;

    let incoming_range = &table_rows.range;
    self.range.numbers = self.range.start()..=incoming_range.end();
    self.range.hash = incoming_range.hash;
    self.range.timestamp = incoming_range.timestamp;

    Ok(())
}
This is a good callout by the AI. Reconstructing the `BlockRange` is the more robust pattern: if a new field is added to `BlockRange`, that will generate a compiler error, preventing the new field from silently introducing a bug here.
This sequence you describe is very helpful to understand the problem:
It jumps out that we would run cc @Theodus who worked on supporting missing block numbers.
Force-pushed from af1977f to f8a57c5
Force-pushed from bca979d to 7090a30
Adds a `RawTableSegment` struct to encapsulate data related to a segment that is being written to. Changes the `RawTableWriter` to open segments lazily, whenever they are about to be written to. This ensures that no empty segments (i.e. files) will end up on disk. Refactors the `RawTableWriter` write and close logic to have less branching, which should make it easier to follow.
Adds a break condition to the materialize loop to prevent it from looping forever. This only manifests itself with providers that stream non-contiguous blocks (e.g. skipped slots in Solana).
Adds an integration test which ensures that the raw dataset writer correctly handles skipped segments (segments where no blocks were streamed for the entire range). Based on the Solana extractor, since it is the only one that can skip blocks (i.e. slots).
Instead of breaking out of the materialize loop (which is supposed to be infinite), keep track of unreachable ranges and filter them out before running each materialize iteration. This allows the loop to keep tracking the latest block while preventing it from spinning forever on skipped (non-existent) ranges.
Force-pushed from 7090a30 to 9f6a487
// Find the union of missing table block ranges. Then subtract ranges that
// were missing in the previous iteration as well. These are segments that
// can never be materialized because they do not exist (e.g. skipped slot
// ranges in Solana).
So the assumption is: a range that is passed to `materialize_ranges` but remains missing after it runs must be missing from the chain. Seems reasonable. But then I think we should move the logic to after running `materialize_ranges`; that would be more direct.
Motivation
While syncing the Solana chain, I encountered this panic when resuming a stopped stream:
worker-1 | 2026-03-27T00:44:08.252459Z ERROR worker::service: job failed with fatal error node_id=dump job_id=1 error=Failed to materialize raw dataset error_source=["Partition task failed", "task 236 panicked with message \"called `Option::unwrap()` on a `None` value\""]
with the panicking unwrap being this one.
A failing scenario
The panic can only happen with providers that stream non-contiguous blocks (e.g. skipped slots in Solana). Here's a scenario in which it occurs (slot numbers are made up for the purposes of the example):
Cause
The `RawTableWriter` implementation opens files eagerly for each segment: once upon creation, and then once for each new range or after a flush (exceeded size, time limit, etc.). In those scenarios, the `current_file` field gets set to `Some` but nothing is written into the file yet, and the `current_range` field is still `None` (because it is updated after a write).

This works when all streamed blocks are contiguous because, eventually, the block stream will reach the current segment's range and something will be written into the file (`current_range` will be set to `Some`). However, for providers that stream non-contiguous blocks, that is not necessarily true (as described in the scenario above).

Solution
The main problem with the current approach is that segments are opened eagerly, which leaves a time window where an empty segment exists. Therefore, the solution presented in this PR is to open segments lazily, whenever we have something to write into them. In addition, the PR introduces a `RawTableSegment` abstraction that encapsulates the segment file and block range into a single field, instead of having them as two separate fields on the `RawTableWriter`.