-
Notifications
You must be signed in to change notification settings - Fork 113
Expand file tree
/
Copy pathfull_duplex.py
More file actions
142 lines (117 loc) · 4.36 KB
/
full_duplex.py
File metadata and controls
142 lines (117 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import asyncio
import logging
import threading
import queue
from dotenv import load_dotenv, find_dotenv
from livekit import api, rtc
from db_meter import calculate_db_level, display_single_db_meter
async def main() -> None:
    """Join a LiveKit room, publish the local microphone and play remote audio.

    Configuration is read from the environment (optionally via a ``.env``
    file): ``LIVEKIT_URL`` is always required, together with either
    ``LIVEKIT_TOKEN`` or both ``LIVEKIT_API_KEY`` and ``LIVEKIT_API_SECRET``
    (in which case a token for the ``local-audio`` room is minted locally).

    While connected, the microphone dB level is sampled and pushed to a queue
    that a daemon thread renders with ``display_single_db_meter``.  The
    coroutine runs until it is cancelled / interrupted and then releases the
    audio devices and disconnects from the room.

    Raises:
        RuntimeError: if the required environment variables are missing.
    """
    logging.basicConfig(level=logging.INFO)

    # Load environment variables from a .env file if present
    load_dotenv(find_dotenv())
    url = os.getenv("LIVEKIT_URL")
    api_key = os.getenv("LIVEKIT_API_KEY")
    api_secret = os.getenv("LIVEKIT_API_SECRET")
    token = os.getenv("LIVEKIT_TOKEN")

    # The URL is mandatory; credentials are either a ready-made token or an
    # API key/secret pair.  Report the two problems separately so the message
    # names the variable that is actually missing.
    if not url:
        raise RuntimeError("LIVEKIT_URL must be set in env")
    if not token and (not api_key or not api_secret):
        raise RuntimeError(
            "LIVEKIT_TOKEN or LIVEKIT_API_KEY and LIVEKIT_API_SECRET must be set in env"
        )

    # Generate a token if none was provided.  Done before any audio device is
    # opened so a failure here cannot leave devices open.
    if not token:
        token = (
            api.AccessToken(api_key, api_secret)
            .with_identity("local-audio")
            .with_name("Local Audio")
            .with_grants(
                api.VideoGrants(
                    room_join=True,
                    room="local-audio",
                )
            )
            .to_jwt()
        )

    room = rtc.Room()
    devices = rtc.MediaDevices()

    # Open microphone & speaker
    mic = devices.open_input()
    player = devices.open_output()

    # dB level monitoring (mic only)
    mic_db_queue = queue.Queue()

    # The event loop only keeps weak references to tasks, so fire-and-forget
    # tasks are kept here until they finish.
    background_tasks = set()

    def _on_task_done(task: asyncio.Task) -> None:
        background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logging.error("background task failed", exc_info=error)

    def _spawn(coro) -> asyncio.Task:
        """Schedule *coro*, keep a strong reference and log any failure."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(_on_task_done)
        return task

    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            _spawn(player.add_track(track))
            logging.info("subscribed to audio from %s", participant.identity)

    room.on("track_subscribed", on_track_subscribed)

    def on_track_unsubscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        # Only audio tracks are ever added to the player, so only those are
        # removed from it.
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            _spawn(player.remove_track(track))
            logging.info("unsubscribed from audio of %s", participant.identity)

    room.on("track_unsubscribed", on_track_unsubscribed)

    async def monitor_mic_db(local_track) -> None:
        """Push the dB level of every ``sample_interval``-th mic frame to the queue."""
        mic_stream = rtc.AudioStream(local_track, sample_rate=48000, num_channels=1)
        sample_interval = 5  # Process every 5th frame to reduce load
        frame_count = 0
        try:
            async for frame_event in mic_stream:
                # Skip frames to reduce processing load
                frame_count += 1
                if frame_count % sample_interval != 0:
                    continue

                # Convert frame data to list of samples
                samples = list(frame_event.frame.data)
                db_level = calculate_db_level(samples)

                # Hand the latest value to the meter thread without blocking
                # the event loop.
                try:
                    mic_db_queue.put_nowait(db_level)
                except queue.Full:
                    pass  # Drop if queue is full
        except Exception:
            # Metering is auxiliary: report the failure instead of hiding it,
            # but do not take the call down with it.
            logging.exception("microphone level monitoring stopped unexpectedly")
        finally:
            await mic_stream.aclose()

    monitor_task = None
    try:
        await room.connect(url, token)
        logging.info("connected to room %s", room.name)

        # Publish microphone
        track = rtc.LocalAudioTrack.create_audio_track("mic", mic.source)
        pub_opts = rtc.TrackPublishOptions()
        pub_opts.source = rtc.TrackSource.SOURCE_MICROPHONE
        await room.local_participant.publish_track(track, pub_opts)
        logging.info("published local microphone")

        # Start dB meter display in a separate thread
        meter_thread = threading.Thread(
            target=display_single_db_meter,
            args=(mic_db_queue,),
            kwargs={"label": "Mic Level: "},
            daemon=True,
        )
        meter_thread.start()

        # Start playing mixed remote audio (tracks added via event handlers)
        await player.start()

        # Monitor microphone dB levels
        monitor_task = asyncio.create_task(monitor_mic_db(track))

        # Run until Ctrl+C
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Stop the monitor first so its audio stream is closed before the
        # microphone goes away.  gather(return_exceptions=True) absorbs the
        # monitor's own CancelledError without masking a cancellation of
        # this coroutine.
        if monitor_task is not None:
            monitor_task.cancel()
            await asyncio.gather(monitor_task, return_exceptions=True)

        # Every cleanup step is best-effort so that one failure does not
        # prevent the remaining resources from being released.
        cleanup_steps = (
            ("microphone", mic.aclose),
            ("speaker", player.aclose),
            ("room", room.disconnect),
        )
        for label, release in cleanup_steps:
            try:
                await release()
            except Exception:
                logging.warning(
                    "failed to release %s during shutdown", label, exc_info=True
                )
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop the script; asyncio.run()
        # re-raises KeyboardInterrupt after main() has been cancelled and
        # cleaned up, so exit quietly instead of printing a traceback.
        pass