diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts
index 31fafa8f3..cc03521bc 100644
--- a/src/trace/context/extractor.ts
+++ b/src/trace/context/extractor.ts
@@ -5,9 +5,11 @@ import { XrayService } from "../xray-service";
 import {
   AppSyncEventTraceExtractor,
   CustomTraceExtractor,
+  DurableExecutionEventTraceExtractor,
   EventBridgeEventTraceExtractor,
   EventBridgeSQSEventTraceExtractor,
   HTTPEventTraceExtractor,
+  isDurableExecutionEvent,
   KinesisEventTraceExtractor,
   LambdaContextTraceExtractor,
   SNSEventTraceExtractor,
@@ -56,6 +58,9 @@ export class TraceContextExtractor {
       }
     }
 
+    // Durable execution events need no stripping here — their trace context is
+    // stored in dedicated `_datadog_{N}` checkpoint operations.
+
     if (spanContext === null) {
       this.stepFunctionContextService = StepFunctionContextService.instance(event);
       if (this.stepFunctionContextService?.context) {
@@ -81,6 +86,9 @@ export class TraceContextExtractor {
   private getTraceEventExtractor(event: any): EventTraceExtractor | undefined {
     if (!event || typeof event !== "object") return;
 
+    // Check for durable execution event first (has DurableExecutionArn + CheckpointToken)
+    if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(this.tracerWrapper);
+
     const headers = event.headers ?? event.multiValueHeaders;
     if (headers !== null && typeof headers === "object") {
       return new HTTPEventTraceExtractor(this.tracerWrapper, this.config.decodeAuthorizerContext);
diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts
new file mode 100644
index 000000000..fd8e46fc2
--- /dev/null
+++ b/src/trace/context/extractors/durable-execution.spec.ts
@@ -0,0 +1,188 @@
+import {
+  createDurableExecutionRootSpan,
+  DurableExecutionEventTraceExtractor,
+  getCompletedOperationCount,
+} from "./durable-execution";
+import { TracerWrapper } from "../../tracer-wrapper";
+
+jest.mock("dd-trace", () => ({
+  startSpan: jest.fn(),
+}));
+
+function makeTracerWrapper(extractReturn: any = null): TracerWrapper {
+  return { extract: jest.fn().mockReturnValue(extractReturn) } as unknown as TracerWrapper;
+}
+
+describe("DurableExecutionEventTraceExtractor", () => {
+  const tracer = require("dd-trace");
+  const startSpanMock = tracer.startSpan as jest.Mock;
+
+  beforeEach(() => {
+    jest.clearAllMocks();
+  });
+
+  it("delegates checkpoint headers to the standard propagator", () => {
+    const executionArn =
+      "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc";
+
+    const checkpointHeaders = {
+      "x-datadog-trace-id": "149750110124521191",
+      "x-datadog-parent-id": "987654321012345678",
+      "x-datadog-sampling-priority": "1",
+    };
+
+    const event = {
+      DurableExecutionArn: executionArn,
+      CheckpointToken: "t-1",
+      InitialExecutionState: {
+        Operations: [
+          {
+            Id: "op-1",
+            Name: "_datadog_0",
+            Status: "SUCCEEDED",
+            StepDetails: {
+              Result: JSON.stringify(checkpointHeaders),
+            },
+          },
+        ],
+      },
+    };
+
+    const sentinelContext = { sentinel: true };
+    const tracerWrapper = makeTracerWrapper(sentinelContext);
+    const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper);
+    const context = extractor.extract(event);
+
+    expect(tracerWrapper.extract).toHaveBeenCalledWith(checkpointHeaders);
+    expect(context).toBe(sentinelContext);
+  });
+
+  it("returns null when no checkpoint or upstream context exists", () => {
+    const tracerWrapper = makeTracerWrapper();
+    const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper);
+
+    const context = extractor.extract({
+      DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo",
+      CheckpointToken: "t-empty",
+      InitialExecutionState: { Operations: [] },
+    });
+
+    expect(context).toBeNull();
+    expect(tracerWrapper.extract).not.toHaveBeenCalled();
+  });
+
+  it("creates durable root span only for first invocation", () => {
+    const executionArn =
+      "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/first";
+
+    const spanContext: any = {
+      _spanId: null,
+      _parentId: null,
+      toTraceId: () => "1111111111111111111",
+      toSpanId: () => "2222222222222222222",
+    };
+    const span = {
+      context: () => spanContext,
+      finish: jest.fn(),
+    };
+    startSpanMock.mockReturnValue(span);
+
+    const firstInvocationEvent = {
+      DurableExecutionArn: executionArn,
+      CheckpointToken: "t-first",
+      InitialExecutionState: {
+        Operations: [
+          {
+            Id: "op-1",
+            Name: "input",
+            Status: "RUNNING",
+            StartTimestamp: 1710000000000,
+            ExecutionDetails: {
+              InputPayload: JSON.stringify({ hello: "world" }),
+            },
+          },
+        ],
+      },
+    };
+
+    const root = createDurableExecutionRootSpan(firstInvocationEvent, null);
+
+    expect(root).not.toBeNull();
+    expect(startSpanMock).toHaveBeenCalledTimes(1);
+  });
+
+  it("skips durable root span creation on replay invocations", () => {
+    const executionArn =
+      "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/replay";
+
+    const replayEvent = {
+      DurableExecutionArn: executionArn,
+      CheckpointToken: "t-replay",
+      InitialExecutionState: {
+        Operations: [
+          {
+            Id: "op-1",
+            Name: "_datadog_0",
+            Status: "SUCCEEDED",
+            StepDetails: {
+              Result: JSON.stringify({
+                "x-datadog-trace-id": "149750110124521191",
+                "x-datadog-parent-id": "538591322263933970",
+                "x-datadog-sampling-priority": "1",
+              }),
+            },
+          },
+          {
+            Id: "op-2",
+            Name: "callback_step_prepare",
+            Status: "SUCCEEDED",
+          },
+        ],
+      },
+    };
+
+    const tracerWrapper = makeTracerWrapper({ source: "Event" });
+    const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper);
+    const extracted = extractor.extract(replayEvent);
+    const root = createDurableExecutionRootSpan(replayEvent, extracted);
+
+    expect(root).toBeNull();
+    expect(startSpanMock).not.toHaveBeenCalled();
+  });
+
+  it("ignores checkpoint names with a negative suffix", () => {
+    const tracerWrapper = makeTracerWrapper({ sentinel: true });
+    const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper);
+
+    const context = extractor.extract({
+      DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo",
+      CheckpointToken: "t-negative",
+      InitialExecutionState: {
+        Operations: [
+          {
+            Id: "op-1",
+            Name: "_datadog_-1",
+            Status: "SUCCEEDED",
+            StepDetails: { Result: JSON.stringify({ "x-datadog-trace-id": "1" }) },
+          },
+        ],
+      },
+    });
+
+    expect(context).toBeNull();
+    expect(tracerWrapper.extract).not.toHaveBeenCalled();
+  });
+
+  it("counts every terminal status as a completed operation", () => {
+    const statuses = ["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT", "RUNNING"];
+    const event = {
+      DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo",
+      CheckpointToken: "t-count",
+      InitialExecutionState: {
+        Operations: statuses.map((status, i) => ({ Id: `op-${i}`, Status: status })),
+      },
+    };
+
+    expect(getCompletedOperationCount(event)).toBe(5);
+  });
+});
diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts
new file mode 100644
index 000000000..5fb6c2a55
--- /dev/null
+++ b/src/trace/context/extractors/durable-execution.ts
@@ -0,0 +1,416 @@
+/**
+ * Durable Execution Trace Extractor — Checkpoint/Upstream Approach
+ *
+ * Strategy:
+ * 1. Prefer trace context from the latest `_datadog_{N}` checkpoint.
+ * 2. If no trace checkpoint exists (first invocation), try upstream trace context
+ *    from the original customer event stored in `Operations[0].ExecutionDetails.InputPayload`.
+ * 3. If neither exists, return null and let the default extraction path create the context.
+ *
+ * The dd-trace-js durable-execution plugin writes checkpoint headers via the
+ * standard HTTP propagator (`tracer.inject(span, 'http_headers', headers)`),
+ * so we just hand the resulting header dict back to `tracer.extract` here.
+ */
+
+import { logDebug } from "../../../utils";
+import { SpanContextWrapper } from "../../span-context-wrapper";
+import { TracerWrapper } from "../../tracer-wrapper";
+import { EventTraceExtractor } from "../extractor";
+
+/**
+ * Interface for operation data in durable execution state
+ */
+export interface DurableExecutionOperation {
+  Id: string;
+  Status: string;
+  Type?: string;
+  Name?: string;
+  ExecutionDetails?: {
+    InputPayload?: string;
+  };
+  StepDetails?: {
+    Result?: string;
+    Error?: unknown;
+    NextAttemptTimestamp?: string;
+  };
+  Payload?: string;
+  CallbackDetails?: {
+    Result?: string;
+    CallbackId?: string;
+    Error?: unknown;
+  };
+  StartedAt?: string;
+  StartTimestamp?: number;
+  CompletedAt?: string;
+}
+
+/**
+ * Interface for initial execution state in durable execution events
+ */
+export interface InitialExecutionState {
+  Operations?: DurableExecutionOperation[];
+  Status?: string;
+}
+
+/**
+ * Interface for durable execution event
+ */
+export interface DurableExecutionEvent {
+  DurableExecutionArn?: string;
+  CheckpointToken?: string;
+  InitialExecutionState?: InitialExecutionState;
+  Input?: unknown;
+}
+
+/**
+ * Check if event is a durable execution event.
+ *
+ * An event qualifies when it is an object carrying both a truthy
+ * `DurableExecutionArn` and a truthy `CheckpointToken`.
+ */
+export function isDurableExecutionEvent(event: unknown): event is DurableExecutionEvent {
+  if (!event || typeof event !== "object") {
+    return false;
+  }
+
+  const maybeEvent = event as Record<string, unknown>;
+  return Boolean(maybeEvent.DurableExecutionArn && maybeEvent.CheckpointToken);
+}
+
+/**
+ * Check if this is a replay invocation (has previous operations).
+ *
+ * NOTE(review): this returns true whenever `Operations` is non-empty, while
+ * `createDurableExecutionRootSpan` still treats a single non-terminal operation
+ * as a likely first invocation — confirm which definition callers expect.
+ */
+export function isDurableExecutionReplay(event: unknown): boolean {
+  if (!isDurableExecutionEvent(event)) {
+    return false;
+  }
+
+  const operations = event.InitialExecutionState?.Operations;
+  return Array.isArray(operations) && operations.length > 0;
+}
+
+/**
+ * Get durable execution ARN from event
+ */
+export function getDurableExecutionArn(event: unknown): string | undefined {
+  if (!isDurableExecutionEvent(event)) {
+    return undefined;
+  }
+  return event.DurableExecutionArn;
+}
+
+/**
+ * Get checkpoint token from event
+ */
+export function getCheckpointToken(event: unknown): string | undefined {
+  if (!isDurableExecutionEvent(event)) {
+    return undefined;
+  }
+  return event.CheckpointToken;
+}
+
+// Terminal operation statuses that indicate an operation has completed
+const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]);
+
+const TRACE_CHECKPOINT_NAME_PREFIX = "_datadog_";
+
+/**
+ * Parse the `N` out of a `_datadog_{N}` operation name.
+ *
+ * Returns null unless the suffix is the canonical decimal form of a
+ * non-negative integer (so `_datadog_007`, `_datadog_1x` and `_datadog_-1`
+ * are all rejected).
+ */
+function parseTraceCheckpointNumber(name: unknown): number | null {
+  if (typeof name !== "string") return null;
+
+  if (!name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) return null;
+  const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length);
+  const n = Number.parseInt(suffix, 10);
+  // The round-trip comparison rejects padding, trailing junk and precision loss;
+  // the sign check rejects names such as `_datadog_-1`.
+  if (Number.isNaN(n) || n < 0 || String(n) !== suffix) return null;
+  return n;
+}
+
+function isTraceCheckpointName(name: unknown): boolean {
+  return parseTraceCheckpointNumber(name) !== null;
+}
+
+/**
+ * Find the highest-numbered `_datadog_{N}` checkpoint in the event and return
+ * its parsed header dict.
+ *
+ * Each invocation that changes trace context saves a new checkpoint with N+1;
+ * the one with the highest N is the most recent. Headers are written by the
+ * dd-trace-js plugin via `tracer.inject(span, 'http_headers', headers)` so the
+ * payload is a standard HTTP-style header dict.
+ *
+ * Returns null when there is no checkpoint, or when the latest checkpoint's
+ * payload is missing, is not valid JSON, or is not a plain JSON object.
+ */
+function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record<string, unknown> | null {
+  const operations = event.InitialExecutionState?.Operations;
+  if (!operations || operations.length === 0) return null;
+
+  let best: { number: number; op: DurableExecutionOperation } | null = null;
+  for (const op of operations) {
+    const n = parseTraceCheckpointNumber(op?.Name);
+    if (n === null) continue;
+    if (best === null || n > best.number) {
+      best = { number: n, op };
+    }
+  }
+  if (best === null) return null;
+
+  const raw = best.op.Payload ?? best.op.StepDetails?.Result;
+  if (!raw || typeof raw !== "string") return null;
+  try {
+    const parsed: unknown = JSON.parse(raw);
+    // Arrays are objects too, but they can never be a header dict.
+    if (parsed && typeof parsed === "object" && !Array.isArray(parsed)) {
+      return parsed as Record<string, unknown>;
+    }
+  } catch (e) {
+    logDebug(`Failed to parse trace checkpoint payload: ${e}`);
+  }
+  return null;
+}
+
+/**
+ * Find upstream HTTP headers carried by the original customer event stored in
+ * `Operations[0].ExecutionDetails.InputPayload`. Returns the standard header
+ * dict (keys like `x-datadog-trace-id`, `traceparent`, etc.) or null.
+ *
+ * `headers` takes precedence over `_datadog` when the customer event has both.
+ */
+function findUpstreamHeaders(event: DurableExecutionEvent): Record<string, unknown> | null {
+  try {
+    const operations = event.InitialExecutionState?.Operations;
+    if (!operations || operations.length === 0) return null;
+
+    const inputPayloadStr = operations[0].ExecutionDetails?.InputPayload;
+    if (!inputPayloadStr) return null;
+
+    const customerEvent = JSON.parse(inputPayloadStr);
+    if (!customerEvent || typeof customerEvent !== "object") return null;
+
+    const headers = customerEvent.headers;
+    if (headers && typeof headers === "object") {
+      return headers as Record<string, unknown>;
+    }
+
+    const ddData = customerEvent._datadog;
+    if (ddData && typeof ddData === "object") {
+      return ddData as Record<string, unknown>;
+    }
+  } catch (e) {
+    logDebug(`Failed to read upstream headers from durable input payload: ${e}`);
+  }
+
+  return null;
+}
+
+/**
+ * Durable Execution Trace Extractor
+ *
+ * Locates trace headers carried inside the durable execution envelope and hands
+ * them to the standard dd-trace propagator via `TracerWrapper.extract`. Order:
+ * 1. Latest `_datadog_{N}` checkpoint payload.
+ * 2. Upstream customer event headers from `InputPayload`.
+ * 3. Otherwise return null and let the default extraction path take over.
+ */
+export class DurableExecutionEventTraceExtractor implements EventTraceExtractor {
+  constructor(private tracerWrapper: TracerWrapper) {}
+
+  extract(event: unknown): SpanContextWrapper | null {
+    // The guard already requires a truthy DurableExecutionArn and CheckpointToken.
+    if (!isDurableExecutionEvent(event)) {
+      logDebug("Event is not a durable execution event");
+      return null;
+    }
+
+    const checkpointHeaders = findLatestCheckpointHeaders(event);
+    if (checkpointHeaders) {
+      logDebug("Extracting trace context from durable checkpoint");
+      return this.tracerWrapper.extract(checkpointHeaders);
+    }
+
+    const upstreamHeaders = findUpstreamHeaders(event);
+    if (upstreamHeaders) {
+      logDebug("Extracting trace context from upstream durable input payload");
+      return this.tracerWrapper.extract(upstreamHeaders);
+    }
+
+    logDebug("No durable trace context found; deferring to default extraction");
+    return null;
+  }
+}
+
+/**
+ * Utility to check if a durable operation is a replay
+ *
+ * An operation is a replay if it exists in the initial execution state
+ * with a terminal status (SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT)
+ *
+ * @param event - Lambda event
+ * @param stepId - The step ID to check (may be hashed)
+ * @returns true if the operation is a replay
+ */
+export function isOperationReplay(event: unknown, stepId: string): boolean {
+  if (!isDurableExecutionEvent(event)) {
+    return false;
+  }
+
+  const operations = event.InitialExecutionState?.Operations;
+  if (!operations || operations.length === 0) {
+    return false;
+  }
+
+  const operation = operations.find((op) => op?.Id === stepId);
+  if (!operation) {
+    return false;
+  }
+
+  return TERMINAL_STATUSES.has(operation.Status);
+}
+
+/**
+ * Get the replay status of an operation
+ *
+ * @param event - Lambda event
+ * @param stepId - The step ID to check
+ * @returns Operation status if found, undefined otherwise
+ */
+export function getOperationStatus(event: unknown, stepId: string): string | undefined {
+  if (!isDurableExecutionEvent(event)) {
+    return undefined;
+  }
+
+  const operations = event.InitialExecutionState?.Operations;
+  if (!operations) {
+    return undefined;
+  }
+
+  const operation = operations.find((op) => op?.Id === stepId);
+  return operation?.Status;
+}
+
+/**
+ * Count the number of completed operations in the event
+ *
+ * "Completed" means the operation has reached any terminal status
+ * (SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT), matching `isOperationReplay`.
+ *
+ * @param event - Lambda event
+ * @returns Number of completed operations
+ */
+export function getCompletedOperationCount(event: unknown): number {
+  if (!isDurableExecutionEvent(event)) {
+    return 0;
+  }
+
+  const operations = event.InitialExecutionState?.Operations;
+  if (!operations) {
+    return 0;
+  }
+
+  return operations.filter((op) => TERMINAL_STATUSES.has(op?.Status)).length;
+}
+
+/**
+ * Create the durable execution root span for likely first invocations only.
+ *
+ * Replay invocations return null. The current first-invocation heuristic is:
+ * - no trace checkpoint operation exists
+ * - no operation has terminal status
+ * - operation count is <= 1
+ *
+ * The created span is parented to `parentSpanContext` when one is provided
+ * (expected to be the current aws.lambda span context).
+ *
+ * Returns an object with { span, finish() }, or null when the event is not a
+ * durable execution, is not a likely first invocation, or span creation fails.
+ * Caller must call finish() when the invocation ends.
+ */
+export function createDurableExecutionRootSpan(
+  event: unknown,
+  parentSpanContext?: unknown,
+): { span: any; finish: () => void } | null {
+  if (!isDurableExecutionEvent(event)) {
+    return null;
+  }
+
+  const executionArn = event.DurableExecutionArn;
+  if (!executionArn) {
+    return null;
+  }
+
+  const operations = event.InitialExecutionState?.Operations;
+  const hasCheckpoint = Boolean(operations?.some((op) => isTraceCheckpointName(op?.Name)));
+  const hasCompletedOperation = Boolean(operations?.some((op) => TERMINAL_STATUSES.has(op?.Status)));
+  const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1;
+
+  if (!isLikelyFirstInvocation) {
+    return null;
+  }
+
+  // Use the first operation's StartTimestamp (unix milliseconds) so the root
+  // span's start time matches the actual start of the durable execution.
+  let startTime: number | undefined;
+  const firstStartTs = operations?.[0]?.StartTimestamp;
+  if (firstStartTs != null) {
+    const parsed = Number(firstStartTs);
+    // Ignore NaN/Infinity rather than handing the tracer an unusable start time.
+    if (Number.isFinite(parsed)) {
+      startTime = parsed;
+    }
+  }
+
+  try {
+    const tracer = require("dd-trace");
+
+    const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution";
+    const resourceName = executionArn.includes(":") ? executionArn.split(":").pop() : executionArn;
+
+    const spanOptions: Record<string, unknown> = {
+      type: "serverless",
+      tags: {
+        "service.name": serviceName,
+        "resource.name": resourceName,
+        "durable.execution_arn": executionArn,
+        "durable.is_root_span": true,
+        // NOTE(review): despite the tag name, this is the number of operations in
+        // the initial execution state, not a count of invocations — confirm intent.
+        "durable.invocation_count": operations?.length ?? 0,
+      },
+    };
+
+    if (startTime !== undefined) {
+      spanOptions.startTime = startTime;
+    }
+    if (parentSpanContext) {
+      // Root span is modeled as a child of aws.lambda.
+      spanOptions.childOf = parentSpanContext;
+    }
+
+    const span = tracer.startSpan("aws.durable-execution", spanOptions);
+
+    logDebug(`Created root execution span: start_time=${startTime}`);
+
+    return {
+      span,
+      finish: () => {
+        span.finish();
+        logDebug("Finished root execution span");
+      },
+    };
+  } catch (e) {
+    logDebug(`Failed to create durable execution root span: ${e}`);
+    return null;
+  }
+}
diff --git a/src/trace/context/extractors/index.ts b/src/trace/context/extractors/index.ts
index 6bd690713..0fbbf081b 100644
--- a/src/trace/context/extractors/index.ts
+++ b/src/trace/context/extractors/index.ts
@@ -9,3 +9,9 @@ export { SNSSQSEventTraceExtractor } from "./sns-sqs";
 export { StepFunctionEventTraceExtractor } from "./step-function";
 export { LambdaContextTraceExtractor } from "./lambda-context";
 export { CustomTraceExtractor } from "./custom";
+export {
+  DurableExecutionEventTraceExtractor,
+  isDurableExecutionEvent,
+  isDurableExecutionReplay,
+  createDurableExecutionRootSpan,
+} from "./durable-execution";