Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions agent/src/prompt_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,15 @@ def _channel_prompt_addendum(config: TaskConfig) -> str:
"transition the issue state to `In Review` (fall back to `In Progress` "
"if that state doesn't exist). If neither exists, skip the state "
"transition — the PR comment alone is enough. Do not invent state "
"names or loop on `list_issue_statuses`.\n"
"3. **On completion or failure** — call `mcp__linear-server__save_comment` "
"with the final status (succeeded / failed + short reason).\n\n"
"Keep comments concise. Do not mirror the full agent transcript back to "
"Linear. Even small tasks must post all three updates — users rely on "
"them to track progress."
"names or loop on `list_issue_statuses`.\n\n"
"**Do NOT post a final 'task completed' or 'task failed' comment.** "
"The platform fan-out plane (issue #239) posts a structured "
"✅/⚠️/❌ summary on terminal events with cost / turns / duration / "
"PR-link metrics that you don't have visibility into. A redundant "
"agent-side completion comment would just stack two near-identical "
"comments on the issue.\n\n"
"Keep the start + PR-opened comments concise. Do not mirror the full "
"agent transcript back to Linear."
)


Expand Down
41 changes: 41 additions & 0 deletions cdk/src/constructs/fanout-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ export interface FanOutConsumerProps {
*/
readonly slackSecretArnPattern?: string;

/**
* LinearWorkspaceRegistryTable — the Linear dispatcher reads this to
* resolve per-workspace OAuth tokens at comment-post time. Optional:
* when omitted, the dispatcher logs and skips so a deployment without
* Linear onboarding doesn't accumulate dangling IAM grants.
*/
readonly linearWorkspaceRegistryTable?: dynamodb.ITable;

/**
* Secrets Manager ARN-prefix pattern for per-workspace Linear OAuth
* bundles. Mirrors ``slackSecretArnPattern`` shape — typically
* ``bgagent-linear-oauth-*``. Required when ``linearWorkspaceRegistryTable``
* is set; without it the dispatcher would resolve the registry row but
* fail at the SM GetSecretValue call.
*/
readonly linearOauthSecretArnPattern?: string;

/**
* Maximum batch size delivered to the Lambda per invocation.
*
Expand Down Expand Up @@ -173,6 +190,30 @@ export class FanOutConsumer extends Construct {
}));
}

// Linear dispatcher plumbing. Same guarded shape as Slack/GitHub:
// a deployment without Linear onboarding gets no IAM grants and
// the dispatcher logs-and-skips on missing env. The registry table
// tells us per-workspace OAuth-secret ARN; the secret holds the
// access token that ``postIssueComment`` uses to drive
// ``commentCreate`` GraphQL.
if (props.linearWorkspaceRegistryTable) {
props.linearWorkspaceRegistryTable.grantReadData(this.fn);
this.fn.addEnvironment(
'LINEAR_WORKSPACE_REGISTRY_TABLE_NAME',
props.linearWorkspaceRegistryTable.tableName,
);
}
if (props.linearOauthSecretArnPattern) {
this.fn.addToRolePolicy(new iam.PolicyStatement({
// GetSecretValue + PutSecretValue: the resolver may rotate the
// OAuth token (writes the refreshed bundle back to SM) — same
// grants the LinearIntegration's webhook-processor Lambda holds
// for the same reason.
actions: ['secretsmanager:GetSecretValue', 'secretsmanager:PutSecretValue'],
resources: [props.linearOauthSecretArnPattern],
}));
}

this.fn.addEventSource(new DynamoEventSource(props.taskEventsTable, {
startingPosition: StartingPosition.LATEST,
batchSize: props.batchSize ?? 100,
Expand Down
246 changes: 243 additions & 3 deletions cdk/src/handlers/fanout-task-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ import type {
DynamoDBStreamEvent,
} from 'aws-lambda';
import { clearTokenCache, resolveGitHubToken } from './shared/context-hydration';
import { classifyError } from './shared/error-classifier';
import { renderCommentBody, upsertTaskComment } from './shared/github-comment';
import { postIssueComment } from './shared/linear-feedback';
import { logger } from './shared/logger';
import { coerceNumericOrNull } from './shared/numeric';
import { loadRepoConfig } from './shared/repo-config';
Expand Down Expand Up @@ -105,7 +107,7 @@ const APPROVAL_NOTIFICATION_EVENTS = [
* - Per-user rate limit of 10 approval-related messages per minute
* is enforced in the dispatcher, not in this filter.
*/
export type NotificationChannel = 'slack' | 'email' | 'github';
export type NotificationChannel = 'slack' | 'email' | 'github' | 'linear';

export const CHANNEL_DEFAULTS: Record<NotificationChannel, ReadonlySet<string>> = {
// Slack is the "on-call" channel per §6.2 — all terminal outcomes
Expand Down Expand Up @@ -155,6 +157,21 @@ export const CHANNEL_DEFAULTS: Record<NotificationChannel, ReadonlySet<string>>
...TERMINAL_EVENT_TYPES,
'pr_created',
]),
// Linear posts a single deterministic final-status comment on
// terminal events. The agent's own prompt contract covers in-flight
// progress only (start / PR-opened comments) and explicitly tells the
// agent NOT to post a completion comment; this dispatcher is the sole
// source of the final status. It fires once the task reaches a
// terminal state, with cost / turns / duration / pr_url metrics the
// requester wouldn't otherwise see. Crucially, it fires even when the
// agent crashes (e.g. error_max_turns, OOM) and never gets to report
// an outcome itself — the GH issue #239 motivating example.
//
// Linear's `save_comment` doesn't support edit, so this is post-once
// (no live updates a la GitHub edit-in-place). Approvals / milestones
// are excluded for the same reason — N comments rather than 1.
linear: new Set<string>([
...TERMINAL_EVENT_TYPES,
]),
};

