diff --git a/charts/model-engine/templates/service_template_config_map.yaml b/charts/model-engine/templates/service_template_config_map.yaml index ed2d61655..0edb0b880 100644 --- a/charts/model-engine/templates/service_template_config_map.yaml +++ b/charts/model-engine/templates/service_template_config_map.yaml @@ -408,6 +408,149 @@ data: {{- end }} {{- end }} {{- end }} + {{- range $device := tuple "cpu" "gpu" }} + deployment-runnable-image-temporal-{{ $device }}.yaml: |- + apiVersion: apps/v1 + kind: Deployment + metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + {{- $service_template_labels | nindent 8 }} + annotations: + temporal.scaleml.io/taskQueue: "${TEMPORAL_TASK_QUEUE}" + temporal.scaleml.io/minWorkers: "${MIN_WORKERS}" + temporal.scaleml.io/maxWorkers: "${MAX_WORKERS}" + temporal.scaleml.io/perWorker: "${PER_WORKER}" + spec: + strategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 1 + maxUnavailable: 0 + replicas: ${MAX_WORKERS} + selector: + matchLabels: + app: ${RESOURCE_NAME} + version: v1 + template: + metadata: + labels: + app: ${RESOURCE_NAME} + {{- $service_template_labels | nindent 12 }} + sidecar.istio.io/inject: "false" + version: v1 + annotations: + ad.datadoghq.com/main.logs: '[{"service": "${ENDPOINT_NAME}", "source": "python"}]' + kubernetes.io/change-cause: "${CHANGE_CAUSE_MESSAGE}" + spec: + affinity: + {{- include "modelEngine.serviceTemplateAffinity" . | nindent 12 }} + terminationGracePeriodSeconds: 1800 + {{- if $service_template_service_account_name }} + serviceAccount: {{ $service_template_service_account_name }} + {{- else }} + serviceAccount: {{ $launch_name }} + {{- end }} + {{- with $node_selector }} + nodeSelector: + {{- toYaml . | nindent 12 }} + {{- end }} + {{- if eq $device "gpu" }} + {{- if empty $node_selector }} + nodeSelector: + {{- end }} + k8s.amazonaws.com/accelerator: ${GPU_TYPE} + tolerations: + - key: "nvidia.com/gpu" + operator: "Exists" + effect: "NoSchedule" + {{- range $gpu_tolerations }} + - key: {{ .key | quote }} + operator: "Exists" + effect: "NoSchedule" + {{- end }} + {{- end }} + priorityClassName: ${PRIORITY} + containers: + - name: main + {{- with $security_context }} + securityContext: + {{- toYaml . 
| nindent 16 }} + {{- end }} + image: ${IMAGE} + imagePullPolicy: IfNotPresent + command: ${COMMAND} + env: ${MAIN_ENV} + resources: + requests: + {{- if eq $device "gpu" }} + nvidia.com/gpu: ${GPUS} + {{- end }} + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + limits: + {{- if eq $device "gpu" }} + nvidia.com/gpu: ${GPUS} + {{- end }} + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + volumeMounts: + {{- if $require_aws_config }} + - name: aws-config-volume + mountPath: /opt/.aws/config + subPath: config + {{- end }} + {{- if eq $device "gpu" }} + - mountPath: /dev/shm + name: dshm + {{- end }} + - name: user-config + mountPath: /app/user_config + subPath: raw_data + - name: endpoint-config + mountPath: /app/endpoint_config + subPath: raw_data + {{- if $config_values }} + - name: infra-service-config-volume + mountPath: ${INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH} + {{- end }} + {{- if $security_context }} + securityContext: + fsGroup: 65534 + {{- end }} + volumes: + {{- if $require_aws_config }} + - name: aws-config-volume + configMap: + {{- if $service_template_aws_config_map_name }} + name: {{ $service_template_aws_config_map_name }} + {{- else }} + name: {{ $aws_config_map_name }} + {{- end }} + {{- end }} + - name: user-config + configMap: + name: ${RESOURCE_NAME} + - name: endpoint-config + configMap: + name: ${RESOURCE_NAME}-endpoint-config + {{- if eq $device "gpu" }} + - name: dshm + emptyDir: + medium: Memory + {{- end }} + {{- if $config_values }} + - name: infra-service-config-volume + configMap: + name: {{ $launch_name }}-service-config + items: + - key: infra_service_config + path: config.yaml + {{- end }} + {{- end }} user-config.yaml: |- apiVersion: v1 kind: ConfigMap diff --git a/charts/model-engine/values_sample.yaml b/charts/model-engine/values_sample.yaml index 301fca890..eee15c54b 100644 --- a/charts/model-engine/values_sample.yaml +++ b/charts/model-engine/values_sample.yaml @@ -24,7 +24,7 @@ celery_broker_type_redis: null # - ALL # tag [required] is the LLM Engine docker image tag -tag: e360bfb1d21d9d4e7b7fcb6b29ca752095b4d0f4 +tag: 2e9d00786419ef44ec5c9d3305d8d6451d6aabfb # context is a user-specified deployment tag. Can be used to context: production image: diff --git a/docs/patch_temporal_configmap.py b/docs/patch_temporal_configmap.py new file mode 100755 index 000000000..2f3ec2bad --- /dev/null +++ b/docs/patch_temporal_configmap.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python3 +""" +Patch the model-engine-service-template-config ConfigMap on ml-serving-new to add +temporal CPU and GPU deployment templates. + +Usage: + python3 docs/patch_temporal_configmap.py + +Requires: kubectl context pointing at ml-serving-new. 
+""" + +import json +import subprocess + +TEMPORAL_CPU_TEMPLATE = """\ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: prod + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: prod + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + annotations: + temporal.scaleml.io/taskQueue: "${TEMPORAL_TASK_QUEUE}" + temporal.scaleml.io/minWorkers: "${MIN_WORKERS}" + temporal.scaleml.io/maxWorkers: "${MAX_WORKERS}" + temporal.scaleml.io/perWorker: "${PER_WORKER}" +spec: + strategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 1 + maxUnavailable: 0 + replicas: ${MAX_WORKERS} + selector: + matchLabels: + app: ${RESOURCE_NAME} + version: v1 + template: + metadata: + labels: + app: ${RESOURCE_NAME} + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: prod + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: prod + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + sidecar.istio.io/inject: "false" + version: v1 + annotations: + ad.datadoghq.com/main.logs: '[{"service": "${ENDPOINT_NAME}", "source": "python"}]' + kubernetes.io/change-cause: "${CHANGE_CAUSE_MESSAGE}" + spec: + affinity: + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: app + operator: In + values: + - ${RESOURCE_NAME} + topologyKey: kubernetes.io/hostname + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: ${IMAGE_HASH} + operator: In + values: + - "True" + topologyKey: kubernetes.io/hostname + terminationGracePeriodSeconds: 1800 + serviceAccount: ml-worker + nodeSelector: + node-lifecycle: normal + priorityClassName: ${PRIORITY} + containers: + - name: main + image: ${IMAGE} + imagePullPolicy: IfNotPresent + command: ${COMMAND} + env: ${MAIN_ENV} + resources: + requests: + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + limits: + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + volumeMounts: + - name: user-config + mountPath: /app/user_config + subPath: raw_data + - name: endpoint-config + mountPath: /app/endpoint_config + subPath: raw_data + - name: infra-service-config-volume + mountPath: ${INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH} + securityContext: + fsGroup: 65534 + volumes: + - name: user-config + configMap: + name: ${RESOURCE_NAME} + - name: endpoint-config + configMap: + name: ${RESOURCE_NAME}-endpoint-config + - name: infra-service-config-volume + configMap: + name: model-engine-service-config + items: + - key: infra_service_config + path: config.yaml""" + +TEMPORAL_GPU_TEMPLATE = """\ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: prod + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: prod + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + 
annotations: + temporal.scaleml.io/taskQueue: "${TEMPORAL_TASK_QUEUE}" + temporal.scaleml.io/minWorkers: "${MIN_WORKERS}" + temporal.scaleml.io/maxWorkers: "${MAX_WORKERS}" + temporal.scaleml.io/perWorker: "${PER_WORKER}" +spec: + strategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 1 + maxUnavailable: 0 + replicas: ${MAX_WORKERS} + selector: + matchLabels: + app: ${RESOURCE_NAME} + version: v1 + template: + metadata: + labels: + app: ${RESOURCE_NAME} + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: prod + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: prod + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + sidecar.istio.io/inject: "false" + version: v1 + annotations: + ad.datadoghq.com/main.logs: '[{"service": "${ENDPOINT_NAME}", "source": "python"}]' + kubernetes.io/change-cause: "${CHANGE_CAUSE_MESSAGE}" + spec: + affinity: + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: app + operator: In + values: + - ${RESOURCE_NAME} + topologyKey: kubernetes.io/hostname + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: ${IMAGE_HASH} + operator: In + values: + - "True" + topologyKey: kubernetes.io/hostname + terminationGracePeriodSeconds: 1800 + serviceAccount: ml-worker + nodeSelector: + node-lifecycle: normal + k8s.amazonaws.com/accelerator: ${GPU_TYPE} + tolerations: + - key: "nvidia.com/gpu" + operator: "Exists" + effect: "NoSchedule" + priorityClassName: ${PRIORITY} + containers: + - name: main + image: ${IMAGE} + imagePullPolicy: IfNotPresent + command: ${COMMAND} + env: ${MAIN_ENV} + resources: + requests: + nvidia.com/gpu: ${GPUS} + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + limits: + nvidia.com/gpu: ${GPUS} + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + volumeMounts: + - mountPath: /dev/shm + name: dshm + - name: user-config + mountPath: /app/user_config + subPath: raw_data + - name: endpoint-config + mountPath: /app/endpoint_config + subPath: raw_data + - name: infra-service-config-volume + mountPath: ${INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH} + securityContext: + fsGroup: 65534 + volumes: + - name: dshm + emptyDir: + medium: Memory + - name: user-config + configMap: + name: ${RESOURCE_NAME} + - name: endpoint-config + configMap: + name: ${RESOURCE_NAME}-endpoint-config + - name: infra-service-config-volume + configMap: + name: model-engine-service-config + items: + - key: infra_service_config + path: config.yaml""" + + +def main(): + result = subprocess.run( + [ + "kubectl", + "get", + "configmap", + "model-engine-service-template-config", + "-n", + "default", + "-o", + "json", + ], + capture_output=True, + text=True, + check=True, + ) + cm = json.loads(result.stdout) + cm["data"]["deployment-runnable-image-temporal-cpu.yaml"] = TEMPORAL_CPU_TEMPLATE + cm["data"]["deployment-runnable-image-temporal-gpu.yaml"] = TEMPORAL_GPU_TEMPLATE + + result = subprocess.run( + ["kubectl", "apply", "-f", "-", "-n", "default"], + input=json.dumps(cm), + capture_output=True, + text=True, + ) + print(result.stdout or result.stderr) + + keys = [k for k in cm["data"] if "temporal" in k] + print(f"Temporal template keys added: {keys}") + + +if __name__ == "__main__": + main() diff --git a/docs/temporal-endpoint-e2e-test.md 
b/docs/temporal-endpoint-e2e-test.md new file mode 100644 index 000000000..8d332ab0d --- /dev/null +++ b/docs/temporal-endpoint-e2e-test.md @@ -0,0 +1,342 @@ +# Temporal Endpoint Type — E2E Test + +**Date:** 2026-04-25 +**Cluster:** `ml-serving-new` (us-west-2, account 759270588401) +**Branch:** `lilyz-ai/temporal-endpoint-type` +**Commit:** `0efddf2f` + +--- + +## Prerequisites + +- `kubectl` context pointing at `ml-serving-new` +- AWS SSO refreshed: `aws sso login --profile ml-serving-admin` +- Docker logged in to internal ECR (account 692474966980) + +--- + +## Step 1 — Build and push a patched image + +The production image (`model-engine-internal:`) is built from an internal private repo, not the public `scaleapi/llm-engine` Dockerfile. To test code changes, layer the updated Python files on top of the most recent production image: + +```bash +# Find the most recent production image +AWS_PROFILE=ml-admin aws ecr describe-images \ + --repository-name model-engine-internal \ + --region us-west-2 \ + --query 'sort_by(imageDetails, &imagePushedAt)[-5:].{tag:imageTags[0],pushed:imagePushedAt}' \ + --output table + +BASE_SHA= # e.g. 60e0027d0ef39afafa354574a756864de0db7a04 +MY_SHA=$(git rev-parse HEAD) + +# Copy updated source into build context +mkdir -p /tmp/buildctx +cp -r model-engine/model_engine_server /tmp/buildctx/model_engine_server + +cat > /tmp/buildctx/Dockerfile << EOF +FROM 692474966980.dkr.ecr.us-west-2.amazonaws.com/model-engine-internal:${BASE_SHA} +COPY model_engine_server/ /workspace/model-engine/model_engine_server/ +EOF + +# Login, build, push +AWS_PROFILE=ml-admin aws ecr get-login-password --region us-west-2 \ + | docker login --username AWS --password-stdin 692474966980.dkr.ecr.us-west-2.amazonaws.com + +docker build /tmp/buildctx \ + -t 692474966980.dkr.ecr.us-west-2.amazonaws.com/model-engine-internal:${MY_SHA}-patch + +docker push 692474966980.dkr.ecr.us-west-2.amazonaws.com/model-engine-internal:${MY_SHA}-patch +``` + +> **Note:** If `live_tokenizer_repository.py` imports `from huggingface_hub.errors import RepositoryNotFoundError` and the base image has an older `huggingface_hub`, patch the import before building: +> ```bash +> sed -i 's|from huggingface_hub.errors import|try:\n from huggingface_hub.errors import|' \ +> /tmp/buildctx/model_engine_server/infra/repositories/live_tokenizer_repository.py +> # add fallback line manually or use the try/except pattern +> ``` + +--- + +## Step 2 — Deploy the patched image + +```bash +MY_SHA=$(git rev-parse HEAD) +PATCH_TAG="${MY_SHA}-patch" +ECR="692474966980.dkr.ecr.us-west-2.amazonaws.com/model-engine-internal" + +kubectl set image deployment/model-engine-endpoint-builder \ + model-engine-endpoint-builder=${ECR}:${PATCH_TAG} -n default + +kubectl set image deployment/model-engine \ + model-engine=${ECR}:${PATCH_TAG} -n default + +kubectl rollout status deployment/model-engine-endpoint-builder -n default --timeout=300s +kubectl rollout status deployment/model-engine -n default --timeout=300s +``` + +--- + +## Step 3 — Patch the service-template ConfigMap + +The production ConfigMap does not yet include the temporal templates (until the Helm chart is upgraded). 
Patch it manually:
+
+```bash
+kubectl get configmap model-engine-service-template-config -n default -o json \
+  > /tmp/service_template_cm.json
+
+python3 - /tmp/service_template_cm.json << 'EOF'
+import json, sys
+
+cm = json.load(open(sys.argv[1]))
+
+# Paste the temporal-cpu template from
+# model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml
+# (keys: deployment-runnable-image-temporal-cpu.yaml, deployment-runnable-image-temporal-gpu.yaml)
+# but update env/prod labels to match the production configmap format.
+#
+# See patch_temporal_configmap.py in this directory for a ready-made script.
+
+print(json.dumps(cm))
+EOF
+```
+
+A ready-made patch script is at [`docs/patch_temporal_configmap.py`](patch_temporal_configmap.py). Run it with:
+
+```bash
+python3 docs/patch_temporal_configmap.py
+```
+
+Verify the two new keys are present:
+
+```bash
+kubectl get configmap model-engine-service-template-config -n default \
+  -o jsonpath='{.data}' \
+  | python3 -c "import sys,json; d=json.load(sys.stdin); print([k for k in d if 'temporal' in k])"
+# Expected: ['deployment-runnable-image-temporal-cpu.yaml', 'deployment-runnable-image-temporal-gpu.yaml']
+```
+
+---
+
+## Step 4 — Run the DB migration
+
+```bash
+# The migration runs inside a model-engine pod (which reads DB connection from infra_config_prod.yaml)
+GATEWAY_POD=$(kubectl get pods -n default -l app=model-engine --field-selector=status.phase=Running \
+  -o jsonpath='{.items[0].metadata.name}')
+
+kubectl exec -n default "$GATEWAY_POD" -- bash -c \
+  "cd /workspace/model-engine/model_engine_server/db/migrations && \
+   python3 -m alembic -c alembic.ini upgrade head"
+```
+
+Expected output:
+```
+INFO [alembic.runtime.migration] Running upgrade a1b2c3d4e5f6 -> b2c3d4e5f6g7, add temporal_task_queue column
+```
+
+> The migration adds a nullable `VARCHAR` column. It is backwards-compatible — existing pods reading the DB before migration simply do not select that column.
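+
+If you need to roll back during testing, the revision includes a `downgrade()` that drops the column, so the standard Alembic downgrade works from the same pod:
+
+```bash
+kubectl exec -n default "$GATEWAY_POD" -- bash -c \
+  "cd /workspace/model-engine/model_engine_server/db/migrations && \
+   python3 -m alembic -c alembic.ini downgrade -1"
+```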
+
+---
+
+## Step 5 — Create a test temporal endpoint
+
+```bash
+# Get the test API key
+TEST_KEY=$(AWS_PROFILE=ml-serving-admin aws secretsmanager get-secret-value \
+  --secret-id launch_test_api_key --region us-west-2 \
+  --query SecretString --output text \
+  | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['api_key'])")
+TEST_USER=$(AWS_PROFILE=ml-serving-admin aws secretsmanager get-secret-value \
+  --secret-id launch_test_api_key --region us-west-2 \
+  --query SecretString --output text \
+  | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['user_id'])")
+
+GATEWAY_POD=$(kubectl get pods -n default -l app=model-engine --field-selector=status.phase=Running \
+  -o jsonpath='{.items[0].metadata.name}')
+kubectl port-forward "pod/$GATEWAY_POD" 8081:5000 -n default &
+
+# Create a model bundle
+BUNDLE_ID=$(curl -s -X POST http://localhost:8081/v2/model-bundles \
+  -u "${TEST_USER}:${TEST_KEY}" \
+  -H "Content-Type: application/json" \
+  -d '{
+    "name": "test-temporal-bundle",
+    "schema_location": "s3://scale-ml/test/temporal/schema.json",
+    "flavor": {
+      "flavor": "runnable_image",
+      "repository": "public.ecr.aws/ubuntu/ubuntu",
+      "tag": "22.04",
+      "command": ["sleep", "3600"],
+      "protocol": "http",
+      "readiness_initial_delay_seconds": 30
+    },
+    "metadata": {}
+  }' | python3 -c "import sys,json; print(json.load(sys.stdin)['model_bundle_id'])")
+
+echo "Bundle: $BUNDLE_ID"
+
+# Create a temporal endpoint
+curl -s -X POST http://localhost:8081/v1/model-endpoints \
+  -u "${TEST_USER}:${TEST_KEY}" \
+  -H "Content-Type: application/json" \
+  -d "{
+    \"name\": \"test-temporal-endpoint\",
+    \"endpoint_type\": \"temporal\",
+    \"temporal_task_queue\": \"my-test-task-queue\",
+    \"model_bundle_id\": \"${BUNDLE_ID}\",
+    \"metadata\": {},
+    \"cpus\": 0.5,
+    \"gpus\": 0,
+    \"memory\": \"1Gi\",
+    \"storage\": \"1Gi\",
+    \"min_workers\": 0,
+    \"max_workers\": 2,
+    \"per_worker\": 1,
+    \"labels\": {\"team\": \"infra\", \"product\": \"launch\"}
+  }" | python3 -c "import sys,json; print(json.dumps(json.load(sys.stdin), indent=2))"
+```
+
+---
+
+## Step 6 — Verify the K8s Deployment
+
+```bash
+# Find the endpoint ID
+ENDPOINT_ID=$(curl -s http://localhost:8081/v1/model-endpoints \
+  -u "${TEST_USER}:${TEST_KEY}" \
+  | python3 -c "
+import sys,json
+eps=[e for e in json.load(sys.stdin)['model_endpoints'] if 'temporal' in e['name']]
+print(eps[0]['id'])")
+
+# Find the deployment name from the endpoint
+# (K8s resource names replace the ID's underscores with dashes, e.g.
+#  end_d7m0... -> launch-endpoint-id-end-d7m0...)
+DEPLOY=$(kubectl get deployments -n scale-deploy \
+  | grep "${ENDPOINT_ID//_/-}" | awk '{print $1}')
+
+kubectl get deployment "$DEPLOY" -n scale-deploy -o json | python3 -c "
+import sys,json
+d=json.load(sys.stdin)
+meta=d['metadata']
+spec=d['spec']
+containers=spec['template']['spec']['containers']
+print('=== ANNOTATIONS ===')
+for k,v in meta.get('annotations',{}).items():
+    if 'temporal' in k:
+        print(f'  {k}: {v}')
+print('=== REPLICAS:', spec['replicas'], '===')
+print('=== CONTAINERS:', len(containers), '(expected: 1) ===')
+for c in containers:
+    print('  name:', c['name'])
+print('=== TEMPORAL ENV VARS ===')
+for c in containers:
+    for e in c.get('env', []):
+        if 'TEMPORAL' in e.get('name',''):
+            print(f\"  {e['name']}: {e.get('value', '')}\")
+"
+```
+
+---
+
+## Step 7 — Cleanup
+
+```bash
+# Delete the test endpoint
+curl -s -X DELETE "http://localhost:8081/v1/model-endpoints/${ENDPOINT_ID}" \
+  -u "${TEST_USER}:${TEST_KEY}"
+
+# Revert model-engine and builder to stable production image
+PROD_SHA=f395ffa6bdcf9c954f1073469a2a25c7b3351af8
+ECR="692474966980.dkr.ecr.us-west-2.amazonaws.com/model-engine-internal" +kubectl set image deployment/model-engine model-engine=${ECR}:${PROD_SHA} -n default +kubectl set image deployment/model-engine-endpoint-builder \ + model-engine-endpoint-builder=${ECR}:${PROD_SHA} -n default + +kill %1 # kill port-forward +``` + +--- + +## Test Results (2026-04-25) + +### Environment + +| Item | Value | +|------|-------| +| Cluster | `ml-serving-new` (arn:aws:eks:us-west-2:759270588401:cluster/ml-serving-new) | +| Base production image | `model-engine-internal:60e0027d0ef39afafa354574a756864de0db7a04` (2026-03-30) | +| Patched image | `model-engine-internal:04729cef1678fe09ffda9855624579152a8fae3e-patch4` | +| DB | `ml-infra-prod.cluster-cuby7rtblks1.us-west-2.rds.amazonaws.com` | + +### API — Endpoint Creation + +``` +POST /v1/model-endpoints + endpoint_type: "temporal" + temporal_task_queue: "my-test-task-queue" + max_workers: 2 + gpus: 0 + +→ HTTP 200 {"endpoint_creation_task_id": "68cd17fd-70c0-46a6-a31e-4ee5fe2a676f"} +``` + +### API — Endpoint Status (after ~3 seconds) + +```json +{ + "id": "end_d7m0o91ll55003e7d0n0", + "name": "test-temporal-endpoint", + "endpoint_type": "temporal", + "status": "READY", + "deployment_state": { + "min_workers": 0, + "max_workers": 2, + "per_worker": 1, + "concurrent_requests_per_worker": 1 + } +} +``` + +### K8s Deployment + +``` +NAME: launch-endpoint-id-end-d7m0o91ll55003e7d0n0 +NAMESPACE: scale-deploy +REPLICAS: 2 (= max_workers ✓) +CONTAINERS: 1 (no forwarder sidecar ✓) +``` + +**Annotations:** + +| Annotation | Value | +|------------|-------| +| `temporal.scaleml.io/taskQueue` | `my-test-task-queue` ✓ | +| `temporal.scaleml.io/minWorkers` | `0` ✓ | +| `temporal.scaleml.io/maxWorkers` | `2` ✓ | +| `temporal.scaleml.io/perWorker` | `1` ✓ | + +**Env vars injected into `main` container:** + +| Env var | Value | +|---------|-------| +| `TEMPORAL_TASK_QUEUE` | `my-test-task-queue` ✓ | +| `TEMPORAL_SERVER_HOSTNAME` | *(from infra config)* ✓ | +| `TEMPORAL_SERVER_PORT` | `7233` ✓ | + +### DB Migration + +``` +INFO [alembic.runtime.migration] Running upgrade a1b2c3d4e5f6 -> b2c3d4e5f6g7, + add temporal_task_queue column +``` + +Migration applied cleanly. No downtime for existing endpoints. + +### Issues Found and Fixed During Test + +| Issue | Fix | +|-------|-----| +| Helm template error: `$security_context \| nindent` on map type | Use `with + toYaml` pattern (same as other templates) — committed in `0efddf2f` | +| `huggingface_hub.errors` not found on older base images | Add try/except fallback import in build context (not committed — affects e2e test setup only, not shipped code) | + +### Cleanup + +Test endpoint deleted. Builder and gateway reverted to production image `f395ffa6bdcf9c954f1073469a2a25c7b3351af8`. diff --git a/docs/temporal-endpoint-type-proposal.md b/docs/temporal-endpoint-type-proposal.md new file mode 100644 index 000000000..3dfdead03 --- /dev/null +++ b/docs/temporal-endpoint-type-proposal.md @@ -0,0 +1,224 @@ +# Proposal: `temporal` Endpoint Type in Launch + +**Author:** lily.zhu@scale.com +**Status:** Implemented (MVP) +**Ticket:** MLI-6425 +**Branch:** `lilyz-ai/temporal-endpoint-type` + +--- + +## Problem + +Teams running multi-step GPU pipelines (e.g. robotics ego hand keypoints: SVO processing → Dyn-HaMR → hand annotations) need durable, retryable orchestration across heterogeneous GPU types. Today they have two options: + +1. **Launch async endpoints (Celery)** — Launch manages the pods, but Celery gives no cross-step durability. 
If the H100 pod running Dyn-HaMR crashes mid-run, the whole pipeline restarts from step 1. +2. **Raw K8s Deployments via std-ml-srv** — bypasses Launch entirely; teams lose unified deployment API, GPU scheduling integration, and Launch dashboards. + +Temporal solves the durability problem: if a pod crashes mid-activity, Temporal retries *only that activity* from the last heartbeat. Phases already completed are not re-run. + +The gap is that Launch has no native way to create and manage Temporal activity worker pods. Teams either re-implement the same ~80-line Temporal worker boilerplate per service, or deploy outside Launch. + +--- + +## Proposed Solution + +Add `temporal` as a fourth endpoint type in `ModelEndpointType`. A `temporal` endpoint is a K8s Deployment whose pods connect to Temporal server and pull activity tasks from a named task queue — instead of polling a Celery queue. + +**What Launch manages:** +- Pod lifecycle (create / update / delete) +- GPU scheduling and node selection +- Scaling (fixed replicas in MVP; Temporal-aware autoscaling as follow-up) +- Unified visibility in Launch dashboards + +**What Launch does not touch:** +- Task submission (the Temporal workflow dispatches activities directly) +- Result routing (Temporal handles activity results) +- `/v1/async-tasks` API (not applicable for `temporal` endpoints) + +--- + +## API Changes + +### `POST /v1/model-endpoints` + +New field on `CreateModelEndpointV1Request`: + +```python +temporal_task_queue: Optional[str] +# Required when endpoint_type="temporal". +# The Temporal task queue that workers will poll. +# Example: "robotics-hand-keypoints-temporal" +``` + +Example request: + +```json +{ + "name": "hand-keypoints-temporal", + "endpoint_type": "temporal", + "temporal_task_queue": "robotics-hand-keypoints-temporal", + "gpus": 1, + "gpu_type": "nvidia-hopper-h100", + "cpus": 20, + "memory": "200Gi", + "storage": "250Gi", + "min_workers": 0, + "max_workers": 10, + "per_worker": 1, + "model_bundle_id": "...", + "labels": {"team": "robotics", "product": "ego"} +} +``` + +The `model_bundle_id` points to a bundle whose command runs the Temporal activity worker (e.g. `python -m ml_serve.exe.run_service --task-queue robotics-hand-keypoints-temporal`). 
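+
+For illustration, a minimal worker entrypoint consuming the env vars Launch injects (`TEMPORAL_TASK_QUEUE`, `TEMPORAL_SERVER_HOSTNAME`, `TEMPORAL_SERVER_PORT`) could look like the sketch below; the `temporalio` SDK calls are standard, but the activity name and body are placeholders, not part of this change:
+
+```python
+import asyncio
+import os
+
+from temporalio import activity
+from temporalio.client import Client
+from temporalio.worker import Worker
+
+
+@activity.defn(name="exampleActivity")  # placeholder activity
+async def example_activity(inp: dict) -> dict:
+    return {"ok": True}
+
+
+async def main() -> None:
+    # Launch injects these into MAIN_ENV for temporal endpoints
+    host = os.environ["TEMPORAL_SERVER_HOSTNAME"]
+    port = os.environ.get("TEMPORAL_SERVER_PORT", "7233")
+    client = await Client.connect(f"{host}:{port}")
+    worker = Worker(
+        client,
+        task_queue=os.environ["TEMPORAL_TASK_QUEUE"],
+        activities=[example_activity],
+    )
+    await worker.run()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```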
+ +--- + +## Implementation — What Was Built + +### Files Changed + +| File | Change | +|------|--------| +| `domain/entities/model_endpoint_entity.py` | Add `TEMPORAL = "temporal"` to `ModelEndpointType` enum; add `temporal_task_queue` field to `ModelEndpointRecord` | +| `common/dtos/model_endpoints.py` | Add `temporal_task_queue` to Create/Update request DTOs with validator requiring it for temporal endpoints | +| `domain/use_cases/model_endpoint_use_cases.py` | Allow `min_workers=0` for TEMPORAL (same as ASYNC) | +| `domain/services/model_endpoint_service.py` | Thread `temporal_task_queue` through abstract create/update signatures | +| `db/models/hosted_model_inference.py` | Add `temporal_task_queue` nullable column to ORM model | +| `db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py` | Alembic migration: `ALTER TABLE hosted_model_inference.endpoints ADD COLUMN temporal_task_queue VARCHAR` | +| `infra/repositories/model_endpoint_record_repository.py` | Add `temporal_task_queue` to abstract create/update signatures | +| `infra/repositories/db_model_endpoint_record_repository.py` | Persist and read `temporal_task_queue` from DB | +| `infra/services/live_model_endpoint_service.py` | Thread `temporal_task_queue` through create/update | +| `infra/gateways/resources/k8s_resource_types.py` | Add `DeploymentRunnableImageTemporalCpuArguments` and `...GpuArguments` TypedDicts; add temporal branch in `get_endpoint_resource_arguments_from_request` | +| `infra/gateways/resources/k8s_endpoint_resource_delegate.py` | Route temporal to new templates; add `_get_temporal_autoscaling_params` (reads annotations); skip SQS in delete | +| `infra/gateways/resources/live_endpoint_resource_gateway.py` | Skip SQS queue creation/deletion for TEMPORAL | +| `infra/gateways/resources/templates/service_template_config_map_circleci.yaml` | Add `deployment-runnable-image-temporal-{cpu,gpu}.yaml` templates | +| `charts/model-engine/templates/service_template_config_map.yaml` | Add same templates via Helm `range` loop for prod | + + +--- + +## Template Structure + +### CPU template (`deployment-runnable-image-temporal-cpu.yaml`) + +```yaml +metadata: + annotations: + temporal.scaleml.io/taskQueue: "${TEMPORAL_TASK_QUEUE}" + temporal.scaleml.io/minWorkers: "${MIN_WORKERS}" + temporal.scaleml.io/maxWorkers: "${MAX_WORKERS}" + temporal.scaleml.io/perWorker: "${PER_WORKER}" +spec: + replicas: ${MAX_WORKERS} # fixed in MVP + template: + metadata: + labels: + sidecar.istio.io/inject: "false" # no HTTP traffic + spec: + containers: + - name: main + env: ${MAIN_ENV} # includes TEMPORAL_TASK_QUEUE, + # TEMPORAL_SERVER_HOSTNAME, + # TEMPORAL_SERVER_PORT + # no readiness probe + # no forwarder sidecar +``` + +### GPU template adds: +- `nodeSelector: k8s.amazonaws.com/accelerator: ${GPU_TYPE}` +- `tolerations: nvidia.com/gpu` +- `resources.requests/limits: nvidia.com/gpu: ${GPUS}` +- `/dev/shm` emptyDir volume (dshm) + +--- + +## Phase 2 — Temporal-aware autoscaling (follow-up) + +Scale worker replicas based on Temporal task queue backlog. Options: + +1. **KEDA `temporal` trigger** — KEDA has a [Temporal scaler](https://keda.sh/docs/scalers/temporal/) that polls `GetTaskQueueStats`. Lowest implementation cost; reuses existing KEDA infrastructure. +2. **Custom autoscaler** — mirrors the existing Celery autoscaler pattern but polls Temporal's gRPC API. + +Recommendation: KEDA Temporal trigger. 
Annotation format: +```yaml +temporal.keda.sh/task-queue: "${TEMPORAL_TASK_QUEUE}" +temporal.keda.sh/namespace: "default" +temporal.keda.sh/targetQueueSize: "${PER_WORKER}" +``` + +--- + +## What Changes in Caller Code (ego example) + +Before — each service writes ~80 lines of custom Temporal boilerplate: +```python +# launch_hand_keypoints/temporal_worker.py (custom, per-service) +_predict = load_predict_fn(...) + +@activity.defn(name="handKeypointsActivity") +async def hand_keypoints_activity(inp): + heartbeat_task = loop.create_task(_heartbeat_loop()) # manual + ... + +async def main(): + client = await Client.connect(...) # manual + worker = Worker(client, task_queue=..., ...) # manual + await worker.run() +``` + +After — service implements one method; Launch manages the rest: +```python +# launch_hand_keypoints/service.py +class HandKeypointsService(ModelServiceApi): + def handle(self, req: dict) -> dict: + result = self._predict(HandKeypointsRequest(**req)) + return {"hands_npz_url": ..., "track_info_url": ...} +``` + +```bash +# Deploy via Launch API (same as any other endpoint) +launch create-endpoint \ + --name hand-keypoints-temporal \ + --endpoint-type temporal \ + --temporal-task-queue robotics-hand-keypoints-temporal \ + --gpu-type h100 --gpus 1 \ + --min-workers 0 --max-workers 10 +``` + +--- + +## Open Questions — Resolved + +1. **Readiness probe** — Omitted. Temporal workers have no HTTP endpoint. The pod is considered ready as soon as the container starts. Worker images may optionally add a health check sidecar in the future. + +2. **Task submission API** — Out of scope for Phase 1. Launch does not expose a way to start a Temporal workflow. The Temporal workflow is the caller. + +3. **Namespace** — Defaults to `"default"` via `hmi_config.temporal_server_namespace` (with empty-string fallback). Not exposed in the API in Phase 1. + +4. **Multi-queue workers** — Out of scope. Each endpoint maps to exactly one task queue. + +--- + +## What This Is Not + +- **Not a task submission API.** Launch does not expose `/v1/async-tasks` for `temporal` endpoints. The Temporal workflow is the caller; Launch only manages the worker pods. +- **Not a workflow worker.** Launch manages activity workers only. The workflow definition lives in the caller's codebase and runs on a separate workflow worker (or Temporal Cloud). +- **Not a replacement for Celery async endpoints.** `async` endpoints remain the right choice for request/response workloads where the caller submits tasks via Launch's API. `temporal` is for multi-step pipelines where an external orchestrator coordinates the work. + +--- + +## Alternatives Considered + +| Option | Verdict | +|--------|---------| +| Raw K8s Deployment (std-ml-srv `deployment_template_TEMPORAL_gpu.yaml`) | Works today; loses Launch management. Good stopgap, not long-term. | +| Temporal orchestrates existing Launch async endpoints | Extra HTTP round-trip per phase; doesn't use Temporal activities properly. | +| Launch batch jobs per phase | `backoffLimit: 0`, cold start per request, no worker pool. Wrong tool. | +| Temporal Cloud | Doesn't change the worker management problem; workers still need to run somewhere. | + +--- + +## E2E Test + +See [`docs/temporal-endpoint-e2e-test.md`](temporal-endpoint-e2e-test.md) for the full test procedure and results. 
diff --git a/model-engine/model_engine_server/common/dtos/model_endpoints.py b/model-engine/model_engine_server/common/dtos/model_endpoints.py index eb1faa84f..221fb7de2 100644 --- a/model-engine/model_engine_server/common/dtos/model_endpoints.py +++ b/model-engine/model_engine_server/common/dtos/model_endpoints.py @@ -11,7 +11,7 @@ from typing import Any, Dict, List, Optional from model_engine_server.common.dtos.core import HttpUrlStr -from model_engine_server.common.pydantic_types import BaseModel, ConfigDict, Field +from model_engine_server.common.pydantic_types import BaseModel, ConfigDict, Field, model_validator from model_engine_server.domain.entities import ( CallbackAuth, CpuSpecificationType, @@ -84,6 +84,16 @@ class CreateModelEndpointV1Request(BaseModel): ge=1, description="For async endpoints, how long a task can wait in queue before expiring (in seconds). Default: 86400 (24 hours).", ) + temporal_task_queue: Optional[str] = Field( + default=None, + description="For temporal endpoints, the Temporal task queue that workers will poll.", + ) + + @model_validator(mode="after") + def validate_temporal_task_queue(self) -> "CreateModelEndpointV1Request": + if self.endpoint_type == ModelEndpointType.TEMPORAL and not self.temporal_task_queue: + raise ValueError("temporal_task_queue is required for temporal endpoints") + return self class CreateModelEndpointV1Response(BaseModel): @@ -122,6 +132,10 @@ class UpdateModelEndpointV1Request(BaseModel): ge=1, description="For async endpoints, how long a task can wait in queue before expiring (in seconds). Default: 86400 (24 hours).", ) + temporal_task_queue: Optional[str] = Field( + default=None, + description="For temporal endpoints, the Temporal task queue that workers will poll.", + ) class UpdateModelEndpointV1Response(BaseModel): diff --git a/model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py new file mode 100644 index 000000000..a411202d4 --- /dev/null +++ b/model-engine/model_engine_server/db/migrations/alembic/versions/2026_04_24_0000-b2c3d4e5f6g7_add_temporal_task_queue_column.py @@ -0,0 +1,31 @@ +"""add temporal_task_queue column + +Revision ID: b2c3d4e5f6g7 +Revises: a1b2c3d4e5f6 +Create Date: 2026-04-24 00:00:00.000000 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = 'b2c3d4e5f6g7' +down_revision = 'a1b2c3d4e5f6' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + 'endpoints', + sa.Column('temporal_task_queue', sa.String, nullable=True), + schema='hosted_model_inference', + ) + + +def downgrade() -> None: + op.drop_column( + 'endpoints', + 'temporal_task_queue', + schema='hosted_model_inference', + ) diff --git a/model-engine/model_engine_server/db/models/hosted_model_inference.py b/model-engine/model_engine_server/db/models/hosted_model_inference.py index 01f52a328..b887c9163 100644 --- a/model-engine/model_engine_server/db/models/hosted_model_inference.py +++ b/model-engine/model_engine_server/db/models/hosted_model_inference.py @@ -472,6 +472,8 @@ class Endpoint(Base): task_expires_seconds = Column(Integer, nullable=True) # Queue message visibility/lock timeout in seconds (SQS VisibilityTimeout / ASB lock_duration) queue_message_timeout_seconds = Column(Integer, nullable=True) + # Temporal task queue name for temporal endpoints + temporal_task_queue = Column(String, nullable=True) def __init__( self, @@ -488,6 +490,7 @@ def __init__( public_inference: Optional[bool] = False, task_expires_seconds: Optional[int] = None, queue_message_timeout_seconds: Optional[int] = None, + temporal_task_queue: Optional[str] = None, ): self.id = f"end_{get_xid()}" self.name = name @@ -502,6 +505,7 @@ def __init__( self.public_inference = public_inference self.task_expires_seconds = task_expires_seconds self.queue_message_timeout_seconds = queue_message_timeout_seconds + self.temporal_task_queue = temporal_task_queue @classmethod async def create(cls, session: AsyncSession, endpoint: "Endpoint") -> None: diff --git a/model-engine/model_engine_server/domain/entities/model_endpoint_entity.py b/model-engine/model_engine_server/domain/entities/model_endpoint_entity.py index a382593e1..ecc863357 100644 --- a/model-engine/model_engine_server/domain/entities/model_endpoint_entity.py +++ b/model-engine/model_engine_server/domain/entities/model_endpoint_entity.py @@ -22,6 +22,7 @@ class ModelEndpointType(str, Enum): ASYNC = "async" SYNC = "sync" STREAMING = "streaming" + TEMPORAL = "temporal" class ModelEndpointStatus(str, Enum): @@ -136,6 +137,7 @@ class ModelEndpointRecord(OwnedEntity): public_inference: Optional[bool] = None task_expires_seconds: Optional[int] = None queue_message_timeout_seconds: Optional[int] = None + temporal_task_queue: Optional[str] = None class ModelEndpointInfraState(BaseModel): diff --git a/model-engine/model_engine_server/domain/services/model_endpoint_service.py b/model-engine/model_engine_server/domain/services/model_endpoint_service.py index 9c523b689..996d12c0d 100644 --- a/model-engine/model_engine_server/domain/services/model_endpoint_service.py +++ b/model-engine/model_engine_server/domain/services/model_endpoint_service.py @@ -94,6 +94,7 @@ async def create_model_endpoint( public_inference: Optional[bool] = False, queue_message_timeout_seconds: Optional[int] = None, task_expires_seconds: Optional[int] = None, + temporal_task_queue: Optional[str] = None, ) -> ModelEndpointRecord: """ Creates a model endpoint. @@ -230,6 +231,7 @@ async def update_model_endpoint( public_inference: Optional[bool] = None, queue_message_timeout_seconds: Optional[int] = None, task_expires_seconds: Optional[int] = None, + temporal_task_queue: Optional[str] = None, ) -> ModelEndpointRecord: """ Updates a model endpoint. 
diff --git a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py index c18a62a29..758b740e4 100644 --- a/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py +++ b/model-engine/model_engine_server/domain/use_cases/model_endpoint_use_cases.py @@ -116,7 +116,10 @@ def validate_deployment_resources( # TODO: we should be also validating the update request against the existing state in k8s (e.g. # so min_workers <= max_workers always) maybe this occurs already in update_model_endpoint. min_endpoint_size = ( - 0 if endpoint_type == ModelEndpointType.ASYNC or can_scale_http_endpoint_from_zero else 1 + 0 + if endpoint_type in {ModelEndpointType.ASYNC, ModelEndpointType.TEMPORAL} + or can_scale_http_endpoint_from_zero + else 1 ) if min_workers is not None and min_workers < min_endpoint_size: raise EndpointResourceInvalidRequestException( @@ -393,6 +396,7 @@ async def execute( public_inference=request.public_inference, queue_message_timeout_seconds=request.queue_message_timeout_seconds, task_expires_seconds=request.task_expires_seconds, + temporal_task_queue=request.temporal_task_queue, ) _handle_post_inference_hooks( created_by=user.user_id, @@ -523,6 +527,7 @@ async def execute( public_inference=request.public_inference, queue_message_timeout_seconds=request.queue_message_timeout_seconds, task_expires_seconds=request.task_expires_seconds, + temporal_task_queue=request.temporal_task_queue, ) _handle_post_inference_hooks( created_by=endpoint_record.created_by, diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py index 9c6de78c1..21718d188 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_endpoint_resource_delegate.py @@ -420,7 +420,11 @@ async def delete_resources( self, endpoint_id: str, deployment_name: str, endpoint_type: ModelEndpointType ) -> bool: await maybe_load_kube_config() - if endpoint_type in {ModelEndpointType.SYNC, ModelEndpointType.STREAMING}: + if endpoint_type in { + ModelEndpointType.SYNC, + ModelEndpointType.STREAMING, + ModelEndpointType.TEMPORAL, + }: return await self._delete_resources_sync( endpoint_id=endpoint_id, deployment_name=deployment_name ) @@ -1872,6 +1876,7 @@ async def _create_or_update_resources( if model_endpoint_record.endpoint_type in { ModelEndpointType.SYNC, ModelEndpointType.STREAMING, + ModelEndpointType.TEMPORAL, }: return k8s_service_name elif model_endpoint_record.endpoint_type == ModelEndpointType.ASYNC: @@ -1898,6 +1903,18 @@ def _get_vertical_autoscaling_params( max_memory=str(policy["maxAllowed"]["memory"]), ) + @staticmethod + def _get_temporal_autoscaling_params( + deployment_config, + ) -> HorizontalAutoscalingEndpointParams: + metadata_annotations = deployment_config.metadata.annotations or {} + return dict( + min_workers=int(metadata_annotations.get("temporal.scaleml.io/minWorkers", 0)), + max_workers=int(metadata_annotations.get("temporal.scaleml.io/maxWorkers", 0)), + per_worker=int(metadata_annotations.get("temporal.scaleml.io/perWorker", 1)), + concurrent_requests_per_worker=1, + ) + @staticmethod def _get_async_autoscaling_params( deployment_config, @@ -2013,6 +2030,8 @@ async def _get_resources_from_deployment_type( common_params = 
self._get_common_endpoint_params(deployment_config) if endpoint_type == ModelEndpointType.ASYNC: horizontal_autoscaling_params = self._get_async_autoscaling_params(deployment_config) + elif endpoint_type == ModelEndpointType.TEMPORAL: + horizontal_autoscaling_params = self._get_temporal_autoscaling_params(deployment_config) elif endpoint_type in {ModelEndpointType.SYNC, ModelEndpointType.STREAMING}: autoscaling_client = get_kubernetes_autoscaling_client() custom_object_client = get_kubernetes_custom_objects_client() diff --git a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py index 2fca775ba..9d5971930 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py +++ b/model-engine/model_engine_server/infra/gateways/resources/k8s_resource_types.py @@ -38,6 +38,8 @@ "DeploymentRunnableImageStreamingGpuArguments", "DeploymentRunnableImageSyncCpuArguments", "DeploymentRunnableImageSyncGpuArguments", + "DeploymentRunnableImageTemporalCpuArguments", + "DeploymentRunnableImageTemporalGpuArguments", "DeploymentTritonEnhancedRunnableImageAsyncCpuArguments", "DeploymentTritonEnhancedRunnableImageAsyncGpuArguments", "DeploymentTritonEnhancedRunnableImageSyncCpuArguments", @@ -155,6 +157,22 @@ class _StreamingDeploymentArguments(TypedDict): FORWARDER_STREAM_ROUTES: List[str] +class _TemporalDeploymentArguments(TypedDict): + """Keyword-arguments for substituting into temporal deployment templates.""" + + TEMPORAL_TASK_QUEUE: str + TEMPORAL_SERVER_HOSTNAME: str + TEMPORAL_SERVER_PORT: str + + +class _TemporalRunnableImageDeploymentArguments(_BaseDeploymentArguments): + """Keyword-arguments for substituting into temporal runnable image deployment templates.""" + + MAIN_ENV: List[Dict[str, Any]] + COMMAND: List[str] + INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH: str + + class _RunnableImageDeploymentArguments(_BaseDeploymentArguments): """Keyword-arguments for substituting into runnable image deployment templates.""" @@ -261,6 +279,18 @@ class DeploymentRunnableImageAsyncGpuArguments( """Keyword-arguments for substituting GPU async deployment templates for runnable images.""" +class DeploymentRunnableImageTemporalCpuArguments( + _TemporalRunnableImageDeploymentArguments, _TemporalDeploymentArguments +): + """Keyword-arguments for CPU temporal deployment templates for runnable images.""" + + +class DeploymentRunnableImageTemporalGpuArguments( + _TemporalRunnableImageDeploymentArguments, _TemporalDeploymentArguments, _GpuArguments +): + """Keyword-arguments for GPU temporal deployment templates for runnable images.""" + + class DeploymentTritonEnhancedRunnableImageSyncCpuArguments( _RunnableImageDeploymentArguments, _SyncRunnableImageDeploymentArguments, @@ -466,6 +496,8 @@ class VerticalAutoscalingEndpointParams(TypedDict): DeploymentRunnableImageStreamingGpuArguments, DeploymentRunnableImageSyncCpuArguments, DeploymentRunnableImageSyncGpuArguments, + DeploymentRunnableImageTemporalCpuArguments, + DeploymentRunnableImageTemporalGpuArguments, DeploymentTritonEnhancedRunnableImageAsyncCpuArguments, DeploymentTritonEnhancedRunnableImageAsyncGpuArguments, DeploymentTritonEnhancedRunnableImageSyncCpuArguments, @@ -1448,5 +1480,85 @@ def get_endpoint_resource_arguments_from_request( OWNER=owner, GIT_TAG=GIT_TAG, ) + elif endpoint_resource_name in { + "deployment-runnable-image-temporal-cpu", + "deployment-runnable-image-temporal-gpu", + }: + assert isinstance(flavor, 
RunnableImageLike) + temporal_task_queue = model_endpoint_record.temporal_task_queue or "" + temporal_hostname = getattr(hmi_config, "temporal_server_hostname", "") or "" + temporal_port = getattr(hmi_config, "temporal_server_port", "7233") or "7233" + temporal_main_env = list(main_env) + [ + {"name": "TEMPORAL_TASK_QUEUE", "value": temporal_task_queue}, + {"name": "TEMPORAL_SERVER_HOSTNAME", "value": temporal_hostname}, + {"name": "TEMPORAL_SERVER_PORT", "value": temporal_port}, + ] + if endpoint_resource_name == "deployment-runnable-image-temporal-cpu": + return DeploymentRunnableImageTemporalCpuArguments( + RESOURCE_NAME=k8s_resource_group_name, + NAMESPACE=hmi_config.endpoint_namespace, + ENDPOINT_ID=model_endpoint_record.id, + ENDPOINT_NAME=model_endpoint_record.name, + TEAM=team, + PRODUCT=product, + CREATED_BY=created_by, + OWNER=owner, + GIT_TAG=GIT_TAG, + CHANGE_CAUSE_MESSAGE=change_cause_message, + AWS_ROLE=build_endpoint_request.aws_role, + PRIORITY=priority, + IMAGE=request.image, + IMAGE_HASH=image_hash, + DD_TRACE_ENABLED=str(dd_trace_enabled), + CPUS=str(build_endpoint_request.cpus), + MEMORY=str(build_endpoint_request.memory), + STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, + MIN_WORKERS=build_endpoint_request.min_workers, + MAX_WORKERS=build_endpoint_request.max_workers, + CONCURRENT_REQUESTS_PER_WORKER=build_endpoint_request.concurrent_requests_per_worker, + RESULTS_S3_BUCKET=s3_bucket, + MAIN_ENV=temporal_main_env, + COMMAND=flavor.command, + INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH=infra_service_config_volume_mount_path, + TEMPORAL_TASK_QUEUE=temporal_task_queue, + TEMPORAL_SERVER_HOSTNAME=temporal_hostname, + TEMPORAL_SERVER_PORT=temporal_port, + ) + else: # temporal-gpu + assert build_endpoint_request.gpu_type is not None + return DeploymentRunnableImageTemporalGpuArguments( + RESOURCE_NAME=k8s_resource_group_name, + NAMESPACE=hmi_config.endpoint_namespace, + ENDPOINT_ID=model_endpoint_record.id, + ENDPOINT_NAME=model_endpoint_record.name, + TEAM=team, + PRODUCT=product, + CREATED_BY=created_by, + OWNER=owner, + GIT_TAG=GIT_TAG, + CHANGE_CAUSE_MESSAGE=change_cause_message, + AWS_ROLE=build_endpoint_request.aws_role, + PRIORITY=priority, + IMAGE=request.image, + IMAGE_HASH=image_hash, + DD_TRACE_ENABLED=str(dd_trace_enabled), + CPUS=str(build_endpoint_request.cpus), + MEMORY=str(build_endpoint_request.memory), + STORAGE_DICT=storage_dict, + PER_WORKER=build_endpoint_request.per_worker, + MIN_WORKERS=build_endpoint_request.min_workers, + MAX_WORKERS=build_endpoint_request.max_workers, + CONCURRENT_REQUESTS_PER_WORKER=build_endpoint_request.concurrent_requests_per_worker, + RESULTS_S3_BUCKET=s3_bucket, + MAIN_ENV=temporal_main_env, + COMMAND=flavor.command, + INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH=infra_service_config_volume_mount_path, + TEMPORAL_TASK_QUEUE=temporal_task_queue, + TEMPORAL_SERVER_HOSTNAME=temporal_hostname, + TEMPORAL_SERVER_PORT=temporal_port, + GPU_TYPE=build_endpoint_request.gpu_type.value, + GPUS=build_endpoint_request.gpus, + ) else: raise Exception(f"Unknown resource name: {endpoint_resource_name}") diff --git a/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py b/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py index 3a028b57c..5d650d300 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py +++ b/model-engine/model_engine_server/infra/gateways/resources/live_endpoint_resource_gateway.py 
@@ -118,11 +118,12 @@ async def delete_resources( endpoint_type=endpoint_type, ) sqs_result = True - try: - await self.queue_delegate.delete_queue(endpoint_id=endpoint_id) - except EndpointResourceInfraException as e: - logger.warning("Could not delete SQS resources", exc_info=e) - sqs_result = False + if endpoint_type != ModelEndpointType.TEMPORAL: + try: + await self.queue_delegate.delete_queue(endpoint_id=endpoint_id) + except EndpointResourceInfraException as e: + logger.warning("Could not delete SQS resources", exc_info=e) + sqs_result = False if self.inference_autoscaling_metrics_gateway is not None: await self.inference_autoscaling_metrics_gateway.delete_resources(endpoint_id) diff --git a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml index 63fd2f328..7ae96bf87 100644 --- a/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml +++ b/model-engine/model_engine_server/infra/gateways/resources/templates/service_template_config_map_circleci.yaml @@ -2504,6 +2504,283 @@ data: items: - key: infra_service_config path: config.yaml + deployment-runnable-image-temporal-cpu.yaml: |- + apiVersion: apps/v1 + kind: Deployment + metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: circleci + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: circleci + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + annotations: + temporal.scaleml.io/taskQueue: "${TEMPORAL_TASK_QUEUE}" + temporal.scaleml.io/minWorkers: "${MIN_WORKERS}" + temporal.scaleml.io/maxWorkers: "${MAX_WORKERS}" + temporal.scaleml.io/perWorker: "${PER_WORKER}" + spec: + strategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 1 + maxUnavailable: 0 + replicas: ${MAX_WORKERS} + selector: + matchLabels: + app: ${RESOURCE_NAME} + version: v1 + template: + metadata: + labels: + app: ${RESOURCE_NAME} + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: circleci + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: circleci + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + sidecar.istio.io/inject: "false" + version: v1 + annotations: + ad.datadoghq.com/main.logs: '[{"service": "${ENDPOINT_NAME}", "source": "python"}]' + kubernetes.io/change-cause: "${CHANGE_CAUSE_MESSAGE}" + spec: + affinity: + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: app + operator: In + values: + - ${RESOURCE_NAME} + topologyKey: kubernetes.io/hostname + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: ${IMAGE_HASH} + operator: In + values: + - "True" + topologyKey: kubernetes.io/hostname + terminationGracePeriodSeconds: 1800 + serviceAccount: default + priorityClassName: ${PRIORITY} + containers: + - name: main + securityContext: + capabilities: + drop: + - all + image: ${IMAGE} + imagePullPolicy: IfNotPresent + command: ${COMMAND} 
+ env: ${MAIN_ENV} + resources: + requests: + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + limits: + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + volumeMounts: + - name: config-volume + mountPath: /opt/.aws/config + subPath: config + - name: user-config + mountPath: /app/user_config + subPath: raw_data + - name: endpoint-config + mountPath: /app/endpoint_config + subPath: raw_data + - name: infra-service-config-volume + mountPath: ${INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH} + securityContext: + fsGroup: 65534 + volumes: + - name: config-volume + configMap: + name: default-config + - name: user-config + configMap: + name: ${RESOURCE_NAME} + - name: endpoint-config + configMap: + name: ${RESOURCE_NAME}-endpoint-config + - name: infra-service-config-volume + configMap: + name: model-engine-service-config + items: + - key: infra_service_config + path: config.yaml + deployment-runnable-image-temporal-gpu.yaml: |- + apiVersion: apps/v1 + kind: Deployment + metadata: + name: ${RESOURCE_NAME} + namespace: ${NAMESPACE} + labels: + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: circleci + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: circleci + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + annotations: + temporal.scaleml.io/taskQueue: "${TEMPORAL_TASK_QUEUE}" + temporal.scaleml.io/minWorkers: "${MIN_WORKERS}" + temporal.scaleml.io/maxWorkers: "${MAX_WORKERS}" + temporal.scaleml.io/perWorker: "${PER_WORKER}" + spec: + strategy: + type: RollingUpdate + rollingUpdate: + maxSurge: 1 + maxUnavailable: 0 + replicas: ${MAX_WORKERS} + selector: + matchLabels: + app: ${RESOURCE_NAME} + version: v1 + template: + metadata: + labels: + app: ${RESOURCE_NAME} + user_id: ${OWNER} + team: ${TEAM} + product: ${PRODUCT} + created_by: ${CREATED_BY} + owner: ${OWNER} + env: circleci + managed-by: model-engine + use_scale_launch_endpoint_network_policy: "true" + tags.datadoghq.com/env: circleci + tags.datadoghq.com/version: ${GIT_TAG} + tags.datadoghq.com/service: ${ENDPOINT_NAME} + endpoint_id: ${ENDPOINT_ID} + endpoint_name: ${ENDPOINT_NAME} + sidecar.istio.io/inject: "false" + version: v1 + annotations: + ad.datadoghq.com/main.logs: '[{"service": "${ENDPOINT_NAME}", "source": "python"}]' + kubernetes.io/change-cause: "${CHANGE_CAUSE_MESSAGE}" + spec: + affinity: + podAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 1 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: app + operator: In + values: + - ${RESOURCE_NAME} + topologyKey: kubernetes.io/hostname + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: ${IMAGE_HASH} + operator: In + values: + - "True" + topologyKey: kubernetes.io/hostname + terminationGracePeriodSeconds: 1800 + serviceAccount: default + nodeSelector: + k8s.amazonaws.com/accelerator: ${GPU_TYPE} + tolerations: + - key: "nvidia.com/gpu" + operator: "Exists" + effect: "NoSchedule" + priorityClassName: ${PRIORITY} + containers: + - name: main + securityContext: + capabilities: + drop: + - all + image: ${IMAGE} + imagePullPolicy: IfNotPresent + command: ${COMMAND} + env: ${MAIN_ENV} + resources: + requests: + nvidia.com/gpu: ${GPUS} + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + limits: + nvidia.com/gpu: ${GPUS} + cpu: ${CPUS} + memory: ${MEMORY} + ${STORAGE_DICT} + volumeMounts: + - name: 
+                  mountPath: /opt/.aws/config
+                  subPath: config
+                - mountPath: /dev/shm
+                  name: dshm
+                - name: user-config
+                  mountPath: /app/user_config
+                  subPath: raw_data
+                - name: endpoint-config
+                  mountPath: /app/endpoint_config
+                  subPath: raw_data
+                - name: infra-service-config-volume
+                  mountPath: ${INFRA_SERVICE_CONFIG_VOLUME_MOUNT_PATH}
+          securityContext:
+            fsGroup: 65534
+          volumes:
+            - name: config-volume
+              configMap:
+                name: default-config
+            - name: user-config
+              configMap:
+                name: ${RESOURCE_NAME}
+            - name: endpoint-config
+              configMap:
+                name: ${RESOURCE_NAME}-endpoint-config
+            - name: dshm
+              emptyDir:
+                medium: Memory
+            - name: infra-service-config-volume
+              configMap:
+                name: model-engine-service-config
+                items:
+                  - key: infra_service_config
+                    path: config.yaml
   user-config.yaml: |-
     apiVersion: v1
     kind: ConfigMap
diff --git a/model-engine/model_engine_server/infra/repositories/db_model_endpoint_record_repository.py b/model-engine/model_engine_server/infra/repositories/db_model_endpoint_record_repository.py
index 1551e3fae..500bf4299 100644
--- a/model-engine/model_engine_server/infra/repositories/db_model_endpoint_record_repository.py
+++ b/model-engine/model_engine_server/infra/repositories/db_model_endpoint_record_repository.py
@@ -55,6 +55,7 @@ def translate_model_endpoint_orm_to_model_endpoint_record(
         public_inference=model_endpoint_orm.public_inference,
         task_expires_seconds=model_endpoint_orm.task_expires_seconds,
         queue_message_timeout_seconds=model_endpoint_orm.queue_message_timeout_seconds,
+        temporal_task_queue=model_endpoint_orm.temporal_task_queue,
     )
 
 
@@ -124,6 +125,7 @@ async def create_model_endpoint_record(
         public_inference: Optional[bool] = False,
         task_expires_seconds: Optional[int] = None,
         queue_message_timeout_seconds: Optional[int] = None,
+        temporal_task_queue: Optional[str] = None,
     ) -> ModelEndpointRecord:
         model_endpoint_record = OrmModelEndpoint(
             name=name,
@@ -138,6 +140,7 @@
             public_inference=public_inference,
             task_expires_seconds=task_expires_seconds,
             queue_message_timeout_seconds=queue_message_timeout_seconds,
+            temporal_task_queue=temporal_task_queue,
         )
         async with self.session() as session:
             await OrmModelEndpoint.create(session, model_endpoint_record)
@@ -313,6 +316,7 @@ async def update_model_endpoint_record(
         public_inference: Optional[bool] = None,
         task_expires_seconds: Optional[int] = None,
         queue_message_timeout_seconds: Optional[int] = None,
+        temporal_task_queue: Optional[str] = None,
     ) -> Optional[ModelEndpointRecord]:
         async with self.session() as session:
             model_endpoint_orm = await OrmModelEndpoint.select_by_id(
@@ -334,6 +338,7 @@
             public_inference=public_inference,
             task_expires_seconds=task_expires_seconds,
             queue_message_timeout_seconds=queue_message_timeout_seconds,
+            temporal_task_queue=temporal_task_queue,
         )
         await OrmModelEndpoint.update_by_name_owner(
             session=session,
diff --git a/model-engine/model_engine_server/infra/repositories/model_endpoint_record_repository.py b/model-engine/model_engine_server/infra/repositories/model_endpoint_record_repository.py
index a0f7e1482..ec9bd9ffe 100644
--- a/model-engine/model_engine_server/infra/repositories/model_endpoint_record_repository.py
+++ b/model-engine/model_engine_server/infra/repositories/model_endpoint_record_repository.py
@@ -52,6 +52,7 @@ async def create_model_endpoint_record(
         public_inference: Optional[bool] = False,
         task_expires_seconds: Optional[int] = None,
         queue_message_timeout_seconds: Optional[int] = None,
+        temporal_task_queue: Optional[str] = None,
     ) -> ModelEndpointRecord:
         """
         Creates an entry for endpoint tracking data, but not the actual compute resources.
@@ -62,7 +63,7 @@
             model_bundle_id: Bundle the endpoint uses
             metadata: Arbitrary dictionary containing user-defined metadata
             endpoint_type: Type of endpoint i.e. async/sync
-            destination: The queue name (async) or deployment name (sync) of the endpoint, used for routing requests
+            destination: The queue name (async) or deployment name (sync/temporal) of the endpoint, used for routing requests
             creation_task_id: The celery task id corresponding to endpoint creation
             status: A status field on the endpoint, keeps track of endpoint state, used to coordinate
                 edit operations on the endpoint
@@ -70,6 +71,7 @@
             public_inference: Whether the endpoint is publicly accessible
             task_expires_seconds: For async endpoints, how long a task can wait in queue before expiring
             queue_message_timeout_seconds: For async endpoints, queue message visibility/lock timeout
+            temporal_task_queue: For temporal endpoints, the Temporal task queue that workers will poll
 
         Returns:
             A Model Endpoint Record domain entity.
@@ -88,6 +90,7 @@ async def update_model_endpoint_record(
         public_inference: Optional[bool] = None,
         task_expires_seconds: Optional[int] = None,
         queue_message_timeout_seconds: Optional[int] = None,
+        temporal_task_queue: Optional[str] = None,
     ) -> Optional[ModelEndpointRecord]:
         """
         Updates the entry for endpoint tracking data with the given new values. Only these values are editable.
@@ -102,6 +105,7 @@
             public_inference: Whether the endpoint is publicly accessible
             task_expires_seconds: For async endpoints, how long a task can wait in queue before expiring
             queue_message_timeout_seconds: For async endpoints, queue message visibility/lock timeout
+            temporal_task_queue: For temporal endpoints, the Temporal task queue that workers will poll
 
         Returns:
             A Model Endpoint Record domain entity if found, else None.
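Note: the `temporal_task_queue` documented above is the queue name that endpoint workers poll. As a minimal sketch of the consuming side (not part of this patch; it assumes the `temporalio` Python SDK, and the frontend address plus the workflow/activity imports are hypothetical):

import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

# Hypothetical workflow/activity pair provided by the endpoint's image.
from inference_worker import InferenceWorkflow, run_inference


async def main() -> None:
    # Assumed Temporal frontend address; this patch does not configure it.
    client = await Client.connect("temporal-frontend.temporal:7233")
    # Poll the same queue name stored in temporal_task_queue on the endpoint
    # record (and advertised via the temporal.scaleml.io/taskQueue annotation).
    worker = Worker(
        client,
        task_queue="my-endpoint-task-queue",
        workflows=[InferenceWorkflow],
        activities=[run_inference],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())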
diff --git a/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py b/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py
index 59934fec0..c9c5e4606 100644
--- a/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py
+++ b/model-engine/model_engine_server/infra/services/live_model_endpoint_service.py
@@ -166,6 +166,7 @@ async def create_model_endpoint(
         public_inference: Optional[bool] = False,
         queue_message_timeout_seconds: Optional[int] = None,
         task_expires_seconds: Optional[int] = None,
+        temporal_task_queue: Optional[str] = None,
     ) -> ModelEndpointRecord:
         existing_endpoints = (
             await self.model_endpoint_record_repository.list_model_endpoint_records(
@@ -189,6 +190,7 @@
                 public_inference=public_inference,
                 task_expires_seconds=task_expires_seconds,
                 queue_message_timeout_seconds=queue_message_timeout_seconds,
+                temporal_task_queue=temporal_task_queue,
             )
         )
         creation_task_id = await self.model_endpoint_infra_gateway.create_model_endpoint_infra(
@@ -297,6 +299,7 @@ async def update_model_endpoint(
         public_inference: Optional[bool] = None,
         queue_message_timeout_seconds: Optional[int] = None,
         task_expires_seconds: Optional[int] = None,
+        temporal_task_queue: Optional[str] = None,
     ) -> ModelEndpointRecord:
         record = await self.model_endpoint_record_repository.get_model_endpoint_record(
             model_endpoint_id=model_endpoint_id
@@ -338,6 +341,7 @@
             public_inference=public_inference,
             task_expires_seconds=task_expires_seconds,
             queue_message_timeout_seconds=queue_message_timeout_seconds,
+            temporal_task_queue=temporal_task_queue,
         )
         if record is None:  # pragma: no cover
             raise ObjectNotFoundException
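Note: a usage sketch of the service-layer change above (not part of this patch; the service variable, endpoint id, and queue name are illustrative placeholders):

# Retarget an existing temporal endpoint at a different Temporal task queue.
# Only keyword arguments visible in this diff are used.
updated_record = await model_endpoint_service.update_model_endpoint(
    model_endpoint_id="end_abc123",
    temporal_task_queue="my-team.inference",
)
# Per the diff, update_model_endpoint raises ObjectNotFoundException when no
# record exists, so updated_record is always a ModelEndpointRecord here.

The same keyword threads through create_model_endpoint, which persists it via create_model_endpoint_record before provisioning infrastructure.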