Skip to content

Commit 65ab89a

Browse files
danielmillerpclaude
andcommitted
feat(temporal): opt-in continue-as-new for long-lived agent workflows
Long-lived chat/session agents run as a single Temporal workflow that stays open indefinitely, so their event history grows until it hits Temporal's ~50k-event / 50MB limit and the workflow stalls. This adds an opt-in continue-as-new path that recycles the history so a session can stay open forever, plus the discipline of keeping messages/state outside workflow state so they survive the recycle. SDK (BaseWorkflow): - should_continue_as_new(): recycle decision (Temporal's is_continue_as_new_ suggested() or a configurable WORKFLOW_MAX_HISTORY_LENGTH threshold). - drain_and_continue_as_new(): waits all_handlers_finished (so an in-flight turn is never lost/duplicated at the boundary) then continue_as_new. - run_until_complete(): drop-in replacement for the usual wait_condition(timeout=None) tail; gated once behind workflow.patched() so in-flight pre-patch workflows keep the old behaviour (no non-determinism on replay). Identical behaviour unless WORKFLOW_CONTINUE_AS_NEW_ENABLED is set. - conversation_from_messages(): rebuild the conversation from the adk.messages ledger after a recycle (messages live in adk.messages, not workflow state). Config (default off, so existing agents are unaffected): - WORKFLOW_CONTINUE_AS_NEW_ENABLED (bool) - WORKFLOW_MAX_HISTORY_LENGTH (int|None) Examples: all 13 long-lived Temporal tutorial agents adopt run_until_complete. Message-based chat agents rebuild conversation from adk.messages; harness agents with an opaque session handle (claude-code, codex, claude-sdk) or rich history (pydantic-ai via ModelMessagesTypeAdapter, langgraph) persist their non-message state to adk.state and re-hydrate on recycle. Every adk.state / adk.messages round-trip is guarded by the enabled flag, so the default path is byte-for-byte unchanged. Note: continue-as-new bounds history SIZE; it does NOT extend the chain-wide WORKFLOW_EXECUTION_TIMEOUT_SECONDS (raise that to keep workflows long-lived). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent b30a90b commit 65ab89a

18 files changed

Lines changed: 1023 additions & 195 deletions

File tree

examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
6565
# 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.
6666

6767
# Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.
68-
await workflow.wait_condition(
69-
lambda: self._complete_task,
70-
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
71-
)
68+
# run_until_complete behaves exactly like the old indefinite wait_condition
69+
# by default; when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set it also recycles
70+
# event history via continue-as-new before Temporal's history limit. This
71+
# agent keeps no cross-turn conversation state, so the only thing carried
72+
# forward across a recycle is `params`.
73+
await self.run_until_complete(params, is_complete=lambda: self._complete_task)

examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,19 @@ def __init__(self):
160160
self._complete_task = False
161161
self._state: StateModel | None = None
162162

163+
async def _rehydrate_state(self, task_id: str) -> None:
164+
"""Seed in-memory state, rebuilding the conversation from adk.messages.
165+
166+
Messages are the source of truth and live in adk.messages — NOT in workflow
167+
state. On a brand-new task the ledger is empty (fresh start); on a continued
168+
run (after continue-as-new) we reconstruct input_list from the ledger so the
169+
recycle is invisible to the user. ``conversation_from_messages`` returns []
170+
when continue-as-new is disabled, so the default path is unchanged.
171+
"""
172+
conversation = await self.conversation_from_messages(task_id)
173+
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
174+
self._state = StateModel(input_list=conversation, turn_number=turn_number)
175+
163176
@workflow.signal(name=SignalName.RECEIVE_EVENT)
164177
@override
165178
async def on_task_event_send(self, params: SendEventParams) -> None:
@@ -253,24 +266,29 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
253266
if span and self._state:
254267
span.output = self._state.model_dump()
255268

269+
# NOTE: we do NOT persist the conversation to workflow state or adk.state.
270+
# Every user + agent message is already written to the adk.messages ledger
271+
# (the user echo above + the agent auto-send), which is the source of truth.
272+
# On a continue-as-new recycle we rebuild input_list from that ledger in
273+
# _rehydrate_state — messages live in adk.messages, state in adk.state.
274+
256275
@workflow.run
257276
@override
258277
async def on_task_create(self, params: CreateTaskParams) -> None:
259278
logger.info(f"Received task create params: {params}")
260279

