diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 2a9f9799..10fc28d6 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,4 +1,5 @@ * @temporalio/sdk +/langgraph_plugin/ @temporalio/sdk @temporalio/ai-sdk # SDK & Nexus own the README, pyproject.toml, and uv.lock /README.md @temporalio/sdk @temporalio/nexus diff --git a/langgraph_plugin/README.md b/langgraph_plugin/README.md index 8691d178..a242e943 100644 --- a/langgraph_plugin/README.md +++ b/langgraph_plugin/README.md @@ -16,6 +16,7 @@ Samples are organized by API style: | **Continue-as-new** | [graph_api/continue_as_new](graph_api/continue_as_new) | [functional_api/continue_as_new](functional_api/continue_as_new) | Multi-stage data pipeline that uses `continue-as-new` with task result caching so previously-completed stages are not re-executed. | | **ReAct Agent** | [graph_api/react_agent](graph_api/react_agent) | [functional_api/react_agent](functional_api/react_agent) | Tool-calling agent loop. Graph API uses conditional edges; Functional API uses a `while` loop. | | **Control Flow** | -- | [functional_api/control_flow](functional_api/control_flow) | Demonstrates parallel task execution, `for` loops, and `if/else` branching -- patterns that are natural in the Functional API. | +| **Streaming** | [graph_api/streaming](graph_api/streaming) | -- | Streams live output from a running workflow via [Workflow Streams](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams): node tokens via `get_stream_writer()` + `streaming_topic`, plus workflow-side `astream` progress published with `WorkflowStream.topic().publish()`. | | **LangSmith Tracing** | [graph_api/langsmith_tracing](graph_api/langsmith_tracing) | [functional_api/langsmith_tracing](functional_api/langsmith_tracing) | Combines `LangGraphPlugin` with Temporal's `LangSmithPlugin` for durable execution + full observability of LLM calls. Requires API keys. | ## Prerequisites @@ -67,6 +68,7 @@ uv run langgraph_plugin//langsmith_tracing/main.py - **Continue-as-new with caching** -- `cache()` captures completed task results; passing the cache to the next execution avoids re-running them. - **Conditional routing** -- Graph API's `add_conditional_edges` and Functional API's native `if/else`/`while` for agent loops. - **Parallel execution** -- Functional API launches multiple tasks concurrently by creating futures before awaiting them. +- **Streaming** -- Workflow Streams expose a durable, offset-addressed event channel that external clients subscribe to while the workflow is still running. Nodes emit fine-grained tokens via `get_stream_writer()` (routed by the plugin's `streaming_topic`), and the workflow can publish coarse `astream` progress to its own topic. ## Related diff --git a/langgraph_plugin/graph_api/streaming/__init__.py b/langgraph_plugin/graph_api/streaming/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/langgraph_plugin/graph_api/streaming/run_worker.py b/langgraph_plugin/graph_api/streaming/run_worker.py new file mode 100644 index 00000000..46e0090e --- /dev/null +++ b/langgraph_plugin/graph_api/streaming/run_worker.py @@ -0,0 +1,35 @@ +"""Worker for the streaming sample (Graph API).""" + +import asyncio +import os + +from temporalio.client import Client +from temporalio.contrib.langgraph import LangGraphPlugin +from temporalio.worker import Worker + +from langgraph_plugin.graph_api.streaming.workflow import ( + StreamingWorkflow, + make_streaming_graph, +) + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + # streaming_topic routes node get_stream_writer() output onto the "tokens" topic. + plugin = LangGraphPlugin( + graphs={"streaming": make_streaming_graph()}, + streaming_topic="tokens", + ) + + worker = Worker( + client, + task_queue="langgraph-streaming", + workflows=[StreamingWorkflow], + plugins=[plugin], + ) + print("Worker started. Ctrl+C to exit.") + await worker.run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/langgraph_plugin/graph_api/streaming/run_workflow.py b/langgraph_plugin/graph_api/streaming/run_workflow.py new file mode 100644 index 00000000..94881cca --- /dev/null +++ b/langgraph_plugin/graph_api/streaming/run_workflow.py @@ -0,0 +1,44 @@ +"""Start the streaming workflow and subscribe to its Workflow Stream (Graph API).""" + +import asyncio +import os +from datetime import timedelta + +from temporalio.client import Client +from temporalio.contrib.workflow_streams import WorkflowStreamClient + +from langgraph_plugin.graph_api.streaming.workflow import StreamingWorkflow + + +async def main() -> None: + client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")) + + handle = await client.start_workflow( + StreamingWorkflow.run, + "a brave robot", + id="streaming-workflow", + task_queue="langgraph-streaming", + ) + + # Subscribe to all topics on the workflow's stream and demultiplex on topic. + ws = WorkflowStreamClient.create(client, handle.id) + async for item in ws.subscribe( + from_offset=0, + result_type=dict, + poll_cooldown=timedelta(milliseconds=50), + ): + if item.topic == "tokens": + print(item.data["token"], end="", flush=True) + elif item.topic == "progress": + if item.data.get("done"): + # Let the workflow know we are done consuming so it can complete. + await handle.signal(StreamingWorkflow.ack_stream) + break + print(f"\n[progress] {item.data}") + + result = await handle.result() + print(f"\n\nFinal result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/langgraph_plugin/graph_api/streaming/workflow.py b/langgraph_plugin/graph_api/streaming/workflow.py new file mode 100644 index 00000000..a698b0d6 --- /dev/null +++ b/langgraph_plugin/graph_api/streaming/workflow.py @@ -0,0 +1,90 @@ +"""Streaming with the LangGraph Graph API and Temporal Workflow Streams. + +A workflow's :class:`WorkflowStream` is a durable, offset-addressed event channel +external clients can subscribe to while the workflow is still running. This sample +demonstrates both ways the LangGraph plugin produces stream items: + +- **Node token streaming** -- the ``write_story`` node calls LangGraph's + ``get_stream_writer()`` to emit fine-grained tokens. The plugin is configured with + ``streaming_topic="tokens"`` (see ``run_worker.py``), which routes those writes onto + the ``"tokens"`` topic. +- **Workflow-side ``astream`` publish** -- the workflow drives the graph with + ``app.astream(...)`` and publishes each node-completion chunk onto a ``"progress"`` + topic it owns. + +A single client subscribes to all topics and demultiplexes on ``item.topic``. +""" + +from datetime import timedelta + +from langgraph.config import get_stream_writer +from langgraph.graph import START, StateGraph +from temporalio import workflow +from temporalio.contrib.langgraph import graph as temporal_graph +from temporalio.contrib.workflow_streams import WorkflowStream +from typing_extensions import TypedDict + + +class State(TypedDict): + topic: str + story: str + + +async def outline(state: State) -> dict[str, str]: + """Produce a short opening line. Runs first so ``astream`` emits an early chunk.""" + return {"story": f"A story about {state['topic']}:"} + + +async def write_story(state: State) -> dict[str, str]: + """Write the story, emitting each word as a token via the stream writer.""" + writer = get_stream_writer() + words = f"{state['story']} Once upon a time, there was {state['topic']}.".split() + for word in words: + writer({"token": word + " "}) + return {"story": " ".join(words)} + + +def make_streaming_graph() -> StateGraph: + g = StateGraph(State) + activity_metadata = { + "execute_in": "activity", + "start_to_close_timeout": timedelta(seconds=10), + } + g.add_node("outline", outline, metadata=activity_metadata) + g.add_node("write_story", write_story, metadata=activity_metadata) + g.add_edge(START, "outline") + g.add_edge("outline", "write_story") + return g + + +@workflow.defn +class StreamingWorkflow: + def __init__(self) -> None: + # WorkflowStream must be constructed during workflow initialization. + self.stream = WorkflowStream() + self._stream_acked = False + + @workflow.signal + def ack_stream(self) -> None: + """Signalled by the client once it has finished consuming the stream.""" + self._stream_acked = True + + @workflow.run + async def run(self, topic: str) -> str: + app = temporal_graph("streaming").compile() + progress = self.stream.topic("progress") + + story = "" + async for chunk in app.astream({"topic": topic, "story": ""}): + # Each chunk is {node_name: {state updates}}. Forward it as progress. + progress.publish(chunk) + for node_update in chunk.values(): + if "story" in node_update: + story = node_update["story"] + + progress.publish({"done": True}) + + # The stream disappears when the workflow completes, so wait until the + # client acknowledges it has finished consuming before returning. + await workflow.wait_condition(lambda: self._stream_acked) + return story diff --git a/tests/langgraph_plugin/streaming_test.py b/tests/langgraph_plugin/streaming_test.py new file mode 100644 index 00000000..b96acc8a --- /dev/null +++ b/tests/langgraph_plugin/streaming_test.py @@ -0,0 +1,63 @@ +import uuid +from datetime import timedelta +from typing import Any + +from temporalio.client import Client +from temporalio.contrib.langgraph import LangGraphPlugin +from temporalio.contrib.workflow_streams import WorkflowStreamClient +from temporalio.worker import Worker + +from langgraph_plugin.graph_api.streaming.workflow import ( + StreamingWorkflow, + make_streaming_graph, +) + + +async def test_streaming_graph_api(client: Client) -> None: + task_queue = f"streaming-test-{uuid.uuid4()}" + plugin = LangGraphPlugin( + graphs={"streaming": make_streaming_graph()}, + streaming_topic="tokens", + ) + + async with Worker( + client, + task_queue=task_queue, + workflows=[StreamingWorkflow], + plugins=[plugin], + ): + handle = await client.start_workflow( + StreamingWorkflow.run, + "a brave robot", + id=f"streaming-{uuid.uuid4()}", + task_queue=task_queue, + ) + + ws = WorkflowStreamClient.create(client, handle.id) + tokens: list[dict[str, Any]] = [] + progress: list[dict[str, Any]] = [] + async for item in ws.subscribe( + from_offset=0, + result_type=dict, + poll_cooldown=timedelta(milliseconds=10), + ): + if item.topic == "tokens": + tokens.append(item.data) + elif item.topic == "progress": + if item.data.get("done"): + await handle.signal(StreamingWorkflow.ack_stream) + break + progress.append(item.data) + + result = await handle.result() + + # Tokens reassemble into the final story. + assert tokens, "expected at least one token" + assert all("token" in t for t in tokens) + assembled = "".join(t["token"] for t in tokens).strip() + assert assembled == result + + # Workflow-side astream publish: one chunk per node, in order. + assert [list(chunk)[0] for chunk in progress] == ["outline", "write_story"] + assert result == progress[-1]["write_story"]["story"] + assert "a brave robot" in result