Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/uipath-core/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-core"
version = "0.5.14"
version = "0.5.15"
description = "UiPath Core abstractions"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
2 changes: 2 additions & 0 deletions packages/uipath-core/src/uipath/core/triggers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
"UiPathResumeTrigger",
"UiPathResumeTriggerType",
"UiPathApiTrigger",
"UiPathIntegrationTrigger",
"UiPathResumeTriggerName",
]

from uipath.core.triggers.trigger import (
UiPathApiTrigger,
UiPathIntegrationTrigger,
UiPathResumeTrigger,
UiPathResumeTriggerName,
UiPathResumeTriggerType,
Expand Down
22 changes: 22 additions & 0 deletions packages/uipath-core/src/uipath/core/triggers/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,25 @@ class UiPathApiTrigger(BaseModel):
model_config = ConfigDict(validate_by_name=True)


class UiPathIntegrationTrigger(BaseModel):
"""Integration Services (Inbox) resume trigger request.

Mirrors Orchestrator's `IntegrationResumeDto`: the configuration needed to
register a remote event trigger through the Connections service and
correlate the eventual payload back to the suspended job via `inbox_id`.
"""

connector: str = Field(alias="connector")
connection_id: str = Field(alias="connectionId")
operation: str = Field(alias="operation")
object_name: str = Field(alias="objectName")
filter_expression: str | None = Field(default=None, alias="filterExpression")
parameters: dict[str, str] | None = Field(default=None, alias="parameters")
inbox_id: str = Field(alias="inboxId")

model_config = ConfigDict(validate_by_name=True)


class UiPathResumeTrigger(BaseModel):
"""Information needed to resume execution."""

Expand All @@ -65,6 +84,9 @@ class UiPathResumeTrigger(BaseModel):
)
item_key: str | None = Field(default=None, alias="itemKey")
api_resume: UiPathApiTrigger | None = Field(default=None, alias="apiResume")
integration_resume: UiPathIntegrationTrigger | None = Field(
default=None, alias="integrationResume"
)
folder_path: str | None = Field(default=None, alias="folderPath")
folder_key: str | None = Field(default=None, alias="folderKey")
payload: Any | None = Field(default=None, alias="interruptObject", exclude=True)
Expand Down
2 changes: 1 addition & 1 deletion packages/uipath-core/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion packages/uipath-platform/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-platform"
version = "0.1.39"
version = "0.1.40"
description = "HTTP client library for programmatic access to UiPath Platform"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
WaitEphemeralIndex,
WaitEphemeralIndexRaw,
WaitEscalation,
WaitIntegrationEvent,
WaitJob,
WaitJobRaw,
WaitSystemAgent,
Expand Down Expand Up @@ -89,6 +90,7 @@
"WaitEphemeralIndexRaw",
"DocumentExtractionValidation",
"WaitDocumentExtractionValidation",
"WaitIntegrationEvent",
"RequestSpec",
"Endpoint",
"UiPathUrl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,23 @@ class WaitDocumentExtractionValidation(BaseModel):

extraction_validation: StartExtractionValidationResponse
task_url: str | None = None


class WaitIntegrationEvent(BaseModel):
"""Model representing a wait on an Integration Services event.

Used to suspend a job until a remote event (e.g. Slack message, Teams reply)
is delivered by Integration Services. The SDK resolves `connection_name`
(scoped to `connection_folder_path` when provided) to the underlying
connection id and generates a fresh `inbox_id` when the trigger is created;
the rest of the fields describe which remote event to subscribe to via
the Connections service.
"""

connector: str
connection_name: str
connection_folder_path: str | None = None
operation: str
object_name: str
filter_expression: str | None = None
parameters: dict[str, str] | None = None
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,52 @@ def _retrieve_api_payload_spec(
},
)

def retrieve_inbox_payload(self, inbox_id: str) -> Any:
"""Fetch payload data for Integration Services (Inbox) triggers.

Unlike `retrieve_api_payload`, this returns the response body as-is.
Orchestrator's `GET /JobTriggers/GetPayload/{inboxId}` returns the
stored payload directly without an envelope.

Args:
inbox_id: The Id of the inbox to fetch the payload for.

Returns:
The stored payload.
"""
spec = self._retrieve_api_payload_spec(inbox_id=inbox_id)

response = self.request(
spec.method,
url=spec.endpoint,
headers=spec.headers,
)

return response.json()

async def retrieve_inbox_payload_async(self, inbox_id: str) -> Any:
"""Asynchronously fetch payload data for Integration Services (Inbox) triggers.

Unlike `retrieve_api_payload_async`, this returns the response body
as-is. Orchestrator's `GET /JobTriggers/GetPayload/{inboxId}` returns
the stored payload directly without an envelope.

Args:
inbox_id: The Id of the inbox to fetch the payload for.

Returns:
The stored payload.
"""
spec = self._retrieve_api_payload_spec(inbox_id=inbox_id)

response = await self.request_async(
spec.method,
url=spec.endpoint,
headers=spec.headers,
)

return response.json()

