feat: add workflow stage resume#747
Conversation
|
MkDocs preview: https://d1e17c5c.dd-docs-preview.pages.dev Fern preview: https://nvidia-preview-pr-747.docs.buildwithfern.com/nemo/datadesigner
|
Greptile SummaryThis PR adds stage-level resume support to
|
| Filename | Overview |
|---|---|
| packages/data-designer/src/data_designer/interface/composite_workflow.py | Core implementation of stage-level resume: adds prior-metadata loading, per-stage fingerprint comparison with upstream-chain propagation, skip/partial-resume/rerun dispatch, and atomic metadata writes. Logic is consistent and edge cases are well-handled. |
| packages/data-designer/tests/interface/test_composite_workflow.py | Adds 10 new test cases covering skip, changed-stage rerun, empty-upstream propagation, missing callback output, corrupt metadata, partial-stage delegation (running/failed), and strict ALWAYS mode rejection. Coverage is broad; mock-based tests correctly inspect resume arguments passed to create(). |
| docs/concepts/workflow-chaining.md | Adds Resume section documenting IF_POSSIBLE and ALWAYS modes, and removes the now-stale 'Stage-level resume is not implemented yet' bullet from the limits list. |
| fern/versions/latest/pages/concepts/workflow-chaining.mdx | Mirror of the MkDocs doc change; identical Resume section added and the same limits bullet removed. |
| plans/workflow-chaining/workflow-chaining.md | Architecture plan updated with a status note after the stage-level resume slice and a minor clarification on the post-#636 deferrals. No logic concerns. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[workflow.run resume=X] --> B[_read_prior_workflow_metadata]
B --> C{resume == NEVER?}
C -- Yes --> D[prior_metadata = None]
C -- No --> E{metadata file exists?}
E -- No + ALWAYS --> F[raise: no metadata found]
E -- No + IF_POSSIBLE --> D
E -- Yes --> G[Load prior_metadata]
D & G --> H[Stage Loop]
H --> I{skipped_upstream_stage set?}
I -- Yes --> J[Write skipped_empty_upstream, continue]
I -- No --> K[Compute stage_fingerprint incl. upstream_fingerprint]
K --> L{prior_matches?}
L -- Yes --> M{_can_skip_prior_stage?}
M -- Yes --> N[Skip: copy prior metadata, continue]
N -->|status==completed_empty| O[skipped_upstream_stage = stage.name]
N --> H
M -- No --> P{status in RESUMABLE and stage_path exists?}
P -- Yes --> Q[stage_resume = ALWAYS]
P -- No --> R{resume==ALWAYS AND NOT force_rerun?}
R -- Yes --> S[raise: stage not reusable]
R -- No --> T[stage_resume = NEVER]
L -- No --> T
Q & T --> U{stage_resume==NEVER and stage_path exists?}
U -- Yes --> V[rmtree stage_path]
U -- No --> W[Keep stage_path]
V & W --> X[data_designer.create with stage_resume]
X --> Y{output_processors?}
Y -- Yes --> Z[rmtree output-processors, re-run processors]
Y -- No --> AA[resolve output_seed_path]
Z --> AA
AA --> AB{output_records == 0?}
AB -- Yes + allow_empty --> AC[completed_empty, set skipped_upstream_stage]
AB -- No --> AD[completed]
AB -- Yes + not allow_empty --> AE[raise: empty output]
AC & AD --> AF[force_rerun_downstream = True, write metadata]
AF --> H
Reviews (2): Last reviewed commit: "fix: harden workflow resume metadata" | Re-trigger Greptile
| if prior_matches and _can_skip_prior_stage(stage, prior_stage_metadata): | ||
| stage_metadata.update(prior_stage_metadata) | ||
| output_seed_path = Path(stage_metadata["output_seed_path"]) | ||
| output_records = _count_parquet_records(output_seed_path) | ||
| output_result = _stage_result_from_metadata( | ||
| workflow_path=workflow_path, | ||
| stage=stage, | ||
| stage_dir_name=stage_dir_name, | ||
| stage_builder=stage_builder, | ||
| ) | ||
| stage_results[stage.name] = output_result | ||
| stage_output_paths[stage.name] = output_seed_path | ||
| previous_seed_path = output_seed_path | ||
| previous_output_records = output_records | ||
| previous_stage_name = stage.name | ||
| previous_stage_fingerprint = stage_fingerprint | ||
| if stage_metadata["status"] == "completed_empty": | ||
| skipped_upstream_stage = stage.name | ||
| _write_workflow_metadata(workflow_path, metadata) | ||
| continue |
There was a problem hiding this comment.
completed_empty skip path leaves previous_output_records = 0 in metadata
When a stage was previously completed_empty and is being skipped, _count_parquet_records(output_seed_path) may return 0 (parquet files with 0 rows), assigning previous_output_records = output_records = 0. The 0 is harmless because skipped_upstream_stage is set in the same block, causing all subsequent stages to fast-path skip before any num_records computation. The relationship is implicit though — if those two assignments were ever reordered, a downstream stage could inherit previous_output_records = 0 and silently fall back to DEFAULT_NUM_RECORDS.
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer/src/data_designer/interface/composite_workflow.py
Line: 306-325
Comment:
**`completed_empty` skip path leaves `previous_output_records = 0` in metadata**
When a stage was previously `completed_empty` and is being skipped, `_count_parquet_records(output_seed_path)` may return `0` (parquet files with 0 rows), assigning `previous_output_records = output_records = 0`. The `0` is harmless because `skipped_upstream_stage` is set in the same block, causing all subsequent stages to fast-path skip before any `num_records` computation. The relationship is implicit though — if those two assignments were ever reordered, a downstream stage could inherit `previous_output_records = 0` and silently fall back to `DEFAULT_NUM_RECORDS`.
How can I resolve this? If you propose a fix, please make it concise.| def test_composite_workflow_resume_if_possible_delegates_matching_partial_stage( | ||
| stub_artifact_path: Path, | ||
| stub_model_providers: list[ModelProvider], | ||
| stub_model_configs: list[ModelConfig], | ||
| stub_dataset_profiler_results, | ||
| ) -> None: | ||
| data_designer = _data_designer(stub_artifact_path, stub_model_providers) | ||
| create_mock = _patch_create(data_designer, stub_dataset_profiler_results) | ||
| workflow = data_designer.compose_workflow(name="resume-partial") | ||
| workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) | ||
| workflow.add_stage("copy", _copy_builder(stub_model_configs)) | ||
| workflow.run() | ||
| metadata_path = stub_artifact_path / "resume-partial" / "workflow-metadata.json" | ||
| metadata = json.loads(metadata_path.read_text(encoding="utf-8")) | ||
| metadata["stages"][0]["status"] = "running" | ||
| metadata_path.write_text(json.dumps(metadata), encoding="utf-8") | ||
| create_mock.reset_mock() | ||
|
|
||
| resumed = data_designer.compose_workflow(name="resume-partial") | ||
| resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2) | ||
| resumed.add_stage("copy", _copy_builder(stub_model_configs)) | ||
| resumed.run(resume=ResumeMode.IF_POSSIBLE) | ||
|
|
||
| assert [call.kwargs["dataset_name"] for call in create_mock.call_args_list] == ["stage-0-base", "stage-1-copy"] | ||
| assert [call.kwargs["resume"] for call in create_mock.call_args_list] == [ResumeMode.ALWAYS, ResumeMode.NEVER] |
There was a problem hiding this comment.
Partial-stage test uses already-complete metadata, not a genuine interrupted run
The test sets stage[0]["status"] = "running" on metadata that still contains fingerprint, output_seed_path, num_records_actual, etc. from the completed run. A real interrupted run writes status: "running" before output_seed_path is added (lines 337-347 in composite_workflow.py), so the scenario where prior stage metadata lacks output_seed_path is not exercised.
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer/tests/interface/test_composite_workflow.py
Line: 566-590
Comment:
**Partial-stage test uses already-complete metadata, not a genuine interrupted run**
The test sets `stage[0]["status"] = "running"` on metadata that still contains `fingerprint`, `output_seed_path`, `num_records_actual`, etc. from the completed run. A real interrupted run writes `status: "running"` before `output_seed_path` is added (lines 337-347 in `composite_workflow.py`), so the scenario where prior stage metadata lacks `output_seed_path` is not exercised.
How can I resolve this? If you propose a fix, please make it concise.
Review: PR #747 —
|
📋 Summary
Adds stage-level resume support for chained workflows so compatible completed stages can be reused, matching partial stages can continue through the existing single-stage resume path, and downstream stages rerun when upstream outputs change.
🔗 Related Issue
N/A
🔄 Changes
CompositeWorkflow.run(resume=...)with completed-stage reuse and partial-stage delegation.ResumeMode.IF_POSSIBLEfall back to fresh runs when prior metadata is unusable.🧪 Testing
make testpassesRan:
.venv/bin/ruff format ..venv/bin/ruff check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py.venv/bin/ruff format --check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py.venv/bin/pytest packages/data-designer/tests/interface/test_composite_workflow.py -q- 55 passed, 2 warnings.venv/bin/pytest /home/ubuntu/Code/reviews/DataDesigner-747/smoke_test.py -q -s- 2 passed against NVIDIA Build (nvidia/nemotron-3-nano-30b-a3b) and NVIDIA Inference (openai/openai/gpt-5.4-nano) using/home/ubuntu/Code/.envNote: full
.venv/bin/ruff check --fix .currently hits an unrelated existing generated-notebook lint indocs/colab_notebooks/7-nemotron-personas.ipynb(F404).✅ Checklist