Skip to content

Commit df6595e

Browse files
committed
logs for streaming
1 parent 1e2b350 commit df6595e

3 files changed

Lines changed: 121 additions & 10 deletions

File tree

backend/robot-gateway/capture_relay.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ class RelaySession:
159159
last_audio_chunk_at: datetime | None = None
160160
last_upstream_activity_at: datetime | None = None
161161
latest_video_frame: bytes | None = None
162+
video_frame_count: int = 0
163+
audio_chunk_count: int = 0
164+
audio_bytes_received: int = 0
162165
audio_buffer: deque[bytes] = field(
163166
default_factory=lambda: deque(maxlen=CAPTURE_RELAY_AUDIO_BUFFER_CHUNKS)
164167
)
@@ -310,6 +313,7 @@ async def handle_upstream_json(self, session_id: str, message: dict[str, Any]) -
310313
session.video_meta = message
311314
elif message_type == "audio_meta":
312315
session.audio_meta = message
316+
logger.info("[capture_relay] Audio metadata session=%s meta=%s", session_id, message)
313317
elif message_type == "goodbye":
314318
session.last_error = str(message.get("reason") or "upstream_goodbye")
315319

@@ -329,11 +333,23 @@ async def handle_upstream_binary(self, session_id: str, payload: bytes) -> None:
329333
if track_type == VIDEO_TRACK:
330334
session.latest_video_frame = chunk
331335
session.last_video_frame_at = utc_now()
336+
session.video_frame_count += 1
332337
viewers = list(session.video_viewers.values())
333338
elif track_type == AUDIO_TRACK:
334339
session.audio_buffer.append(chunk)
335340
session.last_audio_chunk_at = utc_now()
341+
session.audio_chunk_count += 1
342+
session.audio_bytes_received += len(chunk)
336343
viewers = list(session.audio_viewers.values())
344+
if session.audio_chunk_count == 1 or session.audio_chunk_count % 100 == 0:
345+
logger.info(
346+
"[capture_relay] Audio chunk received session=%s chunks=%d bytes=%d last_bytes=%d viewers=%d",
347+
session_id,
348+
session.audio_chunk_count,
349+
session.audio_bytes_received,
350+
len(chunk),
351+
len(viewers),
352+
)
337353
else:
338354
logger.warning("[capture_relay] Unknown track=%s session=%s", track_type, session_id)
339355
return
@@ -370,9 +386,12 @@ async def register_audio_viewer(self, session_id: str) -> tuple[RelaySession, Re
370386
session.audio_viewers[viewer.viewer_id] = viewer
371387
backlog = list(session.audio_buffer)
372388
logger.info(
373-
"[capture_relay] Audio viewer joined session=%s viewers=%d",
389+
"[capture_relay] Audio viewer joined session=%s viewers=%d backlog=%d chunks_received=%d last_audio=%s",
374390
session_id,
375391
session.viewer_count,
392+
len(backlog),
393+
session.audio_chunk_count,
394+
session.last_audio_chunk_at.isoformat() if session.last_audio_chunk_at else None,
376395
)
377396
return session, viewer, backlog
378397

backend/robot-gateway/routes/capture.py

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,19 +103,39 @@ async def frame_stream():
103103
@router.websocket("/api/capture/streams/{session_id}/audio.pcm")
104104
async def stream_audio_pcm(websocket: WebSocket, session_id: str):
105105
viewer_token = websocket.query_params.get("token", "")
106+
logger.info(
107+
"[capture_relay] Audio viewer upgrade session=%s client=%s has_token=%s header_keys=%s",
108+
session_id,
109+
websocket.client,
110+
"yes" if viewer_token else "no",
111+
sorted(websocket.headers.keys()),
112+
)
106113
if viewer_token:
107114
try:
108115
claims = validate_relay_token(viewer_token)
109-
except HTTPException:
116+
except HTTPException as exc:
117+
logger.warning(
118+
"[capture_relay] Audio viewer token rejected session=%s status=%s detail=%s",
119+
session_id,
120+
exc.status_code,
121+
exc.detail,
122+
)
110123
await websocket.close(code=4401)
111124
return
112125
if (
113126
claims.get("session_id") != session_id
114127
or claims.get("tracks") != ["audio"]
115128
):
129+
logger.warning(
130+
"[capture_relay] Audio viewer token scope rejected session=%s claims_session=%s tracks=%s",
131+
session_id,
132+
claims.get("session_id"),
133+
claims.get("tracks"),
134+
)
116135
await websocket.close(code=4403)
117136
return
118137
else:
138+
logger.warning("[capture_relay] Audio viewer rejected session=%s reason=missing_token", session_id)
119139
await websocket.close(code=4401)
120140
return
121141

@@ -124,18 +144,55 @@ async def stream_audio_pcm(websocket: WebSocket, session_id: str):
124144
session, viewer, backlog = await relay.register_audio_viewer(session_id)
125145
if not session.audio_enabled:
126146
await relay.unregister_viewer(session_id, viewer, mqtt)
147+
logger.warning("[capture_relay] Audio viewer rejected session=%s reason=audio_disabled", session_id)
127148
await websocket.close(code=4400, reason="Audio disabled")
128149
return
129150

130151
await websocket.accept()
152+
logger.info(
153+
"[capture_relay] Audio viewer accepted session=%s viewer=%s backlog=%d",
154+
session_id,
155+
viewer.viewer_id,
156+
len(backlog),
157+
)
158+
chunks_sent = 0
159+
bytes_sent = 0
131160
try:
132161
for chunk in backlog:
133162
await websocket.send_bytes(chunk)
163+
chunks_sent += 1
164+
bytes_sent += len(chunk)
134165
while True:
135166
chunk = await viewer.queue.get()
136167
await websocket.send_bytes(chunk)
168+
chunks_sent += 1
169+
bytes_sent += len(chunk)
170+
if chunks_sent == 1 or chunks_sent % 100 == 0:
171+
logger.info(
172+
"[capture_relay] Audio viewer send session=%s viewer=%s chunks=%d bytes=%d last_bytes=%d",
173+
session_id,
174+
viewer.viewer_id,
175+
chunks_sent,
176+
bytes_sent,
177+
len(chunk),
178+
)
137179
except WebSocketDisconnect:
138-
pass
180+
logger.info(
181+
"[capture_relay] Audio viewer disconnected session=%s viewer=%s chunks=%d bytes=%d",
182+
session_id,
183+
viewer.viewer_id,
184+
chunks_sent,
185+
bytes_sent,
186+
)
187+
except Exception as exc:
188+
logger.warning(
189+
"[capture_relay] Audio viewer send failed session=%s viewer=%s chunks=%d bytes=%d error=%s",
190+
session_id,
191+
viewer.viewer_id,
192+
chunks_sent,
193+
bytes_sent,
194+
exc,
195+
)
139196
finally:
140197
await relay.unregister_viewer(session_id, viewer, mqtt)
141198

frontend/web/src/app/robots/stream-test/page.tsx

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,11 @@ export default function RobotStreamTestPage() {
116116
const interval = window.setInterval(async () => {
117117
try {
118118
const next = await gw<CaptureStream>(statusPath);
119-
setSession(next);
119+
setSession((current) =>
120+
current?.session_id === next.session_id
121+
? { ...next, viewer_paths: current.viewer_paths }
122+
: next
123+
);
120124
} catch {
121125
setSession(null);
122126
}
@@ -125,9 +129,7 @@ export default function RobotStreamTestPage() {
125129
return () => window.clearInterval(interval);
126130
}, [sessionId, statusPath]);
127131

128-
const mjpgUrl = session
129-
? `${GW_BASE}${session.viewer_paths.camera_mjpg}?t=${encodeURIComponent(session.last_video_frame_at ?? session.created_at)}`
130-
: null;
132+
const mjpgUrl = session ? `${GW_BASE}${session.viewer_paths.camera_mjpg}` : null;
131133

132134
useEffect(() => {
133135
if (!session?.audio_enabled || !session.viewer_paths.audio_pcm_ws_public) {
@@ -152,23 +154,55 @@ export default function RobotStreamTestPage() {
152154
socket.binaryType = "arraybuffer";
153155
audioSocketRef.current = socket;
154156
setAudioStatus("connecting");
157+
console.info("[stream-test] audio websocket connecting", {
158+
session_id: session.session_id,
159+
url: session.viewer_paths.audio_pcm_ws_public,
160+
});
155161

156162
const sampleRate = Number(session.audio_meta?.sample_rate_hz ?? 16000) || 16000;
157163
const channelCount = Number(session.audio_meta?.channels ?? 1) || 1;
164+
let receivedChunks = 0;
165+
let receivedBytes = 0;
158166

159-
socket.onopen = () => setAudioStatus("live");
160-
socket.onclose = () => {
167+
socket.onopen = () => {
168+
console.info("[stream-test] audio websocket open", { session_id: session.session_id });
169+
setAudioStatus("live");
170+
};
171+
socket.onclose = (event) => {
172+
console.info("[stream-test] audio websocket closed", {
173+
session_id: session.session_id,
174+
code: event.code,
175+
reason: event.reason,
176+
wasClean: event.wasClean,
177+
receivedChunks,
178+
receivedBytes,
179+
});
161180
if (audioSocketRef.current === socket) {
162181
audioSocketRef.current = null;
163182
setAudioStatus("disconnected");
164183
}
165184
};
166-
socket.onerror = () => setAudioStatus("error");
185+
socket.onerror = (event) => {
186+
console.warn("[stream-test] audio websocket error", { session_id: session.session_id, event });
187+
setAudioStatus("error");
188+
};
167189
socket.onmessage = (event) => {
168190
if (!(event.data instanceof ArrayBuffer)) return;
169191

170192
const pcm = new Int16Array(event.data);
171193
if (!pcm.length) return;
194+
receivedChunks += 1;
195+
receivedBytes += event.data.byteLength;
196+
if (receivedChunks === 1 || receivedChunks % 100 === 0) {
197+
console.info("[stream-test] audio chunk received", {
198+
session_id: session.session_id,
199+
chunks: receivedChunks,
200+
bytes: receivedBytes,
201+
lastBytes: event.data.byteLength,
202+
sampleRate,
203+
channelCount,
204+
});
205+
}
172206

173207
const frameCount = Math.floor(pcm.length / channelCount);
174208
const audioBuffer = context.createBuffer(channelCount, frameCount, sampleRate);
@@ -196,6 +230,7 @@ export default function RobotStreamTestPage() {
196230
};
197231
}, [
198232
session?.audio_enabled,
233+
session?.session_id,
199234
session?.viewer_paths.audio_pcm_ws_public,
200235
session?.audio_meta?.sample_rate_hz,
201236
session?.audio_meta?.channels,

0 commit comments

Comments
 (0)