diff --git a/examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py b/examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py index 2ca0858ba..ccac5dfe7 100644 --- a/examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/000_hello_acp/project/workflow.py @@ -65,7 +65,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None: # 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once. # Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met. - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so. - ) + # run_until_complete behaves exactly like the old indefinite wait_condition + # by default; when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set it also recycles + # event history via continue-as-new before Temporal's history limit. This + # agent keeps no cross-turn conversation state, so the only thing carried + # forward across a recycle is `params`. 
+ await self.run_until_complete(params, is_complete=lambda: self._complete_task) diff --git a/examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py b/examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py index 3e3ac5b27..dd2229713 100644 --- a/examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/010_agent_chat/project/workflow.py @@ -160,6 +160,19 @@ def __init__(self): self._complete_task = False self._state: StateModel | None = None + async def _rehydrate_state(self, task_id: str) -> None: + """Seed in-memory state, rebuilding the conversation from adk.messages. + + Messages are the source of truth and live in adk.messages — NOT in workflow + state. On a brand-new task the ledger is empty (fresh start); on a continued + run (after continue-as-new) we reconstruct input_list from the ledger so the + recycle is invisible to the user. ``conversation_from_messages`` returns [] + when continue-as-new is disabled, so the default path is unchanged. + """ + conversation = await self.conversation_from_messages(task_id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") + self._state = StateModel(input_list=conversation, turn_number=turn_number) + @workflow.signal(name=SignalName.RECEIVE_EVENT) @override async def on_task_event_send(self, params: SendEventParams) -> None: @@ -253,24 +266,29 @@ async def on_task_event_send(self, params: SendEventParams) -> None: if span and self._state: span.output = self._state.model_dump() + # NOTE: we do NOT persist the conversation to workflow state or adk.state. + # Every user + agent message is already written to the adk.messages ledger + # (the user echo above + the agent auto-send), which is the source of truth. + # On a continue-as-new recycle we rebuild input_list from that ledger in + # _rehydrate_state — messages live in adk.messages, state in adk.state. 
+ @workflow.run @override async def on_task_create(self, params: CreateTaskParams) -> None: logger.info(f"Received task create params: {params}") - # 1. Initialize the state. You can either do this here or in the __init__ method. - # This function is triggered whenever a client creates a task for this agent. - # It is not re-triggered when a new event is sent to the task. - self._state = StateModel( - input_list=[], - turn_number=0, - ) - - # 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once. - - # Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met. - - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so. - ) + # 1. Initialize (or, on a continued run, re-hydrate) the state. This + # function runs both when a client first creates the task AND when the + # workflow recycles itself via continue-as-new — on the latter the + # in-memory state was reset, so we reload it from the adk.messages ledger. + await self._rehydrate_state(params.task.id) + + # 2. Keep the workflow open to field events. Temporal can run hundreds of + # millions of workflows in parallel, so staying open is cheap — but a single + # ever-open run accumulates event history until it hits Temporal's ~50k-event + # / 50MB limit. run_until_complete keeps the workflow open exactly like the + # old indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) + # recycles history via continue-as-new before that limit. 
The conversation + # is rebuilt from adk.messages on each run, so the carry-forward is just + # `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) diff --git a/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py b/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py index b54c8fade..49fc403e7 100644 --- a/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/050_agent_chat_guardrails/project/workflow.py @@ -455,14 +455,16 @@ async def on_task_event_send(self, params: SendEventParams) -> None: async def on_task_create(self, params: CreateTaskParams) -> None: logger.info(f"Received task create params: {params}") - # 1. Initialize the state. You can either do this here or in the - # __init__ method. This function is triggered whenever a client - # creates a task for this agent. It is not re-triggered when a new - # event is sent to the task. - self._state = StateModel( - input_list=[], - turn_number=0, - ) + # 1. Initialize (or, on a continued run, re-hydrate) the state. Messages + # are the source of truth and live in adk.messages — NOT in workflow + # state. On a brand-new task the ledger is empty (fresh start); on a + # continued run (after continue-as-new) we reconstruct input_list from + # the ledger so the recycle is invisible to the user. + # conversation_from_messages returns [] when continue-as-new is disabled, + # so the default path is unchanged. + conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") + self._state = StateModel(input_list=conversation, turn_number=turn_number) # 2. Wait for the task to be completed indefinitely. If we don't do # this the workflow will close as soon as this function returns. 
@@ -472,10 +474,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None: # Thus, if you want this agent to field events indefinitely (or for # a long time) you need to wait for a condition to be met. - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Set a timeout if you want to prevent the task - # from running indefinitely. Generally this is not needed. - # Temporal can run hundreds of millions of workflows in parallel - # and more. Only do this if you have a specific reason to do so. - ) + # run_until_complete keeps the workflow open exactly like the old + # indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) + # recycles event history via continue-as-new before Temporal's + # ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages + # on each run, so the carry-forward is just `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py index e01f40ce6..9d3b3bfd8 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py @@ -258,29 +258,34 @@ async def on_task_create(self, params: CreateTaskParams) -> str: # ============================================================================ # WORKFLOW INITIALIZATION: Initialize State # ============================================================================ - # Initialize the conversation state with an empty history - # This will be populated as the conversation progresses - self._state = StateModel( - input_list=[], - turn_number=0, - ) + # Initialize (or, on a continued run, re-hydrate) the conversation state. 
+ # Messages are the source of truth and live in adk.messages — NOT in + # workflow state. On a brand-new task the ledger is empty (fresh start); + # on a continued run (after continue-as-new) we reconstruct input_list + # from the ledger so the recycle is invisible to the user. + # conversation_from_messages returns [] when continue-as-new is disabled, + # so the default path is unchanged. + conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") + self._state = StateModel(input_list=conversation, turn_number=turn_number) # ============================================================================ # WORKFLOW INITIALIZATION: Send Welcome Message # ============================================================================ # Acknowledge that the task has been created and the agent is ready. # This message appears once when the conversation starts. - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=f"🌸 Hello! I'm your Haiku Assistant, powered by OpenAI Agents SDK + Temporal! 🌸\n\n" - f"I'll respond to all your messages in beautiful haiku form. " - f"This conversation is now durable - even if I restart, our chat continues!\n\n" - f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n" - f"Send me a message and I'll respond with a haiku! 🎋", - ), - ) + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=f"🌸 Hello! I'm your Haiku Assistant, powered by OpenAI Agents SDK + Temporal! 🌸\n\n" + f"I'll respond to all your messages in beautiful haiku form. " + f"This conversation is now durable - even if I restart, our chat continues!\n\n" + f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n" + f"Send me a message and I'll respond with a haiku! 
🎋", + ), + ) # ============================================================================ # WORKFLOW PERSISTENCE: Wait for Completion Signal @@ -294,10 +299,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str: # - Temporal can handle millions of such concurrent workflows # - If worker crashes, workflow resumes exactly where it left off # - All conversation state is preserved in Temporal's event log - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # No timeout = truly long-running agent conversation - ) + # run_until_complete keeps the workflow open exactly like the old + # indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) + # recycles event history via continue-as-new before Temporal's + # ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages + # on each run, so the carry-forward is just `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Agent conversation completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py index 2204d3a05..93a5c0fe6 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py @@ -331,25 +331,36 @@ async def on_task_event_send(self, params: SendEventParams) -> None: async def on_task_create(self, params: CreateTaskParams) -> str: logger.info(f"Received task create params: {params}") - # Initialize the conversation state with an empty history + # Initialize (or, on a continued run, re-hydrate) the conversation state. 
+ # This runs both when a client first creates the task AND when the workflow + # recycles itself via continue-as-new — on the latter the in-memory state was + # reset, so we rebuild input_list from the adk.messages ledger (the source of + # truth). conversation_from_messages returns [] when continue-as-new is + # disabled, so the default path is an empty fresh start as before. + conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") self._state = StateModel( - input_list=[], - turn_number=0, + input_list=conversation, + turn_number=turn_number, ) # 1. Acknowledge that the task has been created. - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.", - ), - ) - - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so. - ) + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. 
For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.", + ), + ) + + # Keep the workflow open to field events. run_until_complete behaves exactly + # like the old indefinite wait_condition, and (when + # WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history via + # continue-as-new before Temporal's ~50k-event / 50MB limit. Messages live in + # adk.messages and are rebuilt above, so the only state carried forward is + # params. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py index 4f11ac4c0..e941a332b 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py @@ -216,27 +216,43 @@ async def on_task_create(self, params: CreateTaskParams) -> str: """ logger.info(f"Received task create params: {params}") - # Initialize the conversation state with an empty history + # Initialize (or, on a continued run, re-hydrate) the conversation state. + # This runs both when a client first creates the task AND when the workflow + # recycles itself via continue-as-new — on the latter the in-memory state was + # reset, so we rebuild input_list from the adk.messages ledger (the source of + # truth). conversation_from_messages returns [] when continue-as-new is + # disabled, so the default path is an empty fresh start as before. 
+ # + # The only cross-turn parent state here is input_list/turn_number; the + # human-approval state lives entirely in the child workflow spawned by + # wait_for_confirmation (it waits for fulfill_order_signal), and an in-flight + # approval turn keeps on_task_event_send running — run_until_complete drains + # all in-flight handlers before recycling, so it is never cut off. Hence + # conversation_from_messages alone is enough; no adk.state is needed. + conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") self._state = StateModel( - input_list=[], - turn_number=0, + input_list=conversation, + turn_number=turn_number, ) # Send welcome message when task is created - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.", - ), - ) - - # Keep workflow running indefinitely to handle user messages and human approvals - # This survives system failures and can resume exactly where it left off - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # No timeout for long-running human-in-the-loop workflows - ) + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. 
For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.", + ), + ) + + # Keep the workflow open to handle user messages and human approvals. + # run_until_complete behaves exactly like the old indefinite wait_condition, + # and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history + # via continue-as-new before Temporal's ~50k-event / 50MB limit, draining any + # in-flight approval turn first. Messages live in adk.messages and are rebuilt + # above, so the only state carried forward is params. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" # TEMPORAL UI (localhost:8080): diff --git a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py index c22045152..736f97122 100644 --- a/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/090_claude_agents_sdk_mvp/project/workflow.py @@ -86,6 +86,64 @@ def __init__(self): self._trace_id = None self._parent_span_id = None self._workspace_path = None + # adk.state row id for the opaque session handle (continue-as-new only). + self._state_id: str | None = None + + async def _rehydrate_state(self, task_id: str) -> None: + """Initialize state, restoring the Claude session handle on a continued run. + + ``claude_session_id`` is an opaque CLI handle that maintains conversation + context — it CANNOT be rebuilt from adk.messages, so to survive a + continue-as-new recycle it is persisted to adk.state (the sanctioned home + for non-message state). 
When continue-as-new is disabled (the default) this + makes ZERO extra activity calls and behaves exactly like the old fresh init. + """ + if self.recycling_active() and environment_variables.AGENT_ID and self.is_continued_run(): + existing = await adk.state.get_by_task_and_agent( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + ) + if existing is not None: + self._state_id = existing.id + self._state = StateModel( + claude_session_id=existing.state.get("claude_session_id"), + turn_number=existing.state.get("turn_number", 0), + ) + return + # Fresh start (default path). + self._state = StateModel( + claude_session_id=None, + turn_number=0, + ) + + async def _persist_state(self, task_id: str) -> None: + """Persist the opaque Claude session handle + turn number to adk.state. + + Guarded by continue-as-new + AGENT_ID so the default path makes ZERO extra + activity calls. Creates the row on first write, updates it thereafter. + """ + if not (self.recycling_active() and environment_variables.AGENT_ID): + return + if self._state is None: + return + state_payload = { + "claude_session_id": self._state.claude_session_id, + "turn_number": self._state.turn_number, + } + if self._state_id is None: + created = await adk.state.create( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) + self._state_id = created.id + else: + await adk.state.update( + state_id=self._state_id, + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams): @@ -171,6 +229,10 @@ async def on_task_event_send(self, params: SendEventParams): # Response already streamed to UI by activity - no need to send again logger.debug(f"Turn {self._state.turn_number} completed successfully") + # Persist the opaque session handle so it survives a + # continue-as-new recycle (no-op when the feature is disabled). 
+ await self._persist_state(params.task.id) + except Exception as e: logger.error(f"Error running Claude agent: {e}", exc_info=True) # Send error message to user @@ -189,11 +251,9 @@ async def on_task_create(self, params: CreateTaskParams): logger.info(f"Creating Claude MVP workflow for task: {params.task.id}") - # Initialize state with session tracking - self._state = StateModel( - claude_session_id=None, - turn_number=0, - ) + # Initialize state, or (on a continue-as-new recycle) re-hydrate the + # opaque Claude session handle from adk.state. Default path is a fresh init. + await self._rehydrate_state(params.task.id) # Create workspace via activity (avoids determinism issues with file I/O) workspace_root = os.environ.get("CLAUDE_WORKSPACE_ROOT") @@ -206,28 +266,36 @@ async def on_task_create(self, params: CreateTaskParams): logger.info(f"Workspace ready: {self._workspace_path}") # Send welcome message - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=( - "🚀 **Claude MVP Agent Ready!**\n\n" - f"Workspace: `{self._workspace_path}`\n\n" - "I'm powered by Claude Agents SDK + Temporal. Try asking me to:\n" - "- Create files: *'Create a hello.py file'*\n" - "- Read files: *'What's in hello.py?'*\n" - "- Run commands: *'List files in the workspace'*\n\n" - "Send me a message to get started! 💬" - ), - format="markdown", + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + "🚀 **Claude MVP Agent Ready!**\n\n" + f"Workspace: `{self._workspace_path}`\n\n" + "I'm powered by Claude Agents SDK + Temporal. Try asking me to:\n" + "- Create files: *'Create a hello.py file'*\n" + "- Read files: *'What's in hello.py?'*\n" + "- Run commands: *'List files in the workspace'*\n\n" + "Send me a message to get started! 💬" + ), + format="markdown", + ) ) - ) - # Wait for completion signal + # Keep the workflow open to field events. 
run_until_complete behaves + # exactly like the old indefinite wait unless continue-as-new is enabled, + # in which case it recycles event history before Temporal's limit. The + # opaque session handle is carried across recycles via adk.state. logger.info("Waiting for task completion...") - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, # Long-running workflow + await self.run_until_complete( + params, + is_complete=lambda: self._complete_task, + # Surviving a recycle requires persisting the session handle to + # adk.state (keyed by task + agent); without AGENT_ID there is nowhere + # to persist it, so don't recycle rather than silently drop it. + can_recycle=bool(environment_variables.AGENT_ID), ) logger.info("Claude MVP workflow completed") diff --git a/examples/tutorials/10_async/10_temporal/100_gemini_litellm/project/workflow.py b/examples/tutorials/10_async/10_temporal/100_gemini_litellm/project/workflow.py index 249bdaa50..014e3627a 100644 --- a/examples/tutorials/10_async/10_temporal/100_gemini_litellm/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/100_gemini_litellm/project/workflow.py @@ -200,31 +200,38 @@ async def on_task_create(self, params: CreateTaskParams) -> str: """ logger.info(f"Received task create params: {params}") - # Initialize the conversation state - self._state = StateModel( - input_list=[], - turn_number=0, - ) + # Initialize (or, on a continued run, re-hydrate) the conversation state. + # Messages are the source of truth and live in adk.messages — NOT in + # workflow state. On a brand-new task the ledger is empty (fresh start); + # on a continued run (after continue-as-new) we reconstruct input_list + # from the ledger so the recycle is invisible to the user. + # conversation_from_messages returns [] when continue-as-new is disabled, + # so the default path is unchanged. 
+ conversation = await self.conversation_from_messages(params.task.id) + turn_number = sum(1 for entry in conversation if entry["role"] == "user") + self._state = StateModel(input_list=conversation, turn_number=turn_number) # Send welcome message - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=f"Hello! I'm your assistant powered by Google's Gemini model via LiteLLM!\n\n" - f"This demonstrates how to use alternative model providers with the OpenAI Agents SDK " - f"and Temporal workflows. The code structure is nearly identical to OpenAI examples - " - f"only the model specification changes.\n\n" - f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n" - f"Send me a message and I'll respond using Gemini!", - ), - ) - - # Wait for completion signal - await workflow.wait_condition( - lambda: self._complete_task, - timeout=None, - ) + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=f"Hello! I'm your assistant powered by Google's Gemini model via LiteLLM!\n\n" + f"This demonstrates how to use alternative model providers with the OpenAI Agents SDK " + f"and Temporal workflows. The code structure is nearly identical to OpenAI examples - " + f"only the model specification changes.\n\n" + f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n" + f"Send me a message and I'll respond using Gemini!", + ), + ) + + # run_until_complete keeps the workflow open exactly like the old + # indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) + # recycles event history via continue-as-new before Temporal's + # ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages + # on each run, so the carry-forward is just `params`. 
+ await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Agent conversation completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/110_pydantic_ai/project/workflow.py b/examples/tutorials/10_async/10_temporal/110_pydantic_ai/project/workflow.py index 9a01be7de..86c538d4a 100644 --- a/examples/tutorials/10_async/10_temporal/110_pydantic_ai/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/110_pydantic_ai/project/workflow.py @@ -9,8 +9,19 @@ Multi-turn memory is kept on the workflow instance itself (``self._message_history``). Temporal's workflow state is already durable and -replay-safe, so unlike the async-base agent we don't need an external +replay-safe, so for crash recovery alone we don't need an external ``adk.state`` round-trip. + +When continue-as-new is enabled (``WORKFLOW_CONTINUE_AS_NEW_ENABLED``), the +workflow may recycle its event history mid-conversation, which resets all +in-memory state. To survive that recycle the conversation must live OUTSIDE +workflow state. Pydantic-ai ``ModelMessage`` objects carry tool rounds and are +not plain text (so the adk.messages ledger can't reconstruct them losslessly), +so we persist them to ``adk.state`` using pydantic-ai's official +``ModelMessagesTypeAdapter`` serializer and re-hydrate on each run. All of this +is gated behind ``self.recycling_active()`` (and a present ``AGENT_ID``), +so the default-off path makes zero extra activity calls and is byte-for-byte +unchanged. """ from __future__ import annotations @@ -20,6 +31,7 @@ from typing import TYPE_CHECKING from temporalio import workflow +from pydantic_ai.messages import ModelMessagesTypeAdapter from agentex.lib import adk from project.agent import TaskDeps, temporal_agent @@ -77,6 +89,68 @@ def __init__(self): # produced these messages, so the list is rebuilt deterministically if # the workflow ever recovers from a crash. 
self._message_history: list["ModelMessage"] = [] + # adk.state row id backing the persisted conversation, set lazily on the + # first persist (or on re-hydrate). Only used when continue-as-new is + # enabled; stays None otherwise. + self._state_id: str | None = None + + async def _rehydrate_state(self, task_id: str) -> None: + """Reload turn number + message history from adk.state on a continued run. + + Only does anything when continue-as-new is enabled AND an AGENT_ID is + present; otherwise the normal fresh in-memory init stands and no activity + is scheduled. On a brand-new task there is no stored state yet, so this is + a no-op there too. On a run that resumed via continue-as-new the stored + payload is found, ``self._state_id`` is set, ``self._turn_number`` is + restored, and ``self._message_history`` is deserialized losslessly via + ``ModelMessagesTypeAdapter`` (preserving tool rounds). + """ + if not self.recycling_active() or not environment_variables.AGENT_ID or not self.is_continued_run(): + return + state = await adk.state.get_by_task_and_agent( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + ) + if state is None: + return + self._state_id = state.id + self._turn_number = state.state.get("turn_number", 0) + self._message_history = ModelMessagesTypeAdapter.validate_python( + state.state.get("message_history", []) + ) + + async def _persist_state(self, task_id: str) -> None: + """Persist turn number + message history to adk.state for recycle survival. + + Only does anything when continue-as-new is enabled AND an AGENT_ID is + present; otherwise it returns immediately and schedules no activity, so + the default-off path is unchanged. The pydantic-ai ``ModelMessage`` list + is serialized to a JSON-safe object via ``ModelMessagesTypeAdapter`` so it + round-trips losslessly. Creates the state row on first call, updates it + thereafter. 
+ """ + if not self.recycling_active() or not environment_variables.AGENT_ID: + return + payload = { + "turn_number": self._turn_number, + "message_history": ModelMessagesTypeAdapter.dump_python( + self._message_history, mode="json" + ), + } + if self._state_id is None: + state = await adk.state.create( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=payload, + ) + self._state_id = state.id + else: + await adk.state.update( + state_id=self._state_id, + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=payload, + ) @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams) -> None: @@ -111,23 +185,46 @@ async def on_task_event_send(self, params: SendEventParams) -> None: if span: span.output = {"final_output": result.output} + # Persist the updated conversation OUTSIDE workflow state so it survives a + # continue-as-new recycle. No-op (zero activity calls) unless + # continue-as-new is enabled and an AGENT_ID is present. + await self._persist_state(params.task.id) + @workflow.run async def on_task_create(self, params: CreateTaskParams) -> str: """Workflow entry point — keep the conversation alive for incoming signals.""" logger.info(f"Task created: {params.task.id}") - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=( - f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n" - f"Send me a message and I'll respond using a Pydantic AI agent backed by Temporal." + # On a run resumed via continue-as-new the in-memory conversation was + # reset; reload it from adk.state. No-op on a brand-new task or when + # continue-as-new is disabled (the default). 
+ await self._rehydrate_state(params.task.id) + + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n" + f"Send me a message and I'll respond using a Pydantic AI agent backed by Temporal." + ), ), - ), - ) + ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + # Keep the workflow open to field events. Identical to the old indefinite + # wait unless WORKFLOW_CONTINUE_AS_NEW_ENABLED is set, in which case the + # event history is recycled via continue-as-new before it hits Temporal's + # ~50k-event / 50MB limit. The conversation is persisted to adk.state and + # re-hydrated on each run, so the carry-forward is just `params`. + await self.run_until_complete( + params, + is_complete=lambda: self._complete_task, + # Surviving a recycle requires persisting the message history to + # adk.state (keyed by task + agent); without AGENT_ID there is nowhere + # to persist it, so don't recycle rather than silently drop it.
+ can_recycle=bool(environment_variables.AGENT_ID), + ) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/120_openai_agents/project/workflow.py b/examples/tutorials/10_async/10_temporal/120_openai_agents/project/workflow.py index 5cb8fb38b..24911e2a2 100644 --- a/examples/tutorials/10_async/10_temporal/120_openai_agents/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/120_openai_agents/project/workflow.py @@ -102,19 +102,34 @@ async def on_task_create(self, params: CreateTaskParams) -> str: """Workflow entry point — keep the conversation alive for incoming signals.""" logger.info(f"Task created: {params.task.id}") - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=( - f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n" - f"Send me a message and I'll respond using an OpenAI Agents SDK agent " - f"delivered through the unified harness surface." + # Rebuild the running conversation from the adk.messages ledger (the source + # of truth), then derive the turn number. On a brand-new task the ledger is + # empty so this is a no-op fresh start; on a continued run (after a + # continue-as-new recycle) it reconstructs self._messages so the recycle is + # invisible to the user. conversation_from_messages returns [] when + # continue-as-new is disabled, so the default path is unchanged. + self._messages = await self.conversation_from_messages(params.task.id) + self._turn_number = sum(1 for m in self._messages if m["role"] == "user") + + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n" + f"Send me a message and I'll respond using an OpenAI Agents SDK agent " + f"delivered through the unified harness surface." 
+ ), ), - ), - ) + ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + # run_until_complete behaves exactly like the old indefinite wait_condition + # by default; when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set it recycles event + # history via continue-as-new before Temporal's history limit. The + # conversation is rebuilt from adk.messages on each run (above), so the only + # thing carried forward across a recycle is `params`. + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py b/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py index b9224ca00..881c21888 100644 --- a/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/130_langgraph/project/workflow.py @@ -65,16 +65,28 @@ async def complete_task_signal(self) -> None: @workflow.run async def on_task_create(self, params: CreateTaskParams) -> str: - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=( - f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n\n" - "Send me a message and I'll respond using a LangGraph agent whose nodes " - "run as durable Temporal activities." + # Rebuild the conversation from the adk.messages ledger. This is a no-op + # fresh start by default (returns [] when continue-as-new is disabled); + # on a continue-as-new recycle it reloads the prior turns so the recycle + # is invisible. These rebuilt messages are already in the ledger, so set + # the high-water mark to their count to avoid re-emitting them. + # NOTE: rebuilt messages are plain {"role", "content"} text-only dicts; + # LangGraph accepts these as input, but any tool-call structure from + # prior turns is not reconstructed — acceptable at a turn boundary. 
+ self._messages = await self.conversation_from_messages(params.task.id) + self._emitted = len(self._messages) + + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n\n" + "Send me a message and I'll respond using a LangGraph agent whose nodes " + "run as durable Temporal activities." + ), ), - ), - ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + ) + await self.run_until_complete(params, is_complete=lambda: self._complete_task) return "Task completed" diff --git a/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py b/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py index 7f50ba8d5..030b0c25b 100644 --- a/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/140_claude_code/project/workflow.py @@ -71,6 +71,54 @@ def __init__(self): self._turn_number = 0 # Claude Code session_id for multi-turn resume. self._session_id: str | None = None + # adk.state row id for the opaque session handle (continue-as-new only). + self._state_id: str | None = None + + async def _rehydrate_state(self, task_id: str) -> None: + """Restore the Claude Code session handle on a continue-as-new recycle. + + ``session_id`` is an opaque CLI handle (``-r <session_id>``) that maintains + conversation context — it CANNOT be rebuilt from adk.messages, so to survive + a continue-as-new recycle it is persisted to adk.state (the sanctioned home + for non-message state). When continue-as-new is disabled (the default) this + makes ZERO extra activity calls and leaves the normal fresh init in place.
+ """ + if self.recycling_active() and environment_variables.AGENT_ID and self.is_continued_run(): + existing = await adk.state.get_by_task_and_agent( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + ) + if existing is not None: + self._state_id = existing.id + self._session_id = existing.state.get("session_id") + self._turn_number = existing.state.get("turn_number", 0) + + async def _persist_state(self, task_id: str) -> None: + """Persist the opaque Claude Code session handle + turn number to adk.state. + + Guarded by continue-as-new + AGENT_ID so the default path makes ZERO extra + activity calls. Creates the row on first write, updates it thereafter. + """ + if not (self.recycling_active() and environment_variables.AGENT_ID): + return + state_payload = { + "session_id": self._session_id, + "turn_number": self._turn_number, + } + if self._state_id is None: + created = await adk.state.create( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) + self._state_id = created.id + else: + await adk.state.update( + state_id=self._state_id, + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams) -> None: @@ -113,22 +161,38 @@ async def on_task_event_send(self, params: SendEventParams) -> None: if span: span.output = {"final_text": result.get("final_text")} + # Persist the opaque session handle so it survives a continue-as-new + # recycle (no-op when the feature is disabled). 
+ await self._persist_state(task_id) + @workflow.run async def on_task_create(self, params: CreateTaskParams) -> str: logger.info("Task created: %s", params.task.id) - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=( - f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n" - "Send me a message and I'll run it through Claude Code locally." + # On a continue-as-new recycle, restore the opaque session handle from + # adk.state. Default path (feature off) is a no-op fresh start. + await self._rehydrate_state(params.task.id) + + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized with params:\n{json.dumps(params.params, indent=2)}\n" + "Send me a message and I'll run it through Claude Code locally." + ), ), - ), - ) + ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + await self.run_until_complete( + params, + is_complete=lambda: self._complete_task, + # Surviving a recycle requires persisting the session handle to + # adk.state (keyed by task + agent); without AGENT_ID there is nowhere + # to persist it, so don't recycle rather than silently drop it. + can_recycle=bool(environment_variables.AGENT_ID), + ) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/150_codex/project/workflow.py b/examples/tutorials/10_async/10_temporal/150_codex/project/workflow.py index 1970b478f..19faa080f 100644 --- a/examples/tutorials/10_async/10_temporal/150_codex/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/150_codex/project/workflow.py @@ -14,8 +14,9 @@ ``StreamTaskMessage*`` events to Redis so the UI sees tokens in real time. - Passing ``created_at=workflow.now()`` for deterministic timestamps under Temporal replay (required for Temporal-safe delivery). 
-- Persisting the codex thread ID on the workflow instance itself — Temporal's - workflow state is durable, so no external ``adk.state`` round-trip is needed. +- Persisting the codex thread ID on the workflow instance for in-run durability; + when continue-as-new is enabled it is also flushed to ``adk.state`` so the + thread survives a history recycle (gated off by default). """ from __future__ import annotations @@ -63,8 +64,9 @@ class AtHarnessCodexWorkflow(BaseWorkflow): """Long-running Temporal workflow that runs codex exec for each turn. Conversation state (codex thread ID + turn counter) is kept on the - workflow instance. Temporal's durable replay reconstructs this state if - the worker crashes, so no external ``adk.state`` round-trip is needed. + workflow instance, which Temporal's durable replay reconstructs if the + worker crashes. When continue-as-new is enabled it is additionally flushed + to ``adk.state`` so the thread survives a history recycle (off by default). """ def __init__(self): @@ -72,6 +74,54 @@ def __init__(self): self._complete_task = False self._turn_number = 0 self._codex_thread_id: str | None = None + # adk.state row id for the opaque session handle (continue-as-new only). + self._state_id: str | None = None + + async def _rehydrate_state(self, task_id: str) -> None: + """Restore the opaque codex thread id on a continue-as-new recycle. + + The codex thread id is an opaque handle that maintains conversation context + across turns — it CANNOT be rebuilt from adk.messages, so to survive a + continue-as-new recycle it is persisted to adk.state (the sanctioned home + for non-message state). When continue-as-new is disabled (the default) this + makes ZERO extra activity calls and leaves the normal fresh init in place. 
+ """ + if self.recycling_active() and environment_variables.AGENT_ID and self.is_continued_run(): + existing = await adk.state.get_by_task_and_agent( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + ) + if existing is not None: + self._state_id = existing.id + self._codex_thread_id = existing.state.get("session_id") + self._turn_number = existing.state.get("turn_number", 0) + + async def _persist_state(self, task_id: str) -> None: + """Persist the opaque codex thread id + turn number to adk.state. + + Guarded by continue-as-new + AGENT_ID so the default path makes ZERO extra + activity calls. Creates the row on first write, updates it thereafter. + """ + if not (self.recycling_active() and environment_variables.AGENT_ID): + return + state_payload = { + "session_id": self._codex_thread_id, + "turn_number": self._turn_number, + } + if self._state_id is None: + created = await adk.state.create( + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) + self._state_id = created.id + else: + await adk.state.update( + state_id=self._state_id, + task_id=task_id, + agent_id=environment_variables.AGENT_ID, + state=state_payload, + ) @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams) -> None: @@ -118,24 +168,40 @@ async def on_task_event_send(self, params: SendEventParams) -> None: "model": result.get("model"), } + # Persist the opaque session handle so it survives a continue-as-new + # recycle (no-op when the feature is disabled). 
+ await self._persist_state(params.task.id) + @workflow.run async def on_task_create(self, params: CreateTaskParams) -> str: """Workflow entry point — keep the conversation alive for incoming signals.""" logger.info("Task created: %s", params.task.id) - await adk.messages.create( - task_id=params.task.id, - content=TextContent( - author="agent", - content=( - f"Task initialized.\n" - f"Send me a message and I'll run codex (local subprocess) " - f"to answer, streaming events via the unified harness surface." + # On a continue-as-new recycle, restore the opaque codex thread id from + # adk.state. Default path (feature off) is a no-op fresh start. + await self._rehydrate_state(params.task.id) + + if not self.is_continued_run(): + await adk.messages.create( + task_id=params.task.id, + content=TextContent( + author="agent", + content=( + f"Task initialized.\n" + f"Send me a message and I'll run codex (local subprocess) " + f"to answer, streaming events via the unified harness surface." + ), ), - ), - ) + ) - await workflow.wait_condition(lambda: self._complete_task, timeout=None) + await self.run_until_complete( + params, + is_complete=lambda: self._complete_task, + # Surviving a recycle requires persisting the session handle to + # adk.state (keyed by task + agent); without AGENT_ID there is nowhere + # to persist it, so don't recycle rather than silently drop it. 
+ can_recycle=bool(environment_variables.AGENT_ID), + ) return "Task completed" @workflow.signal diff --git a/examples/tutorials/10_async/10_temporal/150_codex/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/150_codex/tests/test_agent.py index fa6c66083..86dd73594 100644 --- a/examples/tutorials/10_async/10_temporal/150_codex/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/150_codex/tests/test_agent.py @@ -151,6 +151,11 @@ async def _fake_execute_activity(_activity, params, **_kw): wf._codex_thread_id = None wf._complete_task = False wf._display_name = "test" + # __new__ bypasses BaseWorkflow.__init__, so set the continue-as-new + # config attributes it would have populated (feature off for this test). + wf._continue_as_new_enabled = False + wf._max_history_length = None + wf._state_id = None params = MagicMock() params.task.id = "task-temporal-offline-1" diff --git a/src/agentex/lib/adk/_modules/messages.py b/src/agentex/lib/adk/_modules/messages.py index 992683b58..1598413cb 100644 --- a/src/agentex/lib/adk/_modules/messages.py +++ b/src/agentex/lib/adk/_modules/messages.py @@ -260,6 +260,7 @@ async def list( self, task_id: str, limit: int | None = None, + page_number: int | None = None, trace_id: str | None = None, parent_span_id: str | None = None, start_to_close_timeout: timedelta = timedelta(seconds=5), @@ -282,6 +283,7 @@ async def list( params = ListMessagesParams( task_id=task_id, limit=limit, + page_number=page_number, trace_id=trace_id, parent_span_id=parent_span_id, ) @@ -298,4 +300,5 @@ async def list( return await self._messages_service.list_messages( task_id=task_id, limit=limit, + page_number=page_number, ) diff --git a/src/agentex/lib/core/services/adk/messages.py b/src/agentex/lib/core/services/adk/messages.py index 929100eb1..83e0fffc8 100644 --- a/src/agentex/lib/core/services/adk/messages.py +++ b/src/agentex/lib/core/services/adk/messages.py @@ -135,6 +135,7 @@ async def list_messages( self, task_id: str, limit: 
int | None = None, + page_number: int | None = None, trace_id: str | None = None, parent_span_id: str | None = None, ) -> list[TaskMessage]: @@ -148,6 +149,7 @@ async def list_messages( task_messages = await self._agentex_client.messages.list( task_id=task_id, limit=limit, + page_number=page_number, ) if span: span.output = [task_message.model_dump() for task_message in task_messages] diff --git a/src/agentex/lib/core/temporal/activities/adk/messages_activities.py b/src/agentex/lib/core/temporal/activities/adk/messages_activities.py index 3ae5aaf5b..54f575906 100644 --- a/src/agentex/lib/core/temporal/activities/adk/messages_activities.py +++ b/src/agentex/lib/core/temporal/activities/adk/messages_activities.py @@ -50,6 +50,7 @@ class UpdateMessagesBatchParams(BaseModelWithTraceParams): class ListMessagesParams(BaseModelWithTraceParams): task_id: str limit: int | None = None + page_number: int | None = None class MessagesActivities: @@ -94,4 +95,5 @@ async def list_messages(self, params: ListMessagesParams) -> list[TaskMessage]: return await self._messages_service.list_messages( task_id=params.task_id, limit=params.limit, + page_number=params.page_number, ) diff --git a/src/agentex/lib/core/temporal/workflows/workflow.py b/src/agentex/lib/core/temporal/workflows/workflow.py index 3e4498162..06371e496 100644 --- a/src/agentex/lib/core/temporal/workflows/workflow.py +++ b/src/agentex/lib/core/temporal/workflows/workflow.py @@ -1,13 +1,31 @@ +from __future__ import annotations + from abc import ABC, abstractmethod +from typing import Any, Callable from temporalio import workflow from agentex.protocol.acp import SendEventParams, CreateTaskParams from agentex.lib.utils.logging import make_logger +from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.temporal.types.workflow import SignalName logger = make_logger(__name__) +# Patch identifier gating the continue-as-new recycle path. 
Workflows that were +# already running before this code shipped have no record of this patch in their +# event history; `workflow.patched()` lets the new drain + continue_as_new branch +# be introduced without breaking determinism when those in-flight executions +# replay against the new code. Gate the decision ONCE in the agent's @workflow.run +# (see the 010_agent_chat reference agent) so pre-patch runs keep their old +# behaviour and never spin. +CONTINUE_AS_NEW_PATCH_ID = "agentex-base-workflow-continue-as-new-v1" + +# Page size used when rebuilding a conversation from the adk.messages ledger after +# a recycle. conversation_from_messages pages through the whole ledger in chunks of +# this size so no turns are dropped for a long-lived chat. +_CONVERSATION_PAGE_SIZE = 200 + class BaseWorkflow(ABC): def __init__( @@ -15,6 +33,13 @@ def __init__( display_name: str, ): self.display_name = display_name + # Read once at construction. `refresh()` returns the process-wide cached + # config the worker loaded at import time, so this does not read os.environ + # during workflow execution (which the sandbox would flag) — the values are + # deployment constants and therefore replay-safe. + env = EnvironmentVariables.refresh() + self._continue_as_new_enabled: bool = env.WORKFLOW_CONTINUE_AS_NEW_ENABLED + self._max_history_length: int | None = env.WORKFLOW_MAX_HISTORY_LENGTH @abstractmethod @workflow.signal(name=SignalName.RECEIVE_EVENT) @@ -24,3 +49,228 @@ async def on_task_event_send(self, params: SendEventParams) -> None: @abstractmethod async def on_task_create(self, params: CreateTaskParams) -> None: raise NotImplementedError + + # ------------------------------------------------------------------ # + # Continue-as-new lifecycle helpers # + # # + # These let a long-lived chat/session workflow recycle its event # + # history so it can stay open indefinitely without hitting Temporal's # + # ~50k-event / 50MB history limit. 
The SDK owns the genuinely hard, # + # easy-to-get-wrong Temporal mechanics here (threshold decision, # + # draining in-flight handlers, the continue_as_new call); the agent # + # owns its data and is responsible for keeping it OUTSIDE workflow # + # state so it survives the recycle: messages in `adk.messages`, any # + # other state in `adk.state`. See the 010_agent_chat reference agent # + # for the full recipe. # + # ------------------------------------------------------------------ # + + def should_continue_as_new(self) -> bool: + """Whether this run should recycle its event history via continue-as-new. + + Returns True only when continue-as-new is enabled for the agent AND + either: + - Temporal suggests it (history is approaching the server's limit), or + - the configured ``WORKFLOW_MAX_HISTORY_LENGTH`` threshold is reached. + + This reads only deterministic ``workflow.info()`` values and constant + config and emits no commands, so it is safe to use directly as a + ``workflow.wait_condition`` predicate, e.g.:: + + await workflow.wait_condition( + lambda: self._complete_task or self.should_continue_as_new() + ) + """ + if not self._continue_as_new_enabled: + return False + info = workflow.info() + if info.is_continue_as_new_suggested(): + return True + if ( + self._max_history_length is not None + and info.get_current_history_length() >= self._max_history_length + ): + return True + return False + + async def drain_and_continue_as_new( + self, + *args: Any, + is_complete: Callable[[], bool] | None = None, + ) -> None: + """Drain in-flight signal handlers, then continue-as-new. + + Call this from the agent's ``@workflow.run`` once the run loop wakes for a + recycle (see :meth:`should_continue_as_new`). 
``args`` are forwarded + verbatim to ``workflow.continue_as_new`` and become the new run's input, so + pass whatever your ``@workflow.run`` signature expects — typically the + original ``CreateTaskParams`` (the new run keeps the same workflow id / task + id and re-hydrates its state from ``adk.state``). + + IMPORTANT: keep your data OUTSIDE workflow state BEFORE calling this — + messages in ``adk.messages`` and any other state in ``adk.state``. + In-workflow attributes do NOT survive the recycle; only the forwarded + ``args`` do. + + Waits on ``all_handlers_finished`` first so an in-flight turn (a signal + handler still running an activity) is never lost or duplicated across the + recycle boundary. ``workflow.continue_as_new`` raises to end the run, so + this never returns normally — EXCEPT when ``is_complete`` is given and + returns True after draining: a completion signal can arrive while we wait + for the drain, and the recycled run would start fresh (losing that + completion), so in that case we return without recycling and let the caller + finish. + """ + # Don't recycle until any signal handler still running has finished, so a + # message mid-flight at the boundary is carried into the next run intact. + await workflow.wait_condition(workflow.all_handlers_finished) + # A completion signal may have landed during the drain — re-check before + # recycling so a workflow that should finish isn't kept open by the recycle. + if is_complete is not None and is_complete(): + return + logger.info( + "Recycling workflow via continue-as-new " + f"(history_length={workflow.info().get_current_history_length()}, " + f"run_id={workflow.info().run_id})" + ) + workflow.continue_as_new(*args) + + async def run_until_complete( + self, + *continue_as_new_args: Any, + is_complete: Callable[[], bool], + can_recycle: bool = True, + ) -> None: + """Keep the workflow open to field events, recycling history as needed. 
+ + Drop-in replacement for the usual ``await workflow.wait_condition( + lambda: self._complete_task, timeout=None)`` at the end of an agent's + ``@workflow.run``. ``is_complete`` is a no-arg predicate (typically + ``lambda: self._complete_task``); ``continue_as_new_args`` are forwarded to + continue-as-new on recycle (typically the original ``CreateTaskParams``). + + Behaviour is identical to the old indefinite wait UNLESS + ``WORKFLOW_CONTINUE_AS_NEW_ENABLED`` is set, so adopting this is safe by + default. The recycle path is gated once behind ``workflow.patched(...)`` so + workflows that started before this code shipped keep waiting the old way + and never hit a non-determinism error on replay. + + ``can_recycle`` lets an agent declare that recycling is unsafe right now and + fall back to the plain wait even when the feature is enabled. Use it when a + prerequisite for surviving a recycle is missing — e.g. an agent that keeps + non-message state in ``adk.state`` (keyed by task + agent) has nothing to + persist into without an ``AGENT_ID``, so it should pass + ``can_recycle=bool(environment_variables.AGENT_ID)`` rather than recycle and + silently drop that state on the first continue-as-new. + + Persist anything you need across a recycle OUTSIDE workflow state first — + messages in ``adk.messages``, other state in ``adk.state`` — and rebuild it + at the top of ``@workflow.run`` (see :meth:`conversation_from_messages`). + """ + if not can_recycle or not self.recycling_active(): + await workflow.wait_condition(is_complete, timeout=None) + return + while True: + await workflow.wait_condition( + lambda: is_complete() or self.should_continue_as_new() + ) + if is_complete(): + return + # Drains in-flight handlers, then continue-as-new (raises; never + # returns) — UNLESS a completion signal arrived during the drain, in + # which case it returns here and the next loop iteration completes. 
def recycling_active(self) -> bool:
    """Report whether the continue-as-new machinery may run for this workflow.

    The result is truthy only when the agent opted in
    (``self._continue_as_new_enabled``) AND ``workflow.patched`` reports the
    continue-as-new patch for this run. Callers are expected to consult this
    gate before executing any new, command-emitting continue-as-new code
    (re-hydrating from or persisting to ``adk.state``, the recycle wait), so a
    workflow that started before this code shipped replays its original
    command stream unchanged even if the flag is switched on later.

    The opt-in flag is tested first, so ``workflow.patched`` is not consulted
    at all when the agent has not enabled recycling.
    """
    opted_in = self._continue_as_new_enabled
    if not opted_in:
        # Hand back the falsy flag itself without touching workflow.patched.
        return opted_in
    return workflow.patched(CONTINUE_AS_NEW_PATCH_ID)


def is_continued_run(self) -> bool:
    """Tell whether this run was spawned by a continue-as-new of an earlier run.

    A recycled run carries a ``continued_run_id`` in ``workflow.info()``; the
    original, client-created run does not. Prologue work that schedules new
    activities (re-hydrating from ``adk.state`` / ``adk.messages``) is only
    needed on a recycled run, so gating it on this check lets an original run
    skip that prologue entirely and emit no new commands.
    """
    previous_run_id = workflow.info().continued_run_id
    return previous_run_id is not None


async def conversation_from_messages(
    self,
    task_id: str,
) -> list[dict[str, str]]:
    """Reconstruct the chat transcript for ``task_id`` from the adk.messages ledger.

    The ledger, not workflow state, is the source of truth for messages, so a
    run produced by continue-as-new rebuilds its conversation from there
    instead of carrying it through workflow history.

    Args:
        task_id: Identifier of the task whose messages are listed.

    Returns:
        An OpenAI-style list of ``{"role": "user" | "assistant", "content": str}``
        entries in ledger order. The list is empty unless recycling is active
        AND this is a continued run; in that case no ``adk.messages`` call is
        made, so callers may invoke this unconditionally to seed their state.
    """
    if not (self.recycling_active() and self.is_continued_run()):
        return []

    # Imported lazily so module load cannot trip over an import cycle.
    from agentex.lib import adk
    from agentex.types.text_content import TextContent

    # Walk the ledger page by page (page numbers start at 1) so a long chat
    # restores its whole history, not just the first page. A page shorter than
    # the requested size marks the end of the ledger.
    ledger = []
    page_number = 1
    got_full_page = True
    while got_full_page:
        batch = await adk.messages.list(
            task_id=task_id,
            limit=_CONVERSATION_PAGE_SIZE,
            page_number=page_number,
        )
        ledger.extend(batch)
        got_full_page = len(batch) >= _CONVERSATION_PAGE_SIZE
        page_number += 1

    # Only these authors are conversation turns. Any other author is dropped
    # rather than defaulted to a user turn, so such text is never replayed to
    # the model as if the user had typed it.
    role_for_author = {
        "user": "user",
        "agent": "assistant",
        "assistant": "assistant",
    }

    transcript: list[dict[str, str]] = []
    for entry in ledger:
        payload = entry.content
        # Non-text payloads carry no `.content` / `.author`; the isinstance
        # check both filters them out and narrows the content union.
        if not isinstance(payload, TextContent):
            continue
        role = role_for_author.get(payload.author)
        if role is None:
            continue
        transcript.append({"role": role, "content": payload.content})
    return transcript
"""Unit tests for the continue-as-new decision logic of BaseWorkflow.

``workflow.info()`` and ``workflow.patched`` are replaced with fakes so that
``should_continue_as_new`` and the determinism gates can be checked without a
running Temporal server. The drain + ``workflow.continue_as_new`` mechanics of
``drain_and_continue_as_new`` are left to a replay/integration test against a
Temporal test environment (a follow-up); this module pins down the threshold
logic that decides *when* to recycle.
"""

from __future__ import annotations

from typing import override

import pytest

from agentex.lib.core.temporal.workflows import workflow as base_workflow_module
from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow


class _ConcreteWorkflow(BaseWorkflow):
    """Smallest concrete subclass that lets a test instantiate the ABC.

    ``BaseWorkflow.__init__`` (which reads cached env config) is deliberately
    not called; the two config attributes are assigned directly so every test
    states its own configuration.
    """

    def __init__(self, *, enabled: bool, max_history_length: int | None) -> None:
        self.display_name = "test"
        self._continue_as_new_enabled = enabled
        self._max_history_length = max_history_length

    @override
    async def on_task_event_send(self, params) -> None:  # pragma: no cover - unused
        raise NotImplementedError

    @override
    async def on_task_create(self, params) -> None:  # pragma: no cover - unused
        raise NotImplementedError


class _FakeInfo:
    """Stand-in for the object returned by ``workflow.info()``."""

    def __init__(
        self,
        *,
        suggested: bool,
        history_length: int,
        continued_run_id: str | None = None,
    ) -> None:
        self.continued_run_id = continued_run_id
        self._history_length = history_length
        self._suggested = suggested

    def is_continue_as_new_suggested(self) -> bool:
        return self._suggested

    def get_current_history_length(self) -> int:
        return self._history_length


@pytest.fixture
def patch_info(monkeypatch):
    """Return a helper that swaps ``workflow.info`` in the BaseWorkflow module."""

    def _apply(
        *, suggested: bool, history_length: int, continued_run_id: str | None = None
    ) -> None:
        def _fake_info() -> _FakeInfo:
            # A fresh fake is built on every call to ``workflow.info()``.
            return _FakeInfo(
                suggested=suggested,
                history_length=history_length,
                continued_run_id=continued_run_id,
            )

        monkeypatch.setattr(base_workflow_module.workflow, "info", _fake_info)

    return _apply


@pytest.fixture
def patch_patched(monkeypatch):
    """Return a helper that swaps ``workflow.patched`` in the BaseWorkflow module."""

    def _apply(value: bool) -> None:
        def _fake_patched(_patch_id):
            return value

        monkeypatch.setattr(base_workflow_module.workflow, "patched", _fake_patched)

    return _apply


def test_disabled_never_recycles(patch_info):
    patch_info(suggested=True, history_length=10_000_000)
    subject = _ConcreteWorkflow(enabled=False, max_history_length=1)
    assert subject.should_continue_as_new() is False


def test_recycles_when_temporal_suggests(patch_info):
    patch_info(suggested=True, history_length=5)
    subject = _ConcreteWorkflow(enabled=True, max_history_length=None)
    assert subject.should_continue_as_new() is True


def test_recycles_at_history_threshold(patch_info):
    patch_info(suggested=False, history_length=10_000)
    subject = _ConcreteWorkflow(enabled=True, max_history_length=10_000)
    assert subject.should_continue_as_new() is True


def test_does_not_recycle_below_threshold(patch_info):
    patch_info(suggested=False, history_length=9_999)
    subject = _ConcreteWorkflow(enabled=True, max_history_length=10_000)
    assert subject.should_continue_as_new() is False


def test_no_threshold_relies_only_on_suggestion(patch_info):
    patch_info(suggested=False, history_length=10_000_000)
    subject = _ConcreteWorkflow(enabled=True, max_history_length=None)
    assert subject.should_continue_as_new() is False


# --- is_continued_run / recycling_active: the prologue determinism gates --- #


def test_is_continued_run_false_on_original_run(patch_info):
    patch_info(suggested=False, history_length=0, continued_run_id=None)
    subject = _ConcreteWorkflow(enabled=True, max_history_length=None)
    assert subject.is_continued_run() is False


def test_is_continued_run_true_after_recycle(patch_info):
    patch_info(suggested=False, history_length=0, continued_run_id="run-123")
    subject = _ConcreteWorkflow(enabled=True, max_history_length=None)
    assert subject.is_continued_run() is True


def test_recycling_active_requires_enabled_and_patched(patch_patched):
    # Each row is (value returned by workflow.patched, enabled flag, expected):
    #   1. disabled short-circuits before workflow.patched is consulted;
    #   2. enabled but unpatched (pre-patch in-flight workflow) -> inactive;
    #   3. enabled and patched -> active.
    scenarios = (
        (True, False, False),
        (False, True, False),
        (True, True, True),
    )
    for patched_value, enabled, expected in scenarios:
        patch_patched(patched_value)
        subject = _ConcreteWorkflow(enabled=enabled, max_history_length=None)
        assert subject.recycling_active() is expected


@pytest.mark.asyncio
async def test_conversation_from_messages_empty_unless_continued_and_patched(
    patch_info, patch_patched
):
    # Each row is (value returned by workflow.patched, continued_run_id):
    #   1. patched but original (not continued) run -> no ledger fetch, empty;
    #   2. continued run but unpatched (pre-patch in-flight) -> still empty.
    scenarios = (
        (True, None),
        (False, "run-1"),
    )
    for patched_value, prior_run_id in scenarios:
        patch_patched(patched_value)
        patch_info(suggested=False, history_length=0, continued_run_id=prior_run_id)
        subject = _ConcreteWorkflow(enabled=True, max_history_length=None)
        assert await subject.conversation_from_messages("task-1") == []


@pytest.mark.asyncio
async def test_conversation_from_messages_maps_authors_explicitly(
    monkeypatch, patch_info, patch_patched
):
    """user→user and agent→assistant (MessageAuthor is Literal["user","agent"])."""
    import types

    import agentex.lib.adk as adk_module
    from agentex.types.text_content import TextContent

    turns = [("user", "hi"), ("agent", "hello"), ("user", "more")]
    ledger = [
        types.SimpleNamespace(content=TextContent(author=author, content=text))
        for author, text in turns
    ]

    async def _fake_list(task_id, **_kwargs):  # noqa: ARG001
        return ledger

    monkeypatch.setattr(adk_module.messages, "list", _fake_list)
    patch_patched(True)
    patch_info(suggested=False, history_length=0, continued_run_id="run-1")

    subject = _ConcreteWorkflow(enabled=True, max_history_length=None)
    rebuilt = await subject.conversation_from_messages("task-1")

    expected = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "hello"},
        {"role": "user", "content": "more"},
    ]
    assert rebuilt == expected
100644 --- a/uv.lock +++ b/uv.lock @@ -15,7 +15,7 @@ members = [ [[package]] name = "agentex-client" -version = "0.13.0" +version = "0.15.0" source = { editable = "." } dependencies = [ { name = "anyio" }, @@ -91,7 +91,7 @@ dev = [ [[package]] name = "agentex-sdk" -version = "0.13.0" +version = "0.14.0" source = { editable = "adk" } dependencies = [ { name = "agentex-client" },