Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
# 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.

# Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
# run_until_complete behaves exactly like the old indefinite wait_condition
# by default; when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set it also recycles
# event history via continue-as-new before Temporal's history limit. This
# agent keeps no cross-turn conversation state, so the only thing carried
# forward across a recycle is `params`.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ def __init__(self):
self._complete_task = False
self._state: StateModel | None = None

async def _rehydrate_state(self, task_id: str) -> None:
"""Seed in-memory state, rebuilding the conversation from adk.messages.

Messages are the source of truth and live in adk.messages — NOT in workflow
state. On a brand-new task the ledger is empty (fresh start); on a continued
run (after continue-as-new) we reconstruct input_list from the ledger so the
recycle is invisible to the user. ``conversation_from_messages`` returns []
when continue-as-new is disabled, so the default path is unchanged.
"""
conversation = await self.conversation_from_messages(task_id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(input_list=conversation, turn_number=turn_number)

@workflow.signal(name=SignalName.RECEIVE_EVENT)
@override
async def on_task_event_send(self, params: SendEventParams) -> None:
Expand Down Expand Up @@ -253,24 +266,29 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
if span and self._state:
span.output = self._state.model_dump()

# NOTE: we do NOT persist the conversation to workflow state or adk.state.
# Every user + agent message is already written to the adk.messages ledger
# (the user echo above + the agent auto-send), which is the source of truth.
# On a continue-as-new recycle we rebuild input_list from that ledger in
# _rehydrate_state — messages live in adk.messages, state in adk.state.

@workflow.run
@override
async def on_task_create(self, params: CreateTaskParams) -> None:
logger.info(f"Received task create params: {params}")

# 1. Initialize the state. You can either do this here or in the __init__ method.
# This function is triggered whenever a client creates a task for this agent.
# It is not re-triggered when a new event is sent to the task.
self._state = StateModel(
input_list=[],
turn_number=0,
)

# 2. Wait for the task to be completed indefinitely. If we don't do this the workflow will close as soon as this function returns. Temporal can run hundreds of millions of workflows in parallel, so you don't need to worry about too many workflows running at once.

# Thus, if you want this agent to field events indefinitely (or for a long time) you need to wait for a condition to be met.

await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
# 1. Initialize (or, on a continued run, re-hydrate) the state. This
# function runs both when a client first creates the task AND when the
# workflow recycles itself via continue-as-new — on the latter the
# in-memory state was reset, so we reload it from the adk.messages ledger.
await self._rehydrate_state(params.task.id)

# 2. Keep the workflow open to field events. Temporal can run hundreds of
# millions of workflows in parallel, so staying open is cheap — but a single
# ever-open run accumulates event history until it hits Temporal's ~50k-event
# / 50MB limit. run_until_complete keeps the workflow open exactly like the
# old indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
# recycles history via continue-as-new before that limit. The conversation
# is rebuilt from adk.messages on each run, so the carry-forward is just
# `params`.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
Original file line number Diff line number Diff line change
Expand Up @@ -455,14 +455,16 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
async def on_task_create(self, params: CreateTaskParams) -> None:
logger.info(f"Received task create params: {params}")

# 1. Initialize the state. You can either do this here or in the
# __init__ method. This function is triggered whenever a client
# creates a task for this agent. It is not re-triggered when a new
# event is sent to the task.
self._state = StateModel(
input_list=[],
turn_number=0,
)
# 1. Initialize (or, on a continued run, re-hydrate) the state. Messages
# are the source of truth and live in adk.messages — NOT in workflow
# state. On a brand-new task the ledger is empty (fresh start); on a
# continued run (after continue-as-new) we reconstruct input_list from
# the ledger so the recycle is invisible to the user.
# conversation_from_messages returns [] when continue-as-new is disabled,
# so the default path is unchanged.
conversation = await self.conversation_from_messages(params.task.id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(input_list=conversation, turn_number=turn_number)

# 2. Wait for the task to be completed indefinitely. If we don't do
# this the workflow will close as soon as this function returns.
Expand All @@ -472,10 +474,9 @@ async def on_task_create(self, params: CreateTaskParams) -> None:
# Thus, if you want this agent to field events indefinitely (or for
# a long time) you need to wait for a condition to be met.

await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task
# from running indefinitely. Generally this is not needed.
# Temporal can run hundreds of millions of workflows in parallel
# and more. Only do this if you have a specific reason to do so.
)
# run_until_complete keeps the workflow open exactly like the old
# indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
# recycles event history via continue-as-new before Temporal's
# ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages
# on each run, so the carry-forward is just `params`.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
Original file line number Diff line number Diff line change
Expand Up @@ -258,29 +258,34 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
# ============================================================================
# WORKFLOW INITIALIZATION: Initialize State
# ============================================================================
# Initialize the conversation state with an empty history
# This will be populated as the conversation progresses
self._state = StateModel(
input_list=[],
turn_number=0,
)
# Initialize (or, on a continued run, re-hydrate) the conversation state.
# Messages are the source of truth and live in adk.messages — NOT in
# workflow state. On a brand-new task the ledger is empty (fresh start);
# on a continued run (after continue-as-new) we reconstruct input_list
# from the ledger so the recycle is invisible to the user.
# conversation_from_messages returns [] when continue-as-new is disabled,
# so the default path is unchanged.
conversation = await self.conversation_from_messages(params.task.id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(input_list=conversation, turn_number=turn_number)

# ============================================================================
# WORKFLOW INITIALIZATION: Send Welcome Message
# ============================================================================
# Acknowledge that the task has been created and the agent is ready.
# This message appears once when the conversation starts.
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"🌸 Hello! I'm your Haiku Assistant, powered by OpenAI Agents SDK + Temporal! 🌸\n\n"
f"I'll respond to all your messages in beautiful haiku form. "
f"This conversation is now durable - even if I restart, our chat continues!\n\n"
f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n"
f"Send me a message and I'll respond with a haiku! 🎋",
),
)
if not self.is_continued_run():
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"🌸 Hello! I'm your Haiku Assistant, powered by OpenAI Agents SDK + Temporal! 🌸\n\n"
f"I'll respond to all your messages in beautiful haiku form. "
f"This conversation is now durable - even if I restart, our chat continues!\n\n"
f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n"
f"Send me a message and I'll respond with a haiku! 🎋",
),
)

