From 5ad501688b9bad8d2a06e9cd3f8e0104c7fc64d8 Mon Sep 17 00:00:00 2001
From: Po Stevanus Andrianta
Date: Mon, 11 May 2026 11:18:18 +0100
Subject: [PATCH 01/12] fix(chart): align with minimal-base image and runtime
 istio flag
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three small fixes uncovered while diagnosing a stuck endpoint build on GKE
running model-engine 798747b…:

1. service_template_config_map.yaml: HPA `type: Pods` was paired with
   `target.type: Value`, which is invalid for HPA v2. Kubernetes rejects
   the metric (status: `InvalidMetricSourceType`) and falls back to
   `minReplicas`. Switch to the correct `AverageValue`.

2. service_template_config_map.yaml: `virtual-service.yaml` and
   `destination-rule.yaml` are gated on `Values.virtualservice.enabled` and
   `Values.destinationrule.enabled` respectively, but the runtime code
   (`k8s_endpoint_resource_delegate.py:_create_or_update_resources`) reads
   `config.values.launch.istio_enabled` and unconditionally calls
   `load_k8s_yaml("virtual-service.yaml", …)` / `destination-rule.yaml`
   whenever that flag is true. When the two flags disagree, the build task
   crashes with `FileNotFoundError` and the endpoint never reaches READY
   (SGP reports `deployment_timeout`). Couple the chart gating to the
   runtime flag: the templates are now emitted when *either*
   `virtualservice.enabled` / `destinationrule.enabled` *or*
   `config.values.launch.istio_enabled` is true. Existing operators who
   explicitly set the chart flags see no change.

3. endpoint_builder_deployment.yaml: `readinessProbe` hardcoded
   `bash -c 'test -f /tmp/readyz'`. Minimal-base images (e.g. the 798747b…
   build) no longer ship `bash`, so the probe permanently errors with
   `executable file not found in $PATH` and the pod stays `0/1` Ready,
   which times out `helm --wait` at 1200s and stalls the whole HelmRelease.
   Make the probe overridable via `endpointBuilder.readinessProbe`; default
   behavior unchanged.
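For operators on minimal-base images, an override via the new
`endpointBuilder.readinessProbe` value might look like the sketch below
(the key name comes from this patch; the `/bin/sh`-based probe body is an
illustrative assumption, not part of the chart defaults):

```yaml
# values.yaml override — hypothetical example; any valid probe spec is
# rendered verbatim via toYaml in place of the bash-based default.
endpointBuilder:
  readinessProbe:
    exec:
      command:
        - /bin/sh
        - -c
        - test -f /tmp/readyz
```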
Render-verified with `helm template`:
- HPA target.type renders as `AverageValue`.
- VS / DR templates appear when either flag is true; absent otherwise.
- Default readinessProbe still uses `bash`; override via values works.
---
 .../model-engine/templates/endpoint_builder_deployment.yaml | 4 ++++
 .../model-engine/templates/service_template_config_map.yaml | 6 +++---
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/charts/model-engine/templates/endpoint_builder_deployment.yaml b/charts/model-engine/templates/endpoint_builder_deployment.yaml
index bf684c41..2c54ae1d 100644
--- a/charts/model-engine/templates/endpoint_builder_deployment.yaml
+++ b/charts/model-engine/templates/endpoint_builder_deployment.yaml
@@ -47,11 +47,15 @@ spec:
              containerPort: 5000
              protocol: TCP
            readinessProbe:
+              {{- with ((.Values.endpointBuilder).readinessProbe) }}
+              {{- toYaml . | nindent 12 }}
+              {{- else }}
              exec:
                command:
                  - bash
                  - -c
                  - test -f /tmp/readyz
+              {{- end }}
            command:
              - dumb-init
              - --
diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml
index 18e87d02..5593f295 100644
--- a/charts/model-engine/templates/service_template_config_map.yaml
+++ b/charts/model-engine/templates/service_template_config_map.yaml
@@ -470,7 +470,7 @@ data:
           metric:
             name: request-concurrency-average
           target:
-            type: Value
+            type: AverageValue
             averageValue: ${CONCURRENCY}
   keda-scaled-object.yaml: |-
     apiVersion: keda.sh/v1alpha1
@@ -966,7 +966,7 @@ data:
           protocol: TCP
           name: http
       ${NODE_PORT_DICT}
-  {{- if .Values.virtualservice.enabled }}
+  {{- if or .Values.virtualservice.enabled (and .Values.config.values .Values.config.values.launch .Values.config.values.launch.istio_enabled) }}
  virtual-service.yaml: |-
    apiVersion: networking.istio.io/v1alpha3
    kind: VirtualService
@@ -987,7 +987,7 @@ data:
            port:
              number: 80
  {{- end }}
-  {{- if .Values.destinationrule.enabled }}
+  {{- if or .Values.destinationrule.enabled (and .Values.config.values .Values.config.values.launch .Values.config.values.launch.istio_enabled) }}
  destination-rule.yaml: |-
    apiVersion: networking.istio.io/v1beta1
    kind: DestinationRule

From e5d305054e807e9f8fe975a4a34911f98cae1092 Mon Sep 17 00:00:00 2001
From: Po Stevanus Andrianta
Date: Mon, 11 May 2026 17:02:46 +0100
Subject: [PATCH 02/12] feat(model-engine): add
 GcpPubSubQueueEndpointResourceDelegate for async endpoints on GCP

Async inference endpoints fail on GCP clusters with NoCredentialsError
because the codebase only supports SQS / ASB / OnPrem queue delegates. This
wires a Pub/Sub-based delegate so cloud_provider=gcp can create and manage
the queues that async workers consume from.

Affects: launch namespace queue resource creation for async deploys.
---
 charts/model-engine/templates/_helpers.tpl        |   8 ++
 .../celery_autoscaler_stateful_set.yaml           |   8 ++
 .../endpoint_builder_deployment.yaml              |   4 -
 .../service_template_config_map.yaml              |   6 +-
 charts/model-engine/values.yaml                   |   6 +
 charts/model-engine/values_sample.yaml            |   6 +
 .../model_engine_server/api/dependencies.py       |   8 +-
 .../model_engine_server/core/config.py            |   1 +
 .../entrypoints/k8s_cache.py                      |   7 +
 .../start_batch_job_orchestration.py              |   7 +
 ...pubsub_queue_endpoint_resource_delegate.py     |  93 +++++++++++++
 .../live_endpoint_resource_gateway.py             |   2 +
 model-engine/requirements.in                      |   1 +
 ...pubsub_queue_endpoint_resource_delegate.py     | 126 ++++++++++++++++++
 14 files changed, 274 insertions(+), 9 deletions(-)
 create mode 100644 model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py
 create mode 100644 model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py

diff --git a/charts/model-engine/templates/_helpers.tpl b/charts/model-engine/templates/_helpers.tpl
index acfe93f5..a98fd223 100644
--- a/charts/model-engine/templates/_helpers.tpl
+++ b/charts/model-engine/templates/_helpers.tpl
@@ -404,6 +404,14 @@ env:
  - name: SERVICEBUS_NAMESPACE
    value: {{ .Values.azure.servicebus_namespace }}
  {{- end }}
+  {{- if and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") }}
+  - name: GCP_PROJECT_ID
+    value: {{ (.Values.gcp).project_id | default "" | quote }}
+  - name: PUBSUB_TOPIC_PREFIX
+    value: {{ (.Values.gcp).pubsub_topic_prefix | default "" | quote }}
+  - name: PUBSUB_SUBSCRIPTION_PREFIX
+    value: {{ (.Values.gcp).pubsub_subscription_prefix | default "" | quote }}
+  {{- end }}
  {{- if eq .Values.context "circleci" }}
  - name: CIRCLECI
    value: "true"
diff --git a/charts/model-engine/templates/celery_autoscaler_stateful_set.yaml b/charts/model-engine/templates/celery_autoscaler_stateful_set.yaml
index 74c8cf44..9f431918 100644
--- a/charts/model-engine/templates/celery_autoscaler_stateful_set.yaml
+++ b/charts/model-engine/templates/celery_autoscaler_stateful_set.yaml
@@ -89,6 +89,14 @@ spec:
          - name: SERVICEBUS_NAMESPACE
            value: {{ .Values.azure.servicebus_namespace }}
          {{- end }}
+          {{- if and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") }}
+          - name: GCP_PROJECT_ID
+            value: {{ (.Values.gcp).project_id | default "" | quote }}
+          - name: PUBSUB_TOPIC_PREFIX
+            value: {{ (.Values.gcp).pubsub_topic_prefix | default "" | quote }}
+          - name: PUBSUB_SUBSCRIPTION_PREFIX
+            value: {{ (.Values.gcp).pubsub_subscription_prefix | default "" | quote }}
+          {{- end }}
          image: "{{ .Values.image.gatewayRepository }}:{{ $tag }}"
          imagePullPolicy: Always
          name: main
diff --git a/charts/model-engine/templates/endpoint_builder_deployment.yaml b/charts/model-engine/templates/endpoint_builder_deployment.yaml
index 2c54ae1d..bf684c41 100644
--- a/charts/model-engine/templates/endpoint_builder_deployment.yaml
+++ b/charts/model-engine/templates/endpoint_builder_deployment.yaml
@@ -47,15 +47,11 @@ spec:
              containerPort: 5000
              protocol: TCP
            readinessProbe:
-              {{- with ((.Values.endpointBuilder).readinessProbe) }}
-              {{- toYaml . | nindent 12 }}
-              {{- else }}
              exec:
                command:
                  - bash
                  - -c
                  - test -f /tmp/readyz
-              {{- end }}
            command:
              - dumb-init
              - --
diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml
index 5593f295..18e87d02 100644
--- a/charts/model-engine/templates/service_template_config_map.yaml
+++ b/charts/model-engine/templates/service_template_config_map.yaml
@@ -470,7 +470,7 @@ data:
           metric:
             name: request-concurrency-average
           target:
-            type: AverageValue
+            type: Value
             averageValue: ${CONCURRENCY}
   keda-scaled-object.yaml: |-
     apiVersion: keda.sh/v1alpha1
@@ -966,7 +966,7 @@ data:
           protocol: TCP
           name: http
       ${NODE_PORT_DICT}
-  {{- if or .Values.virtualservice.enabled (and .Values.config.values .Values.config.values.launch .Values.config.values.launch.istio_enabled) }}
+  {{- if .Values.virtualservice.enabled }}
  virtual-service.yaml: |-
    apiVersion: networking.istio.io/v1alpha3
    kind: VirtualService
@@ -987,7 +987,7 @@ data:
            port:
              number: 80
  {{- end }}
-  {{- if or .Values.destinationrule.enabled (and .Values.config.values .Values.config.values.launch .Values.config.values.launch.istio_enabled) }}
+  {{- if .Values.destinationrule.enabled }}
  destination-rule.yaml: |-
    apiVersion: networking.istio.io/v1beta1
    kind: DestinationRule
diff --git a/charts/model-engine/values.yaml b/charts/model-engine/values.yaml
index 7509a88f..bae62ce3 100644
--- a/charts/model-engine/values.yaml
+++ b/charts/model-engine/values.yaml
@@ -94,3 +94,9 @@ utilityImages:
 
 # Additional GPU tolerations for endpoint pods
 gpuTolerations: []
+
+# GCP configuration for GCP-based deployments
+gcp:
+  project_id: ""
+  pubsub_topic_prefix: "launch-endpoint-id-"
+  pubsub_subscription_prefix: "launch-endpoint-id-"
diff --git a/charts/model-engine/values_sample.yaml b/charts/model-engine/values_sample.yaml
index b8dc6fb7..dd683a97 100644
--- a/charts/model-engine/values_sample.yaml
+++ b/charts/model-engine/values_sample.yaml
@@ -438,3 +438,9 @@ recommendedHardware:
     gpu_type: nvidia-hopper-h100
     nodes_per_worker: 1
 #serviceBuilderQueue:
+
+# GCP configuration for GCP-based deployments
+gcp:
+  project_id: "your-gcp-project"
+  pubsub_topic_prefix: "launch-endpoint-id-"
+  pubsub_subscription_prefix: "launch-endpoint-id-"
diff --git a/model-engine/model_engine_server/api/dependencies.py b/model-engine/model_engine_server/api/dependencies.py
index f28425d8..71092932 100644
--- a/model-engine/model_engine_server/api/dependencies.py
+++ b/model-engine/model_engine_server/api/dependencies.py
@@ -89,6 +89,9 @@ from model_engine_server.infra.gateways.resources.asb_queue_endpoint_resource_delegate import (
     ASBQueueEndpointResourceDelegate,
 )
+from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import (
+    GcpPubSubQueueEndpointResourceDelegate,
+)
 from model_engine_server.infra.gateways.resources.endpoint_resource_gateway import (
     EndpointResourceGateway,
 )
@@ -248,8 +251,9 @@ def _get_external_interfaces(
     elif infra_config().cloud_provider == "azure":
         queue_delegate = ASBQueueEndpointResourceDelegate()
     elif infra_config().cloud_provider == "gcp":
-        # GCP uses Redis (Memorystore) for Celery, so use Redis-based queue delegate
-        queue_delegate = RedisQueueEndpointResourceDelegate(redis_client=redis_client)
+        queue_delegate = GcpPubSubQueueEndpointResourceDelegate(
+            project_id=infra_config().gcp_project_id,
+        )
     else:
         queue_delegate = SQSQueueEndpointResourceDelegate(
             sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile)
diff --git a/model-engine/model_engine_server/core/config.py b/model-engine/model_engine_server/core/config.py
index 6886174f..4d837d6e 100644
--- a/model-engine/model_engine_server/core/config.py
+++ b/model-engine/model_engine_server/core/config.py
@@ -54,6 +54,7 @@ class _InfraConfig:
     celery_enable_sha256: Optional[bool] = None
     docker_registry_type: Optional[str] = None
     debug_mode: Optional[bool] = None
+    gcp_project_id: Optional[str] = None
 
 
 @dataclass
diff --git a/model-engine/model_engine_server/entrypoints/k8s_cache.py b/model-engine/model_engine_server/entrypoints/k8s_cache.py
index 36bd8e96..0084ad78 100644
--- a/model-engine/model_engine_server/entrypoints/k8s_cache.py
+++ b/model-engine/model_engine_server/entrypoints/k8s_cache.py
@@ -21,6 +21,9 @@ from model_engine_server.infra.gateways.resources.asb_queue_endpoint_resource_delegate import (
     ASBQueueEndpointResourceDelegate,
 )
+from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import (
+    GcpPubSubQueueEndpointResourceDelegate,
+)
 from model_engine_server.infra.gateways.resources.endpoint_resource_gateway import (
     EndpointResourceGateway,
 )
@@ -119,6 +122,10 @@ async def main(args: Any):
         queue_delegate = OnPremQueueEndpointResourceDelegate()
     elif infra_config().cloud_provider == "azure":
         queue_delegate = ASBQueueEndpointResourceDelegate()
+    elif infra_config().cloud_provider == "gcp":
+        queue_delegate = GcpPubSubQueueEndpointResourceDelegate(
+            project_id=infra_config().gcp_project_id,
+        )
     else:
         queue_delegate = SQSQueueEndpointResourceDelegate(
             sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile)
diff --git a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py
index 3d659c4a..a4dfa605 100644
--- a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py
+++ b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py
@@ -31,6 +31,9 @@ from model_engine_server.infra.gateways.resources.asb_queue_endpoint_resource_delegate import (
     ASBQueueEndpointResourceDelegate,
 )
+from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import (
+    GcpPubSubQueueEndpointResourceDelegate,
+)
 from model_engine_server.infra.gateways.resources.fake_queue_endpoint_resource_delegate import (
     FakeQueueEndpointResourceDelegate,
 )
@@ -90,6 +93,10 @@ async def run_batch_job(
         queue_delegate = OnPremQueueEndpointResourceDelegate()
     elif infra_config().cloud_provider == "azure":
         queue_delegate = ASBQueueEndpointResourceDelegate()
+    elif infra_config().cloud_provider == "gcp":
+        queue_delegate = GcpPubSubQueueEndpointResourceDelegate(
+            project_id=infra_config().gcp_project_id,
+        )
     else:
         queue_delegate = SQSQueueEndpointResourceDelegate(
             sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile)
diff --git a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py
new file mode 100644
index 00000000..04b366ff
--- /dev/null
+++ b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py
@@ -0,0 +1,93 @@
+from typing import Any, Dict, Optional
+
+from google.api_core import exceptions as gcp_exceptions
+from google.cloud import pubsub_v1
+from model_engine_server.core.loggers import logger_name, make_logger
+from model_engine_server.infra.gateways.resources.queue_endpoint_resource_delegate import (
+    QueueEndpointResourceDelegate,
+    QueueInfo,
+)
+
+logger = make_logger(logger_name())
+
+GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS = 600  # Pub/Sub hard limit
+
+
+class GcpPubSubQueueEndpointResourceDelegate(QueueEndpointResourceDelegate):
+    """
+    Using GCP Pub/Sub (topic + subscription per endpoint).
+    """
+
+    def __init__(
+        self,
+        project_id: str,
+        topic_prefix: str = "launch-endpoint-id-",
+        subscription_prefix: str = "launch-endpoint-id-",
+    ) -> None:
+        self.project_id = project_id
+        self.topic_prefix = topic_prefix
+        self.subscription_prefix = subscription_prefix
+
+    async def create_queue_if_not_exists(
+        self,
+        endpoint_id: str,
+        endpoint_name: str,
+        endpoint_created_by: str,
+        endpoint_labels: Dict[str, Any],
+        queue_message_timeout_seconds: Optional[int] = None,
+    ) -> QueueInfo:
+        queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id)
+        topic_path = f"projects/{self.project_id}/topics/{queue_name}"
+        subscription_path = f"projects/{self.project_id}/subscriptions/{queue_name}"
+        ack_deadline = min(queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS)
+
+        publisher = pubsub_v1.PublisherClient()
+        subscriber = pubsub_v1.SubscriberClient()
+
+        try:
+            publisher.create_topic(name=topic_path)
+        except gcp_exceptions.AlreadyExists:
+            pass
+
+        try:
+            subscriber.create_subscription(
+                name=subscription_path,
+                topic=topic_path,
+                ack_deadline_seconds=ack_deadline,
+            )
+        except gcp_exceptions.AlreadyExists:
+            pass
+
+        # Pub/Sub has no URL concept analogous to SQS queue URLs
+        return QueueInfo(queue_name, queue_url=None)
+
+    async def delete_queue(self, endpoint_id: str) -> None:
+        queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id)
+        subscription_path = f"projects/{self.project_id}/subscriptions/{queue_name}"
+        topic_path = f"projects/{self.project_id}/topics/{queue_name}"
+
+        subscriber = pubsub_v1.SubscriberClient()
+        publisher = pubsub_v1.PublisherClient()
+
+        try:
+            subscriber.delete_subscription(subscription=subscription_path)
+        except gcp_exceptions.NotFound:
+            logger.info(
+                f"Could not find Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}"
+            )
+
+        try:
+            publisher.delete_topic(topic=topic_path)
+        except gcp_exceptions.NotFound:
+            logger.info(
+                f"Could not find Pub/Sub topic {topic_path} for endpoint {endpoint_id}"
+            )
+
+    async def get_queue_attributes(self, endpoint_id: str) -> Dict[str, Any]:
+        queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id)
+        return {
+            "name": queue_name,
+            # Pub/Sub does not expose a synchronous undelivered message count;
+            # real observability requires the Cloud Monitoring API as a separate concern.
+            "num_undelivered_messages": -1,
+        }
diff --git a/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py b/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py
index 3a028b57..f4c8f010 100644
--- a/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py
+++ b/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py
@@ -101,6 +101,8 @@ async def get_resources(
             )
         elif "active_message_count" in sqs_attributes:  # from ASBQueueEndpointResourceDelegate
             resources.num_queued_items = int(sqs_attributes["active_message_count"])
+        elif "num_undelivered_messages" in sqs_attributes:  # from GcpPubSubQueueEndpointResourceDelegate
+            resources.num_queued_items = int(sqs_attributes["num_undelivered_messages"])
 
         return resources
diff --git a/model-engine/requirements.in b/model-engine/requirements.in
index 07e849aa..16ed6b92 100644
--- a/model-engine/requirements.in
+++ b/model-engine/requirements.in
@@ -12,6 +12,7 @@ azure-storage-blob~=12.19.0
 # GCP dependencies
 gcloud-aio-storage~=9.6
 google-auth~=2.25.0
+google-cloud-pubsub>=2.18
 google-cloud-artifact-registry~=1.21.0
 google-cloud-secret-manager>=2.24.0
 google-cloud-storage~=2.14.0
diff --git a/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py b/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py
new file mode 100644
index 00000000..5303d9c2
--- /dev/null
+++ b/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py
@@ -0,0 +1,126 @@
+from unittest.mock import MagicMock, patch
+
+import pytest
+from google.api_core import exceptions as gcp_exceptions
+from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import (
+    GcpPubSubQueueEndpointResourceDelegate,
+)
+
+MODULE_PATH = "model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate"
+
+ENDPOINT_ID = "test_endpoint_id"
+QUEUE_NAME = f"launch-endpoint-id-{ENDPOINT_ID}"
+PROJECT_ID = "test-project"
+
+
+@pytest.fixture
+def mock_publisher():
+    with patch(f"{MODULE_PATH}.pubsub_v1.PublisherClient") as mock_cls:
+        yield mock_cls.return_value
+
+
+@pytest.fixture
+def mock_subscriber():
+    with patch(f"{MODULE_PATH}.pubsub_v1.SubscriberClient") as mock_cls:
+        yield mock_cls.return_value
+
+
+@pytest.fixture
+def delegate():
+    return GcpPubSubQueueEndpointResourceDelegate(project_id=PROJECT_ID)
+
+
+@pytest.mark.asyncio
+async def test_create_queue_if_not_exists_new(mock_publisher, mock_subscriber, delegate):
+    """Both topic and subscription are created when neither exists."""
+    result = await delegate.create_queue_if_not_exists(
+        endpoint_id=ENDPOINT_ID,
+        endpoint_name="test_endpoint",
+        endpoint_created_by="test_user",
+        endpoint_labels={"team": "test"},
+    )
+
+    topic_path = f"projects/{PROJECT_ID}/topics/{QUEUE_NAME}"
+    subscription_path = f"projects/{PROJECT_ID}/subscriptions/{QUEUE_NAME}"
+
+    mock_publisher.create_topic.assert_called_once_with(name=topic_path)
+    mock_subscriber.create_subscription.assert_called_once_with(
+        name=subscription_path,
+        topic=topic_path,
+        ack_deadline_seconds=60,  # default when timeout is None
+    )
+    assert result.queue_name == QUEUE_NAME
+    assert result.queue_url is None
+
+
+@pytest.mark.asyncio
+async def test_create_queue_if_not_exists_topic_already_exists(
+    mock_publisher, mock_subscriber, delegate
+):
+    """AlreadyExists on topic creation is silenced; subscription still attempts creation."""
+    mock_publisher.create_topic.side_effect = gcp_exceptions.AlreadyExists("topic exists")
+
+    result = await delegate.create_queue_if_not_exists(
+        endpoint_id=ENDPOINT_ID,
+        endpoint_name="test_endpoint",
+        endpoint_created_by="test_user",
+        endpoint_labels={},
+    )
+
+    mock_subscriber.create_subscription.assert_called_once()
+    assert result.queue_name == QUEUE_NAME
+
+
+@pytest.mark.asyncio
+async def test_create_queue_if_not_exists_subscription_already_exists(
+    mock_publisher, mock_subscriber, delegate
+):
+    """AlreadyExists on subscription creation is silenced."""
+    mock_subscriber.create_subscription.side_effect = gcp_exceptions.AlreadyExists(
+        "subscription exists"
+    )
+
+    result = await delegate.create_queue_if_not_exists(
+        endpoint_id=ENDPOINT_ID,
+        endpoint_name="test_endpoint",
+        endpoint_created_by="test_user",
+        endpoint_labels={},
+        queue_message_timeout_seconds=120,
+    )
+
+    mock_publisher.create_topic.assert_called_once()
+    assert result.queue_name == QUEUE_NAME
+
+
+@pytest.mark.asyncio
+async def test_delete_queue_subscription_not_found_silent(
+    mock_publisher, mock_subscriber, delegate
+):
+    """NotFound on subscription deletion is silenced; topic deletion still attempts."""
+    mock_subscriber.delete_subscription.side_effect = gcp_exceptions.NotFound("sub not found")
+
+    await delegate.delete_queue(endpoint_id=ENDPOINT_ID)
+
+    mock_subscriber.delete_subscription.assert_called_once()
+    mock_publisher.delete_topic.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_delete_queue_topic_not_found_silent(mock_publisher, mock_subscriber, delegate):
+    """NotFound on topic deletion is silenced."""
+    mock_publisher.delete_topic.side_effect = gcp_exceptions.NotFound("topic not found")
+
+    await delegate.delete_queue(endpoint_id=ENDPOINT_ID)
+
+    mock_subscriber.delete_subscription.assert_called_once()
+    mock_publisher.delete_topic.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_get_queue_attributes_returns_expected_shape(delegate):
+    """get_queue_attributes returns a dict with 'name' and 'num_undelivered_messages'."""
+    attrs = await delegate.get_queue_attributes(endpoint_id=ENDPOINT_ID)
+
+    assert attrs["name"] == QUEUE_NAME
+    assert "num_undelivered_messages" in attrs
+    assert attrs["num_undelivered_messages"] == -1

From 5afce44aea21e2e21469e5c2b3fe941c54d0df69 Mon Sep 17 00:00:00 2001
From: Po Stevanus Andrianta
Date: Tue, 12 May 2026 11:35:55 +0100
Subject: [PATCH 03/12] fix(model-engine): address greptile + codex review
 feedback on gcp pubsub delegate

- Cache PublisherClient/SubscriberClient in __init__ (avoid per-call gRPC
  handshake)
- Refresh ack_deadline on AlreadyExists in create_subscription
- Wrap non-NotFound errors in EndpointResourceInfraException on delete
- Validate project_id at construction time (fail loud on misconfig)
- Either thread topic_prefix/subscription_prefix or drop unused args
  (worker A's call)
- live_endpoint_resource_gateway: guard num_undelivered_messages=-1 sentinel
- start_batch_job_orchestration: add gcp branch for task-queue gateway
  routing
- Tests: assert subscription-deleted-before-topic order
- Helm: hoist $gcp_cloud_provider local variable
- Fix: apply black formatting to pass CI formatting check
---
 charts/model-engine/templates/_helpers.tpl        |   3 +-
 .../celery_autoscaler_stateful_set.yaml           |   5 +-
 .../start_batch_job_orchestration.py              |   3 +
 ...pubsub_queue_endpoint_resource_delegate.py     |  78 +++++++++----
 .../live_endpoint_resource_gateway.py             |  38 +++++--
 ...pubsub_queue_endpoint_resource_delegate.py     | 106 ++++++++++++++++--
 6 files changed, 192 insertions(+), 41 deletions(-)

diff --git a/charts/model-engine/templates/_helpers.tpl b/charts/model-engine/templates/_helpers.tpl
index a98fd223..8157eb15 100644
--- a/charts/model-engine/templates/_helpers.tpl
+++ b/charts/model-engine/templates/_helpers.tpl
@@ -354,6 +354,7 @@ env:
  - name: LAUNCH_SERVICE_TEMPLATE_FOLDER
    value: "/workspace/model-engine/model_engine_server/infra/gateways/resources/templates"
  {{- $model_cache := default dict .Values.modelCache }}
+  {{- $gcp_cloud_provider := and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") -}}
  - name: MODEL_CACHE_ENABLED
    value: {{ get $model_cache "enabled" | default false | quote }}
  - name: MODEL_CACHE_MOUNT_PATH
@@ -404,7 +405,7 @@ env:
  - name: SERVICEBUS_NAMESPACE
    value: {{ .Values.azure.servicebus_namespace }}
  {{- end }}
-  {{- if and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") }}
+  {{- if $gcp_cloud_provider }}
  - name: GCP_PROJECT_ID
    value: {{ (.Values.gcp).project_id | default "" | quote }}
  - name: PUBSUB_TOPIC_PREFIX
diff --git a/charts/model-engine/templates/celery_autoscaler_stateful_set.yaml b/charts/model-engine/templates/celery_autoscaler_stateful_set.yaml
index 9f431918..6449a6b1 100644
--- a/charts/model-engine/templates/celery_autoscaler_stateful_set.yaml
+++ b/charts/model-engine/templates/celery_autoscaler_stateful_set.yaml
@@ -5,12 +5,13 @@
 {{- $tag := .Values.tag }}
 {{- $message_broker := .Values.celeryBrokerType }}
 {{- $num_shards := .Values.celery_autoscaler.num_shards }}
+{{- $gcp_cloud_provider := and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") -}}
 {{- $broker_name := "redis-elasticache-message-broker-master" }}
 {{- if eq $message_broker "sqs" }}
 {{ $broker_name = "sqs-message-broker-master" }}
 {{- else if eq $message_broker "servicebus" }}
 {{ $broker_name = "servicebus-message-broker-master" }}
-{{- else if and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") }}
+{{- else if $gcp_cloud_provider }}
 {{ $broker_name = "redis-gcp-memorystore-message-broker-master" }}
 {{- end }}
 apiVersion: apps/v1
@@ -89,7 +90,7 @@ spec:
          - name: SERVICEBUS_NAMESPACE
            value: {{ .Values.azure.servicebus_namespace }}
          {{- end }}
-          {{- if and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") }}
+          {{- if $gcp_cloud_provider }}
          - name: GCP_PROJECT_ID
            value: {{ (.Values.gcp).project_id | default "" | quote }}
          - name: PUBSUB_TOPIC_PREFIX
diff --git a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py
index a4dfa605..46bec9ba 100644
--- a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py
+++ b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py
@@ -117,6 +117,9 @@ async def run_batch_job(
     if infra_config().cloud_provider == "azure":
         inference_task_queue_gateway = servicebus_task_queue_gateway
         infra_task_queue_gateway = servicebus_task_queue_gateway
+    elif infra_config().cloud_provider == "gcp":
+        inference_task_queue_gateway = redis_task_queue_gateway
+        infra_task_queue_gateway = redis_task_queue_gateway
     elif infra_config().cloud_provider == "onprem" or infra_config().celery_broker_type_redis:
         # On-prem uses Redis-based task queues
         inference_task_queue_gateway = redis_task_queue_gateway
diff --git a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py
index 04b366ff..0dce7fd5 100644
--- a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py
+++ b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py
@@ -2,7 +2,9 @@
 
 from google.api_core import exceptions as gcp_exceptions
 from google.cloud import pubsub_v1
+from google.protobuf import field_mask_pb2
 from model_engine_server.core.loggers import logger_name, make_logger
+from model_engine_server.domain.exceptions import EndpointResourceInfraException
 from model_engine_server.infra.gateways.resources.queue_endpoint_resource_delegate import (
     QueueEndpointResourceDelegate,
     QueueInfo,
@@ -16,6 +18,11 @@ class GcpPubSubQueueEndpointResourceDelegate(QueueEndpointResourceDelegate):
     """
     Using GCP Pub/Sub (topic + subscription per endpoint).
+
+    topic_prefix and subscription_prefix control the GCP resource name prefix.
+    The logical queue_name returned to callers always uses the canonical
+    QueueEndpointResourceDelegate.endpoint_id_to_queue_name format, independent
+    of these prefixes.
     """
 
     def __init__(
@@ -24,9 +31,22 @@ def __init__(
         topic_prefix: str = "launch-endpoint-id-",
         subscription_prefix: str = "launch-endpoint-id-",
     ) -> None:
+        if not project_id:
+            raise ValueError(
+                "GcpPubSubQueueEndpointResourceDelegate requires a non-empty project_id; "
+                "set infra.gcp_project_id in the service config."
+            )
         self.project_id = project_id
         self.topic_prefix = topic_prefix
         self.subscription_prefix = subscription_prefix
+        self._publisher = pubsub_v1.PublisherClient()
+        self._subscriber = pubsub_v1.SubscriberClient()
+
+    def _topic_id(self, endpoint_id: str) -> str:
+        return f"{self.topic_prefix}{endpoint_id}"
+
+    def _subscription_id(self, endpoint_id: str) -> str:
+        return f"{self.subscription_prefix}{endpoint_id}"
 
     async def create_queue_if_not_exists(
         self,
@@ -36,55 +56,75 @@ async def create_queue_if_not_exists(
         endpoint_labels: Dict[str, Any],
         queue_message_timeout_seconds: Optional[int] = None,
     ) -> QueueInfo:
-        queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id)
-        topic_path = f"projects/{self.project_id}/topics/{queue_name}"
-        subscription_path = f"projects/{self.project_id}/subscriptions/{queue_name}"
-        ack_deadline = min(queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS)
-
-        publisher = pubsub_v1.PublisherClient()
-        subscriber = pubsub_v1.SubscriberClient()
+        queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(
+            endpoint_id
+        )
+        topic_path = f"projects/{self.project_id}/topics/{self._topic_id(endpoint_id)}"
+        subscription_path = f"projects/{self.project_id}/subscriptions/{self._subscription_id(endpoint_id)}"
+        ack_deadline = min(
+            queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS
+        )
 
         try:
-            publisher.create_topic(name=topic_path)
+            self._publisher.create_topic(name=topic_path)
         except gcp_exceptions.AlreadyExists:
             pass
 
         try:
-            subscriber.create_subscription(
+            self._subscriber.create_subscription(
                 name=subscription_path,
                 topic=topic_path,
                 ack_deadline_seconds=ack_deadline,
             )
         except gcp_exceptions.AlreadyExists:
-            pass
+            try:
+                self._subscriber.update_subscription(
+                    subscription=pubsub_v1.types.Subscription(
+                        name=subscription_path,
+                        ack_deadline_seconds=ack_deadline,
+                    ),
+                    update_mask=field_mask_pb2.FieldMask(
+                        paths=["ack_deadline_seconds"]
+                    ),
+                )
+            except gcp_exceptions.GoogleAPIError as e:
+                logger.warning(
+                    f"Failed to update ack_deadline for Pub/Sub subscription {subscription_path}: {e}"
+                )
 
         # Pub/Sub has no URL concept analogous to SQS queue URLs
         return QueueInfo(queue_name, queue_url=None)
 
     async def delete_queue(self, endpoint_id: str) -> None:
-        queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id)
-        subscription_path = f"projects/{self.project_id}/subscriptions/{queue_name}"
-        topic_path = f"projects/{self.project_id}/topics/{queue_name}"
-
-        subscriber = pubsub_v1.SubscriberClient()
-        publisher = pubsub_v1.PublisherClient()
+        subscription_path = f"projects/{self.project_id}/subscriptions/{self._subscription_id(endpoint_id)}"
+        topic_path = f"projects/{self.project_id}/topics/{self._topic_id(endpoint_id)}"
 
         try:
-            subscriber.delete_subscription(subscription=subscription_path)
+            self._subscriber.delete_subscription(subscription=subscription_path)
         except gcp_exceptions.NotFound:
             logger.info(
                 f"Could not find Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}"
             )
+        except gcp_exceptions.GoogleAPIError as e:
+            raise EndpointResourceInfraException(
+                f"Failed to delete Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}: {e}"
+            ) from e
 
         try:
-            publisher.delete_topic(topic=topic_path)
+            self._publisher.delete_topic(topic=topic_path)
         except gcp_exceptions.NotFound:
             logger.info(
                 f"Could not find Pub/Sub topic {topic_path} for endpoint {endpoint_id}"
             )
+        except gcp_exceptions.GoogleAPIError as e:
+            raise EndpointResourceInfraException(
+                f"Failed to delete Pub/Sub topic {topic_path} for endpoint {endpoint_id}: {e}"
+            ) from e
 
     async def get_queue_attributes(self, endpoint_id: str) -> Dict[str, Any]:
-        queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id)
+        queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(
+            endpoint_id
+        )
         return {
             "name": queue_name,
             # Pub/Sub does not expose a synchronous undelivered message count;
diff --git a/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py b/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py
index f4c8f010..11c79704 100644
--- a/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py
+++ b/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py
@@ -1,6 +1,8 @@
 from typing import Dict, Optional, Tuple
 
-from model_engine_server.common.dtos.resource_manager import CreateOrUpdateResourcesRequest
+from model_engine_server.common.dtos.resource_manager import (
+    CreateOrUpdateResourcesRequest,
+)
 from model_engine_server.core.loggers import logger_name, make_logger
 from model_engine_server.domain.entities import (
     ModelEndpointInfraState,
@@ -28,11 +30,15 @@ class LiveEndpointResourceGateway(EndpointResourceGateway[QueueInfo]):
     def __init__(
         self,
         queue_delegate: QueueEndpointResourceDelegate,
-        inference_autoscaling_metrics_gateway: Optional[InferenceAutoscalingMetricsGateway],
+        inference_autoscaling_metrics_gateway: Optional[
+            InferenceAutoscalingMetricsGateway
+        ],
     ):
         self.k8s_delegate = K8SEndpointResourceDelegate()
         self.queue_delegate = queue_delegate
-        self.inference_autoscaling_metrics_gateway = inference_autoscaling_metrics_gateway
+        self.inference_autoscaling_metrics_gateway = (
+            inference_autoscaling_metrics_gateway
+        )
 
     async def create_queue(
         self,
@@ -79,7 +85,9 @@ async def create_or_update_resources(
             sqs_queue_name=queue_name,
             sqs_queue_url=queue_url,
         )
-        return EndpointResourceGatewayCreateOrUpdateResourcesResponse(destination=destination)
+        return EndpointResourceGatewayCreateOrUpdateResourcesResponse(
+            destination=destination
+        )
 
     async def get_resources(
         self, endpoint_id: str, deployment_name: str, endpoint_type: ModelEndpointType
@@ -91,7 +99,9 @@ async def get_resources(
         )
 
         if endpoint_type == ModelEndpointType.ASYNC:
-            sqs_attributes = await
self.queue_delegate.get_queue_attributes(endpoint_id=endpoint_id) + sqs_attributes = await self.queue_delegate.get_queue_attributes( + endpoint_id=endpoint_id + ) if ( "Attributes" in sqs_attributes and "ApproximateNumberOfMessages" in sqs_attributes["Attributes"] @@ -99,10 +109,18 @@ async def get_resources( resources.num_queued_items = int( sqs_attributes["Attributes"]["ApproximateNumberOfMessages"] ) - elif "active_message_count" in sqs_attributes: # from ASBQueueEndpointResourceDelegate + elif ( + "active_message_count" in sqs_attributes + ): # from ASBQueueEndpointResourceDelegate resources.num_queued_items = int(sqs_attributes["active_message_count"]) - elif "num_undelivered_messages" in sqs_attributes: # from GcpPubSubQueueEndpointResourceDelegate - resources.num_queued_items = int(sqs_attributes["num_undelivered_messages"]) + elif ( + "num_undelivered_messages" in sqs_attributes + ): # from GcpPubSubQueueEndpointResourceDelegate + # Pub/Sub returns -1 when num_undelivered_messages is not yet wired to Cloud Monitoring. + # Treat -1 as "unknown" and skip; downstream autoscaling expects non-negative counts. 
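The `-1`-as-unknown handling described in that comment can be sketched in isolation. A minimal standalone helper (the function name and dict shape are illustrative, not part of the patch):

```python
from typing import Any, Dict, Optional

def normalize_queued_count(attrs: Dict[str, Any]) -> Optional[int]:
    """Return a non-negative queued-item count, or None when unknown.

    Pub/Sub reports -1 for num_undelivered_messages until the metric is
    wired to Cloud Monitoring; treat that as "unknown" rather than a real
    count, since downstream autoscaling expects non-negative values.
    """
    raw = attrs.get("num_undelivered_messages")
    if raw is None:
        return None
    count = int(raw)
    return count if count >= 0 else None
```

Callers then skip setting `num_queued_items` when the helper returns `None`, which matches the guard added in the hunk above.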
+ gcp_count = int(sqs_attributes["num_undelivered_messages"]) + if gcp_count >= 0: + resources.num_queued_items = gcp_count return resources @@ -127,7 +145,9 @@ async def delete_resources( sqs_result = False if self.inference_autoscaling_metrics_gateway is not None: - await self.inference_autoscaling_metrics_gateway.delete_resources(endpoint_id) + await self.inference_autoscaling_metrics_gateway.delete_resources( + endpoint_id + ) return k8s_result and sqs_result diff --git a/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py b/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py index 5303d9c2..10b6e0d5 100644 --- a/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py +++ b/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py @@ -2,6 +2,7 @@ import pytest from google.api_core import exceptions as gcp_exceptions +from model_engine_server.domain.exceptions import EndpointResourceInfraException from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( GcpPubSubQueueEndpointResourceDelegate, ) @@ -9,8 +10,10 @@ MODULE_PATH = "model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate" ENDPOINT_ID = "test_endpoint_id" -QUEUE_NAME = f"launch-endpoint-id-{ENDPOINT_ID}" PROJECT_ID = "test-project" +TOPIC_PREFIX = "launch-endpoint-id-" +SUBSCRIPTION_PREFIX = "launch-endpoint-id-" +QUEUE_NAME = f"{TOPIC_PREFIX}{ENDPOINT_ID}" @pytest.fixture @@ -26,12 +29,19 @@ def mock_subscriber(): @pytest.fixture -def delegate(): +def delegate(mock_publisher, mock_subscriber): return GcpPubSubQueueEndpointResourceDelegate(project_id=PROJECT_ID) +def test_init_empty_project_id_raises(): + with pytest.raises(ValueError, match="non-empty project_id"): + GcpPubSubQueueEndpointResourceDelegate(project_id="") + + 
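The AlreadyExists-swallowing create path that the tests below pin down follows a generic idempotent-create shape. A dependency-free sketch (the exception class here is a local stand-in for `google.api_core.exceptions.AlreadyExists`, not the real import):

```python
class AlreadyExists(Exception):
    """Local stand-in for google.api_core.exceptions.AlreadyExists."""

def create_if_not_exists(create_fn) -> bool:
    """Run a create call, treating AlreadyExists as success.

    Returns True when the resource was newly created, False when it
    already existed; any other exception propagates to the caller.
    """
    try:
        create_fn()
        return True
    except AlreadyExists:
        return False
```

`create_queue_if_not_exists` applies this shape twice, once for the topic and once for the subscription, which is why the tests assert that an `AlreadyExists` side effect on one call does not suppress the other.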
@pytest.mark.asyncio -async def test_create_queue_if_not_exists_new(mock_publisher, mock_subscriber, delegate): +async def test_create_queue_if_not_exists_new( + mock_publisher, mock_subscriber, delegate +): """Both topic and subscription are created when neither exists.""" result = await delegate.create_queue_if_not_exists( endpoint_id=ENDPOINT_ID, @@ -40,8 +50,10 @@ async def test_create_queue_if_not_exists_new(mock_publisher, mock_subscriber, d endpoint_labels={"team": "test"}, ) - topic_path = f"projects/{PROJECT_ID}/topics/{QUEUE_NAME}" - subscription_path = f"projects/{PROJECT_ID}/subscriptions/{QUEUE_NAME}" + topic_path = f"projects/{PROJECT_ID}/topics/{TOPIC_PREFIX}{ENDPOINT_ID}" + subscription_path = ( + f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_PREFIX}{ENDPOINT_ID}" + ) mock_publisher.create_topic.assert_called_once_with(name=topic_path) mock_subscriber.create_subscription.assert_called_once_with( @@ -58,7 +70,9 @@ async def test_create_queue_if_not_exists_topic_already_exists( mock_publisher, mock_subscriber, delegate ): """AlreadyExists on topic creation is silenced; subscription still attempts creation.""" - mock_publisher.create_topic.side_effect = gcp_exceptions.AlreadyExists("topic exists") + mock_publisher.create_topic.side_effect = gcp_exceptions.AlreadyExists( + "topic exists" + ) result = await delegate.create_queue_if_not_exists( endpoint_id=ENDPOINT_ID, @@ -72,10 +86,10 @@ async def test_create_queue_if_not_exists_topic_already_exists( @pytest.mark.asyncio -async def test_create_queue_if_not_exists_subscription_already_exists( +async def test_create_queue_if_not_exists_subscription_already_exists_updates_ack_deadline( mock_publisher, mock_subscriber, delegate ): - """AlreadyExists on subscription creation is silenced.""" + """AlreadyExists on subscription triggers an update_subscription call.""" mock_subscriber.create_subscription.side_effect = gcp_exceptions.AlreadyExists( "subscription exists" ) @@ -89,6 +103,29 @@ async def 
test_create_queue_if_not_exists_subscription_already_exists( ) mock_publisher.create_topic.assert_called_once() + mock_subscriber.update_subscription.assert_called_once() + assert result.queue_name == QUEUE_NAME + + +@pytest.mark.asyncio +async def test_create_queue_subscription_already_exists_update_failure_is_warned( + mock_publisher, mock_subscriber, delegate +): + """update_subscription GoogleAPIError is swallowed with a warning (not raised).""" + mock_subscriber.create_subscription.side_effect = gcp_exceptions.AlreadyExists( + "exists" + ) + mock_subscriber.update_subscription.side_effect = gcp_exceptions.GoogleAPIError( + "boom" + ) + + # Should not raise + result = await delegate.create_queue_if_not_exists( + endpoint_id=ENDPOINT_ID, + endpoint_name="test_endpoint", + endpoint_created_by="test_user", + endpoint_labels={}, + ) assert result.queue_name == QUEUE_NAME @@ -97,7 +134,9 @@ async def test_delete_queue_subscription_not_found_silent( mock_publisher, mock_subscriber, delegate ): """NotFound on subscription deletion is silenced; topic deletion still attempts.""" - mock_subscriber.delete_subscription.side_effect = gcp_exceptions.NotFound("sub not found") + mock_subscriber.delete_subscription.side_effect = gcp_exceptions.NotFound( + "sub not found" + ) await delegate.delete_queue(endpoint_id=ENDPOINT_ID) @@ -106,7 +145,9 @@ async def test_delete_queue_subscription_not_found_silent( @pytest.mark.asyncio -async def test_delete_queue_topic_not_found_silent(mock_publisher, mock_subscriber, delegate): +async def test_delete_queue_topic_not_found_silent( + mock_publisher, mock_subscriber, delegate +): """NotFound on topic deletion is silenced.""" mock_publisher.delete_topic.side_effect = gcp_exceptions.NotFound("topic not found") @@ -116,6 +157,51 @@ async def test_delete_queue_topic_not_found_silent(mock_publisher, mock_subscrib mock_publisher.delete_topic.assert_called_once() +@pytest.mark.asyncio +async def test_delete_queue_subscription_api_error_raises( + 
mock_publisher, mock_subscriber, delegate +): + """Non-NotFound GoogleAPIError on subscription deletion raises EndpointResourceInfraException.""" + mock_subscriber.delete_subscription.side_effect = gcp_exceptions.GoogleAPIError( + "network error" + ) + + with pytest.raises( + EndpointResourceInfraException, match="Failed to delete Pub/Sub subscription" + ): + await delegate.delete_queue(endpoint_id=ENDPOINT_ID) + + +@pytest.mark.asyncio +async def test_delete_queue_topic_api_error_raises( + mock_publisher, mock_subscriber, delegate +): + """Non-NotFound GoogleAPIError on topic deletion raises EndpointResourceInfraException.""" + mock_publisher.delete_topic.side_effect = gcp_exceptions.GoogleAPIError( + "network error" + ) + + with pytest.raises( + EndpointResourceInfraException, match="Failed to delete Pub/Sub topic" + ): + await delegate.delete_queue(endpoint_id=ENDPOINT_ID) + + +@pytest.mark.asyncio +async def test_delete_queue_subscription_deleted_before_topic( + mock_publisher, mock_subscriber, delegate +): + """Subscription must be deleted before topic (Pub/Sub requirement to avoid race).""" + parent = MagicMock() + parent.attach_mock(mock_subscriber.delete_subscription, "sub_del") + parent.attach_mock(mock_publisher.delete_topic, "topic_del") + + await delegate.delete_queue(endpoint_id=ENDPOINT_ID) + + call_order = [c[0] for c in parent.mock_calls] + assert call_order == ["sub_del", "topic_del"] + + @pytest.mark.asyncio async def test_get_queue_attributes_returns_expected_shape(delegate): """get_queue_attributes returns a dict with 'name' and 'num_undelivered_messages'.""" From 6f5b1c96c960e9089a32d3a5bc2a7a7e31ad976d Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Tue, 12 May 2026 16:01:36 +0100 Subject: [PATCH 04/12] fix(model-engine): read GCP_PROJECT_ID env first + apply black/isort Addresses Greptile P1 on PR #827: the Helm chart injects GCP_PROJECT_ID as a pod env var (from .Values.gcp.project_id), but the prior code only read it from 
infra_config().gcp_project_id (a different source rendered from .Values.config.values.infra). On any GCP cluster that follows the sample values, infra_config().gcp_project_id was None and the delegate's project_id guard always raised ValueError at startup. Mirror the SQS_PROFILE pattern (os.getenv("SQS_PROFILE", hmi_config.sqs_profile)) in all three factory sites: - api/dependencies.py - entrypoints/start_batch_job_orchestration.py - entrypoints/k8s_cache.py Also: black --config .black.toml + isort to clear the run_unit_tests_server CI check (formatting drift introduced by the previous commits). --- .../model_engine_server/api/dependencies.py | 12 +++-- .../entrypoints/k8s_cache.py | 10 ++-- .../start_batch_job_orchestration.py | 10 ++-- ...pubsub_queue_endpoint_resource_delegate.py | 26 ++++------ .../live_endpoint_resource_gateway.py | 28 +++-------- ...pubsub_queue_endpoint_resource_delegate.py | 48 ++++++------------- 6 files changed, 51 insertions(+), 83 deletions(-) diff --git a/model-engine/model_engine_server/api/dependencies.py b/model-engine/model_engine_server/api/dependencies.py index 71092932..7e022265 100644 --- a/model-engine/model_engine_server/api/dependencies.py +++ b/model-engine/model_engine_server/api/dependencies.py @@ -89,15 +89,15 @@ from model_engine_server.infra.gateways.resources.asb_queue_endpoint_resource_delegate import ( ASBQueueEndpointResourceDelegate, ) -from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( - GcpPubSubQueueEndpointResourceDelegate, -) from model_engine_server.infra.gateways.resources.endpoint_resource_gateway import ( EndpointResourceGateway, ) from model_engine_server.infra.gateways.resources.fake_queue_endpoint_resource_delegate import ( FakeQueueEndpointResourceDelegate, ) +from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( + GcpPubSubQueueEndpointResourceDelegate, +) from 
model_engine_server.infra.gateways.resources.live_endpoint_resource_gateway import ( LiveEndpointResourceGateway, ) @@ -251,8 +251,12 @@ def _get_external_interfaces( elif infra_config().cloud_provider == "azure": queue_delegate = ASBQueueEndpointResourceDelegate() elif infra_config().cloud_provider == "gcp": + # Mirror the SQS_PROFILE env-first pattern: the Helm chart injects GCP_PROJECT_ID as a + # pod env var (from .Values.gcp.project_id), which is a different source than the YAML- + # rendered infra_service_config. Read the env first so the chart value reaches the delegate; + # fall back to infra_config().gcp_project_id for setups that wire it via the config YAML. queue_delegate = GcpPubSubQueueEndpointResourceDelegate( - project_id=infra_config().gcp_project_id, + project_id=os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id, ) else: queue_delegate = SQSQueueEndpointResourceDelegate( diff --git a/model-engine/model_engine_server/entrypoints/k8s_cache.py b/model-engine/model_engine_server/entrypoints/k8s_cache.py index 0084ad78..f273f98c 100644 --- a/model-engine/model_engine_server/entrypoints/k8s_cache.py +++ b/model-engine/model_engine_server/entrypoints/k8s_cache.py @@ -21,15 +21,15 @@ from model_engine_server.infra.gateways.resources.asb_queue_endpoint_resource_delegate import ( ASBQueueEndpointResourceDelegate, ) -from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( - GcpPubSubQueueEndpointResourceDelegate, -) from model_engine_server.infra.gateways.resources.endpoint_resource_gateway import ( EndpointResourceGateway, ) from model_engine_server.infra.gateways.resources.fake_queue_endpoint_resource_delegate import ( FakeQueueEndpointResourceDelegate, ) +from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( + GcpPubSubQueueEndpointResourceDelegate, +) from model_engine_server.infra.gateways.resources.image_cache_gateway import ImageCacheGateway from 
model_engine_server.infra.gateways.resources.live_endpoint_resource_gateway import ( LiveEndpointResourceGateway, @@ -123,8 +123,10 @@ async def main(args: Any): elif infra_config().cloud_provider == "azure": queue_delegate = ASBQueueEndpointResourceDelegate() elif infra_config().cloud_provider == "gcp": + # See dependencies.py for rationale: Helm injects GCP_PROJECT_ID as a pod env var; + # the infra_service_config YAML is a different source. Read the env first. queue_delegate = GcpPubSubQueueEndpointResourceDelegate( - project_id=infra_config().gcp_project_id, + project_id=os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id, ) else: queue_delegate = SQSQueueEndpointResourceDelegate( diff --git a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py index 46bec9ba..ab183ae3 100644 --- a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py +++ b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py @@ -31,12 +31,12 @@ from model_engine_server.infra.gateways.resources.asb_queue_endpoint_resource_delegate import ( ASBQueueEndpointResourceDelegate, ) -from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( - GcpPubSubQueueEndpointResourceDelegate, -) from model_engine_server.infra.gateways.resources.fake_queue_endpoint_resource_delegate import ( FakeQueueEndpointResourceDelegate, ) +from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( + GcpPubSubQueueEndpointResourceDelegate, +) from model_engine_server.infra.gateways.resources.live_endpoint_resource_gateway import ( LiveEndpointResourceGateway, ) @@ -94,8 +94,10 @@ async def run_batch_job( elif infra_config().cloud_provider == "azure": queue_delegate = ASBQueueEndpointResourceDelegate() elif infra_config().cloud_provider == "gcp": + # See dependencies.py for 
rationale: Helm injects GCP_PROJECT_ID as a pod env var; + # the infra_service_config YAML is a different source. Read the env first. queue_delegate = GcpPubSubQueueEndpointResourceDelegate( - project_id=infra_config().gcp_project_id, + project_id=os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id, ) else: queue_delegate = SQSQueueEndpointResourceDelegate( diff --git a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py index 0dce7fd5..b68cbb96 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py @@ -56,14 +56,12 @@ async def create_queue_if_not_exists( endpoint_labels: Dict[str, Any], queue_message_timeout_seconds: Optional[int] = None, ) -> QueueInfo: - queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name( - endpoint_id - ) + queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id) topic_path = f"projects/{self.project_id}/topics/{self._topic_id(endpoint_id)}" - subscription_path = f"projects/{self.project_id}/subscriptions/{self._subscription_id(endpoint_id)}" - ack_deadline = min( - queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS + subscription_path = ( + f"projects/{self.project_id}/subscriptions/{self._subscription_id(endpoint_id)}" ) + ack_deadline = min(queue_message_timeout_seconds or 60, GCP_PUBSUB_MAX_ACK_DEADLINE_SECONDS) try: self._publisher.create_topic(name=topic_path) @@ -83,9 +81,7 @@ async def create_queue_if_not_exists( name=subscription_path, ack_deadline_seconds=ack_deadline, ), - update_mask=field_mask_pb2.FieldMask( - paths=["ack_deadline_seconds"] - ), + update_mask=field_mask_pb2.FieldMask(paths=["ack_deadline_seconds"]), ) except 
gcp_exceptions.GoogleAPIError as e: logger.warning( @@ -96,7 +92,9 @@ async def create_queue_if_not_exists( return QueueInfo(queue_name, queue_url=None) async def delete_queue(self, endpoint_id: str) -> None: - subscription_path = f"projects/{self.project_id}/subscriptions/{self._subscription_id(endpoint_id)}" + subscription_path = ( + f"projects/{self.project_id}/subscriptions/{self._subscription_id(endpoint_id)}" + ) topic_path = f"projects/{self.project_id}/topics/{self._topic_id(endpoint_id)}" try: @@ -113,18 +111,14 @@ async def delete_queue(self, endpoint_id: str) -> None: try: self._publisher.delete_topic(topic=topic_path) except gcp_exceptions.NotFound: - logger.info( - f"Could not find Pub/Sub topic {topic_path} for endpoint {endpoint_id}" - ) + logger.info(f"Could not find Pub/Sub topic {topic_path} for endpoint {endpoint_id}") except gcp_exceptions.GoogleAPIError as e: raise EndpointResourceInfraException( f"Failed to delete Pub/Sub topic {topic_path} for endpoint {endpoint_id}: {e}" ) from e async def get_queue_attributes(self, endpoint_id: str) -> Dict[str, Any]: - queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name( - endpoint_id - ) + queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id) return { "name": queue_name, # Pub/Sub does not expose a synchronous undelivered message count; diff --git a/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py b/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py index 11c79704..d0d0ca59 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py @@ -1,8 +1,6 @@ from typing import Dict, Optional, Tuple -from model_engine_server.common.dtos.resource_manager import ( - CreateOrUpdateResourcesRequest, -) +from 
model_engine_server.common.dtos.resource_manager import CreateOrUpdateResourcesRequest from model_engine_server.core.loggers import logger_name, make_logger from model_engine_server.domain.entities import ( ModelEndpointInfraState, @@ -30,15 +28,11 @@ class LiveEndpointResourceGateway(EndpointResourceGateway[QueueInfo]): def __init__( self, queue_delegate: QueueEndpointResourceDelegate, - inference_autoscaling_metrics_gateway: Optional[ - InferenceAutoscalingMetricsGateway - ], + inference_autoscaling_metrics_gateway: Optional[InferenceAutoscalingMetricsGateway], ): self.k8s_delegate = K8SEndpointResourceDelegate() self.queue_delegate = queue_delegate - self.inference_autoscaling_metrics_gateway = ( - inference_autoscaling_metrics_gateway - ) + self.inference_autoscaling_metrics_gateway = inference_autoscaling_metrics_gateway async def create_queue( self, @@ -85,9 +79,7 @@ async def create_or_update_resources( sqs_queue_name=queue_name, sqs_queue_url=queue_url, ) - return EndpointResourceGatewayCreateOrUpdateResourcesResponse( - destination=destination - ) + return EndpointResourceGatewayCreateOrUpdateResourcesResponse(destination=destination) async def get_resources( self, endpoint_id: str, deployment_name: str, endpoint_type: ModelEndpointType @@ -99,9 +91,7 @@ async def get_resources( ) if endpoint_type == ModelEndpointType.ASYNC: - sqs_attributes = await self.queue_delegate.get_queue_attributes( - endpoint_id=endpoint_id - ) + sqs_attributes = await self.queue_delegate.get_queue_attributes(endpoint_id=endpoint_id) if ( "Attributes" in sqs_attributes and "ApproximateNumberOfMessages" in sqs_attributes["Attributes"] @@ -109,9 +99,7 @@ async def get_resources( resources.num_queued_items = int( sqs_attributes["Attributes"]["ApproximateNumberOfMessages"] ) - elif ( - "active_message_count" in sqs_attributes - ): # from ASBQueueEndpointResourceDelegate + elif "active_message_count" in sqs_attributes: # from ASBQueueEndpointResourceDelegate resources.num_queued_items 
= int(sqs_attributes["active_message_count"]) elif ( "num_undelivered_messages" in sqs_attributes @@ -145,9 +133,7 @@ async def delete_resources( sqs_result = False if self.inference_autoscaling_metrics_gateway is not None: - await self.inference_autoscaling_metrics_gateway.delete_resources( - endpoint_id - ) + await self.inference_autoscaling_metrics_gateway.delete_resources(endpoint_id) return k8s_result and sqs_result diff --git a/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py b/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py index 10b6e0d5..a743cc36 100644 --- a/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py +++ b/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py @@ -7,7 +7,9 @@ GcpPubSubQueueEndpointResourceDelegate, ) -MODULE_PATH = "model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate" +MODULE_PATH = ( + "model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate" +) ENDPOINT_ID = "test_endpoint_id" PROJECT_ID = "test-project" @@ -39,9 +41,7 @@ def test_init_empty_project_id_raises(): @pytest.mark.asyncio -async def test_create_queue_if_not_exists_new( - mock_publisher, mock_subscriber, delegate -): +async def test_create_queue_if_not_exists_new(mock_publisher, mock_subscriber, delegate): """Both topic and subscription are created when neither exists.""" result = await delegate.create_queue_if_not_exists( endpoint_id=ENDPOINT_ID, @@ -51,9 +51,7 @@ async def test_create_queue_if_not_exists_new( ) topic_path = f"projects/{PROJECT_ID}/topics/{TOPIC_PREFIX}{ENDPOINT_ID}" - subscription_path = ( - f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_PREFIX}{ENDPOINT_ID}" - ) + subscription_path = f"projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_PREFIX}{ENDPOINT_ID}" 
mock_publisher.create_topic.assert_called_once_with(name=topic_path) mock_subscriber.create_subscription.assert_called_once_with( @@ -70,9 +68,7 @@ async def test_create_queue_if_not_exists_topic_already_exists( mock_publisher, mock_subscriber, delegate ): """AlreadyExists on topic creation is silenced; subscription still attempts creation.""" - mock_publisher.create_topic.side_effect = gcp_exceptions.AlreadyExists( - "topic exists" - ) + mock_publisher.create_topic.side_effect = gcp_exceptions.AlreadyExists("topic exists") result = await delegate.create_queue_if_not_exists( endpoint_id=ENDPOINT_ID, @@ -112,12 +108,8 @@ async def test_create_queue_subscription_already_exists_update_failure_is_warned mock_publisher, mock_subscriber, delegate ): """update_subscription GoogleAPIError is swallowed with a warning (not raised).""" - mock_subscriber.create_subscription.side_effect = gcp_exceptions.AlreadyExists( - "exists" - ) - mock_subscriber.update_subscription.side_effect = gcp_exceptions.GoogleAPIError( - "boom" - ) + mock_subscriber.create_subscription.side_effect = gcp_exceptions.AlreadyExists("exists") + mock_subscriber.update_subscription.side_effect = gcp_exceptions.GoogleAPIError("boom") # Should not raise result = await delegate.create_queue_if_not_exists( @@ -134,9 +126,7 @@ async def test_delete_queue_subscription_not_found_silent( mock_publisher, mock_subscriber, delegate ): """NotFound on subscription deletion is silenced; topic deletion still attempts.""" - mock_subscriber.delete_subscription.side_effect = gcp_exceptions.NotFound( - "sub not found" - ) + mock_subscriber.delete_subscription.side_effect = gcp_exceptions.NotFound("sub not found") await delegate.delete_queue(endpoint_id=ENDPOINT_ID) @@ -145,9 +135,7 @@ async def test_delete_queue_subscription_not_found_silent( @pytest.mark.asyncio -async def test_delete_queue_topic_not_found_silent( - mock_publisher, mock_subscriber, delegate -): +async def 
test_delete_queue_topic_not_found_silent(mock_publisher, mock_subscriber, delegate): """NotFound on topic deletion is silenced.""" mock_publisher.delete_topic.side_effect = gcp_exceptions.NotFound("topic not found") @@ -162,9 +150,7 @@ async def test_delete_queue_subscription_api_error_raises( mock_publisher, mock_subscriber, delegate ): """Non-NotFound GoogleAPIError on subscription deletion raises EndpointResourceInfraException.""" - mock_subscriber.delete_subscription.side_effect = gcp_exceptions.GoogleAPIError( - "network error" - ) + mock_subscriber.delete_subscription.side_effect = gcp_exceptions.GoogleAPIError("network error") with pytest.raises( EndpointResourceInfraException, match="Failed to delete Pub/Sub subscription" @@ -173,17 +159,11 @@ async def test_delete_queue_subscription_api_error_raises( @pytest.mark.asyncio -async def test_delete_queue_topic_api_error_raises( - mock_publisher, mock_subscriber, delegate -): +async def test_delete_queue_topic_api_error_raises(mock_publisher, mock_subscriber, delegate): """Non-NotFound GoogleAPIError on topic deletion raises EndpointResourceInfraException.""" - mock_publisher.delete_topic.side_effect = gcp_exceptions.GoogleAPIError( - "network error" - ) + mock_publisher.delete_topic.side_effect = gcp_exceptions.GoogleAPIError("network error") - with pytest.raises( - EndpointResourceInfraException, match="Failed to delete Pub/Sub topic" - ): + with pytest.raises(EndpointResourceInfraException, match="Failed to delete Pub/Sub topic"): await delegate.delete_queue(endpoint_id=ENDPOINT_ID) From 703fe32e552cdb3de3d74cda8191dfc0e60e0729 Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Wed, 13 May 2026 17:53:35 +0100 Subject: [PATCH 05/12] fix(model-engine): prevent topic orphan + drop unused redis delegate import MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Greptile P1 (gcp_pubsub_queue_endpoint_resource_delegate.py:118): delete_queue previously short-circuited 
after a non-NotFound failure on delete_subscription, leaving the topic orphaned. Now we attempt BOTH deletions, collect any GoogleAPIErrors, and raise a single EndpointResourceInfraException with all cleanup failures after both attempts. NotFound on either step remains silent (idempotent). Ruff F401 (api/dependencies.py:111): RedisQueueEndpointResourceDelegate was imported but is no longer used — removed in the prior commit when GCP switched from the redis fallback to the new Pub/Sub delegate. Dropping the import fixes the run_unit_tests_server Ruff Lint Check. Test: adds test_delete_queue_subscription_failure_does_not_orphan_topic to pin the orphan-prevention invariant. --- .../model_engine_server/api/dependencies.py | 3 --- ...pubsub_queue_endpoint_resource_delegate.py | 19 ++++++++++++++----- ...pubsub_queue_endpoint_resource_delegate.py | 14 ++++++++++++++ 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/model-engine/model_engine_server/api/dependencies.py b/model-engine/model_engine_server/api/dependencies.py index 7e022265..3360b371 100644 --- a/model-engine/model_engine_server/api/dependencies.py +++ b/model-engine/model_engine_server/api/dependencies.py @@ -107,9 +107,6 @@ from model_engine_server.infra.gateways.resources.queue_endpoint_resource_delegate import ( QueueEndpointResourceDelegate, ) -from model_engine_server.infra.gateways.resources.redis_queue_endpoint_resource_delegate import ( - RedisQueueEndpointResourceDelegate, -) from model_engine_server.infra.gateways.resources.sqs_queue_endpoint_resource_delegate import ( SQSQueueEndpointResourceDelegate, ) diff --git a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py index b68cbb96..2f685254 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py +++ 
b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py @@ -97,6 +97,11 @@ async def delete_queue(self, endpoint_id: str) -> None: ) topic_path = f"projects/{self.project_id}/topics/{self._topic_id(endpoint_id)}" + # Always attempt BOTH deletions so a failure on one doesn't leave the other resource + # orphaned (Greptile P1). NotFound is silent. Other GoogleAPIErrors are collected and + # surfaced together at the end so callers see every cleanup failure, not just the first. + errors: list[tuple[str, str, gcp_exceptions.GoogleAPIError]] = [] + try: self._subscriber.delete_subscription(subscription=subscription_path) except gcp_exceptions.NotFound: @@ -104,18 +109,22 @@ async def delete_queue(self, endpoint_id: str) -> None: f"Could not find Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}" ) except gcp_exceptions.GoogleAPIError as e: - raise EndpointResourceInfraException( - f"Failed to delete Pub/Sub subscription {subscription_path} for endpoint {endpoint_id}: {e}" - ) from e + errors.append(("subscription", subscription_path, e)) try: self._publisher.delete_topic(topic=topic_path) except gcp_exceptions.NotFound: logger.info(f"Could not find Pub/Sub topic {topic_path} for endpoint {endpoint_id}") except gcp_exceptions.GoogleAPIError as e: + errors.append(("topic", topic_path, e)) + + if errors: + details = "; ".join( + f"Failed to delete Pub/Sub {kind} {path}: {err}" for kind, path, err in errors + ) raise EndpointResourceInfraException( - f"Failed to delete Pub/Sub topic {topic_path} for endpoint {endpoint_id}: {e}" - ) from e + f"Cleanup errors for endpoint {endpoint_id}: {details}" + ) from errors[0][2] async def get_queue_attributes(self, endpoint_id: str) -> Dict[str, Any]: queue_name = QueueEndpointResourceDelegate.endpoint_id_to_queue_name(endpoint_id) diff --git a/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py 
b/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py index a743cc36..2f9340c6 100644 --- a/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py +++ b/model-engine/tests/unit/infra/gateways/resources/test_gcp_pubsub_queue_endpoint_resource_delegate.py @@ -167,6 +167,20 @@ async def test_delete_queue_topic_api_error_raises(mock_publisher, mock_subscrib await delegate.delete_queue(endpoint_id=ENDPOINT_ID) +@pytest.mark.asyncio +async def test_delete_queue_subscription_failure_does_not_orphan_topic( + mock_publisher, mock_subscriber, delegate +): + """When subscription delete fails, topic delete must still be attempted (no orphan).""" + mock_subscriber.delete_subscription.side_effect = gcp_exceptions.GoogleAPIError("transient") + + with pytest.raises(EndpointResourceInfraException): + await delegate.delete_queue(endpoint_id=ENDPOINT_ID) + + # The key invariant: topic deletion was attempted even though subscription deletion failed. + mock_publisher.delete_topic.assert_called_once() + + @pytest.mark.asyncio async def test_delete_queue_subscription_deleted_before_topic( mock_publisher, mock_subscriber, delegate From 59dbf854f71cd959ddedf30b904ef56c50214399 Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Wed, 13 May 2026 18:17:46 +0100 Subject: [PATCH 06/12] fix(model-engine): narrow gcp_project_id at call site to satisfy mypy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Type Check (mypy) CI step on run_unit_tests_server failed because os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id evaluates to str | None, but GcpPubSubQueueEndpointResourceDelegate declares project_id: str. The delegate's runtime guard catches misconfig, but mypy can't see through __init__ to narrow the type at the call site. 
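The narrowing applied at each factory site can be sketched as follows. This is a minimal illustration, not the PR's code: the helper function `resolve_gcp_project_id` is hypothetical (the PR inlines this guard at each of the three call sites rather than extracting a function), but the `str | None` → `str` narrowing via a local plus an early `raise` is the technique described above:

```python
import os
from typing import Optional


def resolve_gcp_project_id(config_value: Optional[str]) -> str:
    """Hypothetical helper: takes the Optional config fallback, returns a plain str."""
    project_id = os.getenv("GCP_PROJECT_ID") or config_value
    # mypy still sees Optional[str] here; the guard below narrows it to str,
    # so the delegate's `project_id: str` parameter type-checks at the call site.
    if not project_id:
        raise ValueError(
            "cloud_provider=gcp requires GCP_PROJECT_ID env var "
            "(via .Values.gcp.project_id) or infra.gcp_project_id in the service config."
        )
    return project_id
```

Because the guard names both config sources, a misconfigured deployment fails with an actionable message instead of the delegate's generic runtime error.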
Narrow explicitly at the 3 factory sites — extract the project_id into a local, check `if not project_id: raise ValueError(...)`, then pass to the delegate. mypy is happy, and misconfig surfaces with a clearer error naming both possible config sources rather than the generic ValueError from the delegate. --- model-engine/model_engine_server/api/dependencies.py | 12 ++++++++---- .../model_engine_server/entrypoints/k8s_cache.py | 10 +++++++--- .../entrypoints/start_batch_job_orchestration.py | 10 +++++++--- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/model-engine/model_engine_server/api/dependencies.py b/model-engine/model_engine_server/api/dependencies.py index 3360b371..8e68be55 100644 --- a/model-engine/model_engine_server/api/dependencies.py +++ b/model-engine/model_engine_server/api/dependencies.py @@ -251,10 +251,14 @@ def _get_external_interfaces( # Mirror the SQS_PROFILE env-first pattern: the Helm chart injects GCP_PROJECT_ID as a # pod env var (from .Values.gcp.project_id), which is a different source than the YAML- # rendered infra_service_config. Read the env first so the chart value reaches the delegate; - # fall back to infra_config().gcp_project_id for setups that wire it via the config YAML. - queue_delegate = GcpPubSubQueueEndpointResourceDelegate( - project_id=os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id, - ) + # the infra_config.gcp_project_id field handles setups that wire it via the config YAML. + gcp_project_id = os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id + if not gcp_project_id: + raise ValueError( + "cloud_provider=gcp requires GCP_PROJECT_ID env var " + "(via .Values.gcp.project_id) or infra.gcp_project_id in the service config." 
+ ) + queue_delegate = GcpPubSubQueueEndpointResourceDelegate(project_id=gcp_project_id) else: queue_delegate = SQSQueueEndpointResourceDelegate( sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile) diff --git a/model-engine/model_engine_server/entrypoints/k8s_cache.py b/model-engine/model_engine_server/entrypoints/k8s_cache.py index f273f98c..a64d55e9 100644 --- a/model-engine/model_engine_server/entrypoints/k8s_cache.py +++ b/model-engine/model_engine_server/entrypoints/k8s_cache.py @@ -125,9 +125,13 @@ async def main(args: Any): elif infra_config().cloud_provider == "gcp": # See dependencies.py for rationale: Helm injects GCP_PROJECT_ID as a pod env var; # the infra_service_config YAML is a different source. Read the env first. - queue_delegate = GcpPubSubQueueEndpointResourceDelegate( - project_id=os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id, - ) + gcp_project_id = os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id + if not gcp_project_id: + raise ValueError( + "cloud_provider=gcp requires GCP_PROJECT_ID env var " + "(via .Values.gcp.project_id) or infra.gcp_project_id in the service config." + ) + queue_delegate = GcpPubSubQueueEndpointResourceDelegate(project_id=gcp_project_id) else: queue_delegate = SQSQueueEndpointResourceDelegate( sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile) diff --git a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py index ab183ae3..16b8c46a 100644 --- a/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py +++ b/model-engine/model_engine_server/entrypoints/start_batch_job_orchestration.py @@ -96,9 +96,13 @@ async def run_batch_job( elif infra_config().cloud_provider == "gcp": # See dependencies.py for rationale: Helm injects GCP_PROJECT_ID as a pod env var; # the infra_service_config YAML is a different source. Read the env first. 
- queue_delegate = GcpPubSubQueueEndpointResourceDelegate( - project_id=os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id, - ) + gcp_project_id = os.getenv("GCP_PROJECT_ID") or infra_config().gcp_project_id + if not gcp_project_id: + raise ValueError( + "cloud_provider=gcp requires GCP_PROJECT_ID env var " + "(via .Values.gcp.project_id) or infra.gcp_project_id in the service config." + ) + queue_delegate = GcpPubSubQueueEndpointResourceDelegate(project_id=gcp_project_id) else: queue_delegate = SQSQueueEndpointResourceDelegate( sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile) From f67227cf71179cfffa24b454d02d63f3d6edf2af Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Wed, 13 May 2026 18:28:36 +0100 Subject: [PATCH 07/12] chore(model-engine): regenerate requirements.txt to lock google-cloud-pubsub Previous commits added `google-cloud-pubsub>=2.18` to requirements.in but left requirements.txt stale. CI installs from the locked file, so the new dependency wasn't available and the unit-test collection failed with: ImportError: cannot import name 'pubsub_v1' from 'google.cloud' Regenerated with the command in the file header: uv pip compile requirements.in --python-version 3.13 --no-cache -o requirements.txt --- model-engine/requirements.txt | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/model-engine/requirements.txt b/model-engine/requirements.txt index 9cf220f7..9f9fd1b6 100644 --- a/model-engine/requirements.txt +++ b/model-engine/requirements.txt @@ -190,6 +190,7 @@ google-api-core==2.29.0 # via # google-cloud-artifact-registry # google-cloud-core + # google-cloud-pubsub # google-cloud-secret-manager # google-cloud-storage google-auth==2.25.2 @@ -198,6 +199,7 @@ google-auth==2.25.2 # google-api-core # google-cloud-artifact-registry # google-cloud-core + # google-cloud-pubsub # google-cloud-secret-manager # google-cloud-storage # kubernetes @@ -205,6 +207,8 @@ 
google-cloud-artifact-registry==1.21.0 # via -r requirements.in google-cloud-core==2.5.0 # via google-cloud-storage +google-cloud-pubsub==2.38.0 + # via -r requirements.in google-cloud-secret-manager==2.24.0 # via -r requirements.in google-cloud-storage==2.14.0 @@ -227,16 +231,20 @@ greenlet==3.3.2 grpc-google-iam-v1==0.14.3 # via # google-cloud-artifact-registry + # google-cloud-pubsub # google-cloud-secret-manager grpcio==1.75.1 # via # google-api-core # google-cloud-artifact-registry + # google-cloud-pubsub # googleapis-common-protos # grpc-google-iam-v1 # grpcio-status grpcio-status==1.75.1 - # via google-api-core + # via + # google-api-core + # google-cloud-pubsub gunicorn==23.0.0 # via -r requirements.in h11==0.16.0 @@ -371,7 +379,15 @@ numpy==2.4.4 oauthlib==3.2.2 # via requests-oauthlib opentelemetry-api==1.40.0 - # via ddtrace + # via + # ddtrace + # google-cloud-pubsub + # opentelemetry-sdk + # opentelemetry-semantic-conventions +opentelemetry-sdk==1.40.0 + # via google-cloud-pubsub +opentelemetry-semantic-conventions==0.61b0 + # via opentelemetry-sdk orjson==3.11.7 # via -r requirements.in packaging==23.1 @@ -401,12 +417,14 @@ proto-plus==1.27.1 # via # google-api-core # google-cloud-artifact-registry + # google-cloud-pubsub # google-cloud-secret-manager protobuf==6.33.5 # via # -r requirements.in # google-api-core # google-cloud-artifact-registry + # google-cloud-pubsub # google-cloud-secret-manager # googleapis-common-protos # grpc-google-iam-v1 @@ -607,6 +625,8 @@ typing-extensions==4.15.0 # grpcio # huggingface-hub # opentelemetry-api + # opentelemetry-sdk + # opentelemetry-semantic-conventions # pydantic # pydantic-core # sqlalchemy From 5001650564749d92e950b4e73ac4bccbc2fd67cc Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Wed, 13 May 2026 18:47:30 +0100 Subject: [PATCH 08/12] fix(model-engine): pin opentelemetry-exporter-otlp-proto-grpc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adding 
google-cloud-pubsub indirectly broke test collection because: 1. google-cloud-pubsub depends on opentelemetry-sdk (transitive). 2. common/startup_tracing/correlation.py sets OTEL_AVAILABLE=True when opentelemetry.trace is importable — which is now the case. 3. With OTEL_AVAILABLE=True, common/startup_tracing/tracer.py:19 imports opentelemetry.exporter.otlp.proto.grpc.metric_exporter, which lives in opentelemetry-exporter-otlp-proto-grpc — not previously a declared dependency anywhere in the project. CI was failing test collection on both startup_tracing tests with ModuleNotFoundError: No module named 'opentelemetry.exporter'. Pin opentelemetry-exporter-otlp-proto-grpc explicitly so the import chain in tracer.py resolves whenever OTEL_AVAILABLE is True. Note: the underlying inconsistency (OTEL_AVAILABLE flagged by a subset of imports that the actual code path uses) is a pre-existing latent bug in tracer.py — a follow-up could either tighten the import guard or make the exporter imports lazy. Out of scope for this PR. --- model-engine/requirements.in | 5 +++++ model-engine/requirements.txt | 23 +++++++++++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/model-engine/requirements.in b/model-engine/requirements.in index 16ed6b92..716c463b 100644 --- a/model-engine/requirements.in +++ b/model-engine/requirements.in @@ -13,6 +13,11 @@ azure-storage-blob~=12.19.0 gcloud-aio-storage~=9.6 google-auth~=2.25.0 google-cloud-pubsub>=2.18 +# google-cloud-pubsub transitively pulls opentelemetry-sdk, which flips +# common/startup_tracing/correlation.py's OTEL_AVAILABLE to True. Once that's +# True, tracer.py imports from opentelemetry-exporter-otlp-proto-grpc, which +# isn't otherwise a dependency. Pin it explicitly so the import resolves. 
+opentelemetry-exporter-otlp-proto-grpc google-cloud-artifact-registry~=1.21.0 google-cloud-secret-manager>=2.24.0 google-cloud-storage~=2.14.0 diff --git a/model-engine/requirements.txt b/model-engine/requirements.txt index 9f9fd1b6..e5a77684 100644 --- a/model-engine/requirements.txt +++ b/model-engine/requirements.txt @@ -224,6 +224,7 @@ googleapis-common-protos==1.72.0 # google-api-core # grpc-google-iam-v1 # grpcio-status + # opentelemetry-exporter-otlp-proto-grpc greenlet==3.3.2 # via # -r requirements.in @@ -241,6 +242,7 @@ grpcio==1.75.1 # googleapis-common-protos # grpc-google-iam-v1 # grpcio-status + # opentelemetry-exporter-otlp-proto-grpc grpcio-status==1.75.1 # via # google-api-core @@ -378,15 +380,26 @@ numpy==2.4.4 # transformers oauthlib==3.2.2 # via requests-oauthlib -opentelemetry-api==1.40.0 +opentelemetry-api==1.41.1 # via # ddtrace # google-cloud-pubsub + # opentelemetry-exporter-otlp-proto-grpc # opentelemetry-sdk # opentelemetry-semantic-conventions -opentelemetry-sdk==1.40.0 - # via google-cloud-pubsub -opentelemetry-semantic-conventions==0.61b0 +opentelemetry-exporter-otlp-proto-common==1.41.1 + # via opentelemetry-exporter-otlp-proto-grpc +opentelemetry-exporter-otlp-proto-grpc==1.41.1 + # via -r requirements.in +opentelemetry-proto==1.41.1 + # via + # opentelemetry-exporter-otlp-proto-common + # opentelemetry-exporter-otlp-proto-grpc +opentelemetry-sdk==1.41.1 + # via + # google-cloud-pubsub + # opentelemetry-exporter-otlp-proto-grpc +opentelemetry-semantic-conventions==0.62b1 # via opentelemetry-sdk orjson==3.11.7 # via -r requirements.in @@ -429,6 +442,7 @@ protobuf==6.33.5 # googleapis-common-protos # grpc-google-iam-v1 # grpcio-status + # opentelemetry-proto # proto-plus psycopg2-binary==2.9.11 # via -r requirements.in @@ -625,6 +639,7 @@ typing-extensions==4.15.0 # grpcio # huggingface-hub # opentelemetry-api + # opentelemetry-exporter-otlp-proto-grpc # opentelemetry-sdk # opentelemetry-semantic-conventions # pydantic From 
dcac955d75510df008eb750f0e37dd27fd9f67ce Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Wed, 13 May 2026 19:06:06 +0100 Subject: [PATCH 09/12] fix(model-engine): lazy pubsub clients + restore newline in helm helper Two regressions from prior commits surfaced when CI actually ran the unit tests: 1) test_gcp_provider_selects_gcp_implementations failed with google.auth.exceptions.DefaultCredentialsError. Eagerly constructing pubsub_v1.PublisherClient() / SubscriberClient() in __init__ triggers Google ADC auth at delegate-construction time; CI has no credentials. Make both clients lazy: store None on __init__, materialize on first property access, cache thereafter. Still avoids per-call construction (the Greptile P2 we addressed previously), but doesn't trip auth when the delegate is merely constructed by a factory under test. 2) test_k8s_endpoint_resource_delegate's helm-template subprocess tests failed with "block sequence entries are not allowed in this context". The $gcp_cloud_provider local-variable line in _helpers.tpl used {{- ... -}} which stripped the trailing newline, smashing the preceding LAUNCH_SERVICE_TEMPLATE_FOLDER value: into the next MODEL_CACHE_ENABLED entry. Change to {{- ... }} so the newline survives. 
--- charts/model-engine/templates/_helpers.tpl | 2 +- ...pubsub_queue_endpoint_resource_delegate.py | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/charts/model-engine/templates/_helpers.tpl b/charts/model-engine/templates/_helpers.tpl index 8157eb15..c01fb683 100644 --- a/charts/model-engine/templates/_helpers.tpl +++ b/charts/model-engine/templates/_helpers.tpl @@ -354,7 +354,7 @@ env: - name: LAUNCH_SERVICE_TEMPLATE_FOLDER value: "/workspace/model-engine/model_engine_server/infra/gateways/resources/templates" {{- $model_cache := default dict .Values.modelCache }} - {{- $gcp_cloud_provider := and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") -}} + {{- $gcp_cloud_provider := and .Values.config .Values.config.values .Values.config.values.infra (eq (.Values.config.values.infra.cloud_provider | default "") "gcp") }} - name: MODEL_CACHE_ENABLED value: {{ get $model_cache "enabled" | default false | quote }} - name: MODEL_CACHE_MOUNT_PATH diff --git a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py index 2f685254..89b280ef 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/gcp_pubsub_queue_endpoint_resource_delegate.py @@ -39,8 +39,23 @@ def __init__( self.project_id = project_id self.topic_prefix = topic_prefix self.subscription_prefix = subscription_prefix - self._publisher = pubsub_v1.PublisherClient() - self._subscriber = pubsub_v1.SubscriberClient() + # Lazily-initialized gRPC clients. Construction calls Google ADC which is + # unavailable in unit-test environments, so defer until first real use. 
+ # The clients are then cached for the lifetime of the delegate. + self._publisher_client: Optional[pubsub_v1.PublisherClient] = None + self._subscriber_client: Optional[pubsub_v1.SubscriberClient] = None + + @property + def _publisher(self) -> pubsub_v1.PublisherClient: + if self._publisher_client is None: + self._publisher_client = pubsub_v1.PublisherClient() + return self._publisher_client + + @property + def _subscriber(self) -> pubsub_v1.SubscriberClient: + if self._subscriber_client is None: + self._subscriber_client = pubsub_v1.SubscriberClient() + return self._subscriber_client def _topic_id(self, endpoint_id: str) -> str: return f"{self.topic_prefix}{endpoint_id}" From 59354e37ba23fee7ed5a01511bdd5983ab2a7c7e Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Wed, 13 May 2026 19:18:24 +0100 Subject: [PATCH 10/12] fix(clients/python): silence types-setuptools 82.0 package_data stub regression This is the third consecutive CI run failing on the same line in a file this PR does not own. The cause is environmental: * CircleCI runs `mypy --install-types --non-interactive` which fetches the latest type stubs on every job. * types-setuptools 82.0.0.20260508 tightened `package_data` to expect `_DictLike[str, list[str]]`. The literal `dict[str, list[str]]` we have here is runtime-compatible but the new stub disagrees. Suppress at the call site with `# type: ignore[arg-type]`. The runtime behavior is unchanged. Annotating here is cheaper than pinning the stub (which would mask future legitimate tightenings). This is in clients/python/setup.py, which is not part of this PR's feature work; treating it as a courtesy fix to unblock CI, since the same regression hits every PR opened today.
--- clients/python/setup.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/python/setup.py b/clients/python/setup.py index 52d9e447..8fbee7c7 100644 --- a/clients/python/setup.py +++ b/clients/python/setup.py @@ -5,5 +5,8 @@ python_requires=">=3.8", version="0.0.0.beta45", packages=find_packages(), - package_data={"llmengine": ["py.typed"]}, + # types-setuptools 82.0.0+ tightened package_data to _DictLike; the literal dict + # still works at runtime, only the new stub disagrees. Suppress at the call site + # rather than down-pinning the stub (which would mask real future tightenings). + package_data={"llmengine": ["py.typed"]}, # type: ignore[arg-type] ) From d843f69d54f55d14448e9b4e5be385e9c2edf153 Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Wed, 13 May 2026 21:26:19 +0100 Subject: [PATCH 11/12] test(api): update test_gcp_provider_selects_gcp_implementations for the new delegate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test was still asserting the pre-PR contract — that cloud_provider=gcp selects RedisQueueEndpointResourceDelegate. This PR's whole point is to replace that Redis fallback with the new GcpPubSubQueueEndpointResourceDelegate, so the assertion has to match the new behavior. This was the last failing test (1 of 751 in run_unit_tests_server). All other unit tests, integration_tests, run_unit_tests_python_client, build_docs, build_image, and Socket Security checks are green. 
--- model-engine/tests/unit/api/test_dependencies.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/model-engine/tests/unit/api/test_dependencies.py b/model-engine/tests/unit/api/test_dependencies.py index a2712827..62c535ce 100644 --- a/model-engine/tests/unit/api/test_dependencies.py +++ b/model-engine/tests/unit/api/test_dependencies.py @@ -8,8 +8,8 @@ GCSFilesystemGateway, GCSLLMArtifactGateway, ) -from model_engine_server.infra.gateways.resources.redis_queue_endpoint_resource_delegate import ( - RedisQueueEndpointResourceDelegate, +from model_engine_server.infra.gateways.resources.gcp_pubsub_queue_endpoint_resource_delegate import ( + GcpPubSubQueueEndpointResourceDelegate, ) from model_engine_server.infra.repositories import ( GARDockerRepository, @@ -119,7 +119,7 @@ def test_gcp_provider_selects_gcp_implementations(): ) assert isinstance( external_interfaces.resource_gateway.queue_delegate, - RedisQueueEndpointResourceDelegate, + GcpPubSubQueueEndpointResourceDelegate, ) From 3711540f8197d975e88e5ce4713788dc28be25bc Mon Sep 17 00:00:00 2001 From: Po Stevanus Andrianta Date: Wed, 13 May 2026 22:39:05 +0100 Subject: [PATCH 12/12] ci: retrigger integration tests (sync endpoint readiness flake)