/**
Expand Down Expand Up @@ -753,13 +770,235 @@ async function dispatchToEmail(event: FanOutEvent): Promise<void> {
});
}

/**
 * Render the Linear final-status comment body. Inputs are already
 * coerced to native types by the caller; this function only formats.
 *
 * The framing flips between three outcomes based on `(eventType, prUrl)`:
 *
 *   1. ``task_completed``                        → ✅ "Task completed"
 *   2. any non-completed terminal event WITH PR  → ⚠️ "Shipped a PR but stopped early"
 *      (the motivating ABCA-91 case is max-turns-with-PR, but the same
 *      framing applies to any terminal failure — budget cap, agent
 *      crash, etc. — that managed to ship a PR before stopping)
 *   3. any non-completed terminal event NO PR    → ❌ "Task <subtype>" + classifier title
 *
 * "WITH PR" means a non-empty `prUrl`. `null` and the empty string are
 * both treated as "no PR", so the ⚠️ header can never appear without a
 * PR link underneath it.
 *
 * The ⚠️ frame appends the classifier title when one is available so the
 * requester sees both outcomes (the PR shipped, AND the reason it
 * stopped — "Hit max-turns cap" for ABCA-91).
 *
 * Cost / turns / duration appear as a subtitle line. Missing values
 * (e.g. failure before the agent emitted any tokens) render as `—`.
 *
 * @param args.eventType  Terminal event type; a leading `task_` is stripped for the ❌ header.
 * @param args.prUrl      PR link, or `null` / `''` when no PR exists.
 * @param args.costUsd    Cost in USD, rendered with two decimals; `null` renders `—`.
 * @param args.turns      Turns used; `null` renders `—`.
 * @param args.maxTurns   Turn cap, shown as `turns / maxTurns` when both are present.
 * @param args.durationS  Duration in seconds, formatted via `formatDuration`; `null` renders `—`.
 * @param args.taskId     Task identifier, rendered in the italic footer.
 * @param args.errorTitle Short failure reason appended to the ⚠️ / ❌ header when non-empty.
 * @returns The comment body, lines joined with `\n`.
 */
