-
Notifications
You must be signed in to change notification settings - Fork 110
Add SANO sample #294
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Add SANO sample #294
Changes from all commits
5b55fce
20ad2cc
a8b65d3
98f1559
b3f78e3
d8e50cb
683ef53
a810f9a
5705f36
23d9e92
f198860
429a066
227a7a1
93efd2f
e24ec96
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| This sample demonstrates how to execute Nexus operations directly from client code, | ||
| without wrapping them in a workflow. It shows both synchronous and asynchronous | ||
| (workflow-backed) operations, plus listing and counting operations. | ||
|
|
||
|
|
||
| ### Temporal Python SDK support for Standalone Nexus Operations is at [Pre-release](https://docs.temporal.io/evaluate/development-production-features/release-stages#pre-release). | ||
|
|
||
| All APIs are experimental and may be subject to backwards-incompatible changes. | ||
|
|
||
| Standalone Nexus operations require a server version that supports this feature. Use the dev server build at https://github.com/temporalio/cli/releases/tag/v1.7.2-standalone-nexus-operations. | ||
|
|
||
| ### Sample directory structure | ||
|
|
||
| - [service.py](./service.py) - Nexus service definition with echo (sync) and hello (async) operations | ||
| - [handler.py](./handler.py) - Nexus operation handlers and the backing workflow for the async operation | ||
| - [worker.py](./worker.py) - Temporal worker that hosts the Nexus service | ||
| - [starter.py](./starter.py) - Client that executes standalone Nexus operations | ||
|
|
||
|
|
||
| ### Instructions | ||
|
|
||
| Run the [Temporal dev server build that supports standalone Nexus operations](https://github.com/temporalio/cli/releases/tag/v1.7.2-standalone-nexus-operations). | ||
| (If you are going to run locally, you will want to start it in another terminal; this command is blocking and runs until it receives a SIGINT (Ctrl + C) command.) | ||
|
|
||
| Start a Temporal dev server with the dynamic config flags required for standalone Nexus operations: | ||
|
|
||
| ```bash | ||
| temporal server start-dev \ | ||
| --dynamic-config-value "nexusoperation.enableStandalone=true" \ | ||
| --dynamic-config-value "history.enableChasmCallbacks=true" | ||
| ``` | ||
|
|
||
| Create the Nexus endpoint: | ||
|
|
||
| ``` | ||
| temporal operator nexus endpoint create \ | ||
|
Evanthx marked this conversation as resolved.
|
||
| --name nexus-standalone-operations-endpoint \ | ||
| --target-namespace default \ | ||
| --target-task-queue nexus-standalone-operations | ||
| ``` | ||
|
|
||
| In one terminal, start the worker: | ||
| ``` | ||
| uv run nexus_standalone_operations/worker.py | ||
| ``` | ||
|
|
||
| In another terminal, run the starter: | ||
| ``` | ||
| uv run nexus_standalone_operations/starter.py | ||
| ``` | ||
|
|
||
| ### Expected output | ||
|
|
||
| ``` | ||
| Echo result: hello | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I got a different output, this might need updating - |
||
| Echo result from existing operation handle: hello | ||
|
|
||
| Started `MyNexusService.Hello`. OperationID: hello-... | ||
| `MyNexusService.Hello` result: Hello, World! | ||
|
|
||
| Listing Nexus operations: | ||
| OperationId: echo-..., Operation: echo, Status: COMPLETED | ||
| OperationId: hello-..., Operation: hello, Status: COMPLETED | ||
|
|
||
| Total Nexus operations: 2 | ||
| ``` | ||
|
|
||
| If you run the starter code multiple times, you should see additional operations in the listing results, as more operations are run. | ||
| The same goes for the total number of operations. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| """Nexus service handler and backing workflow for standalone operations sample.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import uuid | ||
|
|
||
| import nexusrpc.handler | ||
| from temporalio import nexus, workflow | ||
|
|
||
| from nexus_standalone_operations.service import ( | ||
| EchoInput, | ||
| EchoOutput, | ||
| HelloInput, | ||
| HelloOutput, | ||
| MyNexusService, | ||
| ) | ||
|
|
||
|
|
||
| @workflow.defn | ||
| class HelloWorkflow: | ||
| @workflow.run | ||
| async def run(self, input: HelloInput) -> HelloOutput: | ||
| return HelloOutput(greeting=f"Hello, {input.name}!") | ||
|
|
||
|
|
||
| @nexusrpc.handler.service_handler(service=MyNexusService) | ||
| class MyNexusServiceHandler: | ||
| @nexusrpc.handler.sync_operation | ||
| async def echo( | ||
| self, _ctx: nexusrpc.handler.StartOperationContext, input: EchoInput | ||
| ) -> EchoOutput: | ||
| return EchoOutput(message=input.message) | ||
|
|
||
| @nexus.temporal_operation | ||
| async def hello( | ||
| self, | ||
| _ctx: nexus.TemporalStartOperationContext, | ||
| client: nexus.TemporalNexusClient, | ||
| input: HelloInput, | ||
| ) -> nexus.TemporalOperationResult[HelloOutput]: | ||
| return await client.start_workflow( | ||
| HelloWorkflow.run, | ||
| input, | ||
| id=str(uuid.uuid4()), | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| """Nexus service definition for standalone operations sample. | ||
|
|
||
| Defines a Nexus service with two operations: | ||
| - echo: a synchronous operation that echoes the input message | ||
| - hello: an asynchronous (workflow-backed) operation that returns a greeting | ||
|
|
||
| This service definition is used by both the handler (to validate operation | ||
| signatures) and the client (to create type-safe nexus clients). | ||
| """ | ||
|
|
||
| from dataclasses import dataclass | ||
|
|
||
| import nexusrpc | ||
|
|
||
|
|
||
| @dataclass | ||
| class EchoInput: | ||
| message: str | ||
|
|
||
|
|
||
| @dataclass | ||
| class EchoOutput: | ||
| message: str | ||
|
|
||
|
|
||
| @dataclass | ||
| class HelloInput: | ||
| name: str | ||
|
|
||
|
|
||
| @dataclass | ||
| class HelloOutput: | ||
| greeting: str | ||
|
|
||
|
|
||
| @nexusrpc.service | ||
| class MyNexusService: | ||
| echo: nexusrpc.Operation[EchoInput, EchoOutput] | ||
| hello: nexusrpc.Operation[HelloInput, HelloOutput] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| """Starter that demonstrates standalone Nexus operation execution. | ||
|
|
||
| Unlike other Nexus samples that call operations from within a workflow, this | ||
| sample executes Nexus operations directly from client code using the standalone | ||
| Nexus operation APIs. | ||
| """ | ||
|
|
||
| import asyncio | ||
| import uuid | ||
| from datetime import timedelta | ||
|
|
||
| from temporalio.client import Client | ||
| from temporalio.envconfig import ClientConfig | ||
|
|
||
| from nexus_standalone_operations.service import ( | ||
| EchoInput, | ||
| EchoOutput, | ||
| HelloInput, | ||
| MyNexusService, | ||
| ) | ||
|
|
||
| ENDPOINT_NAME = "nexus-standalone-operations-endpoint" | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| config = ClientConfig.load_client_connect_config() | ||
| _ = config.setdefault("target_host", "localhost:7233") | ||
| client = await Client.connect(**config) | ||
|
|
||
| # Create a typed NexusClient bound to the endpoint and service. | ||
| # The endpoint must be pre-created on the server (see README). | ||
| nexus_client = client.create_nexus_client( | ||
| service=MyNexusService, endpoint=ENDPOINT_NAME | ||
| ) | ||
|
|
||
| # Start sync echo operation and await the result immediately. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Should this comment say something about how this starts the standalone operation? Not important, but just reading the sample it's SUPER easy to miss what it's doing and how it's different. |
||
| operation_id = f"echo-{uuid.uuid4()}" | ||
| echo_result = await nexus_client.execute_operation( | ||
| MyNexusService.echo, | ||
| EchoInput(message="hello"), | ||
| id=operation_id, | ||
| schedule_to_close_timeout=timedelta(seconds=10), | ||
| ) | ||
| print(f"Echo result: {echo_result.message}") | ||
|
|
||
| # Get a handle and the result of an existing operation | ||
| existing_op_handle = client.get_nexus_operation_handle( | ||
| operation_id, operation=MyNexusService.echo | ||
| ) | ||
| existing_result = await existing_op_handle.result() | ||
| print(f"Echo result from existing operation handle: {existing_result.message}") | ||
|
|
||
| # Start async (workflow-backed) hello operation and get a NexusOperationHandle. | ||
| handle = await nexus_client.start_operation( | ||
| MyNexusService.hello, | ||
| HelloInput(name="World"), | ||
| id=f"hello-{uuid.uuid4()}", | ||
| schedule_to_close_timeout=timedelta(seconds=10), | ||
| ) | ||
|
|
||
| print(f"\nStarted `MyNexusService.Hello`. OperationID: {handle.operation_id}") | ||
|
|
||
| # Use the NexusOperationHandle to await the result of the operation. | ||
| hello_result = await handle.result() | ||
| print(f"`MyNexusService.Hello` result: {hello_result.greeting}") | ||
|
|
||
| # List nexus operations. | ||
| print("\nListing Nexus operations:") | ||
| query = f'Endpoint = "{ENDPOINT_NAME}"' | ||
| async for op in client.list_nexus_operations(query): | ||
| print( | ||
| f" OperationId: {op.operation_id},", | ||
| f" Operation: {op.operation},", | ||
| f" Status: {op.status.name}", | ||
| ) | ||
|
|
||
| # Count nexus operations. | ||
| count = await client.count_nexus_operations(query) | ||
| print(f"\nTotal Nexus operations: {count.count}") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(main()) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| """Worker that hosts the Nexus service for standalone operations sample.""" | ||
|
|
||
| import asyncio | ||
| import logging | ||
|
|
||
| from temporalio.client import Client | ||
| from temporalio.envconfig import ClientConfig | ||
| from temporalio.worker import Worker | ||
|
|
||
| from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler | ||
|
|
||
| interrupt_event = asyncio.Event() | ||
|
|
||
| TASK_QUEUE = "nexus-standalone-operations" | ||
|
|
||
|
|
||
| async def main() -> None: | ||
| logging.basicConfig(level=logging.INFO) | ||
|
|
||
| config = ClientConfig.load_client_connect_config() | ||
| _ = config.setdefault("target_host", "localhost:7233") | ||
| client = await Client.connect(**config) | ||
|
|
||
| async with Worker( | ||
| client, | ||
| task_queue=TASK_QUEUE, | ||
| workflows=[HelloWorkflow], | ||
| nexus_service_handlers=[MyNexusServiceHandler()], | ||
| ): | ||
| logging.info("Worker started, ctrl+c to exit") | ||
| _ = await interrupt_event.wait() | ||
| logging.info("Shutting down") | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| loop = asyncio.new_event_loop() | ||
| try: | ||
| loop.run_until_complete(main()) | ||
| except KeyboardInterrupt: | ||
| interrupt_event.set() | ||
| loop.run_until_complete(loop.shutdown_asyncgens()) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| import asyncio | ||
| import time | ||
| from collections.abc import Awaitable, Callable | ||
| from datetime import timedelta | ||
| from typing import TypeVar | ||
|
|
||
| T = TypeVar("T") | ||
|
|
||
|
|
||
| async def assert_eventually( | ||
| fn: Callable[[], Awaitable[T]], | ||
| *, | ||
| timeout: timedelta = timedelta(seconds=10), | ||
| interval: timedelta = timedelta(milliseconds=200), | ||
| ) -> T: | ||
| start_sec = time.monotonic() | ||
| while True: | ||
| try: | ||
| res = await fn() | ||
| return res | ||
| except AssertionError: | ||
| if timedelta(seconds=time.monotonic() - start_sec) >= timeout: | ||
| raise | ||
| await asyncio.sleep(interval.total_seconds()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should call out this feature is not stable near the top
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a blurb about SANO being pre-release and the stability guarantees that implies. Copied from the docs PR.