Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a5cf61c
Add Checkpoint member to SuperStepStartInfo
Mar 9, 2026
a81c3e2
initial proposed solution from Claude. Causes unit tests to fail, lik…
Mar 10, 2026
d1b4449
Fix Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync definition
Mar 10, 2026
151f7db
fix Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync definition
Mar 10, 2026
30380d6
update Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAsyn…
Mar 10, 2026
61fb686
All unit tests now passing.
Mar 10, 2026
4c13736
Add a couple more unit tests as well as fix failed InProcessRun_State…
Mar 10, 2026
1f859cc
Update dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Checkpoin…
elgold92 Mar 10, 2026
4c44228
Address GitHubCopilot's comments on CheckpointParentTest.cs. All mino…
Mar 10, 2026
71f50f0
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 10, 2026
9f4de94
minor suggestions from Copilot
Mar 11, 2026
90d7b08
Merge branch 'ericgold/CheckpointOnSuperStepStarted' of https://githu…
Mar 11, 2026
3bf2b1c
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 11, 2026
386a5df
resolve 1 more of Copilot's test comments
Mar 11, 2026
2fcf732
Update dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/Checkpoin…
elgold92 Mar 11, 2026
f797794
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 11, 2026
3b8cb64
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 15, 2026
7999edc
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 19, 2026
894297a
Merge branch 'main' into ericgold/CheckpointOnSuperStepStarted
elgold92 Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal sealed class InProcStepTracer : IStepTracer
/// <param name="lastStepNumber">The Step Number of the last SuperStep. Note that Step Numbers are 0-indexed.</param>
public void Reload(int lastStepNumber = 0) => this._nextStepNumber = lastStepNumber + 1;

public SuperStepStartedEvent Advance(StepContext step)
public SuperStepStartedEvent Advance(StepContext step, CheckpointInfo? startCheckpoint = null)
{
this._nextStepNumber++;
this.Activated.Clear();
Expand All @@ -57,7 +57,8 @@ public SuperStepStartedEvent Advance(StepContext step)

return new SuperStepStartedEvent(this.StepNumber, new SuperStepStartInfo(sendingExecutors)
{
HasExternalMessages = hasExternalMessages
HasExternalMessages = hasExternalMessages,
Checkpoint = startCheckpoint
});
}

Expand Down
13 changes: 10 additions & 3 deletions dotnet/src/Microsoft.Agents.AI.Workflows/InProc/InProcessRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ await executor.ExecuteCoreAsync(

private async ValueTask RunSuperstepAsync(StepContext currentStep, CancellationToken cancellationToken)
{
await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep)).ConfigureAwait(false);
// Save a checkpoint before the superstep executes, capturing the pre-delivery state.
await this.CheckpointAsync(currentStep, cancellationToken).ConfigureAwait(false);
CheckpointInfo? startCheckpoint = this.StepTracer.Checkpoint;

await this.RaiseWorkflowEventAsync(this.StepTracer.Advance(currentStep, startCheckpoint)).ConfigureAwait(false);

