Skip to content

feat: add workflow stage resume#747

Open
andreatgretel wants to merge 3 commits into
mainfrom
andreatgretel/feat/stage-level-resume
Open

feat: add workflow stage resume#747
andreatgretel wants to merge 3 commits into
mainfrom
andreatgretel/feat/stage-level-resume

Conversation

@andreatgretel

@andreatgretel andreatgretel commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

📋 Summary

Adds stage-level resume support for chained workflows so compatible completed stages can be reused, matching partial stages can continue through the existing single-stage resume path, and downstream stages rerun when upstream outputs change.

🔗 Related Issue

N/A

🔄 Changes

  • Add CompositeWorkflow.run(resume=...) with completed-stage reuse and partial-stage delegation.
  • Invalidate downstream stages when an upstream stage reruns, changes, or has missing selected/callback output.
  • Harden workflow metadata writes and let ResumeMode.IF_POSSIBLE fall back to fresh runs when prior metadata is unusable.
  • Add public workflow resume tests for skip, rerun, partial/failed-stage resume, callback-output, output-processor, completed-empty, corrupt metadata, and strict resume behavior.
  • Document workflow resume behavior in MkDocs and Fern docs.
  • Update the workflow chaining plan with the completed stage-level resume slice.

🧪 Testing

  • make test passes
  • Unit tests added/updated
  • E2E tests added/updated (if applicable)

Ran:

  • .venv/bin/ruff format .
  • .venv/bin/ruff check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
  • .venv/bin/ruff format --check packages/data-designer/src/data_designer/interface/composite_workflow.py packages/data-designer/tests/interface/test_composite_workflow.py
  • .venv/bin/pytest packages/data-designer/tests/interface/test_composite_workflow.py -q - 55 passed, 2 warnings
  • .venv/bin/pytest /home/ubuntu/Code/reviews/DataDesigner-747/smoke_test.py -q -s - 2 passed against NVIDIA Build (nvidia/nemotron-3-nano-30b-a3b) and NVIDIA Inference (openai/openai/gpt-5.4-nano) using /home/ubuntu/Code/.env

Note: full .venv/bin/ruff check --fix . currently hits an unrelated existing generated-notebook lint in docs/colab_notebooks/7-nemotron-personas.ipynb (F404).

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated (if applicable)

@andreatgretel andreatgretel marked this pull request as ready for review June 11, 2026 19:58
@andreatgretel andreatgretel requested a review from a team as a code owner June 11, 2026 19:58
@github-actions

github-actions Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

MkDocs preview: https://d1e17c5c.dd-docs-preview.pages.dev

Fern preview: https://nvidia-preview-pr-747.docs.buildwithfern.com/nemo/datadesigner

Fern previews include the docs-website version archive with PR changes synced into latest. Notebook tutorials are rendered without execution outputs in previews.

@greptile-apps

greptile-apps Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR adds stage-level resume support to CompositeWorkflow.run() so that completed stages with matching fingerprints can be reused across runs, partially-executed stages can continue via DataDesigner.create(resume=ResumeMode.ALWAYS), and all stages downstream of any changed or re-run stage are automatically invalidated.

  • Adds prior_matches fingerprint comparison (incorporating upstream_fingerprint for cascading invalidation), a force_rerun_downstream flag, and _can_skip_prior_stage guards covering callback-output presence, empty parquets, and version-less callbacks.
  • _write_workflow_metadata is hardened to an atomic tmp-file + os.replace + fsync pattern, and output-processor directories are explicitly cleaned before re-running processors on a partial-stage resume.
  • Comprehensive unit tests cover skip, rerun, partial-stage delegation, empty-upstream propagation, missing callback output, and strict ResumeMode.ALWAYS rejection.

Confidence Score: 5/5

Safe to merge — the resume logic is correct, the fingerprint cascade properly invalidates downstream stages, and the metadata write is now atomic.

The stage-skip, partial-resume, and force-rerun paths are all consistent with each other. The upstream_fingerprint chain correctly propagates changes, the _can_skip_prior_stage guards cover callback-output presence and empty-parquet edge cases, and the output-processor directory cleanup during partial resumes is handled correctly. No logic errors or incorrect data flows were found.

