forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsequential_executors.py
More file actions
85 lines (64 loc) · 2.84 KB
/
sequential_executors.py
File metadata and controls
85 lines (64 loc) · 2.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import cast
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
)
from typing_extensions import Never
"""
Sample: Sequential workflow with streaming.
Two custom executors run in sequence. The first converts text to uppercase,
the second reverses the text and completes the workflow. The streaming run loop prints events as they occur.
Purpose:
Show how to define explicit Executor classes with @handler methods, wire them in order with
WorkflowBuilder, and consume streaming events. Demonstrate typed WorkflowContext[T_Out, T_W_Out] for outputs,
ctx.send_message to pass intermediate values, and ctx.yield_output to provide workflow outputs.
Prerequisites:
- No external services required.
"""
class UpperCaseExecutor(Executor):
    """First stage of the sample pipeline: upper-cases text and hands it on.

    Concepts:
    - A method decorated with @handler is a step the workflow can invoke.
    - The WorkflowContext[str] annotation declares that this step sends a
      string message to the next node.
    """

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Upper-case ``text`` and send the result downstream.

        Args:
            text: The incoming string to transform.
            ctx: Workflow context used to send the message onward.
        """
        # Forward the intermediate value straight to the next executor in the chain.
        await ctx.send_message(text.upper())
class ReverseTextExecutor(Executor):
    """Final stage of the sample pipeline: reverses text and yields it as output.

    Concepts:
    - ctx.yield_output publishes a workflow output once the terminal result
      is ready.
    - Being the terminal node, this executor does not forward any message.
    """

    @handler
    async def reverse_text(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        """Reverse ``text`` and yield it as the workflow output.

        Args:
            text: The incoming string to reverse.
            ctx: Workflow context used to yield the output.
        """
        # Walk the characters back to front and stitch them into a new string.
        flipped = "".join(reversed(text))
        await ctx.yield_output(flipped)
async def main() -> None:
    """Wire up the two-executor sequential workflow and stream a single run of it.

    Every streamed event is printed as it arrives; the data of events whose
    type is "output" is collected and printed once the stream is exhausted.
    """
    # Step 1: Create the executors and describe the graph.
    # Direction matters: the upper-casing node is the start and feeds the reversing node.
    first_stage = UpperCaseExecutor(id="upper_case_executor")
    second_stage = ReverseTextExecutor(id="reverse_text_executor")

    builder = WorkflowBuilder(start_executor=first_stage)
    builder = builder.add_edge(first_stage, second_stage)
    workflow = builder.build()

    # Step 2: Run one input through the workflow in streaming mode.
    # Each event is echoed; only "output" events contribute to the collected results.
    collected: list[str] = []
    async for event in workflow.run("hello world", stream=True):
        print(f"Event: {event}")
        if event.type != "output":
            continue
        collected.append(cast(str, event.data))

    # Nothing to report when the run produced no outputs.
    if not collected:
        return
    print(f"Workflow outputs: {collected}")
if __name__ == "__main__":
    # Script entry point: run the async sample to completion when executed directly.
    asyncio.run(main())