-
Notifications
You must be signed in to change notification settings - Fork 118
Expand file tree
/
Copy pathagent.py
More file actions
103 lines (89 loc) · 3.81 KB
/
agent.py
File metadata and controls
103 lines (89 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
"""
---
title: Vision-Enabled Agent
category: complex-agents
tags: [video_stream, grok_vision, x_ai, frame_capture, image_content]
difficulty: intermediate
description: Agent with camera vision capabilities using Grok-2 Vision model
demonstrates:
- Video stream processing from remote participants
- Frame buffering from video tracks
- X.AI Grok-2 Vision model integration
- Dynamic video track subscription
- Image content injection into chat context
- Track publication event handling
---
"""
import asyncio
import logging
from pathlib import Path
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import JobContext, WorkerOptions, cli, get_job_context
from livekit.agents.llm import function_tool, ImageContent, ChatContext, ChatMessage
from livekit.agents.voice import Agent, AgentSession, RunContext
from livekit.plugins import deepgram, openai, silero, rime
# Module-level logger for this agent (INFO and above).
logger = logging.getLogger("vision-agent")
logger.setLevel(logging.INFO)
# Load environment variables (API keys, LiveKit credentials) from the
# repository-level .env one directory above this file's parent.
load_dotenv(dotenv_path=Path(__file__).parent.parent / '.env')
class VisionAgent(Agent):
    """Voice assistant with camera vision.

    Buffers the most recent frame from a remote participant's video track and
    attaches it to the chat context each time the user finishes a turn, so the
    vision-capable LLM can see what the camera sees.

    Pipeline: Deepgram STT -> Grok-2 Vision (via x.ai) -> Rime TTS, with
    Silero VAD for turn detection.
    """

    def __init__(self) -> None:
        # Most recent captured video frame, or None once consumed by a turn.
        self._latest_frame = None
        # The single active rtc.VideoStream (we keep at most one at a time).
        self._video_stream = None
        # Strong references to background reader tasks — asyncio holds only
        # weak references to tasks, so we must keep them alive ourselves.
        self._tasks = []
        super().__init__(
            instructions="""
        You are an assistant communicating through voice with vision capabilities.
        You can see what the user is showing you through their camera.
        Don't use any unpronouncable characters.
        """,
            stt=deepgram.STT(),
            llm=openai.LLM.with_x_ai(model="grok-2-vision", tool_choice=None),
            tts=rime.TTS(),
            vad=silero.VAD.load(),
        )

    async def on_enter(self):
        """On session start: attach to an already-published remote video track
        (if any) and subscribe to video tracks published later."""
        room = get_job_context().room

        # Find the first video track (if any) from the first remote participant.
        if room.remote_participants:
            remote_participant = list(room.remote_participants.values())[0]
            video_tracks = [
                publication.track
                for publication in list(remote_participant.track_publications.values())
                if publication.track and publication.track.kind == rtc.TrackKind.KIND_VIDEO
            ]
            if video_tracks:
                self._create_video_stream(video_tracks[0])

        # Watch for new video tracks not yet published.
        @room.on("track_subscribed")
        def on_track_subscribed(
            track: rtc.Track,
            publication: rtc.RemoteTrackPublication,
            participant: rtc.RemoteParticipant,
        ):
            if track.kind == rtc.TrackKind.KIND_VIDEO:
                self._create_video_stream(track)

    async def on_user_turn_completed(self, turn_ctx: ChatContext, new_message: ChatMessage) -> None:
        """Attach the latest buffered video frame, if any, to the new message."""
        if self._latest_frame:
            new_message.content.append(ImageContent(image=self._latest_frame))
            # Consume the frame so it is not re-attached to later turns.
            self._latest_frame = None

    def _create_video_stream(self, track: rtc.Track):
        """Replace any existing video stream with one reading from `track`,
        buffering its most recent frame into self._latest_frame."""
        # Close the previous stream (we only want one at a time) and cancel
        # its reader task — previously the old reader was left running.
        if self._video_stream is not None:
            self._video_stream.close()
        for stale_task in self._tasks:
            stale_task.cancel()

        # Create a new stream to receive frames.
        self._video_stream = rtc.VideoStream(track)

        async def read_stream(stream: rtc.VideoStream):
            # The stream is bound locally so that a concurrent replacement of
            # self._video_stream cannot redirect this reader mid-loop.
            async for event in stream:
                # Store the latest frame for use later.
                self._latest_frame = event.frame

        task = asyncio.create_task(read_stream(self._video_stream))

        def _on_done(t: asyncio.Task):
            self._tasks.remove(t)
            # Surface unexpected reader failures instead of silently
            # swallowing them (cancellation is the expected shutdown path).
            if not t.cancelled() and t.exception() is not None:
                logger.error("video stream reader failed", exc_info=t.exception())

        task.add_done_callback(_on_done)
        self._tasks.append(task)
async def entrypoint(ctx: JobContext):
    """Job entrypoint: run a VisionAgent session in the job's room."""
    await AgentSession().start(
        agent=VisionAgent(),
        room=ctx.room,
    )
if __name__ == "__main__":
    # Start the LiveKit agent worker, dispatching jobs to `entrypoint`.
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))