261-
# 1. Initialize the state. You can either do this here or in the __init__ method.
262-
# This function is triggered whenever a client creates a task for this agent.
263-
# It is not re-triggered when a new event is sent to the task.
264-
self._state = StateModel(
265-
input_list=[],
266-
turn_number=0,
267-
)
268-
269-
# 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.
270-
271-
# Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.
272-
273-
await workflow.wait_condition(
274-
lambda: self._complete_task,
275-
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
276-
)
280+
# 1. Initialize (or, on a continued run, re-hydrate) the state. This
281+
# function runs both when a client first creates the task AND when the
282+
# workflow recycles itself via continue-as-new — on the latter the
283+
# in-memory state was reset, so we reload it from the adk.messages ledger.
284+
await self._rehydrate_state(params.task.id)
285+
286+
# 2. Keep the workflow open to field events. Temporal can run hundreds of
287+
# millions of workflows in parallel, so staying open is cheap — but a single
288+
# ever-open run accumulates event history until it hits Temporal's ~50k-event
289+
# / 50MB limit. run_until_complete keeps the workflow open exactly like the
290+
# old indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
291+
# recycles history via continue-as-new before that limit. The conversation
292+
# is rebuilt from adk.messages on each run, so the carry-forward is just
293+
# `params`.
294+
await self.run_until_complete(params, is_complete=lambda: self._complete_task)

examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -455,14 +455,16 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
455455
async def on_task_create(self, params: CreateTaskParams) -> None:
456456
logger.info(f"Received task create params: {params}")
457457

458-
# 1. Initialize the state. You can either do this here or in the
459-
# __init__ method. This function is triggered whenever a client
460-
# creates a task for this agent. It is not re-triggered when a new
461-
# event is sent to the task.
462-
self._state = StateModel(
463-
input_list=[],
464-
turn_number=0,
465-
)
458+
# 1. Initialize (or, on a continued run, re-hydrate) the state. Messages
459+
# are the source of truth and live in adk.messages — NOT in workflow
460+
# state. On a brand-new task the ledger is empty (fresh start); on a
461+
# continued run (after continue-as-new) we reconstruct input_list from
462+
# the ledger so the recycle is invisible to the user.
463+
# conversation_from_messages returns [] when continue-as-new is disabled,
464+
# so the default path is unchanged.
465+
conversation = await self.conversation_from_messages(params.task.id)
466+
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
467+
self._state = StateModel(input_list=conversation, turn_number=turn_number)
466468

467469
# 2. Wait for the task to be completed indefinitely. If we don't do
468470
# this the workflow will close as soon as this function returns.
@@ -472,10 +474,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
472474
# Thus, if you want this agent to field events indefinitely (or for
473475
# a long time) you need to wait for a condition to be met.
474476

475-
await workflow.wait_condition(
476-
lambda: self._complete_task,
477-
timeout=None, # Set a timeout if you want to prevent the task
478-
# from running indefinitely. Generally this is not needed.
479-
# Temporal can run hundreds of millions of workflows in parallel
480-
# and more. Only do this if you have a specific reason to do so.
481-
)
477+
# run_until_complete keeps the workflow open exactly like the old
478+
# indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
479+
# recycles event history via continue-as-new before Temporal's
480+
# ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages
481+
# on each run, so the carry-forward is just `params`.
482+
await self.run_until_complete(params, is_complete=lambda: self._complete_task)

examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -258,29 +258,34 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
258258
# ============================================================================
259259
# WORKFLOW INITIALIZATION: Initialize State
260260
# ============================================================================
261-
# Initialize the conversation state with an empty history
262-
# This will be populated as the conversation progresses
263-
self._state = StateModel(
264-
input_list=[],
265-
turn_number=0,
266-
)
261+
# Initialize (or, on a continued run, re-hydrate) the conversation state.
262+
# Messages are the source of truth and live in adk.messages — NOT in
263+
# workflow state. On a brand-new task the ledger is empty (fresh start);
264+
# on a continued run (after continue-as-new) we reconstruct input_list
265+
# from the ledger so the recycle is invisible to the user.
266+
# conversation_from_messages returns [] when continue-as-new is disabled,
267+
# so the default path is unchanged.
268+
conversation = await self.conversation_from_messages(params.task.id)
269+
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
270+
self._state = StateModel(input_list=conversation, turn_number=turn_number)
267271

268272
# ============================================================================
269273
# WORKFLOW INITIALIZATION: Send Welcome Message
270274
# ============================================================================
271275
# Acknowledge that the task has been created and the agent is ready.
272276
# This message appears once when the conversation starts.
273-
await adk.messages.create(
274-
task_id=params.task.id,
275-
content=TextContent(
276-
author="agent",
277-
content=f"🌸 Hello! I'm your Haiku Assistant, powered by OpenAI Agents SDK + Temporal! 🌸\n\n"
278-
f"I'll respond to all your messages in beautiful haiku form. "
279-
f"This conversation is now durable - even if I restart, our chat continues!\n\n"
280-
f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n"
281-
f"Send me a message and I'll respond with a haiku! 🎋",
282-
),
283-
)
277+
if not self.is_continued_run():
278+
await adk.messages.create(
279+
task_id=params.task.id,
280+
content=TextContent(
281+
author="agent",
282+
content=f"🌸 Hello! I'm your Haiku Assistant, powered by OpenAI Agents SDK + Temporal! 🌸\n\n"
283+
f"I'll respond to all your messages in beautiful haiku form. "
284+
f"This conversation is now durable - even if I restart, our chat continues!\n\n"
285+
f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n"
286+
f"Send me a message and I'll respond with a haiku! 🎋",
287+
),
288+
)
284289