export function renderLinearFinalStatusComment(args: {
  eventType: string;
  prUrl: string | null;
  costUsd: number | null;
  turns: number | null;
  maxTurns: number | null;
  durationS: number | null;
  taskId: string;
  errorTitle: string | null;
}): string {
  const isCompleted = args.eventType === 'task_completed';
  // Truthiness rather than `!= null`: an empty-string prUrl must not
  // select the "Shipped a PR" header, because the PR line below is only
  // emitted for a non-empty URL. One predicate drives both decisions.
  const hasPr = Boolean(args.prUrl);
  const shippedDespiteFailure = !isCompleted && hasPr;

  let header: string;
  if (isCompleted) {
    header = '✅ **Task completed**';
  } else if (shippedDespiteFailure) {
    // Append the classifier title (when known) so the requester sees
    // *why* the agent stopped, not just that it shipped a PR. For
    // ABCA-91 this renders "...stopped early — Hit max-turns cap".
    const reason = args.errorTitle ? ` — ${args.errorTitle}` : '';
    header = `⚠️ **Shipped a PR but stopped early${reason}** — review and decide if more work is needed`;
  } else {
    const reason = args.errorTitle ? `: ${args.errorTitle}` : '';
    header = `❌ **Task ${args.eventType.replace(/^task_/, '')}${reason}**`;
  }

  const costStr = args.costUsd != null ? `$${args.costUsd.toFixed(2)}` : '—';
  const turnsStr = args.turns != null
    ? `${args.turns}${args.maxTurns != null ? ` / ${args.maxTurns}` : ''}`
    : '—';
  const durationStr = args.durationS != null
    ? formatDuration(args.durationS)
    : '—';

  const lines: string[] = [
    header,
    '',
    `cost: ${costStr} • turns: ${turnsStr} • duration: ${durationStr}`,
  ];
  // Render the PR URL only on the ⚠️ "shipped a PR but stopped early"
  // path — that's the case where the agent's own step-2 "PR opened"
  // comment is *not* guaranteed to have fired (the agent may have
  // crashed between opening the PR and posting the comment, e.g.
  // ABCA-91 hitting max-turns on turn 101). On the ✅ success path the
  // agent's step-2 comment reliably carries the PR link, so duplicating
  // it here is just noise.
  if (shippedDespiteFailure) {
    lines.push('', `PR: ${args.prUrl}`);
  }
  lines.push('', `_task ${args.taskId}_`);
  return lines.join('\n');
}

/**
 * Format a duration given in seconds as a compact human-readable string.
 *
 * The input is rounded to the nearest whole second first. Values under a
 * minute render as `"<n>s"`; anything longer renders as `"<m>m"` when it
 * is an exact number of minutes and `"<m>m <s>s"` otherwise. Minutes are
 * not rolled up into hours.
 */
function formatDuration(seconds: number): string {
  const rounded = Math.round(seconds);
  if (rounded < 60) {
    return `${rounded}s`;
  }

  const minutes = Math.floor(rounded / 60);
  const remainder = rounded % 60;
  if (remainder === 0) {
    return `${minutes}m`;
  }
  return `${minutes}m ${remainder}s`;
}

/**
 * Linear dispatcher — posts one final-status comment on the originating
 * Linear issue when a task's terminal event is fanned out.
 *
 * Steps, in the order the code performs them:
 *
 *   1. Read ``LINEAR_WORKSPACE_REGISTRY_TABLE_NAME`` from the environment.
 *      When unset, WARN (with an ``error_id``) and return.
 *   2. Load the task via ``loadTaskForComment``. WARN and return when it
 *      is not found.
 *   3. Return silently unless ``channel_source === 'linear'``.
 *   4. Read ``linear_issue_id`` + ``linear_workspace_id`` from
 *      ``channel_metadata``. WARN and return if either is missing.
 *   5. Classify ``error_message``, coerce the numeric metrics, render the
 *      body with ``renderLinearFinalStatusComment`` and hand it to
 *      ``postIssueComment``.
 *   6. Log the outcome: INFO when the post returned truthy, WARN (with
 *      an ``error_id``) otherwise.
 *
 * A falsy result from ``postIssueComment`` is only logged — it is never
 * turned into a rejection here. This function adds no try/catch of its
 * own around the helpers it awaits.
 */