No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer/src/data_designer/interface/composite_workflow.py Core implementation of stage-level resume: adds prior-metadata loading, per-stage fingerprint comparison with upstream-chain propagation, skip/partial-resume/rerun dispatch, and atomic metadata writes. Logic is consistent and edge cases are well-handled.
packages/data-designer/tests/interface/test_composite_workflow.py Adds 10 new test cases covering skip, changed-stage rerun, empty-upstream propagation, missing callback output, corrupt metadata, partial-stage delegation (running/failed), and strict ALWAYS mode rejection. Coverage is broad; mock-based tests correctly inspect resume arguments passed to create().
docs/concepts/workflow-chaining.md Adds Resume section documenting IF_POSSIBLE and ALWAYS modes, and removes the now-stale 'Stage-level resume is not implemented yet' bullet from the limits list.
fern/versions/latest/pages/concepts/workflow-chaining.mdx Mirror of the MkDocs doc change; identical Resume section added and the same limits bullet removed.
plans/workflow-chaining/workflow-chaining.md Architecture plan updated with a status note after the stage-level resume slice and a minor clarification on the post-#636 deferrals. No logic concerns.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[workflow.run resume=X] --> B[_read_prior_workflow_metadata]
    B --> C{resume == NEVER?}
    C -- Yes --> D[prior_metadata = None]
    C -- No --> E{metadata file exists?}
    E -- No + ALWAYS --> F[raise: no metadata found]
    E -- No + IF_POSSIBLE --> D
    E -- Yes --> G[Load prior_metadata]

    D & G --> H[Stage Loop]

    H --> I{skipped_upstream_stage set?}
    I -- Yes --> J[Write skipped_empty_upstream, continue]

    I -- No --> K[Compute stage_fingerprint incl. upstream_fingerprint]
    K --> L{prior_matches?}

    L -- Yes --> M{_can_skip_prior_stage?}
    M -- Yes --> N[Skip: copy prior metadata, continue]
    N -->|status==completed_empty| O[skipped_upstream_stage = stage.name]
    N --> H

    M -- No --> P{status in RESUMABLE and stage_path exists?}
    P -- Yes --> Q[stage_resume = ALWAYS]
    P -- No --> R{resume==ALWAYS AND NOT force_rerun?}
    R -- Yes --> S[raise: stage not reusable]
    R -- No --> T[stage_resume = NEVER]

    L -- No --> T

    Q & T --> U{stage_resume==NEVER and stage_path exists?}
    U -- Yes --> V[rmtree stage_path]
    U -- No --> W[Keep stage_path]

    V & W --> X[data_designer.create with stage_resume]
    X --> Y{output_processors?}
    Y -- Yes --> Z[rmtree output-processors, re-run processors]
    Y -- No --> AA[resolve output_seed_path]
    Z --> AA

    AA --> AB{output_records == 0?}
    AB -- Yes + allow_empty --> AC[completed_empty, set skipped_upstream_stage]
    AB -- No --> AD[completed]
    AB -- Yes + not allow_empty --> AE[raise: empty output]

    AC & AD --> AF[force_rerun_downstream = True, write metadata]
    AF --> H
Loading

Reviews (2): Last reviewed commit: "fix: harden workflow resume metadata" | Re-trigger Greptile

Comment on lines +306 to +325
if prior_matches and _can_skip_prior_stage(stage, prior_stage_metadata):
stage_metadata.update(prior_stage_metadata)
output_seed_path = Path(stage_metadata["output_seed_path"])
output_records = _count_parquet_records(output_seed_path)
output_result = _stage_result_from_metadata(
workflow_path=workflow_path,
stage=stage,
stage_dir_name=stage_dir_name,
stage_builder=stage_builder,
)
stage_results[stage.name] = output_result
stage_output_paths[stage.name] = output_seed_path
previous_seed_path = output_seed_path
previous_output_records = output_records
previous_stage_name = stage.name
previous_stage_fingerprint = stage_fingerprint
if stage_metadata["status"] == "completed_empty":
skipped_upstream_stage = stage.name
_write_workflow_metadata(workflow_path, metadata)
continue

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 completed_empty skip path leaves previous_output_records = 0 in metadata

When a stage was previously completed_empty and is being skipped, _count_parquet_records(output_seed_path) may return 0 (parquet files with 0 rows), assigning previous_output_records = output_records = 0. The 0 is harmless because skipped_upstream_stage is set in the same block, causing all subsequent stages to fast-path skip before any num_records computation. The relationship is implicit though — if those two assignments were ever reordered, a downstream stage could inherit previous_output_records = 0 and silently fall back to DEFAULT_NUM_RECORDS.

Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer/src/data_designer/interface/composite_workflow.py
Line: 306-325

Comment:
**`completed_empty` skip path leaves `previous_output_records = 0` in metadata**

