-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Expand file tree
/
Copy pathusage_tracking_middleware.py
More file actions
184 lines (143 loc) · 6.26 KB
/
usage_tracking_middleware.py
File metadata and controls
184 lines (143 loc) · 6.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# Copyright (c) Microsoft. All rights reserved.
"""
This sample demonstrates a single chat middleware that tracks per-model-call usage
for both non-streaming and streaming tool-loop runs.
"""
import asyncio
from collections.abc import Awaitable, Callable
from random import randint
from typing import Annotated
from agent_framework import (
Agent,
ChatContext,
ChatResponse,
ChatResponseUpdate,
ResponseStream,
chat_middleware,
tool,
)
from agent_framework.openai import OpenAIResponsesClient
from dotenv import load_dotenv
from pydantic import Field
# Load environment variables from .env file
load_dotenv()
# Module-level counters of the model calls seen by the middleware, one per run mode.
# print_usage increments them and _reset_usage_counters sets them back to zero.
NON_STREAMING_CALL_COUNT = 0
STREAMING_CALL_COUNT = 0
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production;
# see samples/02-agents/tools/function_tool_with_approval.py
# and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Get the weather for a given location."""
    # The report is made up: both the condition and the high temperature are
    # random, so repeated calls for the same location may disagree.
    conditions = ["sunny", "cloudy", "rainy", "stormy"]
    # Draw the condition before the temperature so the random sequence is
    # consumed in the same order as the values appear in the sentence.
    condition = conditions[randint(0, len(conditions) - 1)]
    high_celsius = randint(10, 30)
    return f"The weather in {location} is {condition} with a high of {high_celsius}°C."
def _reset_usage_counters() -> None:
    """Set both module-level call counters back to zero.

    Called at the start of each demonstration so that its model calls are
    numbered independently of any earlier run.
    """
    global NON_STREAMING_CALL_COUNT, STREAMING_CALL_COUNT
    NON_STREAMING_CALL_COUNT = STREAMING_CALL_COUNT = 0
def _create_agent() -> Agent:
    """Build the weather agent that both demonstrations run against.

    Returns:
        An agent wired with the ``get_weather`` tool and the ``print_usage``
        chat middleware.
    """
    chat_client = OpenAIResponsesClient()
    # The prompt tells the model to go through the weather tool before answering.
    system_prompt = (
        "You are a weather assistant. Always call the weather tool before answering weather questions, "
        "then summarize the tool result in one short paragraph."
    )
    return Agent(
        client=chat_client,
        instructions=system_prompt,
        tools=[get_weather],
        middleware=[print_usage],
    )
@chat_middleware
async def print_usage(
    context: ChatContext,
    call_next: Callable[[], Awaitable[None]],
) -> None:
    """Report usage for every inner model call, streamed or not.

    Non-streaming calls are reported after ``call_next`` returns, from
    ``context.result``. Streaming calls register two hooks on the context
    before continuing: one prints each usage item found in the stream updates,
    the other prints the final response's usage only when no update carried any.

    Args:
        context: Chat context of the current model call.
        call_next: Awaitable callback that continues the middleware pipeline.
    """
    global NON_STREAMING_CALL_COUNT, STREAMING_CALL_COUNT

    if not context.stream:
        NON_STREAMING_CALL_COUNT += 1
        ordinal = NON_STREAMING_CALL_COUNT
        await call_next()
        outcome = context.result
        if isinstance(outcome, ChatResponse) and outcome.usage_details:
            print(f"[Non-streaming model call #{ordinal}] Usage: {outcome.usage_details}")
        return

    STREAMING_CALL_COUNT += 1
    ordinal = STREAMING_CALL_COUNT
    # Flipped by the update hook; read later by the final-result hook.
    reported_from_updates = False

    def report_update_usage(update: ChatResponseUpdate) -> ChatResponseUpdate:
        """Print every usage item in ``update`` and pass the update through unchanged."""
        nonlocal reported_from_updates
        for item in update.contents:
            if item.type != "usage":
                continue
            reported_from_updates = True
            print(f"\n[Streaming model call #{ordinal}] Usage update: {item.usage_details}")
        return update

    def report_final_usage(result: ChatResponse) -> ChatResponse:
        """Print the final usage unless an update already reported it; pass the result through."""
        if reported_from_updates:
            return result
        if result.usage_details:
            print(f"\n[Streaming model call #{ordinal}] Final usage: {result.usage_details}")
        return result

    # Both hooks are registered before the call continues down the pipeline.
    context.stream_transform_hooks.append(report_update_usage)
    context.stream_result_hooks.append(report_final_usage)
    await call_next()
async def non_streaming_usage_example() -> None:
    """Demonstrate per-call usage tracking on a non-streaming agent run."""
    _reset_usage_counters()
    print("\n=== Non-streaming per-call usage tracking ===")

    # 1. Build the agent; its middleware prints usage after every inner model call.
    weather_agent = _create_agent()

    # 2. Ask a weather question with tool use required, so the function loop makes several model calls.
    question = "What is the weather in Seattle, and should I bring an umbrella?"
    print(f"User: {question}")
    run_options = {"tool_choice": "required"}
    answer = await weather_agent.run(question, options=run_options)

    # 3. The middleware has already logged per-call usage; now show the user-visible answer.
    print(f"Assistant: {answer.text}")
async def streaming_usage_example() -> None:
    """Demonstrate per-call usage tracking on a streaming agent run."""
    _reset_usage_counters()
    print("\n=== Streaming per-call usage tracking ===")

    # 1. Build the agent; its middleware watches streaming usage for every inner model call.
    weather_agent = _create_agent()

    # 2. Start a streamed run with tool use required, so the function loop makes several model calls.
    question = "What is the weather in Portland, and should I bring a jacket?"
    print(f"User: {question}")
    print("Assistant: ", end="", flush=True)
    run_options = {"tool_choice": "required"}
    updates: ResponseStream = weather_agent.run(question, stream=True, options=run_options)

    # 3. Drain the stream as usual; usage reporting is done by the middleware hooks.
    async for chunk in updates:
        if not chunk.text:
            continue
        print(chunk.text, end="", flush=True)
    print()

    # 4. Finalize the stream to get the complete response for inspection.
    completed = await updates.get_final_response()
    print(f"Final assistant message: {completed.text}")
async def main() -> None:
    """Run the non-streaming demonstration, then the streaming one."""
    print("=== Usage Tracking Middleware Example ===")
    # The demos run one after the other, never concurrently, because they
    # share the module-level call counters.
    for demo in (non_streaming_usage_example, streaming_usage_example):
        await demo()
# Run the demonstrations only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
"""
Sample output:
=== Usage Tracking Middleware Example ===
=== Non-streaming per-call usage tracking ===
User: What is the weather in Seattle, and should I bring an umbrella?
[Non-streaming model call #1] Usage: {'input_tokens': ..., 'output_tokens': ..., ...}
[Non-streaming model call #2] Usage: {'input_tokens': ..., 'output_tokens': ..., ...}
Assistant: Based on the weather in Seattle, ...
=== Streaming per-call usage tracking ===
User: What is the weather in Portland, and should I bring a jacket?
Assistant: Based on the weather in Portland, ...
[Streaming model call #1] Usage update: {'input_tokens': ..., 'output_tokens': ..., ...}
[Streaming model call #2] Usage update: {'input_tokens': ..., 'output_tokens': ..., ...}
Final assistant message: Based on the weather in Portland, ...
"""