forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresponse_stream.py
More file actions
358 lines (283 loc) · 15.2 KB
/
response_stream.py
File metadata and controls
358 lines (283 loc) · 15.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import AsyncIterable, Sequence
from agent_framework import ChatResponse, ChatResponseUpdate, Content, Message, ResponseStream
"""ResponseStream: A Deep Dive
This sample explores the ResponseStream class - a powerful abstraction for working with
streaming responses in the Agent Framework.
=== Why ResponseStream Exists ===
When working with AI models, responses can be delivered in two ways:
1. **Non-streaming**: Wait for the complete response, then return it all at once
2. **Streaming**: Receive incremental updates as they're generated
Streaming provides a better user experience (faster time-to-first-token, progressive rendering)
but introduces complexity:
- How do you process updates as they arrive?
- How do you also get a final, complete response?
- How do you ensure the underlying stream is only consumed once?
- How do you add custom logic (hooks) at different stages?
ResponseStream solves all these problems by wrapping an async iterable and providing:
- Multiple consumption patterns (iteration OR direct finalization)
- Hook points for transformation, cleanup, finalization, and result processing
- The `wrap()` API to layer behavior without double-consuming the stream
=== The Four Hook Types ===
ResponseStream provides four ways to inject custom logic. All can be passed via constructor
or added later via fluent methods:
1. **Transform Hooks** (`transform_hooks=[]` or `.with_transform_hook()`)
- Called for EACH update as it's yielded during iteration
- Can transform updates before they're returned to the consumer
- Multiple hooks are called in order, each receiving the previous hook's output
- Only triggered during iteration (not when calling get_final_response directly)
2. **Cleanup Hooks** (`cleanup_hooks=[]` or `.with_cleanup_hook()`)
- Called ONCE when iteration completes (stream fully consumed), BEFORE finalizer
- Used for cleanup: closing connections, releasing resources, logging
- Cannot modify the stream or response
- Triggered regardless of how the stream ends (normal completion or exception)
3. **Finalizer** (`finalizer=` constructor parameter)
- Called ONCE when `get_final_response()` is invoked
- Receives the list of collected updates and converts to the final type
- There is only ONE finalizer per stream (set at construction)
4. **Result Hooks** (`result_hooks=[]` or `.with_result_hook()`)
- Called ONCE after the finalizer produces its result
- Transform the final response before returning
- Multiple result hooks are called in order, each receiving the previous result
- Can return None to keep the previous value unchanged
=== Two Consumption Patterns ===
**Pattern 1: Async Iteration**
```python
async for update in response_stream:
print(update.text) # Process each update
# Stream is now consumed; updates are stored internally
```
- Transform hooks are called for each yielded item
- Cleanup hooks are called after the last item
- The stream collects all updates internally for later finalization
- Does not run the finalizer automatically
**Pattern 2: Direct Finalization**
```python
final = await response_stream.get_final_response()
```
- If the stream hasn't been iterated, it auto-iterates (consuming all updates)
- The finalizer converts collected updates to a final response
- Result hooks transform the response
- You get the complete response without ever seeing individual updates
** Pattern 3: Combined Usage **
When you first iterate the stream and then call `get_final_response()`, the following occurs:
- Iteration yields updates with transform hooks applied
- Cleanup hooks run after iteration completes
- Calling `get_final_response()` uses the already collected updates to produce the final response
- Note that it does not re-iterate the stream since it's already been consumed
```python
async for update in response_stream:
print(update.text) # See each update
final = await response_stream.get_final_response() # Get the aggregated result
```
=== Chaining with .map() and .with_finalizer() ===
When building a Agent on top of a ChatClient, we face a challenge:
- The ChatClient returns a ResponseStream[ChatResponseUpdate, ChatResponse]
- The Agent needs to return a ResponseStream[AgentResponseUpdate, AgentResponse]
- We can't iterate the ChatClient's stream twice!
The `.map()` and `.with_finalizer()` methods solve this by creating new ResponseStreams that:
- Delegate iteration to the inner stream (only consuming it once)
- Maintain their OWN separate transform hooks, result hooks, and cleanup hooks
- Allow type-safe transformation of updates and final responses
**`.map(transform)`**: Creates a new stream that transforms each update.
- Returns a new ResponseStream with the transformed update type
- Falls back to the inner stream's finalizer if no new finalizer is set
**`.with_finalizer(finalizer)`**: Creates a new stream with a different finalizer.
- Returns a new ResponseStream with the new final type
- The inner stream's finalizer and result_hooks ARE still called (see below)
**IMPORTANT**: When chaining these methods via `get_final_response()`:
1. The inner stream's finalizer runs first (on the original updates)
2. The inner stream's result_hooks run (on the inner final result)
3. The outer stream's finalizer runs (on the transformed updates)
4. The outer stream's result_hooks run (on the outer final result)
This ensures that post-processing hooks registered on the inner stream (e.g., context
provider notifications, telemetry, thread updates) are still executed even when the
stream is wrapped/mapped.
```python
# Agent does something like this internally:
chat_stream = client.get_response(messages, stream=True)
agent_stream = (
chat_stream
.map(_to_agent_update, _to_agent_response)
.with_result_hook(_notify_thread) # Outer hook runs AFTER inner hooks
)
```
This ensures:
- The underlying ChatClient stream is only consumed once
- The agent can add its own transform hooks, result hooks, and cleanup logic
- Each layer (ChatClient, Agent, middleware) can add independent behavior
- Inner stream post-processing (like context provider notification) still runs
- Types flow naturally through the chain
"""
async def main() -> None:
"""Demonstrate the various ResponseStream patterns and capabilities."""
# =========================================================================
# Example 1: Basic ResponseStream with iteration
# =========================================================================
print("=== Example 1: Basic Iteration ===\n")
async def generate_updates() -> AsyncIterable[ChatResponseUpdate]:
"""Simulate a streaming response from an AI model."""
words = ["Hello", " ", "from", " ", "the", " ", "streaming", " ", "response", "!"]
for word in words:
await asyncio.sleep(0.05) # Simulate network delay
yield ChatResponseUpdate(contents=[Content.from_text(word)], role="assistant")
def combine_updates(updates: Sequence[ChatResponseUpdate]) -> ChatResponse:
"""Finalizer that combines all updates into a single response."""
return ChatResponse.from_updates(updates)
stream = ResponseStream(generate_updates(), finalizer=combine_updates)
print("Iterating through updates:")
async for update in stream:
print(f" Update: '{update.text}'")
# After iteration, we can still get the final response
final = await stream.get_final_response()
print(f"\nFinal response: '{final.text}'")
# =========================================================================
# Example 2: Using get_final_response() without iteration
# =========================================================================
print("\n=== Example 2: Direct Finalization (No Iteration) ===\n")
# Create a fresh stream (streams can only be consumed once)
stream2 = ResponseStream(generate_updates(), finalizer=combine_updates)
# Skip iteration entirely - get_final_response() auto-consumes the stream
final2 = await stream2.get_final_response()
print(f"Got final response directly: '{final2.text}'")
print(f"Number of updates collected internally: {len(stream2.updates)}")
# =========================================================================
# Example 3: Transform hooks - transform updates during iteration
# =========================================================================
print("\n=== Example 3: Transform Hooks ===\n")
update_count = {"value": 0}
def counting_hook(update: ChatResponseUpdate) -> ChatResponseUpdate:
"""Hook that counts and annotates each update."""
update_count["value"] += 1
# Return the update (or a modified version)
return update
def uppercase_hook(update: ChatResponseUpdate) -> ChatResponseUpdate:
"""Hook that converts text to uppercase."""
if update.text:
return ChatResponseUpdate(
contents=[Content.from_text(update.text.upper())], role=update.role, response_id=update.response_id
)
return update
# Pass transform_hooks directly to constructor
stream3 = ResponseStream(
generate_updates(),
finalizer=combine_updates,
transform_hooks=[counting_hook, uppercase_hook], # First counts, then uppercases
)
print("Iterating with hooks applied:")
async for update in stream3:
print(f" Received: '{update.text}'") # Will be uppercase
print(f"\nTotal updates processed: {update_count['value']}")
# =========================================================================
# Example 4: Cleanup hooks - cleanup after stream consumption
# =========================================================================
print("\n=== Example 4: Cleanup Hooks ===\n")
cleanup_performed = {"value": False}
async def cleanup_hook() -> None:
"""Cleanup hook for releasing resources after stream consumption."""
print(" [Cleanup] Cleaning up resources...")
cleanup_performed["value"] = True
# Pass cleanup_hooks directly to constructor
stream4 = ResponseStream(
generate_updates(),
finalizer=combine_updates,
cleanup_hooks=[cleanup_hook],
)
print("Starting iteration (cleanup happens after):")
async for _update in stream4:
pass # Just consume the stream
print(f"Cleanup was performed: {cleanup_performed['value']}")
# =========================================================================
# Example 5: Result hooks - transform the final response
# =========================================================================
print("\n=== Example 5: Result Hooks ===\n")
def add_metadata_hook(response: ChatResponse) -> ChatResponse:
"""Result hook that adds metadata to the response."""
response.additional_properties["processed"] = True
response.additional_properties["word_count"] = len((response.text or "").split())
return response
def wrap_in_quotes_hook(response: ChatResponse) -> ChatResponse:
"""Result hook that wraps the response text in quotes."""
if response.text:
return ChatResponse(
messages=[Message(text=f'"{response.text}"', role="assistant")],
additional_properties=response.additional_properties,
)
return response
# Finalizer converts updates to response, then result hooks transform it
stream5 = ResponseStream(
generate_updates(),
finalizer=combine_updates,
result_hooks=[add_metadata_hook, wrap_in_quotes_hook], # First adds metadata, then wraps in quotes
)
final5 = await stream5.get_final_response()
print(f"Final text: {final5.text}")
print(f"Metadata: {final5.additional_properties}")
# =========================================================================
# Example 6: The wrap() API - layering without double-consumption
# =========================================================================
print("\n=== Example 6: wrap() API for Layering ===\n")
# Simulate what ChatClient returns
inner_stream = ResponseStream(generate_updates(), finalizer=combine_updates)
# Simulate what Agent does: wrap the inner stream
def to_agent_format(update: ChatResponseUpdate) -> ChatResponseUpdate:
"""Map ChatResponseUpdate to agent format (simulated transformation)."""
# In real code, this would convert to AgentResponseUpdate
return ChatResponseUpdate(
contents=[Content.from_text(f"[AGENT] {update.text}")], role=update.role, response_id=update.response_id
)
def to_agent_response(updates: Sequence[ChatResponseUpdate]) -> ChatResponse:
"""Finalizer that converts updates to agent response (simulated)."""
# In real code, this would create an AgentResponse
text = "".join(u.text or "" for u in updates)
return ChatResponse(
messages=[Message(text=f"[AGENT FINAL] {text}", role="assistant")],
additional_properties={"layer": "agent"},
)
# .map() creates a new stream that:
# 1. Delegates iteration to inner_stream (only consuming it once)
# 2. Transforms each update via the transform function
# 3. Uses the provided finalizer (required since update type may change)
outer_stream = inner_stream.map(to_agent_format, to_agent_response)
print("Iterating the mapped stream:")
async for update in outer_stream:
print(f" {update.text}")
final_outer = await outer_stream.get_final_response()
print(f"\nMapped final: {final_outer.text}")
print(f"Mapped metadata: {final_outer.additional_properties}")
# Important: the inner stream was only consumed once!
print(f"Inner stream consumed: {inner_stream._consumed}")
# =========================================================================
# Example 7: Combining all patterns
# =========================================================================
print("\n=== Example 7: Full Integration ===\n")
stats = {"updates": 0, "characters": 0}
def track_stats(update: ChatResponseUpdate) -> ChatResponseUpdate:
"""Track statistics as updates flow through."""
stats["updates"] += 1
stats["characters"] += len(update.text or "")
return update
def log_cleanup() -> None:
"""Log when stream consumption completes."""
print(f" [Cleanup] Stream complete: {stats['updates']} updates, {stats['characters']} chars")
def add_stats_to_response(response: ChatResponse) -> ChatResponse:
"""Result hook to include the statistics in the final response."""
response.additional_properties["stats"] = stats.copy()
return response
# All hooks can be passed via constructor
full_stream = ResponseStream(
generate_updates(),
finalizer=combine_updates,
transform_hooks=[track_stats],
result_hooks=[add_stats_to_response],
cleanup_hooks=[log_cleanup],
)
print("Processing with all hooks active:")
async for update in full_stream:
print(f" -> '{update.text}'")
final_full = await full_stream.get_final_response()
print(f"\nFinal: '{final_full.text}'")
print(f"Stats: {final_full.additional_properties['stats']}")
if __name__ == "__main__":
asyncio.run(main())