Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
33a2f15
Add temporalio.contrib.pubsub — reusable pub/sub for workflows
jssmith Apr 6, 2026
e2712e2
Fix PubSubState CAN serialization and simplify subscribe error handling
jssmith Apr 6, 2026
17952ae
Polish pub/sub contrib: README, flush safety, init guard, factory method
jssmith Apr 6, 2026
d1dfce7
Add init guards to poll/query handlers and fix README CAN example
jssmith Apr 6, 2026
f20ba36
Guard validator against missing init_pubsub, fix PubSubState docstring
jssmith Apr 6, 2026
70bf747
Guard get_pubsub_state/drain_pubsub, add replay and max_batch_size tests
jssmith Apr 6, 2026
70898d0
Add review comments and design addenda for pubsub redesign
jssmith Apr 7, 2026
5ff7e27
Implement pubsub redesign: dedup, base_offset, flush safety, API cleanup
jssmith Apr 7, 2026
6fbb168
TLA+-verified dedup rewrite, TTL pruning, truncation, API improvements
jssmith Apr 7, 2026
42b0df1
Remove TLA+ proof references from implementation code
jssmith Apr 7, 2026
c87a65a
Update uv.lock
jssmith Apr 7, 2026
d5a23c3
Add signal vs update dedup analysis; clarify ordering guarantees
jssmith Apr 7, 2026
3089b12
Add end-to-end dedup analysis: proper layering for three duplicate types
jssmith Apr 7, 2026
f06a53e
Expand DESIGN-v2 with offset model rationale and BFF/SSE reconnection…
jssmith Apr 7, 2026
990a6a7
pubsub: use base64 wire format with native bytes API
jssmith Apr 7, 2026
f2c6e55
pubsub: remove poll timeout and update design doc
jssmith Apr 8, 2026
a9abc20
Add token-level streaming to OpenAI and ADK Temporal plugins
jssmith Apr 8, 2026
20dafc0
pubsub: replace PubSubState Pydantic model with plain dataclass
jssmith Apr 9, 2026
5a8716c
pubsub: add per-item offsets to PubSubItem and _WireItem
jssmith Apr 10, 2026
eda55d5
pubsub: add design addendum for per-item offsets
jssmith Apr 10, 2026
7bc830a
pubsub: fix truncated offset crash and add recovery
jssmith Apr 10, 2026
475df95
Add cross-workflow and cross-namespace pub/sub tests
jssmith Apr 19, 2026
90d753e
pubsub: cap poll response at ~1MB and skip cooldown when more data ready
jssmith Apr 19, 2026
c76a774
Add compatibility contract to pub/sub design doc
jssmith Apr 19, 2026
97be29c
Fix sequence reuse after retry timeout (TLA+-verified)
jssmith Apr 19, 2026
6f0f345
Merge remote-tracking branch 'origin/main' into contrib/pubsub
jssmith Apr 19, 2026
9d0a259
Remove backward-compat code and historical design docs from pubsub
jssmith Apr 20, 2026
c4ec6e7
Update pubsub README: rename for_workflow → create, streamline docs
jssmith Apr 20, 2026
4945cbc
Fix continue-as-new example to show application state carried alongsi…
jssmith Apr 20, 2026
6d9ea42
Add motivation and architectural context to pubsub README intro
jssmith Apr 20, 2026
7d42b29
Move bytes/base64 payload detail to Cross-Language Protocol section
jssmith Apr 20, 2026
436430c
Move analysis docs and TLA+ verification out of pubsub module
jssmith Apr 20, 2026
c09ad49
Remove TLA+ references, document opaque-bytes and JSON converter rati…
jssmith Apr 20, 2026
e683c5c
Merge branch 'main' into contrib/pubsub
jssmith Apr 21, 2026
3a71028
Clean up pubsub tests: remove redundant cases, de-flake barriers
jssmith Apr 22, 2026
4ab7ce4
Replace remaining brittle sleeps in pubsub tests and type handle helpers
jssmith Apr 23, 2026
2fbe0d4
Clarify that pubsub truncation is workflow-side only
jssmith Apr 23, 2026
fdbb339
Switch test truncate from signal to update for explicit completion
jssmith Apr 23, 2026
5a0796f
Delete test_mixin_coexistence
jssmith Apr 23, 2026
3541790
Force interleaving in test_concurrent_subscribers
jssmith Apr 23, 2026
68ad53d
Strengthen CAN test, widen TTL margins, document Any-field pitfall
jssmith Apr 23, 2026
682c420
Hoist inline imports to module level in pubsub tests
jssmith Apr 23, 2026
368d023
Fix __aexit__ drain race and strengthen pubsub tests
jssmith Apr 23, 2026
beacec9
Style + docstring cleanups in pubsub contrib module
jssmith Apr 23, 2026
56789ed
Apply pubsub review feedback: init pattern, force_flush, from_activity
jssmith Apr 23, 2026
6193f80
Migrate pubsub payloads from opaque bytes to Temporal Payload
jssmith Apr 23, 2026
4f9d669
Bump sdk-core submodule to match temporalio-client 0.2.0
jssmith Apr 23, 2026
e9d4e6b
Port Notion narrative into DESIGN-v2.md and add sync-policy note
jssmith Apr 23, 2026
75efe24
Merge remote-tracking branch 'origin/main' into contrib/pubsub
jssmith Apr 23, 2026
68c719e
Apply pubsub API renames to ADK/OpenAI streaming plugins
jssmith Apr 23, 2026
72d296e
Replace PubSubMixin with PubSub dynamic handler registration
jssmith Apr 24, 2026
ef7e041
Document per-poll fan-out and list future-work items in DESIGN-v2
jssmith Apr 24, 2026
99a7a8a
openai_agents: publish raw stream events, drop normalization layer
jssmith Apr 24, 2026
4205242
Fix lint findings from CI (ruff format, pyright, pydocstyle)
jssmith Apr 24, 2026
dddbcef
Fix Python 3.10 lint/type errors in pubsub tests
jssmith Apr 24, 2026
47ee940
pubsub tests: also suppress reportUnreachable on the 3.11 import branch
jssmith Apr 24, 2026
8a971d0
pubsub tests: attach reportUnreachable ignore to the import-stmt line
jssmith Apr 24, 2026
736b570
pubsub: fix dynamic-signal-vs-update race and pydoctor cross-ref
jssmith Apr 24, 2026
47106ad
pubsub: document sync-handler/publish race with asyncio.sleep(0) recipe
jssmith Apr 24, 2026
885d0e8
pubsub tests: switch TruncateWorkflow.truncate to the async recipe
jssmith Apr 24, 2026
2d76877
pubsub: add public async flush() barrier
jssmith Apr 25, 2026
8e5c3e4
pubsub: document migration to server-side request_id dedup
jssmith Apr 25, 2026
9274670
pubsub: accept a single string for subscribe(topics=...)
jssmith Apr 25, 2026
b11748b
pubsub: prefix internal handler names with __temporal_
jssmith Apr 25, 2026
48645d4
pubsub: clean up three lint suppressions flagged by codex review
jssmith Apr 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ dev = [
"opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2",
"opentelemetry-semantic-conventions>=0.40b0,<1",
"opentelemetry-sdk-extension-aws>=2.0.0,<3",
"async-timeout>=4.0,<6; python_version < '3.11'",
]

