From 9285d264a222e049db164e63e12b86b487735666 Mon Sep 17 00:00:00 2001 From: Muhammad Ahmed Cheema Date: Thu, 30 Apr 2026 22:41:56 -0700 Subject: [PATCH] channels: Socket Mode lifecycle metrics + slack_app_token in AllowedSecretNamesMirror MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit R7 Phase 8a (multi-agent Slack flow, dated 2026-04-30) hardens the existing Socket Mode receiver with Prometheus-grade observability and locks the cross-repo allowlist mirror against phantomd's AllowedSecretNames. Metrics surface (new /metrics endpoint): - phantom_slack_socket_state{state} (gauge, 7 series). Exactly one series is 1.0 at any instant. Powers "tenant offline" alerts. - phantom_slack_socket_reconnects_total (counter). Bolt's auto- reconnect runs under the hood; this measures wobble rate. - phantom_slack_socket_connection_seconds (histogram). Connection lifetime from connect to disconnect; long-tail SLO. - phantom_slack_event_dispatch_seconds{event_type} (histogram). End-to-end Bolt middleware time. Slack ack deadline is 3s. Implementation: - New SocketModeReceiver hookup (vs. socketMode: true shorthand) to reach receiver.client and subscribe lifecycle events. The State enum is verbatim from @slack/socket-mode SocketModeClient.js: connecting, authenticated, connected, reconnecting, disconnecting, disconnected. - The unrecoverable start() error path maps to a synthetic "error" state on the gauge so the fleet view does not silently flatline when the receiver never gets past handshake. - Dispatch timing via app.use() global middleware: try/await next() / finally observe(). Captures handler completion including any async LLM/memory work the agent does in response. - prom-client backed; the metrics module owns its own Registry so Telegram/email can plug in via parallel registries without name collisions on the global registry. 
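Illustration of the two mechanisms above (a minimal sketch, not the code in slack-metrics.ts or slack.ts): the one-hot state gauge and the try/await next()/finally observe() dispatch timer. The names oneHot, timingMiddleware, Observe and classify are hypothetical and chosen for this example only; the sketch records into plain values rather than prom-client so it stays self-contained.

```typescript
// Hypothetical sketch; names here are illustrative, not the module's API.
type SocketState =
  | "connecting"
  | "authenticated"
  | "connected"
  | "reconnecting"
  | "disconnecting"
  | "disconnected"
  | "error";

const ALL_STATES: readonly SocketState[] = [
  "connecting",
  "authenticated",
  "connected",
  "reconnecting",
  "disconnecting",
  "disconnected",
  "error",
];

// One-hot gauge: the current state reads 1, every other series reads 0,
// so all seven series are always present in a scrape.
function oneHot(current: SocketState): Map<SocketState, number> {
  return new Map(
    ALL_STATES.map((s): [SocketState, number] => [s, s === current ? 1 : 0]),
  );
}

type Observe = (eventType: string, seconds: number) => void;
type MiddlewareArgs = { body: unknown; next: () => Promise<void> };

// Dispatch timer: the finally block runs whether next() resolves or
// rejects, so a throwing handler is still measured and the error still
// propagates to the caller.
function timingMiddleware(observe: Observe, classify: (body: unknown) => string) {
  return async ({ body, next }: MiddlewareArgs): Promise<void> => {
    const startedAtMs = Date.now();
    try {
      await next();
    } finally {
      observe(classify(body), (Date.now() - startedAtMs) / 1000);
    }
  };
}
```

Because observation happens in finally, the histogram count reflects every dispatched event, including ones whose handler threw.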
AllowedSecretNamesMirror cross-repo invariant: - New frozen const exported from slack-channel-factory.ts listing slack_bot_token + slack_app_token + slack_gateway_signing_secret. - The phantomd side ships in the symmetric PR (phantomd #28, TestIsAllowedName_AcceptsSlackAppToken pins the assertion). - Drift on either side breaks tenant boot with HTTP 404 (the metadata gateway maps ErrInvalidName to 404 to defeat name enumeration). The factory test pins the Mirror against the SECRET_RESPONSES test fixture so a future production-side rename fails loud in CI. Tests (29 net-new): - 24 in slack-metrics.test.ts: each metric family, the noop emitter, the dispatch middleware including throw-path, the lifecycle hooks including disconnect-without-prior-connect. - 5 in slack-channel-factory.test.ts: AllowedSecretNamesMirror contains all three names, matches SECRET_RESPONSES, is frozen. - All 2111 existing tests still green; full suite: 2111 pass / 0 fail / 0 errors. Bun-mock cross-suite hygiene: every test that previously mocked @slack/bolt now also exports SocketModeReceiver in its mock object. Without this, a test file that ran AFTER one of those partial mocks would import the real module and hit "Export named 'SocketModeReceiver' not found" at module load time. Caught by running the full channels/ suite together. Net diff: 11 files, ~390 LOC. Follows R7 §3.1's "150 net-new" floor; the rest is test mirror coverage and documentation. 
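Illustration of the mirror/fixture pin described in the AllowedSecretNamesMirror section above (a minimal sketch, not the code in slack-channel-factory.ts): a frozen list of names compared against a fixture's keys, reporting any name present on only one side. The identifiers mirror, fixture and driftBetween are hypothetical stand-ins; only the three secret names come from this patch.

```typescript
// Hypothetical stand-in for the frozen mirror exported by the factory.
const mirror: readonly string[] = Object.freeze([
  "slack_bot_token",
  "slack_app_token",
  "slack_gateway_signing_secret",
]);

// Hypothetical stand-in for a test fixture keyed by secret name.
const fixture: Record<string, string> = {
  slack_bot_token: "xoxb-example",
  slack_app_token: "xapp-example",
  slack_gateway_signing_secret: "example-signing-secret",
};

// Returns the names that appear on exactly one side, sorted. An empty
// result means the two lists agree; anything else is drift.
function driftBetween(a: readonly string[], b: readonly string[]): string[] {
  const inA = new Set(a);
  const inB = new Set(b);
  const onlyA = a.filter((name) => !inB.has(name));
  const onlyB = b.filter((name) => !inA.has(name));
  return onlyA.concat(onlyB).sort();
}

const drift = driftBetween(mirror, Object.keys(fixture));
```

Reporting the symmetric difference, rather than a bare equality result, names the offending secret when a rename lands on only one side.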
--- CLAUDE.md | 21 +- bun.lock | 9 + package.json | 1 + .../__tests__/slack-channel-factory.test.ts | 65 ++- .../__tests__/slack-http-receiver.test.ts | 11 + src/channels/__tests__/slack-metrics.test.ts | 412 ++++++++++++++++++ src/channels/__tests__/slack.test.ts | 31 ++ src/channels/slack-channel-factory.ts | 55 ++- src/channels/slack-metrics.ts | 199 +++++++++ src/channels/slack.ts | 147 ++++++- src/core/server.ts | 46 ++ src/index.ts | 11 + 12 files changed, 992 insertions(+), 16 deletions(-) create mode 100644 src/channels/__tests__/slack-metrics.test.ts create mode 100644 src/channels/slack-metrics.ts diff --git a/CLAUDE.md b/CLAUDE.md index afcea5a1..46d11235 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -84,7 +84,9 @@ src/ email-login.ts # Magic link email delivery notifications/ # Web Push (VAPID keys, subscriptions, triggers) channels/ - slack.ts # Slack Socket Mode (primary channel, owner access control) + slack.ts # Slack Socket Mode (primary channel, owner access control, lifecycle metrics) + slack-metrics.ts # Phase 8a: prom-client metrics for Socket Mode lifecycle + dispatch + slack-channel-factory.ts # Routes between Socket Mode + HTTP receiver; AllowedSecretNamesMirror lives here telegram.ts # Telegram via Telegraf email.ts # IMAP/SMTP via ImapFlow + Nodemailer webhook.ts # HTTP webhooks with HMAC-SHA256 @@ -132,7 +134,7 @@ src/ tools.ts # phantom_create_page, phantom_generate_login login-page.ts # Login page HTML core/ - server.ts # Bun.serve() HTTP server, /health, /trigger, /webhook, /ui + server.ts # Bun.serve() HTTP server, /health, /metrics, /trigger, /webhook, /ui db/ schema.ts # SQLite migrations (7 total) connection.ts # Database connection @@ -154,6 +156,21 @@ After each session: EvolutionEngine runs 6-step reflection pipeline -> 5-gate va MCP flow: External client -> /mcp endpoint -> bearer auth -> MCP Server -> tool execution (some route through AgentRuntime for full Opus brain). 
+## Observability + +The Bun.serve() HTTP server exposes a Prometheus `/metrics` endpoint (Phase 8a, R7 dated 2026-04-30). Unauthenticated, matching the existing `/health` precedent: per-tenant isolation comes from the per-tenant URL behind Caddy, not per-route auth. + +Metric families exposed today (Slack Socket Mode lifecycle): + +- `phantom_slack_socket_state{state="connecting|authenticated|connected|reconnecting|disconnecting|disconnected|error"}` (gauge). Exactly one series is 1.0 at any instant; the rest are 0.0. Alerting watches `1 - max_over_time(phantom_slack_socket_state{state="connected"}[1m]) > 0` for "the tenant is offline". +- `phantom_slack_socket_reconnects_total` (counter). Bolt's auto-reconnect is on by default; this measures "the network wobbled". Alert at sustained >5/min. +- `phantom_slack_socket_connection_seconds` (histogram). Lifetime of a single Socket Mode connection from connect to disconnect. p99 should hold above 1 hour. +- `phantom_slack_event_dispatch_seconds{event_type=...}` (histogram). End-to-end Bolt middleware time. Slack's ack deadline is 3 seconds; alert on p99 > 2.5s. + +The metrics module owns a private `prom-client` Registry (no global registry pollution). Adding more channels (Telegram, email) means adding a sibling registry and merging at request time in `core/server.ts`. Future cross-channel metrics generalize via Phase 17 polish. + +Cross-repo invariant: `slack-channel-factory.ts` exports a frozen `AllowedSecretNamesMirror` array (`slack_bot_token`, `slack_app_token`, `slack_gateway_signing_secret`). The same names MUST appear in phantomd's `internal/secrets/types.go` `AllowedSecretNames` map. Drift breaks tenant boot with HTTP 404 (the gateway maps `ErrInvalidName` to 404 to defeat name enumeration). The factory test pins the mirror against its `SECRET_RESPONSES` test fixture; phantomd's `TestIsAllowedName_AcceptsSlackAppToken` (and the existing `*_AcceptsSlackGatewaySigningSecret`) pin the symmetric assertion. 
+ ## Key Design Decisions **Qdrant over LanceDB:** WAL durability with crash recovery. Native hybrid search (dense + BM25 sparse vectors). Named vectors for separate embedding spaces. Mmap mode for low memory. TypeScript REST client works with Bun (no NAPI addon risk). diff --git a/bun.lock b/bun.lock index 701899b1..a1bbce62 100644 --- a/bun.lock +++ b/bun.lock @@ -14,6 +14,7 @@ "imapflow": "^1.2.18", "nodemailer": "^8.0.4", "playwright": "1.59.1", + "prom-client": "^15.1.3", "resend": "^6.9.4", "telegraf": "^4.16.3", "yaml": "^2.6.0", @@ -97,6 +98,8 @@ "@napi-rs/wasm-runtime": ["@napi-rs/wasm-runtime@1.1.3", "", { "dependencies": { "@tybys/wasm-util": "^0.10.1" }, "peerDependencies": { "@emnapi/core": "^1.7.1", "@emnapi/runtime": "^1.7.1" } }, "sha512-xK9sGVbJWYb08+mTJt3/YV24WxvxpXcXtP6B172paPZ+Ts69Re9dAr7lKwJoeIx8OoeuimEiRZ7umkiUVClmmQ=="], + "@opentelemetry/api": ["@opentelemetry/api@1.9.1", "", {}, "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q=="], + "@oxc-project/types": ["@oxc-project/types@0.124.0", "", {}, "sha512-VBFWMTBvHxS11Z5Lvlr3IWgrwhMTXV+Md+EQF0Xf60+wAdsGFTBx7X7K/hP4pi8N7dcm1RvcHwDxZ16Qx8keUg=="], "@pinojs/redact": ["@pinojs/redact@0.4.0", "", {}, "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg=="], @@ -205,6 +208,8 @@ "base64-arraybuffer": ["base64-arraybuffer@1.0.2", "", {}, "sha512-I3yl4r9QB5ZRY3XuJVEPfc2XhZO6YweFPI+UovAzn+8/hb3oJ6lnysaFcjVpkCPfVWFUDvoZ8kmVDP7WyRtYtQ=="], + "bintrees": ["bintrees@1.0.2", "", {}, "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw=="], + "body-parser": ["body-parser@2.2.2", "", { "dependencies": { "bytes": "^3.1.2", "content-type": "^1.0.5", "debug": "^4.4.3", "http-errors": "^2.0.0", "iconv-lite": "^0.7.0", "on-finished": "^2.4.1", "qs": "^6.14.1", "raw-body": "^3.0.1", "type-is": "^2.0.1" } }, 
"sha512-oP5VkATKlNwcgvxi0vM0p/D3n2C3EReYVX+DNYs5TjZFn/oQt2j+4sVJtSMr18pdRr8wjTcBl6LoV+FUwzPmNA=="], "buffer-alloc": ["buffer-alloc@1.2.0", "", { "dependencies": { "buffer-alloc-unsafe": "^1.1.0", "buffer-fill": "^1.0.0" } }, "sha512-CFsHQgjtW1UChdXgbyJGtnm+O/uLQeZdtbDo8mfUgYXCHSM1wgrVxXm6bSyrUuErEb+4sYVGCzASBRot7zyrow=="], @@ -461,6 +466,8 @@ "process-warning": ["process-warning@5.0.0", "", {}, "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA=="], + "prom-client": ["prom-client@15.1.3", "", { "dependencies": { "@opentelemetry/api": "^1.4.0", "tdigest": "^0.1.1" } }, "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g=="], + "proxy-addr": ["proxy-addr@2.0.7", "", { "dependencies": { "forwarded": "0.2.0", "ipaddr.js": "1.9.1" } }, "sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg=="], "proxy-from-env": ["proxy-from-env@1.1.0", "", {}, "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg=="], @@ -533,6 +540,8 @@ "svix": ["svix@1.86.0", "", { "dependencies": { "standardwebhooks": "1.0.0", "uuid": "^10.0.0" } }, "sha512-/HTvXwjLJe1l/MsLXAO1ddCYxElJk4eNR4DzOjDOEmGrPN/3BtBE8perGwMAaJ2sT5T172VkBYzmHcjUfM1JRQ=="], + "tdigest": ["tdigest@0.1.2", "", { "dependencies": { "bintrees": "1.0.2" } }, "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA=="], + "telegraf": ["telegraf@4.16.3", "", { "dependencies": { "@telegraf/types": "^7.1.0", "abort-controller": "^3.0.0", "debug": "^4.3.4", "mri": "^1.2.0", "node-fetch": "^2.7.0", "p-timeout": "^4.1.0", "safe-compare": "^1.1.4", "sandwich-stream": "^2.0.2" }, "bin": { "telegraf": "lib/cli.mjs" } }, "sha512-yjEu2NwkHlXu0OARWoNhJlIjX09dRktiMQFsM678BAH/PEPVwctzL67+tvXqLCRQQvm3SDtki2saGO9hLlz68w=="], "thread-stream": ["thread-stream@4.0.0", "", { "dependencies": { "real-require": "^0.2.0" } }, 
"sha512-4iMVL6HAINXWf1ZKZjIPcz5wYaOdPhtO8ATvZ+Xqp3BTdaqtAwQkNmKORqcIo5YkQqGXq5cwfswDwMqqQNrpJA=="], diff --git a/package.json b/package.json index b1d70192..3a2ef267 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "imapflow": "^1.2.18", "nodemailer": "^8.0.4", "playwright": "1.59.1", + "prom-client": "^15.1.3", "resend": "^6.9.4", "telegraf": "^4.16.3", "yaml": "^2.6.0", diff --git a/src/channels/__tests__/slack-channel-factory.test.ts b/src/channels/__tests__/slack-channel-factory.test.ts index 91a9e4df..f0503ed1 100644 --- a/src/channels/__tests__/slack-channel-factory.test.ts +++ b/src/channels/__tests__/slack-channel-factory.test.ts @@ -7,6 +7,7 @@ import type { ChannelsConfig } from "../../config/schemas.ts"; const mockApp = mock(() => ({ event: () => {}, action: () => {}, + use: () => {}, client: { auth: { test: () => Promise.resolve({ user_id: "U_BOT" }) }, chat: { postMessage: () => Promise.resolve({ ts: "1.0" }), update: () => Promise.resolve({ ok: true }) }, @@ -21,12 +22,23 @@ const mockReceiver = { stop: () => Promise.resolve(), }; +// Phase 8a: Socket Mode receiver mock. The constructor returns an object +// whose `client.on(...)` is a no-op for routing tests; lifecycle metric +// behaviour is exercised in slack-metrics.test.ts and the SlackChannel +// suite, not here. 
+const mockSocketModeReceiver = mock(() => ({ + client: { on: () => {} }, +})); + mock.module("@slack/bolt", () => ({ App: mockApp, ExpressReceiver: mock(() => mockReceiver), + SocketModeReceiver: mockSocketModeReceiver, })); -const { createSlackChannel, readSlackTransportFromEnv } = await import("../slack-channel-factory.ts"); +const { createSlackChannel, readSlackTransportFromEnv, AllowedSecretNamesMirror } = await import( + "../slack-channel-factory.ts" +); const { SlackChannel } = await import("../slack.ts"); const { SlackHttpChannel } = await import("../slack-http-receiver.ts"); @@ -48,16 +60,23 @@ const HTTP_IDENTITY = { }, }; -// Cross-repo invariant (audit Finding 1, dated 2026-04-25): the names -// "slack_bot_token" and "slack_gateway_signing_secret" must appear in -// phantomd's internal/secrets/types.go AllowedSecretNames map. Any drift -// between phantom and phantomd breaks SLACK_TRANSPORT=http boot with HTTP -// 404. This map is the SINGLE source of truth for the http-mode tests +// Cross-repo invariant: the names below must appear in phantomd's +// internal/secrets/types.go AllowedSecretNames map. Any drift between +// phantom and phantomd breaks tenant boot with HTTP 404 (the gateway maps +// ErrInvalidName to 404 to avoid name enumeration). +// +// Audit Finding 1 (2026-04-25) added slack_bot_token + slack_gateway_signing_secret. +// Phase 8a (R7 dated 2026-04-30) added slack_app_token for Socket Mode +// self-installed agent #2+ tenants. The phantomd side ships in PR #28 +// (TestIsAllowedName_AcceptsSlackAppToken pins the symmetric assertion). +// +// This fixture is the SINGLE source of truth for the http-mode tests // below; makeSecretFetcher() throws fail-loud on any name not listed // here, so a future production-side rename that misses one repo will // fail this test suite immediately instead of silently shipping a 404. 
const SECRET_RESPONSES: Record = { slack_bot_token: "xoxb-from-metadata", + slack_app_token: "xapp-1-from-metadata", slack_gateway_signing_secret: "0123456789abcdef".repeat(4), }; @@ -245,3 +264,37 @@ describe("createSlackChannel", () => { await expect(fetcher.get("totally_made_up")).rejects.toThrow(/AllowedSecretNames/); }); }); + +// Phase 8a (R7 2026-04-30): pin the cross-repo invariant for the new +// slack_app_token entry. AllowedSecretNamesMirror is the phantom-side +// authoritative list; phantomd's TestIsAllowedName_AcceptsSlackAppToken is +// the matching assertion in the symmetric position. If a future contributor +// removes the entry on either side without the matching edit, both test +// suites fail-loud. +describe("AllowedSecretNamesMirror", () => { + test("includes slack_bot_token", () => { + expect(AllowedSecretNamesMirror).toContain("slack_bot_token"); + }); + + test("includes slack_app_token (Phase 8a Socket Mode)", () => { + expect(AllowedSecretNamesMirror).toContain("slack_app_token"); + }); + + test("includes slack_gateway_signing_secret (audit F1, HTTP receiver)", () => { + expect(AllowedSecretNamesMirror).toContain("slack_gateway_signing_secret"); + }); + + test("matches the SECRET_RESPONSES test fixture set", () => { + // SECRET_RESPONSES is the test fixture for makeSecretFetcher; its + // keys must equal AllowedSecretNamesMirror. A drift here means the + // production code can fetch a name the test fixture rejects (or + // vice versa), and the audit-F1 fail-loud guard breaks down. 
+ const fixtureKeys = Object.keys(SECRET_RESPONSES).sort(); + const mirror = [...AllowedSecretNamesMirror].sort(); + expect(fixtureKeys).toEqual(mirror); + }); + + test("entries are frozen (Object.freeze) so a runtime mutation is loud", () => { + expect(Object.isFrozen(AllowedSecretNamesMirror)).toBe(true); + }); +}); diff --git a/src/channels/__tests__/slack-http-receiver.test.ts b/src/channels/__tests__/slack-http-receiver.test.ts index bc7672b3..108e6a90 100644 --- a/src/channels/__tests__/slack-http-receiver.test.ts +++ b/src/channels/__tests__/slack-http-receiver.test.ts @@ -66,8 +66,19 @@ const MockApp = mock((opts: { receiver?: { init?: (app: unknown) => void } }) => return app; }); +// Phase 8a: SlackChannel (Socket Mode) now imports SocketModeReceiver from +// @slack/bolt. Even though this test exercises only the HTTP receiver, the +// module-mock layer is process-scoped under bun: a partial mock here +// shadows the real export for any other test file that loads slack.ts +// later. We mock SocketModeReceiver as a no-op constructor so the cross- +// suite loader stays consistent. +const mockSocketModeReceiver = mock(() => ({ + client: { on: () => {} }, +})); + mock.module("@slack/bolt", () => ({ App: MockApp, + SocketModeReceiver: mockSocketModeReceiver, })); // Import the channel AFTER the module mock so the constructor uses our doubles. diff --git a/src/channels/__tests__/slack-metrics.test.ts b/src/channels/__tests__/slack-metrics.test.ts new file mode 100644 index 00000000..fe9de061 --- /dev/null +++ b/src/channels/__tests__/slack-metrics.test.ts @@ -0,0 +1,412 @@ +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import promClient from "prom-client"; +import { NoopSlackMetrics, SlackMetrics, type SlackSocketState } from "../slack-metrics.ts"; + +// Mock the Slack Bolt SDK so importing SlackChannel below does not pull +// the real WebSocket client at module load. 
We re-use the same pattern +// the other Slack channel tests use; SocketModeReceiver returns a stub +// whose `client.on(...)` records listeners into a per-test map so we can +// fire lifecycle events synthetically. +type SocketEventHandler = (...args: unknown[]) => void; +const socketClientListeners = new Map(); +function fireSocketEvent(event: string, ...args: unknown[]): void { + for (const fn of socketClientListeners.get(event) ?? []) { + fn(...args); + } +} +function resetSocketListeners(): void { + socketClientListeners.clear(); +} + +const MockSocketModeReceiver = mock(() => ({ + client: { + on: (event: string, listener: SocketEventHandler) => { + const list = socketClientListeners.get(event) ?? []; + list.push(listener); + socketClientListeners.set(event, list); + }, + }, +})); + +type AppMiddleware = (args: { body: unknown; next: () => Promise }) => Promise; +const middlewares: AppMiddleware[] = []; + +const MockApp = mock(() => ({ + start: () => Promise.resolve(), + stop: () => Promise.resolve(), + event: () => {}, + action: () => {}, + use: (m: AppMiddleware) => { + middlewares.push(m); + }, + client: { + auth: { test: () => Promise.resolve({ user_id: "U_BOT" }) }, + chat: { + postMessage: () => Promise.resolve({ ts: "1.0" }), + update: () => Promise.resolve({ ok: true }), + }, + conversations: { open: () => Promise.resolve({ channel: { id: "D1" } }) }, + reactions: { + add: () => Promise.resolve({ ok: true }), + remove: () => Promise.resolve({ ok: true }), + }, + }, +})); + +mock.module("@slack/bolt", () => ({ + App: MockApp, + SocketModeReceiver: MockSocketModeReceiver, +})); + +const { SlackChannel } = await import("../slack.ts"); +const { extractEventType } = await import("../slack.ts"); + +const ALL_STATES: SlackSocketState[] = [ + "connecting", + "authenticated", + "connected", + "reconnecting", + "disconnecting", + "disconnected", + "error", +]; + +/** + * Helper: parse the prom-client text-format dump and return the value of + * the labeled gauge 
series with the given state. The text format is + * `{label="value"} ` per line; histograms expand to + * `_bucket`, `_sum`, `_count`, which we ignore here. + */ +function readGauge(text: string, metric: string, state: string): number | null { + const re = new RegExp(`^${metric}\\{state="${state}"\\}\\s+(\\S+)`, "m"); + const m = re.exec(text); + if (!m) return null; + return Number.parseFloat(m[1] ?? ""); +} + +function readCounter(text: string, metric: string): number | null { + const re = new RegExp(`^${metric}(?:\\{[^}]*\\})?\\s+(\\S+)`, "m"); + const m = re.exec(text); + if (!m) return null; + return Number.parseFloat(m[1] ?? ""); +} + +function readHistogramCount(text: string, metric: string, labelMatch?: string): number | null { + const labelClause = labelMatch ? `\\{${labelMatch}\\}` : "(?:\\{[^}]*\\})?"; + const re = new RegExp(`^${metric}_count${labelClause}\\s+(\\S+)`, "m"); + const m = re.exec(text); + if (!m) return null; + return Number.parseFloat(m[1] ?? ""); +} + +describe("SlackMetrics", () => { + let registry: promClient.Registry; + let metrics: SlackMetrics; + + beforeEach(() => { + registry = new promClient.Registry(); + metrics = new SlackMetrics(registry); + resetSocketListeners(); + middlewares.length = 0; + }); + + afterEach(() => { + registry.clear(); + }); + + test("registers all four metric families on construction", async () => { + const text = await registry.metrics(); + expect(text).toContain("phantom_slack_socket_state"); + expect(text).toContain("phantom_slack_socket_reconnects_total"); + expect(text).toContain("phantom_slack_socket_connection_seconds"); + expect(text).toContain("phantom_slack_event_dispatch_seconds"); + }); + + test("initializes every state series at 0 before any lifecycle event", async () => { + const text = await registry.metrics(); + for (const s of ALL_STATES) { + expect(readGauge(text, "phantom_slack_socket_state", s)).toBe(0); + } + }); + + test("recordState(connected) sets connected=1 and others=0", async () 
=> { + metrics.recordState("connected"); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(1); + for (const s of ALL_STATES.filter((x) => x !== "connected")) { + expect(readGauge(text, "phantom_slack_socket_state", s)).toBe(0); + } + }); + + test("recordState transitions overwrite the previous state", async () => { + metrics.recordState("connecting"); + metrics.recordState("authenticated"); + metrics.recordState("connected"); + metrics.recordState("disconnected"); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "disconnected")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(0); + expect(readGauge(text, "phantom_slack_socket_state", "authenticated")).toBe(0); + expect(readGauge(text, "phantom_slack_socket_state", "connecting")).toBe(0); + }); + + test("recordState handles the error pseudo-state", async () => { + metrics.recordState("error"); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "error")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(0); + }); + + test("recordReconnect increments the counter", async () => { + metrics.recordReconnect(); + metrics.recordReconnect(); + metrics.recordReconnect(); + const text = await registry.metrics(); + expect(readCounter(text, "phantom_slack_socket_reconnects_total")).toBe(3); + }); + + test("observeConnectionDuration records a histogram observation", async () => { + metrics.observeConnectionDuration(42); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(1); + }); + + test("observeConnectionDuration silently drops negative input", async () => { + metrics.observeConnectionDuration(-5); + metrics.observeConnectionDuration(Number.NaN); + metrics.observeConnectionDuration(Number.POSITIVE_INFINITY); + const text = await 
registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(0); + }); + + test("observeEventDispatch labels by event type", async () => { + metrics.observeEventDispatch("app_mention", 0.123); + metrics.observeEventDispatch("app_mention", 0.456); + metrics.observeEventDispatch("message", 0.7); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="app_mention"`)).toBe(2); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="message"`)).toBe(1); + }); +}); + +describe("NoopSlackMetrics", () => { + test("every method is a no-op (does not throw on any input)", () => { + const noop = new NoopSlackMetrics(); + expect(() => noop.recordState("connected")).not.toThrow(); + expect(() => noop.recordReconnect()).not.toThrow(); + expect(() => noop.observeConnectionDuration(123)).not.toThrow(); + expect(() => noop.observeConnectionDuration(-5)).not.toThrow(); + expect(() => noop.observeEventDispatch("event", 0.5)).not.toThrow(); + }); +}); + +describe("extractEventType", () => { + test("returns body.event.type when present", () => { + expect(extractEventType({ event: { type: "app_mention" } })).toBe("app_mention"); + expect(extractEventType({ event: { type: "message" } })).toBe("message"); + }); + + test("falls back to body.type for envelopes without an inner event", () => { + expect(extractEventType({ type: "block_actions" })).toBe("block_actions"); + expect(extractEventType({ type: "view_submission" })).toBe("view_submission"); + }); + + test("body.event.type wins over body.type", () => { + expect(extractEventType({ type: "envelope", event: { type: "reaction_added" } })).toBe("reaction_added"); + }); + + test("returns 'unknown' for missing or non-string types", () => { + expect(extractEventType({})).toBe("unknown"); + expect(extractEventType({ type: 42 })).toBe("unknown"); + expect(extractEventType({ event: {} 
})).toBe("unknown"); + expect(extractEventType({ event: null })).toBe("unknown"); + expect(extractEventType(null)).toBe("unknown"); + expect(extractEventType(undefined)).toBe("unknown"); + expect(extractEventType("string")).toBe("unknown"); + }); +}); + +// SlackChannel + SlackMetrics integration tests. These verify the wiring +// between the Bolt SocketModeReceiver lifecycle events and the metrics +// emitter, which is the production failure surface (a hooked-but-broken +// emitter ships zero metrics and the fleet view goes silent). +describe("SlackChannel lifecycle hooks emit metrics", () => { + let registry: promClient.Registry; + let metrics: SlackMetrics; + + beforeEach(() => { + registry = new promClient.Registry(); + metrics = new SlackMetrics(registry); + resetSocketListeners(); + middlewares.length = 0; + }); + + afterEach(() => { + registry.clear(); + }); + + test("constructing SlackChannel attaches all six lifecycle listeners", () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + // Bolt's underlying SocketModeClient emits these six lifecycle events + // (verbatim from @slack/socket-mode/dist/src/SocketModeClient.js State enum). + const expected = ["connecting", "authenticated", "connected", "reconnecting", "disconnecting", "disconnected"]; + for (const e of expected) { + expect(socketClientListeners.has(e)).toBe(true); + expect((socketClientListeners.get(e) ?? []).length).toBeGreaterThanOrEqual(1); + } + }); + + test("constructor pre-emits 'disconnected' so /metrics scrapes are complete pre-connect", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. 
+ new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "disconnected")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(0); + }); + + test("firing the 'connected' event flips the gauge", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + fireSocketEvent("connected"); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "disconnected")).toBe(0); + }); + + test("'reconnecting' event increments the reconnect counter and observes connection duration", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + fireSocketEvent("connected"); + // Simulate some time elapsing before the reconnect; the value + // observed must be >= 0 even with a sub-millisecond gap. + await new Promise((r) => setTimeout(r, 5)); + fireSocketEvent("reconnecting"); + const text = await registry.metrics(); + expect(readCounter(text, "phantom_slack_socket_reconnects_total")).toBe(1); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "reconnecting")).toBe(1); + }); + + test("'disconnected' after 'connected' observes connection duration", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. 
+ new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + fireSocketEvent("connected"); + await new Promise((r) => setTimeout(r, 5)); + fireSocketEvent("disconnected"); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "disconnected")).toBe(1); + }); + + test("'disconnected' without a prior 'connected' does NOT observe a duration", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + // Disconnect before ever connecting (e.g. authentication fails before + // the WSS handshake completes). + fireSocketEvent("disconnected"); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(0); + }); + + test("the dispatch middleware times event handler completion", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + // The SlackChannel registers exactly one global middleware on construction. 
+ expect(middlewares.length).toBe(1); + const mw = middlewares[0]; + if (!mw) throw new Error("middleware not registered"); + await mw({ + body: { event: { type: "app_mention" } }, + next: async () => { + await new Promise((r) => setTimeout(r, 3)); + }, + }); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="app_mention"`)).toBe(1); + }); + + test("the dispatch middleware records 'unknown' for unclassified payloads", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + const mw = middlewares[0]; + if (!mw) throw new Error("middleware not registered"); + await mw({ body: {}, next: async () => {} }); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="unknown"`)).toBe(1); + }); + + test("the dispatch middleware still records on handler throw", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + const mw = middlewares[0]; + if (!mw) throw new Error("middleware not registered"); + await expect( + mw({ + body: { event: { type: "message" } }, + next: async () => { + throw new Error("handler exploded"); + }, + }), + ).rejects.toThrow(/handler exploded/); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="message"`)).toBe(1); + }); + + test("omitting the metrics emitter falls back to NoopSlackMetrics without throwing", () => { + // Construction must succeed even when no emitter is wired (e.g. tests + // that exercise routing in isolation). 
+ expect( + () => + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + }), + ).not.toThrow(); + }); +}); diff --git a/src/channels/__tests__/slack.test.ts b/src/channels/__tests__/slack.test.ts index c9399660..41871c04 100644 --- a/src/channels/__tests__/slack.test.ts +++ b/src/channels/__tests__/slack.test.ts @@ -12,8 +12,31 @@ const mockReactionsRemove = mock(() => Promise.resolve({ ok: true })); const mockConversationsOpen = mock(() => Promise.resolve({ channel: { id: "D_REJECT_DM" } })); type EventHandler = (...args: unknown[]) => Promise; +type SocketEventHandler = (...args: unknown[]) => void; const eventHandlers = new Map(); const actionHandlers = new Map(); +type AppMiddleware = (args: { body: unknown; next: () => Promise }) => Promise; +const middlewares: AppMiddleware[] = []; + +// Per-test capture of the SocketModeReceiver lifecycle listeners. The +// SlackChannel constructor attaches these via `receiver.client.on(...)`. +// Tests assert against this map directly to verify metric emission. +const socketClientListeners = new Map(); + +// Resettable per beforeEach so listeners do not leak across cases. +function resetSocketListeners(): void { + socketClientListeners.clear(); +} + +const MockSocketModeReceiver = mock(() => ({ + client: { + on: (event: string, listener: SocketEventHandler) => { + const list = socketClientListeners.get(event) ?? []; + list.push(listener); + socketClientListeners.set(event, list); + }, + }, +})); const MockApp = mock(() => ({ start: mockStart, @@ -25,6 +48,9 @@ const MockApp = mock(() => ({ const key = pattern instanceof RegExp ? 
pattern.source : pattern; actionHandlers.set(key, handler); }, + use: (m: AppMiddleware) => { + middlewares.push(m); + }, client: { auth: { test: mockAuthTest }, chat: { @@ -44,6 +70,7 @@ const MockApp = mock(() => ({ // Replace the import with our mock mock.module("@slack/bolt", () => ({ App: MockApp, + SocketModeReceiver: MockSocketModeReceiver, })); const testConfig: SlackChannelConfig = { @@ -60,6 +87,8 @@ describe("SlackChannel", () => { beforeEach(() => { eventHandlers.clear(); actionHandlers.clear(); + middlewares.length = 0; + resetSocketListeners(); mockStart.mockClear(); mockStop.mockClear(); mockAuthTest.mockClear(); @@ -414,6 +443,8 @@ describe("SlackChannel owner access control", () => { beforeEach(() => { eventHandlers.clear(); actionHandlers.clear(); + middlewares.length = 0; + resetSocketListeners(); mockStart.mockClear(); mockStop.mockClear(); mockAuthTest.mockClear(); diff --git a/src/channels/slack-channel-factory.ts b/src/channels/slack-channel-factory.ts index 7a941ff8..3a18caee 100644 --- a/src/channels/slack-channel-factory.ts +++ b/src/channels/slack-channel-factory.ts @@ -7,11 +7,45 @@ import { DEFAULT_METADATA_BASE_URL, MetadataIdentityFetcher, type SlackIdentity import { MetadataSecretFetcher } from "../config/metadata-fetcher.ts"; import type { ChannelsConfig } from "../config/schemas.ts"; import { SlackHttpChannel } from "./slack-http-receiver.ts"; +import type { SlackMetricsEmitter } from "./slack-metrics.ts"; import type { SlackTransport } from "./slack-transport.ts"; import { SlackChannel } from "./slack.ts"; export type SlackTransportMode = "socket" | "http"; +/** + * Cross-repo invariant: every Slack-related secret name fetched from + * phantomd's metadata gateway MUST appear in phantomd's + * `internal/secrets/types.go` `AllowedSecretNames` map. 
The gateway maps a + * non-allowed name to ErrInvalidName -> HTTP 404 (name-enumeration defense), + * so a drift between this list and the phantomd allowlist breaks tenant + * boot with no actionable error in the in-VM Phantom logs. + * + * The list below is the authoritative phantom-side mirror. Adding a Slack + * secret to this array implies a matching addition in phantomd's + * `AllowedSecretNames`; removing one requires that no callsite in this file + * still fetches it. The mirror is exported (vs. private to this module) so + * the factory test fixture can pin the same constant against its + * `SECRET_RESPONSES` allowlist; future re-orderings won't silently desync. + * + * Phase 8a addition (R7 dated 2026-04-30): `slack_app_token` joins the + * pair, gating the Socket Mode WSS dial for self-installed agent #2+ + * tenants. The corresponding phantomd entry landed in PR #28 + * (TestIsAllowedName_AcceptsSlackAppToken pins the symmetric assertion). + * + * Audit Finding 1 (cross-repo audit, dated 2026-04-25): the + * `slack_gateway_signing_secret` entry pairs with phantom-slack-events' + * outbound HMAC that the SlackHttpChannel verifies on every forwarded + * event. Distinct from the Slack-issued `signing_secret` which lives in + * phantom-slack-events' TOML config and never traverses the metadata + * gateway. + */ +export const AllowedSecretNamesMirror = Object.freeze([ + "slack_bot_token", + "slack_app_token", + "slack_gateway_signing_secret", +] as const); + export type CreateSlackChannelInput = { transport: SlackTransportMode; channelsConfig: ChannelsConfig | null; @@ -21,6 +55,13 @@ export type CreateSlackChannelInput = { // factory constructs the link-local default fetchers. identityFetcher?: { get(): Promise<{ slack?: SlackIdentity }> }; secretsFetcher?: { get(name: string): Promise }; + /** + * Phase 8a metrics emitter. Forwarded to the Socket Mode `SlackChannel` + * so its lifecycle hooks update the four metric series exposed on + * `/metrics`. 
The HTTP receiver does not consume metrics in this PR; a + * follow-up generalizes the surface (Phase 17). + */ + metrics?: SlackMetricsEmitter; }; /** @@ -42,6 +83,7 @@ export async function createSlackChannel(input: CreateSlackChannelInput): Promis appToken: sc.app_token, defaultChannelId: sc.default_channel_id, ownerUserId: sc.owner_user_id, + metrics: input.metrics, }); } @@ -56,14 +98,17 @@ export async function createSlackChannel(input: CreateSlackChannelInput): Promis "Run the OAuth flow via phantom-control or revert to SLACK_TRANSPORT=socket.", ); } - // Cross-repo invariant (audit Finding 1, dated 2026-04-25): the names - // "slack_bot_token" and "slack_gateway_signing_secret" must appear in - // phantomd's internal/secrets/types.go AllowedSecretNames map. Drift on - // either side breaks SLACK_TRANSPORT=http boot with HTTP 404 (the + // Cross-repo invariant: see AllowedSecretNamesMirror at the top of this + // file. The HTTP path consumes "slack_bot_token" + + // "slack_gateway_signing_secret"; the Socket Mode path additionally + // consumes "slack_app_token" via channels.yaml (R7 §6.4 will move it + // onto the gateway in Phase 8b). Drift between this file and + // phantomd's AllowedSecretNames breaks tenant boot with HTTP 404 (the // gateway maps ErrInvalidName to 404 to avoid name enumeration). The // regression is pinned by slack-channel-factory.test.ts's name-aware // mock, which throws on any unexpected name; phantomd pins the same - // contract via TestIsAllowedName_AcceptsSlackGatewaySigningSecret. + // contract via TestIsAllowedName_Accepts{SlackGatewaySigningSecret, + // SlackAppToken}. 
const [botToken, signingSecret] = await Promise.all([ secFetcher.get("slack_bot_token"), secFetcher.get("slack_gateway_signing_secret"), diff --git a/src/channels/slack-metrics.ts b/src/channels/slack-metrics.ts new file mode 100644 index 00000000..79c20396 --- /dev/null +++ b/src/channels/slack-metrics.ts @@ -0,0 +1,199 @@ +// Phase 8a: Slack Socket Mode lifecycle observability. +// +// We expose four Prometheus metrics over `/metrics`: +// +// - `phantom_slack_socket_state` (gauge, label: `state`). One time series per +// state (`connecting`, `authenticated`, `connected`, `reconnecting`, +// `disconnecting`, `disconnected`, plus the synthetic `error`). At any instant exactly one series is +// 1.0; the rest are 0.0. A sustained gap in `connected=1` is the signal +// "this tenant is offline"; alerting watches `1 - max_over_time(...)` over +// a 1-minute window. +// - `phantom_slack_socket_reconnects_total` (counter). Incremented every +// time the underlying SocketModeClient emits `reconnecting`. Bolt's +// auto-reconnect is on by default; this counter measures "the network +// wobbled" rather than "we lost the tenant". Alerting fires when the rate +// exceeds a per-minute threshold tuned to Slack-side hiccups (5/min is +// sustained pain; 1-2/min is normal weather). +// - `phantom_slack_socket_connection_seconds` (histogram). Time from +// `connected` to the next `reconnecting` or `disconnected` event. Useful +// for the long-tail SLO: 99.9% of connections should hold for >1 hour. +// - `phantom_slack_event_dispatch_seconds` (histogram, label: `event_type`). +// End-to-end time from `slack_event` arrival to Bolt's `app.use` +// middleware completion. Captures handler latency including the agent's +// own work (LLM calls, memory writes) when triggered from Slack. +// +// The metrics emitter is intentionally per-channel.
The in-VM Phantom runs +// one Slack channel; for self-hosters who eventually run multiple workspaces +// in one process we still keep per-channel emitters because the metric +// names already disambiguate by container/host (Prometheus job + instance). +// +// Cross-repo: the prom-client default registry is NOT used. We own a +// dedicated registry (`SlackMetrics.registry`) so future shared-channel metrics (Telegram, +// email, webhook) can each own their own registry, and `core/server.ts` +// can merge them at request time. This keeps a buggy second emitter from +// poisoning the global registry with name collisions. + +import promClient, { type Counter, type Gauge, type Histogram, type Registry } from "prom-client"; + +/** + * Lifecycle states emitted by `@slack/socket-mode`'s `SocketModeClient`. + * Authoritative source: the `State` enum in `node_modules/@slack/socket-mode/dist/src/SocketModeClient.js`. + * We translate `unable_to_socket_mode_start` (an unrecoverable Bolt error + * thrown by `start()`) onto an `error` derived state for the gauge so the + * fleet view does not silently flatline when the receiver never connects. + */ +export type SlackSocketState = + | "connecting" + | "authenticated" + | "connected" + | "reconnecting" + | "disconnecting" + | "disconnected" + | "error"; + +const ALL_STATES: SlackSocketState[] = [ + "connecting", + "authenticated", + "connected", + "reconnecting", + "disconnecting", + "disconnected", + "error", +]; + +/** + * Bucket layout for `phantom_slack_socket_connection_seconds`. Spans the + * realistic range from "Slack hiccup, reconnect within seconds" to "the + * connection held for a day". The default prom-client histogram buckets + * (top out at 10s) are useless for a long-running WebSocket; the layout + * here keeps p50/p99 visible at both ends. + */ +const CONNECTION_BUCKETS_SECONDS = [1, 5, 10, 30, 60, 300, 600, 1800, 3600, 21600, 86400]; + +/** + * Bucket layout for `phantom_slack_event_dispatch_seconds`.
Slack's ack + * deadline is 3 seconds, so we want fine resolution under 1s and a tail + * to catch handlers that run away (LLM calls can spike to 30s+). + */ +const EVENT_DISPATCH_BUCKETS_SECONDS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30]; + +/** + * Public interface implemented by the production emitter (and by test + * doubles in `slack-metrics.test.ts`). The split means `SlackChannel` does + * not depend on prom-client directly; tests can substitute a recording + * stub without spinning up a registry per test case. + */ +export interface SlackMetricsEmitter { + /** Mark the gauge: write 1.0 for `state` and 0.0 for the others. */ + recordState(state: SlackSocketState): void; + /** Bump the reconnect counter on every `reconnecting` event. */ + recordReconnect(): void; + /** Observe a completed connection's lifetime in seconds. */ + observeConnectionDuration(seconds: number): void; + /** Observe one event-handler dispatch in seconds. */ + observeEventDispatch(eventType: string, seconds: number): void; +} + +/** + * Concrete prom-client backed emitter. Owns a `Registry` so the caller can + * expose `registry.metrics()` over HTTP without touching prom-client's + * default global registry. Construction is not idempotent per registry: + * prom-client rejects a second metric with an already-registered name, so + * each instance needs its own `Registry`; production uses one from `index.ts` boot. + */ +export class SlackMetrics implements SlackMetricsEmitter { + readonly registry: Registry; + private readonly stateGauge: Gauge<"state">; + private readonly reconnectsCounter: Counter; + private readonly connectionHistogram: Histogram; + private readonly eventDispatchHistogram: Histogram<"event_type">; + + constructor(registry?: Registry) { + // Tests pass a fresh Registry per case so positive/negative assertions + // do not leak counter state between tests. Production passes nothing + // and gets a private registry per emitter instance. + this.registry = registry ??
new promClient.Registry(); + + this.stateGauge = new promClient.Gauge({ + name: "phantom_slack_socket_state", + help: "Slack Socket Mode connection state. Exactly one series is 1.0 at any instant; the rest are 0.0.", + labelNames: ["state"] as const, + registers: [this.registry], + }); + + this.reconnectsCounter = new promClient.Counter({ + name: "phantom_slack_socket_reconnects_total", + help: "Count of Slack Socket Mode reconnection attempts since process start.", + registers: [this.registry], + }); + + this.connectionHistogram = new promClient.Histogram({ + name: "phantom_slack_socket_connection_seconds", + help: "Duration of a single Slack Socket Mode connection from connect to disconnect.", + buckets: CONNECTION_BUCKETS_SECONDS, + registers: [this.registry], + }); + + this.eventDispatchHistogram = new promClient.Histogram({ + name: "phantom_slack_event_dispatch_seconds", + help: "End-to-end Slack event dispatch latency from receive to handler completion.", + labelNames: ["event_type"] as const, + buckets: EVENT_DISPATCH_BUCKETS_SECONDS, + registers: [this.registry], + }); + + // Initialize every state series at 0 so a Prometheus scrape during the + // connecting window still has a complete time series matrix. Without + // this, the series are absent until the first lifecycle event lands and + // `phantom_slack_socket_state{state="connected"} == 0` matches nothing. + for (const s of ALL_STATES) { + this.stateGauge.set({ state: s }, 0); + } + } + + recordState(state: SlackSocketState): void { + for (const s of ALL_STATES) { + this.stateGauge.set({ state: s }, s === state ? 1 : 0); + } + } + + recordReconnect(): void { + this.reconnectsCounter.inc(); + } + + observeConnectionDuration(seconds: number): void { + // Defensive guard: clock skew or a misuse pattern (observe() called + // twice for the same connect window) can yield a negative value. + // prom-client rejects non-finite input and a negative lifetime is + // meaningless; we drop both silently so the caller does not have to care.
+ if (seconds < 0 || !Number.isFinite(seconds)) return; + this.connectionHistogram.observe(seconds); + } + + observeEventDispatch(eventType: string, seconds: number): void { + if (seconds < 0 || !Number.isFinite(seconds)) return; + this.eventDispatchHistogram.observe({ event_type: eventType }, seconds); + } +} + +/** + * No-op emitter used when metrics are disabled (e.g. tests that only care + * about routing) or when the channel is constructed outside the boot path + * (e.g. unit tests of egress helpers). The lifecycle hooks still call + * `record*` so the production emitter is wired the same way; the no-op + * avoids guarding every callsite with `?.`. + */ +export class NoopSlackMetrics implements SlackMetricsEmitter { + recordState(_state: SlackSocketState): void { + /* no-op */ + } + recordReconnect(): void { + /* no-op */ + } + observeConnectionDuration(_seconds: number): void { + /* no-op */ + } + observeEventDispatch(_eventType: string, _seconds: number): void { + /* no-op */ + } +} diff --git a/src/channels/slack.ts b/src/channels/slack.ts index ebbaa7d3..457b5387 100644 --- a/src/channels/slack.ts +++ b/src/channels/slack.ts @@ -1,4 +1,4 @@ -import { App, type LogLevel } from "@slack/bolt"; +import { App, type LogLevel, SocketModeReceiver } from "@slack/bolt"; import type { SlackBlock } from "./feedback.ts"; import { registerSlackActions } from "./slack-actions.ts"; import { @@ -12,6 +12,7 @@ import { egressUpdateMessage, egressUpdateWithFeedback, } from "./slack-egress.ts"; +import { NoopSlackMetrics, type SlackMetricsEmitter } from "./slack-metrics.ts"; import type { Channel, ChannelCapabilities, InboundMessage, OutboundMessage, SentMessage } from "./types.ts"; export type SlackChannelConfig = { @@ -20,10 +21,28 @@ export type SlackChannelConfig = { defaultChannelId?: string; ownerUserId?: string; transport?: "socket"; + /** + * Optional Phase 8a metrics emitter. 
When omitted, a `NoopSlackMetrics` + * is used so the lifecycle hooks fire the same way in test paths that + * don't care about Prometheus output. Production wires the shared + * `SlackMetrics` instance from `index.ts`. + */ + metrics?: SlackMetricsEmitter; }; type ConnectionState = "disconnected" | "connecting" | "connected" | "error"; +/** + * Minimal shape of the underlying `@slack/socket-mode` `SocketModeClient` + * we hook for lifecycle metrics. Bolt's `SocketModeReceiver.client` is + * publicly typed (`receivers/SocketModeReceiver.d.ts:55`), but we depend + * only on the EventEmitter interface here so test mocks do not need to + * carry the full SDK surface. + */ +type SocketModeClientLike = { + on(event: string, listener: (...args: unknown[]) => void): unknown; +}; + type ReactionHandler = (event: { reaction: string; userId: string; @@ -45,6 +64,7 @@ export class SlackChannel implements Channel { }; private app: App; + private receiver: SocketModeReceiver; private messageHandler: ((message: InboundMessage) => Promise) | null = null; private reactionHandler: ReactionHandler | null = null; private connectionState: ConnectionState = "disconnected"; @@ -52,19 +72,109 @@ export class SlackChannel implements Channel { private ownerUserId: string | null; private phantomName: string; private rejectedUsers = new Set(); + private readonly metrics: SlackMetricsEmitter; + /** + * Wall-clock millis when the most recent `connected` event fired. Used to + * observe `phantom_slack_socket_connection_seconds` on the next + * `disconnected` event. Null when no connection is currently up. + */ + private connectedAtMs: number | null = null; constructor(config: SlackChannelConfig) { if (config.transport && config.transport !== "socket") { throw new Error("SlackChannel only supports Socket Mode. Use SlackHttpChannel for HTTP receiver mode."); } + // Construct the receiver explicitly (vs.
`socketMode: true` shorthand) + // so we can hook the underlying `SocketModeClient` lifecycle events for + // the Phase 8a metrics surface. The receiver type publicly exposes + // `client: SocketModeClient` (Bolt 4.6 SocketModeReceiver.d.ts:55), so + // the cast below is type-safe at the package boundary; we narrow to a + // minimal `SocketModeClientLike` to keep test mocks cheap. + this.receiver = new SocketModeReceiver({ + appToken: config.appToken, + logLevel: "ERROR" as LogLevel, + }); this.app = new App({ token: config.botToken, - socketMode: true, - appToken: config.appToken, + receiver: this.receiver, logLevel: "ERROR" as LogLevel, }); + this.metrics = config.metrics ?? new NoopSlackMetrics(); this.ownerUserId = config.ownerUserId ?? null; this.phantomName = "Phantom"; + + // Initialize the gauge to `disconnected` so a Prometheus scrape that + // lands before `connect()` returns sees a complete time series matrix. + this.metrics.recordState("disconnected"); + + this.attachLifecycleHooks(); + this.attachDispatchTimer(); + } + + /** + * Subscribe the metrics emitter to every lifecycle event the underlying + * `SocketModeClient` emits. The state enum is verbatim from + * `@slack/socket-mode/dist/src/SocketModeClient.js` (the State enum). + * `unable_to_socket_mode_start` is NOT an emitted event in the current + * SDK; that path surfaces as a thrown `UnrecoverableSocketModeStartError` + * from `start()` which we map to the `error` state in `connect()`. 
+ */ + private attachLifecycleHooks(): void { + const client = this.receiver.client as unknown as SocketModeClientLike; + + client.on("connecting", () => { + this.metrics.recordState("connecting"); + }); + client.on("authenticated", () => { + this.metrics.recordState("authenticated"); + }); + client.on("connected", () => { + this.connectedAtMs = Date.now(); + this.metrics.recordState("connected"); + }); + client.on("reconnecting", () => { + this.metrics.recordReconnect(); + this.metrics.recordState("reconnecting"); + // Reconnect implies the prior connection just dropped; observe its + // lifetime if we have one in flight. + this.observeConnectionLifetime(); + }); + client.on("disconnecting", () => { + this.metrics.recordState("disconnecting"); + }); + client.on("disconnected", () => { + this.metrics.recordState("disconnected"); + this.observeConnectionLifetime(); + }); + } + + /** + * Wraps Bolt's global middleware to time end-to-end event dispatch. The + * `next()` await returns when the matching listener (or the absence of + * one) has finished, including any async work the user's handler does. + * Captures the high-level event type from `body.event.type` when + * available; falls back to the envelope `body.type` for slash commands + * and interactive payloads, then to `unknown` for shapes Bolt did not + * route to a discriminated handler. 
+ */ + private attachDispatchTimer(): void { + this.app.use(async ({ body, next }) => { + const startMs = Date.now(); + const eventType = extractEventType(body); + try { + await next(); + } finally { + const seconds = (Date.now() - startMs) / 1000; + this.metrics.observeEventDispatch(eventType, seconds); + } + }); + } + + private observeConnectionLifetime(): void { + if (this.connectedAtMs == null) return; + const seconds = (Date.now() - this.connectedAtMs) / 1000; + this.metrics.observeConnectionDuration(seconds); + this.connectedAtMs = null; } setPhantomName(name: string): void { @@ -126,6 +236,12 @@ export class SlackChannel implements Channel { console.log("[slack] Socket Mode connected"); } catch (err: unknown) { this.connectionState = "error"; + // `app.start()` throws `UnrecoverableSocketModeStartError` for the + // auth-revoked / invalid-app-token / connections:write-missing + // failure cases. Mark the gauge so the fleet view reflects the + // down tenant even though no `disconnected` event ever fired + // (the SocketModeClient never made it past handshake). + this.metrics.recordState("error"); const msg = err instanceof Error ? err.message : String(err); console.error(`[slack] Failed to connect: ${msg}`); throw err; @@ -327,3 +443,28 @@ export class SlackChannel implements Channel { function buildConversationId(channel: string, threadTs: string): string { return `slack:${channel}:${threadTs}`; } + +/** + * Pull a stable, low-cardinality string out of Bolt's middleware `body` so + * the `event_type` label on `phantom_slack_event_dispatch_seconds` does + * not blow up cardinality. Order: + * 1. `body.event.type` (the event-API envelope, e.g. `app_mention`, + * `message`, `reaction_added`). + * 2. `body.type` (slash-commands carry `command`, interactivity carries + * `block_actions` / `view_submission`). + * 3. `unknown` fallback so a payload Bolt has not classified still gets + * a series. 
+ */ +export function extractEventType(body: unknown): string { + if (!body || typeof body !== "object") return "unknown"; + const b = body as Record<string, unknown>; + const event = b.event; + if (event && typeof event === "object") { + const ev = event as Record<string, unknown>; + const t = ev.type; + if (typeof t === "string" && t.length > 0) return t; + } + const top = b.type; + if (typeof top === "string" && top.length > 0) return top; + return "unknown"; +} diff --git a/src/core/server.ts b/src/core/server.ts index 7e363fc3..98fe2e1b 100644 --- a/src/core/server.ts +++ b/src/core/server.ts @@ -26,6 +26,19 @@ type OnboardingStatusProvider = () => string; type WebhookHandler = (req: Request) => Promise; type PeerHealthProvider = () => Record; type SchedulerHealthProvider = () => SchedulerHealthSummary | null; +/** + * Phase 8a: provider-shape for the Prometheus registry that backs + * `/metrics`. The provider returns an object with the prom-client `Registry` + * surface we depend on (`metrics()` for the text-format dump, + * `contentType` for the response header). Keeping the surface minimal + * means future emitters (Telegram, email) can plug in without taking a + * direct dependency on prom-client at this layer.
+ */ +type MetricsRegistryLike = { + metrics(): Promise<string>; + contentType: string; +}; +type MetricsRegistryProvider = () => MetricsRegistryLike | null; type TriggerDeps = { runtime: AgentRuntime; slackChannel?: SlackTransport; @@ -41,6 +54,7 @@ let onboardingStatusProvider: OnboardingStatusProvider | null = null; let webhookHandler: WebhookHandler | null = null; let peerHealthProvider: PeerHealthProvider | null = null; let schedulerHealthProvider: SchedulerHealthProvider | null = null; +let metricsRegistryProvider: MetricsRegistryProvider | null = null; let triggerDeps: TriggerDeps | null = null; let chatHandler: ChatHandler | null = null; @@ -80,6 +94,16 @@ export function setSchedulerHealthProvider(provider: SchedulerHealthProvider): v schedulerHealthProvider = provider; } +/** + * Phase 8a: register the Prometheus registry that backs `/metrics`. Wired + * from `index.ts` boot with the `SlackMetrics.registry`. A provider that + * returns null makes the route answer 503 instead of serving metrics (e.g. + * tests that do not exercise the metrics path). + */ +export function setMetricsRegistryProvider(provider: MetricsRegistryProvider): void { + metricsRegistryProvider = provider; +} + export function setTriggerDeps(deps: TriggerDeps): void { triggerDeps = deps; } @@ -155,6 +179,28 @@ export function startServer(config: PhantomConfig, startedAt: number): ReturnTyp return Response.json(payload); } + // Phase 8a: Prometheus metrics surface. Unauthenticated by design, + // matching the existing `/health` precedent: this server's tenant + // isolation comes from the per-tenant URL behind Caddy, not from + // per-route auth. Returns 503 when no registry is wired (the + // process started without a metrics provider).
+ if (url.pathname === "/metrics" && req.method === "GET") { + const registry = metricsRegistryProvider?.(); + if (!registry) { + return new Response("metrics registry not configured", { + status: 503, + headers: { "Content-Type": "text/plain; charset=utf-8" }, + }); + } + const body = await registry.metrics(); + return new Response(body, { + headers: { + "Content-Type": registry.contentType, + "Cache-Control": "no-store", + }, + }); + } + if (url.pathname === "/mcp") { const mcpServer = mcpServerProvider?.(); if (!mcpServer) { diff --git a/src/index.ts b/src/index.ts index bfd54b16..39f25474 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,7 @@ import { setActionFollowUpHandler } from "./channels/slack-actions.ts"; import { createSlackChannel, readSlackTransportFromEnv } from "./channels/slack-channel-factory.ts"; import { SlackHttpChannel } from "./channels/slack-http-receiver.ts"; import { setSlackHttpChannelProvider } from "./channels/slack-http-routes.ts"; +import { SlackMetrics } from "./channels/slack-metrics.ts"; import type { SlackTransport } from "./channels/slack-transport.ts"; import { createStatusReactionController } from "./channels/status-reactions.ts"; import { TelegramChannel } from "./channels/telegram.ts"; @@ -28,6 +29,7 @@ import { setEvolutionVersionProvider, setMcpServerProvider, setMemoryHealthProvider, + setMetricsRegistryProvider, setOnboardingStatusProvider, setPeerHealthProvider, setRoleInfoProvider, @@ -314,10 +316,19 @@ async function main(): Promise { // factory throws so a mis-provisioned tenant fails loudly. const channelsConfig = loadChannelsConfig(); const slackTransport = readSlackTransportFromEnv(); + // Phase 8a: a process-global SlackMetrics owns its prom-client Registry. + // `core/server.ts` exposes `slackMetrics.registry` at /metrics. 
We + // construct the emitter unconditionally (even when no Slack channel is + // configured) so the /metrics endpoint always emits the zero-state + // matrix; this keeps Prometheus scrape parity across multi-tenant pools + // and prevents alert flapping when Phantoms boot/reboot. + const slackMetrics = new SlackMetrics(); + setMetricsRegistryProvider(() => slackMetrics.registry); const slackChannel: SlackTransport | null = await createSlackChannel({ transport: slackTransport, channelsConfig, metadataBaseUrl: process.env.METADATA_BASE_URL, + metrics: slackMetrics, }); if (slackChannel) {