When a stage was previously `completed_empty` and is being skipped, `_count_parquet_records(output_seed_path)` may return `0` (parquet files with 0 rows), assigning `previous_output_records = output_records = 0`. The `0` is harmless because `skipped_upstream_stage` is set in the same block, causing all subsequent stages to fast-path skip before any `num_records` computation. The relationship is implicit though — if those two assignments were ever reordered, a downstream stage could inherit `previous_output_records = 0` and silently fall back to `DEFAULT_NUM_RECORDS`.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +566 to +590
def test_composite_workflow_resume_if_possible_delegates_matching_partial_stage(
stub_artifact_path: Path,
stub_model_providers: list[ModelProvider],
stub_model_configs: list[ModelConfig],
stub_dataset_profiler_results,
) -> None:
data_designer = _data_designer(stub_artifact_path, stub_model_providers)
create_mock = _patch_create(data_designer, stub_dataset_profiler_results)
workflow = data_designer.compose_workflow(name="resume-partial")
workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2)
workflow.add_stage("copy", _copy_builder(stub_model_configs))
workflow.run()
metadata_path = stub_artifact_path / "resume-partial" / "workflow-metadata.json"
metadata = json.loads(metadata_path.read_text(encoding="utf-8"))
metadata["stages"][0]["status"] = "running"
metadata_path.write_text(json.dumps(metadata), encoding="utf-8")
create_mock.reset_mock()

resumed = data_designer.compose_workflow(name="resume-partial")
resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2)
resumed.add_stage("copy", _copy_builder(stub_model_configs))
resumed.run(resume=ResumeMode.IF_POSSIBLE)

assert [call.kwargs["dataset_name"] for call in create_mock.call_args_list] == ["stage-0-base", "stage-1-copy"]
assert [call.kwargs["resume"] for call in create_mock.call_args_list] == [ResumeMode.ALWAYS, ResumeMode.NEVER]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Partial-stage test uses already-complete metadata, not a genuine interrupted run

The test sets stage[0]["status"] = "running" on metadata that still contains fingerprint, output_seed_path, num_records_actual, etc. from the completed run. A real interrupted run writes status: "running" before output_seed_path is added (lines 337-347 in composite_workflow.py), so the scenario where prior stage metadata lacks output_seed_path is not exercised.

Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer/tests/interface/test_composite_workflow.py
Line: 566-590

Comment:
**Partial-stage test uses already-complete metadata, not a genuine interrupted run**

The test sets `stage[0]["status"] = "running"` on metadata that still contains `fingerprint`, `output_seed_path`, `num_records_actual`, etc. from the completed run. A real interrupted run writes `status: "running"` before `output_seed_path` is added (lines 337-347 in `composite_workflow.py`), so the scenario where prior stage metadata lacks `output_seed_path` is not exercised.

How can I resolve this? If you propose a fix, please make it concise.

@github-actions

Copy link
Copy Markdown
Contributor

Review: PR #747feat: add workflow stage resume

Summary

Adds stage-level resume to CompositeWorkflow.run(resume=...):

  • Reuses compatible completed stages (fingerprint match + output verifiable).
  • Delegates partial stages (running / failed) to the existing single-stage resume path via DataDesigner.create(..., resume=ResumeMode.ALWAYS).
  • Invalidates downstream stages on first re-run, change, or missing output.
  • Adds atomic, fsync'd workflow-metadata.json writes via tmp+os.replace.
  • Falls back to fresh runs in ResumeMode.IF_POSSIBLE when prior metadata is missing/corrupt; raises in ResumeMode.ALWAYS.
  • Tests for skip / rerun / partial-resume / callback-output / output-processor / completed-empty / corrupt metadata / strict-resume.
  • Docs added in both mkdocs and Fern; phase-3 plan updated.

Diff is well-scoped (5 files, +503/−10) and lives entirely in the interface layer — no engine/config touch, no import-direction violations.

Findings

Correctness — Medium

  1. output_seed_path is persisted and reused as an absolute path (composite_workflow.py:400, consumed at composite_workflow.py:309). If a user moves their artifact root (rsync, archive, container mount), prior_metadata still matches by name/stage_dir, but output_seed_path points at the old location while _stage_result_from_metadata builds an ArtifactStorage rooted at the new workflow_path. The result is a mixed-state read (old data, new storage object) or a _count_parquet_records raise — silently invalidating reuse in the best case, confusing behavior in the worst. Storing this as a path relative to workflow_path would be more robust. (Same applies to callback_output_path.)

  2. ResumeMode.ALWAYS semantics drift once any stage is re-run. The added doc says: "If a stage changed or its selected output is missing, the workflow raises instead of starting fresh." But once an ALWAYS resume successfully resumes a partial upstream stage from checkpoint (stage_resume == ALWAYS, line 328), force_rerun_downstream flips to True and the downstream elif resume == ResumeMode.ALWAYS and not force_rerun_downstream: guard (line 330) is bypassed — so a downstream stage whose fingerprint also differs from prior metadata silently runs fresh instead of raising. That may be the intended behavior, but it deviates from the doc and from the strict reading of the test_composite_workflow_resume_always_rejects_changed_stage contract. Consider tightening the doc and adding a test for "ALWAYS, upstream resumed-from-checkpoint, downstream fingerprint differs".

