forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflow_observability.py
More file actions
114 lines (86 loc) · 4.13 KB
/
workflow_observability.py
File metadata and controls
114 lines (86 loc) · 4.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
)
from agent_framework.observability import configure_otel_providers, get_tracer
from opentelemetry.trace import SpanKind
from opentelemetry.trace.span import format_trace_id
from typing_extensions import Never
"""
This sample shows the telemetry collected when running a Agent Framework workflow.
This simple workflow consists of two executors arranged sequentially:
1. An executor that converts input text to uppercase.
2. An executor that reverses the uppercase text.
The workflow receives an initial string message, processes it through the two executors,
and yields the final result.
Telemetry data that the workflow system emits includes:
- Overall workflow build & execution spans
- workflow.build (events: build.started, build.validation_completed, build.completed, edge_group.process)
- workflow.run (events: workflow.started, workflow.completed or workflow.error)
- Individual executor processing spans
- executor.process (for each executor invocation)
- Message publishing between executors
- message.send (for each outbound message)
Prerequisites:
- Basic understanding of workflow executors, edges, and messages.
- Basic understanding of OpenTelemetry concepts like spans and traces.
"""
# Executors for sequential workflow
class UpperCaseExecutor(Executor):
"""An executor that converts text to uppercase."""
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Execute the task by converting the input string to uppercase."""
print(f"UpperCaseExecutor: Processing '{text}'")
result = text.upper()
print(f"UpperCaseExecutor: Result '{result}'")
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class ReverseTextExecutor(Executor):
"""An executor that reverses text."""
@handler
async def reverse_text(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
"""Execute the task by reversing the input string."""
print(f"ReverseTextExecutor: Processing '{text}'")
result = text[::-1]
print(f"ReverseTextExecutor: Result '{result}'")
# Yield the output.
await ctx.yield_output(result)
async def run_sequential_workflow() -> None:
"""Run a simple sequential workflow demonstrating telemetry collection.
This workflow processes a string through two executors in sequence:
1. UpperCaseExecutor converts the input to uppercase
2. ReverseTextExecutor reverses the string and completes the workflow
"""
# Step 1: Create the executors.
upper_case_executor = UpperCaseExecutor(id="upper_case_executor")
reverse_text_executor = ReverseTextExecutor(id="reverse_text_executor")
# Step 2: Build the workflow with the defined edges.
workflow = (
WorkflowBuilder(start_executor=upper_case_executor).add_edge(upper_case_executor, reverse_text_executor).build()
)
# Step 3: Run the workflow with an initial message.
input_text = "hello world"
print(f"Starting workflow with input: '{input_text}'")
output_event = None
async for event in workflow.run("Hello world", stream=True):
if event.type == "output":
# The WorkflowOutputEvent contains the final result.
output_event = event
if output_event:
print(f"Workflow completed with result: '{output_event.data}'")
async def main():
"""Run the telemetry sample with a simple sequential workflow."""
# This will enable tracing and create the necessary tracing, logging and metrics providers
# based on environment variables. See the .env.example file for the available configuration options.
configure_otel_providers()
with get_tracer().start_as_current_span("Sequential Workflow Scenario", kind=SpanKind.CLIENT) as current_span:
print(f"Trace ID: {format_trace_id(current_span.get_span_context().trace_id)}")
# Run the sequential workflow scenario
await run_sequential_workflow()
if __name__ == "__main__":
asyncio.run(main())