// Deliver the messages and queue the next step
List<Task> receiverTasks =
Expand Down Expand Up @@ -278,7 +282,7 @@ await this.RaiseWorkflowEventAsync(this.StepTracer.Complete(this.RunContext.Next
private WorkflowInfo? _workflowInfoCache;
private CheckpointInfo? _lastCheckpointInfo;
private readonly List<CheckpointInfo> _checkpoints = [];
internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
internal async ValueTask CheckpointAsync(StepContext? queuedMessagesOverride, CancellationToken cancellationToken = default)
{
this.RunContext.CheckEnded();
if (this.CheckpointManager is null)
Expand All @@ -299,7 +303,7 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
await prepareTask.ConfigureAwait(false);
await this.RunContext.StateManager.PublishUpdatesAsync(this.StepTracer).ConfigureAwait(false);

RunnerStateData runnerData = await this.RunContext.ExportStateAsync().ConfigureAwait(false);
RunnerStateData runnerData = await this.RunContext.ExportStateAsync(queuedMessagesOverride).ConfigureAwait(false);
Dictionary<ScopeKey, PortableValue> stateData = await this.RunContext.StateManager.ExportStateAsync().ConfigureAwait(false);

Checkpoint checkpoint = new(this.StepTracer.StepNumber, this._workflowInfoCache, runnerData, stateData, edgeData, this._lastCheckpointInfo);
Expand All @@ -308,6 +312,9 @@ internal async ValueTask CheckpointAsync(CancellationToken cancellationToken = d
this._checkpoints.Add(this._lastCheckpointInfo);
}

/// <summary>
/// Convenience overload that requests a checkpoint without supplying a queued-message override.
/// </summary>
/// <remarks>
/// Forwards to the two-parameter overload with a <see langword="null"/> override.
/// </remarks>
/// <param name="cancellationToken">Token forwarded unchanged to the two-parameter overload.</param>
/// <returns>The <see cref="ValueTask"/> produced by the two-parameter overload.</returns>
internal ValueTask CheckpointAsync(CancellationToken cancellationToken = default)
{
    // Typed explicitly so the call binds to the (StepContext?, CancellationToken) overload.
    StepContext? noOverride = null;
    return this.CheckpointAsync(noOverride, cancellationToken);
}

public async ValueTask RestoreCheckpointAsync(CheckpointInfo checkpointInfo, CancellationToken cancellationToken = default)
{
this.RunContext.CheckEnded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,13 @@ async Task InvokeCheckpointRestoredAsync(Task<Executor> executorTask)
}
}

internal ValueTask<RunnerStateData> ExportStateAsync()
internal ValueTask<RunnerStateData> ExportStateAsync(StepContext? queuedMessagesOverride = null)
{
this.CheckEnded();

Dictionary<string, List<PortableMessageEnvelope>> queuedMessages = this._nextStep.ExportMessages();
StepContext queuedMessagesSource = queuedMessagesOverride ?? this._nextStep;
Dictionary<string, List<PortableMessageEnvelope>> queuedMessages = queuedMessagesSource.ExportMessages();

RunnerStateData result = new(instantiatedExecutors: [.. this._executors.Keys],
queuedMessages,
outstandingRequests: [.. this._externalRequests.Values]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ public sealed class SuperStepStartInfo(HashSet<string>? sendingExecutors = null)
/// Gets a value indicating whether there are any external messages queued during the previous SuperStep.
/// </summary>
public bool HasExternalMessages { get; init; }

/// <summary>
/// Gets the <see cref="CheckpointInfo"/> corresponding to the checkpoint created at the start of this SuperStep, if any.
/// <see langword="null"/> if checkpointing was not enabled when the run was started.
/// </summary>
public CheckpointInfo? Checkpoint { get; init; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

// Act
StreamingRun run =
await using StreamingRun run =
await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");

List<CheckpointInfo> checkpoints = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
checkpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
checkpoints.Add(cp);
}
Expand All @@ -51,6 +55,14 @@ internal async Task Checkpoint_FirstCheckpoint_ShouldHaveNullParentAsync(Executi
Checkpoint storedFirst = await ((ICheckpointManager)checkpointManager)
.LookupCheckpointAsync(firstCheckpoint.SessionId, firstCheckpoint);
storedFirst.Parent.Should().BeNull("the first checkpoint should have no parent");

// Assert: The second checkpoint should have 1 parent, the first checkpoint.
checkpoints.Should().HaveCountGreaterThanOrEqualTo(2, "multiple checkpoints should have been created, and we can't verify parent checkpoint without at least 2 checkpoints present.");
CheckpointInfo secondCheckpoint = checkpoints[1];
Checkpoint storedSecond = await ((ICheckpointManager)checkpointManager)
.LookupCheckpointAsync(secondCheckpoint.SessionId, secondCheckpoint);
storedSecond.Parent.Should().NotBeNull("the second checkpoint should have a parent");
storedSecond.Parent.Should().Be(firstCheckpoint, "the second checkpoint's parent should be the first checkpoint");
}

[Theory]
Expand Down Expand Up @@ -79,13 +91,18 @@ internal async Task Checkpoint_SubsequentCheckpoints_ShouldChainParentsAsync(Exe

await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
checkpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
checkpoints.Add(cp);
if (checkpoints.Count >= 3)
{
cts.Cancel();
}
}

if (checkpoints.Count >= 3)
{
cts.Cancel();
}
}

Expand Down Expand Up @@ -126,19 +143,24 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

// First run: collect a checkpoint to resume from
await using StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");
StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello");

List<CheckpointInfo> firstRunCheckpoints = [];
using CancellationTokenSource cts = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync(cts.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
firstRunCheckpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
firstRunCheckpoints.Add(cp);
if (firstRunCheckpoints.Count >= 2)
{
cts.Cancel();
}
}

if (firstRunCheckpoints.Count >= 2)
{
cts.Cancel();
}
}

Expand All @@ -149,19 +171,24 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
await run.DisposeAsync();

// Act: Resume from the first checkpoint
StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);
await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);

List<CheckpointInfo> resumedCheckpoints = [];
using CancellationTokenSource cts2 = new();
await foreach (WorkflowEvent evt in resumed.WatchStreamAsync(cts2.Token))
{
if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
{
resumedCheckpoints.Add(startCp);
}
else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
{
resumedCheckpoints.Add(cp);
if (resumedCheckpoints.Count >= 1)
{
cts2.Cancel();
}
}

if (resumedCheckpoints.Count >= 1)
{
cts2.Cancel();
}
}

Expand All @@ -172,4 +199,118 @@ internal async Task Checkpoint_AfterResume_ShouldHaveResumedCheckpointAsParentAs
storedResumed.Parent.Should().NotBeNull("checkpoint created after resume should have a parent");
storedResumed.Parent.Should().Be(resumePoint, "checkpoint after resume should reference the checkpoint we resumed from");
}