async function dispatchToLinear(event: FanOutEvent): Promise<void> {
  const registryTable = process.env.LINEAR_WORKSPACE_REGISTRY_TABLE_NAME;
  if (!registryTable) {
    // WARN + error_id so a misconfigured deployment can be alarmed on
    // instead of silently dropping every Linear final-status comment.
    logger.warn('[fanout/linear] LINEAR_WORKSPACE_REGISTRY_TABLE_NAME not set — skipping', {
      event: 'fanout.linear.missing_env',
      error_id: 'FANOUT_LINEAR_MISSING_ENV',
      task_id: event.task_id,
    });
    return;
  }

  const record = await loadTaskForComment(event.task_id);
  if (!record) {
    logger.warn('[fanout/linear] task not found — skipping comment', {
      event: 'fanout.linear.task_missing',
      task_id: event.task_id,
    });
    return;
  }

  // Only Linear-origin tasks get a Linear comment; everything else
  // exits here without logging.
  if (record.channel_source !== 'linear') {
    return;
  }

  const metadata = record.channel_metadata;
  const issueId = metadata?.linear_issue_id;
  const workspaceId = metadata?.linear_workspace_id;
  if (!issueId || !workspaceId) {
    logger.warn('[fanout/linear] task missing linear_issue_id or linear_workspace_id — skipping', {
      event: 'fanout.linear.metadata_missing',
      task_id: event.task_id,
      has_issue_id: Boolean(issueId),
      has_workspace_id: Boolean(workspaceId),
    });
    return;
  }

  // Short failure reason for the comment header, taken from the shared
  // classifier's `title`; null when the classifier yields nothing.
  const errorTitle = classifyError(record.error_message)?.title ?? null;

  // Numeric attributes are coerced before formatting so toFixed and
  // comparisons in the renderer operate on real numbers (or null).
  const logContext = { task_id: record.task_id, event_id: event.event_id };
  const costUsd = coerceNumericOrNull(
    record.cost_usd,
    { field: 'cost_usd', ...logContext },
    logger,
  );
  const turns = coerceNumericOrNull(
    record.turns_attempted,
    { field: 'turns_attempted', ...logContext },
    logger,
  );
  const maxTurns = coerceNumericOrNull(
    record.max_turns,
    { field: 'max_turns', ...logContext },
    logger,
  );
  const durationS = coerceNumericOrNull(
    record.duration_s,
    { field: 'duration_s', ...logContext },
    logger,
  );

  const commentBody = renderLinearFinalStatusComment({
    eventType: event.event_type,
    prUrl: record.pr_url ?? null,
    costUsd,
    turns,
    maxTurns,
    durationS,
    taskId: record.task_id,
    errorTitle,
  });

  const posted = await postIssueComment(
    { linearWorkspaceId: workspaceId, registryTableName: registryTable },
    issueId,
    commentBody,
  );

  // Failure gets its own WARN + error_id so post failures can be
  // alarmed on separately from successful dispatches.
  if (!posted) {
    logger.warn('[fanout/linear] postIssueComment returned false — Linear API path failed', {
      event: 'fanout.linear.post_failed',
      error_id: 'FANOUT_LINEAR_POST_FAILED',
      task_id: record.task_id,
      issue_id: issueId,
      event_type: event.event_type,
      posted: false,
    });
    return;
  }

  logger.info('[fanout/linear] comment dispatched', {
    event: 'fanout.linear.dispatched',
    task_id: record.task_id,
    issue_id: issueId,
    event_type: event.event_type,
    posted: true,
  });
}

/**
 * Per-channel dispatcher table: maps every `NotificationChannel` key to
 * the async function that delivers a `FanOutEvent` on that channel.
 * `routeEvent` looks dispatchers up here by channel name
 * (`DISPATCHERS[ch](ev)`), and the signature is uniform so adding a
 * channel is one entry. The `Record<NotificationChannel, …>` annotation
 * makes the compiler require an entry for each channel in the union.
 *
 * NOTE(review): the earlier comment described this as "exposed for
 * testing", but the const carries no `export` in the visible code —
 * confirm whether tests reach it some other way.
 */
const DISPATCHERS: Record<NotificationChannel, (ev: FanOutEvent) => Promise<void>> = {
slack: dispatchToSlack,
github: dispatchToGitHubComment,
linear: dispatchToLinear,
email: dispatchToEmail,
};

Expand Down Expand Up @@ -814,8 +1053,9 @@ export async function routeEvent(
attempted.push(ch);
tasks.push(DISPATCHERS[ch](ev));
}
// Parallelism is bounded by the dispatcher list (at most 3 channels),
// not by program input, so the unbounded-parallelism lint does not apply.
// Parallelism is bounded by the dispatcher list (4 channels:
// slack/github/linear/email), not by program input, so the
// unbounded-parallelism lint does not apply.

const results = await Promise.allSettled(tasks);

Expand Down
1 change: 1 addition & 0 deletions cdk/src/handlers/shared/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ export interface TaskNotificationsConfig {
readonly slack?: ChannelConfig;
readonly email?: ChannelConfig;
readonly github?: ChannelConfig;
readonly linear?: ChannelConfig;
}

/**
Expand Down
Loading