Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 16 additions & 20 deletions src/google/adk/agents/live_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,26 @@


class LiveRequest(BaseModel):
"""Request send to live agents."""
"""Request send to live agents.

When multiple fields are set, they are processed by priority (highest first):
activity_start > activity_end > audio_stream_end > blob > content.
"""

model_config = ConfigDict(ser_json_bytes='base64', val_json_bytes='base64')
"""The pydantic model config."""

content: Optional[types.Content] = None
"""If set, send the content to the model in turn-by-turn mode.

When multiple fields are set, they are processed by priority (highest first):
activity_start > activity_end > blob > content.
"""
"""If set, send the content to the model in turn-by-turn mode."""
blob: Optional[types.Blob] = None
"""If set, send the blob to the model in realtime mode.

When multiple fields are set, they are processed by priority (highest first):
activity_start > activity_end > blob > content.
"""
"""If set, send the blob to the model in realtime mode."""
activity_start: Optional[types.ActivityStart] = None
"""If set, signal the start of user activity to the model.

When multiple fields are set, they are processed by priority (highest first):
activity_start > activity_end > blob > content.
"""
"""If set, signal the start of user activity to the model."""
activity_end: Optional[types.ActivityEnd] = None
"""If set, signal the end of user activity to the model.

When multiple fields are set, they are processed by priority (highest first):
activity_start > activity_end > blob > content.
"""If set, signal the end of user activity to the model."""
audio_stream_end: bool = False
"""If set, signal the end of the audio stream to the model.
This is only used when Voice Activity Detection is enabled.
"""
close: bool = False
"""If set, close the queue. queue.shutdown() is only supported in Python 3.13+."""
Expand Down Expand Up @@ -80,6 +72,10 @@ def send_activity_end(self):
"""Sends an activity end signal to mark the end of user input."""
self._queue.put_nowait(LiveRequest(activity_end=types.ActivityEnd()))

def send_audio_stream_end(self) -> None:
"""Sends an audio stream end signal to force flush audio."""
self._queue.put_nowait(LiveRequest(audio_stream_end=True))

def send(self, req: LiveRequest):
self._queue.put_nowait(req)

Expand Down
4 changes: 4 additions & 0 deletions src/google/adk/flows/llm_flows/base_llm_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,10 @@ async def _send_to_model(
await llm_connection.send_realtime(types.ActivityStart())
elif live_request.activity_end:
await llm_connection.send_realtime(types.ActivityEnd())
elif live_request.audio_stream_end:
await llm_connection.send_realtime(
types.LiveClientRealtimeInput(audio_stream_end=True)
)
elif live_request.blob:
# Cache input audio chunks before flushing
self.audio_cache_manager.cache_audio(
Expand Down
14 changes: 13 additions & 1 deletion src/google/adk/models/gemini_llm_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@

logger = logging.getLogger('google_adk.' + __name__)

RealtimeInput = Union[types.Blob, types.ActivityStart, types.ActivityEnd]
RealtimeInput = Union[
types.Blob,
types.ActivityStart,
types.ActivityEnd,
types.LiveClientRealtimeInput,
]
from typing import TYPE_CHECKING

if TYPE_CHECKING:
Expand Down Expand Up @@ -136,6 +141,13 @@ async def send_realtime(self, input: RealtimeInput):
elif isinstance(input, types.ActivityEnd):
logger.debug('Sending LLM activity end signal.')
await self._gemini_session.send_realtime_input(activity_end=input)

elif isinstance(input, types.LiveClientRealtimeInput):
if input.audio_stream_end:
logger.debug('Sending LLM audio stream end signal.')
await self._gemini_session.send_realtime_input(audio_stream_end=True)
else:
logger.warning('Unary LiveClientRealtimeInput not fully supported yet.')
else:
raise ValueError('Unsupported input type: %s' % type(input))

Expand Down
33 changes: 33 additions & 0 deletions tests/unittests/models/test_gemini_llm_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,39 @@ async def test_send_realtime_default_behavior(
mock_gemini_session.send.assert_not_called()


@pytest.mark.asyncio
async def test_send_realtime_audiostreamend(
gemini_connection, mock_gemini_session
):
"""Test send_realtime with LiveClientRealtimeInput(audio_stream_end=True)."""
input_signal = types.LiveClientRealtimeInput(audio_stream_end=True)
await gemini_connection.send_realtime(input_signal)

# Should call send_realtime_input with audio_stream_end=True
mock_gemini_session.send_realtime_input.assert_called_once_with(
audio_stream_end=True
)
# Should not call .send function
mock_gemini_session.send.assert_not_called()


Copy link
Collaborator

@ryanaiagent ryanaiagent Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a test to cover the send_realtime with unsupported LiveClientRealtimeInput where it logs a warning.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryanaiagent added the new test case, thank you!

@pytest.mark.asyncio
async def test_send_realtime_unsupported_liveClientRealtimeInput(
gemini_connection, mock_gemini_session, caplog
):
"""Test send_realtime with unsupported LiveClientRealtimeInput."""
input_signal = types.LiveClientRealtimeInput()

with caplog.at_level('WARNING'):
await gemini_connection.send_realtime(input_signal)

# Should log a warning
assert 'Unary LiveClientRealtimeInput not fully supported yet.' in caplog.text
# Should not call send_realtime_input or send
mock_gemini_session.send_realtime_input.assert_not_called()
mock_gemini_session.send.assert_not_called()


@pytest.mark.asyncio
async def test_send_history(gemini_connection, mock_gemini_session):
"""Test send_history method."""
Expand Down