-
Notifications
You must be signed in to change notification settings - Fork 110
Expand file tree
/
Copy pathworkflow.py
More file actions
73 lines (54 loc) · 2.13 KB
/
Copy pathworkflow.py
File metadata and controls
73 lines (54 loc) · 2.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
"""Two-hook audit example.
``AuditHook`` subscribes two callbacks to ``AfterToolCallEvent``:
* An in-workflow callback that mutates per-workflow state. It runs in workflow
context, so it must be deterministic — pure data manipulation only.
* An ``activity_as_hook`` callback that dispatches to a Temporal activity. Use
this for anything with I/O: writing to an audit log, calling out to a
metrics system, alerting, etc.
"""
from datetime import timedelta
from strands import tool
from strands.hooks import HookProvider, HookRegistry
from strands.hooks.events import AfterToolCallEvent
from temporalio import activity, workflow
from temporalio.contrib.strands import TemporalAgent
from temporalio.contrib.strands.workflow import activity_as_hook
# @@@SNIPSTART python-strands-hooks-activity
@activity.defn
async def persist_tool_call(tool_name: str) -> None:
    """Log an audit line for a tool call that has completed.

    Declared with ``activity.defn``, so this is where I/O for the audit trail
    belongs; the example itself only writes to the activity logger.

    Args:
        tool_name: Name of the tool to mention in the audit message.
    """
    # In production, write to a database / S3 / your audit pipeline.
    audit_line = f"audit: tool {tool_name} completed"
    activity.logger.info(audit_line)
# @@@SNIPEND
@tool
def echo(text: str) -> str:
    # Minimal example tool: hands the input string back unchanged.
    # NOTE(review): documented with comments rather than a docstring on
    # purpose — the ``@tool`` decorator may derive tool metadata from the
    # docstring; confirm before adding one.
    return text
# @@@SNIPSTART python-strands-hooks-provider
class AuditHook(HookProvider):
    """Hook provider that reacts to every ``AfterToolCallEvent`` in two ways.

    One callback appends the tool name to ``self.fired``; the other is an
    ``activity_as_hook`` wrapper that forwards the tool name to the
    ``persist_tool_call`` activity.
    """

    def __init__(self) -> None:
        """Start with an empty record of tool names."""
        # Tool names appended by ``_record``, one per callback invocation.
        self.fired: list[str] = []

    @staticmethod
    def _tool_name(event: AfterToolCallEvent) -> str:
        """Return the ``"name"`` entry of the event's ``tool_use`` mapping."""
        return event.tool_use["name"]

    def register_hooks(self, registry: HookRegistry, **kwargs: object) -> None:
        """Subscribe both audit callbacks to ``AfterToolCallEvent``.

        Args:
            registry: Registry the callbacks are added to.
            **kwargs: Accepted for interface compatibility; not used here.
        """
        # Registration order is kept: the in-workflow recorder first, then the
        # activity-backed callback.
        registry.add_callback(AfterToolCallEvent, self._record)

        persist_hook = activity_as_hook(
            persist_tool_call,
            activity_input=self._tool_name,
            start_to_close_timeout=timedelta(seconds=15),
        )
        registry.add_callback(AfterToolCallEvent, persist_hook)

    def _record(self, event: AfterToolCallEvent) -> None:
        """Append the event's tool name to ``self.fired``."""
        # Pure data manipulation only: per the module docstring this callback
        # runs in workflow context and must stay deterministic.
        finished_tool = self._tool_name(event)
        self.fired.append(finished_tool)
# @@@SNIPEND
@workflow.defn
class HooksWorkflow:
    """Workflow that runs an agent with the ``echo`` tool and an ``AuditHook``.

    The result of ``run`` is the list of tool names the hook recorded.
    """

    def __init__(self) -> None:
        """Create the audit hook and the agent that the hook is attached to."""
        audit_hook = AuditHook()
        # Kept on the instance so ``run`` can read what the hook recorded.
        self.hook = audit_hook
        self.agent = TemporalAgent(
            tools=[echo],
            hooks=[audit_hook],
            start_to_close_timeout=timedelta(seconds=60),
        )

    @workflow.run
    async def run(self, prompt: str) -> list[str]:
        """Invoke the agent with ``prompt`` and return the recorded tool names.

        Args:
            prompt: Text passed to the agent's ``invoke_async``.

        Returns:
            The hook's ``fired`` list as it stands once the agent call returns.
        """
        await self.agent.invoke_async(prompt)
        recorded = self.hook.fired
        return recorded