forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathedge_condition.py
More file actions
246 lines (194 loc) · 10.6 KB
/
edge_condition.py
File metadata and controls
246 lines (194 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from typing import Any
from agent_framework import ( # Core chat primitives used to build requests
Agent,
AgentExecutor,
AgentExecutorRequest, # Input message bundle for an AgentExecutor
AgentExecutorResponse,
Message,
WorkflowBuilder, # Fluent builder for wiring executors and edges
WorkflowContext, # Per-run context and event bus
executor, # Decorator to declare a Python function as a workflow executor
)
from agent_framework.azure import AzureOpenAIResponsesClient # Thin client wrapper for Azure OpenAI chat models
from azure.identity import AzureCliCredential # Uses your az CLI login for credentials
from dotenv import load_dotenv
from pydantic import BaseModel # Structured outputs for safer parsing
from typing_extensions import Never
# Load environment variables from .env file
load_dotenv()
"""
Sample: Conditional routing with structured outputs
What this sample is:
- A minimal decision workflow that classifies an inbound email as spam or not spam, then routes to the
appropriate handler.
Purpose:
- Show how to attach boolean edge conditions that inspect an AgentExecutorResponse.
- Demonstrate using Pydantic models as response_format so the agent returns JSON we can validate and parse.
- Illustrate how to transform one agent's structured result into a new AgentExecutorRequest for a downstream agent.
Prerequisites:
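- AZURE_AI_PROJECT_ENDPOINT must be your Azure AI Foundry Agent Service (V2) project endpoint.
- AZURE_AI_MODEL_DEPLOYMENT_NAME must be set to the model deployment name; both variables are read when the agents are created.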
- You understand the basics of WorkflowBuilder, executors, and events in this framework.
- You know the concept of edge conditions and how they gate routes using a predicate function.
- Azure OpenAI access is configured for AzureOpenAIResponsesClient. You should be logged in with Azure CLI (AzureCliCredential)
and have the Foundry V2 Project environment variables set as documented in the getting started chat client README.
- The sample email resource file exists at workflow/resources/email.txt.
High level flow:
1) spam_detection_agent reads an email and returns DetectionResult.
2) If not spam, we transform the detection output into a user message for email_assistant_agent, then finish by
yielding the drafted reply as workflow output.
3) If spam, we short circuit to a spam handler that yields a spam notice as workflow output.
Output:
- The final workflow output is printed to stdout, either with a drafted reply or a spam notice.
Notes:
- Conditions read the agent response text and validate it into DetectionResult for robust routing.
- Executors are small and single purpose to keep control flow easy to follow.
- The workflow completes when it becomes idle, not via explicit completion events.
"""
class DetectionResult(BaseModel):
    """Represents the result of spam detection."""

    # Structured-output model for the spam detector. It is passed as the agent's response_format and
    # re-validated from the agent's JSON text wherever a decision is made (edge conditions and executors).
    # NOTE(review): pydantic typically surfaces the class docstring and field order in the generated
    # JSON schema - confirm before rewording the docstring or reordering fields.

    # is_spam drives the routing decision: edge conditions compare it against the value expected by each path.
    is_spam: bool
    # Human-readable rationale from the detector; it is included in the spam notice yielded on the spam path.
    reason: str
    # The original email text, echoed back by the agent so the not-spam path can forward it to the
    # email assistant without reloading the file.
    email_content: str
class EmailResponse(BaseModel):
    """Represents the response from the email assistant."""

    # Structured-output model for the email assistant. It is passed as that agent's response_format and
    # validated from the agent's JSON text before the workflow output is yielded.
    # NOTE(review): pydantic typically surfaces the class docstring in the generated JSON schema -
    # confirm before rewording it.

    # The drafted reply that a user could copy or send; it is embedded in the final "Email sent" output.
    response: str
def get_condition(expected_result: bool):
    """Build an edge predicate that fires when DetectionResult.is_spam equals ``expected_result``.

    Args:
        expected_result: The spam flag value this edge is meant to handle
            (True for the spam path, False for the not-spam path).

    Returns:
        A callable taking the upstream executor's message and returning a bool
        that tells the workflow whether the edge may be followed.
    """

    def condition(message: Any) -> bool:
        # Only agent responses carry a DetectionResult worth inspecting.
        if isinstance(message, AgentExecutorResponse):
            try:
                # model_validate_json raises when the JSON is malformed or has the wrong shape.
                parsed = DetectionResult.model_validate_json(message.agent_response.text)
                matches = parsed.is_spam == expected_result
            except Exception:
                # Fail closed: an unparsable response must not activate this edge,
                # otherwise the message could be routed down the wrong path.
                return False
            # Activate the edge only when the flag matches the path this predicate guards.
            return matches

        # Any other message type is let through so the graph does not dead-end.
        return True

    return condition
@executor(id="send_email")
async def handle_email_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Terminal step of the not-spam path: emit the drafted reply as workflow output.

    Args:
        response: The email assistant's response; its text is validated as an EmailResponse.
        ctx: Workflow context used to yield the final string output.
    """
    # Validation raises if the assistant did not return the expected JSON shape.
    raw_json = response.agent_response.text
    drafted = EmailResponse.model_validate_json(raw_json)
    await ctx.yield_output(f"Email sent:\n{drafted.response}")
@executor(id="handle_spam")
async def handle_spam_classifier_response(response: AgentExecutorResponse, ctx: WorkflowContext[Never, str]) -> None:
    """Terminal step of the spam path: emit a spam notice as workflow output.

    Args:
        response: The spam detector's response; its text is validated as a DetectionResult.
        ctx: Workflow context used to yield the final string output.

    Raises:
        RuntimeError: If the validated result is not flagged as spam.
    """
    verdict = DetectionResult.model_validate_json(response.agent_response.text)

    # A non-spam result here means the routing predicate and this executor's contract are out of sync.
    if not verdict.is_spam:
        raise RuntimeError("This executor should only handle spam messages.")

    await ctx.yield_output(f"Email marked as spam: {verdict.reason}")
@executor(id="to_email_assistant_request")
async def to_email_assistant_request(
    response: AgentExecutorResponse, ctx: WorkflowContext[AgentExecutorRequest]
) -> None:
    """Bridge the detector's structured result to a fresh request for the email assistant.

    Validates the detector's JSON text as a DetectionResult, wraps its email_content
    in a single user message, and sends that on as an AgentExecutorRequest.
    """
    parsed = DetectionResult.model_validate_json(response.agent_response.text)

    # Only the original email text is forwarded; the spam flag and reason are dropped here.
    forwarded = AgentExecutorRequest(
        messages=[Message("user", text=parsed.email_content)],
        should_respond=True,
    )
    await ctx.send_message(forwarded)
def create_spam_detector_agent() -> Agent:
    """Create the agent that classifies an email as spam or not spam.

    Reads AZURE_AI_PROJECT_ENDPOINT and AZURE_AI_MODEL_DEPLOYMENT_NAME from the
    environment (a missing variable raises KeyError) and requests DetectionResult
    as the response format.
    """
    endpoint = os.environ["AZURE_AI_PROJECT_ENDPOINT"]
    deployment = os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"]

    # AzureCliCredential relies on the current `az login`, so no secrets are embedded in code.
    client = AzureOpenAIResponsesClient(
        project_endpoint=endpoint,
        deployment_name=deployment,
        credential=AzureCliCredential(),
    )

    detector_instructions = (
        "You are a spam detection assistant that identifies spam emails. "
        "Always return JSON with fields is_spam (bool), reason (string), and email_content (string). "
        "Include the original email content in email_content."
    )
    return client.as_agent(
        instructions=detector_instructions,
        name="spam_detection_agent",
        default_options={"response_format": DetectionResult},
    )
def create_email_assistant_agent() -> Agent:
    """Create the agent that drafts a professional reply to an email.

    Reads AZURE_AI_PROJECT_ENDPOINT and AZURE_AI_MODEL_DEPLOYMENT_NAME from the
    environment (a missing variable raises KeyError) and requests EmailResponse
    as the response format.
    """
    endpoint = os.environ["AZURE_AI_PROJECT_ENDPOINT"]
    deployment = os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"]

    # AzureCliCredential relies on the current `az login`, so no secrets are embedded in code.
    client = AzureOpenAIResponsesClient(
        project_endpoint=endpoint,
        deployment_name=deployment,
        credential=AzureCliCredential(),
    )

    assistant_instructions = (
        "You are an email assistant that helps users draft professional responses to emails. "
        "Your input may be a JSON object that includes 'email_content'; base your reply on that content. "
        "Return JSON with a single field 'response' containing the drafted reply."
    )
    return client.as_agent(
        instructions=assistant_instructions,
        name="email_assistant_agent",
        default_options={"response_format": EmailResponse},
    )
async def main() -> None:
    """Build the conditional spam-routing workflow, run it on the sample email, and print the result.

    Graph:
        spam_detection_agent --(is_spam False)--> to_email_assistant_request
            --> email_assistant_agent --> handle_email_response
        spam_detection_agent --(is_spam True)---> handle_spam_classifier_response

    Raises:
        OSError: If the sample email file cannot be opened.
    """
    # Wrap each agent in an AgentExecutor so it can act as a node in the workflow graph.
    spam_detection_agent = AgentExecutor(create_spam_detector_agent())
    email_assistant_agent = AgentExecutor(create_email_assistant_agent())

    workflow = (
        WorkflowBuilder(start_executor=spam_detection_agent)
        # Not spam path: transform response -> request for assistant -> assistant -> send email
        .add_edge(spam_detection_agent, to_email_assistant_request, condition=get_condition(False))
        .add_edge(to_email_assistant_request, email_assistant_agent)
        .add_edge(email_assistant_agent, handle_email_response)
        # Spam path: send to spam handler
        .add_edge(spam_detection_agent, handle_spam_classifier_response, condition=get_condition(True))
        .build()
    )

    # Read the email from the sample resource file so the model sees the same input on every run.
    # The file lives in "resources" under the parent of this script's directory.
    sample_root = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))  # noqa: ASYNC240
    email_path = os.path.join(sample_root, "resources", "email.txt")
    # Decode explicitly as UTF-8: without an encoding, open() falls back to the platform locale,
    # which can mis-decode or reject non-ASCII characters in the email on some systems.
    with open(email_path, encoding="utf-8") as email_file:  # noqa: ASYNC230
        email = email_file.read()

    # The start node is an AgentExecutor, so the workflow input is an AgentExecutorRequest.
    # The workflow completes when it becomes idle (no more work to do).
    request = AgentExecutorRequest(messages=[Message("user", text=email)], should_respond=True)
    events = await workflow.run(request)

    # Only the first yielded output is printed; nothing is printed when no executor yielded output.
    outputs = events.get_outputs()
    if outputs:
        print(f"Workflow output: {outputs[0]}")
"""
Sample Output (illustrative; this script itself prints only the final "Workflow output:" line):
Processing email:
Subject: Team Meeting Follow-up - Action Items
Hi Sarah,
I wanted to follow up on our team meeting this morning and share the action items we discussed:
1. Update the project timeline by Friday
2. Schedule client presentation for next week
3. Review the budget allocation for Q4
Please let me know if you have any questions or if I missed anything from our discussion.
Best regards,
Alex Johnson
Project Manager
Tech Solutions Inc.
alex.johnson@techsolutions.com
(555) 123-4567
----------------------------------------
Workflow output: Email sent:
Hi Alex,
Thank you for the follow-up and for summarizing the action items from this morning's meeting. The points you listed accurately reflect our discussion, and I don't have any additional items to add at this time.
I will update the project timeline by Friday, begin scheduling the client presentation for next week, and start reviewing the Q4 budget allocation. If any questions or issues arise, I'll reach out.
Thank you again for outlining the next steps.
Best regards,
Sarah
""" # noqa: E501
# Run the workflow only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())