Commit 5d63a08

danielmillerp and claude committed
feat(temporal): opt-in continue-as-new for long-lived agent workflows
Long-lived chat/session agents run as a single Temporal workflow that stays open indefinitely, so their event history grows until it hits Temporal's ~50k-event / 50MB limit and the workflow stalls. This adds an opt-in continue-as-new path that recycles the history so a session can stay open forever, plus the discipline of keeping messages/state outside workflow state so they survive the recycle.

SDK (BaseWorkflow):
- should_continue_as_new(): recycle decision (Temporal's is_continue_as_new_suggested() or a configurable WORKFLOW_MAX_HISTORY_LENGTH threshold).
- drain_and_continue_as_new(): waits all_handlers_finished (so an in-flight turn is never lost/duplicated at the boundary) then continue_as_new.
- run_until_complete(): drop-in replacement for the usual wait_condition(timeout=None) tail; gated once behind workflow.patched() so in-flight pre-patch workflows keep the old behaviour (no non-determinism on replay). Identical behaviour unless WORKFLOW_CONTINUE_AS_NEW_ENABLED is set.
- conversation_from_messages(): rebuild the conversation from the adk.messages ledger after a recycle (messages live in adk.messages, not workflow state).

Config (default off, so existing agents are unaffected):
- WORKFLOW_CONTINUE_AS_NEW_ENABLED (bool)
- WORKFLOW_MAX_HISTORY_LENGTH (int|None)

Examples: all 13 long-lived Temporal tutorial agents adopt run_until_complete. Message-based chat agents rebuild conversation from adk.messages; harness agents with an opaque session handle (claude-code, codex, claude-sdk) or rich history (pydantic-ai via ModelMessagesTypeAdapter, langgraph) persist their non-message state to adk.state and re-hydrate on recycle. Every adk.state / adk.messages round-trip is guarded by the enabled flag, so the default path is byte-for-byte unchanged.

Note: continue-as-new bounds history SIZE; it does NOT extend the chain-wide WORKFLOW_EXECUTION_TIMEOUT_SECONDS (raise that to keep workflows long-lived).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
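
The recycle decision described for should_continue_as_new() can be sketched as a pure function. This is only an illustration under stated assumptions: the real method presumably reads the suggestion flag and history length from the running workflow, whereas here they are passed in as plain values, and both the `>=` comparison and the placement of the enabled check inside this function are guesses rather than the SDK's confirmed behaviour.

```python
from typing import Optional


def should_continue_as_new(
    enabled: bool,
    server_suggested: bool,
    history_length: int,
    max_history_length: Optional[int],
) -> bool:
    """Return True when the workflow should recycle its event history.

    All four parameters are hypothetical stand-ins for values the real
    BaseWorkflow would read from configuration and from Temporal.
    """
    # Feature is opt-in: with the flag off, never recycle.
    if not enabled:
        return False
    # Temporal itself may suggest a recycle as history grows.
    if server_suggested:
        return True
    # Otherwise fall back to the optional configured threshold
    # (assumed here to be an inclusive bound).
    return max_history_length is not None and history_length >= max_history_length
```

With the flag off the function always returns False, which mirrors the commit's promise that the default path is unchanged.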
1 parent b30a90b commit 5d63a08

16 files changed

Lines changed: 699 additions & 91 deletions

examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py

Lines changed: 6 additions & 4 deletions
@@ -65,7 +65,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
         # 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.

         # Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.
-        await workflow.wait_condition(
-            lambda: self._complete_task,
-            timeout=None,  # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
-        )
+        # run_until_complete behaves exactly like the old indefinite wait_condition
+        # by default; when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set it also recycles
+        # event history via continue-as-new before Temporal's history limit. This
+        # agent keeps no cross-turn conversation state, so the only thing carried
+        # forward across a recycle is `params`.
+        await self.run_until_complete(params, is_complete=lambda: self._complete_task)
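
The two settings that switch this behaviour on can be read from the environment roughly as follows. This is a minimal sketch, not the SDK's loader: the helper name `load_recycle_config` and the set of strings treated as truthy are assumptions; only the two variable names come from the commit.

```python
import os
from typing import Mapping, Optional, Tuple

# Assumed truthy spellings; the SDK's actual parsing may differ.
_TRUTHY = {"1", "true", "yes", "on"}


def load_recycle_config(
    env: Mapping[str, str] = os.environ,
) -> Tuple[bool, Optional[int]]:
    """Return (enabled, max_history_length) from environment-style settings."""
    enabled = env.get("WORKFLOW_CONTINUE_AS_NEW_ENABLED", "").strip().lower() in _TRUTHY
    raw_max = env.get("WORKFLOW_MAX_HISTORY_LENGTH", "").strip()
    # An unset or empty threshold means "rely on Temporal's own suggestion".
    max_history_length = int(raw_max) if raw_max else None
    return enabled, max_history_length
```

Both values default to "off" when unset, so an agent that never defines them keeps the old indefinite wait.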

examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py

Lines changed: 34 additions & 16 deletions
@@ -160,6 +160,19 @@ def __init__(self):
         self._complete_task = False
         self._state: StateModel | None = None

+    async def _rehydrate_state(self, task_id: str) -> None:
+        """Seed in-memory state, rebuilding the conversation from adk.messages.
+
+        Messages are the source of truth and live in adk.messages — NOT in workflow
+        state. On a brand-new task the ledger is empty (fresh start); on a continued
+        run (after continue-as-new) we reconstruct input_list from the ledger so the
+        recycle is invisible to the user. ``conversation_from_messages`` returns []
+        when continue-as-new is disabled, so the default path is unchanged.
+        """
+        conversation = await self.conversation_from_messages(task_id)
+        turn_number = sum(1 for entry in conversation if entry["role"] == "user")
+        self._state = StateModel(input_list=conversation, turn_number=turn_number)
+
     @workflow.signal(name=SignalName.RECEIVE_EVENT)
     @override
     async def on_task_event_send(self, params: SendEventParams) -> None:
@@ -253,24 +266,29 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
             if span and self._state:
                 span.output = self._state.model_dump()

+        # NOTE: we do NOT persist the conversation to workflow state or adk.state.
+        # Every user + agent message is already written to the adk.messages ledger
+        # (the user echo above + the agent auto-send), which is the source of truth.
+        # On a continue-as-new recycle we rebuild input_list from that ledger in
+        # _rehydrate_state — messages live in adk.messages, state in adk.state.
+
     @workflow.run
     @override
     async def on_task_create(self, params: CreateTaskParams) -> None:
         logger.info(f"Received task create params: {params}")

-        # 1. Initialize the state. You can either do this here or in the __init__ method.
-        # This function is triggered whenever a client creates a task for this agent.
-        # It is not re-triggered when a new event is sent to the task.
-        self._state = StateModel(
-            input_list=[],
-            turn_number=0,
-        )
-
-        # 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.
-
-        # Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.
-
-        await workflow.wait_condition(
-            lambda: self._complete_task,
-            timeout=None,  # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
-        )
+        # 1. Initialize (or, on a continued run, re-hydrate) the state. This
+        # function runs both when a client first creates the task AND when the
+        # workflow recycles itself via continue-as-new — on the latter the
+        # in-memory state was reset, so we reload it from the adk.messages ledger.
+        await self._rehydrate_state(params.task.id)
+
+        # 2. Keep the workflow open to field events. Temporal can run hundreds of
+        # millions of workflows in parallel, so staying open is cheap — but a single
+        # ever-open run accumulates event history until it hits Temporal's ~50k-event
+        # / 50MB limit. run_until_complete keeps the workflow open exactly like the
+        # old indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
+        # recycles history via continue-as-new before that limit. The conversation
+        # is rebuilt from adk.messages on each run, so the carry-forward is just
+        # `params`.
+        await self.run_until_complete(params, is_complete=lambda: self._complete_task)
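
The re-hydration step in this file reduces to "load the ledger, count the user turns". A self-contained sketch of that logic follows; the `StateModel` dataclass here is a hypothetical stand-in for the tutorial's own model, and the ledger is a plain list of role/content dicts rather than the result of a real conversation_from_messages() call.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class StateModel:
    """Stand-in for the tutorial's state model (assumed fields only)."""
    input_list: List[Dict[str, str]] = field(default_factory=list)
    turn_number: int = 0


def rehydrate(conversation: List[Dict[str, str]]) -> StateModel:
    """Rebuild in-memory state from a message ledger.

    An empty ledger (new task, or feature disabled) yields the same
    fresh state the old code created by hand.
    """
    # One turn per user message, as in the diff above.
    turn_number = sum(1 for entry in conversation if entry["role"] == "user")
    return StateModel(input_list=list(conversation), turn_number=turn_number)


ledger = [
    {"role": "user", "content": "hi"},
    {"role": "assistant", "content": "hello"},
    {"role": "user", "content": "what is continue-as-new?"},
    {"role": "assistant", "content": "a way to recycle workflow history"},
]
restored = rehydrate(ledger)
fresh = rehydrate([])
```

Deriving `turn_number` from the ledger rather than storing it means there is nothing extra to carry across a recycle.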

examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py

Lines changed: 16 additions & 15 deletions
@@ -455,14 +455,16 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
     async def on_task_create(self, params: CreateTaskParams) -> None:
         logger.info(f"Received task create params: {params}")

-        # 1. Initialize the state. You can either do this here or in the
-        # __init__ method. This function is triggered whenever a client
-        # creates a task for this agent. It is not re-triggered when a new
-        # event is sent to the task.
-        self._state = StateModel(
-            input_list=[],
-            turn_number=0,
-        )
+        # 1. Initialize (or, on a continued run, re-hydrate) the state. Messages
+        # are the source of truth and live in adk.messages — NOT in workflow
+        # state. On a brand-new task the ledger is empty (fresh start); on a
+        # continued run (after continue-as-new) we reconstruct input_list from
+        # the ledger so the recycle is invisible to the user.
+        # conversation_from_messages returns [] when continue-as-new is disabled,
+        # so the default path is unchanged.
+        conversation = await self.conversation_from_messages(params.task.id)
+        turn_number = sum(1 for entry in conversation if entry["role"] == "user")
+        self._state = StateModel(input_list=conversation, turn_number=turn_number)

         # 2. Wait for the task to be completed indefinitely. If we don't do
         # this the workflow will close as soon as this function returns.
@@ -472,10 +474,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
         # Thus, if you want this agent to field events indefinitely (or for
         # a long time) you need to wait for a condition to be met.

-        await workflow.wait_condition(
-            lambda: self._complete_task,
-            timeout=None,  # Set a timeout if you want to prevent the task
-            # from running indefinitely. Generally this is not needed.
-            # Temporal can run hundreds of millions of workflows in parallel
-            # and more. Only do this if you have a specific reason to do so.
-        )
+        # run_until_complete keeps the workflow open exactly like the old
+        # indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
+        # recycles event history via continue-as-new before Temporal's
+        # ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages
+        # on each run, so the carry-forward is just `params`.
+        await self.run_until_complete(params, is_complete=lambda: self._complete_task)

examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py

Lines changed: 16 additions & 10 deletions
@@ -258,12 +258,16 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
         # ============================================================================
         # WORKFLOW INITIALIZATION: Initialize State
         # ============================================================================
-        # Initialize the conversation state with an empty history
-        # This will be populated as the conversation progresses
-        self._state = StateModel(
-            input_list=[],
-            turn_number=0,
-        )
+        # Initialize (or, on a continued run, re-hydrate) the conversation state.
+        # Messages are the source of truth and live in adk.messages — NOT in
+        # workflow state. On a brand-new task the ledger is empty (fresh start);
+        # on a continued run (after continue-as-new) we reconstruct input_list
+        # from the ledger so the recycle is invisible to the user.
+        # conversation_from_messages returns [] when continue-as-new is disabled,
+        # so the default path is unchanged.
+        conversation = await self.conversation_from_messages(params.task.id)
+        turn_number = sum(1 for entry in conversation if entry["role"] == "user")
+        self._state = StateModel(input_list=conversation, turn_number=turn_number)

         # ============================================================================
         # WORKFLOW INITIALIZATION: Send Welcome Message
@@ -294,10 +298,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
         # - Temporal can handle millions of such concurrent workflows
         # - If worker crashes, workflow resumes exactly where it left off
         # - All conversation state is preserved in Temporal's event log
-        await workflow.wait_condition(
-            lambda: self._complete_task,
-            timeout=None,  # No timeout = truly long-running agent conversation
-        )
+        # run_until_complete keeps the workflow open exactly like the old
+        # indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
+        # recycles event history via continue-as-new before Temporal's
+        # ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages
+        # on each run, so the carry-forward is just `params`.
+        await self.run_until_complete(params, is_complete=lambda: self._complete_task)
         return "Agent conversation completed"

     @workflow.signal

examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py

Lines changed: 17 additions & 7 deletions
@@ -331,10 +331,17 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
     async def on_task_create(self, params: CreateTaskParams) -> str:
         logger.info(f"Received task create params: {params}")

-        # Initialize the conversation state with an empty history
+        # Initialize (or, on a continued run, re-hydrate) the conversation state.
+        # This runs both when a client first creates the task AND when the workflow
+        # recycles itself via continue-as-new — on the latter the in-memory state was
+        # reset, so we rebuild input_list from the adk.messages ledger (the source of
+        # truth). conversation_from_messages returns [] when continue-as-new is
+        # disabled, so the default path is an empty fresh start as before.
+        conversation = await self.conversation_from_messages(params.task.id)
+        turn_number = sum(1 for entry in conversation if entry["role"] == "user")
         self._state = StateModel(
-            input_list=[],
-            turn_number=0,
+            input_list=conversation,
+            turn_number=turn_number,
         )

         # 1. Acknowledge that the task has been created.
@@ -346,10 +353,13 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
             ),
         )

-        await workflow.wait_condition(
-            lambda: self._complete_task,
-            timeout=None,  # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
-        )
+        # Keep the workflow open to field events. run_until_complete behaves exactly
+        # like the old indefinite wait_condition, and (when
+        # WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history via
+        # continue-as-new before Temporal's ~50k-event / 50MB limit. Messages live in
+        # adk.messages and are rebuilt above, so the only state carried forward is
+        # params.
+        await self.run_until_complete(params, is_complete=lambda: self._complete_task)
         return "Task completed"

     @workflow.signal

examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py

Lines changed: 24 additions & 9 deletions
@@ -216,10 +216,24 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
         """
         logger.info(f"Received task create params: {params}")

-        # Initialize the conversation state with an empty history
+        # Initialize (or, on a continued run, re-hydrate) the conversation state.
+        # This runs both when a client first creates the task AND when the workflow
+        # recycles itself via continue-as-new — on the latter the in-memory state was
+        # reset, so we rebuild input_list from the adk.messages ledger (the source of
+        # truth). conversation_from_messages returns [] when continue-as-new is
+        # disabled, so the default path is an empty fresh start as before.
+        #
+        # The only cross-turn parent state here is input_list/turn_number; the
+        # human-approval state lives entirely in the child workflow spawned by
+        # wait_for_confirmation (it waits for fulfill_order_signal), and an in-flight
+        # approval turn keeps on_task_event_send running — run_until_complete drains
+        # all in-flight handlers before recycling, so it is never cut off. Hence
+        # conversation_from_messages alone is enough; no adk.state is needed.
+        conversation = await self.conversation_from_messages(params.task.id)
+        turn_number = sum(1 for entry in conversation if entry["role"] == "user")
         self._state = StateModel(
-            input_list=[],
-            turn_number=0,
+            input_list=conversation,
+            turn_number=turn_number,
         )

         # Send welcome message when task is created
@@ -231,12 +245,13 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
             ),
         )

-        # Keep workflow running indefinitely to handle user messages and human approvals
-        # This survives system failures and can resume exactly where it left off
-        await workflow.wait_condition(
-            lambda: self._complete_task,
-            timeout=None,  # No timeout for long-running human-in-the-loop workflows
-        )
+        # Keep the workflow open to handle user messages and human approvals.
+        # run_until_complete behaves exactly like the old indefinite wait_condition,
+        # and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history
+        # via continue-as-new before Temporal's ~50k-event / 50MB limit, draining any
+        # in-flight approval turn first. Messages live in adk.messages and are rebuilt
+        # above, so the only state carried forward is params.
+        await self.run_until_complete(params, is_complete=lambda: self._complete_task)
         return "Task completed"

     # TEMPORAL UI (localhost:8080):
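
The "drain before recycling" guarantee that this file relies on can be illustrated without Temporal at all. The sketch below uses plain asyncio: `SessionSketch`, its `in_flight` counter, and the `RecycleRequested` exception are all hypothetical stand-ins (for the workflow, for all_handlers_finished, and for the continue-as-new call respectively), and the real SDK would block on a workflow condition rather than spin on the event loop.

```python
import asyncio


class RecycleRequested(Exception):
    """Stand-in for the point where continue-as-new would be issued."""


class SessionSketch:
    def __init__(self) -> None:
        self.in_flight = 0        # handlers currently mid-turn
        self.completed_turns = 0  # turns that ran to completion

    async def handle_turn(self) -> None:
        """Pretend signal handler: one user turn, e.g. awaiting an approval."""
        self.in_flight += 1
        try:
            await asyncio.sleep(0.01)
            self.completed_turns += 1
        finally:
            self.in_flight -= 1

    async def drain_and_recycle(self) -> None:
        """Wait until no handler is running, then hand over to a new run."""
        while self.in_flight > 0:
            await asyncio.sleep(0)  # yield so the in-flight turn can finish
        raise RecycleRequested(self.completed_turns)


async def demo() -> int:
    session = SessionSketch()
    turn = asyncio.create_task(session.handle_turn())
    await asyncio.sleep(0)  # let the handler start before recycling is requested
    try:
        await session.drain_and_recycle()
    except RecycleRequested as exc:
        await turn
        return exc.args[0]  # turns completed at the recycle boundary
    return -1
```

Because the recycle only fires once `in_flight` drops to zero, the turn that was running when the recycle became due is counted as completed rather than cut off.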

examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py

Lines changed: 70 additions & 10 deletions
@@ -86,6 +86,64 @@ def __init__(self):
         self._trace_id = None
         self._parent_span_id = None
         self._workspace_path = None
+        # adk.state row id for the opaque session handle (continue-as-new only).
+        self._state_id: str | None = None
+
+    async def _rehydrate_state(self, task_id: str) -> None:
+        """Initialize state, restoring the Claude session handle on a continued run.
+
+        ``claude_session_id`` is an opaque CLI handle that maintains conversation
+        context — it CANNOT be rebuilt from adk.messages, so to survive a
+        continue-as-new recycle it is persisted to adk.state (the sanctioned home
+        for non-message state). When continue-as-new is disabled (the default) this
+        makes ZERO extra activity calls and behaves exactly like the old fresh init.
+        """
+        if self._continue_as_new_enabled and environment_variables.AGENT_ID:
+            existing = await adk.state.get_by_task_and_agent(
+                task_id=task_id,
+                agent_id=environment_variables.AGENT_ID,
+            )
+            if existing is not None:
+                self._state_id = existing.id
+                self._state = StateModel(
+                    claude_session_id=existing.state.get("claude_session_id"),
+                    turn_number=existing.state.get("turn_number", 0),
+                )
+                return
+        # Fresh start (default path).
+        self._state = StateModel(
+            claude_session_id=None,
+            turn_number=0,
+        )
+
+    async def _persist_state(self, task_id: str) -> None:
+        """Persist the opaque Claude session handle + turn number to adk.state.
+
+        Guarded by continue-as-new + AGENT_ID so the default path makes ZERO extra
+        activity calls. Creates the row on first write, updates it thereafter.
+        """
+        if not (self._continue_as_new_enabled and environment_variables.AGENT_ID):
+            return
+        if self._state is None:
+            return
+        state_payload = {
+            "claude_session_id": self._state.claude_session_id,
+            "turn_number": self._state.turn_number,
+        }
+        if self._state_id is None:
+            created = await adk.state.create(
+                task_id=task_id,
+                agent_id=environment_variables.AGENT_ID,
+                state=state_payload,
+            )
+            self._state_id = created.id
+        else:
+            await adk.state.update(
+                state_id=self._state_id,
+                task_id=task_id,
+                agent_id=environment_variables.AGENT_ID,
+                state=state_payload,
+            )

     @workflow.signal(name=SignalName.RECEIVE_EVENT)
     async def on_task_event_send(self, params: SendEventParams):
@@ -171,6 +229,10 @@ async def on_task_event_send(self, params: SendEventParams):
             # Response already streamed to UI by activity - no need to send again
             logger.debug(f"Turn {self._state.turn_number} completed successfully")

+            # Persist the opaque session handle so it survives a
+            # continue-as-new recycle (no-op when the feature is disabled).
+            await self._persist_state(params.task.id)
+
         except Exception as e:
             logger.error(f"Error running Claude agent: {e}", exc_info=True)
             # Send error message to user
@@ -189,11 +251,9 @@ async def on_task_create(self, params: CreateTaskParams):

         logger.info(f"Creating Claude MVP workflow for task: {params.task.id}")

-        # Initialize state with session tracking
-        self._state = StateModel(
-            claude_session_id=None,
-            turn_number=0,
-        )
+        # Initialize state, or (on a continue-as-new recycle) re-hydrate the
+        # opaque Claude session handle from adk.state. Default path is a fresh init.
+        await self._rehydrate_state(params.task.id)

         # Create workspace via activity (avoids determinism issues with file I/O)
         workspace_root = os.environ.get("CLAUDE_WORKSPACE_ROOT")
@@ -223,12 +283,12 @@ async def on_task_create(self, params: CreateTaskParams):
             )
         )

-        # Wait for completion signal
+        # Keep the workflow open to field events. run_until_complete behaves
+        # exactly like the old indefinite wait unless continue-as-new is enabled,
+        # in which case it recycles event history before Temporal's limit. The
+        # opaque session handle is carried across recycles via adk.state.
         logger.info("Waiting for task completion...")
-        await workflow.wait_condition(
-            lambda: self._complete_task,
-            timeout=None,  # Long-running workflow
-        )
+        await self.run_until_complete(params, is_complete=lambda: self._complete_task)

         logger.info("Claude MVP workflow completed")
         return "Task completed successfully"
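
The create-on-first-write, update-thereafter pattern used by _persist_state can be tried out against an in-memory store. Everything in this sketch is a stand-in: `InMemoryStateStore` only mimics the three adk.state calls that appear in the diff (with the same method names, but simplified signatures and return types that are assumptions), and `persist` is a free function rather than a workflow method.

```python
import asyncio
import itertools
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass
class StateRow:
    id: str
    state: dict


class InMemoryStateStore:
    """Hypothetical stand-in for adk.state, keyed by (task_id, agent_id)."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str], StateRow] = {}
        self._ids = itertools.count(1)
        self.calls = 0  # counts every store round-trip

    async def get_by_task_and_agent(self, task_id: str, agent_id: str) -> Optional[StateRow]:
        self.calls += 1
        return self._rows.get((task_id, agent_id))

    async def create(self, task_id: str, agent_id: str, state: dict) -> StateRow:
        self.calls += 1
        row = StateRow(id=f"state-{next(self._ids)}", state=dict(state))
        self._rows[(task_id, agent_id)] = row
        return row

    async def update(self, state_id: str, task_id: str, agent_id: str, state: dict) -> StateRow:
        self.calls += 1
        row = self._rows[(task_id, agent_id)]
        row.state = dict(state)
        return row


async def persist(store, enabled, state_id, task_id, agent_id, payload):
    """Return the row id after persisting; make no store calls when disabled."""
    if not enabled:
        return state_id
    if state_id is None:
        return (await store.create(task_id, agent_id, payload)).id
    await store.update(state_id, task_id, agent_id, payload)
    return state_id


async def demo():
    store = InMemoryStateStore()
    # Feature disabled: nothing is written and no id is allocated.
    unchanged = await persist(store, False, None, "task-1", "agent-1", {"turn_number": 1})
    calls_while_disabled = store.calls
    # Feature enabled: first write creates the row, second write updates it.
    sid = await persist(store, True, None, "task-1", "agent-1",
                        {"claude_session_id": "sess", "turn_number": 1})
    sid = await persist(store, True, sid, "task-1", "agent-1",
                        {"claude_session_id": "sess", "turn_number": 2})
    restored = await store.get_by_task_and_agent("task-1", "agent-1")
    return unchanged, calls_while_disabled, sid, restored.state
```

Keeping the row id in memory after the first write avoids a lookup on every turn; after a recycle the id is recovered by the get_by_task_and_agent call in _rehydrate_state.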