Correctness — Low

  1. _load_stage_analysis swallows every exception (composite_workflow.py:552):

    try:
        return DatasetProfilerResults.model_validate({...})
    except Exception:
        return None

    Bare except Exception masks both ValidationError (the only one you'd plausibly recover from) and unrelated bugs introduced by future schema changes. Narrow it to pydantic.ValidationError so a real bug doesn't decay into a silent analysis=None.

  2. stage_metadata.update(prior_stage_metadata) on the skip path (line 308) wholesale-imports every key the prior run wrote, including ones not produced by the current run (config, seed_path, seeded_from_stage, num_records_requested, duration_sec). That's by design here, but means the new metadata file's seeded_from_stage for a re-skipped stage may name a stage from the prior run that no longer exists in the current workflow definition. Fine for inspection; worth a note if later phases lean on those fields.

  3. _stage_result_from_metadata returns DatasetMetadata() — empty (line 528). The original run may have collected real DatasetMetadata. Reusing the cached stage exposes a stripped-down DatasetCreationResults to user code. Document or persist+rehydrate.

Style / Nits

  1. Unused import in TYPE_CHECKING removal: DatasetProfilerResults was lifted out of TYPE_CHECKING (line 18) because it's now used at module scope by _load_stage_analysis. That's correct, but DatasetProfilerResults is heavy; the project lazy-loads pandas already (lazy.pd). Confirm that data_designer.config.analysis.dataset_profiler doesn't drag in numpy/pandas at import — if it does, this regresses the import-time profile (make perf-import would catch it).

  2. tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}") (line 706): if two threads in the same process both write workflow metadata for the same workflow (uncommon but possible during async cleanup), they collide on the temp filename. Adding a uuid suffix would be safer; PID alone is not unique within a process. Probably out of scope.

  3. force_rerun_downstream is largely redundant with the upstream-fingerprint chain (since each stage's fingerprint folds in upstream_fingerprint). Its real job is to short-circuit the ResumeMode.ALWAYS raise on line 330. A short comment to that effect would help future readers — right now the flag's necessity is non-obvious.

Tests

Coverage is strong. Suggestions only:

  • Add a test pinning the ALWAYS-after-partial-upstream-resume behavior (see finding DataDesigner.make_seed_reference_from_file doesn't support paths with multiple parquet partition #2) — whichever way the project decides on, lock it in.
  • test_composite_workflow_resume_if_possible_skips_stage_with_output_processors checks main-batch mtime; consider also asserting the output-processors directory wasn't touched (its mtime is the actual cache-hit signal for the output-processor work).
  • Worth a regression test for prior metadata whose stages length exceeds the current workflow's stage list — _get_prior_stage_metadata handles index >= len(stages), but the inverse case (current workflow has fewer stages than prior) isn't explicitly covered.

Security

No new attack surface. Metadata is JSON-validated on read and treated as data. The os.fsync + os.replace pattern is the right defense against torn writes from crashes mid-run.

Performance

  • One extra _count_parquet_records call per skippable stage (validation in _can_skip_prior_stage + the actual count in run). Cheap (parquet metadata, not row read), but doubling it is unnecessary — could cache the count or skip the validation since the count immediately follows.
  • _stage_result_from_metadata instantiates ArtifactStorage with resume=ResumeMode.ALWAYS, which triggers the resolved_dataset_name check. Fine for hits; on misses you'll get an ArtifactStorageError from the validator instead of going through DataDesignerWorkflowError. Consider catching and re-wrapping at the boundary so callers see a single error type per the project's "Errors normalize at boundaries" invariant.

Docs

  • docs/concepts/workflow-chaining.md and the Fern mirror are updated symmetrically — good.
  • Phase-3 plan update accurately describes the slice and remaining deferred items.
  • One precision nit: the ResumeMode.ALWAYS blurb should clarify the "first changed stage raises; downstream of a checkpoint-resumed stage runs fresh" behavior (or close the gap).

Verdict

Solid, well-tested addition. The core logic is correct and the new behavior is opt-in through resume=, so risk to existing callers is minimal. Address #1 (relative paths) before this is relied on for portable artifacts; #2 (ALWAYS doc/test) and #3 (narrow the except) are easy follow-ups. Everything else is taste.

Recommendation: approve with the path-portability and except Exception narrowing addressed (or filed as follow-ups). Other findings are non-blocking polish.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant