Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,7 @@ To build the SDK from source for use as a dependency, the following prerequisite
* [uv](https://docs.astral.sh/uv/)
* [Rust](https://www.rust-lang.org/)
* [Protobuf Compiler](https://protobuf.dev/)
* [Node.js](https://nodejs.org/)

Use `uv` to install `poe`:

Expand Down Expand Up @@ -2074,6 +2075,12 @@ back from this downgrade, restore both of those files and run `uv sync --all-ext
run for protobuf version 3 by setting the `TEMPORAL_TEST_PROTO3` env var to `1` prior to running
tests.

The local build and lint flows also regenerate Temporal system Nexus models. By default this pulls
in `nexus-rpc-gen@0.1.0-alpha.4` via `npx`. To use an existing checkout instead, set
`TEMPORAL_NEXUS_RPC_GEN_DIR` to the `nexus-rpc-gen` repo root or its `src` directory before
running `poe build-develop`, `poe lint`, or `poe gen-protos`. The local checkout override path
also requires [`pnpm`](https://pnpm.io/) to be installed.

### Style

* Mostly [Google Style Guide](https://google.github.io/styleguide/pyguide.html). Notable exceptions:
Expand Down
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,14 @@ gen-protos = [
{ cmd = "uv run scripts/gen_protos.py" },
{ cmd = "uv run scripts/gen_payload_visitor.py" },
{ cmd = "uv run scripts/gen_bridge_client.py" },
{ ref = "gen-nexus-system-models" },
{ ref = "format" },
]
gen-protos-docker = [
{ cmd = "uv run scripts/gen_protos_docker.py" },
{ cmd = "uv run scripts/gen_payload_visitor.py" },
{ cmd = "uv run scripts/gen_bridge_client.py" },
{ ref = "gen-nexus-system-models" },
{ ref = "format" },
]
lint = [
Expand All @@ -102,6 +104,7 @@ lint-types = [
{ cmd = "uv run mypy --namespace-packages --check-untyped-defs ." },
{ cmd = "uv run basedpyright" },
]
gen-nexus-system-models = "uv run scripts/gen_nexus_system_models.py"
run-bench = "uv run python scripts/run_bench.py"
test = "uv run pytest"

Expand Down Expand Up @@ -139,14 +142,16 @@ environment = { PATH = "$PATH:$HOME/.cargo/bin", CARGO_NET_GIT_FETCH_WITH_CLI =
ignore_missing_imports = true
exclude = [
# Ignore generated code
'build/tool-cache',
'temporalio/api',
'temporalio/bridge/proto',
]

[tool.pydocstyle]
convention = "google"
# https://github.com/PyCQA/pydocstyle/issues/363#issuecomment-625563088
match_dir = "^(?!(docs|scripts|tests|api|proto|\\.)).*"
match_dir = "^(?!(build|docs|scripts|tests|api|proto|\\.)).*"
match = "^(?!_workflow_service_generated\\.py$).*\\.py"
add_ignore = [
# We like to wrap at a certain number of chars, even long summary sentences.
# https://github.com/PyCQA/pydocstyle/issues/184
Expand Down Expand Up @@ -211,7 +216,6 @@ exclude = [
# Exclude auto generated files
"temporalio/api",
"temporalio/bridge/proto",
"temporalio/bridge/_visitor.py",
"tests/worker/workflow_sandbox/testmodules/proto",
]

Expand Down
143 changes: 143 additions & 0 deletions scripts/gen_nexus_system_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from __future__ import annotations

import importlib
import subprocess
import sys
from pathlib import Path

NEXUS_RPC_GEN_VERSION = "0.1.0-alpha.4"


def main() -> None:
repo_root = Path(__file__).resolve().parent.parent
input_schema = (
repo_root
/ "temporalio"
/ "bridge"
/ "sdk-core"
/ "crates"
/ "common"
/ "protos"
/ "api_upstream"
/ "nexus"
/ "temporal-proto-models-nexusrpc.yaml"
)
output_file = (
repo_root / "temporalio" / "nexus" / "system" / "_workflow_service_generated.py"
)

if not input_schema.is_file():
raise RuntimeError(f"Expected Nexus schema at {input_schema}")

run_nexus_rpc_gen(
output_file=output_file,
input_schema=input_schema,
)
add_operation_registry(repo_root, output_file)
subprocess.run(
[
"uv",
"run",
"ruff",
"check",
"--select",
"I",
"--fix",
str(output_file),
],
cwd=repo_root,
check=True,
)
subprocess.run(
[
"uv",
"run",
"ruff",
"format",
str(output_file),
],
cwd=repo_root,
check=True,
)


def add_operation_registry(repo_root: Path, output_file: Path) -> None:
source = output_file.read_text()
source = ensure_typing_import(source)
services = discover_services(repo_root)
if not services:
output_file.write_text(source)
return
output_file.write_text(source.rstrip() + "\n\n" + emit_operation_registry(services))


def ensure_typing_import(source: str) -> str:
if "\nimport typing\n" in source:
return source
marker = "from __future__ import annotations\n\n"
if marker not in source:
raise RuntimeError("Expected future-annotations import in generated output")
return source.replace(marker, marker + "import typing\n", 1)


def discover_services(repo_root: Path) -> list[tuple[str, str, list[tuple[str, str]]]]:
module_name = "temporalio.nexus.system._workflow_service_generated"
sys.path.insert(0, str(repo_root))
try:
sys.modules.pop(module_name, None)
importlib.invalidate_caches()
module = importlib.import_module(module_name)
finally:
sys.path.pop(0)
services: list[tuple[str, str, list[tuple[str, str]]]] = []
for value in vars(module).values():
if not isinstance(value, type):
continue
definition = getattr(value, "__nexus_service_definition__", None)
if definition is None:
continue
operations = [
(operation_definition.method_name, operation_definition.name)
for operation_definition in definition.operation_definitions.values()
]
services.append((value.__name__, definition.name, operations))
return services


def emit_operation_registry(
services: list[tuple[str, str, list[tuple[str, str]]]],
) -> str:
lines = [
"__nexus_operation_registry__: dict[",
" tuple[str, str], Operation[typing.Any, typing.Any]",
"] = {",
]
for class_name, service_name, operations in services:
for attr_name, operation_name in operations:
lines.append(
f" ({service_name!r}, {operation_name!r}): {class_name}.{attr_name},"
)
lines.append("}")
return "\n".join(lines).rstrip() + "\n"


def run_nexus_rpc_gen(*, output_file: Path, input_schema: Path) -> None:
common_args = [
"--lang",
"py",
"--out-file",
str(output_file),
str(input_schema),
]
subprocess.run(
["npx", "--yes", f"nexus-rpc-gen@{NEXUS_RPC_GEN_VERSION}", *common_args],
check=True,
)


if __name__ == "__main__":
try:
main()
except Exception as err:
print(f"Failed to generate Nexus system models: {err}", file=sys.stderr)
raise
Loading