[tool.poe.tasks]
Expand Down
94 changes: 91 additions & 3 deletions temporalio/contrib/google_adk_agents/_model.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
import json
import logging
from collections.abc import AsyncGenerator, Callable
from datetime import timedelta
from datetime import datetime, timedelta, timezone

from google.adk.models import BaseLlm, LLMRegistry
from google.adk.models.llm_request import LlmRequest
from google.adk.models.llm_response import LlmResponse

import temporalio.workflow
from temporalio import activity, workflow
from temporalio.contrib.pubsub import PubSubClient
from temporalio.workflow import ActivityConfig

logger = logging.getLogger(__name__)

EVENTS_TOPIC = "events"


def _make_event(event_type: str, **data: object) -> bytes:
return json.dumps(
{
"type": event_type,
"timestamp": datetime.now(timezone.utc).isoformat(),
"data": data,
}
).encode()


@activity.defn
async def invoke_model(llm_request: LlmRequest) -> list[LlmResponse]:
Expand Down Expand Up @@ -36,13 +53,78 @@ async def invoke_model(llm_request: LlmRequest) -> list[LlmResponse]:
]


@activity.defn
async def invoke_model_streaming(llm_request: LlmRequest) -> list[LlmResponse]:
"""Streaming-aware model activity.

Calls the LLM with stream=True, publishes TEXT_DELTA events via
PubSubClient as tokens arrive, and returns the collected responses.

The PubSubClient auto-detects the activity context to find the parent
workflow for publishing.

Args:
llm_request: The LLM request containing model name and parameters.

Returns:
List of LLM responses from the model.
"""
if llm_request.model is None:
raise ValueError("No model name provided, could not create LLM.")

