diff --git a/.github/workflows/dts-e2e-tests.yaml b/.github/workflows/dts-e2e-tests.yaml index c9987f3..c2a383a 100644 --- a/.github/workflows/dts-e2e-tests.yaml +++ b/.github/workflows/dts-e2e-tests.yaml @@ -1,7 +1,7 @@ name: ๐Ÿงช DTS Emulator E2E Tests # This workflow runs E2E tests against the Durable Task Scheduler (DTS) emulator. -# It mirrors the Python testing setup at durabletask-python for Azure-managed tests. +# Tests are split across parallel jobs with separate task hubs for isolation. on: push: @@ -20,6 +20,16 @@ jobs: fail-fast: false matrix: node-version: ["22.x", "24.x"] + test-group: + - name: "entity" + pattern: "test/e2e-azuremanaged/entity.spec.ts" + - name: "orchestration" + pattern: "test/e2e-azuremanaged/orchestration.spec.ts" + - name: "query-restart" + pattern: "test/e2e-azuremanaged/query-apis.spec.ts test/e2e-azuremanaged/restart.spec.ts" + - name: "retry-history-rewind" + pattern: "test/e2e-azuremanaged/retry-handler.spec.ts test/e2e-azuremanaged/retry-advanced.spec.ts test/e2e-azuremanaged/history.spec.ts test/e2e-azuremanaged/rewind.spec.ts" + name: "e2e (${{ matrix.test-group.name }}, node ${{ matrix.node-version }})" env: EMULATOR_VERSION: "latest" runs-on: ubuntu-latest @@ -36,7 +46,7 @@ jobs: docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:$EMULATOR_VERSION - name: โณ Wait for container to be ready - run: sleep 10 # Adjust if your service needs more time to start + run: sleep 10 - name: ๐Ÿ”ง Set environment variables run: | @@ -50,7 +60,8 @@ jobs: registry-url: "https://registry.npmjs.org" - name: โš™๏ธ Install dependencies - run: npm install + run: npm ci - - name: โœ… Run E2E tests against DTS emulator - run: npm run test:e2e:azuremanaged:internal + - name: โœ… Run E2E tests โ€” ${{ matrix.test-group.name }} + run: npx jest ${{ matrix.test-group.pattern }} --runInBand --detectOpenHandles + timeout-minutes: 15 diff --git a/.github/workflows/pr-validation.yaml b/.github/workflows/pr-validation.yaml index 54a56fb..191d01e 100644 --- a/.github/workflows/pr-validation.yaml +++ b/.github/workflows/pr-validation.yaml @@ -16,7 +16,7 @@ jobs: runs-on: ubuntu-latest env: - NODE_VER: 18.x + NODE_VER: 22.x steps: - name: ๐Ÿ“ฅ Checkout code @@ -29,7 +29,7 @@ jobs: registry-url: "https://registry.npmjs.org" - name: โš™๏ธ Install dependencies - run: npm install + run: npm ci - name: ๐Ÿ” Run linting run: npm run lint @@ -56,7 +56,7 @@ jobs: registry-url: "https://registry.npmjs.org" - name: โš™๏ธ Install dependencies - run: npm install + run: npm ci # Install Go SDK for durabletask-go sidecar - name: ๐Ÿ”ง Install Go SDK diff --git a/package-lock.json b/package-lock.json index 6ea04a0..c302015 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2272,13 +2272,13 @@ } }, "node_modules/@typescript-eslint/typescript-estree/node_modules/minimatch": { - "version": "9.0.5", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.5.tgz", - "integrity": "sha512-G6T0ZX48xgozx7587koeX9Ys2NYy6Gmv//P89sEte9V9whIapMNF4idKxnW2QtCcLiTWlb/wfCabAtAFWhhBow==", + "version": "9.0.9", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.9.tgz", + "integrity": "sha512-OBwBN9AL4dqmETlpS2zasx+vTeWclWzkblfZk7KTA5j3jeOONz/tRCnZomUyvNg83wL5Zv9Ss6HMJXAgL8R2Yg==", "dev": true, "license": "ISC", "dependencies": { - "brace-expansion": "^2.0.1" + "brace-expansion": "^2.0.2" }, "engines": { "node": ">=16 || 14 >=14.17" @@ -2399,9 +2399,9 @@ } }, "node_modules/ajv": { - "version": "6.12.6", - "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.12.6.tgz", - "integrity": "sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==", + "version": "6.14.0", + "resolved": "https://registry.npmjs.org/ajv/-/ajv-6.14.0.tgz", + "integrity": "sha512-IWrosm/yrn43eiKqkfkHis7QioDleaXQHdDVPKg0FSwwd/DuvyX79TZnFOnYpB7dcsFAMmtFztZuXPDvSePkFw==", "dev": true, "license": "MIT", "dependencies": { @@ -5489,9 +5489,9 @@ } }, "node_modules/minimatch": { - "version": "3.1.2", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", - "integrity": "sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==", + "version": "3.1.5", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.5.tgz", + "integrity": "sha512-VgjWUsnnT6n+NUk6eZq77zeFdpW2LWDzP6zFGrCbHXiYNul5Dzqk2HHQ5uFH2DNW5Xbp8+jVzaeNt94ssEEl4w==", "dev": true, "license": "ISC", "dependencies": { diff --git a/package.json b/package.json index 5f5d7c8..b2f8d66 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "test:e2e": "./scripts/test-e2e.sh", "test:e2e:internal": "jest tests/e2e --runInBand --detectOpenHandles", "test:e2e:one": "jest tests/e2e --runInBand --detectOpenHandles --testNamePattern", - "test:e2e:azuremanaged:internal": "jest test/e2e-azuremanaged --runInBand --detectOpenHandles", + "test:e2e:azuremanaged:internal": "jest test/e2e-azuremanaged --detectOpenHandles", "test:e2e:azuremanaged": "./scripts/test-e2e-azuremanaged.sh", "lint": "eslint .", "pretty": "prettier --list-different \"**/*.{ts,tsx,js,jsx,json,md}\"", diff --git a/packages/durabletask-js-azuremanaged/src/credential-factory.ts b/packages/durabletask-js-azuremanaged/src/credential-factory.ts index 2119f7e..fa8880e 100644 --- a/packages/durabletask-js-azuremanaged/src/credential-factory.ts +++ b/packages/durabletask-js-azuremanaged/src/credential-factory.ts @@ -39,34 +39,17 @@ export function getCredentialFromAuthenticationType( } case "workloadidentity": { - const options: { - clientId?: string; - tenantId?: string; - tokenFilePath?: string; - additionallyAllowedTenants?: string[]; - } = {}; - const clientId = connectionString.getClientId(); - if (clientId) { - options.clientId = clientId; - } - const tenantId = connectionString.getTenantId(); - if (tenantId) { - options.tenantId = tenantId; - } - const tokenFilePath = connectionString.getTokenFilePath(); - if (tokenFilePath) { - options.tokenFilePath = tokenFilePath; - } - const additionallyAllowedTenants = connectionString.getAdditionallyAllowedTenants(); - if (additionallyAllowedTenants) { - options.additionallyAllowedTenants = additionallyAllowedTenants; - } - return new WorkloadIdentityCredential(options); + return new WorkloadIdentityCredential({ + ...(clientId && { clientId }), + ...(tenantId && { tenantId }), + ...(tokenFilePath && { tokenFilePath }), + ...(additionallyAllowedTenants && { additionallyAllowedTenants }), + }); } case "environment": diff --git a/packages/durabletask-js-azuremanaged/src/options.ts b/packages/durabletask-js-azuremanaged/src/options.ts index 095853e..30f9c55 100644 --- a/packages/durabletask-js-azuremanaged/src/options.ts +++ b/packages/durabletask-js-azuremanaged/src/options.ts @@ -91,14 +91,14 @@ abstract class DurableTaskAzureManagedOptionsBase { // Add https:// prefix if no protocol is specified if (!endpoint.startsWith("http://") && !endpoint.startsWith("https://")) { - endpoint = "https://" + endpoint; + endpoint = `https://${endpoint}`; } try { const url = new URL(endpoint); let authority = url.hostname; if (url.port) { - authority += ":" + url.port; + authority = `${authority}:${url.port}`; } return authority; } catch { diff --git a/packages/durabletask-js/src/orchestration/page.ts b/packages/durabletask-js/src/orchestration/page.ts index dcc1232..51195ae 100644 --- a/packages/durabletask-js/src/orchestration/page.ts +++ b/packages/durabletask-js/src/orchestration/page.ts @@ -30,7 +30,7 @@ export class Page { * Returns true if there are more pages available. */ get hasMoreResults(): boolean { - return this.continuationToken !== undefined && this.continuationToken !== ""; + return !!this.continuationToken; } } diff --git a/packages/durabletask-js/src/tracing/trace-helper.ts b/packages/durabletask-js/src/tracing/trace-helper.ts index 9a8e76a..1a9f62a 100644 --- a/packages/durabletask-js/src/tracing/trace-helper.ts +++ b/packages/durabletask-js/src/tracing/trace-helper.ts @@ -517,17 +517,11 @@ export function processActionsForTracing( } } -/** - * Emits a span for calling an entity from an orchestration (request/response). - * - * @param orchestrationSpan - The parent orchestration span. - * @param operationName - The entity operation name. - * @param targetInstanceId - The target entity instance ID. - * @param taskId - The sequential task ID. - */ -export function emitSpanForEntityCall( +function emitSpanForEntityOperation( orchestrationSpan: Span, operationName: string, + taskType: string, + getSpanKind: (otel: any) => any, targetInstanceId?: string, taskId?: number, ): void { @@ -535,15 +529,15 @@ export function emitSpanForEntityCall( const tracer = getTracer(); if (!otel || !tracer) return; - const spanName = createSpanName(TaskType.CALL_ENTITY, operationName); + const spanName = createSpanName(taskType, operationName); const parentContext = otel.trace.setSpan(otel.context.active(), orchestrationSpan); const span = tracer.startSpan( spanName, { - kind: otel.SpanKind.CLIENT, + kind: getSpanKind(otel), attributes: { - [DurableTaskAttributes.TYPE]: TaskType.CALL_ENTITY, + [DurableTaskAttributes.TYPE]: taskType, [DurableTaskAttributes.ENTITY_OPERATION]: operationName, ...(targetInstanceId ? { [DurableTaskAttributes.ENTITY_INSTANCE_ID]: targetInstanceId } : {}), ...(taskId !== undefined ? { [DurableTaskAttributes.TASK_TASK_ID]: taskId } : {}), @@ -555,6 +549,30 @@ export function emitSpanForEntityCall( span.end(); } +/** + * Emits a span for calling an entity from an orchestration (request/response). + * + * @param orchestrationSpan - The parent orchestration span. + * @param operationName - The entity operation name. + * @param targetInstanceId - The target entity instance ID. + * @param taskId - The sequential task ID. + */ +export function emitSpanForEntityCall( + orchestrationSpan: Span, + operationName: string, + targetInstanceId?: string, + taskId?: number, +): void { + emitSpanForEntityOperation( + orchestrationSpan, + operationName, + TaskType.CALL_ENTITY, + (otel) => otel.SpanKind.CLIENT, + targetInstanceId, + taskId, + ); +} + /** * Emits a span for signaling an entity from an orchestration (fire-and-forget). * @@ -569,28 +587,14 @@ export function emitSpanForEntitySignal( targetInstanceId?: string, taskId?: number, ): void { - const otel = getOtelApi(); - const tracer = getTracer(); - if (!otel || !tracer) return; - - const spanName = createSpanName(TaskType.SIGNAL_ENTITY, operationName); - const parentContext = otel.trace.setSpan(otel.context.active(), orchestrationSpan); - - const span = tracer.startSpan( - spanName, - { - kind: otel.SpanKind.PRODUCER, - attributes: { - [DurableTaskAttributes.TYPE]: TaskType.SIGNAL_ENTITY, - [DurableTaskAttributes.ENTITY_OPERATION]: operationName, - ...(targetInstanceId ? { [DurableTaskAttributes.ENTITY_INSTANCE_ID]: targetInstanceId } : {}), - ...(taskId !== undefined ? { [DurableTaskAttributes.TASK_TASK_ID]: taskId } : {}), - }, - }, - parentContext, + emitSpanForEntityOperation( + orchestrationSpan, + operationName, + TaskType.SIGNAL_ENTITY, + (otel) => otel.SpanKind.PRODUCER, + targetInstanceId, + taskId, ); - - span.end(); } /** diff --git a/packages/durabletask-js/src/utils/history-event-converter.ts b/packages/durabletask-js/src/utils/history-event-converter.ts index f89a1e6..1d33bf2 100644 --- a/packages/durabletask-js/src/utils/history-event-converter.ts +++ b/packages/durabletask-js/src/utils/history-event-converter.ts @@ -1,483 +1,521 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -import * as pb from "../proto/orchestrator_service_pb"; -import { - HistoryEvent, - HistoryEventType, - OrchestrationInstance, - ParentInstanceInfo, - TraceContext, -} from "../orchestration/history-event"; -import { FailureDetails } from "../task/failure-details"; - -// Map OrchestrationStatus enum values to their string names -const ORCHESTRATION_STATUS_MAP: Record = { - [pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING]: "ORCHESTRATION_STATUS_RUNNING", - [pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED]: "ORCHESTRATION_STATUS_COMPLETED", - [pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW]: "ORCHESTRATION_STATUS_CONTINUED_AS_NEW", - [pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED]: "ORCHESTRATION_STATUS_FAILED", - [pb.OrchestrationStatus.ORCHESTRATION_STATUS_CANCELED]: "ORCHESTRATION_STATUS_CANCELED", - [pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED]: "ORCHESTRATION_STATUS_TERMINATED", - [pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING]: "ORCHESTRATION_STATUS_PENDING", - [pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED]: "ORCHESTRATION_STATUS_SUSPENDED", -}; - -function convertOrchestrationStatus(status: number): string { - return ORCHESTRATION_STATUS_MAP[status] ?? `UNKNOWN_STATUS_${status}`; -} - -/** - * Converts a protobuf HistoryEvent to a TypeScript HistoryEvent. - * @param protoEvent The protobuf HistoryEvent to convert. - * @returns The converted HistoryEvent, or undefined if the event type is not recognized. - */ -export function convertProtoHistoryEvent(protoEvent: pb.HistoryEvent): HistoryEvent | undefined { - const eventId = protoEvent.getEventid(); - const timestamp = protoEvent.getTimestamp()?.toDate() ?? new Date(0); - const eventTypeCase = protoEvent.getEventtypeCase(); - - switch (eventTypeCase) { - case pb.HistoryEvent.EventtypeCase.EXECUTIONSTARTED: { - const event = protoEvent.getExecutionstarted(); - if (!event) return undefined; - - const orchInstance = event.getOrchestrationinstance(); - const parentInfo = event.getParentinstance(); - const scheduledTime = event.getScheduledstarttimestamp(); - const tagsMap = event.getTagsMap(); - const traceCtx = event.getParenttracecontext(); - - return { - eventId, - timestamp, - type: HistoryEventType.ExecutionStarted, - name: event.getName(), - version: event.getVersion()?.getValue(), - input: event.getInput()?.getValue(), - orchestrationInstance: orchInstance ? convertOrchestrationInstance(orchInstance) : undefined, - parentInstance: parentInfo ? convertParentInstanceInfo(parentInfo) : undefined, - scheduledStartTimestamp: scheduledTime ? scheduledTime.toDate() : undefined, - tags: tagsMap ? convertTagsMap(tagsMap) : undefined, - parentTraceContext: traceCtx ? convertTraceContext(traceCtx) : undefined, - orchestrationSpanId: event.getOrchestrationspanid()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.EXECUTIONCOMPLETED: { - const event = protoEvent.getExecutioncompleted(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.ExecutionCompleted, - orchestrationStatus: convertOrchestrationStatus(event.getOrchestrationstatus()), - result: event.getResult()?.getValue(), - failureDetails: convertFailureDetails(event.getFailuredetails()), - }; - } - - case pb.HistoryEvent.EventtypeCase.EXECUTIONTERMINATED: { - const event = protoEvent.getExecutionterminated(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.ExecutionTerminated, - input: event.getInput()?.getValue(), - recurse: event.getRecurse(), - }; - } - - case pb.HistoryEvent.EventtypeCase.EXECUTIONSUSPENDED: { - const event = protoEvent.getExecutionsuspended(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.ExecutionSuspended, - input: event.getInput()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.EXECUTIONRESUMED: { - const event = protoEvent.getExecutionresumed(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.ExecutionResumed, - input: event.getInput()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.EXECUTIONREWOUND: { - const event = protoEvent.getExecutionrewound(); - if (!event) return undefined; - - const parentInfo = event.getParentinstance(); - const traceCtx = event.getParenttracecontext(); - const tagsMap = event.getTagsMap(); - - return { - eventId, - timestamp, - type: HistoryEventType.ExecutionRewound, - reason: event.getReason()?.getValue(), - parentExecutionId: event.getParentexecutionid()?.getValue(), - instanceId: event.getInstanceid()?.getValue(), - parentTraceContext: traceCtx ? convertTraceContext(traceCtx) : undefined, - name: event.getName()?.getValue(), - version: event.getVersion()?.getValue(), - input: event.getInput()?.getValue(), - parentInstance: parentInfo ? convertParentInstanceInfo(parentInfo) : undefined, - tags: tagsMap && tagsMap.getLength() > 0 ? convertTagsMap(tagsMap) : undefined, - }; - } - - case pb.HistoryEvent.EventtypeCase.TASKSCHEDULED: { - const event = protoEvent.getTaskscheduled(); - if (!event) return undefined; - - const tagsMap = event.getTagsMap(); - const traceCtx = event.getParenttracecontext(); - - return { - eventId, - timestamp, - type: HistoryEventType.TaskScheduled, - name: event.getName(), - version: event.getVersion()?.getValue(), - input: event.getInput()?.getValue(), - tags: tagsMap ? convertTagsMap(tagsMap) : undefined, - parentTraceContext: traceCtx ? convertTraceContext(traceCtx) : undefined, - }; - } - - case pb.HistoryEvent.EventtypeCase.TASKCOMPLETED: { - const event = protoEvent.getTaskcompleted(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.TaskCompleted, - taskScheduledId: event.getTaskscheduledid(), - result: event.getResult()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.TASKFAILED: { - const event = protoEvent.getTaskfailed(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.TaskFailed, - taskScheduledId: event.getTaskscheduledid(), - failureDetails: convertFailureDetails(event.getFailuredetails()), - }; - } - - case pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCECREATED: { - const event = protoEvent.getSuborchestrationinstancecreated(); - if (!event) return undefined; - - const tagsMap = event.getTagsMap(); - const traceCtx = event.getParenttracecontext(); - - return { - eventId, - timestamp, - type: HistoryEventType.SubOrchestrationInstanceCreated, - name: event.getName(), - version: event.getVersion()?.getValue(), - instanceId: event.getInstanceid(), - input: event.getInput()?.getValue(), - tags: tagsMap ? convertTagsMap(tagsMap) : undefined, - parentTraceContext: traceCtx ? convertTraceContext(traceCtx) : undefined, - }; - } - - case pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCECOMPLETED: { - const event = protoEvent.getSuborchestrationinstancecompleted(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.SubOrchestrationInstanceCompleted, - taskScheduledId: event.getTaskscheduledid(), - result: event.getResult()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCEFAILED: { - const event = protoEvent.getSuborchestrationinstancefailed(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.SubOrchestrationInstanceFailed, - taskScheduledId: event.getTaskscheduledid(), - failureDetails: convertFailureDetails(event.getFailuredetails()), - }; - } - - case pb.HistoryEvent.EventtypeCase.TIMERCREATED: { - const event = protoEvent.getTimercreated(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.TimerCreated, - fireAt: event.getFireat()?.toDate() ?? new Date(0), - }; - } - - case pb.HistoryEvent.EventtypeCase.TIMERFIRED: { - const event = protoEvent.getTimerfired(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.TimerFired, - fireAt: event.getFireat()?.toDate() ?? new Date(0), - timerId: event.getTimerid(), - }; - } - - case pb.HistoryEvent.EventtypeCase.ORCHESTRATORSTARTED: { - return { - eventId, - timestamp, - type: HistoryEventType.OrchestratorStarted, - }; - } - - case pb.HistoryEvent.EventtypeCase.ORCHESTRATORCOMPLETED: { - return { - eventId, - timestamp, - type: HistoryEventType.OrchestratorCompleted, - }; - } - - case pb.HistoryEvent.EventtypeCase.EVENTSENT: { - const event = protoEvent.getEventsent(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EventSent, - name: event.getName(), - instanceId: event.getInstanceid(), - input: event.getInput()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.EVENTRAISED: { - const event = protoEvent.getEventraised(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EventRaised, - name: event.getName(), - input: event.getInput()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.GENERICEVENT: { - const event = protoEvent.getGenericevent(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.GenericEvent, - data: event.getData()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.HISTORYSTATE: { - return { - eventId, - timestamp, - type: HistoryEventType.HistoryState, - }; - } - - case pb.HistoryEvent.EventtypeCase.CONTINUEASNEW: { - const event = protoEvent.getContinueasnew(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.ContinueAsNew, - input: event.getInput()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONSIGNALED: { - const event = protoEvent.getEntityoperationsignaled(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EntityOperationSignaled, - requestId: event.getRequestid(), - operation: event.getOperation(), - targetInstanceId: event.getTargetinstanceid()?.getValue(), - scheduledTime: event.getScheduledtime()?.toDate(), - input: event.getInput()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONCALLED: { - const event = protoEvent.getEntityoperationcalled(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EntityOperationCalled, - requestId: event.getRequestid(), - operation: event.getOperation(), - targetInstanceId: event.getTargetinstanceid()?.getValue(), - parentInstanceId: event.getParentinstanceid()?.getValue(), - scheduledTime: event.getScheduledtime()?.toDate(), - input: event.getInput()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONCOMPLETED: { - const event = protoEvent.getEntityoperationcompleted(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EntityOperationCompleted, - requestId: event.getRequestid(), - output: event.getOutput()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONFAILED: { - const event = protoEvent.getEntityoperationfailed(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EntityOperationFailed, - requestId: event.getRequestid(), - failureDetails: convertFailureDetails(event.getFailuredetails()), - }; - } - - case pb.HistoryEvent.EventtypeCase.ENTITYLOCKREQUESTED: { - const event = protoEvent.getEntitylockrequested(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EntityLockRequested, - criticalSectionId: event.getCriticalsectionid(), - lockSet: event.getLocksetList(), - position: event.getPosition(), - parentInstanceId: event.getParentinstanceid()?.getValue(), - }; - } - - case pb.HistoryEvent.EventtypeCase.ENTITYLOCKGRANTED: { - const event = protoEvent.getEntitylockgranted(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EntityLockGranted, - criticalSectionId: event.getCriticalsectionid(), - }; - } - - case pb.HistoryEvent.EventtypeCase.ENTITYUNLOCKSENT: { - const event = protoEvent.getEntityunlocksent(); - if (!event) return undefined; - - return { - eventId, - timestamp, - type: HistoryEventType.EntityUnlockSent, - criticalSectionId: event.getCriticalsectionid(), - parentInstanceId: event.getParentinstanceid()?.getValue(), - targetInstanceId: event.getTargetinstanceid()?.getValue(), - }; - } - - default: - return undefined; - } -} - -function convertOrchestrationInstance(instance: pb.OrchestrationInstance): OrchestrationInstance { - return { - instanceId: instance.getInstanceid(), - executionId: instance.getExecutionid()?.getValue(), - }; -} - -function convertParentInstanceInfo(parent: pb.ParentInstanceInfo): ParentInstanceInfo { - const orchInstance = parent.getOrchestrationinstance(); - return { - name: parent.getName()?.getValue(), - version: parent.getVersion()?.getValue(), - taskScheduledId: parent.getTaskscheduledid(), - orchestrationInstance: orchInstance ? convertOrchestrationInstance(orchInstance) : undefined, - }; -} - -function convertTraceContext(traceContext: pb.TraceContext): TraceContext { - return { - traceParent: traceContext.getTraceparent(), - spanId: traceContext.getSpanid(), - traceState: traceContext.getTracestate()?.getValue(), - }; -} - -function convertFailureDetails(details: pb.TaskFailureDetails | undefined): FailureDetails | undefined { - if (!details) return undefined; - - return new FailureDetails( - details.getErrormessage(), - details.getErrortype(), - details.getStacktrace()?.getValue(), - ); -} - -function convertTagsMap(tagsMap: ReturnType): Record | undefined { - const result: Record = {}; - let hasEntries = false; - - tagsMap.forEach((value: string, key: string) => { - result[key] = value; - hasEntries = true; - }); - - return hasEntries ? result : undefined; -} +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +import * as pb from "../proto/orchestrator_service_pb"; +import { + HistoryEvent, + HistoryEventType, + OrchestrationInstance, + ParentInstanceInfo, + TraceContext, +} from "../orchestration/history-event"; +import { FailureDetails } from "../task/failure-details"; + +// Map OrchestrationStatus enum values to their string names +const ORCHESTRATION_STATUS_MAP: Record = { + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_RUNNING]: "ORCHESTRATION_STATUS_RUNNING", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED]: "ORCHESTRATION_STATUS_COMPLETED", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_CONTINUED_AS_NEW]: "ORCHESTRATION_STATUS_CONTINUED_AS_NEW", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED]: "ORCHESTRATION_STATUS_FAILED", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_CANCELED]: "ORCHESTRATION_STATUS_CANCELED", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED]: "ORCHESTRATION_STATUS_TERMINATED", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_PENDING]: "ORCHESTRATION_STATUS_PENDING", + [pb.OrchestrationStatus.ORCHESTRATION_STATUS_SUSPENDED]: "ORCHESTRATION_STATUS_SUSPENDED", +}; + +function convertOrchestrationStatus(status: number): string { + return ORCHESTRATION_STATUS_MAP[status] ?? `UNKNOWN_STATUS_${status}`; +} + +// Individual converter functions for each history event type + +type ConverterFn = (protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date) => HistoryEvent | undefined; + +function convertExecutionStarted(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getExecutionstarted(); + if (!event) return undefined; + + const orchInstance = event.getOrchestrationinstance(); + const parentInfo = event.getParentinstance(); + const scheduledTime = event.getScheduledstarttimestamp(); + const tagsMap = event.getTagsMap(); + const traceCtx = event.getParenttracecontext(); + + return { + eventId, + timestamp, + type: HistoryEventType.ExecutionStarted, + name: event.getName(), + version: event.getVersion()?.getValue(), + input: event.getInput()?.getValue(), + orchestrationInstance: orchInstance ? convertOrchestrationInstance(orchInstance) : undefined, + parentInstance: parentInfo ? convertParentInstanceInfo(parentInfo) : undefined, + scheduledStartTimestamp: scheduledTime ? scheduledTime.toDate() : undefined, + tags: tagsMap ? convertTagsMap(tagsMap) : undefined, + parentTraceContext: traceCtx ? convertTraceContext(traceCtx) : undefined, + orchestrationSpanId: event.getOrchestrationspanid()?.getValue(), + }; +} + +function convertExecutionCompleted(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getExecutioncompleted(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.ExecutionCompleted, + orchestrationStatus: convertOrchestrationStatus(event.getOrchestrationstatus()), + result: event.getResult()?.getValue(), + failureDetails: convertFailureDetails(event.getFailuredetails()), + }; +} + +function convertExecutionTerminated(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getExecutionterminated(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.ExecutionTerminated, + input: event.getInput()?.getValue(), + recurse: event.getRecurse(), + }; +} + +function convertExecutionSuspended(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getExecutionsuspended(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.ExecutionSuspended, + input: event.getInput()?.getValue(), + }; +} + +function convertExecutionResumed(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getExecutionresumed(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.ExecutionResumed, + input: event.getInput()?.getValue(), + }; +} + +function convertExecutionRewound(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getExecutionrewound(); + if (!event) return undefined; + + const parentInfo = event.getParentinstance(); + const traceCtx = event.getParenttracecontext(); + const tagsMap = event.getTagsMap(); + + return { + eventId, + timestamp, + type: HistoryEventType.ExecutionRewound, + reason: event.getReason()?.getValue(), + parentExecutionId: event.getParentexecutionid()?.getValue(), + instanceId: event.getInstanceid()?.getValue(), + parentTraceContext: traceCtx ? convertTraceContext(traceCtx) : undefined, + name: event.getName()?.getValue(), + version: event.getVersion()?.getValue(), + input: event.getInput()?.getValue(), + parentInstance: parentInfo ? convertParentInstanceInfo(parentInfo) : undefined, + tags: tagsMap && tagsMap.getLength() > 0 ? convertTagsMap(tagsMap) : undefined, + }; +} + +function convertTaskScheduled(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getTaskscheduled(); + if (!event) return undefined; + + const tagsMap = event.getTagsMap(); + const traceCtx = event.getParenttracecontext(); + + return { + eventId, + timestamp, + type: HistoryEventType.TaskScheduled, + name: event.getName(), + version: event.getVersion()?.getValue(), + input: event.getInput()?.getValue(), + tags: tagsMap ? convertTagsMap(tagsMap) : undefined, + parentTraceContext: traceCtx ? convertTraceContext(traceCtx) : undefined, + }; +} + +function convertTaskCompleted(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getTaskcompleted(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.TaskCompleted, + taskScheduledId: event.getTaskscheduledid(), + result: event.getResult()?.getValue(), + }; +} + +function convertTaskFailed(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getTaskfailed(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.TaskFailed, + taskScheduledId: event.getTaskscheduledid(), + failureDetails: convertFailureDetails(event.getFailuredetails()), + }; +} + +function convertSubOrchestrationInstanceCreated(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getSuborchestrationinstancecreated(); + if (!event) return undefined; + + const tagsMap = event.getTagsMap(); + const traceCtx = event.getParenttracecontext(); + + return { + eventId, + timestamp, + type: HistoryEventType.SubOrchestrationInstanceCreated, + name: event.getName(), + version: event.getVersion()?.getValue(), + instanceId: event.getInstanceid(), + input: event.getInput()?.getValue(), + tags: tagsMap ? convertTagsMap(tagsMap) : undefined, + parentTraceContext: traceCtx ? convertTraceContext(traceCtx) : undefined, + }; +} + +function convertSubOrchestrationInstanceCompleted(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getSuborchestrationinstancecompleted(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.SubOrchestrationInstanceCompleted, + taskScheduledId: event.getTaskscheduledid(), + result: event.getResult()?.getValue(), + }; +} + +function convertSubOrchestrationInstanceFailed(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getSuborchestrationinstancefailed(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.SubOrchestrationInstanceFailed, + taskScheduledId: event.getTaskscheduledid(), + failureDetails: convertFailureDetails(event.getFailuredetails()), + }; +} + +function convertTimerCreated(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getTimercreated(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.TimerCreated, + fireAt: event.getFireat()?.toDate() ?? new Date(0), + }; +} + +function convertTimerFired(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getTimerfired(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.TimerFired, + fireAt: event.getFireat()?.toDate() ?? new Date(0), + timerId: event.getTimerid(), + }; +} + +function convertOrchestratorStarted(_protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent { + return { + eventId, + timestamp, + type: HistoryEventType.OrchestratorStarted, + }; +} + +function convertOrchestratorCompleted(_protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent { + return { + eventId, + timestamp, + type: HistoryEventType.OrchestratorCompleted, + }; +} + +function convertEventSent(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEventsent(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EventSent, + name: event.getName(), + instanceId: event.getInstanceid(), + input: event.getInput()?.getValue(), + }; +} + +function convertEventRaised(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEventraised(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EventRaised, + name: event.getName(), + input: event.getInput()?.getValue(), + }; +} + +function convertGenericEvent(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getGenericevent(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.GenericEvent, + data: event.getData()?.getValue(), + }; +} + +function convertHistoryState(_protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent { + return { + eventId, + timestamp, + type: HistoryEventType.HistoryState, + }; +} + +function convertContinueAsNew(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getContinueasnew(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.ContinueAsNew, + input: event.getInput()?.getValue(), + }; +} + +function convertEntityOperationSignaled(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEntityoperationsignaled(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EntityOperationSignaled, + requestId: event.getRequestid(), + operation: event.getOperation(), + targetInstanceId: event.getTargetinstanceid()?.getValue(), + scheduledTime: event.getScheduledtime()?.toDate(), + input: event.getInput()?.getValue(), + }; +} + +function convertEntityOperationCalled(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEntityoperationcalled(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EntityOperationCalled, + requestId: event.getRequestid(), + operation: event.getOperation(), + targetInstanceId: event.getTargetinstanceid()?.getValue(), + parentInstanceId: event.getParentinstanceid()?.getValue(), + scheduledTime: event.getScheduledtime()?.toDate(), + input: event.getInput()?.getValue(), + }; +} + +function convertEntityOperationCompleted(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEntityoperationcompleted(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EntityOperationCompleted, + requestId: event.getRequestid(), + output: event.getOutput()?.getValue(), + }; +} + +function convertEntityOperationFailed(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEntityoperationfailed(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EntityOperationFailed, + requestId: event.getRequestid(), + failureDetails: convertFailureDetails(event.getFailuredetails()), + }; +} + +function convertEntityLockRequested(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEntitylockrequested(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EntityLockRequested, + criticalSectionId: event.getCriticalsectionid(), + lockSet: event.getLocksetList(), + position: event.getPosition(), + parentInstanceId: event.getParentinstanceid()?.getValue(), + }; +} + +function convertEntityLockGranted(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEntitylockgranted(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EntityLockGranted, + criticalSectionId: event.getCriticalsectionid(), + }; +} + +function convertEntityUnlockSent(protoEvent: pb.HistoryEvent, eventId: number, timestamp: Date): HistoryEvent | undefined { + const event = protoEvent.getEntityunlocksent(); + if (!event) return undefined; + + return { + eventId, + timestamp, + type: HistoryEventType.EntityUnlockSent, + criticalSectionId: event.getCriticalsectionid(), + parentInstanceId: event.getParentinstanceid()?.getValue(), + targetInstanceId: event.getTargetinstanceid()?.getValue(), + }; +} + +// Dispatch map from proto event type case to converter function +const EVENT_TYPE_CONVERTERS: Partial> = { + [pb.HistoryEvent.EventtypeCase.EXECUTIONSTARTED]: convertExecutionStarted, + [pb.HistoryEvent.EventtypeCase.EXECUTIONCOMPLETED]: convertExecutionCompleted, + [pb.HistoryEvent.EventtypeCase.EXECUTIONTERMINATED]: convertExecutionTerminated, + [pb.HistoryEvent.EventtypeCase.EXECUTIONSUSPENDED]: convertExecutionSuspended, + [pb.HistoryEvent.EventtypeCase.EXECUTIONRESUMED]: convertExecutionResumed, + [pb.HistoryEvent.EventtypeCase.EXECUTIONREWOUND]: convertExecutionRewound, + [pb.HistoryEvent.EventtypeCase.TASKSCHEDULED]: convertTaskScheduled, + [pb.HistoryEvent.EventtypeCase.TASKCOMPLETED]: convertTaskCompleted, + [pb.HistoryEvent.EventtypeCase.TASKFAILED]: convertTaskFailed, + [pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCECREATED]: convertSubOrchestrationInstanceCreated, + [pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCECOMPLETED]: convertSubOrchestrationInstanceCompleted, + [pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCEFAILED]: convertSubOrchestrationInstanceFailed, + [pb.HistoryEvent.EventtypeCase.TIMERCREATED]: convertTimerCreated, + [pb.HistoryEvent.EventtypeCase.TIMERFIRED]: convertTimerFired, + [pb.HistoryEvent.EventtypeCase.ORCHESTRATORSTARTED]: convertOrchestratorStarted, + [pb.HistoryEvent.EventtypeCase.ORCHESTRATORCOMPLETED]: convertOrchestratorCompleted, + [pb.HistoryEvent.EventtypeCase.EVENTSENT]: convertEventSent, + [pb.HistoryEvent.EventtypeCase.EVENTRAISED]: convertEventRaised, + [pb.HistoryEvent.EventtypeCase.GENERICEVENT]: convertGenericEvent, + [pb.HistoryEvent.EventtypeCase.HISTORYSTATE]: convertHistoryState, + [pb.HistoryEvent.EventtypeCase.CONTINUEASNEW]: convertContinueAsNew, + [pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONSIGNALED]: convertEntityOperationSignaled, + [pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONCALLED]: convertEntityOperationCalled, + [pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONCOMPLETED]: convertEntityOperationCompleted, + [pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONFAILED]: convertEntityOperationFailed, + [pb.HistoryEvent.EventtypeCase.ENTITYLOCKREQUESTED]: convertEntityLockRequested, + [pb.HistoryEvent.EventtypeCase.ENTITYLOCKGRANTED]: convertEntityLockGranted, + [pb.HistoryEvent.EventtypeCase.ENTITYUNLOCKSENT]: convertEntityUnlockSent, +}; + +/** + * Converts a protobuf HistoryEvent to a TypeScript HistoryEvent. + * @param protoEvent The protobuf HistoryEvent to convert. + * @returns The converted HistoryEvent, or undefined if the event type is not recognized. + */ +export function convertProtoHistoryEvent(protoEvent: pb.HistoryEvent): HistoryEvent | undefined { + const eventId = protoEvent.getEventid(); + const timestamp = protoEvent.getTimestamp()?.toDate() ?? new Date(0); + const eventTypeCase = protoEvent.getEventtypeCase(); + + const converter = EVENT_TYPE_CONVERTERS[eventTypeCase]; + if (!converter) { + return undefined; + } + + return converter(protoEvent, eventId, timestamp); +} + +function convertOrchestrationInstance(instance: pb.OrchestrationInstance): OrchestrationInstance { + return { + instanceId: instance.getInstanceid(), + executionId: instance.getExecutionid()?.getValue(), + }; +} + +function convertParentInstanceInfo(parent: pb.ParentInstanceInfo): ParentInstanceInfo { + const orchInstance = parent.getOrchestrationinstance(); + return { + name: parent.getName()?.getValue(), + version: parent.getVersion()?.getValue(), + taskScheduledId: parent.getTaskscheduledid(), + orchestrationInstance: orchInstance ? convertOrchestrationInstance(orchInstance) : undefined, + }; +} + +function convertTraceContext(traceContext: pb.TraceContext): TraceContext { + return { + traceParent: traceContext.getTraceparent(), + spanId: traceContext.getSpanid(), + traceState: traceContext.getTracestate()?.getValue(), + }; +} + +function convertFailureDetails(details: pb.TaskFailureDetails | undefined): FailureDetails | undefined { + if (!details) return undefined; + + return new FailureDetails( + details.getErrormessage(), + details.getErrortype(), + details.getStacktrace()?.getValue(), + ); +} + +function convertTagsMap(tagsMap: ReturnType): Record | undefined { + const result: Record = {}; + let hasEntries = false; + + tagsMap.forEach((value: string, key: string) => { + result[key] = value; + hasEntries = true; + }); + + return hasEntries ? result : undefined; +} diff --git a/packages/durabletask-js/src/utils/pb-helper.util.ts b/packages/durabletask-js/src/utils/pb-helper.util.ts index 080613f..dd03077 100644 --- a/packages/durabletask-js/src/utils/pb-helper.util.ts +++ b/packages/durabletask-js/src/utils/pb-helper.util.ts @@ -418,6 +418,23 @@ export function isEmpty(v?: StringValue | null): boolean { return v == null || v.getValue() === ""; } +/** + * Parses the value of a StringValue protobuf field as JSON. + * Returns undefined if the value is null/undefined or empty. + * + * @param v The StringValue field to parse. + * @returns The parsed JSON value, or undefined. + */ +export function parseJsonField(v?: StringValue | null): any { + if (v == null || v.getValue() === "") return undefined; + try { + return JSON.parse(v.getValue()); + } catch (err) { + // Wrap JSON.parse errors to provide clearer context to callers while preserving the original error + throw new Error(`Failed to parse JSON from StringValue: ${(err as Error).message}`, { cause: err }); + } +} + // Pre-built reverse map for O(1) orchestration status string lookups const orchestrationStatusStrMap = new Map(); for (const [name, value] of Object.entries(pb.OrchestrationStatus)) { @@ -471,14 +488,7 @@ export function newSendEntityMessageSignalAction( signalEvent.setScheduledtime(ts); } - const sendEntityMessage = new pb.SendEntityMessageAction(); - sendEntityMessage.setEntityoperationsignaled(signalEvent); - - const action = new pb.OrchestratorAction(); - action.setId(id); - action.setSendentitymessage(sendEntityMessage); - - return action; + return wrapEntityMessageAction(id, (msg) => msg.setEntityoperationsignaled(signalEvent)); } /** @@ -522,14 +532,7 @@ export function newSendEntityMessageCallAction( callEvent.setScheduledtime(ts); } - const sendEntityMessage = new pb.SendEntityMessageAction(); - sendEntityMessage.setEntityoperationcalled(callEvent); - - const action = new pb.OrchestratorAction(); - action.setId(id); - action.setSendentitymessage(sendEntityMessage); - - return action; + return wrapEntityMessageAction(id, (msg) => msg.setEntityoperationcalled(callEvent)); } /** @@ -558,14 +561,7 @@ export function newSendEntityMessageLockAction( lockEvent.setPosition(0); lockEvent.setParentinstanceid(getStringValue(parentInstanceId)); - const sendEntityMessage = new pb.SendEntityMessageAction(); - sendEntityMessage.setEntitylockrequested(lockEvent); - - const action = new pb.OrchestratorAction(); - action.setId(id); - action.setSendentitymessage(sendEntityMessage); - - return action; + return wrapEntityMessageAction(id, (msg) => msg.setEntitylockrequested(lockEvent)); } /** @@ -592,13 +588,18 @@ export function newSendEntityMessageUnlockAction( unlockEvent.setTargetinstanceid(getStringValue(targetInstanceId)); unlockEvent.setParentinstanceid(getStringValue(parentInstanceId)); - const sendEntityMessage = new pb.SendEntityMessageAction(); - sendEntityMessage.setEntityunlocksent(unlockEvent); + return wrapEntityMessageAction(id, (msg) => msg.setEntityunlocksent(unlockEvent)); +} +function wrapEntityMessageAction( + id: number, + setEvent: (msg: pb.SendEntityMessageAction) => void, +): pb.OrchestratorAction { + const sendEntityMessage = new pb.SendEntityMessageAction(); + setEvent(sendEntityMessage); const action = new pb.OrchestratorAction(); action.setId(id); action.setSendentitymessage(sendEntityMessage); - return action; } diff --git a/packages/durabletask-js/src/worker/entity-executor.ts b/packages/durabletask-js/src/worker/entity-executor.ts index 3a26e4a..1176cf6 100644 --- a/packages/durabletask-js/src/worker/entity-executor.ts +++ b/packages/durabletask-js/src/worker/entity-executor.ts @@ -68,7 +68,7 @@ class StateShim implements TaskEntityState { } get hasState(): boolean { - return this.serializedValue !== undefined && this.serializedValue !== null; + return this.serializedValue != null; } getState(defaultValue?: T): T | undefined { @@ -90,7 +90,7 @@ class StateShim implements TaskEntityState { this.cachedValue = state; try { this.serializedValue = - state !== undefined && state !== null ? JSON.stringify(state) : undefined; + state != null ? JSON.stringify(state) : undefined; this.cacheValid = true; } catch (e) { diff --git a/packages/durabletask-js/src/worker/orchestration-executor.ts b/packages/durabletask-js/src/worker/orchestration-executor.ts index 8456b58..c5eea00 100644 --- a/packages/durabletask-js/src/worker/orchestration-executor.ts +++ b/packages/durabletask-js/src/worker/orchestration-executor.ts @@ -19,7 +19,8 @@ import { RetryHandlerTask } from "../task/retry-handler-task"; import { RetryTimerTask } from "../task/retry-timer-task"; import { TOrchestrator } from "../types/orchestrator.type"; import { enumValueToKey } from "../utils/enum.util"; -import { isEmpty } from "../utils/pb-helper.util"; +import { isEmpty, parseJsonField } from "../utils/pb-helper.util"; +import type { StringValue } from "google-protobuf/google/protobuf/wrappers_pb"; import { OrchestratorNotRegisteredError } from "./exception/orchestrator-not-registered-error"; import { StopIterationError } from "./exception/stop-iteration-error"; import { Registry } from "./registry"; @@ -129,614 +130,67 @@ export class OrchestrationExecutor { try { switch (eventType) { case pb.HistoryEvent.EventtypeCase.ORCHESTRATORSTARTED: - ctx._currentUtcDatetime = event.getTimestamp()?.toDate() ?? ctx._currentUtcDatetime; + await this.handleOrchestratorStarted(ctx, event); break; case pb.HistoryEvent.EventtypeCase.EXECUTIONSTARTED: - { - // TODO: Check if we already started the orchestration - const executionStartedEvent = event.getExecutionstarted(); - const fn = this._registry.getOrchestrator( - executionStartedEvent ? executionStartedEvent.getName() : undefined, - ); - - if (!fn) { - throw new OrchestratorNotRegisteredError(executionStartedEvent?.getName()); - } - - // Set the execution ID from the orchestration instance - const executionId = executionStartedEvent?.getOrchestrationinstance()?.getExecutionid()?.getValue(); - if (executionId) { - ctx._executionId = executionId; - } - - // Track the orchestrator name for lifecycle logs - this._orchestratorName = executionStartedEvent?.getName() ?? "(unknown)"; - - // Log orchestration start (EventId 600) - WorkerLogs.orchestrationStarted(this._logger, ctx._instanceId, this._orchestratorName); - - // Set the version from the execution started event - ctx._version = executionStartedEvent?.getVersion()?.getValue() ?? ""; - - // Extract parent instance info if this is a sub-orchestration - const parentInstance = executionStartedEvent?.getParentinstance(); - if (parentInstance) { - const parentOrchestrationInstance = parentInstance.getOrchestrationinstance(); - ctx._parent = { - name: parentInstance.getName()?.getValue() ?? "", - instanceId: parentOrchestrationInstance?.getInstanceid() ?? "", - taskScheduledId: parentInstance.getTaskscheduledid(), - }; - } - - // Deserialize the input, if any - let input = undefined; - - if (executionStartedEvent?.getInput() && executionStartedEvent?.getInput()?.toString() !== "") { - input = JSON.parse(executionStartedEvent.getInput()?.toString() || "{}"); - } - - // This does not execute the generator, it creates it - // since we create an async iterator, we await the creation (so we can use await in the generator itself beside yield) - const result = await fn(ctx, input); - - const isAsyncGenerator = typeof result?.[Symbol.asyncIterator] === "function"; - if (isAsyncGenerator) { - // Start the orchestrator's generator function - await ctx.run(result); - } else { - const resultType = Object.prototype.toString.call(result); - WorkerLogs.orchestrationNoTasks(this._logger, resultType); - - // This is an orchestrator that doesn't schedule any tasks - ctx.setComplete(result, pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); - } - } + await this.handleExecutionStarted(ctx, event); break; case pb.HistoryEvent.EventtypeCase.TIMERCREATED: - // This history event confirms that the timer was successfully scheduled. Remove the timerCreated event from the pending action list so we don't schedule it again. - { - const timerId = event.getEventid(); - const action = ctx._pendingActions[timerId]; - - // Delete it - delete ctx._pendingActions[timerId]; - - const isTimerAction = action?.getCreatetimer(); - - if (!action) { - throw getNonDeterminismError(timerId, getName(ctx.createTimer)); - } else if (!isTimerAction) { - const expectedMethodName = getName(ctx.createTimer); - throw getWrongActionTypeError(timerId, expectedMethodName, action); - } - } + await this.handleTimerCreated(ctx, event); break; case pb.HistoryEvent.EventtypeCase.TIMERFIRED: - { - const timerFiredEvent = event.getTimerfired(); - const timerId = timerFiredEvent ? timerFiredEvent.getTimerid() : undefined; - - if (timerId === undefined) { - if (!ctx._isReplaying) { - WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, "timerFired"); - } - return; - } - - const timerTask = ctx._pendingTasks[timerId]; - delete ctx._pendingTasks[timerId]; - - if (!timerTask) { - // TODO: Should this be an error? When would it ever happen? - if (!ctx._isReplaying) { - WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, "timerFired", timerId); - } - return; - } - - // Check if this is a retry timer - if (timerTask instanceof RetryTimerTask) { - // Get the parent retry task and reschedule it - const retryTask = timerTask.retryableParent; - // Reschedule the original action - this will add it back to pendingActions - ctx.rescheduleRetryTask(retryTask); - // Don't resume the orchestrator - we're just rescheduling the task - return; - } - - timerTask.complete(undefined); - await ctx.resume(); - } + await this.handleTimerFired(ctx, event); break; - // This history event confirms that the activity execution was successfully scheduled. Remove the taskscheduled event from the pending action list so we don't schedule it again. case pb.HistoryEvent.EventtypeCase.TASKSCHEDULED: - { - const taskId = event.getEventid(); - const action = ctx._pendingActions[taskId]; - delete ctx._pendingActions[taskId]; - - const isScheduleTaskAction = action?.hasScheduletask(); - - if (!action) { - throw getNonDeterminismError(taskId, getName(ctx.callActivity)); - } else if (!isScheduleTaskAction) { - const expectedMethodName = getName(ctx.callActivity); - throw getWrongActionTypeError(taskId, expectedMethodName, action); - } else if (action.getScheduletask()?.getName() != event.getTaskscheduled()?.getName()) { - throw getWrongActionNameError( - taskId, - getName(ctx.callActivity), - event.getTaskscheduled()?.getName(), - action.getScheduletask()?.getName(), - ); - } - } - + await this.handleTaskScheduled(ctx, event); break; - // This history event contains the result of a completed activity task case pb.HistoryEvent.EventtypeCase.TASKCOMPLETED: - { - const taskCompletedEvent = event.getTaskcompleted(); - const taskId = taskCompletedEvent ? taskCompletedEvent.getTaskscheduledid() : undefined; - - let activityTask; - - if (taskId) { - activityTask = ctx._pendingTasks[taskId]; - delete ctx._pendingTasks[taskId]; - } - - if (!activityTask) { - // TODO: Should this be an error? When would it ever happen? - if (!ctx._isReplaying) { - WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, "taskCompleted", taskId); - } - - return; - } - - let result; - - if (!isEmpty(event.getTaskcompleted()?.getResult())) { - result = JSON.parse(event.getTaskcompleted()?.getResult()?.toString() || ""); - } - - activityTask.complete(result); - await ctx.resume(); - } + await this.handleTaskCompleted(ctx, event); break; case pb.HistoryEvent.EventtypeCase.TASKFAILED: - { - const taskFailedEvent = event.getTaskfailed(); - const taskId = taskFailedEvent ? taskFailedEvent.getTaskscheduledid() : undefined; - - if (taskId === undefined) { - if (!ctx._isReplaying) { - WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, "taskFailed"); - } - return; - } - - const failureDetails = event.getTaskfailed()?.getFailuredetails(); - const errorMessage = failureDetails?.getErrormessage() || "Unknown error"; - - // Get the task (don't delete yet - we might retry) - const activityTask = ctx._pendingTasks[taskId]; - - if (!activityTask) { - if (!ctx._isReplaying) { - WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, "taskFailed", taskId); - } - return; - } - - // Check if this task supports retry and handle it - const retried = await this.tryHandleRetry(activityTask, errorMessage, failureDetails, taskId, ctx); - if (retried) { - return; - } - - // No retry - fail the task - delete ctx._pendingTasks[taskId]; - - activityTask.fail( - `Activity task #${taskId} failed: ${errorMessage}`, - failureDetails, - ); - - await ctx.resume(); - } + await this.handleTaskFailed(ctx, event); break; - // This history event confirms that the sub-orcehstration execution was successfully scheduled. Remove the subOrchestrationInstanceCreated event from the pending action list so we don't schedule it again. case pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCECREATED: - { - const taskId = event.getEventid(); - const action = ctx._pendingActions[taskId]; - delete ctx._pendingActions[taskId]; - - const isCreateSubOrchestrationAction = action?.hasCreatesuborchestration(); - - if (!action) { - throw getNonDeterminismError(taskId, getName(ctx.callSubOrchestrator)); - } else if (!isCreateSubOrchestrationAction) { - const expectedMethodName = getName(ctx.callSubOrchestrator); - throw getWrongActionTypeError(taskId, expectedMethodName, action); - } else if ( - action.getCreatesuborchestration()?.getName() != event.getSuborchestrationinstancecreated()?.getName() - ) { - throw getWrongActionNameError( - taskId, - getName(ctx.callSubOrchestrator), - event.getSuborchestrationinstancecreated()?.getName(), - action.getCreatesuborchestration()?.getName(), - ); - } - } + await this.handleSubOrchestrationCreated(ctx, event); break; case pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCECOMPLETED: - { - const subOrchestrationInstanceCompletedEvent = event.getSuborchestrationinstancecompleted(); - const taskId = subOrchestrationInstanceCompletedEvent - ? subOrchestrationInstanceCompletedEvent.getTaskscheduledid() - : undefined; - - let subOrchTask; - - if (taskId) { - subOrchTask = ctx._pendingTasks[taskId]; - delete ctx._pendingTasks[taskId]; - } - - let result; - - if (!isEmpty(event.getSuborchestrationinstancecompleted()?.getResult())) { - result = JSON.parse(event.getSuborchestrationinstancecompleted()?.getResult()?.toString() || ""); - } - - if (subOrchTask) { - subOrchTask.complete(result); - } - - await ctx.resume(); - } + await this.handleSubOrchestrationCompleted(ctx, event); break; case pb.HistoryEvent.EventtypeCase.SUBORCHESTRATIONINSTANCEFAILED: - { - const subOrchestrationInstanceFailedEvent = event.getSuborchestrationinstancefailed(); - const taskId = subOrchestrationInstanceFailedEvent - ? subOrchestrationInstanceFailedEvent.getTaskscheduledid() - : undefined; - - if (taskId === undefined) { - if (!ctx._isReplaying) { - WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, "subOrchestrationInstanceFailed"); - } - return; - } - - const failureDetails = event.getSuborchestrationinstancefailed()?.getFailuredetails(); - const errorMessage = failureDetails?.getErrormessage() || "Unknown error"; - - // Get the task (don't delete yet - we might retry) - const subOrchTask = ctx._pendingTasks[taskId]; - - if (!subOrchTask) { - if (!ctx._isReplaying) { - WorkerLogs.orchestrationUnexpectedEvent( - this._logger, ctx._instanceId, "subOrchestrationInstanceFailed", taskId, - ); - } - return; - } - - // Check if this task supports retry and handle it - const retried = await this.tryHandleRetry(subOrchTask, errorMessage, failureDetails, taskId, ctx); - if (retried) { - return; - } - - // No retry - fail the task - delete ctx._pendingTasks[taskId]; - - subOrchTask.fail( - `Sub-orchestration task #${taskId} failed: ${errorMessage}`, - failureDetails, - ); - - await ctx.resume(); - } + await this.handleSubOrchestrationFailed(ctx, event); break; case pb.HistoryEvent.EventtypeCase.EVENTRAISED: - { - // Event names are case-insensitive - const eventName = event.getEventraised()?.getName()?.toLowerCase(); - - if (!ctx._isReplaying) { - WorkerLogs.orchestrationEventRaised(this._logger, ctx._instanceId, eventName!); - } - - let taskList; - - if (eventName) { - taskList = ctx._pendingEvents[eventName]; - } - - let decodedResult; - - if (taskList) { - const eventTask = taskList.shift(); - - if (!isEmpty(event.getEventraised()?.getInput())) { - decodedResult = JSON.parse(event.getEventraised()?.getInput()?.toString() || ""); - } - - if (eventTask) { - eventTask.complete(decodedResult); - } - - if ((taskList?.length ?? 0) === 0 && eventName) { - delete ctx._pendingEvents[eventName]; - } - - await ctx.resume(); - } else { - // Buffer the event - let eventList: any[] | undefined = []; - - if (eventName) { - eventList = ctx._receivedEvents[eventName]; - - if (!eventList?.length) { - eventList = []; - ctx._receivedEvents[eventName] = eventList; - } - } - - if (!isEmpty(event.getEventraised()?.getInput())) { - decodedResult = JSON.parse(event.getEventraised()?.getInput()?.toString() || ""); - } - - eventList?.push(decodedResult); - - if (!ctx._isReplaying) { - WorkerLogs.orchestrationEventBuffered(this._logger, ctx._instanceId, eventName!); - } - } - } + await this.handleEventRaised(ctx, event); break; case pb.HistoryEvent.EventtypeCase.EXECUTIONSUSPENDED: - { - if (!this._isSuspended && !ctx._isReplaying) { - WorkerLogs.orchestrationSuspended(this._logger, ctx._instanceId); - } - - this._isSuspended = true; - } + await this.handleExecutionSuspended(ctx, event); break; case pb.HistoryEvent.EventtypeCase.EXECUTIONRESUMED: - if (!this._isSuspended) { - return; - } - - this._isSuspended = false; - - for (const e of this._suspendedEvents) { - await this.processEvent(ctx, e); - } - - this._suspendedEvents = []; + await this.handleExecutionResumed(ctx, event); break; - case pb.HistoryEvent.EventtypeCase.EXECUTIONTERMINATED: { - if (!ctx._isReplaying) { - WorkerLogs.orchestrationTerminated(this._logger, ctx._instanceId); - } - - let encodedOutput; - - if (!isEmpty(event.getExecutionterminated()?.getInput())) { - encodedOutput = event.getExecutionterminated()?.getInput()?.getValue(); - } - - ctx.setComplete(encodedOutput, pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED, true); + case pb.HistoryEvent.EventtypeCase.EXECUTIONTERMINATED: + await this.handleExecutionTerminated(ctx, event); break; - } - // This history event confirms that the entity call was successfully scheduled. - // Remove the action from the pending action list so we don't schedule it again. case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONCALLED: - { - const eventId = event.getEventid(); - const action = ctx._pendingActions[eventId]; - delete ctx._pendingActions[eventId]; - - const isSendEntityMessageAction = action?.hasSendentitymessage(); - - if (!action) { - throw getNonDeterminismError(eventId, "callEntity"); - } else if (!isSendEntityMessageAction) { - throw getWrongActionTypeError(eventId, "callEntity", action); - } else if (!action.getSendentitymessage()?.hasEntityoperationcalled()) { - throw getWrongActionTypeError(eventId, "callEntity (EntityOperationCalled)", action); - } - } + await this.handleEntityOperationCalled(ctx, event); break; - // This history event confirms that the entity signal was successfully scheduled. - // Remove the action from the pending action list so we don't schedule it again. case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONSIGNALED: - { - const eventId = event.getEventid(); - const action = ctx._pendingActions[eventId]; - delete ctx._pendingActions[eventId]; - - const isSendEntityMessageAction = action?.hasSendentitymessage(); - - if (!action) { - throw getNonDeterminismError(eventId, "signalEntity"); - } else if (!isSendEntityMessageAction) { - throw getWrongActionTypeError(eventId, "signalEntity", action); - } else if (!action.getSendentitymessage()?.hasEntityoperationsignaled()) { - throw getWrongActionTypeError(eventId, "signalEntity (EntityOperationSignaled)", action); - } - } + await this.handleEntityOperationSignaled(ctx, event); break; - // This history event confirms that the lock request was successfully scheduled. - // Remove the action from the pending action list so we don't schedule it again. - // The pending lock request in _entityFeature.pendingLockRequests remains to receive the granted event. case pb.HistoryEvent.EventtypeCase.ENTITYLOCKREQUESTED: - { - const eventId = event.getEventid(); - const action = ctx._pendingActions[eventId]; - delete ctx._pendingActions[eventId]; - - const isSendEntityMessageAction = action?.hasSendentitymessage(); - - if (!action) { - throw getNonDeterminismError(eventId, "lockEntities"); - } else if (!isSendEntityMessageAction) { - throw getWrongActionTypeError(eventId, "lockEntities", action); - } else if (!action.getSendentitymessage()?.hasEntitylockrequested()) { - throw getWrongActionTypeError(eventId, "lockEntities (EntityLockRequested)", action); - } - } + await this.handleEntityLockRequested(ctx, event); break; case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONCOMPLETED: - { - const completedEvent = event.getEntityoperationcompleted(); - const requestId = completedEvent?.getRequestid(); - - if (!requestId) { - WorkerLogs.entityEventIgnored( - this._logger, - ctx._instanceId, - "EntityOperationCompletedEvent", - "no requestId", - ); - return; - } - - // Find the pending entity call by requestId - const pendingCall = ctx._entityFeature.pendingEntityCalls.get(requestId); - if (!pendingCall) { - // This could happen during replay or if the call was already processed - if (!ctx._isReplaying) { - WorkerLogs.entityEventIgnored( - this._logger, - ctx._instanceId, - "EntityOperationCompletedEvent", - `unexpected requestId = ${requestId}`, - ); - } - return; - } - - // Remove from pending calls - ctx._entityFeature.pendingEntityCalls.delete(requestId); - - // If in a critical section, recover the lock for this entity - ctx._entityFeature.recoverLockAfterCall(pendingCall.entityId); - - // Parse the result and complete the task - let result; - if (!isEmpty(completedEvent?.getOutput())) { - result = JSON.parse(completedEvent?.getOutput()?.getValue() || "null"); - } - - pendingCall.task.complete(result); - await ctx.resume(); - } + await this.handleEntityOperationCompleted(ctx, event); break; case pb.HistoryEvent.EventtypeCase.ENTITYOPERATIONFAILED: - { - const failedEvent = event.getEntityoperationfailed(); - const requestId = failedEvent?.getRequestid(); - - if (!requestId) { - WorkerLogs.entityEventIgnored( - this._logger, - ctx._instanceId, - "EntityOperationFailedEvent", - "no requestId", - ); - return; - } - - // Find the pending entity call by requestId - const pendingCall = ctx._entityFeature.pendingEntityCalls.get(requestId); - if (!pendingCall) { - // This could happen during replay or if the call was already processed - if (!ctx._isReplaying) { - WorkerLogs.entityEventIgnored( - this._logger, - ctx._instanceId, - "EntityOperationFailedEvent", - `unexpected requestId = ${requestId}`, - ); - } - return; - } - - // Remove from pending calls - ctx._entityFeature.pendingEntityCalls.delete(requestId); - - // If in a critical section, recover the lock for this entity - ctx._entityFeature.recoverLockAfterCall(pendingCall.entityId); - - // Convert failure details and throw EntityOperationFailedException - const failureDetails = createTaskFailureDetails(failedEvent?.getFailuredetails()); - if (!failureDetails) { - pendingCall.task.fail( - `Entity operation '${pendingCall.operationName}' failed with unknown error`, - ); - } else { - const exception = new EntityOperationFailedException( - pendingCall.entityId, - pendingCall.operationName, - failureDetails, - ); - pendingCall.task.fail(exception.message, failedEvent?.getFailuredetails()); - } - - await ctx.resume(); - } + await this.handleEntityOperationFailed(ctx, event); break; case pb.HistoryEvent.EventtypeCase.ENTITYLOCKGRANTED: - { - const lockGrantedEvent = event.getEntitylockgranted(); - const criticalSectionId = lockGrantedEvent?.getCriticalsectionid(); - - if (!criticalSectionId) { - WorkerLogs.entityEventIgnored( - this._logger, - ctx._instanceId, - "EntityLockGrantedEvent", - "no criticalSectionId", - ); - return; - } - - // Find the pending lock request by criticalSectionId - const pendingRequest = ctx._entityFeature.pendingLockRequests.get(criticalSectionId); - if (!pendingRequest) { - // This could happen during replay or if the lock was already acquired - if (!ctx._isReplaying) { - WorkerLogs.entityEventIgnored( - this._logger, - ctx._instanceId, - "EntityLockGrantedEvent", - `unexpected criticalSectionId = ${criticalSectionId}`, - ); - } - return; - } - - // Complete the lock acquisition - ctx._entityFeature.completeLockAcquisition(criticalSectionId); - await ctx.resume(); - } + await this.handleEntityLockGranted(ctx, event); break; default: WorkerLogs.orchestrationUnknownEvent(this._logger, eventTypeName, eventType); - // throw new OrchestrationStateError(`Unknown history event type: ${eventTypeName} (value: ${eventType})`); } } catch (e: unknown) { // StopIteration is thrown when the generator is finished and didn't return a task as its next action @@ -752,6 +206,558 @@ export class OrchestrationExecutor { } } + private async handleOrchestratorStarted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + ctx._currentUtcDatetime = event.getTimestamp()?.toDate() ?? ctx._currentUtcDatetime; + } + + private async handleExecutionStarted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + // TODO: Check if we already started the orchestration + const executionStartedEvent = event.getExecutionstarted(); + const fn = this._registry.getOrchestrator( + executionStartedEvent ? executionStartedEvent.getName() : undefined, + ); + + if (!fn) { + throw new OrchestratorNotRegisteredError(executionStartedEvent?.getName()); + } + + // Set the execution ID from the orchestration instance + const executionId = executionStartedEvent?.getOrchestrationinstance()?.getExecutionid()?.getValue(); + if (executionId) { + ctx._executionId = executionId; + } + + // Track the orchestrator name for lifecycle logs + this._orchestratorName = executionStartedEvent?.getName() ?? "(unknown)"; + + // Log orchestration start (EventId 600) + WorkerLogs.orchestrationStarted(this._logger, ctx._instanceId, this._orchestratorName); + + // Set the version from the execution started event + ctx._version = executionStartedEvent?.getVersion()?.getValue() ?? ""; + + // Extract parent instance info if this is a sub-orchestration + const parentInstance = executionStartedEvent?.getParentinstance(); + if (parentInstance) { + const parentOrchestrationInstance = parentInstance.getOrchestrationinstance(); + ctx._parent = { + name: parentInstance.getName()?.getValue() ?? "", + instanceId: parentOrchestrationInstance?.getInstanceid() ?? "", + taskScheduledId: parentInstance.getTaskscheduledid(), + }; + } + + // Deserialize the input, if any + const inputField = executionStartedEvent?.getInput(); + const input = isEmpty(inputField) ? undefined : parseJsonField(inputField); + + // This does not execute the generator, it creates it + // since we create an async iterator, we await the creation (so we can use await in the generator itself beside yield) + const result = await fn(ctx, input); + + const isAsyncGenerator = typeof result?.[Symbol.asyncIterator] === "function"; + if (isAsyncGenerator) { + // Start the orchestrator's generator function + await ctx.run(result); + } else { + const resultType = Object.prototype.toString.call(result); + WorkerLogs.orchestrationNoTasks(this._logger, resultType); + + // This is an orchestrator that doesn't schedule any tasks + ctx.setComplete(result, pb.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED); + } + } + + private async handleTimerCreated(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + // This history event confirms that the timer was successfully scheduled. Remove the timerCreated event from the pending action list so we don't schedule it again. + const timerId = event.getEventid(); + const action = ctx._pendingActions[timerId]; + + // Delete it + delete ctx._pendingActions[timerId]; + + const isTimerAction = action?.getCreatetimer(); + + if (!action) { + throw getNonDeterminismError(timerId, getName(ctx.createTimer)); + } else if (!isTimerAction) { + const expectedMethodName = getName(ctx.createTimer); + throw getWrongActionTypeError(timerId, expectedMethodName, action); + } + } + + private async handleTimerFired(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + const timerFiredEvent = event.getTimerfired(); + const timerId = timerFiredEvent ? timerFiredEvent.getTimerid() : undefined; + + if (timerId === undefined) { + if (!ctx._isReplaying) { + WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, "timerFired"); + } + return; + } + + const timerTask = ctx._pendingTasks[timerId]; + delete ctx._pendingTasks[timerId]; + + if (!timerTask) { + // TODO: Should this be an error? When would it ever happen? + if (!ctx._isReplaying) { + WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, "timerFired", timerId); + } + return; + } + + // Check if this is a retry timer + if (timerTask instanceof RetryTimerTask) { + // Get the parent retry task and reschedule it + const retryTask = timerTask.retryableParent; + // Reschedule the original action - this will add it back to pendingActions + ctx.rescheduleRetryTask(retryTask); + // Don't resume the orchestrator - we're just rescheduling the task + return; + } + + timerTask.complete(undefined); + await ctx.resume(); + } + + private async handleTaskScheduled(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + // This history event confirms that the activity execution was successfully scheduled. Remove the taskscheduled event from the pending action list so we don't schedule it again. + const taskId = event.getEventid(); + const action = ctx._pendingActions[taskId]; + delete ctx._pendingActions[taskId]; + + const isScheduleTaskAction = action?.hasScheduletask(); + + if (!action) { + throw getNonDeterminismError(taskId, getName(ctx.callActivity)); + } else if (!isScheduleTaskAction) { + const expectedMethodName = getName(ctx.callActivity); + throw getWrongActionTypeError(taskId, expectedMethodName, action); + } else if (action.getScheduletask()?.getName() != event.getTaskscheduled()?.getName()) { + throw getWrongActionNameError( + taskId, + getName(ctx.callActivity), + event.getTaskscheduled()?.getName(), + action.getScheduletask()?.getName(), + ); + } + } + + private async handleTaskCompleted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + const taskCompletedEvent = event.getTaskcompleted(); + const taskId = taskCompletedEvent ? taskCompletedEvent.getTaskscheduledid() : undefined; + const result = taskCompletedEvent?.getResult(); + const normalizedResult = isEmpty(result) ? undefined : result; + await this.handleCompletedTask(ctx, taskId, normalizedResult, "taskCompleted"); + } + + private async handleTaskFailed(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + const taskFailedEvent = event.getTaskfailed(); + const taskId = taskFailedEvent ? taskFailedEvent.getTaskscheduledid() : undefined; + const failureDetails = taskFailedEvent?.getFailuredetails(); + await this.handleFailedTask(ctx, taskId, failureDetails, "taskFailed", "Activity task"); + } + + private async handleSubOrchestrationCreated(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + // This history event confirms that the sub-orchestration execution was successfully scheduled. Remove the subOrchestrationInstanceCreated event from the pending action list so we don't schedule it again. + const taskId = event.getEventid(); + const action = ctx._pendingActions[taskId]; + delete ctx._pendingActions[taskId]; + + const isCreateSubOrchestrationAction = action?.hasCreatesuborchestration(); + + if (!action) { + throw getNonDeterminismError(taskId, getName(ctx.callSubOrchestrator)); + } else if (!isCreateSubOrchestrationAction) { + const expectedMethodName = getName(ctx.callSubOrchestrator); + throw getWrongActionTypeError(taskId, expectedMethodName, action); + } else if ( + action.getCreatesuborchestration()?.getName() != event.getSuborchestrationinstancecreated()?.getName() + ) { + throw getWrongActionNameError( + taskId, + getName(ctx.callSubOrchestrator), + event.getSuborchestrationinstancecreated()?.getName(), + action.getCreatesuborchestration()?.getName(), + ); + } + } + + private async handleSubOrchestrationCompleted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + const subOrchestrationInstanceCompletedEvent = event.getSuborchestrationinstancecompleted(); + const taskId = subOrchestrationInstanceCompletedEvent + ? subOrchestrationInstanceCompletedEvent.getTaskscheduledid() + : undefined; + + let subOrchTask; + + if (taskId) { + subOrchTask = ctx._pendingTasks[taskId]; + delete ctx._pendingTasks[taskId]; + } + + const result = parseJsonField(subOrchestrationInstanceCompletedEvent?.getResult()); + + if (subOrchTask) { + subOrchTask.complete(result); + } + + await ctx.resume(); + } + + private async handleSubOrchestrationFailed(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + const subOrchestrationInstanceFailedEvent = event.getSuborchestrationinstancefailed(); + const taskId = subOrchestrationInstanceFailedEvent + ? subOrchestrationInstanceFailedEvent.getTaskscheduledid() + : undefined; + const failureDetails = subOrchestrationInstanceFailedEvent?.getFailuredetails(); + await this.handleFailedTask(ctx, taskId, failureDetails, "subOrchestrationInstanceFailed", "Sub-orchestration task"); + } + + private async handleEventRaised(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + // Event names are case-insensitive + const eventName = event.getEventraised()?.getName()?.toLowerCase(); + + if (!ctx._isReplaying) { + WorkerLogs.orchestrationEventRaised(this._logger, ctx._instanceId, eventName!); + } + + let taskList; + + if (eventName) { + taskList = ctx._pendingEvents[eventName]; + } + + let decodedResult; + + if (taskList) { + const eventTask = taskList.shift(); + + decodedResult = parseJsonField(event.getEventraised()?.getInput()); + + if (eventTask) { + eventTask.complete(decodedResult); + } + + if ((taskList?.length ?? 0) === 0 && eventName) { + delete ctx._pendingEvents[eventName]; + } + + await ctx.resume(); + } else { + // Buffer the event + let eventList: any[] | undefined = []; + + if (eventName) { + eventList = ctx._receivedEvents[eventName]; + + if (!eventList?.length) { + eventList = []; + ctx._receivedEvents[eventName] = eventList; + } + } + + decodedResult = parseJsonField(event.getEventraised()?.getInput()); + + eventList?.push(decodedResult); + + if (!ctx._isReplaying) { + WorkerLogs.orchestrationEventBuffered(this._logger, ctx._instanceId, eventName!); + } + } + } + + private async handleExecutionSuspended(ctx: RuntimeOrchestrationContext, _event: pb.HistoryEvent): Promise { + if (!this._isSuspended && !ctx._isReplaying) { + WorkerLogs.orchestrationSuspended(this._logger, ctx._instanceId); + } + + this._isSuspended = true; + } + + private async handleExecutionResumed(ctx: RuntimeOrchestrationContext, _event: pb.HistoryEvent): Promise { + if (!this._isSuspended) { + return; + } + + this._isSuspended = false; + + for (const e of this._suspendedEvents) { + await this.processEvent(ctx, e); + } + + this._suspendedEvents = []; + } + + private async handleExecutionTerminated(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + if (!ctx._isReplaying) { + WorkerLogs.orchestrationTerminated(this._logger, ctx._instanceId); + } + + let encodedOutput; + + if (!isEmpty(event.getExecutionterminated()?.getInput())) { + encodedOutput = event.getExecutionterminated()?.getInput()?.getValue(); + } + + ctx.setComplete(encodedOutput, pb.OrchestrationStatus.ORCHESTRATION_STATUS_TERMINATED, true); + } + + private async handleEntityOperationCalled(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + this.validateEntityAction( + ctx, + event, + "callEntity", + (msg) => msg.hasEntityoperationcalled(), + "callEntity (EntityOperationCalled)", + ); + } + + private async handleEntityOperationSignaled(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + this.validateEntityAction( + ctx, + event, + "signalEntity", + (msg) => msg.hasEntityoperationsignaled(), + "signalEntity (EntityOperationSignaled)", + ); + } + + private async handleEntityLockRequested(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + this.validateEntityAction( + ctx, + event, + "lockEntities", + (msg) => msg.hasEntitylockrequested(), + "lockEntities (EntityLockRequested)", + ); + } + + private async handleEntityOperationCompleted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + const completedEvent = event.getEntityoperationcompleted(); + const requestId = completedEvent?.getRequestid(); + + if (!requestId) { + WorkerLogs.entityEventIgnored( + this._logger, + ctx._instanceId, + "EntityOperationCompletedEvent", + "no requestId", + ); + return; + } + + // Find the pending entity call by requestId + const pendingCall = ctx._entityFeature.pendingEntityCalls.get(requestId); + if (!pendingCall) { + // This could happen during replay or if the call was already processed + if (!ctx._isReplaying) { + WorkerLogs.entityEventIgnored( + this._logger, + ctx._instanceId, + "EntityOperationCompletedEvent", + `unexpected requestId = ${requestId}`, + ); + } + return; + } + + // Remove from pending calls + ctx._entityFeature.pendingEntityCalls.delete(requestId); + + // If in a critical section, recover the lock for this entity + ctx._entityFeature.recoverLockAfterCall(pendingCall.entityId); + + // Parse the result and complete the task + const result = parseJsonField(completedEvent?.getOutput()); + + pendingCall.task.complete(result); + await ctx.resume(); + } + + private async handleEntityOperationFailed(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + const failedEvent = event.getEntityoperationfailed(); + const requestId = failedEvent?.getRequestid(); + + if (!requestId) { + WorkerLogs.entityEventIgnored( + this._logger, + ctx._instanceId, + "EntityOperationFailedEvent", + "no requestId", + ); + return; + } + + // Find the pending entity call by requestId + const pendingCall = ctx._entityFeature.pendingEntityCalls.get(requestId); + if (!pendingCall) { + // This could happen during replay or if the call was already processed + if (!ctx._isReplaying) { + WorkerLogs.entityEventIgnored( + this._logger, + ctx._instanceId, + "EntityOperationFailedEvent", + `unexpected requestId = ${requestId}`, + ); + } + return; + } + + // Remove from pending calls + ctx._entityFeature.pendingEntityCalls.delete(requestId); + + // If in a critical section, recover the lock for this entity + ctx._entityFeature.recoverLockAfterCall(pendingCall.entityId); + + // Convert failure details and throw EntityOperationFailedException + const failureDetails = createTaskFailureDetails(failedEvent?.getFailuredetails()); + if (!failureDetails) { + pendingCall.task.fail( + `Entity operation '${pendingCall.operationName}' failed with unknown error`, + ); + } else { + const exception = new EntityOperationFailedException( + pendingCall.entityId, + pendingCall.operationName, + failureDetails, + ); + pendingCall.task.fail(exception.message, failedEvent?.getFailuredetails()); + } + + await ctx.resume(); + } + + private async handleEntityLockGranted(ctx: RuntimeOrchestrationContext, event: pb.HistoryEvent): Promise { + const lockGrantedEvent = event.getEntitylockgranted(); + const criticalSectionId = lockGrantedEvent?.getCriticalsectionid(); + + if (!criticalSectionId) { + WorkerLogs.entityEventIgnored( + this._logger, + ctx._instanceId, + "EntityLockGrantedEvent", + "no criticalSectionId", + ); + return; + } + + // Find the pending lock request by criticalSectionId + const pendingRequest = ctx._entityFeature.pendingLockRequests.get(criticalSectionId); + if (!pendingRequest) { + // This could happen during replay or if the lock was already acquired + if (!ctx._isReplaying) { + WorkerLogs.entityEventIgnored( + this._logger, + ctx._instanceId, + "EntityLockGrantedEvent", + `unexpected criticalSectionId = ${criticalSectionId}`, + ); + } + return; + } + + // Complete the lock acquisition + ctx._entityFeature.completeLockAcquisition(criticalSectionId); + await ctx.resume(); + } + + private validateEntityAction( + ctx: RuntimeOrchestrationContext, + event: pb.HistoryEvent, + operationName: string, + hasEventCheck: (msg: pb.SendEntityMessageAction) => boolean, + detailedName: string, + ): void { + const eventId = event.getEventid(); + const action = ctx._pendingActions[eventId]; + delete ctx._pendingActions[eventId]; + + const isSendEntityMessageAction = action?.hasSendentitymessage(); + + if (!action) { + throw getNonDeterminismError(eventId, operationName); + } else if (!isSendEntityMessageAction) { + throw getWrongActionTypeError(eventId, operationName, action); + } else if (!hasEventCheck(action.getSendentitymessage()!)) { + throw getWrongActionTypeError(eventId, detailedName, action); + } + } + + private async handleCompletedTask( + ctx: RuntimeOrchestrationContext, + taskId: number | undefined, + result: StringValue | undefined, + eventName: string, + ): Promise { + let task; + + if (taskId) { + task = ctx._pendingTasks[taskId]; + delete ctx._pendingTasks[taskId]; + } + + if (!task) { + // TODO: Should this be an error? When would it ever happen? + if (!ctx._isReplaying) { + WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, eventName, taskId); + } + + return; + } + + const parsedResult = parseJsonField(result); + + task.complete(parsedResult); + await ctx.resume(); + } + + private async handleFailedTask( + ctx: RuntimeOrchestrationContext, + taskId: number | undefined, + failureDetails: pb.TaskFailureDetails | undefined, + eventName: string, + taskLabel: string, + ): Promise { + if (taskId === undefined) { + if (!ctx._isReplaying) { + WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, eventName); + } + return; + } + + const errorMessage = failureDetails?.getErrormessage() || "Unknown error"; + + // Get the task (don't delete yet - we might retry) + const task = ctx._pendingTasks[taskId]; + + if (!task) { + if (!ctx._isReplaying) { + WorkerLogs.orchestrationUnexpectedEvent(this._logger, ctx._instanceId, eventName, taskId); + } + return; + } + + // Check if this task supports retry and handle it + const retried = await this.tryHandleRetry(task, errorMessage, failureDetails, taskId, ctx); + if (retried) { + return; + } + + // No retry - fail the task + delete ctx._pendingTasks[taskId]; + + task.fail( + `${taskLabel} #${taskId} failed: ${errorMessage}`, + failureDetails, + ); + + await ctx.resume(); + } + + /** * Checks if a failed task supports retry and handles the retry if applicable. * Supports both RetryableTask (policy-based with timer delay) and RetryHandlerTask diff --git a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts index 160cc72..cd8b327 100644 --- a/packages/durabletask-js/src/worker/runtime-orchestration-context.ts +++ b/packages/durabletask-js/src/worker/runtime-orchestration-context.ts @@ -209,11 +209,6 @@ export class RuntimeOrchestrationContext extends OrchestrationContext { } setFailed(e: Error) { - // should allow orchestration to fail, even it's completed. - // if (this._isComplete) { - // return; - // } - this._isComplete = true; this._completionStatus = pb.OrchestrationStatus.ORCHESTRATION_STATUS_FAILED; // Note: Do NOT clear pending actions here - fire-and-forget actions like sendEvent diff --git a/tsconfig.base.json b/tsconfig.base.json index 85ffe26..87e6a65 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1,6 +1,7 @@ { "compilerOptions": { - "target": "ES2020", + "target": "ES2022", + "lib": ["ES2022"], "module": "commonjs", "declaration": true, "strict": true,