forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path05_first_workflow.py
More file actions
68 lines (50 loc) · 1.74 KB
/
05_first_workflow.py
File metadata and controls
68 lines (50 loc) · 1.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
executor,
handler,
)
from typing_extensions import Never
"""
First Workflow — Chain executors with edges
This sample builds a minimal workflow with two steps:
1. Convert text to uppercase (class-based executor)
2. Reverse the text (function-based executor)
No external services are required.
"""
# <create_workflow>
# Step 1: A class-based executor that converts text to uppercase
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert input to uppercase and forward to the next node."""
await ctx.send_message(text.upper())
# Step 2: A function-based executor that reverses the string and yields output
@executor(id="reverse_text")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""Reverse the string and yield the final workflow output."""
await ctx.yield_output(text[::-1])
def create_workflow():
"""Build the workflow: UpperCase → reverse_text."""
upper = UpperCase(id="upper_case")
return WorkflowBuilder(start_executor=upper).add_edge(upper, reverse_text).build()
# </create_workflow>
async def main() -> None:
# <run_workflow>
workflow = create_workflow()
events = await workflow.run("hello world")
print(f"Output: {events.get_outputs()}")
print(f"Final state: {events.get_final_state()}")
# </run_workflow>
"""
Expected output:
Output: ['DLROW OLLEH']
Final state: WorkflowRunState.IDLE
"""
if __name__ == "__main__":
asyncio.run(main())