diff --git a/packages/uipath-core/pyproject.toml b/packages/uipath-core/pyproject.toml index 7e559c9a4..cb188084e 100644 --- a/packages/uipath-core/pyproject.toml +++ b/packages/uipath-core/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-core" -version = "0.5.14" +version = "0.5.15" description = "UiPath Core abstractions" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/packages/uipath-core/src/uipath/core/triggers/__init__.py b/packages/uipath-core/src/uipath/core/triggers/__init__.py index 400462277..fb4ee6ae5 100644 --- a/packages/uipath-core/src/uipath/core/triggers/__init__.py +++ b/packages/uipath-core/src/uipath/core/triggers/__init__.py @@ -4,11 +4,13 @@ "UiPathResumeTrigger", "UiPathResumeTriggerType", "UiPathApiTrigger", + "UiPathIntegrationTrigger", "UiPathResumeTriggerName", ] from uipath.core.triggers.trigger import ( UiPathApiTrigger, + UiPathIntegrationTrigger, UiPathResumeTrigger, UiPathResumeTriggerName, UiPathResumeTriggerType, diff --git a/packages/uipath-core/src/uipath/core/triggers/trigger.py b/packages/uipath-core/src/uipath/core/triggers/trigger.py index 424245079..c897acd28 100644 --- a/packages/uipath-core/src/uipath/core/triggers/trigger.py +++ b/packages/uipath-core/src/uipath/core/triggers/trigger.py @@ -53,6 +53,25 @@ class UiPathApiTrigger(BaseModel): model_config = ConfigDict(validate_by_name=True) +class UiPathIntegrationTrigger(BaseModel): + """Integration Services (Inbox) resume trigger request. + + Mirrors Orchestrator's `IntegrationResumeDto`: the configuration needed to + register a remote event trigger through the Connections service and + correlate the eventual payload back to the suspended job via `inbox_id`. + """ + + connector: str = Field(alias="connector") + connection_id: str = Field(alias="connectionId") + operation: str = Field(alias="operation") + object_name: str = Field(alias="objectName") + filter_expression: str | None = Field(default=None, alias="filterExpression") + parameters: dict[str, str] | None = Field(default=None, alias="parameters") + inbox_id: str = Field(alias="inboxId") + + model_config = ConfigDict(validate_by_name=True) + + class UiPathResumeTrigger(BaseModel): """Information needed to resume execution.""" @@ -65,6 +84,9 @@ class UiPathResumeTrigger(BaseModel): ) item_key: str | None = Field(default=None, alias="itemKey") api_resume: UiPathApiTrigger | None = Field(default=None, alias="apiResume") + integration_resume: UiPathIntegrationTrigger | None = Field( + default=None, alias="integrationResume" + ) folder_path: str | None = Field(default=None, alias="folderPath") folder_key: str | None = Field(default=None, alias="folderKey") payload: Any | None = Field(default=None, alias="interruptObject", exclude=True) diff --git a/packages/uipath-core/uv.lock b/packages/uipath-core/uv.lock index 61159a1d5..bf73c603d 100644 --- a/packages/uipath-core/uv.lock +++ b/packages/uipath-core/uv.lock @@ -1007,7 +1007,7 @@ wheels = [ [[package]] name = "uipath-core" -version = "0.5.14" +version = "0.5.15" source = { editable = "." } dependencies = [ { name = "opentelemetry-instrumentation" }, diff --git a/packages/uipath-platform/pyproject.toml b/packages/uipath-platform/pyproject.toml index 59585688f..234dd78b8 100644 --- a/packages/uipath-platform/pyproject.toml +++ b/packages/uipath-platform/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-platform" -version = "0.1.39" +version = "0.1.40" description = "HTTP client library for programmatic access to UiPath Platform" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/packages/uipath-platform/src/uipath/platform/common/__init__.py b/packages/uipath-platform/src/uipath/platform/common/__init__.py index 9070d0d70..cefd92075 100644 --- a/packages/uipath-platform/src/uipath/platform/common/__init__.py +++ b/packages/uipath-platform/src/uipath/platform/common/__init__.py @@ -48,6 +48,7 @@ WaitEphemeralIndex, WaitEphemeralIndexRaw, WaitEscalation, + WaitIntegrationEvent, WaitJob, WaitJobRaw, WaitSystemAgent, @@ -89,6 +90,7 @@ "WaitEphemeralIndexRaw", "DocumentExtractionValidation", "WaitDocumentExtractionValidation", + "WaitIntegrationEvent", "RequestSpec", "Endpoint", "UiPathUrl", diff --git a/packages/uipath-platform/src/uipath/platform/common/interrupt_models.py b/packages/uipath-platform/src/uipath/platform/common/interrupt_models.py index 100b601bd..3b2468551 100644 --- a/packages/uipath-platform/src/uipath/platform/common/interrupt_models.py +++ b/packages/uipath-platform/src/uipath/platform/common/interrupt_models.py @@ -259,3 +259,23 @@ class WaitDocumentExtractionValidation(BaseModel): extraction_validation: StartExtractionValidationResponse task_url: str | None = None + + +class WaitIntegrationEvent(BaseModel): + """Model representing a wait on an Integration Services event. + + Used to suspend a job until a remote event (e.g. Slack message, Teams reply) + is delivered by Integration Services. The SDK resolves `connection_name` + (scoped to `connection_folder_path` when provided) to the underlying + connection id and generates a fresh `inbox_id` when the trigger is created; + the rest of the fields describe which remote event to subscribe to via + the Connections service. + """ + + connector: str + connection_name: str + connection_folder_path: str | None = None + operation: str + object_name: str + filter_expression: str | None = None + parameters: dict[str, str] | None = None diff --git a/packages/uipath-platform/src/uipath/platform/orchestrator/_jobs_service.py b/packages/uipath-platform/src/uipath/platform/orchestrator/_jobs_service.py index f9433d221..a9a132e02 100644 --- a/packages/uipath-platform/src/uipath/platform/orchestrator/_jobs_service.py +++ b/packages/uipath-platform/src/uipath/platform/orchestrator/_jobs_service.py @@ -674,6 +674,52 @@ def _retrieve_api_payload_spec( }, ) + def retrieve_inbox_payload(self, inbox_id: str) -> Any: + """Fetch payload data for Integration Services (Inbox) triggers. + + Unlike `retrieve_api_payload`, this returns the response body as-is. + Orchestrator's `GET /JobTriggers/GetPayload/{inboxId}` returns the + stored payload directly without an envelope. + + Args: + inbox_id: The Id of the inbox to fetch the payload for. + + Returns: + The stored payload. + """ + spec = self._retrieve_api_payload_spec(inbox_id=inbox_id) + + response = self.request( + spec.method, + url=spec.endpoint, + headers=spec.headers, + ) + + return response.json() + + async def retrieve_inbox_payload_async(self, inbox_id: str) -> Any: + """Asynchronously fetch payload data for Integration Services (Inbox) triggers. + + Unlike `retrieve_api_payload_async`, this returns the response body + as-is. Orchestrator's `GET /JobTriggers/GetPayload/{inboxId}` returns + the stored payload directly without an envelope. + + Args: + inbox_id: The Id of the inbox to fetch the payload for. + + Returns: + The stored payload. + """ + spec = self._retrieve_api_payload_spec(inbox_id=inbox_id) + + response = await self.request_async( + spec.method, + url=spec.endpoint, + headers=spec.headers, + ) + + return response.json() + def _extract_first_inbox_id(self, response: Any) -> str: if len(response["value"]) > 0: return response["value"][0]["ItemKey"] diff --git a/packages/uipath-platform/src/uipath/platform/resume_triggers/_protocol.py b/packages/uipath-platform/src/uipath/platform/resume_triggers/_protocol.py index 98e4f81a2..b2dbae787 100644 --- a/packages/uipath-platform/src/uipath/platform/resume_triggers/_protocol.py +++ b/packages/uipath-platform/src/uipath/platform/resume_triggers/_protocol.py @@ -13,6 +13,7 @@ from uipath.core.serialization import serialize_object from uipath.core.triggers import ( UiPathApiTrigger, + UiPathIntegrationTrigger, UiPathResumeTrigger, UiPathResumeTriggerName, UiPathResumeTriggerType, @@ -43,11 +44,13 @@ WaitEphemeralIndex, WaitEphemeralIndexRaw, WaitEscalation, + WaitIntegrationEvent, WaitJob, WaitJobRaw, WaitSystemAgent, WaitTask, ) +from uipath.platform.connections import EventArguments from uipath.platform.context_grounding import DeepRagStatus, IndexStatus from uipath.platform.context_grounding.context_grounding_index import ( ContextGroundingIndex, @@ -401,6 +404,23 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None: f"Error fetching API trigger payload for inbox {trigger.api_resume.inbox_id}: {str(e)}", ) from e + case UiPathResumeTriggerType.INBOX: + if trigger.integration_resume and trigger.integration_resume.inbox_id: + try: + inbox_payload = await uipath.jobs.retrieve_inbox_payload_async( + trigger.integration_resume.inbox_id + ) + event_args = EventArguments.model_validate(inbox_payload) + return await uipath.connections.retrieve_event_payload_async( + event_args + ) + except Exception as e: + raise UiPathFaultedTriggerError( + ErrorCategory.SYSTEM, + f"Failed to get trigger payload" + f"Error fetching Inbox trigger payload for inbox {trigger.integration_resume.inbox_id}: {str(e)}", + ) from e + case _: raise UiPathFaultedTriggerError( ErrorCategory.SYSTEM, @@ -461,6 +481,9 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger: case UiPathResumeTriggerType.API: self._handle_api_trigger(suspend_value, resume_trigger) + case UiPathResumeTriggerType.INBOX: + await self._handle_inbox_trigger(suspend_value, resume_trigger) + case UiPathResumeTriggerType.DEEP_RAG: await self._handle_deep_rag_job_trigger( suspend_value, resume_trigger @@ -545,6 +568,8 @@ def _determine_trigger_type(self, value: Any) -> UiPathResumeTriggerType: value, (DocumentExtractionValidation, WaitDocumentExtractionValidation) ): return UiPathResumeTriggerType.IXP_VS_ESCALATION + if isinstance(value, WaitIntegrationEvent): + return UiPathResumeTriggerType.INBOX # default to API trigger return UiPathResumeTriggerType.API @@ -579,6 +604,8 @@ def _determine_trigger_name(self, value: Any) -> UiPathResumeTriggerName: return UiPathResumeTriggerName.BATCH_RAG if isinstance(value, (DocumentExtraction, WaitDocumentExtraction)): return UiPathResumeTriggerName.EXTRACTION + if isinstance(value, WaitIntegrationEvent): + return UiPathResumeTriggerName.INBOX # default to API trigger return UiPathResumeTriggerName.API @@ -901,6 +928,57 @@ def _handle_api_trigger( inbox_id=str(uuid.uuid4()), request=serialize_object(value) ) + async def _handle_inbox_trigger( + self, value: WaitIntegrationEvent, resume_trigger: UiPathResumeTrigger + ) -> None: + """Handle Inbox-type resume triggers. + + Resolves `connection_name` (scoped to `connection_folder_path` when + provided) to a connection id via the Connections service, populates + `integration_resume` with the Integration Services configuration plus a + freshly generated `inbox_id`. The Connections-service registration is + performed server-side by Orchestrator's `CreateResumeTriggerTaskHandler` + once the job suspends. + + Args: + value: The suspend value (WaitIntegrationEvent) + resume_trigger: The resume trigger to populate + + Raises: + Exception: If no connection matches `connection_name`, or if more + than one exact match is found. + """ + uipath = UiPath() + connections = await uipath.connections.list_async( + name=value.connection_name, + folder_path=value.connection_folder_path, + connector_key=value.connector, + ) + connection = next( + (c for c in connections if c.name == value.connection_name), None + ) + if connection is None: + raise Exception( + f"No connection named '{value.connection_name}' " + f"for connector '{value.connector}' found" + + ( + f" in folder '{value.connection_folder_path}'" + if value.connection_folder_path + else "" + ) + ) + assert connection.id is not None + + resume_trigger.integration_resume = UiPathIntegrationTrigger( + connector=value.connector, + connection_id=connection.id, + operation=value.operation, + object_name=value.object_name, + filter_expression=value.filter_expression, + parameters=value.parameters, + inbox_id=str(uuid.uuid4()), + ) + class UiPathResumeTriggerHandler: """Combined handler for creating and reading resume triggers. diff --git a/packages/uipath-platform/tests/services/test_hitl.py b/packages/uipath-platform/tests/services/test_hitl.py index 2e8069a3e..7395908a9 100644 --- a/packages/uipath-platform/tests/services/test_hitl.py +++ b/packages/uipath-platform/tests/services/test_hitl.py @@ -1,3 +1,4 @@ +import json import uuid from typing import Any from unittest.mock import AsyncMock, patch @@ -7,6 +8,7 @@ from uipath.core.errors import ErrorCategory, UiPathFaultedTriggerError from uipath.core.triggers import ( UiPathApiTrigger, + UiPathIntegrationTrigger, UiPathResumeTrigger, UiPathResumeTriggerName, UiPathResumeTriggerType, @@ -32,11 +34,13 @@ WaitDocumentExtractionValidation, WaitEphemeralIndex, WaitEphemeralIndexRaw, + WaitIntegrationEvent, WaitJob, WaitJobRaw, WaitSystemAgent, WaitTask, ) +from uipath.platform.connections import Connection from uipath.platform.context_grounding import ( BatchTransformCreationResponse, BatchTransformOutputColumn, @@ -508,6 +512,91 @@ async def test_read_api_trigger_failure( await reader.read_trigger(resume_trigger) assert exc_info.value.category == ErrorCategory.SYSTEM + @pytest.mark.anyio + async def test_read_inbox_trigger( + self, + httpx_mock: HTTPXMock, + base_url: str, + setup_test_env: None, + ) -> None: + """Test reading an Inbox trigger fetches the IS metadata via GetPayload + and then enriches it via /elements_/v1/events/{processedEventId}. + """ + inbox_id = str(uuid.uuid4()) + processed_event_id = "v2::pp::1777041494382::334071::e374ecd5d0f73c21" + inbox_metadata = { + "UiPathEventConnector": "uipath-slack", + "UiPathEvent": "NEW_MESSAGE", + "UiPathEventObjectType": "Message", + "UiPathEventObjectId": "C123:1777041494.382", + "UiPathAdditionalEventData": json.dumps( + {"processedEventId": processed_event_id} + ), + } + enriched_event = { + "channel": "alerts", + "user": "U456", + "text": "hello from slack", + "ts": "1777041494.382", + } + + httpx_mock.add_response( + url=f"{base_url}/orchestrator_/api/JobTriggers/GetPayload/{inbox_id}", + status_code=200, + json=inbox_metadata, + ) + httpx_mock.add_response( + url=f"{base_url}/elements_/v1/events/{processed_event_id}", + status_code=200, + json=enriched_event, + ) + + resume_trigger = UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.INBOX, + integration_resume=UiPathIntegrationTrigger( + connector="slack", + connection_id=str(uuid.uuid4()), + operation="OnMessage", + object_name="Message", + inbox_id=inbox_id, + ), + ) + + reader = UiPathResumeTriggerReader() + result = await reader.read_trigger(resume_trigger) + assert result == enriched_event + + @pytest.mark.anyio + async def test_read_inbox_trigger_failure( + self, + httpx_mock: HTTPXMock, + base_url: str, + setup_test_env: None, + ) -> None: + """Test reading an Inbox trigger with a failed payload response.""" + inbox_id = str(uuid.uuid4()) + + httpx_mock.add_response( + url=f"{base_url}/orchestrator_/api/JobTriggers/GetPayload/{inbox_id}", + status_code=500, + ) + + resume_trigger = UiPathResumeTrigger( + trigger_type=UiPathResumeTriggerType.INBOX, + integration_resume=UiPathIntegrationTrigger( + connector="slack", + connection_id=str(uuid.uuid4()), + operation="OnMessage", + object_name="Message", + inbox_id=inbox_id, + ), + ) + + with pytest.raises(UiPathFaultedTriggerError) as exc_info: + reader = UiPathResumeTriggerReader() + await reader.read_trigger(resume_trigger) + assert exc_info.value.category == ErrorCategory.SYSTEM + @pytest.mark.anyio async def test_read_deep_rag_trigger_successful( self, @@ -1374,6 +1463,186 @@ async def test_create_resume_trigger_api( assert isinstance(resume_trigger.api_resume.inbox_id, str) assert resume_trigger.api_resume.request == api_input + @pytest.mark.anyio + async def test_create_resume_trigger_wait_integration_event( + self, + setup_test_env: None, + ) -> None: + """Test creating a resume trigger for WaitIntegrationEvent.""" + connection_id = str(uuid.uuid4()) + mock_connection = Connection( + id=connection_id, name="Slack-Alerts", element_instance_id=1 + ) + mock_list_async = AsyncMock(return_value=[mock_connection]) + + wait_event = WaitIntegrationEvent( + connector="slack", + connection_name="Slack-Alerts", + connection_folder_path="Shared", + operation="OnMessage", + object_name="Message", + filter_expression="channel == 'alerts'", + parameters={"channel_id": "C123"}, + ) + + with patch( + "uipath.platform.connections._connections_service.ConnectionsService.list_async", + new=mock_list_async, + ): + processor = UiPathResumeTriggerCreator() + resume_trigger = await processor.create_trigger(wait_event) + + assert resume_trigger is not None + assert resume_trigger.trigger_type == UiPathResumeTriggerType.INBOX + assert resume_trigger.trigger_name == UiPathResumeTriggerName.INBOX + assert resume_trigger.api_resume is None + assert resume_trigger.integration_resume is not None + assert resume_trigger.integration_resume.connector == "slack" + assert resume_trigger.integration_resume.connection_id == connection_id + assert resume_trigger.integration_resume.operation == "OnMessage" + assert resume_trigger.integration_resume.object_name == "Message" + assert ( + resume_trigger.integration_resume.filter_expression == "channel == 'alerts'" + ) + assert resume_trigger.integration_resume.parameters == {"channel_id": "C123"} + assert isinstance(resume_trigger.integration_resume.inbox_id, str) + uuid.UUID(resume_trigger.integration_resume.inbox_id) + mock_list_async.assert_called_once_with( + name="Slack-Alerts", folder_path="Shared", connector_key="slack" + ) + + @pytest.mark.anyio + async def test_create_resume_trigger_wait_integration_event_optional_fields_omitted( + self, + setup_test_env: None, + ) -> None: + """Test that filter_expression, parameters, and folder_path are optional.""" + mock_connection = Connection( + id=str(uuid.uuid4()), name="Teams-Default", element_instance_id=2 + ) + mock_list_async = AsyncMock(return_value=[mock_connection]) + + wait_event = WaitIntegrationEvent( + connector="teams", + connection_name="Teams-Default", + operation="OnReply", + object_name="Reply", + ) + + with patch( + "uipath.platform.connections._connections_service.ConnectionsService.list_async", + new=mock_list_async, + ): + processor = UiPathResumeTriggerCreator() + resume_trigger = await processor.create_trigger(wait_event) + + assert resume_trigger.integration_resume is not None + assert resume_trigger.integration_resume.filter_expression is None + assert resume_trigger.integration_resume.parameters is None + mock_list_async.assert_called_once_with( + name="Teams-Default", folder_path=None, connector_key="teams" + ) + + @pytest.mark.anyio + async def test_wait_integration_event_serializes_with_camelcase_aliases( + self, + setup_test_env: None, + ) -> None: + """Wire shape: UiPathResumeTrigger.integration_resume must serialize + with the field names Orchestrator's ResumeTriggerDto/IntegrationResumeDto + expect (PascalCase-ish camelCase). + """ + connection_id = str(uuid.uuid4()) + mock_connection = Connection( + id=connection_id, name="Slack-Alerts", element_instance_id=3 + ) + mock_list_async = AsyncMock(return_value=[mock_connection]) + + wait_event = WaitIntegrationEvent( + connector="slack", + connection_name="Slack-Alerts", + operation="OnMessage", + object_name="Message", + ) + + with patch( + "uipath.platform.connections._connections_service.ConnectionsService.list_async", + new=mock_list_async, + ): + processor = UiPathResumeTriggerCreator() + resume_trigger = await processor.create_trigger(wait_event) + + dumped = resume_trigger.model_dump(by_alias=True, exclude_none=True) + + assert dumped["triggerType"] == UiPathResumeTriggerType.INBOX + assert "integrationResume" in dumped + integration = dumped["integrationResume"] + assert integration["connector"] == "slack" + assert integration["connectionId"] == connection_id + assert integration["operation"] == "OnMessage" + assert integration["objectName"] == "Message" + assert "inboxId" in integration + uuid.UUID(integration["inboxId"]) + + @pytest.mark.anyio + async def test_create_resume_trigger_wait_integration_event_no_match( + self, + setup_test_env: None, + ) -> None: + """Listing returns no exact-name match -> creator raises.""" + mock_list_async = AsyncMock(return_value=[]) + + wait_event = WaitIntegrationEvent( + connector="slack", + connection_name="Missing-Connection", + operation="OnMessage", + object_name="Message", + ) + + with patch( + "uipath.platform.connections._connections_service.ConnectionsService.list_async", + new=mock_list_async, + ): + processor = UiPathResumeTriggerCreator() + with pytest.raises(UiPathFaultedTriggerError): + await processor.create_trigger(wait_event) + + @pytest.mark.anyio + async def test_create_resume_trigger_wait_integration_event_filters_to_exact_match( + self, + setup_test_env: None, + ) -> None: + """list_async partial-matches; creator must pick the exact-name entry.""" + target_id = str(uuid.uuid4()) + # list_async partial-matches; simulate prefix-matching returning extras + mock_list_async = AsyncMock( + return_value=[ + Connection( + id=str(uuid.uuid4()), + name="Slack-Alerts-Old", + element_instance_id=4, + ), + Connection(id=target_id, name="Slack-Alerts", element_instance_id=5), + ] + ) + + wait_event = WaitIntegrationEvent( + connector="slack", + connection_name="Slack-Alerts", + operation="OnMessage", + object_name="Message", + ) + + with patch( + "uipath.platform.connections._connections_service.ConnectionsService.list_async", + new=mock_list_async, + ): + processor = UiPathResumeTriggerCreator() + resume_trigger = await processor.create_trigger(wait_event) + + assert resume_trigger.integration_resume is not None + assert resume_trigger.integration_resume.connection_id == target_id + @pytest.mark.anyio async def test_create_resume_trigger_create_deep_rag( self, diff --git a/packages/uipath-platform/uv.lock b/packages/uipath-platform/uv.lock index 415634498..773e3ea39 100644 --- a/packages/uipath-platform/uv.lock +++ b/packages/uipath-platform/uv.lock @@ -1056,7 +1056,7 @@ wheels = [ [[package]] name = "uipath-core" -version = "0.5.14" +version = "0.5.15" source = { editable = "../uipath-core" } dependencies = [ { name = "opentelemetry-instrumentation" }, @@ -1088,7 +1088,7 @@ dev = [ [[package]] name = "uipath-platform" -version = "0.1.39" +version = "0.1.40" source = { editable = "." } dependencies = [ { name = "httpx" }, diff --git a/packages/uipath/uv.lock b/packages/uipath/uv.lock index 10b7e683b..5bef60a68 100644 --- a/packages/uipath/uv.lock +++ b/packages/uipath/uv.lock @@ -2650,7 +2650,7 @@ dev = [ [[package]] name = "uipath-core" -version = "0.5.14" +version = "0.5.15" source = { editable = "../uipath-core" } dependencies = [ { name = "opentelemetry-instrumentation" }, @@ -2682,7 +2682,7 @@ dev = [ [[package]] name = "uipath-platform" -version = "0.1.39" +version = "0.1.40" source = { editable = "../uipath-platform" } dependencies = [ { name = "httpx" },