[SS-63][Oneshot Sources] Prevent timely workers from dropping oneshot ingestions without controller commands#35006
Conversation
Pre-merge checklist
Ran a nightly build using trigger-ci containing tests with multiple replicas and larger cluster sizes
ddcfaba to
9b38777
Compare
9b38777 to
c5136e3
Compare
martykulma
left a comment
It looks like there's an additional cleanup needed in `process_oneshot_ingestion`, but looks great otherwise!
antiguru
left a comment
I am not sure this is the right fix. Looking at #31136 which introduced the cancel protocol for one-shot ingestions, it has a design deficit that its reconciliation will be racy, so I'm doubtful that this fix will cover all cases. Commands that render a dataflow need to be executed on all workers, there is no way to avoid this. Even if a worker receives a cancel of a one-shot ingestion before it rendered it, it'll need to render because otherwise the Timely infrastructure will stop working.
antiguru
left a comment
(Requesting changes to avoid merging as-is.)
@antiguru Could you clarify this design deficit for me? It appears there was some back and forth, but ultimately it was decided to go for option 1 from Jan's comment. It seems to be around clusters dropping with pending oneshots, but I'm not sure what the code we're deleting here would have to do with that? I think that the adapter sends a cancel command to the storage controller, which in turn sends cancel commands to the workers. Is the issue that the workers in the cluster may be dropped before receiving that cancel command? If so, I don't think this will actually affect that reconciliation process, as the workers that would have been dropped by this deleted code have already finished. Do you think this change would actually cause a regression, or just not fully address cleanup issues?
What this change does is simply prevent individual workers from dropping their dataflows too early, and instead have them wait to be told to do so once the controller has received a sufficient result.
I typed up something and then realized this only applies during reconciliation. That is not the case here, right? I.e., we don't have envd reconnect, correct? What I'd like to understand is why removing the code in your PR fixes the problem. Looking at the implementation, if we receive a oneshot ingestion instruction, we always render a
Yes, this is not a reconciliation-related change; it applies when a cluster with multiple workers hits an error while processing the data early on and exits quickly. It's a bit unclear, but the understanding @DAlperin and I have is that the issue comes from dropping the ingestion from the first worker. Basically, we believe the early trigger on the shutdown button is causing the issue.
The shutdown button should wait for the other workers to catch up. If it doesn't, this might be a bug. (If using the Mz-internal one!)
Possibly it is not waiting because the other workers haven't finished rendering in this case? I believe they are still waiting for the
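To make the button semantics under discussion concrete, here is a toy model (this is *not* the timely/Materialize API, just an illustrative sketch with hypothetical names): teardown may only happen once every worker has pressed its button, so a worker that exits early without the others ever pressing leaves the dataflow alive forever.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Toy model of a shared shutdown button: the dataflow may only be torn
/// down once every worker has pressed. Illustrative only, not the real API.
#[derive(Clone)]
struct ShutdownButton {
    pressed: Arc<AtomicUsize>,
    workers: usize,
}

impl ShutdownButton {
    fn new(workers: usize) -> Self {
        Self {
            pressed: Arc::new(AtomicUsize::new(0)),
            workers,
        }
    }

    /// Press this worker's button; returns true once all workers have pressed.
    fn press(&self) -> bool {
        self.pressed.fetch_add(1, Ordering::SeqCst) + 1 >= self.workers
    }
}

fn main() {
    let button = ShutdownButton::new(3);
    // Worker 0 hits an error and presses early: teardown must still wait.
    assert!(!button.press());
    // If the remaining workers never finish rendering, they never press,
    // and the dataflow is never torn down -- the hang hypothesized above.
    assert!(!button.press());
    assert!(button.press()); // only the final press allows teardown
    println!("all workers pressed; dataflow can shut down");
}
```

In this model, dropping a worker's button without pressing it (or never rendering at all) means the count never reaches the worker total, which matches the "early trigger / never shuts down" symptom described above.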
c5136e3 to
1f8b8ff
Compare
Alternatively, this could publish
1f8b8ff to
ec50e78
Compare
It doesn't seem like that should be necessary; is there a good reason not to just allow the controller to send the cancellation commands? I feel like simplifying the logic as much as possible here is ideal.
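A minimal sketch of what controller-driven cancellation could look like (the names and types here are hypothetical, not the actual `StorageController` API): workers only report results, and the controller, upon receiving the first sufficient result, issues a cancel command for every worker that rendered the ingestion.

```rust
use std::collections::HashMap;

/// Hypothetical sketch of controller-driven cleanup: workers never drop a
/// oneshot ingestion on their own; the controller cancels it on every
/// worker once it has a sufficient result. Not the real controller API.
struct Controller {
    /// ingestion id -> workers that rendered the ingestion's dataflow
    pending: HashMap<u64, Vec<usize>>,
}

/// A cancel command addressed to one worker.
#[derive(Debug, PartialEq)]
struct CancelCmd {
    worker: usize,
    ingestion: u64,
}

impl Controller {
    /// Called when any worker reports a result (success or error). The
    /// first report is sufficient: cancel the ingestion on *all* workers,
    /// so no worker ever decides to drop its dataflow unilaterally.
    fn ingestion_result(&mut self, id: u64, _result: Result<(), String>) -> Vec<CancelCmd> {
        match self.pending.remove(&id) {
            Some(workers) => workers
                .into_iter()
                .map(|worker| CancelCmd { worker, ingestion: id })
                .collect(),
            None => Vec::new(), // duplicate/late result: already cancelled
        }
    }
}

fn main() {
    let mut controller = Controller {
        pending: HashMap::from([(7, vec![0, 1, 2])]),
    };
    // Worker 0 errors early; the controller cancels on every worker.
    let cmds = controller.ingestion_result(7, Err("decode error".into()));
    assert_eq!(cmds.len(), 3);
    // A late result for the same ingestion produces no further commands.
    assert!(controller.ingestion_result(7, Ok(())).is_empty());
    println!("cancel commands: {cmds:?}");
}
```

The design point this models is the one argued in the thread: cancellation decisions are centralized, so every worker tears down its dataflow in response to the same command rather than on its own local view of the results.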
I don't think the problem is in how we use the buttons. The buttons work correctly regardless of render/drop timing, in that when eventually all workers press their buttons the dataflow shuts down. I looked a bit at the oneshot ingestion code and it seems to me that the issue is in the operator rendered by

I think we need a better grasp of what is causing the hang; happy to hop on a call to discuss this.
Might be worth adding some logging around progress messages/probes to observe the blocking behavior more explicitly.
I looked at this with a fresh set of eyes today and found what's going on.
This turns out to be not true! Do you see it? There is a try (`?`) in there: on reception of an error result, the async block returns early without exhausting its input. So another way to fix the problem would be to change the completion operator according to this diff:

```diff
@@ -645,15 +645,15 @@ pub fn render_completion_operator<G, F>(
     builder.build(move |_| async move {
         let result = async move {
-            let mut maybe_payload: Option<ProtoBatch> = None;
+            let mut maybe_payload: Result<Option<ProtoBatch>, String> = Ok(None);
             while let Some(event) = results_input.next().await {
                 if let AsyncEvent::Data(_cap, results) = event {
                         .expect("only 1 event on the result stream");
                     // TODO(cf2): Lift this restriction.
-                    if maybe_payload.is_some() {
+                    if !matches!(maybe_payload, Ok(None)) {
                         panic!("expected only one batch!");
                     }
-                    maybe_payload = Some(result.map_err(|e| e.to_string())?);
+                    maybe_payload = result.map(Some).map_err(|e| e.to_string());
                 }
             }
-            Ok(maybe_payload)
+            maybe_payload
         }
         .await;
```

That said, I think the PR as-is is the preferred route. We don't want clusters to make independent decisions about which dataflows get cancelled; this is the job of the controller. This way we are also free to write any operator logic we want, including early exiting etc., without worrying about deadlocks. Under this light, I think we should merge the PR, but we should update the PR description to reflect this analysis so that it gets recorded in the git log.
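The early-return behavior of the `?` can be seen in a self-contained sketch (hypothetical names; plain `Result` slices stand in for the timely result stream): the `?` variant stops consuming events at the first error, while the variant that stores the error keeps draining its input.

```rust
/// Simplified model of the completion operator's loop: `?` returns out of
/// the block at the first Err, so later events are never consumed.
fn consume_with_try(events: &[Result<i32, String>]) -> (Result<Option<i32>, String>, usize) {
    let mut seen = 0;
    let result = (|| -> Result<Option<i32>, String> {
        let mut maybe_payload = None;
        for event in events {
            seen += 1;
            maybe_payload = Some(event.clone()?); // early return on Err
        }
        Ok(maybe_payload)
    })();
    (result, seen)
}

/// The fixed variant: store the error and keep draining the input. (In the
/// real operator only one event is expected, so overwriting is harmless;
/// here it is just a simplification.)
fn consume_draining(events: &[Result<i32, String>]) -> (Result<Option<i32>, String>, usize) {
    let mut seen = 0;
    let mut maybe_payload: Result<Option<i32>, String> = Ok(None);
    for event in events {
        seen += 1;
        maybe_payload = event.clone().map(Some);
    }
    (maybe_payload, seen)
}

fn main() {
    // The failing worker's error arrives first, then two more events.
    let events = vec![Err("worker 0 failed".to_string()), Ok(1), Ok(2)];

    let (result, seen) = consume_with_try(&events);
    assert_eq!(seen, 1); // stopped at the first error: input not exhausted
    assert!(result.is_err());

    let (result, seen) = consume_draining(&events);
    assert_eq!(seen, 3); // input fully drained
    assert_eq!(result, Ok(Some(2)));
}
```

This mirrors the observation above: with the `?`, the operator exits on an error before its input is exhausted, which is exactly the precondition for the hang the PR addresses.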
ec50e78 to
32bf981
Compare
Concerns were addressed after discussion with Petros, and Moritz confirmed with me that he's okay with merging now.
This fixes a dataflow coordination issue in which the first timely worker begins to process an ingestion, fails, and receives an error that cancels the operation very quickly. Then, in the `render_completion_operator` operator of the dataflow, on reception of an error result, the worker exits without exhausting its input. The now-removed code would then drop the ingestion, including the shutdown buttons from that worker, effectively stopping scheduling of the dataflow. This would result in the other timely workers observing the disconnected results channel indefinitely.

By removing this early ingestion-drop logic from the worker, and allowing the `StorageController` to drop the ingestion from all workers once it is fully completed, we ensure that the results channel is not disconnected early.

Verification
This now passes the `copy_from_s3_minio.td` test with multiple replicas and larger cluster sizes.