diff --git a/agent/src/prompt_builder.py b/agent/src/prompt_builder.py index 350debda..6d1489aa 100644 --- a/agent/src/prompt_builder.py +++ b/agent/src/prompt_builder.py @@ -159,12 +159,15 @@ def _channel_prompt_addendum(config: TaskConfig) -> str: "transition the issue state to `In Review` (fall back to `In Progress` " "if that state doesn't exist). If neither exists, skip the state " "transition — the PR comment alone is enough. Do not invent state " - "names or loop on `list_issue_statuses`.\n" - "3. **On completion or failure** — call `mcp__linear-server__save_comment` " - "with the final status (succeeded / failed + short reason).\n\n" - "Keep comments concise. Do not mirror the full agent transcript back to " - "Linear. Even small tasks must post all three updates — users rely on " - "them to track progress." + "names or loop on `list_issue_statuses`.\n\n" + "**Do NOT post a final 'task completed' or 'task failed' comment.** " + "The platform fan-out plane (issue #239) posts a structured " + "✅/⚠️/❌ summary on terminal events with cost / turns / duration / " + "PR-link metrics that you don't have visibility into. A redundant " + "agent-side completion comment would just stack two near-identical " + "comments on the issue.\n\n" + "Keep the start + PR-opened comments concise. Do not mirror the full " + "agent transcript back to Linear." ) diff --git a/cdk/src/constructs/fanout-consumer.ts b/cdk/src/constructs/fanout-consumer.ts index 0a2ddc44..dd3dc667 100644 --- a/cdk/src/constructs/fanout-consumer.ts +++ b/cdk/src/constructs/fanout-consumer.ts @@ -76,6 +76,23 @@ export interface FanOutConsumerProps { */ readonly slackSecretArnPattern?: string; + /** + * LinearWorkspaceRegistryTable — the Linear dispatcher reads this to + * resolve per-workspace OAuth tokens at comment-post time. Optional: + * when omitted, the dispatcher logs and skips so a deployment without + * Linear onboarding doesn't accumulate dangling IAM grants. + */ + readonly linearWorkspaceRegistryTable?: dynamodb.ITable; + + /** + * Secrets Manager ARN-prefix pattern for per-workspace Linear OAuth + * bundles. Mirrors ``slackSecretArnPattern`` shape — typically + * ``bgagent-linear-oauth-*``. Required when ``linearWorkspaceRegistryTable`` + * is set; without it the dispatcher would resolve the registry row but + * fail at the SM GetSecretValue call. + */ + readonly linearOauthSecretArnPattern?: string; + /** * Maximum batch size delivered to the Lambda per invocation. * @@ -173,6 +190,30 @@ export class FanOutConsumer extends Construct { })); } + // Linear dispatcher plumbing. Same guarded shape as Slack/GitHub: + // a deployment without Linear onboarding gets no IAM grants and + // the dispatcher logs-and-skips on missing env. The registry table + // tells us per-workspace OAuth-secret ARN; the secret holds the + // access token that ``postIssueComment`` uses to drive + // ``commentCreate`` GraphQL. + if (props.linearWorkspaceRegistryTable) { + props.linearWorkspaceRegistryTable.grantReadData(this.fn); + this.fn.addEnvironment( + 'LINEAR_WORKSPACE_REGISTRY_TABLE_NAME', + props.linearWorkspaceRegistryTable.tableName, + ); + } + if (props.linearOauthSecretArnPattern) { + this.fn.addToRolePolicy(new iam.PolicyStatement({ + // GetSecretValue + PutSecretValue: the resolver may rotate the + // OAuth token (writes the refreshed bundle back to SM) — same + // grants the LinearIntegration's webhook-processor Lambda holds + // for the same reason. + actions: ['secretsmanager:GetSecretValue', 'secretsmanager:PutSecretValue'], + resources: [props.linearOauthSecretArnPattern], + })); + } + this.fn.addEventSource(new DynamoEventSource(props.taskEventsTable, { startingPosition: StartingPosition.LATEST, batchSize: props.batchSize ?? 100, diff --git a/cdk/src/handlers/fanout-task-events.ts b/cdk/src/handlers/fanout-task-events.ts index e2159b55..78bf0d84 100644 --- a/cdk/src/handlers/fanout-task-events.ts +++ b/cdk/src/handlers/fanout-task-events.ts @@ -46,7 +46,9 @@ import type { DynamoDBStreamEvent, } from 'aws-lambda'; import { clearTokenCache, resolveGitHubToken } from './shared/context-hydration'; +import { classifyError } from './shared/error-classifier'; import { renderCommentBody, upsertTaskComment } from './shared/github-comment'; +import { postIssueComment } from './shared/linear-feedback'; import { logger } from './shared/logger'; import { coerceNumericOrNull } from './shared/numeric'; import { loadRepoConfig } from './shared/repo-config'; @@ -105,7 +107,7 @@ const APPROVAL_NOTIFICATION_EVENTS = [ * - Per-user rate limit of 10 approval-related messages per minute * is enforced in the dispatcher, not in this filter. */ -export type NotificationChannel = 'slack' | 'email' | 'github'; +export type NotificationChannel = 'slack' | 'email' | 'github' | 'linear'; export const CHANNEL_DEFAULTS: Record> = { // Slack is the "on-call" channel per §6.2 — all terminal outcomes @@ -155,6 +157,21 @@ export const CHANNEL_DEFAULTS: Record> ...TERMINAL_EVENT_TYPES, 'pr_created', ]), + // Linear posts a single deterministic final-status comment on + // terminal events. The agent's three-comment prompt contract (start / + // PR-opened / completion) covers in-flight progress; this dispatcher + // only fires once the task reaches a terminal state, with cost / + // turns / duration / pr_url metrics the requester wouldn't otherwise + // see. Crucially, this fires even when the agent crashes (e.g. + // error_max_turns, OOM) before reaching its own step-3 completion + // comment — the GH issue #239 motivating example. + // + // Linear's `save_comment` doesn't support edit, so this is post-once + // (no live updates a la GitHub edit-in-place). Approvals / milestones + // are excluded for the same reason — N comments rather than 1. + linear: new Set([ + ...TERMINAL_EVENT_TYPES, + ]), }; /** @@ -753,6 +770,227 @@ async function dispatchToEmail(event: FanOutEvent): Promise { }); } +/** + * Render the Linear final-status comment body. Inputs are already + * coerced to native types by the caller; this function only formats. + * + * The framing flips between three outcomes based on `(eventType, prUrl)`: + * + * 1. ``task_completed`` → ✅ "Task completed" + * 2. any non-completed terminal event WITH PR → ⚠️ "Shipped a PR but stopped early" + * (the motivating ABCA-91 case is max-turns-with-PR, but the same + * framing applies to any terminal failure — budget cap, agent + * crash, etc. — that managed to ship a PR before stopping) + * 3. any non-completed terminal event NO PR → ❌ "Task " + classifier title + * + * The ⚠️ frame appends the classifier title when one is available so the + * requester sees both outcomes (the PR shipped, AND the reason it + * stopped — "Hit max-turns cap" for ABCA-91). + * + * Cost / turns / duration appear as a subtitle line. Missing values + * (e.g. failure before the agent emitted any tokens) render as `—`. + */ +export function renderLinearFinalStatusComment(args: { + eventType: string; + prUrl: string | null; + costUsd: number | null; + turns: number | null; + maxTurns: number | null; + durationS: number | null; + taskId: string; + errorTitle: string | null; +}): string { + const isCompleted = args.eventType === 'task_completed'; + const shippedDespiteFailure = !isCompleted && args.prUrl != null; + + let header: string; + if (isCompleted) { + header = '✅ **Task completed**'; + } else if (shippedDespiteFailure) { + // Append the classifier title (when known) so the requester sees + // *why* the agent stopped, not just that it shipped a PR. For + // ABCA-91 this renders "...stopped early — Hit max-turns cap". + const reason = args.errorTitle ? ` — ${args.errorTitle}` : ''; + header = `⚠️ **Shipped a PR but stopped early${reason}** — review and decide if more work is needed`; + } else { + const reason = args.errorTitle ? `: ${args.errorTitle}` : ''; + header = `❌ **Task ${args.eventType.replace(/^task_/, '')}${reason}**`; + } + + const costStr = args.costUsd != null ? `$${args.costUsd.toFixed(2)}` : '—'; + const turnsStr = args.turns != null + ? `${args.turns}${args.maxTurns != null ? ` / ${args.maxTurns}` : ''}` + : '—'; + const durationStr = args.durationS != null + ? formatDuration(args.durationS) + : '—'; + + const lines: string[] = [ + header, + '', + `cost: ${costStr} • turns: ${turnsStr} • duration: ${durationStr}`, + ]; + // Render the PR URL only on the ⚠️ "shipped a PR but stopped early" + // path — that's the case where the agent's own step-2 "PR opened" + // comment is *not* guaranteed to have fired (the agent may have + // crashed between opening the PR and posting the comment, e.g. + // ABCA-91 hitting max-turns on turn 101). On the ✅ success path the + // agent's step-2 comment reliably carries the PR link, so duplicating + // it here is just noise. + if (args.prUrl && shippedDespiteFailure) { + lines.push('', `PR: ${args.prUrl}`); + } + lines.push('', `_task ${args.taskId}_`); + return lines.join('\n'); +} + +function formatDuration(seconds: number): string { + const total = Math.round(seconds); + if (total < 60) return `${total}s`; + const m = Math.floor(total / 60); + const s = total % 60; + return s === 0 ? `${m}m` : `${m}m ${s}s`; +} + +/** + * Linear dispatcher — posts a deterministic final-status comment when a + * Linear-origin task reaches a terminal event. Mirrors Slack's structural + * shape (channel_source gate, best-effort, single error-isolation point): + * + * 1. Load TaskRecord. Skip if missing (TTL eviction race). + * 2. Gate on ``channel_source === 'linear'`` so non-Linear tasks + * short-circuit after one DDB Get. + * 3. Read ``linear_issue_id`` + ``linear_workspace_id`` from + * ``channel_metadata``. Skip if either is missing — defensive, + * shouldn't happen for properly-admitted Linear tasks. + * 4. Render the comment + post via the existing ``postIssueComment`` + * helper, which itself swallows network/auth errors and returns + * false rather than throwing. + * + * Failure handling: ``postIssueComment`` is best-effort — a Linear API + * outage logs and returns false rather than throwing. We reflect that + * outcome in the dispatcher log but never reject the dispatcher + * promise: a failed Linear comment shouldn't trigger ``routeEvent``'s + * batch-retry path because retrying won't fix Linear's API. + */ +async function dispatchToLinear(event: FanOutEvent): Promise { + const registryTableName = process.env.LINEAR_WORKSPACE_REGISTRY_TABLE_NAME; + if (!registryTableName) { + // WARN with error_id so this is alarmable. The Linear comment is + // the *only* completion signal for the agent-crash case (#239), so a + // misconfigured env var would silently drop every Linear-origin + // task's metrics — exactly the gap this dispatcher was built to + // close. The GitHub dispatcher uses the same WARN+error_id pattern + // for its missing-env path. + logger.warn('[fanout/linear] LINEAR_WORKSPACE_REGISTRY_TABLE_NAME not set — skipping', { + event: 'fanout.linear.missing_env', + error_id: 'FANOUT_LINEAR_MISSING_ENV', + task_id: event.task_id, + }); + return; + } + + const task = await loadTaskForComment(event.task_id); + if (!task) { + logger.warn('[fanout/linear] task not found — skipping comment', { + event: 'fanout.linear.task_missing', + task_id: event.task_id, + }); + return; + } + + // channel_source gate — short-circuit non-Linear tasks. Same shape + // Slack uses to keep the GitHub edit-in-place comment from racing + // against the platform-side Linear comment when channel_source is + // 'github'/'slack'/'api'. + if (task.channel_source !== 'linear') { + return; + } + + const issueId = task.channel_metadata?.linear_issue_id; + const workspaceId = task.channel_metadata?.linear_workspace_id; + if (!issueId || !workspaceId) { + logger.warn('[fanout/linear] task missing linear_issue_id or linear_workspace_id — skipping', { + event: 'fanout.linear.metadata_missing', + task_id: event.task_id, + has_issue_id: Boolean(issueId), + has_workspace_id: Boolean(workspaceId), + }); + return; + } + + // Derive an error title from `error_message` via the shared classifier. + // Same data the API surfaces as `error_classification.title` — + // "Hit max-turns cap", "Insufficient GitHub permissions", etc. + // + // Returns null only when error_message is empty/undefined (the + // task_completed case). For any non-empty error_message that doesn't + // match a known pattern, returns the UNKNOWN_CLASSIFICATION fallback + // ("Unexpected error") — so a generic failure still gets a structured + // title rather than nothing. See error-classifier.ts. + const classification = classifyError(task.error_message); + + const body = renderLinearFinalStatusComment({ + eventType: event.event_type, + prUrl: task.pr_url ?? null, + // DDB returns numeric attributes as strings at the Document-client + // boundary; coerce so toFixed/comparisons work. Same pattern the + // GitHub dispatcher uses. + costUsd: coerceNumericOrNull( + task.cost_usd, + { field: 'cost_usd', task_id: task.task_id, event_id: event.event_id }, + logger, + ), + turns: coerceNumericOrNull( + task.turns_attempted, + { field: 'turns_attempted', task_id: task.task_id, event_id: event.event_id }, + logger, + ), + maxTurns: coerceNumericOrNull( + task.max_turns, + { field: 'max_turns', task_id: task.task_id, event_id: event.event_id }, + logger, + ), + durationS: coerceNumericOrNull( + task.duration_s, + { field: 'duration_s', task_id: task.task_id, event_id: event.event_id }, + logger, + ), + taskId: task.task_id, + errorTitle: classification?.title ?? null, + }); + + const ok = await postIssueComment( + { linearWorkspaceId: workspaceId, registryTableName }, + issueId, + body, + ); + + // Split the success / failure path so post-failure can be alarmed + // distinctly. The underlying linear-feedback.ts path already WARNs + // on the specific failure reason (auth, network, etc.); this + // backstop ensures a steady drip of post-failures shows up in the + // dispatcher's own log channel for cross-channel alarms. + if (ok) { + logger.info('[fanout/linear] comment dispatched', { + event: 'fanout.linear.dispatched', + task_id: task.task_id, + issue_id: issueId, + event_type: event.event_type, + posted: true, + }); + } else { + logger.warn('[fanout/linear] postIssueComment returned false — Linear API path failed', { + event: 'fanout.linear.post_failed', + error_id: 'FANOUT_LINEAR_POST_FAILED', + task_id: task.task_id, + issue_id: issueId, + event_type: event.event_type, + posted: false, + }); + } +} + /** Exposed for testing: the per-channel dispatcher callable by the * handler. Each key's absence from the routing map disables its * dispatcher; the signature is uniform so adding a channel is one @@ -760,6 +998,7 @@ async function dispatchToEmail(event: FanOutEvent): Promise { const DISPATCHERS: Record Promise> = { slack: dispatchToSlack, github: dispatchToGitHubComment, + linear: dispatchToLinear, email: dispatchToEmail, }; @@ -814,8 +1053,9 @@ export async function routeEvent( attempted.push(ch); tasks.push(DISPATCHERS[ch](ev)); } - // Parallelism is bounded by the dispatcher list (at most 3 channels), - // not by program input, so the unbounded-parallelism lint does not apply. + // Parallelism is bounded by the dispatcher list (4 channels: + // slack/github/linear/email), not by program input, so the + // unbounded-parallelism lint does not apply. const results = await Promise.allSettled(tasks); diff --git a/cdk/src/handlers/shared/types.ts b/cdk/src/handlers/shared/types.ts index 50db2a6e..2aeab0a1 100644 --- a/cdk/src/handlers/shared/types.ts +++ b/cdk/src/handlers/shared/types.ts @@ -219,6 +219,7 @@ export interface TaskNotificationsConfig { readonly slack?: ChannelConfig; readonly email?: ChannelConfig; readonly github?: ChannelConfig; + readonly linear?: ChannelConfig; } /** diff --git a/cdk/src/stacks/agent.ts b/cdk/src/stacks/agent.ts index e2718cdb..750c178f 100644 --- a/cdk/src/stacks/agent.ts +++ b/cdk/src/stacks/agent.ts @@ -651,28 +651,8 @@ export class AgentStack extends Stack { attachmentsBucket: attachmentsBucket.bucket, }); - // --- Fan-out plane consumer --- - // Consumes TaskEventsTable DynamoDB Streams and dispatches events to - // Slack / GitHub / email per per-channel default filters. GitHub - // dispatcher edits a single issue comment in place; Slack - // dispatcher (issue #64) reads per-workspace bot tokens from - // ``bgagent/slack/*``. Email remains a log-only stub until Phase 2. - new FanOutConsumer(this, 'FanOutConsumer', { - taskEventsTable: taskEventsTable.table, - taskTable: taskTable.table, - repoTable: repoTable.table, - githubTokenSecret, - // Slack bot-token grant is guarded on this prop — pass the - // ``bgagent/slack/*`` prefix so the FanOutConsumer can read - // workspace tokens. Same scope SlackIntegration uses for its - // own writers (PR #79 review #2). - slackSecretArnPattern: Stack.of(this).formatArn({ - service: 'secretsmanager', - resource: 'secret', - resourceName: 'bgagent/slack/*', - arnFormat: ArnFormat.COLON_RESOURCE_NAME, - }), - }); + // FanOutConsumer is constructed below LinearIntegration so the + // Linear dispatcher can receive ``linearIntegration.workspaceRegistryTable``. // --- Cedar HITL approval metrics publisher (Chunk 8, §11.3 / IMPL-28) --- // Consumer #2 of the TaskEventsTable stream (FanOutConsumer is #1). @@ -838,6 +818,42 @@ export class AgentStack extends Stack { description: 'Name of the DynamoDB Linear workspace registry — `bgagent linear setup` writes a row per OAuth-installed workspace', }); + // --- Fan-out plane consumer --- + // Consumes TaskEventsTable DynamoDB Streams and dispatches events to + // Slack / GitHub / Linear / email per per-channel default filters. + // GitHub dispatcher edits a single issue comment in place; Slack + // dispatcher (issue #64) reads per-workspace bot tokens from + // ``bgagent/slack/*``; Linear dispatcher (issue #239) posts a single + // deterministic final-status comment with cost/turns/duration. + // Email remains a log-only stub until SES wires. + new FanOutConsumer(this, 'FanOutConsumer', { + taskEventsTable: taskEventsTable.table, + taskTable: taskTable.table, + repoTable: repoTable.table, + githubTokenSecret, + // Slack bot-token grant is guarded on this prop — pass the + // ``bgagent/slack/*`` prefix so the FanOutConsumer can read + // workspace tokens. Same scope SlackIntegration uses for its + // own writers (PR #79 review #2). + slackSecretArnPattern: Stack.of(this).formatArn({ + service: 'secretsmanager', + resource: 'secret', + resourceName: 'bgagent/slack/*', + arnFormat: ArnFormat.COLON_RESOURCE_NAME, + }), + // Linear dispatcher reads workspace registry rows + per-workspace + // OAuth-secret JSON. Same scope `bgagent-linear-oauth-*` as the + // orchestrator and webhook processor — Lambdas in this stack share + // the rotated-token write path; the agent runtime gets read-only. + linearWorkspaceRegistryTable: linearIntegration.workspaceRegistryTable, + linearOauthSecretArnPattern: Stack.of(this).formatArn({ + service: 'secretsmanager', + resource: 'secret', + resourceName: 'bgagent-linear-oauth-*', + arnFormat: ArnFormat.COLON_RESOURCE_NAME, + }), + }); + // --- GitHub deployment-status → screenshot pipeline --- // Listens for GitHub deployment_status events from any provider // (Vercel, Amplify Hosting, Netlify, GitHub Actions custom CD), diff --git a/cdk/test/handlers/fanout-task-events.test.ts b/cdk/test/handlers/fanout-task-events.test.ts index 7f6f438e..deecf066 100644 --- a/cdk/test/handlers/fanout-task-events.test.ts +++ b/cdk/test/handlers/fanout-task-events.test.ts @@ -93,12 +93,28 @@ jest.mock('../../src/handlers/slack-notify', () => { }; }); +// Linear dispatcher posts via the existing `postIssueComment` helper +// in `linear-feedback.ts` (#239). Mock it here so dispatcher tests +// observe the call shape without exercising the real OAuth-resolver +// + GraphQL path. Default ``true`` so a test that forgets to script +// the mock still drives the happy path. +const mockPostIssueComment: jest.Mock = jest.fn().mockResolvedValue(true); +jest.mock('../../src/handlers/shared/linear-feedback', () => ({ + postIssueComment: ( + ctx: { linearWorkspaceId: string; registryTableName: string }, + issueId: string, + body: string, + ) => mockPostIssueComment(ctx, issueId, body), +})); + process.env.TASK_TABLE_NAME = 'Tasks'; process.env.GITHUB_TOKEN_SECRET_ARN = 'arn:aws:secretsmanager:us-east-1:0:secret:platform'; +process.env.LINEAR_WORKSPACE_REGISTRY_TABLE_NAME = 'LinearWorkspaceRegistry'; import { CHANNEL_DEFAULTS, parseStreamRecord, + renderLinearFinalStatusComment, resolveChannelFilter, routeEvent, shouldFanOut, @@ -358,23 +374,32 @@ describe('fanout-task-events: routeEvent (per-channel dispatch)', () => { timestamp: '2026-04-22T04:00:00Z', }); - test('task_completed routes to all three channels', async () => { + test('task_completed routes to all four channels (slack, github, linear, email)', async () => { + // Linear joined the dispatcher list in #239: terminal-events fan + // out to a deterministic platform-side comment for Linear-origin + // tasks. The dispatcher itself short-circuits on + // ``channel_source !== 'linear'`` so non-Linear tasks see no + // observable effect, but the routing layer still counts it as + // dispatched (the same way Slack's channel_source gate doesn't + // remove it from the dispatched list for non-Slack tasks). const outcome = await routeEvent(mk('task_completed')); - expect([...outcome.dispatched].sort()).toEqual(['email', 'github', 'slack']); + expect([...outcome.dispatched].sort()).toEqual(['email', 'github', 'linear', 'slack']); expect(outcome.infraRejections).toEqual([]); }); - test('task_cancelled skips Email per §6.2 (only Slack + GitHub)', async () => { + test('task_cancelled skips Email per §6.2 (Slack + GitHub + Linear)', async () => { // Regression guard against accidentally folding cancelled+stranded // into Email via a shared TERMINAL spread — design says Email is // minimal (task_completed, task_failed, approval_required only). + // Linear joined the terminal-event default in #239 alongside the + // existing Slack + GitHub. const outcome = await routeEvent(mk('task_cancelled')); - expect([...outcome.dispatched].sort()).toEqual(['github', 'slack']); + expect([...outcome.dispatched].sort()).toEqual(['github', 'linear', 'slack']); }); test('task_stranded skips Email per §6.2', async () => { const outcome = await routeEvent(mk('task_stranded')); - expect([...outcome.dispatched].sort()).toEqual(['github', 'slack']); + expect([...outcome.dispatched].sort()).toEqual(['github', 'linear', 'slack']); }); test('agent_error routes only to Slack', async () => { @@ -399,7 +424,7 @@ describe('fanout-task-events: routeEvent (per-channel dispatch)', () => { test('per-task override silences one channel without affecting others', async () => { const overrides: TaskNotificationsConfig = { slack: { enabled: false } }; const outcome = await routeEvent(mk('task_completed'), overrides); - expect([...outcome.dispatched].sort()).toEqual(['email', 'github']); + expect([...outcome.dispatched].sort()).toEqual(['email', 'github', 'linear']); expect(outcome.dispatched).not.toContain('slack'); }); }); @@ -447,8 +472,12 @@ describe('fanout-task-events: channel isolation', () => { expect(mockDispatchSlackEvent).toHaveBeenCalledTimes(1); // (2) Telemetry truthfulness: Slack must NOT be in ``dispatched`` - // because its dispatcher rejected. Email + GitHub are. - expect([...outcome.dispatched].sort()).toEqual(['email', 'github']); + // because its dispatcher rejected. Email + GitHub + Linear are. + // Linear joined the terminal-event dispatcher list in #239; for + // non-Linear tasks (this test omits channel_source — dispatcher + // short-circuits early but still resolves cleanly so it counts + // as dispatched). + expect([...outcome.dispatched].sort()).toEqual(['email', 'github', 'linear']); expect(outcome.dispatched).not.toContain('slack'); // (3) Slack landed in ``infraRejections`` so the handler will @@ -591,13 +620,29 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { beforeEach(() => { // Per-test-suite reset. After ``mockReset`` we re-establish a // permissive default so a test that forgets to script GetCommand - // doesn't crash with a TypeError. - mockDdbSend.mockReset().mockResolvedValue({ Item: undefined }); + // doesn't crash with a TypeError. Uses an implementation that + // dispatches by command type so the GitHub + Linear dispatchers + // can both call ``send`` (Get from each dispatcher, Update from + // GitHub) without the test having to script every call sequence. + mockDdbSend.mockReset().mockImplementation((cmd: { _type?: string }) => { + if (cmd?._type === 'Update') return Promise.resolve({}); + // Default Get → no Item (test overrides with ``mockResolvedValueOnce`` + // BEFORE invoking the handler). Pre-existing tests pass ``Item: TASK_RECORD_BASE`` + // via ``mockResolvedValueOnce`` chains; that takes precedence over this + // impl thanks to mockResolvedValueOnce's stacking semantics. + return Promise.resolve({ Item: undefined }); + }); mockUpsertTaskComment.mockReset(); mockRenderCommentBody.mockReset().mockReturnValue('rendered body'); mockLoadRepoConfig.mockReset().mockResolvedValue(null); mockResolveGitHubToken.mockReset().mockResolvedValue('ghp_fake'); mockClearTokenCache.mockReset(); + // Linear dispatcher's postIssueComment runs in parallel with the + // GitHub dispatcher under the new fan-out wiring (#239). Stub it as + // a no-op for these GitHub-focused tests so a non-Linear-channel + // task short-circuits inside the dispatcher (channel_source === + // 'api' / 'github'). Pre-existing tests don't assert on it. + mockPostIssueComment.mockReset().mockResolvedValue(true); }); test('first terminal event POSTs a new comment and persists the comment_id to TaskTable', async () => { @@ -626,8 +671,15 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { // that GitHub rejects with HTTP 400. The field must no longer be // passed on. expect(upsertArg).not.toHaveProperty('existingEtag'); - // UpdateCommand fired with the new id (no etag persistence). - const update = mockDdbSend.mock.calls[1][0] as { + // UpdateCommand fired with the new id (no etag persistence). Find it + // by command type rather than index — Linear's dispatcher ALSO + // calls GetCommand against the same shared mock (#239), so the + // call sequence is no longer a deterministic [Get, Update]. + const updateCall = mockDdbSend.mock.calls.find( + ([cmd]) => (cmd as { _type?: string })._type === 'Update', + ); + expect(updateCall).toBeDefined(); + const update = updateCall![0] as { input: { ExpressionAttributeValues: Record; UpdateExpression: string; @@ -643,9 +695,15 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { }); test('subsequent event passes the persisted comment_id so the helper PATCHes', async () => { - mockDdbSend - .mockResolvedValueOnce({ Item: { ...TASK_RECORD_BASE, github_comment_id: 555 } }); - // No UpdateCommand on a PATCH — nothing new to persist. + // Both dispatchers (GitHub + Linear) call GetCommand against the + // shared mock; provide the task record for both calls. PATCH path: + // no UpdateCommand on a PATCH because there's no new state. + mockDdbSend.mockReset().mockImplementation((cmd: { _type?: string }) => { + if (cmd?._type === 'Get') { + return Promise.resolve({ Item: { ...TASK_RECORD_BASE, github_comment_id: 555 } }); + } + return Promise.resolve({}); + }); mockUpsertTaskComment.mockResolvedValueOnce({ commentId: 555, created: false, @@ -656,22 +714,37 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { const upsertArg = mockUpsertTaskComment.mock.calls[0][0]; expect(upsertArg.existingCommentId).toBe(555); - // No second DDB call (no UpdateCommand) — the PATCH path skips - // ``saveCommentState`` since there's no new state. - expect(mockDdbSend).toHaveBeenCalledTimes(1); + // No UpdateCommand fired — the PATCH path skips ``saveCommentState`` + // since there's no new state. Linear's dispatcher only does a Get + // (then short-circuits on channel_source !== 'linear' for this 'api' + // task), so the only sends are: GitHub-Get, Linear-Get. No Update. + const updateCalls = mockDdbSend.mock.calls.filter( + ([cmd]) => (cmd as { _type?: string })._type === 'Update', + ); + expect(updateCalls).toHaveLength(0); }); test('task with no issue_number and no pr_number skips the GitHub dispatcher', async () => { - mockDdbSend.mockResolvedValueOnce({ - Item: { ...TASK_RECORD_BASE, pr_number: undefined, issue_number: undefined }, + mockDdbSend.mockReset().mockImplementation((cmd: { _type?: string }) => { + if (cmd?._type === 'Get') { + return Promise.resolve({ + Item: { ...TASK_RECORD_BASE, pr_number: undefined, issue_number: undefined }, + }); + } + return Promise.resolve({}); }); const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-gh')] }; await handler(event); expect(mockUpsertTaskComment).not.toHaveBeenCalled(); - // No UpdateItem either — nothing to persist. - expect(mockDdbSend).toHaveBeenCalledTimes(1); + // No UpdateCommand fired — nothing to persist. Both dispatchers + // ran their Get (Linear short-circuits on channel_source) but no + // writes happened. + const updateCalls = mockDdbSend.mock.calls.filter( + ([cmd]) => (cmd as { _type?: string })._type === 'Update', + ); + expect(updateCalls).toHaveLength(0); }); test('missing task record (TTL race) → skip without throwing', async () => { @@ -741,11 +814,14 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { // the UpdateItem must require ``github_comment_id = :prev`` so // we cannot silently overwrite a sibling fanout invocation that // already re-posted (or that beat us to writing a fresh id). - mockDdbSend - .mockResolvedValueOnce({ - Item: { ...TASK_RECORD_BASE, github_comment_id: 555 }, - }) - .mockResolvedValueOnce({}); // UpdateCommand for the re-POST + mockDdbSend.mockReset().mockImplementation((cmd: { _type?: string }) => { + if (cmd?._type === 'Get') { + return Promise.resolve({ + Item: { ...TASK_RECORD_BASE, github_comment_id: 555 }, + }); + } + return Promise.resolve({}); + }); mockUpsertTaskComment.mockResolvedValueOnce({ commentId: 999, // new id from the fallback POST created: true, @@ -754,7 +830,13 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-gh')] }; await handler(event); - const update = mockDdbSend.mock.calls[1][0] as { + // Find the UpdateCommand by command type (Linear dispatcher's + // GetCommand sits between GitHub's Get and Update post-#239). + const updateCall = mockDdbSend.mock.calls.find( + ([cmd]) => (cmd as { _type?: string })._type === 'Update', + ); + expect(updateCall).toBeDefined(); + const update = updateCall![0] as { input: { ExpressionAttributeValues: Record; UpdateExpression: string; @@ -907,11 +989,19 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { // Benign: the task was TTL-evicted between the Get and the // Update. Subsequent events for this task will also skip, so // no duplicate-comment risk. Must NOT alarm operators. - mockDdbSend - .mockResolvedValueOnce({ Item: TASK_RECORD_BASE }) - .mockRejectedValueOnce( - Object.assign(new Error('condition failed'), { name: 'ConditionalCheckFailedException' }), - ); + // + // Linear dispatcher also calls Get against the same mock (#239); + // dispatch on command type so its Get returns the same Item but + // the GitHub UpdateCommand specifically rejects. + mockDdbSend.mockReset().mockImplementation((cmd: { _type?: string }) => { + if (cmd?._type === 'Get') return Promise.resolve({ Item: TASK_RECORD_BASE }); + if (cmd?._type === 'Update') { + return Promise.reject( + Object.assign(new Error('condition failed'), { name: 'ConditionalCheckFailedException' }), + ); + } + return Promise.resolve({}); + }); mockUpsertTaskComment.mockResolvedValueOnce({ commentId: 1, created: true }); const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-gh')] }; @@ -931,11 +1021,18 @@ describe('fanout-task-events: GitHub dispatcher (Chunk J)', () => { 'error', ).mockImplementation(errorSpy); - mockDdbSend - .mockResolvedValueOnce({ Item: TASK_RECORD_BASE }) - .mockRejectedValueOnce( - Object.assign(new Error('throttled'), { name: 'ProvisionedThroughputExceededException' }), - ); + // Linear dispatcher also calls Get; dispatch by command type so + // it gets the Item (then short-circuits on channel_source !== + // 'linear') while GitHub's Update specifically throttles. + mockDdbSend.mockReset().mockImplementation((cmd: { _type?: string }) => { + if (cmd?._type === 'Get') return Promise.resolve({ Item: TASK_RECORD_BASE }); + if (cmd?._type === 'Update') { + return Promise.reject( + Object.assign(new Error('throttled'), { name: 'ProvisionedThroughputExceededException' }), + ); + } + return Promise.resolve({}); + }); mockUpsertTaskComment.mockResolvedValueOnce({ commentId: 1, created: true }); const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-gh')] }; @@ -1215,6 +1312,373 @@ describe('fanout-task-events: Slack dispatcher (issue #64 migration)', () => { }); }); +// --------------------------------------------------------------------------- +// Linear dispatcher (#239) +// --------------------------------------------------------------------------- + +describe('fanout-task-events: Linear dispatcher (issue #239)', () => { + const TASK_RECORD_LINEAR = { + task_id: 't-lin', + user_id: 'u-1', + status: 'COMPLETED', + repo: 'owner/repo', + branch_name: 'bgagent/t-lin/fix', + channel_source: 'linear', + channel_metadata: { + linear_issue_id: 'issue-uuid-42', + linear_workspace_id: 'org-uuid-acme', + }, + status_created_at: 'COMPLETED#2026-04-30T12:00:00Z', + created_at: '2026-04-30T11:50:00Z', + updated_at: '2026-04-30T12:00:00Z', + cost_usd: 0.55, + turns_attempted: 27, + max_turns: 100, + duration_s: 221, + pr_url: 'https://github.com/owner/repo/pull/13', + }; + + beforeEach(() => { + mockDdbSend.mockReset().mockResolvedValue({ Item: undefined }); + mockPostIssueComment.mockReset().mockResolvedValue(true); + // Slack/GitHub mocks aren't asserted here but leaving them + // un-reset would let prior-test rejections bleed in. + mockDispatchSlackEvent.mockReset().mockResolvedValue(undefined); + // GitHub dispatcher resolves cleanly so it doesn't reject the + // batch — its dispatcher will skip on "no comment target" since + // the Linear test record has no pr_number/issue_number, but the + // upsertTaskComment mock is harmless either way. + mockUpsertTaskComment.mockReset().mockResolvedValue({ commentId: 1, created: false }); + mockRenderCommentBody.mockReset().mockReturnValue('rendered body'); + mockLoadRepoConfig.mockReset().mockResolvedValue(null); + mockResolveGitHubToken.mockReset().mockResolvedValue('ghp_fake'); + }); + + // Helper: configure the shared DDB mock so EVERY GetCommand returns + // the supplied Item. Both GitHub and Linear dispatchers call Get + // against the shared mock; they need the same record back. + const mockGet = (item: unknown) => { + mockDdbSend.mockReset().mockImplementation((cmd: { _type?: string }) => { + if (cmd?._type === 'Get') return Promise.resolve({ Item: item }); + return Promise.resolve({}); + }); + }; + + test('task_completed posts ✅ comment with cost / turns / duration on linked Linear issue', async () => { + mockGet(TASK_RECORD_LINEAR); + + const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-lin')] }; + await handler(event); + + expect(mockPostIssueComment).toHaveBeenCalledTimes(1); + const [ctx, issueId, body] = mockPostIssueComment.mock.calls[0]; + expect(ctx).toEqual({ + linearWorkspaceId: 'org-uuid-acme', + registryTableName: 'LinearWorkspaceRegistry', + }); + expect(issueId).toBe('issue-uuid-42'); + expect(body).toContain('✅'); + expect(body).toContain('Task completed'); + expect(body).toContain('$0.55'); + expect(body).toContain('27 / 100'); + expect(body).toContain('3m 41s'); + // PR URL is intentionally NOT rendered on the ✅ success path — + // the agent's step-2 "PR opened" comment already carries it, so + // duplicating it here just stacks two near-identical links on the + // Linear issue. (Smoke-test feedback after the first dev deploy.) + // The ⚠️ "shipped a PR but stopped early" path DOES render it, + // because the agent may have crashed before its step-2 comment + // fired — see the ABCA-91 case test below. + expect(body).not.toContain('https://github.com/owner/repo/pull/13'); + expect(body).toContain('t-lin'); + }); + + test('task_failed without PR renders ❌ frame', async () => { + mockGet({ + ...TASK_RECORD_LINEAR, + pr_url: undefined, + error_message: 'Generic crash', + }); + + const event: DynamoDBStreamEvent = { Records: [mkEvent('task_failed', 't-lin')] }; + await handler(event); + + expect(mockPostIssueComment).toHaveBeenCalledTimes(1); + const [, , body] = mockPostIssueComment.mock.calls[0]; + expect(body).toContain('❌'); + expect(body).not.toContain('Shipped a PR'); + }); + + test('error_max_turns + pr_url renders ⚠️ "shipped a PR but stopped early" frame (ABCA-91 case)', async () => { + // The motivating real-world case from #239: ABCA-91 hit max_turns + // on turn 101 but successfully opened PR #35 before the cap fired. + // The Linear comment should frame this as ⚠️ shipped-but-stopped, + // not ❌ failed — the work landed and the requester needs to see + // the PR link. + mockGet({ + ...TASK_RECORD_LINEAR, + // Terminal event-type stays 'task_failed' for max-turns; the + // classifier reads the error_message text to derive the title. + error_message: 'Task did not succeed: agent_status="error_max_turns"', + turns_attempted: 101, + cost_usd: 3.44, + duration_s: 1272, + }); + + const event: DynamoDBStreamEvent = { Records: [mkEvent('task_failed', 't-lin')] }; + await handler(event); + + const [, , body] = mockPostIssueComment.mock.calls[0]; + expect(body).toContain('⚠️'); + expect(body).toContain('Shipped a PR but stopped early'); + expect(body).toContain('https://github.com/owner/repo/pull/13'); + expect(body).toContain('$3.44'); + expect(body).toContain('101 / 100'); + expect(body).toContain('21m 12s'); + }); + + test('non-Linear task short-circuits — postIssueComment never called', async () => { + // The dispatcher gates on ``channel_source === 'linear'``. Slack + // and GitHub origin tasks (which still fan out to Linear's + // dispatcher because terminal-events are subscribed for all + // channels) must not cause a Linear API call. + mockGet({ ...TASK_RECORD_LINEAR, channel_source: 'github' }); + + const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-lin')] }; + await handler(event); + + expect(mockPostIssueComment).not.toHaveBeenCalled(); + }); + + test('Linear-origin task missing channel_metadata.linear_issue_id — skip with warning', async () => { + // Defensive: a properly-admitted Linear task should always have + // these fields, but if it doesn't we'd rather log + skip than + // throw or post a comment to the wrong issue. + mockGet({ + ...TASK_RECORD_LINEAR, + channel_metadata: { linear_workspace_id: 'org-uuid-acme' }, // no issue id + }); + + const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-lin')] }; + await handler(event); + + expect(mockPostIssueComment).not.toHaveBeenCalled(); + }); + + test('postIssueComment returning false (Linear API down) does not reject the dispatcher', async () => { + // postIssueComment is best-effort — a Linear outage returns false + // rather than throwing. The dispatcher logs the failure but + // resolves cleanly so the routing layer doesn't flag the record + // for retry (retrying won't fix Linear's API). + mockGet(TASK_RECORD_LINEAR); + mockPostIssueComment.mockReset().mockResolvedValue(false); + + const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-lin')] }; + const result = await handler(event); + + expect(mockPostIssueComment).toHaveBeenCalledTimes(1); + // Critical: resolve, don't reject. No batchItemFailures. + expect(result).toEqual({ batchItemFailures: [] }); + }); + + test('LINEAR_WORKSPACE_REGISTRY_TABLE_NAME unset → dispatcher logs WARN and skips', async () => { + // The deploy-misconfig safety valve: if a stack is built without the + // Linear integration but somehow ends up with the dispatcher in the + // map, the missing env var must short-circuit cleanly. WARN + + // error_id so the operator sees an alarmable signal — the Linear + // comment is the *only* completion signal for the agent-crash case + // (#239), so silent drops are exactly what this dispatcher exists + // to prevent. + const original = process.env.LINEAR_WORKSPACE_REGISTRY_TABLE_NAME; + delete process.env.LINEAR_WORKSPACE_REGISTRY_TABLE_NAME; + const loggerModule = await import('../../src/handlers/shared/logger'); + const warnSpy = jest.spyOn(loggerModule.logger, 'warn').mockImplementation(() => undefined); + try { + mockGet(TASK_RECORD_LINEAR); + + const event: DynamoDBStreamEvent = { Records: [mkEvent('task_completed', 't-lin')] }; + const result = await handler(event); + + expect(mockPostIssueComment).not.toHaveBeenCalled(); + expect(result).toEqual({ batchItemFailures: [] }); + const missingEnvWarn = warnSpy.mock.calls + .map(c => c[1] as Record | undefined) + .find(meta => meta?.event === 'fanout.linear.missing_env'); + expect(missingEnvWarn).toBeDefined(); + expect(missingEnvWarn?.error_id).toBe('FANOUT_LINEAR_MISSING_ENV'); + } finally { + warnSpy.mockRestore(); + if (original !== undefined) process.env.LINEAR_WORKSPACE_REGISTRY_TABLE_NAME = original; + } + }); + + test('error_max_turns WITHOUT pr_url renders ❌ frame, not ⚠️ (the no-PR boundary)', async () => { + // The flip the other direction: without a PR, even a max-turns + // failure is a plain ❌. Pins the (eventType, prUrl) discriminator — + // the requester only sees ⚠️ when the agent shipped something. + mockGet({ + ...TASK_RECORD_LINEAR, + pr_url: undefined, + error_message: 'Task did not succeed: agent_status="error_max_turns"', + }); + + const event: DynamoDBStreamEvent = { Records: [mkEvent('task_failed', 't-lin')] }; + await handler(event); + + const [, , body] = mockPostIssueComment.mock.calls[0]; + expect(body).toContain('❌'); + expect(body).not.toContain('⚠️'); + expect(body).not.toContain('Shipped a PR'); + // Classifier title still appears on the ❌ frame. The actual title + // for max-turns errors is "Exceeded max turns" (see error-classifier.ts). + expect(body).toContain('Exceeded max turns'); + }); +}); + +// --------------------------------------------------------------------------- +// renderLinearFinalStatusComment — table-driven tests for the formatter +// --------------------------------------------------------------------------- + +describe('renderLinearFinalStatusComment', () => { + // The dispatcher tests above exercise the renderer indirectly through + // the full handler stack. These tests call the exported renderer + // directly to cover edge cases the integration fixtures don't: + // null-metric fallbacks, formatDuration boundaries (`<60s`, + // exact-minute), and the title-on-⚠️ rendering for the ABCA-91 case. + + test('all metrics null → renders em-dash placeholders', () => { + // The crash-before-metrics case: the agent died so early that no + // turns were attempted, no cost was tagged, and duration was zero. + // Better to show `—` than `0` or `null`. + const body = renderLinearFinalStatusComment({ + eventType: 'task_failed', + prUrl: null, + costUsd: null, + turns: null, + maxTurns: null, + durationS: null, + taskId: 't-empty', + errorTitle: null, + }); + expect(body).toContain('cost: — • turns: — • duration: —'); + }); + + test('turns present but maxTurns null → renders just turns without slash', () => { + // A max-turns-cap config that never materialised on the task + // (older record, schema gap). Don't render `27 / null`. + const body = renderLinearFinalStatusComment({ + eventType: 'task_completed', + prUrl: null, + costUsd: 0.5, + turns: 27, + maxTurns: null, + durationS: 60, + taskId: 't', + errorTitle: null, + }); + expect(body).toContain('turns: 27 '); + expect(body).not.toContain('27 /'); + }); + + test('formatDuration: under 60s → seconds only', () => { + const body = renderLinearFinalStatusComment({ + eventType: 'task_completed', + prUrl: null, + costUsd: 0.01, + turns: 1, + maxTurns: 100, + durationS: 42, + taskId: 't', + errorTitle: null, + }); + expect(body).toContain('duration: 42s'); + }); + + test('formatDuration: exact minute → `Nm` without zero seconds', () => { + // ``180 → 3m`` not ``3m 0s``. Cosmetic but the regex anchored. + const body = renderLinearFinalStatusComment({ + eventType: 'task_completed', + prUrl: null, + costUsd: 0.01, + turns: 1, + maxTurns: 100, + durationS: 180, + taskId: 't', + errorTitle: null, + }); + expect(body).toContain('duration: 3m'); + expect(body).not.toContain('3m 0s'); + }); + + test('formatDuration: minutes + seconds → `Nm Ss`', () => { + const body = renderLinearFinalStatusComment({ + eventType: 'task_completed', + prUrl: null, + costUsd: 0.01, + turns: 1, + maxTurns: 100, + durationS: 221, + taskId: 't', + errorTitle: null, + }); + expect(body).toContain('duration: 3m 41s'); + }); + + test('⚠️ frame renders the classifier title (ABCA-91 contextual reason)', () => { + // Krokoko's review item #4: the most useful context for the ⚠️ + // case is *why* the agent stopped early. Render the classifier + // title alongside "Shipped a PR but stopped early" so the + // requester sees both outcomes. + const body = renderLinearFinalStatusComment({ + eventType: 'task_failed', + prUrl: 'https://github.com/owner/repo/pull/35', + costUsd: 3.44, + turns: 101, + maxTurns: 100, + durationS: 1272, + taskId: 't-abca-91', + errorTitle: 'Hit max-turns cap', + }); + expect(body).toContain('⚠️'); + expect(body).toContain('Shipped a PR but stopped early'); + expect(body).toContain('Hit max-turns cap'); + }); + + test('❌ frame includes classifier title when known', () => { + const body = renderLinearFinalStatusComment({ + eventType: 'task_failed', + prUrl: null, + costUsd: 0.05, + turns: 3, + maxTurns: 100, + durationS: 30, + taskId: 't', + errorTitle: 'Insufficient GitHub permissions', + }); + expect(body).toContain('❌'); + expect(body).toContain('Insufficient GitHub permissions'); + }); + + test('❌ frame renders without colon when errorTitle is null (clean fallback)', () => { + // Distinct from the "Unexpected error" case — this is what happens + // when the classifier returns null (empty error_message). Header + // should not render a stranded ": " trailing the subtype. + const body = renderLinearFinalStatusComment({ + eventType: 'task_cancelled', + prUrl: null, + costUsd: null, + turns: null, + maxTurns: null, + durationS: null, + taskId: 't', + errorTitle: null, + }); + expect(body).toContain('❌'); + expect(body).toContain('cancelled'); + expect(body).not.toMatch(/cancelled:\s/); + }); +}); + // --------------------------------------------------------------------------- // Scenario 7-extended — agent_milestone routing regression // ---------------------------------------------------------------------------