feat(model-engine): add GcpPubSubQueueEndpointResourceDelegate for async endpoints on GCP #827
postevanus-scale wants to merge 4 commits
Conversation
Three small fixes uncovered while diagnosing a stuck endpoint build on GKE running model-engine 798747b…:

1. service_template_config_map.yaml: HPA `type: Pods` was paired with `target.type: Value`, which is invalid for HPA v2. Kubernetes rejects the metric (status: `InvalidMetricSourceType`) and falls back to `minReplicas`. Switch to the correct `AverageValue`.
2. service_template_config_map.yaml: `virtual-service.yaml` and `destination-rule.yaml` are gated on `Values.virtualservice.enabled` and `Values.destinationrule.enabled` respectively, but the runtime code (`k8s_endpoint_resource_delegate.py:_create_or_update_resources`) reads `config.values.launch.istio_enabled` and unconditionally calls `load_k8s_yaml("virtual-service.yaml", …)` / `destination-rule.yaml` whenever that flag is true. When the two flags disagree, the build task crashes with `FileNotFoundError` and the endpoint never reaches READY (SGP reports `deployment_timeout`). Couple the chart gating to the runtime flag: the templates are now emitted when *either* `virtualservice.enabled` / `destinationrule.enabled` *or* `config.values.launch.istio_enabled` is true. Operators who already set the chart flags explicitly see no change.
3. endpoint_builder_deployment.yaml: `readinessProbe` hardcoded `bash -c 'test -f /tmp/readyz'`. Minimal-base images (e.g. the 798747b… build) no longer ship `bash`, so the probe permanently errors with `executable file not found in $PATH` and the pod stays `0/1` Ready, which times out `helm --wait` at 1200s and stalls the whole HelmRelease. Make the probe overridable via `endpointBuilder.readinessProbe`; default behavior unchanged.

Render-verified with `helm template`:
- HPA `target.type` renders as `AverageValue`.
- VS / DR templates appear when either flag is true; absent otherwise.
- Default readinessProbe still uses `bash`; override via values works.
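For fix 3, the values override could look like the following sketch. The `endpointBuilder.readinessProbe` key is the one named above; the `sh -c` command body and the timing fields are illustrative, not the chart's current defaults:

```yaml
# values.yaml (sketch): override the endpoint-builder readiness probe for
# bash-less minimal images; omitting the key keeps the current bash default.
endpointBuilder:
  readinessProbe:
    exec:
      command: ["sh", "-c", "test -f /tmp/readyz"]
    initialDelaySeconds: 5
    periodSeconds: 10
```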
…ync endpoints on GCP

Async inference endpoints fail on GCP clusters with `NoCredentialsError` because the codebase only supports SQS / ASB / OnPrem queue delegates. This wires a Pub/Sub-based delegate so `cloud_provider=gcp` can create and manage the queues that async workers consume from. Affects: launch namespace queue resource creation for async deploys.
lilyz-ai
left a comment
Please address the Greptile comments and fix the CI tests.
…bsub delegate

- Cache `PublisherClient`/`SubscriberClient` in `__init__` (avoid a per-call gRPC handshake)
- Refresh `ack_deadline` on `AlreadyExists` in `create_subscription`
- Wrap non-`NotFound` errors in `EndpointResourceInfraException` on delete
- Validate `project_id` at construction time (fail loud on misconfig)
- Either thread `topic_prefix`/`subscription_prefix` or drop the unused args (worker A's call)
- `live_endpoint_resource_gateway`: guard the `num_undelivered_messages=-1` sentinel
- `start_batch_job_orchestration`: add a gcp branch for task-queue gateway routing
- Tests: assert subscription-deleted-before-topic order
- Helm: hoist the `$gcp_cloud_provider` local variable
- Fix: apply black formatting to pass the CI formatting check
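The "refresh `ack_deadline` on `AlreadyExists`" bullet can be sketched as below. The real delegate uses google-cloud-pubsub; here the client calls and the `AlreadyExists` class are stand-ins so only the control flow is shown, and the function name is illustrative, not the PR's code:

```python
class AlreadyExists(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists."""


def ensure_subscription(client, subscription_path, topic_path, ack_deadline_seconds=60):
    """Create the subscription, or refresh its ack deadline if it already exists."""
    try:
        client.create_subscription(
            name=subscription_path,
            topic=topic_path,
            ack_deadline_seconds=ack_deadline_seconds,
        )
    except AlreadyExists:
        # Subscription survives from a previous deploy: make sure the ack
        # deadline matches the currently configured value instead of ignoring it.
        client.update_subscription(subscription_path, ack_deadline_seconds)
```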
Addresses Greptile P1 on PR #827: the Helm chart injects `GCP_PROJECT_ID` as a pod env var (from `.Values.gcp.project_id`), but the prior code only read it from `infra_config().gcp_project_id` (a different source, rendered from `.Values.config.values.infra`). On any GCP cluster that follows the sample values, `infra_config().gcp_project_id` was `None` and the delegate's `project_id` guard always raised `ValueError` at startup.

Mirror the `SQS_PROFILE` pattern (`os.getenv("SQS_PROFILE", hmi_config.sqs_profile)`) in all three factory sites:
- `api/dependencies.py`
- `entrypoints/start_batch_job_orchestration.py`
- `entrypoints/k8s_cache.py`

Also: `black --config .black.toml` + `isort` to clear the `run_unit_tests_server` CI check (formatting drift introduced by the previous commits).
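The env-var-first lookup described above can be sketched as follows. This is a minimal illustration, not the PR's code: `resolve_gcp_project_id` is a hypothetical helper, and its argument stands in for `infra_config().gcp_project_id`:

```python
import os


def resolve_gcp_project_id(infra_gcp_project_id=None):
    """Prefer the Helm-injected pod env var; fall back to the infra config value."""
    project_id = os.getenv("GCP_PROJECT_ID", infra_gcp_project_id)
    if not project_id:
        # Fail loud on misconfiguration, matching the delegate's guard.
        raise ValueError("GCP project_id is not configured")
    return project_id
```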
```python
try:
    self._subscriber.delete_subscription(subscription=subscription_path)
except gcp_exceptions.NotFound:
    logger.info(
        f"Could not find Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}"
    )
except gcp_exceptions.GoogleAPIError as e:
    raise EndpointResourceInfraException(
        f"Failed to delete Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}: {e}"
    ) from e

try:
    self._publisher.delete_topic(topic=topic_path)
except gcp_exceptions.NotFound:
    logger.info(f"Could not find Pub/Sub topic {topic_path} for endpoint {endpoint_id}")
except gcp_exceptions.GoogleAPIError as e:
    raise EndpointResourceInfraException(
        f"Failed to delete Pub/Sub topic {topic_path} for endpoint {endpoint_id}: {e}"
    ) from e
```
**Topic orphaned when subscription deletion raises a non-`NotFound` error**
When delete_subscription fails with any GoogleAPIError that isn't NotFound (e.g., a transient network error or PermissionDenied), the method wraps and re-raises the exception immediately — the delete_topic block is never executed. The GCP topic is then orphaned (topic exists, subscription is gone or stale), and subsequent create_queue_if_not_exists calls will silently succeed on the topic but fail trying to create the subscription. Consider attempting topic deletion regardless of subscription-deletion outcome, collecting both errors before deciding whether to raise.
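One way to implement that suggestion, sketched generically: attempt both deletions, collect failures, and raise only after the topic delete has had its chance. The exception classes below are stand-ins for `google.api_core.exceptions.NotFound` and the PR's `EndpointResourceInfraException`, and the callable-based signature is illustrative rather than the delegate's actual method:

```python
class NotFound(Exception):
    """Stand-in for google.api_core.exceptions.NotFound."""


class EndpointResourceInfraException(Exception):
    """Stand-in for the PR's infra exception type."""


def delete_queue(delete_subscription, delete_topic):
    """delete_subscription / delete_topic are zero-arg callables wrapping the client calls."""
    errors = []
    for name, op in (("subscription", delete_subscription), ("topic", delete_topic)):
        try:
            op()
        except NotFound:
            pass  # already gone: idempotent delete, nothing to report
        except Exception as e:
            errors.append((name, e))  # keep going so the topic is not orphaned
    if errors:
        raise EndpointResourceInfraException(
            "; ".join(f"failed to delete {name}: {e}" for name, e in errors)
        )
```

With this shape, a transient failure on the subscription still lets the topic delete run, and the caller sees a single wrapped exception describing everything that failed.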
Path: `model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py`, lines 100-118.
Summary
Adds a Google Cloud Pub/Sub-backed implementation of `QueueEndpointResourceDelegate` so that `cloud_provider=gcp` clusters can deploy async inference endpoints. Before this PR, async deploys on GCP fail at `live_endpoint_resource_gateway.create_queue` because the only wired delegate is SQS (AWS), which raises `NoCredentialsError` on a GCP node. We see this consistently on `gke_scale-dev-mofa_asia-northeast3_sgp-pmodev-kubernetes-cluster`.

Repro
Temporal workflow IDs from a recent attempt on the GCP cluster:
- `1020f4b2-e55f-4b6e-acf3-7c0090a4c5c2` (qwen3-e-4b-7, sync): fails at workflow `create_activity`, separate bug
- `9f86bd97-4804-4945-b2e1-29e0dd2c794a` (qwen3-e-4b-9-async, async): fails at `sqs_queue_endpoint_resource_delegate.create_queue_if_not_exists` with `NoCredentialsError: Unable to locate credentials`

Changes
- `infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py` mirrors the ASB shape. Creates a topic + pull subscription per endpoint, idempotent (catches `AlreadyExists`/`NotFound`).
- Factory wiring in `api/dependencies.py`, `entrypoints/start_batch_job_orchestration.py`, `entrypoints/k8s_cache.py`.
- New `elif "num_undelivered_messages" in sqs_attributes` branch in `live_endpoint_resource_gateway.py` so the gateway recognizes the new attribute shape.
- Adds `google-cloud-pubsub` to `requirements.in`.
- Helm: new values `gcp.project_id`, `gcp.pubsub_topic_prefix`, `gcp.pubsub_subscription_prefix`, piped as env vars to the deployments that build/manage queues.

Out of scope (deliberately)
- `serviceAccount.annotations`: the GCP SA needs `roles/pubsub.editor` on the target project.
- `num_undelivered_messages`: Pub/Sub doesn't expose an undelivered-count attribute synchronously. The delegate currently returns `-1`; wiring Cloud Monitoring's `pubsub.googleapis.com/subscription/num_undelivered_messages` metric belongs in a follow-up.

Test plan
- `python -m py_compile` on every Python file touched
- `helm template charts/model-engine` renders without new errors

Verification commands
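The Cloud Monitoring follow-up noted under "Out of scope" would need a metric filter like the one below. This is a sketch: only the filter string is built and tested here, the helper name is hypothetical, and the commented query shows the rough shape of a `google-cloud-monitoring` call rather than working code:

```python
def build_undelivered_filter(subscription_id: str) -> str:
    """Monitoring filter for the Pub/Sub undelivered-messages metric of one subscription."""
    return (
        'metric.type = "pubsub.googleapis.com/subscription/num_undelivered_messages" '
        f'AND resource.labels.subscription_id = "{subscription_id}"'
    )


# With google-cloud-monitoring installed, the query would look roughly like:
#   client = monitoring_v3.MetricServiceClient()
#   series = client.list_time_series(request={
#       "name": f"projects/{project_id}",
#       "filter": build_undelivered_filter(sub_id),
#       "interval": interval,  # a monitoring_v3.TimeInterval covering the lookback window
#       "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
#   })
```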
Greptile Summary
- Adds `GcpPubSubQueueEndpointResourceDelegate` (topic + pull subscription per endpoint) and wires it into all three factory sites (`dependencies.py`, `k8s_cache.py`, `start_batch_job_orchestration.py`) so that async inference endpoints work on GCP clusters without hitting the AWS `NoCredentialsError`.
- `live_endpoint_resource_gateway` gains an `elif` branch that recognises the `num_undelivered_messages` attribute shape returned by the new delegate, with a `>= 0` guard that treats the `-1` placeholder as "unknown" rather than propagating a bogus queue depth to the autoscaler.
- Helm: new values `gcp.project_id`, `gcp.pubsub_topic_prefix`, and `gcp.pubsub_subscription_prefix` are injected as env vars for GCP-provider deployments; `GCP_PROJECT_ID` is read first in the factories with a fallback to `infra_config().gcp_project_id`.
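The `>= 0` guard described above amounts to the following tiny sketch; the function name and argument shapes are illustrative, not the gateway's actual code:

```python
def merge_queue_depth(previous_depth, gcp_attributes):
    """A -1 reading means "unknown": keep the previous depth instead of
    feeding a bogus value to the autoscaler."""
    count = gcp_attributes.get("num_undelivered_messages", -1)
    return count if count >= 0 else previous_depth
```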
Safe to merge for feature completeness, but the topic-leak bug in delete_queue should be addressed before production rollout.
One P1 defect: when subscription deletion fails with a non-NotFound error, the exception propagates immediately and the topic is never cleaned up, causing a GCP resource leak. All other prior concerns from the review thread have been addressed in this version of the code.
model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py — specifically the delete_queue method's two-phase error handling.
Important Files Changed
Sequence Diagram
```mermaid
sequenceDiagram
    participant API as API / k8s_cache
    participant Delegate as GcpPubSubQueueEndpointResourceDelegate
    participant PubSub as GCP Pub/Sub API
    Note over API,PubSub: Async endpoint creation
    API->>Delegate: create_queue_if_not_exists(endpoint_id)
    Delegate->>PubSub: "create_topic(projects/{id}/topics/{prefix}{endpoint_id})"
    PubSub-->>Delegate: OK / AlreadyExists (ignored)
    Delegate->>PubSub: create_subscription(name, topic, ack_deadline)
    PubSub-->>Delegate: OK / AlreadyExists → update_subscription
    Delegate-->>API: "QueueInfo(queue_name, queue_url=None)"
    Note over API,PubSub: Async endpoint deletion
    API->>Delegate: delete_queue(endpoint_id)
    Delegate->>PubSub: delete_subscription(subscription_path)
    PubSub-->>Delegate: OK / NotFound (logged)
    Note over Delegate: Non-NotFound GoogleAPIError raises here,<br/>skipping delete_topic below (topic leak)
    Delegate->>PubSub: delete_topic(topic_path)
    PubSub-->>Delegate: OK / NotFound (logged)
    Note over API,PubSub: Queue attribute polling
    API->>Delegate: get_queue_attributes(endpoint_id)
    Delegate-->>API: "{name, num_undelivered_messages: -1}"
    API->>API: "gcp_count >= 0 check → skip assignment (no autoscaler update)"
```
Reviews (3): last reviewed commit "fix(model-engine): read GCP_PROJECT_ID e..."