
[SS-63][Oneshot Sources] Prevent timely workers from dropping oneshot ingestions without controller commands #35006

Merged
patrickwwbutler merged 3 commits into MaterializeInc:main from
patrickwwbutler:patrick/oneshot-replica-support
Feb 26, 2026

Conversation

@patrickwwbutler
Contributor

@patrickwwbutler patrickwwbutler commented Feb 13, 2026

This fixes a dataflow coordination issue in which the first timely worker begins to process an ingestion, fails, and receives an error that cancels the operation almost immediately. In the dataflow's render_completion_operator, on receiving an error result, the worker exits without exhausting its input. The now-removed code would then drop the ingestion, including that worker's shutdown buttons, effectively stopping scheduling of the dataflow. The other timely workers would then observe the disconnected results channel indefinitely.

By removing this early ingestion drop logic from the worker and allowing the StorageController to drop it from all workers once the ingestion is fully completed, we ensure that the results channel is not disconnected early.
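The ownership discipline the fix establishes can be sketched in a few lines of standalone Rust. This is a hypothetical illustration, not Materialize code: the Command enum, Worker struct, and the string standing in for a shutdown button are all invented. The point is only that the worker retains the dataflow's shutdown token until the controller explicitly cancels, rather than dropping it on local completion or error.

```rust
use std::collections::HashMap;

// Invented names for illustration; the real protocol lives in the
// StorageController and the storage worker command loop.
#[derive(Debug)]
enum Command {
    RunOneshotIngestion(u64),
    CancelOneshotIngestion(u64),
}

struct Worker {
    // Stand-in for the per-dataflow shutdown button: as long as the
    // entry exists, the dataflow keeps being scheduled.
    active: HashMap<u64, String>,
}

impl Worker {
    fn handle(&mut self, cmd: Command) {
        match cmd {
            Command::RunOneshotIngestion(id) => {
                self.active.insert(id, format!("shutdown-button-{id}"));
            }
            // Only an explicit controller command drops the token;
            // a local error or early completion leaves it in place.
            Command::CancelOneshotIngestion(id) => {
                self.active.remove(&id);
            }
        }
    }
}

fn main() {
    let mut worker = Worker { active: HashMap::new() };
    worker.handle(Command::RunOneshotIngestion(42));
    // A local failure no longer tears down the dataflow...
    assert!(worker.active.contains_key(&42));
    // ...only the controller's cancel does.
    worker.handle(Command::CancelOneshotIngestion(42));
    assert!(worker.active.is_empty());
}
```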

Verification

This now passes the copy_from_s3_minio.td test with multiple replicas and larger cluster sizes.

@github-actions

github-actions bot commented Feb 13, 2026

Pre-merge checklist

  • The PR title is descriptive and will make sense in the git log.
  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).

@patrickwwbutler patrickwwbutler requested a review from a team February 13, 2026 16:19
@patrickwwbutler
Contributor Author

Ran a nightly build using trigger-ci containing tests with multiple replicas and larger cluster sizes.

@patrickwwbutler patrickwwbutler force-pushed the patrick/oneshot-replica-support branch from ddcfaba to 9b38777 Compare February 19, 2026 18:29
@patrickwwbutler patrickwwbutler requested review from a team, ggevay and teskje as code owners February 19, 2026 19:27
@patrickwwbutler patrickwwbutler force-pushed the patrick/oneshot-replica-support branch 2 times, most recently from 9b38777 to c5136e3 Compare February 19, 2026 19:41
Contributor

@martykulma martykulma left a comment

It looks like there's an additional cleanup needed in process_oneshot_ingestion, but looks great otherwise!

Member

@antiguru antiguru left a comment

I am not sure this is the right fix. Looking at #31136, which introduced the cancel protocol for one-shot ingestions, it has a design deficit in that its reconciliation will be racy, so I'm doubtful that this fix will cover all cases. Commands that render a dataflow need to be executed on all workers; there is no way to avoid this. Even if a worker receives a cancel of a one-shot ingestion before it rendered it, it'll need to render, because otherwise the Timely infrastructure will stop working.

Member

@antiguru antiguru left a comment

(Requesting changes to avoid merging as-is.)

@patrickwwbutler
Contributor Author

patrickwwbutler commented Feb 20, 2026

I am not sure this is the right fix. Looking at #31136 which introduced the cancel protocol for one-shot ingestions, it has a design deficit that its reconciliation will be racy, so I'm doubtful that this fix will cover all cases.

@antiguru Could you clarify this design deficit for me? It appears there was some back and forth, but ultimately it was decided to go for option 1 from Jan's comment.

It seems to be around clusters dropping with pending oneshots, but I'm not sure what the code we're deleting here would have to do with that? I think that the adapter sends a cancel command to the storage controller, which in turn sends cancel commands to the workers. Is the issue that the workers in the cluster may be dropped before receiving that cancel command? If so, I don't think this will actually affect that reconciliation process, as the workers that would have been dropped by this deleted code have already finished. Do you think this change would actually cause a regression, or just not fully address cleanup issues?

Commands that render a dataflow need to be executed on all workers, there is no way to avoid this. Even if a worker receives a cancel of a one-shot ingestion before it rendered it, it'll need to render because otherwise the Timely infrastructure will stop working.

My observation was that the StorageController would not actually send the CancelOneshotIngestion command until at least some number of the timely workers had rendered and completed (I believe related to some measure of "progress" updates?), which is actually what was causing this to hang in the first place (only one worker was finished and the rest were waiting around forever for progress from the finished worker).

What this change does is simply prevent individual workers from dropping their dataflows too early, and instead wait to be told to do so once the controller has received a sufficient result.

@antiguru
Member

I typed up something and then realized this only applies during reconciliation. This is not the case here, right? I.e., we don't have envd reconnect, correct?

What I'd like to understand is why removing the code in your PR fixes the problem. Looking at the implementation, if we receive a oneshot ingestion instruction, we always render a render_completion_operator at the end, which holds on to the tx endpoint of a channel to send along a batch. Maybe the line maybe_payload = Some(result.map_err(|e| e.to_string())?); silently swallows the storage error?

@antiguru antiguru self-requested a review February 20, 2026 14:43
@patrickwwbutler
Contributor Author

I typed up something and then realized this only applies during reconciliation. This is not the case here, right? I.e., we don't have envd reconnect, correct?

Yes, this is not a reconciliation-related change; it applies when a cluster with multiple workers hits an error early while processing the data and exits quickly. It's a bit unclear, but the understanding @DAlperin and I have is that by dropping the ingestion from the first worker (worker_index=0) when it errors out quickly, we trigger the dataflow's shutdown button, but the other workers have not finished rendering, let alone processing the dataflow, so they never receive progress messages from the first worker.

Basically we believe the early trigger on the shutdown button is causing the issue.

@antiguru
Member

antiguru commented Feb 20, 2026

we trigger the dataflow's shutdown button

The shutdown button should wait for the other workers to catch up. If it doesn't this might be a bug. (If using the Mz-internal one!)

@patrickwwbutler
Contributor Author

we trigger the dataflow's shutdown button

The shutdown button should wait for the other workers to catch up. If it doesn't this might be a bug. (If using the Mz-internal one!)

Possibly it is not waiting because the other workers haven't finished rendering in this case? I believe they are still waiting for the InternalStorageCommand that tells them to actually create the ingestion at this point, because the command is sent out by worker 0, which then begins rendering the dataflow significantly faster and errors out almost immediately.

@patrickwwbutler patrickwwbutler force-pushed the patrick/oneshot-replica-support branch from c5136e3 to 1f8b8ff Compare February 20, 2026 16:33
@martykulma
Contributor

Alternatively, this could publish StorageCommand::CancelOneshotIngestion via internal_cmd_tx for each id in to_remove.

@patrickwwbutler patrickwwbutler force-pushed the patrick/oneshot-replica-support branch from 1f8b8ff to ec50e78 Compare February 23, 2026 17:00
@patrickwwbutler
Contributor Author

Alternatively, this could publish StorageCommand::CancelOneshotIngestion via internal_cmd_tx for each id in to_remove.

It doesn't seem like that should be necessary; is there a good reason not to just let the controller send the cancellation commands? I feel like simplifying the logic as much as possible here is ideal.

@petrosagg
Contributor

I don't think the problem is in how we use the buttons. The buttons work correctly regardless of render/drop timing, in that when eventually all workers press their buttons the dataflow shuts down.

I looked a bit at the oneshot ingestion code, and it seems to me that the operator rendered by render_completion_operator is the only thing that can cause a worker to observe a disconnected results channel. But that operator waits for its input to be exhausted (i.e., reach the empty frontier) before sending a value down the channel and then exiting, dropping its tx side. Observing an empty frontier on its input means that all other workers have also rendered their oneshot dataflow and made progress, which goes against the theory that this is caused by one worker failing quickly before the other workers have rendered their part.

I think we need a better grasp of what is causing the hang, happy to hop on a call to discuss this

@DAlperin
Member

Might be worth throwing some logs on progress messages/probes around to observe the blocking behavior more explicitly

@petrosagg
Contributor

I looked at this with a fresh set of eyes today and found what's going on.

But that operator waits for its input to be exhausted (i.e reach the empty frontier) before sending a value down the channel and then exiting, dropping its tx side.

This turns out not to be true! Do you see it?

https://github.com/MaterializeInc/materialize/blob/main/src/storage-operators/src/oneshot_source.rs#L641-L654

There is a try (?) operator in the assignment to maybe_payload, which means that the completion operator does not actually wait for its input to reach the empty frontier in the case of an error, and immediately disconnects the channel! This then leads to the worker dropping the shutdown button, which effectively stops scheduling the dataflow. The only way out from here is if all other workers also drop their buttons, but that won't happen unless the oneshot ingestion is cancelled by the user or the other workers complete, which cannot happen since the worker that observed the error has stopped being scheduled.
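The pitfall reduces to a minimal, standalone Rust sketch. This is not the Materialize code; forward_all and the mpsc channel setup are invented for illustration. It shows how a `?` inside a loop short-circuits the whole function on the first Err, dropping the sender before the remaining input is drained, so the receiver observes a disconnected channel early.

```rust
use std::sync::mpsc;

// Forward values to the channel; the `?` on `item?` returns on the
// first error, dropping `tx` before the loop finishes -- the same
// shape as the early exit in the completion operator.
fn forward_all(inputs: Vec<Result<i32, String>>, tx: mpsc::Sender<i32>) -> Result<(), String> {
    for item in inputs {
        let value = item?;
        tx.send(value).map_err(|e| e.to_string())?;
    }
    Ok(())
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let result = forward_all(vec![Ok(1), Err("boom".into()), Ok(2)], tx);
    assert!(result.is_err());
    // Only the value before the error made it through; the channel is
    // disconnected even though input remained.
    let received: Vec<i32> = rx.iter().collect();
    assert_eq!(received, vec![1]);
}
```

The diff below avoids this by accumulating the error in maybe_payload and continuing to drain the input, only reporting the result once the input frontier is exhausted.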

So another way to fix the problem would be to change the completion operator according to this diff:

     builder.build(move |_| async move {
         let result = async move {
-            let mut maybe_payload: Option<ProtoBatch> = None;
+            let mut maybe_payload: Result<Option<ProtoBatch>, String> = Ok(None);

             while let Some(event) = results_input.next().await {
                 if let AsyncEvent::Data(_cap, results) = event {
@@ -645,15 +645,15 @@ pub fn render_completion_operator<G, F>(
                         .expect("only 1 event on the result stream");

                     // TODO(cf2): Lift this restriction.
-                    if maybe_payload.is_some() {
+                    if !matches!(maybe_payload, Ok(None)) {
                         panic!("expected only one batch!");
                     }

-                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
+                    maybe_payload = result.map(Some).map_err(|e| e.to_string());
                 }
             }

-            Ok(maybe_payload)
+            maybe_payload
         }
         .await;

That said, I think the PR as-is is the preferred route. We don't want clusters making independent decisions about which dataflows get cancelled; that is the controller's job. This way we are also free to write any operator logic we want, including early exits, without worrying about deadlocks.

In this light, I think we should merge the PR, but we should update the PR description to reflect this analysis so that it gets recorded in the git log.

@patrickwwbutler patrickwwbutler force-pushed the patrick/oneshot-replica-support branch from ec50e78 to 32bf981 Compare February 25, 2026 21:16
@patrickwwbutler patrickwwbutler changed the title [SS-63][Oneshot Sources] Fix race condition in Oneshot Ingestion related to multiple Timely Workers [SS-63][Oneshot Sources] Prevent timely workers from dropping ingestions without controller commands Feb 25, 2026
@patrickwwbutler patrickwwbutler dismissed antiguru’s stale review February 26, 2026 15:10

Concerns were addressed after discussion with Petros, and Moritz confirmed that he's okay with merging now.

@patrickwwbutler patrickwwbutler merged commit a52646c into MaterializeInc:main Feb 26, 2026
135 checks passed
@patrickwwbutler patrickwwbutler changed the title [SS-63][Oneshot Sources] Prevent timely workers from dropping ingestions without controller commands [SS-63][Oneshot Sources] Prevent timely workers from dropping oneshot ingestions without controller commands Feb 26, 2026