diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 364608be82ca..a3f23aebdf8f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -216,6 +216,8 @@ public void scheduleWork( Work.ProcessingContext processingContext, boolean drainMode, ImmutableList getWorkStreamLatencies) { + // Before any processing starts, call any pending OnCommit callbacks + commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); computationState.activateWork( ExecutableWork.create( Work.create( @@ -255,9 +257,6 @@ private void processWork( setUpWorkLoggingContext(work.getLatencyTrackingId(), computationId); LOG.debug("Starting processing for {}:\n{}", computationId, work); - // Before any processing starts, call any pending OnCommit callbacks. Nothing that requires - // cleanup should be done before this, since we might exit early here. - commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList()); if (workItem.getSourceState().getOnlyFinalize()) { Windmill.WorkItemCommitRequest.Builder outputBuilder = initializeOutputBuilder(key, workItem); outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));