forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
352 lines (275 loc) · 12.8 KB
/
main.py
File metadata and controls
352 lines (275 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# Copyright (c) Microsoft. All rights reserved.
"""
CustomerSupport workflow sample.
This workflow demonstrates using multiple agents to provide automated
troubleshooting steps to resolve common issues with escalation options.
Example input: "My PC keeps rebooting and I can't use it."
Usage:
python main.py
The workflow:
1. SelfServiceAgent: Works with user to provide troubleshooting steps
2. TicketingAgent: Creates a ticket if issue needs escalation
3. TicketRoutingAgent: Determines which team should handle the ticket
4. WindowsSupportAgent: Provides Windows-specific troubleshooting
5. TicketResolutionAgent: Resolves the ticket when issue is fixed
6. TicketEscalationAgent: Escalates to human support if needed
"""
import asyncio
import json
import logging
import os
import uuid
from pathlib import Path
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.declarative import (
AgentExternalInputRequest,
AgentExternalInputResponse,
WorkflowFactory,
)
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from ticketing_plugin import TicketingPlugin
logging.basicConfig(level=logging.ERROR)
# Load environment variables from .env file
load_dotenv()
# ANSI color codes for output formatting
CYAN = "\033[36m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
MAGENTA = "\033[35m"
RESET = "\033[0m"
# Agent Instructions
SELF_SERVICE_INSTRUCTIONS = """
Use your knowledge to work with the user to provide the best possible troubleshooting steps.
- If the user confirms that the issue is resolved, then the issue is resolved.
- If the user reports that the issue persists, then escalate.
""".strip()
TICKETING_INSTRUCTIONS = """Always create a ticket in Azure DevOps using the available tools.
Include the following information in the TicketSummary.
- Issue description: {{IssueDescription}}
- Attempted resolution steps: {{AttemptedResolutionSteps}}
After creating the ticket, provide the user with the ticket ID."""
TICKET_ROUTING_INSTRUCTIONS = """Determine how to route the given issue to the appropriate support team.
Choose from the available teams and their functions:
- Windows Activation Support: Windows license activation issues
- Windows Support: Windows related issues
- Azure Support: Azure related issues
- Network Support: Network related issues
- Hardware Support: Hardware related issues
- Microsoft Office Support: Microsoft Office related issues
- General Support: General issues not related to the above categories"""
WINDOWS_SUPPORT_INSTRUCTIONS = """
Use your knowledge to work with the user to provide the best possible troubleshooting steps
for issues related to Windows operating system.
- Utilize the "Attempted Resolutions Steps" as a starting point for your troubleshooting.
- Never escalate without troubleshooting with the user.
- If the user confirms that the issue is resolved, then the issue is resolved.
- If the user reports that the issue persists, then escalate.
Issue: {{IssueDescription}}
Attempted Resolution Steps: {{AttemptedResolutionSteps}}"""
RESOLUTION_INSTRUCTIONS = """Resolve the following ticket in Azure DevOps.
Always include the resolution details.
- Ticket ID: #{{TicketId}}
- Resolution Summary: {{ResolutionSummary}}"""
ESCALATION_INSTRUCTIONS = """
You escalate the provided issue to human support team by sending an email.
Here are some additional details that might help:
- TicketId : {{TicketId}}
- IssueDescription : {{IssueDescription}}
- AttemptedResolutionSteps : {{AttemptedResolutionSteps}}
Before escalating, gather the user's email address for follow-up.
If not known, ask the user for their email address so that the support team can reach them when needed.
When sending the email, include the following details:
- To: support@contoso.com
- Cc: user's email address
- Subject of the email: "Support Ticket - {TicketId} - [Compact Issue Description]"
- Body:
- Issue description
- Attempted resolution steps
- User's email address
- Any other relevant information from the conversation history
Assure the user that their issue will be resolved and provide them with a ticket ID for reference."""
# Pydantic models for structured outputs
class SelfServiceResponse(BaseModel):
"""Response from self-service agent evaluation."""
IsResolved: bool = Field(description="True if the user issue/ask has been resolved.")
NeedsTicket: bool = Field(description="True if the user issue/ask requires that a ticket be filed.")
IssueDescription: str = Field(description="A concise description of the issue.")
AttemptedResolutionSteps: str = Field(description="An outline of the steps taken to attempt resolution.")
class TicketingResponse(BaseModel):
"""Response from ticketing agent."""
TicketId: str = Field(description="The identifier of the ticket created in response to the user issue.")
TicketSummary: str = Field(description="The summary of the ticket created in response to the user issue.")
class RoutingResponse(BaseModel):
"""Response from routing agent."""
TeamName: str = Field(description="The name of the team to route the issue")
class SupportResponse(BaseModel):
"""Response from support agent."""
IsResolved: bool = Field(description="True if the user issue/ask has been resolved.")
NeedsEscalation: bool = Field(
description="True resolution could not be achieved and the issue/ask requires escalation."
)
ResolutionSummary: str = Field(description="The summary of the steps that led to resolution.")
class EscalationResponse(BaseModel):
"""Response from escalation agent."""
IsComplete: bool = Field(description="Has the email been sent and no more user input is required.")
UserMessage: str = Field(description="A natural language message to the user.")
async def main() -> None:
"""Run the customer support workflow."""
# Create ticketing plugin
plugin = TicketingPlugin()
# Create Azure OpenAI client
client = AzureOpenAIResponsesClient(
project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
# This sample has been tested only on `gpt-5.1` and may not work as intended on other models
# This sample is known to fail on `gpt-5-mini` reasoning input (GH issue #4059)
deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
credential=AzureCliCredential(),
)
# Create agents with structured outputs
self_service_agent = client.as_agent(
name="SelfServiceAgent",
instructions=SELF_SERVICE_INSTRUCTIONS,
default_options={"response_format": SelfServiceResponse},
)
ticketing_agent = client.as_agent(
name="TicketingAgent",
instructions=TICKETING_INSTRUCTIONS,
tools=plugin.get_functions(),
default_options={"response_format": TicketingResponse},
)
routing_agent = client.as_agent(
name="TicketRoutingAgent",
instructions=TICKET_ROUTING_INSTRUCTIONS,
tools=[plugin.get_ticket],
default_options={"response_format": RoutingResponse},
)
windows_support_agent = client.as_agent(
name="WindowsSupportAgent",
instructions=WINDOWS_SUPPORT_INSTRUCTIONS,
tools=[plugin.get_ticket],
default_options={"response_format": SupportResponse},
)
resolution_agent = client.as_agent(
name="TicketResolutionAgent",
instructions=RESOLUTION_INSTRUCTIONS,
tools=[plugin.resolve_ticket],
)
escalation_agent = client.as_agent(
name="TicketEscalationAgent",
instructions=ESCALATION_INSTRUCTIONS,
tools=[plugin.get_ticket, plugin.send_notification],
default_options={"response_format": EscalationResponse},
)
# Agent registry for lookup
agents = {
"SelfServiceAgent": self_service_agent,
"TicketingAgent": ticketing_agent,
"TicketRoutingAgent": routing_agent,
"WindowsSupportAgent": windows_support_agent,
"TicketResolutionAgent": resolution_agent,
"TicketEscalationAgent": escalation_agent,
}
# Print loaded agents (similar to .NET "PROMPT AGENT: AgentName:1")
for agent_name in agents:
print(f"{CYAN}PROMPT AGENT: {agent_name}:1{RESET}")
# Create workflow factory
factory = WorkflowFactory(agents=agents)
# Load workflow from YAML
samples_root = Path(__file__).parent.parent.parent.parent.parent.parent.parent
workflow_path = samples_root / "workflow-samples" / "CustomerSupport.yaml"
if not workflow_path.exists():
# Fall back to local copy if workflow-samples doesn't exist
workflow_path = Path(__file__).parent / "workflow.yaml"
workflow = factory.create_workflow_from_yaml_path(workflow_path)
print()
print("=" * 60)
# Example input
user_input = "My computer won't boot"
pending_request_id: str | None = None
# Track responses for formatting
accumulated_response: str = ""
last_agent_name: str | None = None
print(f"\n{GREEN}INPUT:{RESET} {user_input}\n")
while True:
if pending_request_id:
# Continue workflow with user response
print(f"\n{YELLOW}WORKFLOW:{RESET} Restore\n")
response = AgentExternalInputResponse(user_input=user_input)
stream = workflow.run(stream=True, responses={pending_request_id: response})
pending_request_id = None
else:
# Start workflow
stream = workflow.run(user_input, stream=True)
async for event in stream:
if event.type == "output":
data = event.data
# source_executor_id is only available on request_info events.
# For output events, use executor_id to identify the emitting node.
source_id = event.executor_id or ""
# Check if this is a SendActivity output (activity text from log_ticket, log_route, etc.)
if "log_" in source_id.lower():
# Print any accumulated agent response first
if accumulated_response and last_agent_name:
msg_id = f"msg_{uuid.uuid4().hex[:32]}"
print(f"{CYAN}{last_agent_name.upper()}:{RESET} [{msg_id}]")
try:
parsed = json.loads(accumulated_response)
print(json.dumps(parsed))
except (json.JSONDecodeError, TypeError):
print(accumulated_response)
accumulated_response = ""
last_agent_name = None
# Print activity
print(f"\n{MAGENTA}ACTIVITY:{RESET}")
print(data)
else:
# Accumulate agent response (streaming text)
if isinstance(data, str):
accumulated_response += data
else:
accumulated_response += str(data)
elif event.type == "request_info" and isinstance(event.data, AgentExternalInputRequest):
request = event.data
# The agent_response from the request contains the structured response
agent_name = request.agent_name
agent_response = request.agent_response
# Print the agent's response
if agent_response:
msg_id = f"msg_{uuid.uuid4().hex[:32]}"
print(f"{CYAN}{agent_name.upper()}:{RESET} [{msg_id}]")
try:
parsed = json.loads(agent_response)
print(json.dumps(parsed))
except (json.JSONDecodeError, TypeError):
print(agent_response)
# Clear accumulated since we printed from the request
accumulated_response = ""
last_agent_name = agent_name
pending_request_id = event.request_id
print(f"\n{YELLOW}WORKFLOW:{RESET} Yield")
# Print any remaining accumulated response at end of stream
if accumulated_response:
# Try to identify which agent this came from based on content
msg_id = f"msg_{uuid.uuid4().hex[:32]}"
print(f"\nResponse: [{msg_id}]")
try:
parsed = json.loads(accumulated_response)
print(json.dumps(parsed))
except (json.JSONDecodeError, TypeError):
print(accumulated_response)
accumulated_response = ""
if not pending_request_id:
break
# Get next user input
user_input = input(f"\n{GREEN}INPUT:{RESET} ").strip() # noqa: ASYNC250
if not user_input:
print("Exiting...")
break
print()
print("\n" + "=" * 60)
print("Workflow Complete")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())