-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathworkflow.py
More file actions
32 lines (24 loc) · 1019 Bytes
/
Copy pathworkflow.py
File metadata and controls
32 lines (24 loc) · 1019 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
"""Workflow that streams model chunks to an external subscriber.
``TemporalAgent(streaming_topic="events")`` publishes each ``StreamEvent`` from
inside the model activity onto a workflow-hosted ``WorkflowStream``.
Subscribers connect via ``WorkflowStreamClient`` and read the topic in real
time. Chunks are batched on the ``streaming_batch_interval`` (default 100ms).
"""
# @@@SNIPSTART python-strands-streaming-workflow
from datetime import timedelta
from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent
from temporalio.contrib.workflow_streams import WorkflowStream
@workflow.defn
class StreamingWorkflow:
def __init__(self) -> None:
self.stream = WorkflowStream()
self.agent = TemporalAgent(
start_to_close_timeout=timedelta(seconds=60),
streaming_topic="events",
)
@workflow.run
async def run(self, prompt: str) -> str:
result = await self.agent.invoke_async(prompt)
return str(result)
# @@@SNIPEND