285290
# ============================================================================
286291
# WORKFLOW PERSISTENCE: Wait for Completion Signal
@@ -294,10 +299,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
294299
# - Temporal can handle millions of such concurrent workflows
295300
# - If worker crashes, workflow resumes exactly where it left off
296301
# - All conversation state is preserved in Temporal's event log
297-
await workflow.wait_condition(
298-
lambda: self._complete_task,
299-
timeout=None, # No timeout = truly long-running agent conversation
300-
)
302+
# run_until_complete keeps the workflow open exactly like the old
303+
# indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
304+
# recycles event history via continue-as-new before Temporal's
305+
# ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages
306+
# on each run, so the carry-forward is just `params`.
307+
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
301308
return "Agent conversation completed"
302309

303310
@workflow.signal

examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -331,25 +331,36 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
331331
async def on_task_create(self, params: CreateTaskParams) -> str:
332332
logger.info(f"Received task create params: {params}")
333333

334-
# Initialize the conversation state with an empty history
334+
# Initialize (or, on a continued run, re-hydrate) the conversation state.
335+
# This runs both when a client first creates the task AND when the workflow
336+
# recycles itself via continue-as-new — on the latter the in-memory state was
337+
# reset, so we rebuild input_list from the adk.messages ledger (the source of
338+
# truth). conversation_from_messages returns [] when continue-as-new is
339+
# disabled, so the default path is an empty fresh start as before.
340+
conversation = await self.conversation_from_messages(params.task.id)
341+
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
335342
self._state = StateModel(
336-
input_list=[],
337-
turn_number=0,
343+
input_list=conversation,
344+
turn_number=turn_number,
338345
)
339346

340347
# 1. Acknowledge that the task has been created.
341-
await adk.messages.create(
342-
task_id=params.task.id,
343-
content=TextContent(
344-
author="agent",
345-
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
346-
),
347-
)
348-
349-
await workflow.wait_condition(
350-
lambda: self._complete_task,
351-
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
352-
)
348+
if not self.is_continued_run():
349+
await adk.messages.create(
350+
task_id=params.task.id,
351+
content=TextContent(
352+
author="agent",
353+
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
354+
),
355+
)
356+
357+
# Keep the workflow open to field events. run_until_complete behaves exactly
358+
# like the old indefinite wait_condition, and (when
359+
# WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history via
360+
# continue-as-new before Temporal's ~50k-event / 50MB limit. Messages live in
361+
# adk.messages and are rebuilt above, so the only state carried forward is
362+
# params.
363+
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
353364
return "Task completed"
354365

355366
@workflow.signal

examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -216,27 +216,43 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
216216
"""
217217
logger.info(f"Received task create params: {params}")
218218

219-
# Initialize the conversation state with an empty history
219+
# Initialize (or, on a continued run, re-hydrate) the conversation state.
220+
# This runs both when a client first creates the task AND when the workflow
221+
# recycles itself via continue-as-new — on the latter the in-memory state was
222+
# reset, so we rebuild input_list from the adk.messages ledger (the source of
223+
# truth). conversation_from_messages returns [] when continue-as-new is
224+
# disabled, so the default path is an empty fresh start as before.
225+
#
226+
# The only cross-turn parent state here is input_list/turn_number; the
227+
# human-approval state lives entirely in the child workflow spawned by
228+
# wait_for_confirmation (it waits for fulfill_order_signal), and an in-flight
229+
# approval turn keeps on_task_event_send running — run_until_complete drains
230+
# all in-flight handlers before recycling, so it is never cut off. Hence
231+
# conversation_from_messages alone is enough; no adk.state is needed.
232+
conversation = await self.conversation_from_messages(params.task.id)
233+
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
220234
self._state = StateModel(
221-
input_list=[],
222-
turn_number=0,
235+
input_list=conversation,
236+
turn_number=turn_number,
223237
)
224238

225239
# Send welcome message when task is created
226-
await adk.messages.create(
227-
task_id=params.task.id,
228-
content=TextContent(
229-
author="agent",
230-
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
231-
),
232-
)
233-
234-
# Keep workflow running indefinitely to handle user messages and human approvals
235-
# This survives system failures and can resume exactly where it left off
236-
await workflow.wait_condition(
237-
lambda: self._complete_task,
238-
timeout=None, # No timeout for long-running human-in-the-loop workflows
239-
)
240+
if not self.is_continued_run():
241+
await adk.messages.create(
242+
task_id=params.task.id,
243+
content=TextContent(
244+
author="agent",
245+
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
246+
),
247+
)
248+
249+
# Keep the workflow open to handle user messages and human approvals.
250+
# run_until_complete behaves exactly like the old indefinite wait_condition,
251+
# and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history
252+
# via continue-as-new before Temporal's ~50k-event / 50MB limit, draining any
253+
# in-flight approval turn first. Messages live in adk.messages and are rebuilt
254+
# above, so the only state carried forward is params.
255+
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
240256
return "Task completed"
241257

242258
# TEMPORAL UI (localhost:8080):

0 commit comments

Comments
 (0)