From 4e1078cd077c296dcf48720520bcea0b9eb7a218 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 29 Apr 2026 19:29:39 -0400 Subject: [PATCH 1/7] initial working version --- src/trace/context/extractor.ts | 32 + .../extractors/durable-execution.spec.ts | 137 +++ .../context/extractors/durable-execution.ts | 890 ++++++++++++++++++ src/trace/context/extractors/index.ts | 1 + src/trace/listener.ts | 41 + 5 files changed, 1101 insertions(+) create mode 100644 src/trace/context/extractors/durable-execution.spec.ts create mode 100644 src/trace/context/extractors/durable-execution.ts diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 31fafa8f3..233887308 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -5,9 +5,11 @@ import { XrayService } from "../xray-service"; import { AppSyncEventTraceExtractor, CustomTraceExtractor, + DurableExecutionEventTraceExtractor, EventBridgeEventTraceExtractor, EventBridgeSQSEventTraceExtractor, HTTPEventTraceExtractor, + isDurableExecutionEvent, KinesisEventTraceExtractor, LambdaContextTraceExtractor, SNSEventTraceExtractor, @@ -44,6 +46,12 @@ export class TraceContextExtractor { async extract(event: any, context: Context): Promise { let spanContext: SpanContextWrapper | null = null; + const durableTraceDebugEnabled = (() => { + const value = process.env.DD_DURABLE_TRACE_DEBUG; + if (!value) return false; + const normalized = value.toLowerCase(); + return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; + })(); if (this.config.traceExtractor) { const customExtractor = new CustomTraceExtractor(this.config.traceExtractor); spanContext = await customExtractor.extract(event, context); @@ -53,9 +61,21 @@ export class TraceContextExtractor { const eventExtractor = this.getTraceEventExtractor(event); if (eventExtractor !== undefined) { spanContext = eventExtractor.extract(event); + if (durableTraceDebugEnabled && isDurableExecutionEvent(event)) { + console.log("[dd-lambda][durable-trace] Event extractor result", { + extractor: eventExtractor.constructor?.name, + extracted: spanContext ? { + traceId: spanContext.toTraceId(), + parentId: spanContext.toSpanId(), + sampleMode: spanContext.sampleMode(), + } : null, + }); + } } } + // No stripping needed — deterministic approach never modifies checkpoint payloads. + if (spanContext === null) { this.stepFunctionContextService = StepFunctionContextService.instance(event); if (this.stepFunctionContextService?.context) { @@ -67,6 +87,15 @@ export class TraceContextExtractor { if (spanContext === null) { const contextExtractor = new LambdaContextTraceExtractor(this.tracerWrapper); spanContext = contextExtractor.extract(context); + if (durableTraceDebugEnabled && isDurableExecutionEvent(event)) { + console.log("[dd-lambda][durable-trace] Falling back to Lambda context extraction", { + extracted: spanContext ? { + traceId: spanContext.toTraceId(), + parentId: spanContext.toSpanId(), + sampleMode: spanContext.sampleMode(), + } : null, + }); + } } if (spanContext !== null) { @@ -81,6 +110,9 @@ export class TraceContextExtractor { private getTraceEventExtractor(event: any): EventTraceExtractor | undefined { if (!event || typeof event !== "object") return; + // Check for durable execution event first (has DurableExecutionArn + CheckpointToken) + if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(); + const headers = event.headers ?? event.multiValueHeaders; if (headers !== null && typeof headers === "object") { return new HTTPEventTraceExtractor(this.tracerWrapper, this.config.decodeAuthorizerContext); diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts new file mode 100644 index 000000000..70b2adc2e --- /dev/null +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -0,0 +1,137 @@ +import { createHash } from "crypto"; +import { createDurableExecutionRootSpan, DurableExecutionEventTraceExtractor } from "./durable-execution"; + +jest.mock("dd-trace", () => ({ + startSpan: jest.fn(), +})); + +function deterministicRootSpanId(executionArn: string): string { + const hash = createHash("sha256").update(`durable-root:${executionArn}`).digest("hex"); + const masked = BigInt(`0x${hash}`) & 0x7fffffffffffffffn; + return masked === 0n ? "1" : masked.toString(10); +} + +describe("DurableExecutionEventTraceExtractor", () => { + const tracer = require("dd-trace"); + const startSpanMock = tracer.startSpan as jest.Mock; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it("extracts a deterministic durable root span id from executionArn", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; + + const event = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-1", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "_dd_trace_context_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify({ + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "987654321012345678", + "x-datadog-sampling-priority": "1", + }), + }, + }, + ], + }, + }; + + const extractor = new DurableExecutionEventTraceExtractor(); + const context = extractor.extract(event); + + expect(context).not.toBeNull(); + expect(context?.toTraceId()).toBe("149750110124521191"); + expect(context?.toSpanId()).toBe("987654321012345678"); + }); + + it("creates durable root span only for first invocation", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/first"; + + const spanContext: any = { + _spanId: null, + _parentId: null, + toTraceId: () => "1111111111111111111", + toSpanId: () => "2222222222222222222", + }; + const span = { + context: () => spanContext, + finish: jest.fn(), + }; + startSpanMock.mockReturnValue(span); + + const firstInvocationEvent = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-first", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "input", + Status: "RUNNING", + StartTimestamp: 1710000000000, + ExecutionDetails: { + InputPayload: JSON.stringify({ hello: "world" }), + }, + }, + ], + }, + }; + + const extractor = new DurableExecutionEventTraceExtractor(); + const extracted = extractor.extract(firstInvocationEvent); + + const root = createDurableExecutionRootSpan(firstInvocationEvent, extracted); + + expect(root).not.toBeNull(); + expect(startSpanMock).toHaveBeenCalledTimes(1); + expect(root?.span.context()._spanId.toString(10)).toBe("2222222222222222222"); + }); + + it("skips durable root span creation on replay invocations", () => { + const executionArn = + "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/replay"; + + const replayEvent = { + DurableExecutionArn: executionArn, + CheckpointToken: "t-replay", + InitialExecutionState: { + Operations: [ + { + Id: "op-1", + Name: "_dd_trace_context_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify({ + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "538591322263933970", + "x-datadog-sampling-priority": "1", + }), + }, + }, + { + Id: "op-2", + Name: "callback_step_prepare", + Status: "SUCCEEDED", + }, + ], + }, + }; + + const extractor = new DurableExecutionEventTraceExtractor(); + const extracted = extractor.extract(replayEvent); + const root = createDurableExecutionRootSpan(replayEvent, extracted); + + expect(root).toBeNull(); + expect(startSpanMock).not.toHaveBeenCalled(); + }); +}); + diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts new file mode 100644 index 000000000..88b9f5bdf --- /dev/null +++ b/src/trace/context/extractors/durable-execution.ts @@ -0,0 +1,890 @@ +/** + * Durable Execution Trace Extractor — Deterministic Approach + * + * Generates deterministic trace context from AWS Lambda Durable Execution events + * using SHA-256 hashing of the execution ARN and operation identifiers. + * + * Strategy: + * 1. First, try to extract a real trace context from the original customer event + * (stored in Operations[0].ExecutionDetails.InputPayload). If found, the durable + * execution trace connects to the upstream caller's trace. + * 2. If no upstream context exists, fall back to deterministic hashing from the + * execution ARN, generating a full 128-bit trace ID (lower 64 bits + _dd.p.tid) + * for W3C/OpenTelemetry compatibility. + * + * Flow: + * 1. Every invocation receives the same DurableExecutionArn + * 2. The original customer event is stored in Operations[0].ExecutionDetails.InputPayload + * and is identical across all invocations — any upstream trace headers persist + * 3. trace_id = real upstream or hash("durable-trace:{arn}") lower 64 bits + * 4. _dd.p.tid = real upstream or hash("durable-trace:{arn}") upper 64 bits + * 5. parent_id = hash("durable-span:{arn}#{last_completed_op_id}") — links to previous invocation + * 6. Checkpoint payloads are never modified + */ + +import { createHash, randomBytes } from "crypto"; +import { logDebug } from "../../../utils"; +import { SpanContextWrapper } from "../../span-context-wrapper"; +import { SampleMode, TraceSource } from "../../trace-context-service"; +import { EventTraceExtractor } from "../extractor"; + +const DURABLE_TRACE_DEBUG_ENV = "DD_DURABLE_TRACE_DEBUG"; + +function durableTraceDebugEnabled(): boolean { + const value = process.env[DURABLE_TRACE_DEBUG_ENV]; + if (!value) return false; + const normalized = value.toLowerCase(); + return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; +} + +function durableTraceDebugLog(message: string, details?: Record): void { + if (!durableTraceDebugEnabled()) return; + if (details) { + console.log(`[dd-lambda][durable-trace] ${message}`, details); + return; + } + console.log(`[dd-lambda][durable-trace] ${message}`); +} + +function parseTraceparentHex( + traceparent: unknown, +): { traceIdHex: string; parentIdHex: string; lower64TraceIdDec: string; upper64TraceIdHex: string; parentIdDec: string } | null { + if (typeof traceparent !== "string") return null; + const parts = traceparent.split("-"); + if (parts.length !== 4) return null; + const [, traceIdHex, parentIdHex] = parts; + if (!/^[0-9a-f]{32}$/i.test(traceIdHex) || !/^[0-9a-f]{16}$/i.test(parentIdHex)) { + return null; + } + + const lower64TraceIdHex = traceIdHex.slice(16); + const upper64TraceIdHex = traceIdHex.slice(0, 16); + + try { + return { + traceIdHex, + parentIdHex, + lower64TraceIdDec: BigInt(`0x${lower64TraceIdHex}`).toString(10), + upper64TraceIdHex, + parentIdDec: BigInt(`0x${parentIdHex}`).toString(10), + }; + } catch { + return null; + } +} + +function normalizeParentIdToDecimal(parentId: unknown): string | null { + if (typeof parentId !== "string") return null; + const value = parentId.trim(); + if (!value) return null; + + if (/^[0-9]+$/.test(value)) { + return value; + } + + if (/^[0-9a-f]+$/i.test(value)) { + const hex = value.length > 16 ? value.slice(-16) : value; + try { + return BigInt(`0x${hex}`).toString(10); + } catch { + return null; + } + } + + return null; +} + +function normalizeTraceIdToDecimal( + traceId: unknown, +): { traceId: string | null; ptidFromTraceId?: string } { + if (typeof traceId !== "string") { + return { traceId: null }; + } + + const value = traceId.trim(); + if (!value) { + return { traceId: null }; + } + + if (/^[0-9]+$/.test(value)) { + return { traceId: value }; + } + + if (/^[0-9a-f]+$/i.test(value)) { + // If a 128-bit hex trace ID was accidentally put here, split it like traceparent: + // lower 64 bits for Datadog trace_id, upper 64 bits for _dd.p.tid. + if (value.length > 16) { + const upperHex = value.slice(-32, -16).padStart(16, "0"); + const lowerHex = value.slice(-16); + try { + return { + traceId: BigInt(`0x${lowerHex}`).toString(10), + ptidFromTraceId: upperHex.toLowerCase(), + }; + } catch { + return { traceId: null }; + } + } + + try { + return { + traceId: BigInt(`0x${value}`).toString(10), + }; + } catch { + return { traceId: null }; + } + } + + return { traceId: null }; +} + +/** + * Interface for operation data in durable execution state + */ +export interface DurableExecutionOperation { + Id: string; + Status: string; + Type?: string; + Name?: string; + ExecutionDetails?: { + InputPayload?: string; + }; + StepDetails?: { + Result?: string; + Error?: unknown; + NextAttemptTimestamp?: string; + }; + Payload?: string; + CallbackDetails?: { + Result?: string; + CallbackId?: string; + Error?: unknown; + }; + StartedAt?: string; + StartTimestamp?: number; + CompletedAt?: string; +} + +/** + * Interface for initial execution state in durable execution events + */ +export interface InitialExecutionState { + Operations?: DurableExecutionOperation[]; + Status?: string; +} + +/** + * Interface for durable execution event + */ +export interface DurableExecutionEvent { + DurableExecutionArn?: string; + CheckpointToken?: string; + InitialExecutionState?: InitialExecutionState; + Input?: unknown; +} + +/** + * Check if event is a durable execution event + */ +export function isDurableExecutionEvent(event: unknown): event is DurableExecutionEvent { + if (!event || typeof event !== "object") { + return false; + } + + const maybeEvent = event as Record; + return Boolean(maybeEvent.DurableExecutionArn && maybeEvent.CheckpointToken); +} + +/** + * Check if this is a replay invocation (has previous operations) + */ +export function isDurableExecutionReplay(event: unknown): boolean { + if (!isDurableExecutionEvent(event)) { + return false; + } + + const operations = event.InitialExecutionState?.Operations; + return Array.isArray(operations) && operations.length > 0; +} + +/** + * Get durable execution ARN from event + */ +export function getDurableExecutionArn(event: unknown): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + return event.DurableExecutionArn; +} + +/** + * Get checkpoint token from event + */ +export function getCheckpointToken(event: unknown): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + return event.CheckpointToken; +} + +function generateRandomPositiveId(): string { + const bytes = randomBytes(8); + bytes[0] = bytes[0] & 0x7f; // keep it positive int64 + const value = bufferToBigInt(bytes); + return value === 0n ? "1" : value.toString(10); +} + +function generateRandomTraceId128(): { traceId: string; ptid: string } { + const bytes = randomBytes(16); + + // Upper 64 bits -> _dd.p.tid + const upperBytes = Buffer.from(bytes.subarray(0, 8)); + const upperValue = bufferToBigInt(upperBytes); + const ptid = (upperValue === 0n ? 1n : upperValue).toString(16).padStart(16, "0"); + + // Lower 64 bits -> Datadog trace_id (decimal) + const lowerBytes = Buffer.from(bytes.subarray(8, 16)); + lowerBytes[0] = lowerBytes[0] & 0x7f; // keep positive int64 + const lowerValue = bufferToBigInt(lowerBytes); + const traceId = lowerValue === 0n ? "1" : lowerValue.toString(10); + + return { traceId, ptid }; +} + +function bufferToBigInt(buf: Buffer): bigint { + let result = 0n; + for (let i = 0; i < buf.length; i++) { + result = (result << 8n) | BigInt(buf[i]); + } + return result; +} + +// Terminal operation statuses that indicate an operation has completed +const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); + +const TRACE_CHECKPOINT_NAME_PREFIX = "_dd_trace_context_"; + +function deterministicSha256Hash(s: string): string { + const hash = createHash("sha256").update(s).digest("hex"); + const fullBigInt = BigInt("0x" + hash); + const masked = fullBigInt & 0x7fffffffffffffffn; + return masked === 0n ? "1" : masked.toString(10); +} + +function getDurableExecutionRootSpanId(executionArn: string): string { + return deterministicSha256Hash(`durable-root:${executionArn}`); +} + +/** + * Find the highest-numbered `_dd_trace_context_{N}` checkpoint in the event. + * Each invocation that changes trace context saves a new checkpoint with + * N+1; the one with the highest N is the most recent. + */ +function findLatestTraceContextCheckpoint( + event: DurableExecutionEvent, +): { number: number; headers: Record } | null { + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) return null; + + let best: { number: number; op: DurableExecutionOperation } | null = null; + for (const op of operations) { + const name = op?.Name; + if (!name || !name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) continue; + const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length); + const n = Number.parseInt(suffix, 10); + if (Number.isNaN(n) || String(n) !== suffix) continue; + if (best === null || n > best.number) { + best = { number: n, op }; + } + } + if (best === null) return null; + + const raw = best.op.Payload ?? best.op.StepDetails?.Result; + if (!raw || typeof raw !== "string") return null; + try { + const parsed = JSON.parse(raw); + if (parsed && typeof parsed === "object") { + durableTraceDebugLog("Found latest trace-context checkpoint", { + checkpointName: best.op.Name, + checkpointNumber: best.number, + operationId: best.op.Id, + hasPayload: Boolean(best.op.Payload), + hasStepResult: Boolean(best.op.StepDetails?.Result), + }); + return { number: best.number, headers: parsed as Record }; + } + } catch (e) { + logDebug(`Failed to parse trace checkpoint payload: ${e}`); + durableTraceDebugLog("Failed to parse trace-context checkpoint payload", { + checkpointName: best.op.Name, + checkpointNumber: best.number, + operationId: best.op.Id, + parseError: e instanceof Error ? e.message : String(e), + }); + } + return null; +} + +/** + * Try to extract a real Datadog trace context from the original customer event + * stored inside the durable execution envelope. + * + * The original event is stored in Operations[0].ExecutionDetails.InputPayload. + * Since all invocations replay the same stored event, any trace headers injected + * by an upstream Datadog-traced service will be present on every invocation. + * + * Returns extracted context info or null. + */ +function extractUpstreamTraceContext( + event: DurableExecutionEvent, +): { traceId: string; parentId: string; samplingPriority: string; ptid: string } | null { + try { + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) return null; + + const firstOp = operations[0]; + const inputPayloadStr = firstOp.ExecutionDetails?.InputPayload; + if (!inputPayloadStr) return null; + + const customerEvent = JSON.parse(inputPayloadStr); + if (!customerEvent || typeof customerEvent !== "object") return null; + + // Try headers (API Gateway, ALB, Function URL) + const headers = customerEvent.headers; + if (headers && typeof headers === "object") { + const traceId = headers["x-datadog-trace-id"]; + const parentId = headers["x-datadog-parent-id"]; + if (traceId && parentId) { + const samplingPriority = headers["x-datadog-sampling-priority"] || "1"; + const tags = headers["x-datadog-tags"] || ""; + const ptid = parsePtid(tags); + logDebug(`Found upstream trace context in customer event headers`); + return { traceId, parentId, samplingPriority, ptid }; + } + } + + // Try _datadog field (direct invocation / Step Functions) + const ddData = customerEvent._datadog; + if (ddData && typeof ddData === "object") { + const traceId = ddData["x-datadog-trace-id"]; + const parentId = ddData["x-datadog-parent-id"]; + if (traceId && parentId) { + const samplingPriority = ddData["x-datadog-sampling-priority"] || "1"; + const tags = ddData["x-datadog-tags"] || ""; + const ptid = parsePtid(tags); + logDebug(`Found upstream trace context in customer event _datadog field`); + return { traceId, parentId, samplingPriority, ptid }; + } + } + } catch (e) { + logDebug(`Failed to extract upstream trace context from durable event: ${e}`); + } + + return null; +} + +/** + * Parse _dd.p.tid from x-datadog-tags string. + * Format: "_dd.p.tid=66bcb5eb00000000,_dd.p.dm=-0" + */ +function parsePtid(tags: string): string { + if (!tags) return ""; + for (const tag of tags.split(",")) { + if (tag.includes("_dd.p.tid=")) { + return tag.split("=")[1] || ""; + } + } + return ""; +} + +/** + * Durable Execution Trace Extractor — Deterministic Approach with W3C Support + * + * Strategy: + * 1. Try to extract real upstream trace context from customer event + * 2. Fall back to deterministic 128-bit trace ID from execution ARN + * + * In both cases: + * - parent_id links to the last completed operation for replay chaining + * - _dd.p.tid is set for full 128-bit W3C trace ID support + */ +export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { + extract(event: unknown): SpanContextWrapper | null { + if (!isDurableExecutionEvent(event)) { + logDebug("Event is not a durable execution event"); + return null; + } + + const executionArn = event.DurableExecutionArn; + if (!executionArn) { + logDebug("No DurableExecutionArn in event"); + return null; + } + + const operations = event.InitialExecutionState?.Operations; + durableTraceDebugLog("Durable invocation event received", { + executionArn, + checkpointToken: event.CheckpointToken, + operationCount: operations?.length ?? 0, + }); + if (operations?.length) { + const checkpointOperations = operations + .filter((op) => typeof op?.Name === "string" && op.Name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) + .map((op) => ({ + id: op.Id, + name: op.Name, + status: op.Status, + hasPayload: Boolean(op.Payload), + hasStepResult: Boolean(op.StepDetails?.Result), + })); + durableTraceDebugLog("Trace-context checkpoint operations present in event", { + checkpoints: checkpointOperations, + }); + } + + // --- Step 0: Prefer a previously-saved trace-context checkpoint --- + // If a previous invocation saved a `_dd_trace_context_{N}` checkpoint, use + // the one with the highest N — it reflects the latest trace-context state + // of the ongoing durable execution. Same scheme as dd-trace-py. + const latestCheckpoint = findLatestTraceContextCheckpoint(event); + const executionRootSpanId = getDurableExecutionRootSpanId(executionArn); + if (latestCheckpoint) { + logDebug( + `Using trace context from checkpoint _dd_trace_context_${latestCheckpoint.number}`, + ); + const traceIdStr = latestCheckpoint.headers["x-datadog-trace-id"]; + const parentIdStr = latestCheckpoint.headers["x-datadog-parent-id"]; + const samplingPriorityStr = latestCheckpoint.headers["x-datadog-sampling-priority"] || "1"; + const tagsStr = latestCheckpoint.headers["x-datadog-tags"] || ""; + let ptidFromTags = parsePtid(tagsStr); + let effectiveTraceId = traceIdStr; + let effectiveParentId = parentIdStr; + + if ((!effectiveTraceId || !effectiveParentId) && latestCheckpoint.headers.traceparent) { + const parsedTraceparent = parseTraceparentHex(latestCheckpoint.headers.traceparent); + if (parsedTraceparent) { + effectiveTraceId = effectiveTraceId || parsedTraceparent.lower64TraceIdDec; + effectiveParentId = effectiveParentId || parsedTraceparent.parentIdDec; + ptidFromTags = ptidFromTags || parsedTraceparent.upper64TraceIdHex; + durableTraceDebugLog("Derived Datadog IDs from traceparent in checkpoint", { + checkpointNumber: latestCheckpoint.number, + traceparent: latestCheckpoint.headers.traceparent, + derivedTraceId: effectiveTraceId, + derivedParentId: effectiveParentId, + derivedPtid: ptidFromTags, + }); + } + } + + const normalizedTraceId = normalizeTraceIdToDecimal(effectiveTraceId); + const normalizedParentId = normalizeParentIdToDecimal(effectiveParentId); + if (!ptidFromTags && normalizedTraceId.ptidFromTraceId) { + ptidFromTags = normalizedTraceId.ptidFromTraceId; + } + + durableTraceDebugLog("Normalized checkpoint IDs", { + checkpointNumber: latestCheckpoint.number, + rawTraceId: effectiveTraceId, + rawParentId: effectiveParentId, + normalizedTraceId: normalizedTraceId.traceId, + normalizedParentId, + executionRootSpanId, + ptid: ptidFromTags, + }); + + durableTraceDebugLog("Checkpoint headers selected for extraction", { + checkpointNumber: latestCheckpoint.number, + headerKeys: Object.keys(latestCheckpoint.headers), + traceId: normalizedTraceId.traceId, + parentId: normalizedParentId, + samplingPriority: samplingPriorityStr, + ptid: ptidFromTags, + }); + + if (normalizedTraceId.traceId) { + try { + const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); + const id = require("dd-trace/packages/dd-trace/src/id"); + + const ddSpanContext = new _DatadogSpanContext({ + traceId: id(normalizedTraceId.traceId, 10), + // Use deterministic root span id from executionArn so all invocations + // remain anchored to the same durable root regardless of checkpoint payload format. + spanId: id(executionRootSpanId, 10), + sampling: { priority: samplingPriorityStr }, + }); + + if (ptidFromTags) { + ddSpanContext._trace.tags["_dd.p.tid"] = ptidFromTags; + } + + durableTraceDebugLog("Activated trace context from checkpoint", { + checkpointNumber: latestCheckpoint.number, + activatedTraceId: normalizedTraceId.traceId, + activatedParentId: executionRootSpanId, + activatedPtid: ptidFromTags, + checkpointParentId: normalizedParentId, + }); + return new SpanContextWrapper(ddSpanContext, TraceSource.Event); + } catch (e) { + logDebug(`Failed to construct SpanContext from checkpoint: ${e}`); + durableTraceDebugLog("Failed to activate trace context from checkpoint", { + checkpointNumber: latestCheckpoint.number, + activationError: e instanceof Error ? e.message : String(e), + traceId: normalizedTraceId.traceId, + parentId: executionRootSpanId, + ptid: ptidFromTags, + }); + // Fall through to existing paths + } + } else { + durableTraceDebugLog("Checkpoint did not contain usable trace identifiers", { + checkpointNumber: latestCheckpoint.number, + hasTraceId: Boolean(normalizedTraceId.traceId), + hasParentId: Boolean(normalizedParentId || executionRootSpanId), + traceparent: latestCheckpoint.headers.traceparent, + }); + } + } else { + durableTraceDebugLog("No trace-context checkpoint found in event operations", { + executionArn, + operationCount: operations?.length ?? 0, + }); + } + + // --- Step 1: Try to use real upstream trace context --- + const upstream = extractUpstreamTraceContext(event); + + let traceId: string; + let ptid: string; + const rootSpanId = executionRootSpanId; + let samplingPriority: string; + + if (upstream) { + const normalizedUpstreamTrace = normalizeTraceIdToDecimal(upstream.traceId); + const normalizedTraceId = normalizedUpstreamTrace.traceId; + + if (normalizedTraceId) { + traceId = normalizedTraceId; + ptid = upstream.ptid || normalizedUpstreamTrace.ptidFromTraceId || ""; + samplingPriority = upstream.samplingPriority; + logDebug(`Using upstream trace_id=${traceId}, _dd.p.tid=${ptid}`); + } else { + const randomTrace = generateRandomTraceId128(); + traceId = randomTrace.traceId; + ptid = randomTrace.ptid; + samplingPriority = SampleMode.AUTO_KEEP.toString(); + logDebug(`Upstream trace_id invalid, generated new trace_id=${traceId}, _dd.p.tid=${ptid}`); + } + + // For the first invocation (no checkpoint), use the deterministic durable + // root span id derived from executionArn. + durableTraceDebugLog("No checkpoint found; generated durable root context from upstream trace", { + traceId, + rootSpanId, + upstreamParentId: upstream.parentId, + samplingPriority, + ptid, + }); + } else { + // --- Step 2: No checkpoint and no upstream context --- + // Start a new trace and use deterministic root span id that will be + // referenced by checkpoints in later invocations. + const randomTrace = generateRandomTraceId128(); + traceId = randomTrace.traceId; + ptid = randomTrace.ptid; + samplingPriority = SampleMode.AUTO_KEEP.toString(); + + logDebug(`No upstream context, generated trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); + durableTraceDebugLog("No checkpoint/upstream found; generated new durable trace context", { + executionArn, + traceId, + rootSpanId, + ptid, + }); + } + + logDebug(`Generated initial durable root context: trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); + + // Construct span context with _dd.p.tid for 128-bit W3C trace ID support + // Similar to Step Functions' approach in step-function-service.ts + try { + const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); + const id = require("dd-trace/packages/dd-trace/src/id"); + + const ddSpanContext = new _DatadogSpanContext({ + traceId: id(traceId, 10), + spanId: id(rootSpanId, 10), + sampling: { priority: samplingPriority }, + }); + + // Set _dd.p.tid for upper 64 bits of 128-bit trace ID + if (ptid) { + ddSpanContext._trace.tags["_dd.p.tid"] = ptid; + } + + return new SpanContextWrapper(ddSpanContext, TraceSource.Event); + } catch (error) { + if (error instanceof Error) { + logDebug("Couldn't generate SpanContext with tracer, falling back.", error); + } + } + + // Fallback without _dd.p.tid if dd-trace is not available + return SpanContextWrapper.fromTraceContext({ + traceId, + parentId: rootSpanId, + sampleMode: parseInt(samplingPriority, 10), + source: TraceSource.Event, + }); + } +} + +/** + * Utility to check if a durable operation is a replay + * + * An operation is a replay if it exists in the initial execution state + * with a terminal status (SUCCEEDED, FAILED, CANCELLED, STOPPED, TIMED_OUT) + * + * @param event - Lambda event + * @param stepId - The step ID to check (may be hashed) + * @returns true if the operation is a replay + */ +export function isOperationReplay(event: unknown, stepId: string): boolean { + if (!isDurableExecutionEvent(event)) { + return false; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations || operations.length === 0) { + return false; + } + + const operation = operations.find((op) => op.Id === stepId); + if (!operation) { + return false; + } + + return TERMINAL_STATUSES.has(operation.Status); +} + +/** + * Get the replay status of an operation + * + * @param event - Lambda event + * @param stepId - The step ID to check + * @returns Operation status if found, undefined otherwise + */ +export function getOperationStatus(event: unknown, stepId: string): string | undefined { + if (!isDurableExecutionEvent(event)) { + return undefined; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations) { + return undefined; + } + + const operation = operations.find((op) => op.Id === stepId); + return operation?.Status; +} + +/** + * Count the number of completed operations in the event + * + * @param event - Lambda event + * @returns Number of completed operations + */ +export function getCompletedOperationCount(event: unknown): number { + if (!isDurableExecutionEvent(event)) { + return 0; + } + + const operations = event.InitialExecutionState?.Operations; + if (!operations) { + return 0; + } + + return operations.filter((op) => + op.Status === "SUCCEEDED" || op.Status === "FAILED" + ).length; +} + +/** + * Create (or re-emit) the durable execution root span. + * + * Every invocation emits this span with the same propagated root span_id. + * The Datadog backend deduplicates by span_id, so the last invocation's + * version wins with the correct total duration (start → final end). + * + * Returns an object with { span, finish() } or null if not a durable execution. + * Caller must call finish() when the invocation ends. + */ +export function createDurableExecutionRootSpan( + event: unknown, + extractedRootContext?: SpanContextWrapper | null, +): { span: any; finish: () => void } | null { + if (!isDurableExecutionEvent(event)) { + return null; + } + + const executionArn = event.DurableExecutionArn; + if (!executionArn) { + return null; + } + + const operations = event.InitialExecutionState?.Operations; + const hasCheckpoint = Boolean( + operations?.some((op) => typeof op?.Name === "string" && op.Name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)), + ); + const hasCompletedOperation = Boolean(operations?.some((op) => TERMINAL_STATUSES.has(op.Status))); + const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; + + if (!isLikelyFirstInvocation) { + durableTraceDebugLog("Skipping durable root span creation for replay invocation", { + executionArn, + operationCount: operations?.length ?? 0, + hasCheckpoint, + hasCompletedOperation, + }); + return null; + } + + const rootSpanId = getDurableExecutionRootSpanId(executionArn); + const extractedTraceId = extractedRootContext?.toTraceId(); + const extractedSpanId = extractedRootContext?.toSpanId(); + durableTraceDebugLog("Preparing durable root span creation", { + executionArn, + extractedTraceId, + extractedSpanId, + expectedRootSpanId: rootSpanId, + operationCount: operations?.length ?? 0, + }); + + // Determine consistent start_time from the first operation's StartTimestamp + // StartTimestamp is unix milliseconds from the durable execution SDK + let startTime: number | undefined; + const replayOperations = event.InitialExecutionState?.Operations; + if (replayOperations && replayOperations.length > 0) { + const firstStartTs = replayOperations[0].StartTimestamp; + if (firstStartTs != null) { + const parsed = Number(firstStartTs); + if (!isNaN(parsed)) { + startTime = parsed; // already in millis, dd-trace startSpan expects millis + } + } + } + + try { + const tracer = require("dd-trace"); + const id = require("dd-trace/packages/dd-trace/src/id"); + + const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution"; + const resourceName = executionArn.includes(":") ? executionArn.split(":").pop() : executionArn; + + const spanOptions: Record = { + type: "serverless", + tags: { + "service.name": serviceName, + "resource.name": resourceName, + "durable.execution_arn": executionArn, + "durable.is_root_span": true, + "durable.invocation_count": replayOperations?.length ?? 0, + }, + }; + + if (startTime !== undefined) { + spanOptions.startTime = startTime; + } + if (extractedRootContext?.spanContext) { + // Ensure the durable root span stays in the same trace as the extracted + // durable invocation context even when there is no active scope. + spanOptions.childOf = extractedRootContext.spanContext; + } + + let activeBefore: any; + try { + activeBefore = tracer?.scope?.().active?.(); + } catch { + activeBefore = undefined; + } + durableTraceDebugLog("Creating durable root span", { + executionArn, + startTime, + expectedRootSpanId: rootSpanId, + activeScopeBefore: activeBefore ? { + traceId: activeBefore.context?.()?.toTraceId?.(), + spanId: activeBefore.context?.()?.toSpanId?.(), + } : null, + }); + + const span = tracer.startSpan("aws.durable-execution", spanOptions); + + // Re-emit root span using the extracted root span_id so all invocations + // refer to the same durable execution root (propagated by checkpoints). + try { + if (rootSpanId) { + span.context()._spanId = id(rootSpanId, 10); + } + } catch (e) { + logDebug(`Failed to set durable root span_id: ${e}`); + } + + // Fix parent_id: the active context has span_id=root_span_id (set by + // DurableExecutionEventTraceExtractor.extract), so tracer.startSpan() + // inherits that as parent_id, causing self-parenting. The root span's + // parent should be the upstream caller (if extracted) or 0 (true root). + try { + const upstream = extractUpstreamTraceContext(event as DurableExecutionEvent); + if (upstream) { + span.context()._parentId = id(upstream.parentId, 10); + } else { + span.context()._parentId = id("0", 10); + } + } catch (e) { + logDebug(`Failed to set root span parent_id: ${e}`); + } + + const createdTraceId = span.context().toTraceId?.(); + const createdSpanId = span.context().toSpanId?.(); + const createdParentId = span.context()._parentId?.toString?.(10) ?? span.context()._parentId?.toString?.(); + durableTraceDebugLog("Durable root span created", { + executionArn, + createdTraceId, + createdSpanId, + createdParentId, + expectedRootSpanId: rootSpanId, + extractedTraceId, + extractedSpanId, + traceMatchesExtracted: extractedTraceId ? createdTraceId === extractedTraceId : undefined, + spanMatchesExpectedRoot: createdSpanId === rootSpanId, + }); + if (extractedTraceId && createdTraceId && extractedTraceId !== createdTraceId) { + durableTraceDebugLog("WARNING: durable root span trace differs from extracted durable context", { + executionArn, + extractedTraceId, + createdTraceId, + note: "This can cause the durable root span to appear standalone.", + }); + } + + logDebug(`Created root execution span: span_id=${rootSpanId ?? "auto"}, start_time=${startTime}`); + + return { + span, + finish: () => { + durableTraceDebugLog("Finishing durable root span", { + executionArn, + traceId: span.context().toTraceId?.(), + spanId: span.context().toSpanId?.(), + parentId: span.context()._parentId?.toString?.(10) ?? span.context()._parentId?.toString?.(), + }); + span.finish(); + logDebug("Finished root execution span"); + }, + }; + } catch (e) { + logDebug(`Failed to create durable execution root span: ${e}`); + return null; + } +} diff --git a/src/trace/context/extractors/index.ts b/src/trace/context/extractors/index.ts index 6bd690713..0fbbf081b 100644 --- a/src/trace/context/extractors/index.ts +++ b/src/trace/context/extractors/index.ts @@ -9,3 +9,4 @@ export { SNSSQSEventTraceExtractor } from "./sns-sqs"; export { StepFunctionEventTraceExtractor } from "./step-function"; export { LambdaContextTraceExtractor } from "./lambda-context"; export { CustomTraceExtractor } from "./custom"; +export { DurableExecutionEventTraceExtractor, isDurableExecutionEvent, isDurableExecutionReplay, createDurableExecutionRootSpan } from "./durable-execution"; diff --git a/src/trace/listener.ts b/src/trace/listener.ts index f068c5352..628f8ec42 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -27,6 +27,7 @@ import { } from "./durable-function-context"; import { XrayService } from "./xray-service"; import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http"; +import { createDurableExecutionRootSpan } from "./context/extractors/durable-execution"; import { getSpanPointerAttributes, SpanPointerAttributes } from "../utils/span-pointers"; export type TraceExtractor = (event: any, context: Context) => Promise | TraceContext; @@ -102,6 +103,7 @@ export class TraceListener { private wrappedCurrentSpan?: SpanWrapper; private triggerTags?: { [key: string]: string }; private lambdaSpanParentContext?: SpanContext; + private durableRootSpan?: { span: any; finish: () => void }; private spanPointerAttributesList: SpanPointerAttributes[] | undefined; public get currentTraceHeaders() { @@ -115,6 +117,12 @@ export class TraceListener { } public async onStartInvocation(event: any, context: Context) { + const durableTraceDebugEnabled = (() => { + const value = process.env.DD_DURABLE_TRACE_DEBUG; + if (!value) return false; + const normalized = value.toLowerCase(); + return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; + })(); const tracerInitialized = this.tracerWrapper.isTracerAvailable; if (this.config.injectLogContext) { patchConsole(console, this.contextService); @@ -143,6 +151,32 @@ export class TraceListener { traceSource: this.contextService.traceSource, }); } + // Create the durable execution root span before everything else. + // This span uses the propagated root span_id and is re-emitted on every + // invocation (last one wins in the backend with correct total duration). + if (durableTraceDebugEnabled && spanContextWrapper) { + console.log("[dd-lambda][durable-trace] Listener root-span inputs", { + traceSource: this.contextService.traceSource, + extracted: { + traceId: spanContextWrapper.toTraceId(), + parentId: spanContextWrapper.toSpanId(), + sampleMode: spanContextWrapper.sampleMode(), + }, + }); + } + this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; + if (durableTraceDebugEnabled) { + const durableRootSpanContext = this.durableRootSpan?.span?.context?.(); + console.log("[dd-lambda][durable-trace] Listener root-span creation result", { + created: Boolean(this.durableRootSpan), + rootSpan: durableRootSpanContext ? { + traceId: durableRootSpanContext.toTraceId?.(), + spanId: durableRootSpanContext.toSpanId?.(), + parentId: durableRootSpanContext._parentId?.toString?.(10) ?? durableRootSpanContext._parentId?.toString?.(), + } : null, + }); + } + if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( event, @@ -309,6 +343,13 @@ export class TraceListener { this.injectAuthorizerSpan(result, event?.requestContext?.requestId, finishTime || Date.now()); } + // Finish the durable execution root span after all other spans. + // Re-emitted every invocation; the last one wins in the backend. + if (this.durableRootSpan) { + this.durableRootSpan.finish(); + this.durableRootSpan = undefined; + } + // Reset singletons and trace context this.stepFunctionContext = undefined; this.durableFunctionContext = undefined; From 965710ebb023393c7525e9a133880d5d40e68d86 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Wed, 29 Apr 2026 23:53:14 -0400 Subject: [PATCH 2/7] some updates and fixes --- src/trace/context/extractor.ts | 28 +--- .../extractors/durable-execution.spec.ts | 14 +- .../context/extractors/durable-execution.ts | 125 +++++++++--------- 3 files changed, 66 insertions(+), 101 deletions(-) diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 233887308..34afff0f6 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -46,12 +46,6 @@ export class TraceContextExtractor { async extract(event: any, context: Context): Promise { let spanContext: SpanContextWrapper | null = null; - const durableTraceDebugEnabled = (() => { - const value = process.env.DD_DURABLE_TRACE_DEBUG; - if (!value) return false; - const normalized = value.toLowerCase(); - return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; - })(); if (this.config.traceExtractor) { const customExtractor = new CustomTraceExtractor(this.config.traceExtractor); spanContext = await customExtractor.extract(event, context); @@ -61,20 +55,11 @@ export class TraceContextExtractor { const eventExtractor = this.getTraceEventExtractor(event); if (eventExtractor !== undefined) { spanContext = eventExtractor.extract(event); - if (durableTraceDebugEnabled && isDurableExecutionEvent(event)) { - console.log("[dd-lambda][durable-trace] Event extractor result", { - extractor: eventExtractor.constructor?.name, - extracted: spanContext ? { - traceId: spanContext.toTraceId(), - parentId: spanContext.toSpanId(), - sampleMode: spanContext.sampleMode(), - } : null, - }); - } } } - // No stripping needed — deterministic approach never modifies checkpoint payloads. + // No stripping needed — trace context is stored in dedicated + // `_datadog_{N}` checkpoint operations. if (spanContext === null) { this.stepFunctionContextService = StepFunctionContextService.instance(event); @@ -87,15 +72,6 @@ export class TraceContextExtractor { if (spanContext === null) { const contextExtractor = new LambdaContextTraceExtractor(this.tracerWrapper); spanContext = contextExtractor.extract(context); - if (durableTraceDebugEnabled && isDurableExecutionEvent(event)) { - console.log("[dd-lambda][durable-trace] Falling back to Lambda context extraction", { - extracted: spanContext ? { - traceId: spanContext.toTraceId(), - parentId: spanContext.toSpanId(), - sampleMode: spanContext.sampleMode(), - } : null, - }); - } } if (spanContext !== null) { diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index 70b2adc2e..864a35e9c 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -1,16 +1,9 @@ -import { createHash } from "crypto"; import { createDurableExecutionRootSpan, DurableExecutionEventTraceExtractor } from "./durable-execution"; jest.mock("dd-trace", () => ({ startSpan: jest.fn(), })); -function deterministicRootSpanId(executionArn: string): string { - const hash = createHash("sha256").update(`durable-root:${executionArn}`).digest("hex"); - const masked = BigInt(`0x${hash}`) & 0x7fffffffffffffffn; - return masked === 0n ? "1" : masked.toString(10); -} - describe("DurableExecutionEventTraceExtractor", () => { const tracer = require("dd-trace"); const startSpanMock = tracer.startSpan as jest.Mock; @@ -19,7 +12,7 @@ describe("DurableExecutionEventTraceExtractor", () => { jest.clearAllMocks(); }); - it("extracts a deterministic durable root span id from executionArn", () => { + it("extracts trace context from the latest trace checkpoint", () => { const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; @@ -30,7 +23,7 @@ describe("DurableExecutionEventTraceExtractor", () => { Operations: [ { Id: "op-1", - Name: "_dd_trace_context_0", + Name: "_datadog_0", Status: "SUCCEEDED", StepDetails: { Result: JSON.stringify({ @@ -93,7 +86,7 @@ describe("DurableExecutionEventTraceExtractor", () => { expect(root).not.toBeNull(); expect(startSpanMock).toHaveBeenCalledTimes(1); - expect(root?.span.context()._spanId.toString(10)).toBe("2222222222222222222"); + expect(root?.span.context()._spanId.toString(10)).toBe(extracted?.toSpanId()); }); it("skips durable root span creation on replay invocations", () => { @@ -134,4 +127,3 @@ describe("DurableExecutionEventTraceExtractor", () => { expect(startSpanMock).not.toHaveBeenCalled(); }); }); - diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index 88b9f5bdf..c2976b6ee 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -1,28 +1,14 @@ /** - * Durable Execution Trace Extractor — Deterministic Approach - * - * Generates deterministic trace context from AWS Lambda Durable Execution events - * using SHA-256 hashing of the execution ARN and operation identifiers. + * Durable Execution Trace Extractor — Checkpoint/Upstream Approach * * Strategy: - * 1. First, try to extract a real trace context from the original customer event - * (stored in Operations[0].ExecutionDetails.InputPayload). If found, the durable - * execution trace connects to the upstream caller's trace. - * 2. If no upstream context exists, fall back to deterministic hashing from the - * execution ARN, generating a full 128-bit trace ID (lower 64 bits + _dd.p.tid) - * for W3C/OpenTelemetry compatibility. - * - * Flow: - * 1. Every invocation receives the same DurableExecutionArn - * 2. The original customer event is stored in Operations[0].ExecutionDetails.InputPayload - * and is identical across all invocations — any upstream trace headers persist - * 3. trace_id = real upstream or hash("durable-trace:{arn}") lower 64 bits - * 4. _dd.p.tid = real upstream or hash("durable-trace:{arn}") upper 64 bits - * 5. parent_id = hash("durable-span:{arn}#{last_completed_op_id}") — links to previous invocation - * 6. Checkpoint payloads are never modified + * 1. Prefer trace context from the latest `_datadog_{N}` checkpoint. + * 2. If no trace checkpoint exists (first invocation), try upstream trace context + * from the original customer event stored in `Operations[0].ExecutionDetails.InputPayload`. + * 3. If neither exists, start a new trace with random IDs. */ -import { createHash, randomBytes } from "crypto"; +import { randomBytes } from "crypto"; import { logDebug } from "../../../utils"; import { SpanContextWrapper } from "../../span-context-wrapper"; import { SampleMode, TraceSource } from "../../trace-context-service"; @@ -262,37 +248,45 @@ function bufferToBigInt(buf: Buffer): bigint { // Terminal operation statuses that indicate an operation has completed const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); -const TRACE_CHECKPOINT_NAME_PREFIX = "_dd_trace_context_"; +const TRACE_CHECKPOINT_NAME_PREFIX = "_datadog_"; +const LEGACY_TRACE_CHECKPOINT_NAME_PREFIX = "_dd_trace_context_"; +const TRACE_CHECKPOINT_NAME_PREFIXES = [ + TRACE_CHECKPOINT_NAME_PREFIX, + LEGACY_TRACE_CHECKPOINT_NAME_PREFIX, +]; + +function parseTraceCheckpointNumber(name: unknown): number | null { + if (typeof name !== "string") return null; -function deterministicSha256Hash(s: string): string { - const hash = createHash("sha256").update(s).digest("hex"); - const fullBigInt = BigInt("0x" + hash); - const masked = fullBigInt & 0x7fffffffffffffffn; - return masked === 0n ? "1" : masked.toString(10); + const prefix = TRACE_CHECKPOINT_NAME_PREFIXES.find((candidate) => name.startsWith(candidate)); + if (!prefix) return null; + + const suffix = name.slice(prefix.length); + const n = Number.parseInt(suffix, 10); + if (Number.isNaN(n) || String(n) !== suffix) return null; + return n; } -function getDurableExecutionRootSpanId(executionArn: string): string { - return deterministicSha256Hash(`durable-root:${executionArn}`); +function isTraceCheckpointName(name: unknown): boolean { + return parseTraceCheckpointNumber(name) !== null; } /** - * Find the highest-numbered `_dd_trace_context_{N}` checkpoint in the event. + * Find the highest-numbered `_datadog_{N}` checkpoint in the event. + * Also supports legacy `_dd_trace_context_{N}` checkpoints for compatibility. * Each invocation that changes trace context saves a new checkpoint with * N+1; the one with the highest N is the most recent. */ function findLatestTraceContextCheckpoint( event: DurableExecutionEvent, -): { number: number; headers: Record } | null { +): { number: number; name: string; headers: Record } | null { const operations = event.InitialExecutionState?.Operations; if (!operations || operations.length === 0) return null; let best: { number: number; op: DurableExecutionOperation } | null = null; for (const op of operations) { - const name = op?.Name; - if (!name || !name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) continue; - const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length); - const n = Number.parseInt(suffix, 10); - if (Number.isNaN(n) || String(n) !== suffix) continue; + const n = parseTraceCheckpointNumber(op?.Name); + if (n === null) continue; if (best === null || n > best.number) { best = { number: n, op }; } @@ -311,7 +305,11 @@ function findLatestTraceContextCheckpoint( hasPayload: Boolean(best.op.Payload), hasStepResult: Boolean(best.op.StepDetails?.Result), }); - return { number: best.number, headers: parsed as Record }; + return { + number: best.number, + name: String(best.op.Name), + headers: parsed as Record, + }; } } catch (e) { logDebug(`Failed to parse trace checkpoint payload: ${e}`); @@ -398,15 +396,12 @@ function parsePtid(tags: string): string { } /** - * Durable Execution Trace Extractor — Deterministic Approach with W3C Support + * Durable Execution Trace Extractor * * Strategy: - * 1. Try to extract real upstream trace context from customer event - * 2. Fall back to deterministic 128-bit trace ID from execution ARN - * - * In both cases: - * - parent_id links to the last completed operation for replay chaining - * - _dd.p.tid is set for full 128-bit W3C trace ID support + * 1. Prefer `_datadog_{N}` checkpoint context when present. + * 2. Otherwise, derive trace linkage from upstream customer event context. + * 3. If none exists, start a new random trace context. */ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { extract(event: unknown): SpanContextWrapper | null { @@ -429,7 +424,7 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor }); if (operations?.length) { const checkpointOperations = operations - .filter((op) => typeof op?.Name === "string" && op.Name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) + .filter((op) => isTraceCheckpointName(op?.Name)) .map((op) => ({ id: op.Id, name: op.Name, @@ -443,14 +438,13 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor } // --- Step 0: Prefer a previously-saved trace-context checkpoint --- - // If a previous invocation saved a `_dd_trace_context_{N}` checkpoint, use + // If a previous invocation saved a `_datadog_{N}` checkpoint, use // the one with the highest N — it reflects the latest trace-context state // of the ongoing durable execution. Same scheme as dd-trace-py. const latestCheckpoint = findLatestTraceContextCheckpoint(event); - const executionRootSpanId = getDurableExecutionRootSpanId(executionArn); if (latestCheckpoint) { logDebug( - `Using trace context from checkpoint _dd_trace_context_${latestCheckpoint.number}`, + `Using trace context from checkpoint ${latestCheckpoint.name}`, ); const traceIdStr = latestCheckpoint.headers["x-datadog-trace-id"]; const parentIdStr = latestCheckpoint.headers["x-datadog-parent-id"]; @@ -488,7 +482,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor rawParentId: effectiveParentId, normalizedTraceId: normalizedTraceId.traceId, normalizedParentId, - executionRootSpanId, ptid: ptidFromTags, }); @@ -501,16 +494,14 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor ptid: ptidFromTags, }); - if (normalizedTraceId.traceId) { + if (normalizedTraceId.traceId && normalizedParentId) { try { const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); const id = require("dd-trace/packages/dd-trace/src/id"); const ddSpanContext = new _DatadogSpanContext({ traceId: id(normalizedTraceId.traceId, 10), - // Use deterministic root span id from executionArn so all invocations - // remain anchored to the same durable root regardless of checkpoint payload format. - spanId: id(executionRootSpanId, 10), + spanId: id(normalizedParentId, 10), sampling: { priority: samplingPriorityStr }, }); @@ -521,9 +512,8 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor durableTraceDebugLog("Activated trace context from checkpoint", { checkpointNumber: latestCheckpoint.number, activatedTraceId: normalizedTraceId.traceId, - activatedParentId: executionRootSpanId, + activatedParentId: normalizedParentId, activatedPtid: ptidFromTags, - checkpointParentId: normalizedParentId, }); return new SpanContextWrapper(ddSpanContext, TraceSource.Event); } catch (e) { @@ -532,16 +522,24 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor checkpointNumber: latestCheckpoint.number, activationError: e instanceof Error ? e.message : String(e), traceId: normalizedTraceId.traceId, - parentId: executionRootSpanId, + parentId: normalizedParentId, ptid: ptidFromTags, }); - // Fall through to existing paths + const fallback = SpanContextWrapper.fromTraceContext({ + traceId: normalizedTraceId.traceId, + parentId: normalizedParentId, + sampleMode: parseInt(samplingPriorityStr, 10), + source: TraceSource.Event, + }); + if (fallback) { + return fallback; + } } } else { durableTraceDebugLog("Checkpoint did not contain usable trace identifiers", { checkpointNumber: latestCheckpoint.number, hasTraceId: Boolean(normalizedTraceId.traceId), - hasParentId: Boolean(normalizedParentId || executionRootSpanId), + hasParentId: Boolean(normalizedParentId), traceparent: latestCheckpoint.headers.traceparent, }); } @@ -557,7 +555,7 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor let traceId: string; let ptid: string; - const rootSpanId = executionRootSpanId; + const rootSpanId = generateRandomPositiveId(); let samplingPriority: string; if (upstream) { @@ -577,8 +575,7 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor logDebug(`Upstream trace_id invalid, generated new trace_id=${traceId}, _dd.p.tid=${ptid}`); } - // For the first invocation (no checkpoint), use the deterministic durable - // root span id derived from executionArn. + // For first invocation, create a new durable root span id and chain aws.lambda to it. durableTraceDebugLog("No checkpoint found; generated durable root context from upstream trace", { traceId, rootSpanId, @@ -588,8 +585,8 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor }); } else { // --- Step 2: No checkpoint and no upstream context --- - // Start a new trace and use deterministic root span id that will be - // referenced by checkpoints in later invocations. + // Start a new trace and create a random durable root span id that + // checkpoints will carry across subsequent invocations. const randomTrace = generateRandomTraceId128(); traceId = randomTrace.traceId; ptid = randomTrace.ptid; @@ -735,7 +732,7 @@ export function createDurableExecutionRootSpan( const operations = event.InitialExecutionState?.Operations; const hasCheckpoint = Boolean( - operations?.some((op) => typeof op?.Name === "string" && op.Name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)), + operations?.some((op) => isTraceCheckpointName(op?.Name)), ); const hasCompletedOperation = Boolean(operations?.some((op) => TERMINAL_STATUSES.has(op.Status))); const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; @@ -750,9 +747,9 @@ export function createDurableExecutionRootSpan( return null; } - const rootSpanId = getDurableExecutionRootSpanId(executionArn); const extractedTraceId = extractedRootContext?.toTraceId(); const extractedSpanId = extractedRootContext?.toSpanId(); + const rootSpanId = extractedSpanId || generateRandomPositiveId(); durableTraceDebugLog("Preparing durable root span creation", { executionArn, extractedTraceId, From ad4a2c4ef78a8a2e01453e10c7c35c831b68dda1 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 21:59:59 -0400 Subject: [PATCH 3/7] remove debugging ENV --- .../context/extractors/durable-execution.ts | 179 +----------------- src/trace/listener.ts | 27 --- 2 files changed, 1 insertion(+), 205 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index c2976b6ee..e2200e344 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -14,24 +14,6 @@ import { SpanContextWrapper } from "../../span-context-wrapper"; import { SampleMode, TraceSource } from "../../trace-context-service"; import { EventTraceExtractor } from "../extractor"; -const DURABLE_TRACE_DEBUG_ENV = "DD_DURABLE_TRACE_DEBUG"; - -function durableTraceDebugEnabled(): boolean { - const value = process.env[DURABLE_TRACE_DEBUG_ENV]; - if (!value) return false; - const normalized = value.toLowerCase(); - return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; -} - -function durableTraceDebugLog(message: string, details?: Record): void { - if (!durableTraceDebugEnabled()) return; - if (details) { - console.log(`[dd-lambda][durable-trace] ${message}`, details); - return; - } - console.log(`[dd-lambda][durable-trace] ${message}`); -} - function parseTraceparentHex( traceparent: unknown, ): { traceIdHex: string; parentIdHex: string; lower64TraceIdDec: string; upper64TraceIdHex: string; parentIdDec: string } | null { @@ -298,13 +280,6 @@ function findLatestTraceContextCheckpoint( try { const parsed = JSON.parse(raw); if (parsed && typeof parsed === "object") { - durableTraceDebugLog("Found latest trace-context checkpoint", { - checkpointName: best.op.Name, - checkpointNumber: best.number, - operationId: best.op.Id, - hasPayload: Boolean(best.op.Payload), - hasStepResult: Boolean(best.op.StepDetails?.Result), - }); return { number: best.number, name: String(best.op.Name), @@ -313,12 +288,6 @@ function findLatestTraceContextCheckpoint( } } catch (e) { logDebug(`Failed to parse trace checkpoint payload: ${e}`); - durableTraceDebugLog("Failed to parse trace-context checkpoint payload", { - checkpointName: best.op.Name, - checkpointNumber: best.number, - operationId: best.op.Id, - parseError: e instanceof Error ? e.message : String(e), - }); } return null; } @@ -416,27 +385,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor return null; } - const operations = event.InitialExecutionState?.Operations; - durableTraceDebugLog("Durable invocation event received", { - executionArn, - checkpointToken: event.CheckpointToken, - operationCount: operations?.length ?? 0, - }); - if (operations?.length) { - const checkpointOperations = operations - .filter((op) => isTraceCheckpointName(op?.Name)) - .map((op) => ({ - id: op.Id, - name: op.Name, - status: op.Status, - hasPayload: Boolean(op.Payload), - hasStepResult: Boolean(op.StepDetails?.Result), - })); - durableTraceDebugLog("Trace-context checkpoint operations present in event", { - checkpoints: checkpointOperations, - }); - } - // --- Step 0: Prefer a previously-saved trace-context checkpoint --- // If a previous invocation saved a `_datadog_{N}` checkpoint, use // the one with the highest N — it reflects the latest trace-context state @@ -460,13 +408,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor effectiveTraceId = effectiveTraceId || parsedTraceparent.lower64TraceIdDec; effectiveParentId = effectiveParentId || parsedTraceparent.parentIdDec; ptidFromTags = ptidFromTags || parsedTraceparent.upper64TraceIdHex; - durableTraceDebugLog("Derived Datadog IDs from traceparent in checkpoint", { - checkpointNumber: latestCheckpoint.number, - traceparent: latestCheckpoint.headers.traceparent, - derivedTraceId: effectiveTraceId, - derivedParentId: effectiveParentId, - derivedPtid: ptidFromTags, - }); } } @@ -476,24 +417,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor ptidFromTags = normalizedTraceId.ptidFromTraceId; } - durableTraceDebugLog("Normalized checkpoint IDs", { - checkpointNumber: latestCheckpoint.number, - rawTraceId: effectiveTraceId, - rawParentId: effectiveParentId, - normalizedTraceId: normalizedTraceId.traceId, - normalizedParentId, - ptid: ptidFromTags, - }); - - durableTraceDebugLog("Checkpoint headers selected for extraction", { - checkpointNumber: latestCheckpoint.number, - headerKeys: Object.keys(latestCheckpoint.headers), - traceId: normalizedTraceId.traceId, - parentId: normalizedParentId, - samplingPriority: samplingPriorityStr, - ptid: ptidFromTags, - }); - if (normalizedTraceId.traceId && normalizedParentId) { try { const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); @@ -508,23 +431,9 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor if (ptidFromTags) { ddSpanContext._trace.tags["_dd.p.tid"] = ptidFromTags; } - - durableTraceDebugLog("Activated trace context from checkpoint", { - checkpointNumber: latestCheckpoint.number, - activatedTraceId: normalizedTraceId.traceId, - activatedParentId: normalizedParentId, - activatedPtid: ptidFromTags, - }); return new SpanContextWrapper(ddSpanContext, TraceSource.Event); } catch (e) { logDebug(`Failed to construct SpanContext from checkpoint: ${e}`); - durableTraceDebugLog("Failed to activate trace context from checkpoint", { - checkpointNumber: latestCheckpoint.number, - activationError: e instanceof Error ? e.message : String(e), - traceId: normalizedTraceId.traceId, - parentId: normalizedParentId, - ptid: ptidFromTags, - }); const fallback = SpanContextWrapper.fromTraceContext({ traceId: normalizedTraceId.traceId, parentId: normalizedParentId, @@ -535,19 +444,7 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor return fallback; } } - } else { - durableTraceDebugLog("Checkpoint did not contain usable trace identifiers", { - checkpointNumber: latestCheckpoint.number, - hasTraceId: Boolean(normalizedTraceId.traceId), - hasParentId: Boolean(normalizedParentId), - traceparent: latestCheckpoint.headers.traceparent, - }); } - } else { - durableTraceDebugLog("No trace-context checkpoint found in event operations", { - executionArn, - operationCount: operations?.length ?? 0, - }); } // --- Step 1: Try to use real upstream trace context --- @@ -575,14 +472,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor logDebug(`Upstream trace_id invalid, generated new trace_id=${traceId}, _dd.p.tid=${ptid}`); } - // For first invocation, create a new durable root span id and chain aws.lambda to it. - durableTraceDebugLog("No checkpoint found; generated durable root context from upstream trace", { - traceId, - rootSpanId, - upstreamParentId: upstream.parentId, - samplingPriority, - ptid, - }); } else { // --- Step 2: No checkpoint and no upstream context --- // Start a new trace and create a random durable root span id that @@ -593,12 +482,6 @@ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor samplingPriority = SampleMode.AUTO_KEEP.toString(); logDebug(`No upstream context, generated trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); - durableTraceDebugLog("No checkpoint/upstream found; generated new durable trace context", { - executionArn, - traceId, - rootSpanId, - ptid, - }); } logDebug(`Generated initial durable root context: trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); @@ -738,25 +621,10 @@ export function createDurableExecutionRootSpan( const isLikelyFirstInvocation = !hasCheckpoint && !hasCompletedOperation && (operations?.length ?? 0) <= 1; if (!isLikelyFirstInvocation) { - durableTraceDebugLog("Skipping durable root span creation for replay invocation", { - executionArn, - operationCount: operations?.length ?? 0, - hasCheckpoint, - hasCompletedOperation, - }); return null; } - const extractedTraceId = extractedRootContext?.toTraceId(); - const extractedSpanId = extractedRootContext?.toSpanId(); - const rootSpanId = extractedSpanId || generateRandomPositiveId(); - durableTraceDebugLog("Preparing durable root span creation", { - executionArn, - extractedTraceId, - extractedSpanId, - expectedRootSpanId: rootSpanId, - operationCount: operations?.length ?? 0, - }); + const rootSpanId = extractedRootContext?.toSpanId() || generateRandomPositiveId(); // Determine consistent start_time from the first operation's StartTimestamp // StartTimestamp is unix milliseconds from the durable execution SDK @@ -799,22 +667,6 @@ export function createDurableExecutionRootSpan( spanOptions.childOf = extractedRootContext.spanContext; } - let activeBefore: any; - try { - activeBefore = tracer?.scope?.().active?.(); - } catch { - activeBefore = undefined; - } - durableTraceDebugLog("Creating durable root span", { - executionArn, - startTime, - expectedRootSpanId: rootSpanId, - activeScopeBefore: activeBefore ? { - traceId: activeBefore.context?.()?.toTraceId?.(), - spanId: activeBefore.context?.()?.toSpanId?.(), - } : null, - }); - const span = tracer.startSpan("aws.durable-execution", spanOptions); // Re-emit root span using the extracted root span_id so all invocations @@ -842,40 +694,11 @@ export function createDurableExecutionRootSpan( logDebug(`Failed to set root span parent_id: ${e}`); } - const createdTraceId = span.context().toTraceId?.(); - const createdSpanId = span.context().toSpanId?.(); - const createdParentId = span.context()._parentId?.toString?.(10) ?? span.context()._parentId?.toString?.(); - durableTraceDebugLog("Durable root span created", { - executionArn, - createdTraceId, - createdSpanId, - createdParentId, - expectedRootSpanId: rootSpanId, - extractedTraceId, - extractedSpanId, - traceMatchesExtracted: extractedTraceId ? createdTraceId === extractedTraceId : undefined, - spanMatchesExpectedRoot: createdSpanId === rootSpanId, - }); - if (extractedTraceId && createdTraceId && extractedTraceId !== createdTraceId) { - durableTraceDebugLog("WARNING: durable root span trace differs from extracted durable context", { - executionArn, - extractedTraceId, - createdTraceId, - note: "This can cause the durable root span to appear standalone.", - }); - } - logDebug(`Created root execution span: span_id=${rootSpanId ?? "auto"}, start_time=${startTime}`); return { span, finish: () => { - durableTraceDebugLog("Finishing durable root span", { - executionArn, - traceId: span.context().toTraceId?.(), - spanId: span.context().toSpanId?.(), - parentId: span.context()._parentId?.toString?.(10) ?? span.context()._parentId?.toString?.(), - }); span.finish(); logDebug("Finished root execution span"); }, diff --git a/src/trace/listener.ts b/src/trace/listener.ts index 628f8ec42..4b1aa1f95 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -117,12 +117,6 @@ export class TraceListener { } public async onStartInvocation(event: any, context: Context) { - const durableTraceDebugEnabled = (() => { - const value = process.env.DD_DURABLE_TRACE_DEBUG; - if (!value) return false; - const normalized = value.toLowerCase(); - return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on"; - })(); const tracerInitialized = this.tracerWrapper.isTracerAvailable; if (this.config.injectLogContext) { patchConsole(console, this.contextService); @@ -154,28 +148,7 @@ export class TraceListener { // Create the durable execution root span before everything else. // This span uses the propagated root span_id and is re-emitted on every // invocation (last one wins in the backend with correct total duration). - if (durableTraceDebugEnabled && spanContextWrapper) { - console.log("[dd-lambda][durable-trace] Listener root-span inputs", { - traceSource: this.contextService.traceSource, - extracted: { - traceId: spanContextWrapper.toTraceId(), - parentId: spanContextWrapper.toSpanId(), - sampleMode: spanContextWrapper.sampleMode(), - }, - }); - } this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; - if (durableTraceDebugEnabled) { - const durableRootSpanContext = this.durableRootSpan?.span?.context?.(); - console.log("[dd-lambda][durable-trace] Listener root-span creation result", { - created: Boolean(this.durableRootSpan), - rootSpan: durableRootSpanContext ? { - traceId: durableRootSpanContext.toTraceId?.(), - spanId: durableRootSpanContext.toSpanId?.(), - parentId: durableRootSpanContext._parentId?.toString?.(10) ?? durableRootSpanContext._parentId?.toString?.(), - } : null, - }); - } if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( From e149fa639498c5f057c86e00fdd1e98eec842772 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 22:05:15 -0400 Subject: [PATCH 4/7] update comments --- src/trace/context/extractors/durable-execution.ts | 13 +++++++------ src/trace/listener.ts | 7 ++++--- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index e2200e344..aa3dbf066 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -591,11 +591,12 @@ export function getCompletedOperationCount(event: unknown): number { } /** - * Create (or re-emit) the durable execution root span. + * Create the durable execution root span for likely first invocations only. * - * Every invocation emits this span with the same propagated root span_id. - * The Datadog backend deduplicates by span_id, so the last invocation's - * version wins with the correct total duration (start → final end). + * Replay invocations return null. The current first-invocation heuristic is: + * - no trace checkpoint operation exists + * - no operation has terminal status + * - operation count is <= 1 * * Returns an object with { span, finish() } or null if not a durable execution. * Caller must call finish() when the invocation ends. @@ -669,8 +670,8 @@ export function createDurableExecutionRootSpan( const span = tracer.startSpan("aws.durable-execution", spanOptions); - // Re-emit root span using the extracted root span_id so all invocations - // refer to the same durable execution root (propagated by checkpoints). + // Use the extracted durable root span_id when available to keep the + // durable root identity stable with propagated checkpoint context. try { if (rootSpanId) { span.context()._spanId = id(rootSpanId, 10); diff --git a/src/trace/listener.ts b/src/trace/listener.ts index 4b1aa1f95..d3e8d8136 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -145,9 +145,10 @@ export class TraceListener { traceSource: this.contextService.traceSource, }); } - // Create the durable execution root span before everything else. - // This span uses the propagated root span_id and is re-emitted on every - // invocation (last one wins in the backend with correct total duration). + // Create the durable execution root span before everything else so later + // spans can parent correctly. Root creation is gated in + // createDurableExecutionRootSpan() and only happens for likely first + // invocations; replay invocations return null. this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; if (this.config.createInferredSpan) { From 4f0e77fdd15062b72e0760d698f6bb5b78bc9b61 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:14:55 -0400 Subject: [PATCH 5/7] refactor away the AI's complex extractor --- src/trace/context/extractor.ts | 2 +- .../extractors/durable-execution.spec.ts | 51 ++- .../context/extractors/durable-execution.ts | 364 +++--------------- src/trace/listener.ts | 1 - 4 files changed, 87 insertions(+), 331 deletions(-) diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 34afff0f6..cc03521bc 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -87,7 +87,7 @@ export class TraceContextExtractor { if (!event || typeof event !== "object") return; // Check for durable execution event first (has DurableExecutionArn + CheckpointToken) - if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(); + if (isDurableExecutionEvent(event)) return new DurableExecutionEventTraceExtractor(this.tracerWrapper); const headers = event.headers ?? event.multiValueHeaders; if (headers !== null && typeof headers === "object") { diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index 864a35e9c..b2353795b 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -1,9 +1,14 @@ import { createDurableExecutionRootSpan, DurableExecutionEventTraceExtractor } from "./durable-execution"; +import { TracerWrapper } from "../../tracer-wrapper"; jest.mock("dd-trace", () => ({ startSpan: jest.fn(), })); +function makeTracerWrapper(extractReturn: any = null): TracerWrapper { + return { extract: jest.fn().mockReturnValue(extractReturn) } as unknown as TracerWrapper; +} + describe("DurableExecutionEventTraceExtractor", () => { const tracer = require("dd-trace"); const startSpanMock = tracer.startSpan as jest.Mock; @@ -12,10 +17,16 @@ describe("DurableExecutionEventTraceExtractor", () => { jest.clearAllMocks(); }); - it("extracts trace context from the latest trace checkpoint", () => { + it("delegates checkpoint headers to the standard propagator", () => { const executionArn = "arn:aws:lambda:us-east-2:123456789012:function:demo:$LATEST/durable-execution/demo/abc"; + const checkpointHeaders = { + "x-datadog-trace-id": "149750110124521191", + "x-datadog-parent-id": "987654321012345678", + "x-datadog-sampling-priority": "1", + }; + const event = { DurableExecutionArn: executionArn, CheckpointToken: "t-1", @@ -26,23 +37,34 @@ describe("DurableExecutionEventTraceExtractor", () => { Name: "_datadog_0", Status: "SUCCEEDED", StepDetails: { - Result: JSON.stringify({ - "x-datadog-trace-id": "149750110124521191", - "x-datadog-parent-id": "987654321012345678", - "x-datadog-sampling-priority": "1", - }), + Result: JSON.stringify(checkpointHeaders), }, }, ], }, }; - const extractor = new DurableExecutionEventTraceExtractor(); + const sentinelContext = { sentinel: true }; + const tracerWrapper = makeTracerWrapper(sentinelContext); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); const context = extractor.extract(event); - expect(context).not.toBeNull(); - expect(context?.toTraceId()).toBe("149750110124521191"); - expect(context?.toSpanId()).toBe("987654321012345678"); + expect(tracerWrapper.extract).toHaveBeenCalledWith(checkpointHeaders); + expect(context).toBe(sentinelContext); + }); + + it("returns null when no checkpoint or upstream context exists", () => { + const tracerWrapper = makeTracerWrapper(); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); + + const context = extractor.extract({ + DurableExecutionArn: "arn:aws:lambda:us-east-2:123:function:demo", + CheckpointToken: "t-empty", + InitialExecutionState: { Operations: [] }, + }); + + expect(context).toBeNull(); + expect(tracerWrapper.extract).not.toHaveBeenCalled(); }); it("creates durable root span only for first invocation", () => { @@ -79,14 +101,10 @@ describe("DurableExecutionEventTraceExtractor", () => { }, }; - const extractor = new DurableExecutionEventTraceExtractor(); - const extracted = extractor.extract(firstInvocationEvent); - - const root = createDurableExecutionRootSpan(firstInvocationEvent, extracted); + const root = createDurableExecutionRootSpan(firstInvocationEvent, null); expect(root).not.toBeNull(); expect(startSpanMock).toHaveBeenCalledTimes(1); - expect(root?.span.context()._spanId.toString(10)).toBe(extracted?.toSpanId()); }); it("skips durable root span creation on replay invocations", () => { @@ -119,7 +137,8 @@ describe("DurableExecutionEventTraceExtractor", () => { }, }; - const extractor = new DurableExecutionEventTraceExtractor(); + const tracerWrapper = makeTracerWrapper({ source: "Event" }); + const extractor = new DurableExecutionEventTraceExtractor(tracerWrapper); const extracted = extractor.extract(replayEvent); const root = createDurableExecutionRootSpan(replayEvent, extracted); diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index aa3dbf066..9865eb0df 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -5,107 +5,19 @@ * 1. Prefer trace context from the latest `_datadog_{N}` checkpoint. * 2. If no trace checkpoint exists (first invocation), try upstream trace context * from the original customer event stored in `Operations[0].ExecutionDetails.InputPayload`. - * 3. If neither exists, start a new trace with random IDs. + * 3. If neither exists, return null and let the default extraction path create the context. + * + * The dd-trace-js durable-execution plugin writes checkpoint headers via the + * standard HTTP propagator (`tracer.inject(span, 'http_headers', headers)`), + * so we just hand the resulting header dict back to `tracer.extract` here. */ import { randomBytes } from "crypto"; import { logDebug } from "../../../utils"; import { SpanContextWrapper } from "../../span-context-wrapper"; -import { SampleMode, TraceSource } from "../../trace-context-service"; +import { TracerWrapper } from "../../tracer-wrapper"; import { EventTraceExtractor } from "../extractor"; -function parseTraceparentHex( - traceparent: unknown, -): { traceIdHex: string; parentIdHex: string; lower64TraceIdDec: string; upper64TraceIdHex: string; parentIdDec: string } | null { - if (typeof traceparent !== "string") return null; - const parts = traceparent.split("-"); - if (parts.length !== 4) return null; - const [, traceIdHex, parentIdHex] = parts; - if (!/^[0-9a-f]{32}$/i.test(traceIdHex) || !/^[0-9a-f]{16}$/i.test(parentIdHex)) { - return null; - } - - const lower64TraceIdHex = traceIdHex.slice(16); - const upper64TraceIdHex = traceIdHex.slice(0, 16); - - try { - return { - traceIdHex, - parentIdHex, - lower64TraceIdDec: BigInt(`0x${lower64TraceIdHex}`).toString(10), - upper64TraceIdHex, - parentIdDec: BigInt(`0x${parentIdHex}`).toString(10), - }; - } catch { - return null; - } -} - -function normalizeParentIdToDecimal(parentId: unknown): string | null { - if (typeof parentId !== "string") return null; - const value = parentId.trim(); - if (!value) return null; - - if (/^[0-9]+$/.test(value)) { - return value; - } - - if (/^[0-9a-f]+$/i.test(value)) { - const hex = value.length > 16 ? value.slice(-16) : value; - try { - return BigInt(`0x${hex}`).toString(10); - } catch { - return null; - } - } - - return null; -} - -function normalizeTraceIdToDecimal( - traceId: unknown, -): { traceId: string | null; ptidFromTraceId?: string } { - if (typeof traceId !== "string") { - return { traceId: null }; - } - - const value = traceId.trim(); - if (!value) { - return { traceId: null }; - } - - if (/^[0-9]+$/.test(value)) { - return { traceId: value }; - } - - if (/^[0-9a-f]+$/i.test(value)) { - // If a 128-bit hex trace ID was accidentally put here, split it like traceparent: - // lower 64 bits for Datadog trace_id, upper 64 bits for _dd.p.tid. - if (value.length > 16) { - const upperHex = value.slice(-32, -16).padStart(16, "0"); - const lowerHex = value.slice(-16); - try { - return { - traceId: BigInt(`0x${lowerHex}`).toString(10), - ptidFromTraceId: upperHex.toLowerCase(), - }; - } catch { - return { traceId: null }; - } - } - - try { - return { - traceId: BigInt(`0x${value}`).toString(10), - }; - } catch { - return { traceId: null }; - } - } - - return { traceId: null }; -} - /** * Interface for operation data in durable execution state */ @@ -202,23 +114,6 @@ function generateRandomPositiveId(): string { return value === 0n ? "1" : value.toString(10); } -function generateRandomTraceId128(): { traceId: string; ptid: string } { - const bytes = randomBytes(16); - - // Upper 64 bits -> _dd.p.tid - const upperBytes = Buffer.from(bytes.subarray(0, 8)); - const upperValue = bufferToBigInt(upperBytes); - const ptid = (upperValue === 0n ? 1n : upperValue).toString(16).padStart(16, "0"); - - // Lower 64 bits -> Datadog trace_id (decimal) - const lowerBytes = Buffer.from(bytes.subarray(8, 16)); - lowerBytes[0] = lowerBytes[0] & 0x7f; // keep positive int64 - const lowerValue = bufferToBigInt(lowerBytes); - const traceId = lowerValue === 0n ? "1" : lowerValue.toString(10); - - return { traceId, ptid }; -} - function bufferToBigInt(buf: Buffer): bigint { let result = 0n; for (let i = 0; i < buf.length; i++) { @@ -254,14 +149,17 @@ function isTraceCheckpointName(name: unknown): boolean { } /** - * Find the highest-numbered `_datadog_{N}` checkpoint in the event. - * Also supports legacy `_dd_trace_context_{N}` checkpoints for compatibility. - * Each invocation that changes trace context saves a new checkpoint with - * N+1; the one with the highest N is the most recent. + * Find the highest-numbered `_datadog_{N}` checkpoint in the event and return + * its parsed header dict. + * + * Each invocation that changes trace context saves a new checkpoint with N+1; + * the one with the highest N is the most recent. Headers are written by the + * dd-trace-js plugin via `tracer.inject(span, 'http_headers', headers)` so the + * payload is a standard HTTP-style header dict. + * + * Also accepts legacy `_dd_trace_context_{N}` names for compatibility. */ -function findLatestTraceContextCheckpoint( - event: DurableExecutionEvent, -): { number: number; name: string; headers: Record } | null { +function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record | null { const operations = event.InitialExecutionState?.Operations; if (!operations || operations.length === 0) return null; @@ -280,11 +178,7 @@ function findLatestTraceContextCheckpoint( try { const parsed = JSON.parse(raw); if (parsed && typeof parsed === "object") { - return { - number: best.number, - name: String(best.op.Name), - headers: parsed as Record, - }; + return parsed as Record; } } catch (e) { logDebug(`Failed to parse trace checkpoint payload: ${e}`); @@ -293,230 +187,73 @@ function findLatestTraceContextCheckpoint( } /** - * Try to extract a real Datadog trace context from the original customer event - * stored inside the durable execution envelope. - * - * The original event is stored in Operations[0].ExecutionDetails.InputPayload. - * Since all invocations replay the same stored event, any trace headers injected - * by an upstream Datadog-traced service will be present on every invocation. - * - * Returns extracted context info or null. + * Find upstream HTTP headers carried by the original customer event stored in + * `Operations[0].ExecutionDetails.InputPayload`. Returns the standard header + * dict (keys like `x-datadog-trace-id`, `traceparent`, etc.) or null. */ -function extractUpstreamTraceContext( - event: DurableExecutionEvent, -): { traceId: string; parentId: string; samplingPriority: string; ptid: string } | null { +function findUpstreamHeaders(event: DurableExecutionEvent): Record | null { try { const operations = event.InitialExecutionState?.Operations; if (!operations || operations.length === 0) return null; - const firstOp = operations[0]; - const inputPayloadStr = firstOp.ExecutionDetails?.InputPayload; + const inputPayloadStr = operations[0].ExecutionDetails?.InputPayload; if (!inputPayloadStr) return null; const customerEvent = JSON.parse(inputPayloadStr); if (!customerEvent || typeof customerEvent !== "object") return null; - // Try headers (API Gateway, ALB, Function URL) const headers = customerEvent.headers; if (headers && typeof headers === "object") { - const traceId = headers["x-datadog-trace-id"]; - const parentId = headers["x-datadog-parent-id"]; - if (traceId && parentId) { - const samplingPriority = headers["x-datadog-sampling-priority"] || "1"; - const tags = headers["x-datadog-tags"] || ""; - const ptid = parsePtid(tags); - logDebug(`Found upstream trace context in customer event headers`); - return { traceId, parentId, samplingPriority, ptid }; - } + return headers as Record; } - // Try _datadog field (direct invocation / Step Functions) const ddData = customerEvent._datadog; if (ddData && typeof ddData === "object") { - const traceId = ddData["x-datadog-trace-id"]; - const parentId = ddData["x-datadog-parent-id"]; - if (traceId && parentId) { - const samplingPriority = ddData["x-datadog-sampling-priority"] || "1"; - const tags = ddData["x-datadog-tags"] || ""; - const ptid = parsePtid(tags); - logDebug(`Found upstream trace context in customer event _datadog field`); - return { traceId, parentId, samplingPriority, ptid }; - } + return ddData as Record; } } catch (e) { - logDebug(`Failed to extract upstream trace context from durable event: ${e}`); + logDebug(`Failed to read upstream headers from durable input payload: ${e}`); } return null; } -/** - * Parse _dd.p.tid from x-datadog-tags string. - * Format: "_dd.p.tid=66bcb5eb00000000,_dd.p.dm=-0" - */ -function parsePtid(tags: string): string { - if (!tags) return ""; - for (const tag of tags.split(",")) { - if (tag.includes("_dd.p.tid=")) { - return tag.split("=")[1] || ""; - } - } - return ""; -} - /** * Durable Execution Trace Extractor * - * Strategy: - * 1. Prefer `_datadog_{N}` checkpoint context when present. - * 2. Otherwise, derive trace linkage from upstream customer event context. - * 3. If none exists, start a new random trace context. + * Locates trace headers carried inside the durable execution envelope and hands + * them to the standard dd-trace propagator via `TracerWrapper.extract`. Order: + * 1. Latest `_datadog_{N}` checkpoint payload. + * 2. Upstream customer event headers from `InputPayload`. + * 3. Otherwise return null and let the default extraction path take over. */ export class DurableExecutionEventTraceExtractor implements EventTraceExtractor { + constructor(private tracerWrapper: TracerWrapper) {} + extract(event: unknown): SpanContextWrapper | null { if (!isDurableExecutionEvent(event)) { logDebug("Event is not a durable execution event"); return null; } - - const executionArn = event.DurableExecutionArn; - if (!executionArn) { + if (!event.DurableExecutionArn) { logDebug("No DurableExecutionArn in event"); return null; } - // --- Step 0: Prefer a previously-saved trace-context checkpoint --- - // If a previous invocation saved a `_datadog_{N}` checkpoint, use - // the one with the highest N — it reflects the latest trace-context state - // of the ongoing durable execution. Same scheme as dd-trace-py. - const latestCheckpoint = findLatestTraceContextCheckpoint(event); - if (latestCheckpoint) { - logDebug( - `Using trace context from checkpoint ${latestCheckpoint.name}`, - ); - const traceIdStr = latestCheckpoint.headers["x-datadog-trace-id"]; - const parentIdStr = latestCheckpoint.headers["x-datadog-parent-id"]; - const samplingPriorityStr = latestCheckpoint.headers["x-datadog-sampling-priority"] || "1"; - const tagsStr = latestCheckpoint.headers["x-datadog-tags"] || ""; - let ptidFromTags = parsePtid(tagsStr); - let effectiveTraceId = traceIdStr; - let effectiveParentId = parentIdStr; - - if ((!effectiveTraceId || !effectiveParentId) && latestCheckpoint.headers.traceparent) { - const parsedTraceparent = parseTraceparentHex(latestCheckpoint.headers.traceparent); - if (parsedTraceparent) { - effectiveTraceId = effectiveTraceId || parsedTraceparent.lower64TraceIdDec; - effectiveParentId = effectiveParentId || parsedTraceparent.parentIdDec; - ptidFromTags = ptidFromTags || parsedTraceparent.upper64TraceIdHex; - } - } - - const normalizedTraceId = normalizeTraceIdToDecimal(effectiveTraceId); - const normalizedParentId = normalizeParentIdToDecimal(effectiveParentId); - if (!ptidFromTags && normalizedTraceId.ptidFromTraceId) { - ptidFromTags = normalizedTraceId.ptidFromTraceId; - } - - if (normalizedTraceId.traceId && normalizedParentId) { - try { - const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); - const id = require("dd-trace/packages/dd-trace/src/id"); - - const ddSpanContext = new _DatadogSpanContext({ - traceId: id(normalizedTraceId.traceId, 10), - spanId: id(normalizedParentId, 10), - sampling: { priority: samplingPriorityStr }, - }); - - if (ptidFromTags) { - ddSpanContext._trace.tags["_dd.p.tid"] = ptidFromTags; - } - return new SpanContextWrapper(ddSpanContext, TraceSource.Event); - } catch (e) { - logDebug(`Failed to construct SpanContext from checkpoint: ${e}`); - const fallback = SpanContextWrapper.fromTraceContext({ - traceId: normalizedTraceId.traceId, - parentId: normalizedParentId, - sampleMode: parseInt(samplingPriorityStr, 10), - source: TraceSource.Event, - }); - if (fallback) { - return fallback; - } - } - } + const checkpointHeaders = findLatestCheckpointHeaders(event); + if (checkpointHeaders) { + logDebug("Extracting trace context from durable checkpoint"); + return this.tracerWrapper.extract(checkpointHeaders); } - // --- Step 1: Try to use real upstream trace context --- - const upstream = extractUpstreamTraceContext(event); - - let traceId: string; - let ptid: string; - const rootSpanId = generateRandomPositiveId(); - let samplingPriority: string; - - if (upstream) { - const normalizedUpstreamTrace = normalizeTraceIdToDecimal(upstream.traceId); - const normalizedTraceId = normalizedUpstreamTrace.traceId; - - if (normalizedTraceId) { - traceId = normalizedTraceId; - ptid = upstream.ptid || normalizedUpstreamTrace.ptidFromTraceId || ""; - samplingPriority = upstream.samplingPriority; - logDebug(`Using upstream trace_id=${traceId}, _dd.p.tid=${ptid}`); - } else { - const randomTrace = generateRandomTraceId128(); - traceId = randomTrace.traceId; - ptid = randomTrace.ptid; - samplingPriority = SampleMode.AUTO_KEEP.toString(); - logDebug(`Upstream trace_id invalid, generated new trace_id=${traceId}, _dd.p.tid=${ptid}`); - } - - } else { - // --- Step 2: No checkpoint and no upstream context --- - // Start a new trace and create a random durable root span id that - // checkpoints will carry across subsequent invocations. - const randomTrace = generateRandomTraceId128(); - traceId = randomTrace.traceId; - ptid = randomTrace.ptid; - samplingPriority = SampleMode.AUTO_KEEP.toString(); - - logDebug(`No upstream context, generated trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); + const upstreamHeaders = findUpstreamHeaders(event); + if (upstreamHeaders) { + logDebug("Extracting trace context from upstream durable input payload"); + return this.tracerWrapper.extract(upstreamHeaders); } - logDebug(`Generated initial durable root context: trace_id=${traceId}, root_span_id=${rootSpanId}, _dd.p.tid=${ptid}`); - - // Construct span context with _dd.p.tid for 128-bit W3C trace ID support - // Similar to Step Functions' approach in step-function-service.ts - try { - const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); - const id = require("dd-trace/packages/dd-trace/src/id"); - - const ddSpanContext = new _DatadogSpanContext({ - traceId: id(traceId, 10), - spanId: id(rootSpanId, 10), - sampling: { priority: samplingPriority }, - }); - - // Set _dd.p.tid for upper 64 bits of 128-bit trace ID - if (ptid) { - ddSpanContext._trace.tags["_dd.p.tid"] = ptid; - } - - return new SpanContextWrapper(ddSpanContext, TraceSource.Event); - } catch (error) { - if (error instanceof Error) { - logDebug("Couldn't generate SpanContext with tracer, falling back.", error); - } - } - - // Fallback without _dd.p.tid if dd-trace is not available - return SpanContextWrapper.fromTraceContext({ - traceId, - parentId: rootSpanId, - sampleMode: parseInt(samplingPriority, 10), - source: TraceSource.Event, - }); + logDebug("No durable trace context found; deferring to default extraction"); + return null; } } @@ -680,14 +417,15 @@ export function createDurableExecutionRootSpan( logDebug(`Failed to set durable root span_id: ${e}`); } - // Fix parent_id: the active context has span_id=root_span_id (set by - // DurableExecutionEventTraceExtractor.extract), so tracer.startSpan() - // inherits that as parent_id, causing self-parenting. The root span's - // parent should be the upstream caller (if extracted) or 0 (true root). + // Fix parent_id: when an extracted span context exists, tracer.startSpan() + // inherits its span_id as parent_id and we just overwrote our own span_id + // to match — that would self-parent. The root span's parent should be the + // upstream caller (if any) or 0 (true root). try { - const upstream = extractUpstreamTraceContext(event as DurableExecutionEvent); - if (upstream) { - span.context()._parentId = id(upstream.parentId, 10); + const upstreamHeaders = findUpstreamHeaders(event as DurableExecutionEvent); + const upstreamParentId = upstreamHeaders?.["x-datadog-parent-id"]; + if (upstreamParentId) { + span.context()._parentId = id(String(upstreamParentId), 10); } else { span.context()._parentId = id("0", 10); } diff --git a/src/trace/listener.ts b/src/trace/listener.ts index d3e8d8136..07ef4faed 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -318,7 +318,6 @@ export class TraceListener { } // Finish the durable execution root span after all other spans. - // Re-emitted every invocation; the last one wins in the backend. if (this.durableRootSpan) { this.durableRootSpan.finish(); this.durableRootSpan = undefined; From 0d5ab4ade5d259e1dce8b7d1b56c0a66b8f9c91e Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Thu, 30 Apr 2026 23:32:27 -0400 Subject: [PATCH 6/7] clean up AI complicated stuff... --- .../context/extractors/durable-execution.ts | 64 +++---------------- src/trace/listener.ts | 9 ++- 2 files changed, 15 insertions(+), 58 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index 9865eb0df..39ac2b23c 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -12,7 +12,6 @@ * so we just hand the resulting header dict back to `tracer.extract` here. */ -import { randomBytes } from "crypto"; import { logDebug } from "../../../utils"; import { SpanContextWrapper } from "../../span-context-wrapper"; import { TracerWrapper } from "../../tracer-wrapper"; @@ -107,21 +106,6 @@ export function getCheckpointToken(event: unknown): string | undefined { return event.CheckpointToken; } -function generateRandomPositiveId(): string { - const bytes = randomBytes(8); - bytes[0] = bytes[0] & 0x7f; // keep it positive int64 - const value = bufferToBigInt(bytes); - return value === 0n ? "1" : value.toString(10); -} - -function bufferToBigInt(buf: Buffer): bigint { - let result = 0n; - for (let i = 0; i < buf.length; i++) { - result = (result << 8n) | BigInt(buf[i]); - } - return result; -} - // Terminal operation statuses that indicate an operation has completed const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); @@ -362,25 +346,21 @@ export function createDurableExecutionRootSpan( return null; } - const rootSpanId = extractedRootContext?.toSpanId() || generateRandomPositiveId(); - - // Determine consistent start_time from the first operation's StartTimestamp - // StartTimestamp is unix milliseconds from the durable execution SDK + // Use the first operation's StartTimestamp (unix milliseconds) so the root + // span's start time matches the actual start of the durable execution. let startTime: number | undefined; - const replayOperations = event.InitialExecutionState?.Operations; - if (replayOperations && replayOperations.length > 0) { - const firstStartTs = replayOperations[0].StartTimestamp; + if (operations && operations.length > 0) { + const firstStartTs = operations[0].StartTimestamp; if (firstStartTs != null) { const parsed = Number(firstStartTs); if (!isNaN(parsed)) { - startTime = parsed; // already in millis, dd-trace startSpan expects millis + startTime = parsed; } } } try { const tracer = require("dd-trace"); - const id = require("dd-trace/packages/dd-trace/src/id"); const serviceName = process.env.DD_DURABLE_EXECUTION_SERVICE || "aws.durable-execution"; const resourceName = executionArn.includes(":") ? executionArn.split(":").pop() : executionArn; @@ -392,7 +372,7 @@ export function createDurableExecutionRootSpan( "resource.name": resourceName, "durable.execution_arn": executionArn, "durable.is_root_span": true, - "durable.invocation_count": replayOperations?.length ?? 0, + "durable.invocation_count": operations?.length ?? 0, }, }; @@ -400,40 +380,14 @@ export function createDurableExecutionRootSpan( spanOptions.startTime = startTime; } if (extractedRootContext?.spanContext) { - // Ensure the durable root span stays in the same trace as the extracted - // durable invocation context even when there is no active scope. + // Stay in the same trace as the upstream caller even when there is no + // active scope yet. aws.lambda will be parented to this span downstream. spanOptions.childOf = extractedRootContext.spanContext; } const span = tracer.startSpan("aws.durable-execution", spanOptions); - // Use the extracted durable root span_id when available to keep the - // durable root identity stable with propagated checkpoint context. - try { - if (rootSpanId) { - span.context()._spanId = id(rootSpanId, 10); - } - } catch (e) { - logDebug(`Failed to set durable root span_id: ${e}`); - } - - // Fix parent_id: when an extracted span context exists, tracer.startSpan() - // inherits its span_id as parent_id and we just overwrote our own span_id - // to match — that would self-parent. The root span's parent should be the - // upstream caller (if any) or 0 (true root). - try { - const upstreamHeaders = findUpstreamHeaders(event as DurableExecutionEvent); - const upstreamParentId = upstreamHeaders?.["x-datadog-parent-id"]; - if (upstreamParentId) { - span.context()._parentId = id(String(upstreamParentId), 10); - } else { - span.context()._parentId = id("0", 10); - } - } catch (e) { - logDebug(`Failed to set root span parent_id: ${e}`); - } - - logDebug(`Created root execution span: span_id=${rootSpanId ?? "auto"}, start_time=${startTime}`); + logDebug(`Created root execution span: start_time=${startTime}`); return { span, diff --git a/src/trace/listener.ts b/src/trace/listener.ts index 07ef4faed..2c526532e 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -148,19 +148,22 @@ export class TraceListener { // Create the durable execution root span before everything else so later // spans can parent correctly. Root creation is gated in // createDurableExecutionRootSpan() and only happens for likely first - // invocations; replay invocations return null. + // invocations; replay invocations return null. The root span inherits + // trace_id and sampling from spanContextWrapper, so anything parented to + // it (aws.lambda below) joins the same trace automatically. this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; + const durableRootSpanContext = this.durableRootSpan?.span?.context(); if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( event, context, - parentSpanContext, + durableRootSpanContext || parentSpanContext, this.config.encodeAuthorizerContext, ); } - this.lambdaSpanParentContext = this.inferredSpan?.span || parentSpanContext; + this.lambdaSpanParentContext = this.inferredSpan?.span || durableRootSpanContext || parentSpanContext; this.context = context; const eventSource = parseEventSource(event); this.triggerTags = extractTriggerTags(event, context, eventSource); From c625b2412be414edde2b22e386c99abb8577f935 Mon Sep 17 00:00:00 2001 From: Joey Zhao <5253430+joeyzhao2018@users.noreply.github.com> Date: Fri, 1 May 2026 14:00:50 -0400 Subject: [PATCH 7/7] rearrange and enhancements --- .../extractors/durable-execution.spec.ts | 12 +++++----- .../context/extractors/durable-execution.ts | 23 +++++++------------ src/trace/listener.ts | 21 ++--------------- 3 files changed, 16 insertions(+), 40 deletions(-) diff --git a/src/trace/context/extractors/durable-execution.spec.ts b/src/trace/context/extractors/durable-execution.spec.ts index b2353795b..fd8e46fc2 100644 --- a/src/trace/context/extractors/durable-execution.spec.ts +++ b/src/trace/context/extractors/durable-execution.spec.ts @@ -116,12 +116,12 @@ describe("DurableExecutionEventTraceExtractor", () => { CheckpointToken: "t-replay", InitialExecutionState: { Operations: [ - { - Id: "op-1", - Name: "_dd_trace_context_0", - Status: "SUCCEEDED", - StepDetails: { - Result: JSON.stringify({ + { + Id: "op-1", + Name: "_datadog_0", + Status: "SUCCEEDED", + StepDetails: { + Result: JSON.stringify({ "x-datadog-trace-id": "149750110124521191", "x-datadog-parent-id": "538591322263933970", "x-datadog-sampling-priority": "1", diff --git a/src/trace/context/extractors/durable-execution.ts b/src/trace/context/extractors/durable-execution.ts index 39ac2b23c..5fb6c2a55 100644 --- a/src/trace/context/extractors/durable-execution.ts +++ b/src/trace/context/extractors/durable-execution.ts @@ -110,19 +110,12 @@ export function getCheckpointToken(event: unknown): string | undefined { const TERMINAL_STATUSES = new Set(["SUCCEEDED", "FAILED", "CANCELLED", "STOPPED", "TIMED_OUT"]); const TRACE_CHECKPOINT_NAME_PREFIX = "_datadog_"; -const LEGACY_TRACE_CHECKPOINT_NAME_PREFIX = "_dd_trace_context_"; -const TRACE_CHECKPOINT_NAME_PREFIXES = [ - TRACE_CHECKPOINT_NAME_PREFIX, - LEGACY_TRACE_CHECKPOINT_NAME_PREFIX, -]; function parseTraceCheckpointNumber(name: unknown): number | null { if (typeof name !== "string") return null; - const prefix = TRACE_CHECKPOINT_NAME_PREFIXES.find((candidate) => name.startsWith(candidate)); - if (!prefix) return null; - - const suffix = name.slice(prefix.length); + if (!name.startsWith(TRACE_CHECKPOINT_NAME_PREFIX)) return null; + const suffix = name.slice(TRACE_CHECKPOINT_NAME_PREFIX.length); const n = Number.parseInt(suffix, 10); if (Number.isNaN(n) || String(n) !== suffix) return null; return n; @@ -141,7 +134,6 @@ function isTraceCheckpointName(name: unknown): boolean { * dd-trace-js plugin via `tracer.inject(span, 'http_headers', headers)` so the * payload is a standard HTTP-style header dict. * - * Also accepts legacy `_dd_trace_context_{N}` names for compatibility. */ function findLatestCheckpointHeaders(event: DurableExecutionEvent): Record | null { const operations = event.InitialExecutionState?.Operations; @@ -319,12 +311,14 @@ export function getCompletedOperationCount(event: unknown): number { * - no operation has terminal status * - operation count is <= 1 * + * The created span is parented to the current aws.lambda span context. + * * Returns an object with { span, finish() } or null if not a durable execution. * Caller must call finish() when the invocation ends. */ export function createDurableExecutionRootSpan( event: unknown, - extractedRootContext?: SpanContextWrapper | null, + parentSpanContext?: unknown, ): { span: any; finish: () => void } | null { if (!isDurableExecutionEvent(event)) { return null; @@ -379,10 +373,9 @@ export function createDurableExecutionRootSpan( if (startTime !== undefined) { spanOptions.startTime = startTime; } - if (extractedRootContext?.spanContext) { - // Stay in the same trace as the upstream caller even when there is no - // active scope yet. aws.lambda will be parented to this span downstream. - spanOptions.childOf = extractedRootContext.spanContext; + if (parentSpanContext) { + // Root span is modeled as a child of aws.lambda. + spanOptions.childOf = parentSpanContext; } const span = tracer.startSpan("aws.durable-execution", spanOptions); diff --git a/src/trace/listener.ts b/src/trace/listener.ts index 2c526532e..f068c5352 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -27,7 +27,6 @@ import { } from "./durable-function-context"; import { XrayService } from "./xray-service"; import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http"; -import { createDurableExecutionRootSpan } from "./context/extractors/durable-execution"; import { getSpanPointerAttributes, SpanPointerAttributes } from "../utils/span-pointers"; export type TraceExtractor = (event: any, context: Context) => Promise | TraceContext; @@ -103,7 +102,6 @@ export class TraceListener { private wrappedCurrentSpan?: SpanWrapper; private triggerTags?: { [key: string]: string }; private lambdaSpanParentContext?: SpanContext; - private durableRootSpan?: { span: any; finish: () => void }; private spanPointerAttributesList: SpanPointerAttributes[] | undefined; public get currentTraceHeaders() { @@ -145,25 +143,16 @@ export class TraceListener { traceSource: this.contextService.traceSource, }); } - // Create the durable execution root span before everything else so later - // spans can parent correctly. Root creation is gated in - // createDurableExecutionRootSpan() and only happens for likely first - // invocations; replay invocations return null. The root span inherits - // trace_id and sampling from spanContextWrapper, so anything parented to - // it (aws.lambda below) joins the same trace automatically. - this.durableRootSpan = createDurableExecutionRootSpan(event, spanContextWrapper) ?? undefined; - const durableRootSpanContext = this.durableRootSpan?.span?.context(); - if (this.config.createInferredSpan) { this.inferredSpan = this.inferrer.createInferredSpan( event, context, - durableRootSpanContext || parentSpanContext, + parentSpanContext, this.config.encodeAuthorizerContext, ); } - this.lambdaSpanParentContext = this.inferredSpan?.span || durableRootSpanContext || parentSpanContext; + this.lambdaSpanParentContext = this.inferredSpan?.span || parentSpanContext; this.context = context; const eventSource = parseEventSource(event); this.triggerTags = extractTriggerTags(event, context, eventSource); @@ -320,12 +309,6 @@ export class TraceListener { this.injectAuthorizerSpan(result, event?.requestContext?.requestId, finishTime || Date.now()); } - // Finish the durable execution root span after all other spans. - if (this.durableRootSpan) { - this.durableRootSpan.finish(); - this.durableRootSpan = undefined; - } - // Reset singletons and trace context this.stepFunctionContext = undefined; this.durableFunctionContext = undefined;