# ============================================================================
# WORKFLOW PERSISTENCE: Wait for Completion Signal
Expand All @@ -294,10 +299,12 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
# - Temporal can handle millions of such concurrent workflows
# - If worker crashes, workflow resumes exactly where it left off
# - All conversation state is preserved in Temporal's event log
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # No timeout = truly long-running agent conversation
)
# run_until_complete keeps the workflow open exactly like the old
# indefinite wait, and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set)
# recycles event history via continue-as-new before Temporal's
# ~50k-event / 50MB limit. The conversation is rebuilt from adk.messages
# on each run, so the carry-forward is just `params`.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
return "Agent conversation completed"

@workflow.signal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,25 +331,36 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
async def on_task_create(self, params: CreateTaskParams) -> str:
logger.info(f"Received task create params: {params}")

# Initialize the conversation state with an empty history
# Initialize (or, on a continued run, re-hydrate) the conversation state.
# This runs both when a client first creates the task AND when the workflow
# recycles itself via continue-as-new — on the latter the in-memory state was
# reset, so we rebuild input_list from the adk.messages ledger (the source of
# truth). conversation_from_messages returns [] when continue-as-new is
# disabled, so the default path is an empty fresh start as before.
conversation = await self.conversation_from_messages(params.task.id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(
input_list=[],
turn_number=0,
input_list=conversation,
turn_number=turn_number,
)

# 1. Acknowledge that the task has been created.
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)

await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
)
if not self.is_continued_run():
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)

# Keep the workflow open to field events. run_until_complete behaves exactly
# like the old indefinite wait_condition, and (when
# WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history via
# continue-as-new before Temporal's ~50k-event / 50MB limit. Messages live in
# adk.messages and are rebuilt above, so the only state carried forward is
# params.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
return "Task completed"

@workflow.signal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,27 +216,43 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
"""
logger.info(f"Received task create params: {params}")

# Initialize the conversation state with an empty history
# Initialize (or, on a continued run, re-hydrate) the conversation state.
# This runs both when a client first creates the task AND when the workflow
# recycles itself via continue-as-new — on the latter the in-memory state was
# reset, so we rebuild input_list from the adk.messages ledger (the source of
# truth). conversation_from_messages returns [] when continue-as-new is
# disabled, so the default path is an empty fresh start as before.
#
# The only cross-turn parent state here is input_list/turn_number; the
# human-approval state lives entirely in the child workflow spawned by
# wait_for_confirmation (it waits for fulfill_order_signal), and an in-flight
# approval turn keeps on_task_event_send running — run_until_complete drains
# all in-flight handlers before recycling, so it is never cut off. Hence
# conversation_from_messages alone is enough; no adk.state is needed.
conversation = await self.conversation_from_messages(params.task.id)
turn_number = sum(1 for entry in conversation if entry["role"] == "user")
self._state = StateModel(
input_list=[],
turn_number=0,
input_list=conversation,
turn_number=turn_number,
)

# Send welcome message when task is created
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)

# Keep workflow running indefinitely to handle user messages and human approvals
# This survives system failures and can resume exactly where it left off
await workflow.wait_condition(
lambda: self._complete_task,
timeout=None, # No timeout for long-running human-in-the-loop workflows
)
if not self.is_continued_run():
await adk.messages.create(
task_id=params.task.id,
content=TextContent(
author="agent",
content=f"Hello! I've received your task. Normally you can do some state initialization here, or just pass and do nothing until you get your first event. For now I'm just acknowledging that I've received a task with the following params:\n\n{json.dumps(params.params, indent=2)}.\n\nYou should only see this message once, when the task is created. All subsequent events will be handled by the `on_task_event_send` handler.",
),
)

# Keep the workflow open to handle user messages and human approvals.
# run_until_complete behaves exactly like the old indefinite wait_condition,
# and (when WORKFLOW_CONTINUE_AS_NEW_ENABLED is set) recycles event history
# via continue-as-new before Temporal's ~50k-event / 50MB limit, draining any
# in-flight approval turn first. Messages live in adk.messages and are rebuilt
# above, so the only state carried forward is params.
await self.run_until_complete(params, is_complete=lambda: self._complete_task)
return "Task completed"

# TEMPORAL UI (localhost:8080):
Expand Down
Loading
Loading