llm = LLMRegistry.new_llm(llm_request.model)
if not llm:
raise ValueError(f"Failed to create LLM for model: {llm_request.model}")

pubsub = PubSubClient.from_activity(batch_interval=0.1)
responses: list[LlmResponse] = []
text_buffer = ""

async with pubsub:
pubsub.publish(EVENTS_TOPIC, _make_event("LLM_CALL_START"), force_flush=True)

async for response in llm.generate_content_async(
llm_request=llm_request, stream=True
):
activity.heartbeat()
responses.append(response)

if response.content and response.content.parts:
for part in response.content.parts:
if part.text:
text_buffer += part.text
pubsub.publish(
EVENTS_TOPIC,
_make_event("TEXT_DELTA", delta=part.text),
)
if part.function_call:
pubsub.publish(
EVENTS_TOPIC,
_make_event(
"TOOL_CALL_START",
tool_name=part.function_call.name,
),
)

if text_buffer:
pubsub.publish(
EVENTS_TOPIC,
_make_event("TEXT_COMPLETE", text=text_buffer),
force_flush=True,
)
pubsub.publish(EVENTS_TOPIC, _make_event("LLM_CALL_COMPLETE"), force_flush=True)

return responses


class TemporalModel(BaseLlm):
"""A Temporal-based LLM model that executes model invocations as activities."""

def __init__(
self,
model_name: str,
activity_config: ActivityConfig | None = None,
streaming: bool = False,
*,
summary_fn: Callable[[LlmRequest], str | None] | None = None,
) -> None:
Expand All @@ -51,6 +133,9 @@ def __init__(
Args:
model_name: The name of the model to use.
activity_config: Configuration options for the activity execution.
streaming: When True, the model activity uses the streaming LLM
endpoint and publishes token events via PubSubClient. The
workflow is unaffected -- it still receives complete responses.
summary_fn: Optional callable that receives the LlmRequest and
returns a summary string (or None) for the activity. Must be
deterministic as it is called during workflow execution. If
Expand All @@ -62,6 +147,7 @@ def __init__(
"""
super().__init__(model=model_name)
self._model_name = model_name
self._streaming = streaming
self._summary_fn = summary_fn
self._activity_config = ActivityConfig(
start_to_close_timeout=timedelta(seconds=60)
Expand All @@ -80,7 +166,8 @@ async def generate_content_async(

Args:
llm_request: The LLM request containing model parameters and content.
stream: Whether to stream the response (currently ignored).
stream: Whether to stream the response (currently ignored; use the
``streaming`` constructor parameter instead).

Yields:
The responses from the model.
Expand All @@ -103,8 +190,9 @@ async def generate_content_async(
agent_name = llm_request.config.labels.get("adk_agent_name")
if agent_name:
config["summary"] = agent_name
activity_fn = invoke_model_streaming if self._streaming else invoke_model
responses = await workflow.execute_activity(
invoke_model,
activity_fn,
args=[llm_request],
**config,
)
Expand Down
7 changes: 5 additions & 2 deletions temporalio/contrib/google_adk_agents/_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

from temporalio import workflow
from temporalio.contrib.google_adk_agents._mcp import TemporalMcpToolSetProvider
from temporalio.contrib.google_adk_agents._model import invoke_model
from temporalio.contrib.google_adk_agents._model import (
invoke_model,
invoke_model_streaming,
)
from temporalio.contrib.pydantic import (
PydanticPayloadConverter,
ToJsonOptions,
Expand Down Expand Up @@ -95,7 +98,7 @@ def workflow_runner(runner: WorkflowRunner | None) -> WorkflowRunner:
)
return runner

new_activities = [invoke_model]
new_activities = [invoke_model, invoke_model_streaming]
if toolset_providers is not None:
for toolset_provider in toolset_providers:
new_activities.extend(toolset_provider._get_activities())
Expand Down
Loading
Loading