Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/uipath-pydantic-ai/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-pydantic-ai"
version = "0.0.2"
version = "0.0.3"
description = "Python SDK that enables developers to build and deploy PydanticAI agents to the UiPath Cloud Platform"
readme = "README.md"
requires-python = ">=3.11"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
%% Execution graph for the sample weather agent:
%% the agent node and its tools node form a loop (call tool -> feed result back)
%% until the agent produces the final output.
flowchart TB
	__start__(__start__)
	weather_agent(weather_agent)
	weather_agent_tools(tools)
	__end__(__end__)
	weather_agent --> weather_agent_tools
	weather_agent_tools --> weather_agent
	__start__ --> |input|weather_agent
	weather_agent --> |output|__end__
Original file line number Diff line number Diff line change
@@ -1,11 +1,24 @@
"""Runtime class for executing PydanticAI Agents within the UiPath framework."""

import json
from datetime import datetime, timezone
from typing import Any, AsyncGenerator
from uuid import uuid4

from pydantic import BaseModel
from pydantic_ai import Agent, FunctionToolset
from pydantic_ai.messages import ToolReturnPart
from uipath.core.chat.content import (
UiPathConversationContentPartChunkEvent,
UiPathConversationContentPartEndEvent,
UiPathConversationContentPartEvent,
UiPathConversationContentPartStartEvent,
)
from uipath.core.chat.message import (
UiPathConversationMessageEndEvent,
UiPathConversationMessageEvent,
UiPathConversationMessageStartEvent,
)
from uipath.core.serialization import serialize_json
from uipath.runtime import (
UiPathExecuteOptions,
Expand Down Expand Up @@ -88,18 +101,65 @@ async def stream(
)

model_node = node
node = await agent_run.next(node)

yield UiPathRuntimeMessageEvent(
payload=json.loads(serialize_json(model_node.request)),
metadata={"event_name": "model_request"},
)
message_id = str(uuid4())
content_part_id = f"chunk-{message_id}-0"
has_text = False

async with model_node.stream(agent_run.ctx) as stream:
async for text_chunk in stream.stream_text(
delta=True, debounce_by=None
):
if not has_text:
has_text = True
yield UiPathRuntimeMessageEvent(
payload=UiPathConversationMessageEvent(
message_id=message_id,
start=UiPathConversationMessageStartEvent(
role="assistant",
timestamp=self._get_timestamp(),
),
content_part=UiPathConversationContentPartEvent(
content_part_id=content_part_id,
start=UiPathConversationContentPartStartEvent(
mime_type="text/plain",
),
),
),
)

yield UiPathRuntimeMessageEvent(
payload=UiPathConversationMessageEvent(
message_id=message_id,
content_part=UiPathConversationContentPartEvent(
content_part_id=content_part_id,
chunk=UiPathConversationContentPartChunkEvent(
data=text_chunk,
),
),
),
)

next_node = await agent_run.next(model_node)

if has_text:
yield UiPathRuntimeMessageEvent(
payload=UiPathConversationMessageEvent(
message_id=message_id,
end=UiPathConversationMessageEndEvent(),
content_part=UiPathConversationContentPartEvent(
content_part_id=content_part_id,
end=UiPathConversationContentPartEndEvent(),
),
),
)

yield UiPathRuntimeStateEvent(
payload=self._model_response_payload(node),
node_name=agent_name,
phase=UiPathRuntimeStatePhase.COMPLETED,
)
if Agent.is_call_tools_node(next_node):
yield UiPathRuntimeStateEvent(
payload=self._model_response_payload(next_node),
node_name=agent_name,
phase=UiPathRuntimeStatePhase.COMPLETED,
)
node = next_node

elif Agent.is_call_tools_node(node):
tool_calls = node.model_response.tool_calls if has_tools else []
Expand All @@ -115,14 +175,15 @@ async def stream(
phase=UiPathRuntimeStatePhase.STARTED,
)

node = await agent_run.next(node)
next_node = await agent_run.next(node)

if tool_calls:
if tool_calls and Agent.is_model_request_node(next_node):
yield UiPathRuntimeStateEvent(
payload=self._tool_results_payload(node),
payload=self._tool_results_payload(next_node),
node_name=tools_node_name,
phase=UiPathRuntimeStatePhase.COMPLETED,
)
node = next_node

else:
node = await agent_run.next(node)
Expand All @@ -135,6 +196,12 @@ async def stream(
except Exception as e:
raise self._create_runtime_error(e) from e

@staticmethod
def _get_timestamp() -> str:
"""Get current UTC timestamp in ISO 8601 format."""
now = datetime.now(timezone.utc)
return now.strftime("%Y-%m-%dT%H:%M:%S.") + f"{now.microsecond // 1000:03d}Z"

@staticmethod
def _model_request_payload(node: Any) -> dict[str, Any]:
"""Build payload for a ModelRequestNode STARTED event."""
Expand Down Expand Up @@ -181,8 +248,6 @@ def _tool_results_payload(next_node: Any) -> dict[str, Any]:
After agent_run.next() the returned node is a ModelRequestNode
whose request.parts contain ToolReturnPart objects with results.
"""
from pydantic_ai.messages import ToolReturnPart

payload: dict[str, Any] = {}
try:
parts = next_node.request.parts if next_node.request else []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,10 @@ def _conversation_message_item_schema() -> dict[str, Any]:
},
"required": ["inline"],
},
"citations": {
"type": "array",
"items": {"type": "object"},
},
},
"required": ["data"],
},
},
"toolCalls": {"type": "array", "items": {"type": "object"}},
"interrupts": {"type": "array", "items": {"type": "object"}},
},
"required": ["role", "contentParts"],
}
Expand Down
187 changes: 187 additions & 0 deletions packages/uipath-pydantic-ai/tests/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import pytest
from pydantic import BaseModel
from pydantic_ai import Agent
from uipath.core.chat.message import UiPathConversationMessageEvent
from uipath.runtime.events import UiPathRuntimeMessageEvent

from uipath_pydantic_ai.runtime.errors import (
UiPathPydanticAIErrorCode,
Expand Down Expand Up @@ -582,3 +584,188 @@ def my_tool(ctx, query: str) -> str:
result = event.payload["tool_results"][0]
assert "tool_name" in result
assert "content" in result


# ============= TOKEN STREAMING TESTS =============


@pytest.mark.asyncio
async def test_stream_emits_message_events_with_message_id():
    """Streaming must emit UiPathConversationMessageEvent payloads with a message_id."""
    from pydantic_ai.models.test import TestModel

    runtime = UiPathPydanticAIRuntime(
        agent=Agent(TestModel(custom_output_text="Hi there"), name="msg_agent"),
        runtime_id="test",
        entrypoint="test",
    )

    collected: list[UiPathConversationMessageEvent] = []
    async for ev in runtime.stream(input=_uipath_input("Hello")):
        if not isinstance(ev, UiPathRuntimeMessageEvent):
            continue
        body = ev.payload
        assert isinstance(body, UiPathConversationMessageEvent)
        collected.append(body)

    # START + at least one CHUNK + END.
    assert len(collected) >= 3
    # Every event of the lifecycle carries one and the same message_id.
    assert len({m.message_id for m in collected}) == 1


@pytest.mark.asyncio
async def test_stream_message_lifecycle_start_chunks_end():
    """Streaming follows START -> CHUNK(s) -> END lifecycle."""
    from pydantic_ai.models.test import TestModel

    agent = Agent(TestModel(custom_output_text="Hello world"), name="lc_agent")
    runtime = UiPathPydanticAIRuntime(agent=agent, runtime_id="test", entrypoint="test")

    events = [
        ev.payload
        async for ev in runtime.stream(input=_uipath_input("Say hello"))
        if isinstance(ev, UiPathRuntimeMessageEvent)
    ]

    # Opening event carries both the message start and the content-part start.
    opening = events[0]
    assert opening.start is not None
    assert opening.start.role == "assistant"
    assert opening.start.timestamp is not None
    assert opening.content_part is not None
    assert opening.content_part.start is not None
    assert opening.content_part.start.mime_type == "text/plain"

    # Everything in between is a non-empty text chunk.
    middle = events[1:-1]
    assert len(middle) >= 1
    for ev in middle:
        part = ev.content_part
        assert part is not None
        assert part.chunk is not None
        assert isinstance(part.chunk.data, str)
        assert len(part.chunk.data) > 0

    # Closing event carries both the message end and the content-part end.
    closing = events[-1]
    assert closing.end is not None
    assert closing.content_part is not None
    assert closing.content_part.end is not None


@pytest.mark.asyncio
async def test_stream_token_chunks_reassemble_to_full_text():
    """Concatenating all chunk data must produce the full response text."""
    from pydantic_ai.models.test import TestModel

    expected_text = "The quick brown fox jumps over the lazy dog"
    runtime = UiPathPydanticAIRuntime(
        agent=Agent(TestModel(custom_output_text=expected_text), name="concat_agent"),
        runtime_id="test",
        entrypoint="test",
    )

    pieces: list[str] = []
    async for ev in runtime.stream(input=_uipath_input("Tell me something")):
        if not isinstance(ev, UiPathRuntimeMessageEvent):
            continue
        part = ev.payload.content_part
        if part and part.chunk:
            pieces.append(part.chunk.data)

    # The deltas, joined in arrival order, must equal the full model output.
    assert "".join(pieces) == expected_text


@pytest.mark.asyncio
async def test_stream_content_part_id_consistent():
    """All content_part events in a message must share the same content_part_id."""
    from pydantic_ai.models.test import TestModel

    agent = Agent(TestModel(custom_output_text="Consistent IDs"), name="cpid_agent")
    runtime = UiPathPydanticAIRuntime(agent=agent, runtime_id="test", entrypoint="test")

    seen_ids = {
        ev.payload.content_part.content_part_id
        async for ev in runtime.stream(input=_uipath_input("Check IDs"))
        if isinstance(ev, UiPathRuntimeMessageEvent) and ev.payload.content_part
    }

    # Start, chunk, and end events all reference a single content part.
    assert len(seen_ids) == 1


@pytest.mark.asyncio
async def test_stream_with_tools_emits_message_events():
    """Streaming an agent with tools must emit message events for the final text response."""
    from pydantic_ai.models.test import TestModel

    def my_tool(ctx, query: str) -> str:
        """Search tool.

        Args:
            ctx: The agent context.
            query: The search query.

        Returns:
            Search results.
        """
        return f"Result for {query}"

    runtime = UiPathPydanticAIRuntime(
        agent=Agent(TestModel(), name="tool_msg_agent", tools=[my_tool]),
        runtime_id="test",
        entrypoint="test",
    )

    payloads: list[UiPathConversationMessageEvent] = []
    async for ev in runtime.stream(input=_uipath_input("Search for cats")):
        if isinstance(ev, UiPathRuntimeMessageEvent):
            payloads.append(ev.payload)

    # At least one full message lifecycle (the final response after the tool call).
    assert len(payloads) >= 3

    # START and END events are both present.
    assert any(p.start is not None for p in payloads)
    assert any(p.end is not None for p in payloads)

    # At least one text chunk was streamed.
    assert any(p.content_part and p.content_part.chunk for p in payloads)


@pytest.mark.asyncio
async def test_stream_tool_only_turn_skips_message_events():
    """Model turns that produce only tool calls (no text) should not emit message events."""
    from pydantic_ai.models.test import TestModel
    from uipath.runtime.events import (
        UiPathRuntimeStateEvent,
        UiPathRuntimeStatePhase,
    )

    def my_tool(ctx, query: str) -> str:
        """A tool.

        Args:
            ctx: The agent context.
            query: The query.

        Returns:
            Results.
        """
        return "result"

    # TestModel with tools: first turn calls tool (no text), second turn returns text
    agent = Agent(TestModel(), name="skip_agent", tools=[my_tool])
    runtime = UiPathPydanticAIRuntime(agent=agent, runtime_id="test", entrypoint="test")

    message_payloads: list[UiPathConversationMessageEvent] = []
    states: list[UiPathRuntimeStateEvent] = []
    async for ev in runtime.stream(input=_uipath_input("Do something")):
        if isinstance(ev, UiPathRuntimeMessageEvent):
            message_payloads.append(ev.payload)
        elif isinstance(ev, UiPathRuntimeStateEvent):
            states.append(ev)

    # Both model turns (tool turn + final turn) begin with a STARTED state event.
    started_for_agent = [
        s
        for s in states
        if s.node_name == "skip_agent" and s.phase == UiPathRuntimeStatePhase.STARTED
    ]
    assert len(started_for_agent) >= 2

    # Only the text-producing turn contributes message events.
    assert len({p.message_id for p in message_payloads}) == 1
Loading