/// <summary>
/// Resumes a three-executor workflow from the second checkpoint reported on a
/// <see cref="SuperStepStartedEvent"/> and verifies how many checkpoints the resumed run reports.
/// </summary>
/// <param name="environment">The in-process execution environment to run the workflow under.</param>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Checkpoint_AfterResumeFromSuperstepStart_CountCheckpointsEmittedAsync(ExecutionEnvironment environment)
{
    // Arrange: A basic workflow with 3 executor stages
    ForwardMessageExecutor<string> executorA = new("A");
    ForwardMessageExecutor<string> executorB = new("B");
    ForwardMessageExecutor<string> executorC = new("C");

    Workflow workflow = new WorkflowBuilder(executorA)
        .AddEdge(executorA, executorB)
        .AddEdge(executorB, executorC)
        .Build();

    CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
    InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

    // First run: collect a checkpoint to resume from.
    // The run is scoped with `await using` so that it is disposed (releasing workflow ownership
    // before resuming) even when the event loop throws or the count assertion fails.
    CheckpointInfo resumePoint;
    await using (StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"))
    {
        List<CheckpointInfo> firstRunCheckpoints = [];
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            // Only checkpoints published at superstep start are candidates for the resume point.
            if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
            {
                firstRunCheckpoints.Add(startCp);
            }
        }

        firstRunCheckpoints.Should().HaveCount(3);
        resumePoint = firstRunCheckpoints[1];
    }

    // Act: Resume from the second checkpoint
    await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);

    List<CheckpointInfo> resumedCheckpoints = [];
    await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
    {
        // After resuming, count checkpoints from both the start and the completion events.
        if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
        {
            resumedCheckpoints.Add(startCp);
        }
        else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
        {
            resumedCheckpoints.Add(cp);
        }
    }

    // Assert: The workflow should save the right number of checkpoints on re-run.
    resumedCheckpoints.Should().HaveCount(4, "the resumed workflow has 2 executors to run, each generating 2 checkpoints");
}

/// <summary>
/// Resumes a three-executor workflow from the second checkpoint reported on a
/// <see cref="SuperStepCompletedEvent"/> and verifies how many checkpoints the resumed run reports.
/// </summary>
/// <param name="environment">The in-process execution environment to run the workflow under.</param>
[Theory]
[InlineData(ExecutionEnvironment.InProcess_Lockstep)]
[InlineData(ExecutionEnvironment.InProcess_OffThread)]
internal async Task Checkpoint_AfterResumeFromSuperstepCompleted_CountCheckpointsEmittedAsync(ExecutionEnvironment environment)
{
    // Arrange: A basic workflow with 3 executor stages
    ForwardMessageExecutor<string> executorA = new("A");
    ForwardMessageExecutor<string> executorB = new("B");
    ForwardMessageExecutor<string> executorC = new("C");

    Workflow workflow = new WorkflowBuilder(executorA)
        .AddEdge(executorA, executorB)
        .AddEdge(executorB, executorC)
        .Build();

    CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
    InProcessExecutionEnvironment env = environment.ToWorkflowExecutionEnvironment();

    // First run: collect a checkpoint to resume from.
    // The run is scoped with `await using` so that it is disposed (releasing workflow ownership
    // before resuming) even when the event loop throws or the count assertion fails.
    CheckpointInfo resumePoint;
    await using (StreamingRun run = await env.WithCheckpointing(checkpointManager).RunStreamingAsync(workflow, "Hello"))
    {
        List<CheckpointInfo> firstRunCheckpoints = [];
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            // Only checkpoints published at superstep completion are candidates for the resume point.
            if (evt is SuperStepCompletedEvent completedEvent && completedEvent.CompletionInfo?.Checkpoint is { } completedCp)
            {
                firstRunCheckpoints.Add(completedCp);
            }
        }

        firstRunCheckpoints.Should().HaveCount(3);
        resumePoint = firstRunCheckpoints[1];
    }

    // Act: Resume from the second checkpoint
    await using StreamingRun resumed = await env.WithCheckpointing(checkpointManager).ResumeStreamingAsync(workflow, resumePoint);

    List<CheckpointInfo> resumedCheckpoints = [];
    await foreach (WorkflowEvent evt in resumed.WatchStreamAsync())
    {
        // After resuming, count checkpoints from both the start and the completion events.
        if (evt is SuperStepStartedEvent superStepStartEvt && superStepStartEvt.StartInfo?.Checkpoint is { } startCp)
        {
            resumedCheckpoints.Add(startCp);
        }
        else if (evt is SuperStepCompletedEvent stepEvt && stepEvt.CompletionInfo?.Checkpoint is { } cp)
        {
            resumedCheckpoints.Add(cp);
        }
    }

    // Assert: The workflow should save the right number of checkpoints on re-run.
    resumedCheckpoints.Should().HaveCount(2, "the resumed workflow has 1 executor to run, generating 2 checkpoints");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public async Task InProcessRun_StateShouldPersist_CheckpointedAsync()

Run checkpointed = await InProcessExecution.RunAsync<TurnToken>(workflow, new(), CheckpointManager.Default);

checkpointed.Checkpoints.Should().HaveCount(4);
checkpointed.Checkpoints.Should().HaveCount(8);

RunStatus status = await checkpointed.GetStatusAsync();
status.Should().Be(RunStatus.Idle);
Expand Down