def _extract_first_inbox_id(self, response: Any) -> str:
if len(response["value"]) > 0:
return response["value"][0]["ItemKey"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from uipath.core.serialization import serialize_object
from uipath.core.triggers import (
UiPathApiTrigger,
UiPathIntegrationTrigger,
UiPathResumeTrigger,
UiPathResumeTriggerName,
UiPathResumeTriggerType,
Expand Down Expand Up @@ -43,11 +44,13 @@
WaitEphemeralIndex,
WaitEphemeralIndexRaw,
WaitEscalation,
WaitIntegrationEvent,
WaitJob,
WaitJobRaw,
WaitSystemAgent,
WaitTask,
)
from uipath.platform.connections import EventArguments
from uipath.platform.context_grounding import DeepRagStatus, IndexStatus
from uipath.platform.context_grounding.context_grounding_index import (
ContextGroundingIndex,
Expand Down Expand Up @@ -401,6 +404,23 @@ async def read_trigger(self, trigger: UiPathResumeTrigger) -> Any | None:
f"Error fetching API trigger payload for inbox {trigger.api_resume.inbox_id}: {str(e)}",
) from e

case UiPathResumeTriggerType.INBOX:
if trigger.integration_resume and trigger.integration_resume.inbox_id:
try:
inbox_payload = await uipath.jobs.retrieve_inbox_payload_async(
trigger.integration_resume.inbox_id
)
event_args = EventArguments.model_validate(inbox_payload)
return await uipath.connections.retrieve_event_payload_async(
event_args
)
except Exception as e:
raise UiPathFaultedTriggerError(
ErrorCategory.SYSTEM,
f"Failed to get trigger payload"
f"Error fetching Inbox trigger payload for inbox {trigger.integration_resume.inbox_id}: {str(e)}",
) from e

case _:
raise UiPathFaultedTriggerError(
ErrorCategory.SYSTEM,
Expand Down Expand Up @@ -461,6 +481,9 @@ async def create_trigger(self, suspend_value: Any) -> UiPathResumeTrigger:
case UiPathResumeTriggerType.API:
self._handle_api_trigger(suspend_value, resume_trigger)

case UiPathResumeTriggerType.INBOX:
await self._handle_inbox_trigger(suspend_value, resume_trigger)

case UiPathResumeTriggerType.DEEP_RAG:
await self._handle_deep_rag_job_trigger(
suspend_value, resume_trigger
Expand Down Expand Up @@ -545,6 +568,8 @@ def _determine_trigger_type(self, value: Any) -> UiPathResumeTriggerType:
value, (DocumentExtractionValidation, WaitDocumentExtractionValidation)
):
return UiPathResumeTriggerType.IXP_VS_ESCALATION
if isinstance(value, WaitIntegrationEvent):
return UiPathResumeTriggerType.INBOX
# default to API trigger
return UiPathResumeTriggerType.API

Expand Down Expand Up @@ -579,6 +604,8 @@ def _determine_trigger_name(self, value: Any) -> UiPathResumeTriggerName:
return UiPathResumeTriggerName.BATCH_RAG
if isinstance(value, (DocumentExtraction, WaitDocumentExtraction)):
return UiPathResumeTriggerName.EXTRACTION
if isinstance(value, WaitIntegrationEvent):
return UiPathResumeTriggerName.INBOX
# default to API trigger
return UiPathResumeTriggerName.API

Expand Down Expand Up @@ -901,6 +928,57 @@ def _handle_api_trigger(
inbox_id=str(uuid.uuid4()), request=serialize_object(value)
)

async def _handle_inbox_trigger(
self, value: WaitIntegrationEvent, resume_trigger: UiPathResumeTrigger
) -> None:
"""Handle Inbox-type resume triggers.

Resolves `connection_name` (scoped to `connection_folder_path` when
provided) to a connection id via the Connections service, populates
`integration_resume` with the Integration Services configuration plus a
freshly generated `inbox_id`. The Connections-service registration is
performed server-side by Orchestrator's `CreateResumeTriggerTaskHandler`
once the job suspends.

Args:
value: The suspend value (WaitIntegrationEvent)
resume_trigger: The resume trigger to populate

Raises:
Exception: If no connection matches `connection_name`, or if more
than one exact match is found.
"""
uipath = UiPath()
connections = await uipath.connections.list_async(
name=value.connection_name,
folder_path=value.connection_folder_path,
connector_key=value.connector,
)
connection = next(
(c for c in connections if c.name == value.connection_name), None
)
if connection is None:
raise Exception(
f"No connection named '{value.connection_name}' "
f"for connector '{value.connector}' found"
+ (
f" in folder '{value.connection_folder_path}'"
if value.connection_folder_path
else ""
)
)
assert connection.id is not None

resume_trigger.integration_resume = UiPathIntegrationTrigger(
connector=value.connector,
connection_id=connection.id,
operation=value.operation,
object_name=value.object_name,
filter_expression=value.filter_expression,
parameters=value.parameters,
inbox_id=str(uuid.uuid4()),
)


class UiPathResumeTriggerHandler:
"""Combined handler for creating and reading resume triggers.
Expand Down
Loading
Loading