Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ Some examples require extra dependencies. See each sample's directory for specif
* [Nexus Messaging](nexus_messaging): Demonstrates how send signal, update and query messages through Nexus.
This contains two samples, one sending messages to an existing workflow and a second that creates a workflow through Nexus
and sends messages to it.
* [nexus_standalone_operations](nexus_standalone_operations) - Execute Nexus operations directly from client code,
without wrapping them in a workflow.
* [open_telemetry](open_telemetry) - Trace workflows with OpenTelemetry.
* [patching](patching) - Alter workflows safely with `patch` and `deprecate_patch`.
* [polling](polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
Expand Down
69 changes: 69 additions & 0 deletions nexus_standalone_operations/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
This sample demonstrates how to execute Nexus operations directly from client code,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should call out this feature is not stable near the top

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a blurb about SANO being pre-release and the stability guarantees that implies. Copied from the docs PR.

without wrapping them in a workflow. It shows both synchronous and asynchronous
(workflow-backed) operations, plus listing and counting operations.


### Temporal Python SDK support for Standalone Nexus Operations is at [Pre-release](https://docs.temporal.io/evaluate/development-production-features/release-stages#pre-release).

All APIs are experimental and may be subject to backwards-incompatible changes.

Standalone Nexus operations require a server version that supports this feature. Use the dev server build at https://github.com/temporalio/cli/releases/tag/v1.7.2-standalone-nexus-operations.

### Sample directory structure

- [service.py](./service.py) - Nexus service definition with echo (sync) and hello (async) operations
- [handler.py](./handler.py) - Nexus operation handlers and the backing workflow for the async operation
- [worker.py](./worker.py) - Temporal worker that hosts the Nexus service
- [starter.py](./starter.py) - Client that executes standalone Nexus operations


### Instructions

Run the [Temporal dev server build that supports standalone Nexus operations](https://github.com/temporalio/cli/releases/tag/v1.7.2-standalone-nexus-operations).
(If you are going to run locally, you will want to start it in another terminal; this command is blocking and runs until it receives a SIGINT (Ctrl + C) command.)

Start a Temporal dev server with the dynamic config flags required for standalone Nexus operations:

```bash
temporal server start-dev \
--dynamic-config-value "nexusoperation.enableStandalone=true" \
--dynamic-config-value "history.enableChasmCallbacks=true"
```

Create the Nexus endpoint:

```
temporal operator nexus endpoint create \
Comment thread
Evanthx marked this conversation as resolved.
--name nexus-standalone-operations-endpoint \
--target-namespace default \
--target-task-queue nexus-standalone-operations
```

In one terminal, start the worker:
```
uv run nexus_standalone_operations/worker.py
```

In another terminal, run the starter:
```
uv run nexus_standalone_operations/starter.py
```

### Expected output

```
Echo result: hello

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got a different output, this might need updating -

Echo result: hello

Started `MyNexusService.Hello`. OperationID: hello-c40bbf66-606d-4515-bc52-24d0249ec87b
`MyNexusService.Hello` result: Hello, World!

Listing Nexus operations:
 OperationId: hello-c40bbf66-606d-4515-bc52-24d0249ec87b,  Operation: hello,  Status: COMPLETED
 OperationId: echo-0b9a3a7b-987c-471a-bd65-2c8e52804616,  Operation: echo,  Status: COMPLETED

Total Nexus operations: 2

Echo result from existing operation handle: hello

Started `MyNexusService.Hello`. OperationID: hello-...
`MyNexusService.Hello` result: Hello, World!

Listing Nexus operations:
OperationId: echo-..., Operation: echo, Status: COMPLETED
OperationId: hello-..., Operation: hello, Status: COMPLETED

Total Nexus operations: 2
```

If you run the starter code multiple times, you should see additional operations in the listing results, as more operations are run.
The same goes for the total number of operations.
Empty file.
45 changes: 45 additions & 0 deletions nexus_standalone_operations/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Nexus service handler and backing workflow for standalone operations sample."""

from __future__ import annotations

import uuid

import nexusrpc.handler
from temporalio import nexus, workflow

from nexus_standalone_operations.service import (
EchoInput,
EchoOutput,
HelloInput,
HelloOutput,
MyNexusService,
)


@workflow.defn
class HelloWorkflow:
@workflow.run
async def run(self, input: HelloInput) -> HelloOutput:
return HelloOutput(greeting=f"Hello, {input.name}!")


@nexusrpc.handler.service_handler(service=MyNexusService)
class MyNexusServiceHandler:
@nexusrpc.handler.sync_operation
async def echo(
self, _ctx: nexusrpc.handler.StartOperationContext, input: EchoInput
) -> EchoOutput:
return EchoOutput(message=input.message)

@nexus.temporal_operation
async def hello(
self,
_ctx: nexus.TemporalStartOperationContext,
client: nexus.TemporalNexusClient,
input: HelloInput,
) -> nexus.TemporalOperationResult[HelloOutput]:
return await client.start_workflow(
HelloWorkflow.run,
input,
id=str(uuid.uuid4()),
)
39 changes: 39 additions & 0 deletions nexus_standalone_operations/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Nexus service definition for standalone operations sample.

Defines a Nexus service with two operations:
- echo: a synchronous operation that echoes the input message
- hello: an asynchronous (workflow-backed) operation that returns a greeting

This service definition is used by both the handler (to validate operation
signatures) and the client (to create type-safe nexus clients).
"""

from dataclasses import dataclass

import nexusrpc


@dataclass
class EchoInput:
message: str


@dataclass
class EchoOutput:
message: str


@dataclass
class HelloInput:
name: str


@dataclass
class HelloOutput:
greeting: str


@nexusrpc.service
class MyNexusService:
echo: nexusrpc.Operation[EchoInput, EchoOutput]
hello: nexusrpc.Operation[HelloInput, HelloOutput]
83 changes: 83 additions & 0 deletions nexus_standalone_operations/starter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Starter that demonstrates standalone Nexus operation execution.

Unlike other Nexus samples that call operations from within a workflow, this
sample executes Nexus operations directly from client code using the standalone
Nexus operation APIs.
"""

import asyncio
import uuid
from datetime import timedelta

from temporalio.client import Client
from temporalio.envconfig import ClientConfig

from nexus_standalone_operations.service import (
EchoInput,
EchoOutput,
HelloInput,
MyNexusService,
)

ENDPOINT_NAME = "nexus-standalone-operations-endpoint"


async def main() -> None:
config = ClientConfig.load_client_connect_config()
_ = config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

# Create a typed NexusClient bound to the endpoint and service.
# The endpoint must be pre-created on the server (see README).
nexus_client = client.create_nexus_client(
service=MyNexusService, endpoint=ENDPOINT_NAME
)

# Start sync echo operation and await the result immediately.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Should this comment say something about how this starts the standalone operation? Not important, but just reading the sample it's SUPER easy to miss what it's doing and how it's different.

operation_id = f"echo-{uuid.uuid4()}"
echo_result = await nexus_client.execute_operation(
MyNexusService.echo,
EchoInput(message="hello"),
id=operation_id,
schedule_to_close_timeout=timedelta(seconds=10),
)
print(f"Echo result: {echo_result.message}")

# Get a handle and the result of an existing operation
existing_op_handle = client.get_nexus_operation_handle(
operation_id, operation=MyNexusService.echo
)
existing_result = await existing_op_handle.result()
print(f"Echo result from existing operation handle: {existing_result.message}")

# Start async (workflow-backed) hello operation and get a NexusOperationHandle.
handle = await nexus_client.start_operation(
MyNexusService.hello,
HelloInput(name="World"),
id=f"hello-{uuid.uuid4()}",
schedule_to_close_timeout=timedelta(seconds=10),
)

print(f"\nStarted `MyNexusService.Hello`. OperationID: {handle.operation_id}")

# Use the NexusOperationHandle to await the result of the operation.
hello_result = await handle.result()
print(f"`MyNexusService.Hello` result: {hello_result.greeting}")

# List nexus operations.
print("\nListing Nexus operations:")
query = f'Endpoint = "{ENDPOINT_NAME}"'
async for op in client.list_nexus_operations(query):
print(
f" OperationId: {op.operation_id},",
f" Operation: {op.operation},",
f" Status: {op.status.name}",
)

# Count nexus operations.
count = await client.count_nexus_operations(query)
print(f"\nTotal Nexus operations: {count.count}")


if __name__ == "__main__":
asyncio.run(main())
41 changes: 41 additions & 0 deletions nexus_standalone_operations/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Worker that hosts the Nexus service for standalone operations sample."""

import asyncio
import logging

from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

from nexus_standalone_operations.handler import HelloWorkflow, MyNexusServiceHandler

interrupt_event = asyncio.Event()

TASK_QUEUE = "nexus-standalone-operations"


async def main() -> None:
logging.basicConfig(level=logging.INFO)

config = ClientConfig.load_client_connect_config()
_ = config.setdefault("target_host", "localhost:7233")
client = await Client.connect(**config)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[HelloWorkflow],
nexus_service_handlers=[MyNexusServiceHandler()],
):
logging.info("Worker started, ctrl+c to exit")
_ = await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
7 changes: 6 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,17 @@ async def env(request) -> AsyncGenerator[WorkflowEnvironment, None]:
env_type = request.config.getoption("--workflow-environment")
if env_type == "local":
env = await WorkflowEnvironment.start_local(
dev_server_download_version="v1.7.2-standalone-nexus-operations",
dev_server_extra_args=[
"--dynamic-config-value",
"frontend.enableExecuteMultiOperation=true",
"--dynamic-config-value",
"system.enableEagerWorkflowStart=true",
]
"--dynamic-config-value",
"nexusoperation.enableStandalone=true",
"--dynamic-config-value",
"history.enableChasmCallbacks=true",
],
)
elif env_type == "time-skipping":
env = await WorkflowEnvironment.start_time_skipping()
Expand Down
24 changes: 24 additions & 0 deletions tests/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import asyncio
import time
from collections.abc import Awaitable, Callable
from datetime import timedelta
from typing import TypeVar

T = TypeVar("T")


async def assert_eventually(
fn: Callable[[], Awaitable[T]],
*,
timeout: timedelta = timedelta(seconds=10),
interval: timedelta = timedelta(milliseconds=200),
) -> T:
start_sec = time.monotonic()
while True:
try:
res = await fn()
return res
except AssertionError:
if timedelta(seconds=time.monotonic() - start_sec) >= timeout:
raise
await asyncio.sleep(interval.total_seconds())
3 changes: 0 additions & 3 deletions tests/helpers/nexus.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import temporalio.api
import temporalio.api.common
import temporalio.api.common.v1
import temporalio.api.enums.v1
import temporalio.api.nexus
import temporalio.api.nexus.v1
import temporalio.api.operatorservice
Expand Down
Empty file.
Loading
Loading