feat(model-engine): add GcpPubSubQueueEndpointResourceDelegate for async endpoints on GCP #827

Open: postevanus-scale wants to merge 4 commits into main from feat/gcp-pubsub-queue-delegate

Conversation

@postevanus-scale
Collaborator

@postevanus-scale postevanus-scale commented May 11, 2026

Summary

Adds a Google Cloud Pub/Sub-backed implementation of QueueEndpointResourceDelegate so that cloud_provider=gcp clusters can deploy async inference endpoints.

Before this PR, async deploys on GCP fail at live_endpoint_resource_gateway.create_queue because the only wired delegate is SQS (AWS), which raises NoCredentialsError on a GCP node. We see this consistently on gke_scale-dev-mofa_asia-northeast3_sgp-pmodev-kubernetes-cluster.

Repro

Temporal workflow IDs from a recent attempt on the GCP cluster:

  • 1020f4b2-e55f-4b6e-acf3-7c0090a4c5c2 (qwen3-e-4b-7, sync — fails at workflow create_activity, separate bug)
  • 9f86bd97-4804-4945-b2e1-29e0dd2c794a (qwen3-e-4b-9-async, async — fails at sqs_queue_endpoint_resource_delegate.create_queue_if_not_exists with NoCredentialsError: Unable to locate credentials)

Changes

  • New: infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py mirrors the shape of the ASB (Azure Service Bus) delegate. It creates a topic + pull subscription per endpoint and is idempotent (AlreadyExists is caught on create, NotFound on delete).
  • Wires the new delegate into the 3 factory sites: api/dependencies.py, entrypoints/start_batch_job_orchestration.py, entrypoints/k8s_cache.py.
  • Adds an elif "num_undelivered_messages" in sqs_attributes branch in live_endpoint_resource_gateway.py so the gateway recognizes the new attribute shape.
  • Adds google-cloud-pubsub to requirements.in.
  • Unit tests mirror the ASB suite.
  • Helm chart: surfaces gcp.project_id, gcp.pubsub_topic_prefix, gcp.pubsub_subscription_prefix and pipes them as env vars to the deployments that build/manage queues.
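
The topic + pull subscription creation described in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the `AlreadyExists` class is a local stand-in for `google.api_core.exceptions.AlreadyExists` so the sketch runs without `google-cloud-pubsub` installed, `FakePubSub` is a hypothetical in-memory test double, and the prefix and ack-deadline defaults are assumptions. The `request={...}` call shape follows the one used by the `google-cloud-pubsub` publisher and subscriber clients.

```python
class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists (assumption)."""


def create_queue_if_not_exists(
    publisher,
    subscriber,
    project_id: str,
    endpoint_id: str,
    topic_prefix: str = "launch-endpoint-id-",         # hypothetical default
    subscription_prefix: str = "launch-endpoint-id-",  # hypothetical default
    ack_deadline_seconds: int = 60,                    # hypothetical default
) -> tuple:
    """Create the topic, then the pull subscription; tolerate repeated calls."""
    topic_path = f"projects/{project_id}/topics/{topic_prefix}{endpoint_id}"
    subscription_path = (
        f"projects/{project_id}/subscriptions/{subscription_prefix}{endpoint_id}"
    )
    try:
        publisher.create_topic(request={"name": topic_path})
    except AlreadyExists:
        pass  # an earlier call already created the topic
    try:
        subscriber.create_subscription(
            request={
                "name": subscription_path,
                "topic": topic_path,
                "ack_deadline_seconds": ack_deadline_seconds,
            }
        )
    except AlreadyExists:
        pass  # an earlier call already created the subscription
    return topic_path, subscription_path


class FakePubSub:
    """Hypothetical in-memory double that raises AlreadyExists on repeats."""

    def __init__(self):
        self.topics = set()
        self.subscriptions = {}

    def create_topic(self, request):
        if request["name"] in self.topics:
            raise AlreadyExists(request["name"])
        self.topics.add(request["name"])

    def create_subscription(self, request):
        if request["name"] in self.subscriptions:
            raise AlreadyExists(request["name"])
        self.subscriptions[request["name"]] = request["topic"]


fake = FakePubSub()
first = create_queue_if_not_exists(fake, fake, "my-project", "abc123")
second = create_queue_if_not_exists(fake, fake, "my-project", "abc123")
```

Calling the function twice returns the same paths and leaves exactly one topic and one subscription in the fake. Per the review notes further down, the PR itself also refreshes the ack deadline when the subscription already exists, which this sketch omits.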

Out of scope (deliberately)

  • Cloud Tasks as an alternative: Pub/Sub picked here because it's the closest semantic match to SQS/ASB (queue with pull subscription). The launch team may prefer Cloud Tasks for at-least-once HTTP push delivery — happy to switch direction.
  • IAM / Workload Identity wiring: deployer-side. Bind the GCP service account referenced in the chart's serviceAccount.annotations to roles/pubsub.editor on the target project.
  • Observability for num_undelivered_messages: Pub/Sub doesn't expose an undelivered-count attribute synchronously. The delegate currently returns -1 — wiring Cloud Monitoring's pubsub.googleapis.com/subscription/num_undelivered_messages metric belongs in a follow-up.
  • Migration of existing endpoints: this PR only affects newly-created async endpoints. Existing failed-state SGP records aren't backfilled.
  • Benchmarks: no perf comparison yet against SQS/ASB. The Pub/Sub path is API-symmetric so behavior should be similar; will measure once a real deploy lands.

Test plan

  • Unit tests for the new delegate
  • python -m py_compile on every Python file touched
  • helm template charts/model-engine renders without new errors
  • Manual integration test on GCP cluster (deferred — needs IAM wiring on the cluster side first)

Verification commands

# Verify factory wiring import-resolves
python -c "from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import GcpPubSubQueueEndpointResourceDelegate"

# Verify chart still renders
helm template charts/model-engine -f charts/model-engine/values_sample.yaml > /dev/null

Greptile Summary

  • Adds GcpPubSubQueueEndpointResourceDelegate (topic + pull subscription per endpoint) and wires it into all three factory sites (dependencies.py, k8s_cache.py, start_batch_job_orchestration.py) so that async inference endpoints work on GCP clusters without hitting the AWS NoCredentialsError.
  • The live_endpoint_resource_gateway gains an elif branch that recognises the num_undelivered_messages attribute shape returned by the new delegate, with a >= 0 guard that treats the -1 placeholder as "unknown" rather than propagating a bogus queue depth to the autoscaler.
  • Helm chart surfaces gcp.project_id, gcp.pubsub_topic_prefix, and gcp.pubsub_subscription_prefix and injects them as env vars for GCP-provider deployments; GCP_PROJECT_ID is read first in the factories with a fallback to infra_config().gcp_project_id.
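
As a rough illustration of the `>= 0` guard mentioned above, the branch could look something like this. The function name `queue_depth_from_attributes` and the use of `None` for "unknown" are assumptions made for this sketch; only the `num_undelivered_messages` key and the `-1` placeholder come from the PR description.

```python
from typing import Optional


def queue_depth_from_attributes(attributes: dict) -> Optional[int]:
    """Return the queue depth, or None when the delegate reports it as unknown."""
    if "num_undelivered_messages" in attributes:
        count = int(attributes["num_undelivered_messages"])
        # The Pub/Sub delegate returns -1 as a placeholder; treat it as unknown
        # so a bogus depth never reaches the autoscaler.
        return count if count >= 0 else None
    return None
```

With this shape, a caller only updates the autoscaler input when the result is not `None`.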

Confidence Score: 3/5

Safe to merge for feature completeness, but the topic-leak bug in delete_queue should be addressed before production rollout.

One P1 defect: when subscription deletion fails with a non-NotFound error, the exception propagates immediately and the topic is never cleaned up, causing a GCP resource leak. All other prior concerns from the review thread have been addressed in this version of the code.

model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py — specifically the delete_queue method's two-phase error handling.

Important Files Changed

| Filename | Overview |
| --- | --- |
| model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py | New GCP Pub/Sub delegate implementing QueueEndpointResourceDelegate. Has a topic-leak bug in delete_queue when subscription deletion fails with a non-NotFound error (topic is never cleaned up); topic/subscription prefixes and client lifecycle are handled correctly. |
| model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py | Added elif branch for num_undelivered_messages with a proper >= 0 guard to skip the -1 placeholder from the Pub/Sub delegate; no issues found. |
| model-engine/model_engine_server/api/dependencies.py | Wires GcpPubSubQueueEndpointResourceDelegate for gcp cloud_provider; reads GCP_PROJECT_ID from env first then falls back to infra_config, matching the established SQS_PROFILE pattern. |
| charts/model-engine/templates/_helpers.tpl | Conditionally injects GCP_PROJECT_ID, PUBSUB_TOPIC_PREFIX, PUBSUB_SUBSCRIPTION_PREFIX env vars for gcp cloud_provider deployments; logic looks correct. |
| model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py | Comprehensive unit test suite covering happy paths, AlreadyExists idempotency, error propagation, call ordering, and queue attribute shape; no issues. |

Sequence Diagram

sequenceDiagram
    participant API as API / k8s_cache
    participant Delegate as GcpPubSubQueueEndpointResourceDelegate
    participant PubSub as GCP Pub/Sub API

    Note over API,PubSub: Async endpoint creation
    API->>Delegate: create_queue_if_not_exists(endpoint_id)
    Delegate->>PubSub: "create_topic(projects/{id}/topics/{prefix}{endpoint_id})"
    PubSub-->>Delegate: OK / AlreadyExists (ignored)
    Delegate->>PubSub: create_subscription(name, topic, ack_deadline)
    PubSub-->>Delegate: OK / AlreadyExists → update_subscription
    Delegate-->>API: "QueueInfo(queue_name, queue_url=None)"

    Note over API,PubSub: Async endpoint deletion
    API->>Delegate: delete_queue(endpoint_id)
    Delegate->>PubSub: delete_subscription(subscription_path)
    PubSub-->>Delegate: OK / NotFound (logged)
    Note over Delegate: Non-NotFound GoogleAPIError raises here,<br/>skipping delete_topic below (topic leak)
    Delegate->>PubSub: delete_topic(topic_path)
    PubSub-->>Delegate: OK / NotFound (logged)

    Note over API,PubSub: Queue attribute polling
    API->>Delegate: get_queue_attributes(endpoint_id)
    Delegate-->>API: "{name, num_undelivered_messages: -1}"
    API->>API: "gcp_count >= 0 check → skip assignment (no autoscaler update)"

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py:100-118
**Topic orphaned when subscription deletion raises a non-`NotFound` error**

When `delete_subscription` fails with any `GoogleAPIError` that isn't `NotFound` (e.g., a transient network error or `PermissionDenied`), the method wraps and re-raises the exception immediately — the `delete_topic` block is never executed. The GCP topic is then orphaned (topic exists, subscription is gone or stale), and subsequent `create_queue_if_not_exists` calls will silently succeed on the topic but fail trying to create the subscription. Consider attempting topic deletion regardless of subscription-deletion outcome, collecting both errors before deciding whether to raise.

Reviews (3): Last reviewed commit: "fix(model-engine): read GCP_PROJECT_ID e..."

Greptile also left 1 inline comment on this PR.

Three small fixes uncovered while diagnosing a stuck endpoint build on
GKE running model-engine 798747b…:

1. service_template_config_map.yaml: HPA `type: Pods` was paired with
   `target.type: Value`, which is invalid for HPA v2. Kubernetes rejects
   the metric (status: `InvalidMetricSourceType`) and falls back to
   `minReplicas`. Switch to the correct `AverageValue`.

2. service_template_config_map.yaml: `virtual-service.yaml` and
   `destination-rule.yaml` are gated on `Values.virtualservice.enabled`
   and `Values.destinationrule.enabled` respectively, but the runtime
   code (`k8s_endpoint_resource_delegate.py:_create_or_update_resources`)
   reads `config.values.launch.istio_enabled` and unconditionally calls
   `load_k8s_yaml("virtual-service.yaml", …)` / `destination-rule.yaml`
   whenever that flag is true. When the two flags disagree, the build
   task crashes with `FileNotFoundError` and the endpoint never reaches
   READY (SGP reports `deployment_timeout`).

   Couple the chart gating to the runtime flag — the templates are now
   emitted when *either* `virtualservice.enabled` / `destinationrule.enabled`
   *or* `config.values.launch.istio_enabled` is true. Existing operators
   who explicitly set the chart flags see no change.

3. endpoint_builder_deployment.yaml: `readinessProbe` hardcoded
   `bash -c 'test -f /tmp/readyz'`. Minimal-base images (e.g. the
   798747b… build) no longer ship `bash`, so the probe permanently
   errors with `executable file not found in $PATH` and the pod stays
   `0/1` Ready, which times out `helm --wait` at 1200s and stalls the
   whole HelmRelease. Make the probe overridable via
   `endpointBuilder.readinessProbe`; default behavior unchanged.

Render-verified with `helm template`:
- HPA target.type renders as `AverageValue`.
- VS / DR templates appear when either flag is true; absent otherwise.
- Default readinessProbe still uses `bash`; override via values works.
…ync endpoints on GCP

Async inference endpoints fail on GCP clusters with NoCredentialsError
because the codebase only supports SQS / ASB / OnPrem queue delegates.
This wires a Pub/Sub-based delegate so cloud_provider=gcp can create and
manage the queues that async workers consume from.

Affects: launch namespace queue resource creation for async deploys.
@postevanus-scale postevanus-scale requested a review from a team May 11, 2026 19:06
@postevanus-scale postevanus-scale marked this pull request as ready for review May 11, 2026 19:06
Comment thread model-engine/model_engine_server/api/dependencies.py

@lilyz-ai lilyz-ai left a comment

Please address greptile comments and fix ci tests.

…bsub delegate

- Cache PublisherClient/SubscriberClient in __init__ (avoid per-call gRPC handshake)
- Refresh ack_deadline on AlreadyExists in create_subscription
- Wrap non-NotFound errors in EndpointResourceInfraException on delete
- Validate project_id at construction time (fail loud on misconfig)
- Either thread topic_prefix/subscription_prefix or drop unused args (worker A's call)
- live_endpoint_resource_gateway: guard num_undelivered_messages=-1 sentinel
- start_batch_job_orchestration: add gcp branch for task-queue gateway routing
- Tests: assert subscription-deleted-before-topic order
- Helm: hoist $gcp_cloud_provider local variable
- Fix: apply black formatting to pass CI formatting check
Comment thread model-engine/model_engine_server/api/dependencies.py Outdated
Addresses Greptile P1 on PR #827:
the Helm chart injects GCP_PROJECT_ID as a pod env var (from .Values.gcp.project_id),
but the prior code only read it from infra_config().gcp_project_id (a different source
rendered from .Values.config.values.infra). On any GCP cluster that follows the sample
values, infra_config().gcp_project_id was None and the delegate's project_id guard always
raised ValueError at startup.

Mirror the SQS_PROFILE pattern (os.getenv("SQS_PROFILE", hmi_config.sqs_profile)) in all
three factory sites:
- api/dependencies.py
- entrypoints/start_batch_job_orchestration.py
- entrypoints/k8s_cache.py

Also: black --config .black.toml + isort to clear the run_unit_tests_server CI check
(formatting drift introduced by the previous commits).
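
The env-first lookup described in the commit message above might be sketched like this. The helper name `resolve_gcp_project_id` is hypothetical, and using `or` instead of the two-argument `os.getenv(...)` form is a choice of this sketch (it also falls back when the variable is set but empty); the PR may do it differently.

```python
import os
from typing import Optional


def resolve_gcp_project_id(config_project_id: Optional[str]) -> str:
    """Prefer the GCP_PROJECT_ID env var, then fall back to the infra config value."""
    project_id = os.getenv("GCP_PROJECT_ID") or config_project_id
    if not project_id:
        # Fail loudly at construction time instead of at the first Pub/Sub call.
        raise ValueError(
            "GCP project id is not set: define GCP_PROJECT_ID or gcp_project_id in infra config"
        )
    return project_id
```

In a factory site, the argument would presumably be `infra_config().gcp_project_id`, as named in the commit message.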
Comment on lines +100 to +118
try:
    self._subscriber.delete_subscription(subscription=subscription_path)
except gcp_exceptions.NotFound:
    logger.info(
        f"Could not find Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}"
    )
except gcp_exceptions.GoogleAPIError as e:
    raise EndpointResourceInfraException(
        f"Failed to delete Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}: {e}"
    ) from e

try:
    self._publisher.delete_topic(topic=topic_path)
except gcp_exceptions.NotFound:
    logger.info(f"Could not find Pub/Sub topic {topic_path} for endpoint {endpoint_id}")
except gcp_exceptions.GoogleAPIError as e:
    raise EndpointResourceInfraException(
        f"Failed to delete Pub/Sub topic {topic_path} for endpoint {endpoint_id}: {e}"
    ) from e

P1 Topic orphaned when subscription deletion raises a non-NotFound error

When delete_subscription fails with any GoogleAPIError that isn't NotFound (e.g., a transient network error or PermissionDenied), the method wraps and re-raises the exception immediately — the delete_topic block is never executed. The GCP topic is then orphaned (topic exists, subscription is gone or stale), and subsequent create_queue_if_not_exists calls will silently succeed on the topic but fail trying to create the subscription. Consider attempting topic deletion regardless of subscription-deletion outcome, collecting both errors before deciding whether to raise.
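
One possible shape for the suggestion above (attempt both deletions, collect errors, raise once) is sketched below. It is not the author's fix: the three exception classes are local stand-ins for `google.api_core.exceptions.GoogleAPIError`, `google.api_core.exceptions.NotFound`, and the project's `EndpointResourceInfraException`, and the clients are passed in as parameters so the sketch runs without `google-cloud-pubsub` installed.

```python
import logging

logger = logging.getLogger(__name__)


class GoogleAPIError(Exception):
    """Stand-in for google.api_core.exceptions.GoogleAPIError (assumption)."""


class NotFound(GoogleAPIError):
    """Stand-in for google.api_core.exceptions.NotFound (assumption)."""


class EndpointResourceInfraException(Exception):
    """Stand-in for the project's exception of the same name (assumption)."""


def delete_queue(publisher, subscriber, subscription_path, topic_path, endpoint_id):
    """Attempt both deletions, then raise once if either one failed."""
    steps = [
        ("subscription", subscription_path,
         lambda: subscriber.delete_subscription(subscription=subscription_path)),
        ("topic", topic_path,
         lambda: publisher.delete_topic(topic=topic_path)),
    ]
    errors = []
    for kind, path, delete in steps:
        try:
            delete()
        except NotFound:
            # Already gone: deletion stays idempotent.
            logger.info("Could not find Pub/Sub %s %s for endpoint %s", kind, path, endpoint_id)
        except GoogleAPIError as e:
            # Record the failure and keep going so the topic is still attempted.
            errors.append(f"{kind} {path}: {e}")
    if errors:
        raise EndpointResourceInfraException(
            f"Failed to delete Pub/Sub resources for endpoint {endpoint_id}: "
            + "; ".join(errors)
        )
```

One trade-off to weigh: if the subscription delete fails but the topic delete succeeds, Pub/Sub keeps the subscription and detaches it from the deleted topic, so the leak moves from the topic to the subscription. Whether that is preferable probably depends on how retries of `delete_queue` are scheduled.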
