Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
pydantic = ["pydantic>=2.0.0,<3"]
openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"]
google-adk = ["google-adk>=1.27.0,<2"]
langsmith = ["langsmith>=0.7.0,<0.8"]
langsmith = ["langsmith>=0.7.34,<0.8"]
lambda-worker-otel = [
"opentelemetry-api>=1.11.1,<2",
"opentelemetry-sdk>=1.11.1,<2",
Expand Down Expand Up @@ -79,7 +79,7 @@ dev = [
"pytest-rerunfailures>=16.1",
"pytest-xdist>=3.6,<4",
"moto[s3,server]>=5",
"langsmith>=0.7.0,<0.8",
"langsmith>=0.7.34,<0.8",
"setuptools<82",
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
"opentelemetry-semantic-conventions>=0.40b0,<1",
Expand Down
60 changes: 29 additions & 31 deletions temporalio/contrib/langsmith/_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,48 +152,46 @@ def _get_current_run_for_propagation() -> RunTree | None:


# ---------------------------------------------------------------------------
# Workflow event loop safety: patch @traceable's aio_to_thread
# Workflow event loop safety: override @traceable's aio_to_thread
# ---------------------------------------------------------------------------

_aio_to_thread_patched = False
_aio_to_thread_override_installed = False


def _patch_aio_to_thread() -> None:
"""Patch langsmith's ``aio_to_thread`` to run synchronously in workflows.
async def _temporal_aio_to_thread(
default_aio_to_thread: Callable[..., Any],
ctx: Any,
func: Callable[..., Any],
/,
*args: Any,
**kwargs: Any,
) -> Any:
"""Run LangSmith's ``aio_to_thread`` synchronously inside Temporal workflows.

The ``@traceable`` decorator on async functions uses ``aio_to_thread()`` →
``loop.run_in_executor()`` for run setup/teardown. The Temporal workflow
event loop does not support ``run_in_executor``. This patch runs those
functions synchronously on the workflow thread when inside a workflow.
Functions passed here must not perform blocking I/O.
event loop does not support ``run_in_executor``. This override runs those
functions synchronously on the workflow thread when inside a workflow,
and delegates to the default implementation outside workflows.

Registered via ``langsmith.set_runtime_overrides(aio_to_thread=...)``.
"""
global _aio_to_thread_patched # noqa: PLW0603
if _aio_to_thread_patched:
return

import langsmith._internal._aiter as _aiter
if not temporalio.workflow.in_workflow():
return await default_aio_to_thread(ctx, func, *args, **kwargs)
with temporalio.workflow.unsafe.sandbox_unrestricted():
return ctx.run(func, *args, **kwargs)

_original = _aiter.aio_to_thread

import contextvars
def _install_aio_to_thread_override() -> None:
"""Install the ``aio_to_thread`` override via LangSmith's official API.

async def _safe_aio_to_thread(
func: Callable[..., Any],
/,
*args: Any,
__ctx: contextvars.Context | None = None,
**kwargs: Any,
) -> Any:
if not temporalio.workflow.in_workflow():
return await _original(func, *args, __ctx=__ctx, **kwargs)
with temporalio.workflow.unsafe.sandbox_unrestricted():
# Run without ctx.run() so context var changes propagate
# to the caller. Safe because workflows are single-threaded.
return func(*args, **kwargs)

_aiter.aio_to_thread = _safe_aio_to_thread # type: ignore[assignment]
_aio_to_thread_patched = True
Safe to call multiple times; the override is only installed once.
"""
global _aio_to_thread_override_installed # noqa: PLW0603
if _aio_to_thread_override_installed:
return
langsmith.set_runtime_overrides(aio_to_thread=_temporal_aio_to_thread)
_aio_to_thread_override_installed = True


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -595,7 +593,7 @@ def workflow_interceptor_class(
self, input: temporalio.worker.WorkflowInterceptorClassInput
) -> type[_LangSmithWorkflowInboundInterceptor]:
"""Return the workflow interceptor class with config bound."""
_patch_aio_to_thread()
_install_aio_to_thread_override()
config = self

class InterceptorWithConfig(_LangSmithWorkflowInboundInterceptor):
Expand Down
10 changes: 5 additions & 5 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading