diff --git a/CLAUDE.md b/CLAUDE.md index afcea5a1..46d11235 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -84,7 +84,9 @@ src/ email-login.ts # Magic link email delivery notifications/ # Web Push (VAPID keys, subscriptions, triggers) channels/ - slack.ts # Slack Socket Mode (primary channel, owner access control) + slack.ts # Slack Socket Mode (primary channel, owner access control, lifecycle metrics) + slack-metrics.ts # Phase 8a: prom-client metrics for Socket Mode lifecycle + dispatch + slack-channel-factory.ts # Routes between Socket Mode + HTTP receiver; AllowedSecretNamesMirror lives here telegram.ts # Telegram via Telegraf email.ts # IMAP/SMTP via ImapFlow + Nodemailer webhook.ts # HTTP webhooks with HMAC-SHA256 @@ -132,7 +134,7 @@ src/ tools.ts # phantom_create_page, phantom_generate_login login-page.ts # Login page HTML core/ - server.ts # Bun.serve() HTTP server, /health, /trigger, /webhook, /ui + server.ts # Bun.serve() HTTP server, /health, /metrics, /trigger, /webhook, /ui db/ schema.ts # SQLite migrations (7 total) connection.ts # Database connection @@ -154,6 +156,21 @@ After each session: EvolutionEngine runs 6-step reflection pipeline -> 5-gate va MCP flow: External client -> /mcp endpoint -> bearer auth -> MCP Server -> tool execution (some route through AgentRuntime for full Opus brain). +## Observability + +The Bun.serve() HTTP server exposes a Prometheus `/metrics` endpoint (Phase 8a, R7 dated 2026-04-30). Unauthenticated, matching the existing `/health` precedent: per-tenant isolation comes from the per-tenant URL behind Caddy, not per-route auth. + +Metric families exposed today (Slack Socket Mode lifecycle): + +- `phantom_slack_socket_state{state="connecting|authenticated|connected|reconnecting|disconnecting|disconnected|error"}` (gauge). Exactly one series is 1.0 at any instant; the rest are 0.0. Alerting watches `1 - max_over_time(phantom_slack_socket_state{state="connected"}[1m]) > 0` for "the tenant is offline". +- `phantom_slack_socket_reconnects_total` (counter). Bolt's auto-reconnect is on by default; this measures "the network wobbled". Alert at sustained >5/min. +- `phantom_slack_socket_connection_seconds` (histogram). Lifetime of a single Socket Mode connection from connect to disconnect. p99 should hold above 1 hour. +- `phantom_slack_event_dispatch_seconds{event_type=...}` (histogram). End-to-end Bolt middleware time. Slack's ack deadline is 3 seconds; alert on p99 > 2.5s. + +The metrics module owns a private `prom-client` Registry (no global registry pollution). Adding more channels (Telegram, email) means adding a sibling registry and merging at request time in `core/server.ts`. Future cross-channel metrics generalize via Phase 17 polish. + +Cross-repo invariant: `slack-channel-factory.ts` exports a frozen `AllowedSecretNamesMirror` array (`slack_bot_token`, `slack_app_token`, `slack_gateway_signing_secret`). The same names MUST appear in phantomd's `internal/secrets/types.go` `AllowedSecretNames` map. Drift breaks tenant boot with HTTP 404 (the gateway maps `ErrInvalidName` to 404 to defeat name enumeration). The factory test pins the mirror against its `SECRET_RESPONSES` test fixture; phantomd's `TestIsAllowedName_AcceptsSlackAppToken` (and the existing `*_AcceptsSlackGatewaySigningSecret`) pin the symmetric assertion. + ## Key Design Decisions **Qdrant over LanceDB:** WAL durability with crash recovery. Native hybrid search (dense + BM25 sparse vectors). Named vectors for separate embedding spaces. Mmap mode for low memory. TypeScript REST client works with Bun (no NAPI addon risk). diff --git a/bun.lock b/bun.lock index 701899b1..a1bbce62 100644 --- a/bun.lock +++ b/bun.lock @@ -14,6 +14,7 @@ "imapflow": "^1.2.18", "nodemailer": "^8.0.4", "playwright": "1.59.1", + "prom-client": "^15.1.3", "resend": "^6.9.4", "telegraf": "^4.16.3", "yaml": "^2.6.0", @@ -97,6 +98,8 @@ "@napi-rs/wasm-runtime": ["@napi-rs/wasm-runtime@1.1.3", "", { "dependencies": { "@tybys/wasm-util": "^0.10.1" }, "peerDependencies": { "@emnapi/core": "^1.7.1", "@emnapi/runtime": "^1.7.1" } }, "sha512-xK9sGVbJWYb08+mTJt3/YV24WxvxpXcXtP6B172paPZ+Ts69Re9dAr7lKwJoeIx8OoeuimEiRZ7umkiUVClmmQ=="], + "@opentelemetry/api": ["@opentelemetry/api@1.9.1", "", {}, "sha512-gLyJlPHPZYdAk1JENA9LeHejZe1Ti77/pTeFm/nMXmQH/HFZlcS/O2XJB+L8fkbrNSqhdtlvjBVjxwUYanNH5Q=="], + "@oxc-project/types": ["@oxc-project/types@0.124.0", "", {}, "sha512-VBFWMTBvHxS11Z5Lvlr3IWgrwhMTXV+Md+EQF0Xf60+wAdsGFTBx7X7K/hP4pi8N7dcm1RvcHwDxZ16Qx8keUg=="], "@pinojs/redact": ["@pinojs/redact@0.4.0", "", {}, "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg=="], @@ -205,6 +208,8 @@ "base64-arraybuffer": ["base64-arraybuffer@1.0.2", "", {}, "sha512-I3yl4r9QB5ZRY3XuJVEPfc2XhZO6YweFPI+UovAzn+8/hb3oJ6lnysaFcjVpkCPfVWFUDvoZ8kmVDP7WyRtYtQ=="], + "bintrees": ["bintrees@1.0.2", "", {}, "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw=="], + "body-parser": ["body-parser@2.2.2", "", { "dependencies": { "bytes": "^3.1.2", "content-type": "^1.0.5", "debug": "^4.4.3", "http-errors": "^2.0.0", "iconv-lite": "^0.7.0", "on-finished": "^2.4.1", "qs": "^6.14.1", "raw-body": "^3.0.1", "type-is": "^2.0.1" } }, "sha512-oP5VkATKlNwcgvxi0vM0p/D3n2C3EReYVX+DNYs5TjZFn/oQt2j+4sVJtSMr18pdRr8wjTcBl6LoV+FUwzPmNA=="], "buffer-alloc": ["buffer-alloc@1.2.0", "", { "dependencies": { "buffer-alloc-unsafe": "^1.1.0", "buffer-fill": "^1.0.0" } }, "sha512-CFsHQgjtW1UChdXgbyJGtnm+O/uLQeZdtbDo8mfUgYXCHSM1wgrVxXm6bSyrUuErEb+4sYVGCzASBRot7zyrow=="], @@ -461,6 +466,8 @@ "process-warning": ["process-warning@5.0.0", "", {}, "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA=="], + "prom-client": ["prom-client@15.1.3", "", { "dependencies": { "@opentelemetry/api": "^1.4.0", "tdigest": "^0.1.1" } }, "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g=="], + "proxy-addr": ["proxy-addr@2.0.7", "", { "dependencies": { "forwarded": "0.2.0", "ipaddr.js": "1.9.1" } }, "sha512-llQsMLSUDUPT44jdrU/O37qlnifitDP+ZwrmmZcoSKyLKvtZxpyV0n2/bD/N4tBAAZ/gJEdZU7KMraoK1+XYAg=="], "proxy-from-env": ["proxy-from-env@1.1.0", "", {}, "sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg=="], @@ -533,6 +540,8 @@ "svix": ["svix@1.86.0", "", { "dependencies": { "standardwebhooks": "1.0.0", "uuid": "^10.0.0" } }, "sha512-/HTvXwjLJe1l/MsLXAO1ddCYxElJk4eNR4DzOjDOEmGrPN/3BtBE8perGwMAaJ2sT5T172VkBYzmHcjUfM1JRQ=="], + "tdigest": ["tdigest@0.1.2", "", { "dependencies": { "bintrees": "1.0.2" } }, "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA=="], + "telegraf": ["telegraf@4.16.3", "", { "dependencies": { "@telegraf/types": "^7.1.0", "abort-controller": "^3.0.0", "debug": "^4.3.4", "mri": "^1.2.0", "node-fetch": "^2.7.0", "p-timeout": "^4.1.0", "safe-compare": "^1.1.4", "sandwich-stream": "^2.0.2" }, "bin": { "telegraf": "lib/cli.mjs" } }, "sha512-yjEu2NwkHlXu0OARWoNhJlIjX09dRktiMQFsM678BAH/PEPVwctzL67+tvXqLCRQQvm3SDtki2saGO9hLlz68w=="], "thread-stream": ["thread-stream@4.0.0", "", { "dependencies": { "real-require": "^0.2.0" } }, "sha512-4iMVL6HAINXWf1ZKZjIPcz5wYaOdPhtO8ATvZ+Xqp3BTdaqtAwQkNmKORqcIo5YkQqGXq5cwfswDwMqqQNrpJA=="], diff --git a/package.json b/package.json index b1d70192..3a2ef267 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "imapflow": "^1.2.18", "nodemailer": "^8.0.4", "playwright": "1.59.1", + "prom-client": "^15.1.3", "resend": "^6.9.4", "telegraf": "^4.16.3", "yaml": "^2.6.0", diff --git a/src/channels/__tests__/slack-channel-factory.test.ts b/src/channels/__tests__/slack-channel-factory.test.ts index 91a9e4df..f0503ed1 100644 --- a/src/channels/__tests__/slack-channel-factory.test.ts +++ b/src/channels/__tests__/slack-channel-factory.test.ts @@ -7,6 +7,7 @@ import type { ChannelsConfig } from "../../config/schemas.ts"; const mockApp = mock(() => ({ event: () => {}, action: () => {}, + use: () => {}, client: { auth: { test: () => Promise.resolve({ user_id: "U_BOT" }) }, chat: { postMessage: () => Promise.resolve({ ts: "1.0" }), update: () => Promise.resolve({ ok: true }) }, @@ -21,12 +22,23 @@ const mockReceiver = { stop: () => Promise.resolve(), }; +// Phase 8a: Socket Mode receiver mock. The constructor returns an object +// whose `client.on(...)` is a no-op for routing tests; lifecycle metric +// behaviour is exercised in slack-metrics.test.ts and the SlackChannel +// suite, not here. +const mockSocketModeReceiver = mock(() => ({ + client: { on: () => {} }, +})); + mock.module("@slack/bolt", () => ({ App: mockApp, ExpressReceiver: mock(() => mockReceiver), + SocketModeReceiver: mockSocketModeReceiver, })); -const { createSlackChannel, readSlackTransportFromEnv } = await import("../slack-channel-factory.ts"); +const { createSlackChannel, readSlackTransportFromEnv, AllowedSecretNamesMirror } = await import( + "../slack-channel-factory.ts" +); const { SlackChannel } = await import("../slack.ts"); const { SlackHttpChannel } = await import("../slack-http-receiver.ts"); @@ -48,16 +60,23 @@ const HTTP_IDENTITY = { }, }; -// Cross-repo invariant (audit Finding 1, dated 2026-04-25): the names -// "slack_bot_token" and "slack_gateway_signing_secret" must appear in -// phantomd's internal/secrets/types.go AllowedSecretNames map. Any drift -// between phantom and phantomd breaks SLACK_TRANSPORT=http boot with HTTP -// 404. This map is the SINGLE source of truth for the http-mode tests +// Cross-repo invariant: the names below must appear in phantomd's +// internal/secrets/types.go AllowedSecretNames map. Any drift between +// phantom and phantomd breaks tenant boot with HTTP 404 (the gateway maps +// ErrInvalidName to 404 to avoid name enumeration). +// +// Audit Finding 1 (2026-04-25) added slack_bot_token + slack_gateway_signing_secret. +// Phase 8a (R7 dated 2026-04-30) added slack_app_token for Socket Mode +// self-installed agent #2+ tenants. The phantomd side ships in PR #28 +// (TestIsAllowedName_AcceptsSlackAppToken pins the symmetric assertion). +// +// This fixture is the SINGLE source of truth for the http-mode tests // below; makeSecretFetcher() throws fail-loud on any name not listed // here, so a future production-side rename that misses one repo will // fail this test suite immediately instead of silently shipping a 404. const SECRET_RESPONSES: Record = { slack_bot_token: "xoxb-from-metadata", + slack_app_token: "xapp-1-from-metadata", slack_gateway_signing_secret: "0123456789abcdef".repeat(4), }; @@ -245,3 +264,37 @@ describe("createSlackChannel", () => { await expect(fetcher.get("totally_made_up")).rejects.toThrow(/AllowedSecretNames/); }); }); + +// Phase 8a (R7 2026-04-30): pin the cross-repo invariant for the new +// slack_app_token entry. AllowedSecretNamesMirror is the phantom-side +// authoritative list; phantomd's TestIsAllowedName_AcceptsSlackAppToken is +// the matching assertion in the symmetric position. If a future contributor +// removes the entry on either side without the matching edit, both test +// suites fail-loud. +describe("AllowedSecretNamesMirror", () => { + test("includes slack_bot_token", () => { + expect(AllowedSecretNamesMirror).toContain("slack_bot_token"); + }); + + test("includes slack_app_token (Phase 8a Socket Mode)", () => { + expect(AllowedSecretNamesMirror).toContain("slack_app_token"); + }); + + test("includes slack_gateway_signing_secret (audit F1, HTTP receiver)", () => { + expect(AllowedSecretNamesMirror).toContain("slack_gateway_signing_secret"); + }); + + test("matches the SECRET_RESPONSES test fixture set", () => { + // SECRET_RESPONSES is the test fixture for makeSecretFetcher; its + // keys must equal AllowedSecretNamesMirror. A drift here means the + // production code can fetch a name the test fixture rejects (or + // vice versa), and the audit-F1 fail-loud guard breaks down. + const fixtureKeys = Object.keys(SECRET_RESPONSES).sort(); + const mirror = [...AllowedSecretNamesMirror].sort(); + expect(fixtureKeys).toEqual(mirror); + }); + + test("entries are frozen (Object.freeze) so a runtime mutation is loud", () => { + expect(Object.isFrozen(AllowedSecretNamesMirror)).toBe(true); + }); +}); diff --git a/src/channels/__tests__/slack-http-receiver.test.ts b/src/channels/__tests__/slack-http-receiver.test.ts index bc7672b3..108e6a90 100644 --- a/src/channels/__tests__/slack-http-receiver.test.ts +++ b/src/channels/__tests__/slack-http-receiver.test.ts @@ -66,8 +66,19 @@ const MockApp = mock((opts: { receiver?: { init?: (app: unknown) => void } }) => return app; }); +// Phase 8a: SlackChannel (Socket Mode) now imports SocketModeReceiver from +// @slack/bolt. Even though this test exercises only the HTTP receiver, the +// module-mock layer is process-scoped under bun: a partial mock here +// shadows the real export for any other test file that loads slack.ts +// later. We mock SocketModeReceiver as a no-op constructor so the cross- +// suite loader stays consistent. +const mockSocketModeReceiver = mock(() => ({ + client: { on: () => {} }, +})); + mock.module("@slack/bolt", () => ({ App: MockApp, + SocketModeReceiver: mockSocketModeReceiver, })); // Import the channel AFTER the module mock so the constructor uses our doubles. diff --git a/src/channels/__tests__/slack-metrics.test.ts b/src/channels/__tests__/slack-metrics.test.ts new file mode 100644 index 00000000..fe9de061 --- /dev/null +++ b/src/channels/__tests__/slack-metrics.test.ts @@ -0,0 +1,412 @@ +import { afterEach, beforeEach, describe, expect, mock, test } from "bun:test"; +import promClient from "prom-client"; +import { NoopSlackMetrics, SlackMetrics, type SlackSocketState } from "../slack-metrics.ts"; + +// Mock the Slack Bolt SDK so importing SlackChannel below does not pull +// the real WebSocket client at module load. We re-use the same pattern +// the other Slack channel tests use; SocketModeReceiver returns a stub +// whose `client.on(...)` records listeners into a per-test map so we can +// fire lifecycle events synthetically. +type SocketEventHandler = (...args: unknown[]) => void; +const socketClientListeners = new Map(); +function fireSocketEvent(event: string, ...args: unknown[]): void { + for (const fn of socketClientListeners.get(event) ?? []) { + fn(...args); + } +} +function resetSocketListeners(): void { + socketClientListeners.clear(); +} + +const MockSocketModeReceiver = mock(() => ({ + client: { + on: (event: string, listener: SocketEventHandler) => { + const list = socketClientListeners.get(event) ?? []; + list.push(listener); + socketClientListeners.set(event, list); + }, + }, +})); + +type AppMiddleware = (args: { body: unknown; next: () => Promise }) => Promise; +const middlewares: AppMiddleware[] = []; + +const MockApp = mock(() => ({ + start: () => Promise.resolve(), + stop: () => Promise.resolve(), + event: () => {}, + action: () => {}, + use: (m: AppMiddleware) => { + middlewares.push(m); + }, + client: { + auth: { test: () => Promise.resolve({ user_id: "U_BOT" }) }, + chat: { + postMessage: () => Promise.resolve({ ts: "1.0" }), + update: () => Promise.resolve({ ok: true }), + }, + conversations: { open: () => Promise.resolve({ channel: { id: "D1" } }) }, + reactions: { + add: () => Promise.resolve({ ok: true }), + remove: () => Promise.resolve({ ok: true }), + }, + }, +})); + +mock.module("@slack/bolt", () => ({ + App: MockApp, + SocketModeReceiver: MockSocketModeReceiver, +})); + +const { SlackChannel } = await import("../slack.ts"); +const { extractEventType } = await import("../slack.ts"); + +const ALL_STATES: SlackSocketState[] = [ + "connecting", + "authenticated", + "connected", + "reconnecting", + "disconnecting", + "disconnected", + "error", +]; + +/** + * Helper: parse the prom-client text-format dump and return the value of + * the labeled gauge series with the given state. The text format is + * `{label="value"} ` per line; histograms expand to + * `_bucket`, `_sum`, `_count`, which we ignore here. + */ +function readGauge(text: string, metric: string, state: string): number | null { + const re = new RegExp(`^${metric}\\{state="${state}"\\}\\s+(\\S+)`, "m"); + const m = re.exec(text); + if (!m) return null; + return Number.parseFloat(m[1] ?? ""); +} + +function readCounter(text: string, metric: string): number | null { + const re = new RegExp(`^${metric}(?:\\{[^}]*\\})?\\s+(\\S+)`, "m"); + const m = re.exec(text); + if (!m) return null; + return Number.parseFloat(m[1] ?? ""); +} + +function readHistogramCount(text: string, metric: string, labelMatch?: string): number | null { + const labelClause = labelMatch ? `\\{${labelMatch}\\}` : "(?:\\{[^}]*\\})?"; + const re = new RegExp(`^${metric}_count${labelClause}\\s+(\\S+)`, "m"); + const m = re.exec(text); + if (!m) return null; + return Number.parseFloat(m[1] ?? ""); +} + +describe("SlackMetrics", () => { + let registry: promClient.Registry; + let metrics: SlackMetrics; + + beforeEach(() => { + registry = new promClient.Registry(); + metrics = new SlackMetrics(registry); + resetSocketListeners(); + middlewares.length = 0; + }); + + afterEach(() => { + registry.clear(); + }); + + test("registers all four metric families on construction", async () => { + const text = await registry.metrics(); + expect(text).toContain("phantom_slack_socket_state"); + expect(text).toContain("phantom_slack_socket_reconnects_total"); + expect(text).toContain("phantom_slack_socket_connection_seconds"); + expect(text).toContain("phantom_slack_event_dispatch_seconds"); + }); + + test("initializes every state series at 0 before any lifecycle event", async () => { + const text = await registry.metrics(); + for (const s of ALL_STATES) { + expect(readGauge(text, "phantom_slack_socket_state", s)).toBe(0); + } + }); + + test("recordState(connected) sets connected=1 and others=0", async () => { + metrics.recordState("connected"); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(1); + for (const s of ALL_STATES.filter((x) => x !== "connected")) { + expect(readGauge(text, "phantom_slack_socket_state", s)).toBe(0); + } + }); + + test("recordState transitions overwrite the previous state", async () => { + metrics.recordState("connecting"); + metrics.recordState("authenticated"); + metrics.recordState("connected"); + metrics.recordState("disconnected"); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "disconnected")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(0); + expect(readGauge(text, "phantom_slack_socket_state", "authenticated")).toBe(0); + expect(readGauge(text, "phantom_slack_socket_state", "connecting")).toBe(0); + }); + + test("recordState handles the error pseudo-state", async () => { + metrics.recordState("error"); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "error")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(0); + }); + + test("recordReconnect increments the counter", async () => { + metrics.recordReconnect(); + metrics.recordReconnect(); + metrics.recordReconnect(); + const text = await registry.metrics(); + expect(readCounter(text, "phantom_slack_socket_reconnects_total")).toBe(3); + }); + + test("observeConnectionDuration records a histogram observation", async () => { + metrics.observeConnectionDuration(42); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(1); + }); + + test("observeConnectionDuration silently drops negative input", async () => { + metrics.observeConnectionDuration(-5); + metrics.observeConnectionDuration(Number.NaN); + metrics.observeConnectionDuration(Number.POSITIVE_INFINITY); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(0); + }); + + test("observeEventDispatch labels by event type", async () => { + metrics.observeEventDispatch("app_mention", 0.123); + metrics.observeEventDispatch("app_mention", 0.456); + metrics.observeEventDispatch("message", 0.7); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="app_mention"`)).toBe(2); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="message"`)).toBe(1); + }); +}); + +describe("NoopSlackMetrics", () => { + test("every method is a no-op (does not throw on any input)", () => { + const noop = new NoopSlackMetrics(); + expect(() => noop.recordState("connected")).not.toThrow(); + expect(() => noop.recordReconnect()).not.toThrow(); + expect(() => noop.observeConnectionDuration(123)).not.toThrow(); + expect(() => noop.observeConnectionDuration(-5)).not.toThrow(); + expect(() => noop.observeEventDispatch("event", 0.5)).not.toThrow(); + }); +}); + +describe("extractEventType", () => { + test("returns body.event.type when present", () => { + expect(extractEventType({ event: { type: "app_mention" } })).toBe("app_mention"); + expect(extractEventType({ event: { type: "message" } })).toBe("message"); + }); + + test("falls back to body.type for envelopes without an inner event", () => { + expect(extractEventType({ type: "block_actions" })).toBe("block_actions"); + expect(extractEventType({ type: "view_submission" })).toBe("view_submission"); + }); + + test("body.event.type wins over body.type", () => { + expect(extractEventType({ type: "envelope", event: { type: "reaction_added" } })).toBe("reaction_added"); + }); + + test("returns 'unknown' for missing or non-string types", () => { + expect(extractEventType({})).toBe("unknown"); + expect(extractEventType({ type: 42 })).toBe("unknown"); + expect(extractEventType({ event: {} })).toBe("unknown"); + expect(extractEventType({ event: null })).toBe("unknown"); + expect(extractEventType(null)).toBe("unknown"); + expect(extractEventType(undefined)).toBe("unknown"); + expect(extractEventType("string")).toBe("unknown"); + }); +}); + +// SlackChannel + SlackMetrics integration tests. These verify the wiring +// between the Bolt SocketModeReceiver lifecycle events and the metrics +// emitter, which is the production failure surface (a hooked-but-broken +// emitter ships zero metrics and the fleet view goes silent). +describe("SlackChannel lifecycle hooks emit metrics", () => { + let registry: promClient.Registry; + let metrics: SlackMetrics; + + beforeEach(() => { + registry = new promClient.Registry(); + metrics = new SlackMetrics(registry); + resetSocketListeners(); + middlewares.length = 0; + }); + + afterEach(() => { + registry.clear(); + }); + + test("constructing SlackChannel attaches all six lifecycle listeners", () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + // Bolt's underlying SocketModeClient emits these six lifecycle events + // (verbatim from @slack/socket-mode/dist/src/SocketModeClient.js State enum). + const expected = ["connecting", "authenticated", "connected", "reconnecting", "disconnecting", "disconnected"]; + for (const e of expected) { + expect(socketClientListeners.has(e)).toBe(true); + expect((socketClientListeners.get(e) ?? []).length).toBeGreaterThanOrEqual(1); + } + }); + + test("constructor pre-emits 'disconnected' so /metrics scrapes are complete pre-connect", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "disconnected")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(0); + }); + + test("firing the 'connected' event flips the gauge", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + fireSocketEvent("connected"); + const text = await registry.metrics(); + expect(readGauge(text, "phantom_slack_socket_state", "connected")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "disconnected")).toBe(0); + }); + + test("'reconnecting' event increments the reconnect counter and observes connection duration", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + fireSocketEvent("connected"); + // Simulate some time elapsing before the reconnect; the value + // observed must be >= 0 even with a sub-millisecond gap. + await new Promise((r) => setTimeout(r, 5)); + fireSocketEvent("reconnecting"); + const text = await registry.metrics(); + expect(readCounter(text, "phantom_slack_socket_reconnects_total")).toBe(1); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "reconnecting")).toBe(1); + }); + + test("'disconnected' after 'connected' observes connection duration", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + fireSocketEvent("connected"); + await new Promise((r) => setTimeout(r, 5)); + fireSocketEvent("disconnected"); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(1); + expect(readGauge(text, "phantom_slack_socket_state", "disconnected")).toBe(1); + }); + + test("'disconnected' without a prior 'connected' does NOT observe a duration", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + // Disconnect before ever connecting (e.g. authentication fails before + // the WSS handshake completes). + fireSocketEvent("disconnected"); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_socket_connection_seconds")).toBe(0); + }); + + test("the dispatch middleware times event handler completion", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + // The SlackChannel registers exactly one global middleware on construction. + expect(middlewares.length).toBe(1); + const mw = middlewares[0]; + if (!mw) throw new Error("middleware not registered"); + await mw({ + body: { event: { type: "app_mention" } }, + next: async () => { + await new Promise((r) => setTimeout(r, 3)); + }, + }); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="app_mention"`)).toBe(1); + }); + + test("the dispatch middleware records 'unknown' for unclassified payloads", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + const mw = middlewares[0]; + if (!mw) throw new Error("middleware not registered"); + await mw({ body: {}, next: async () => {} }); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="unknown"`)).toBe(1); + }); + + test("the dispatch middleware still records on handler throw", async () => { + // Construction is the side effect under test: the SlackChannel ctor + // attaches the lifecycle listeners and the dispatch middleware. + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + metrics, + }); + const mw = middlewares[0]; + if (!mw) throw new Error("middleware not registered"); + await expect( + mw({ + body: { event: { type: "message" } }, + next: async () => { + throw new Error("handler exploded"); + }, + }), + ).rejects.toThrow(/handler exploded/); + const text = await registry.metrics(); + expect(readHistogramCount(text, "phantom_slack_event_dispatch_seconds", `event_type="message"`)).toBe(1); + }); + + test("omitting the metrics emitter falls back to NoopSlackMetrics without throwing", () => { + // Construction must succeed even when no emitter is wired (e.g. tests + // that exercise routing in isolation). + expect( + () => + new SlackChannel({ + botToken: "xoxb-test", + appToken: "xapp-test", + }), + ).not.toThrow(); + }); +}); diff --git a/src/channels/__tests__/slack.test.ts b/src/channels/__tests__/slack.test.ts index c9399660..41871c04 100644 --- a/src/channels/__tests__/slack.test.ts +++ b/src/channels/__tests__/slack.test.ts @@ -12,8 +12,31 @@ const mockReactionsRemove = mock(() => Promise.resolve({ ok: true })); const mockConversationsOpen = mock(() => Promise.resolve({ channel: { id: "D_REJECT_DM" } })); type EventHandler = (...args: unknown[]) => Promise; +type SocketEventHandler = (...args: unknown[]) => void; const eventHandlers = new Map(); const actionHandlers = new Map(); +type AppMiddleware = (args: { body: unknown; next: () => Promise }) => Promise; +const middlewares: AppMiddleware[] = []; + +// Per-test capture of the SocketModeReceiver lifecycle listeners. The +// SlackChannel constructor attaches these via `receiver.client.on(...)`. +// Tests assert against this map directly to verify metric emission. +const socketClientListeners = new Map(); + +// Resettable per beforeEach so listeners do not leak across cases. +function resetSocketListeners(): void { + socketClientListeners.clear(); +} + +const MockSocketModeReceiver = mock(() => ({ + client: { + on: (event: string, listener: SocketEventHandler) => { + const list = socketClientListeners.get(event) ?? []; + list.push(listener); + socketClientListeners.set(event, list); + }, + }, +})); const MockApp = mock(() => ({ start: mockStart, @@ -25,6 +48,9 @@ const MockApp = mock(() => ({ const key = pattern instanceof RegExp ? pattern.source : pattern; actionHandlers.set(key, handler); }, + use: (m: AppMiddleware) => { + middlewares.push(m); + }, client: { auth: { test: mockAuthTest }, chat: { @@ -44,6 +70,7 @@ const MockApp = mock(() => ({ // Replace the import with our mock mock.module("@slack/bolt", () => ({ App: MockApp, + SocketModeReceiver: MockSocketModeReceiver, })); const testConfig: SlackChannelConfig = { @@ -60,6 +87,8 @@ describe("SlackChannel", () => { beforeEach(() => { eventHandlers.clear(); actionHandlers.clear(); + middlewares.length = 0; + resetSocketListeners(); mockStart.mockClear(); mockStop.mockClear(); mockAuthTest.mockClear(); @@ -414,6 +443,8 @@ describe("SlackChannel owner access control", () => { beforeEach(() => { eventHandlers.clear(); actionHandlers.clear(); + middlewares.length = 0; + resetSocketListeners(); mockStart.mockClear(); mockStop.mockClear(); mockAuthTest.mockClear(); diff --git a/src/channels/slack-channel-factory.ts b/src/channels/slack-channel-factory.ts index 7a941ff8..3a18caee 100644 --- a/src/channels/slack-channel-factory.ts +++ b/src/channels/slack-channel-factory.ts @@ -7,11 +7,45 @@ import { DEFAULT_METADATA_BASE_URL, MetadataIdentityFetcher, type SlackIdentity import { MetadataSecretFetcher } from "../config/metadata-fetcher.ts"; import type { ChannelsConfig } from "../config/schemas.ts"; import { SlackHttpChannel } from "./slack-http-receiver.ts"; +import type { SlackMetricsEmitter } from "./slack-metrics.ts"; import type { SlackTransport } from "./slack-transport.ts"; import { SlackChannel } from "./slack.ts"; export type SlackTransportMode = "socket" | "http"; +/** + * Cross-repo invariant: every Slack-related secret name fetched from + * phantomd's metadata gateway MUST appear in phantomd's + * `internal/secrets/types.go` `AllowedSecretNames` map. The gateway maps a + * non-allowed name to ErrInvalidName -> HTTP 404 (name-enumeration defense), + * so a drift between this list and the phantomd allowlist breaks tenant + * boot with no actionable error in the in-VM Phantom logs. + * + * The list below is the authoritative phantom-side mirror. Adding a Slack + * secret to this array implies a matching addition in phantomd's + * `AllowedSecretNames`; removing one requires that no callsite in this file + * still fetches it. The mirror is exported (vs. private to this module) so + * the factory test fixture can pin the same constant against its + * `SECRET_RESPONSES` allowlist; future re-orderings won't silently desync. + * + * Phase 8a addition (R7 dated 2026-04-30): `slack_app_token` joins the + * pair, gating the Socket Mode WSS dial for self-installed agent #2+ + * tenants. The corresponding phantomd entry landed in PR #28 + * (TestIsAllowedName_AcceptsSlackAppToken pins the symmetric assertion). + * + * Audit Finding 1 (cross-repo audit, dated 2026-04-25): the + * `slack_gateway_signing_secret` entry pairs with phantom-slack-events' + * outbound HMAC that the SlackHttpChannel verifies on every forwarded + * event. Distinct from the Slack-issued `signing_secret` which lives in + * phantom-slack-events' TOML config and never traverses the metadata + * gateway. + */ +export const AllowedSecretNamesMirror = Object.freeze([ + "slack_bot_token", + "slack_app_token", + "slack_gateway_signing_secret", +] as const); + export type CreateSlackChannelInput = { transport: SlackTransportMode; channelsConfig: ChannelsConfig | null; @@ -21,6 +55,13 @@ export type CreateSlackChannelInput = { // factory constructs the link-local default fetchers. identityFetcher?: { get(): Promise<{ slack?: SlackIdentity }> }; secretsFetcher?: { get(name: string): Promise }; + /** + * Phase 8a metrics emitter. Forwarded to the Socket Mode `SlackChannel` + * so its lifecycle hooks update the four metric series exposed on + * `/metrics`. The HTTP receiver does not consume metrics in this PR; a + * follow-up generalizes the surface (Phase 17). + */ + metrics?: SlackMetricsEmitter; }; /** @@ -42,6 +83,7 @@ export async function createSlackChannel(input: CreateSlackChannelInput): Promis appToken: sc.app_token, defaultChannelId: sc.default_channel_id, ownerUserId: sc.owner_user_id, + metrics: input.metrics, }); } @@ -56,14 +98,17 @@ export async function createSlackChannel(input: CreateSlackChannelInput): Promis "Run the OAuth flow via phantom-control or revert to SLACK_TRANSPORT=socket.", ); } - // Cross-repo invariant (audit Finding 1, dated 2026-04-25): the names - // "slack_bot_token" and "slack_gateway_signing_secret" must appear in - // phantomd's internal/secrets/types.go AllowedSecretNames map. Drift on - // either side breaks SLACK_TRANSPORT=http boot with HTTP 404 (the + // Cross-repo invariant: see AllowedSecretNamesMirror at the top of this + // file. The HTTP path consumes "slack_bot_token" + + // "slack_gateway_signing_secret"; the Socket Mode path additionally + // consumes "slack_app_token" via channels.yaml (R7 ยง6.4 will move it + // onto the gateway in Phase 8b). Drift between this file and + // phantomd's AllowedSecretNames breaks tenant boot with HTTP 404 (the // gateway maps ErrInvalidName to 404 to avoid name enumeration). The // regression is pinned by slack-channel-factory.test.ts's name-aware // mock, which throws on any unexpected name; phantomd pins the same - // contract via TestIsAllowedName_AcceptsSlackGatewaySigningSecret. + // contract via TestIsAllowedName_Accepts{SlackGatewaySigningSecret, + // SlackAppToken}. const [botToken, signingSecret] = await Promise.all([ secFetcher.get("slack_bot_token"), secFetcher.get("slack_gateway_signing_secret"), diff --git a/src/channels/slack-metrics.ts b/src/channels/slack-metrics.ts new file mode 100644 index 00000000..79c20396 --- /dev/null +++ b/src/channels/slack-metrics.ts @@ -0,0 +1,199 @@ +// Phase 8a: Slack Socket Mode lifecycle observability. +// +// We expose four Prometheus metrics over `/metrics`: +// +// - `phantom_slack_socket_state` (gauge, label: `state`). One time series per +// state (`connecting`, `authenticated`, `connected`, `reconnecting`, +// `disconnecting`, `disconnected`). At any instant exactly one series is +// 1.0; the rest are 0.0. A sustained gap in `connected=1` is the signal +// "this tenant is offline"; alerting watches `1 - max_over_time(...)` over +// a 1-minute window. +// - `phantom_slack_socket_reconnects_total` (counter). Incremented every +// time the underlying SocketModeClient emits `reconnecting`. Bolt's +// auto-reconnect is on by default; this counter measures "the network +// wobbled" rather than "we lost the tenant". Alerting fires when the rate +// exceeds a per-minute threshold tuned to Slack-side hiccups (5/min is +// sustained pain; 1-2/min is normal weather). +// - `phantom_slack_socket_connection_seconds` (histogram). Time from +// `connected` to the next `disconnected`, observed at disconnect. Useful +// for the long-tail SLO: 99.9% of connections should hold for >1 hour. +// - `phantom_slack_event_dispatch_seconds` (histogram, label: `event_type`). +// End-to-end time from `slack_event` arrival to Bolt's `app.use` +// middleware completion. Captures handler latency including the agent's +// own work (LLM calls, memory writes) when triggered from Slack. +// +// The metrics emitter is intentionally per-channel. The in-VM Phantom runs +// one Slack channel; for self-hosters who eventually run multiple workspaces +// in one process we still keep per-channel emitters because the metrics +// names already disambiguate by container/host (Prometheus job + instance). +// +// Cross-repo: the prom-client default registry is NOT used. We own a +// dedicated `slackMetricsRegistry` so future shared-channel metrics (Telegram, +// email, webhook) can each own their own registry, and `core/server.ts` +// merges them at request time. This keeps a buggy second emitter from +// poisoning the global registry with name collisions. + +import promClient, { type Counter, type Gauge, type Histogram, type Registry } from "prom-client"; + +/** + * Lifecycle states emitted by `@slack/socket-mode`'s `SocketModeClient`. + * Authoritative source: the `State` enum in `node_modules/@slack/socket-mode/dist/src/SocketModeClient.js`. + * We translate `unable_to_socket_mode_start` (an unrecoverable Bolt error + * thrown by `start()`) onto an `error` derived state for the gauge so the + * fleet view does not silently flatline when the receiver never connects. + */ +export type SlackSocketState = + | "connecting" + | "authenticated" + | "connected" + | "reconnecting" + | "disconnecting" + | "disconnected" + | "error"; + +const ALL_STATES: SlackSocketState[] = [ + "connecting", + "authenticated", + "connected", + "reconnecting", + "disconnecting", + "disconnected", + "error", +]; + +/** + * Bucket layout for `phantom_slack_socket_connection_seconds`. Spans the + * realistic range from "Slack hiccup, reconnect within seconds" to "the + * connection held for a day". The default prom-client histogram buckets + * (top out at 10s) are useless for a long-running WebSocket; the layout + * here keeps p50/p99 visible at both ends. + */ +const CONNECTION_BUCKETS_SECONDS = [1, 5, 10, 30, 60, 300, 600, 1800, 3600, 21600, 86400]; + +/** + * Bucket layout for `phantom_slack_event_dispatch_seconds`. Slack's ack + * deadline is 3 seconds, so we want fine resolution under 1s and a tail + * to catch handlers that run away (LLM calls can spike to 30s+). + */ +const EVENT_DISPATCH_BUCKETS_SECONDS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10, 30]; + +/** + * Public interface implemented by the production emitter (and by test + * doubles in `slack-metrics.test.ts`). The split means `SlackChannel` does + * not depend on prom-client directly; tests can substitute a recording + * stub without spinning up a registry per test case. + */ +export interface SlackMetricsEmitter { + /** Mark the gauge: write 1.0 for `state` and 0.0 for the others. */ + recordState(state: SlackSocketState): void; + /** Bump the reconnect counter on every `reconnecting` event. */ + recordReconnect(): void; + /** Observe a completed connection's lifetime in seconds. */ + observeConnectionDuration(seconds: number): void; + /** Observe one event-handler dispatch in seconds. */ + observeEventDispatch(eventType: string, seconds: number): void; +} + +/** + * Concrete prom-client backed emitter. Owns a `Registry` so the caller can + * expose `register.metrics()` over HTTP without touching prom-client's + * default global registry. Construction is idempotent: calling `create` + * twice with the same registry creates fresh metric handles each time, but + * production uses one process-global instance from `index.ts` boot. + */ +export class SlackMetrics implements SlackMetricsEmitter { + readonly registry: Registry; + private readonly stateGauge: Gauge<"state">; + private readonly reconnectsCounter: Counter; + private readonly connectionHistogram: Histogram; + private readonly eventDispatchHistogram: Histogram<"event_type">; + + constructor(registry?: Registry) { + // Tests pass a fresh Registry per case so positive/negative assertions + // do not leak counter state between tests. Production passes nothing + // and gets a private registry per emitter instance. + this.registry = registry ?? new promClient.Registry(); + + this.stateGauge = new promClient.Gauge({ + name: "phantom_slack_socket_state", + help: "Slack Socket Mode connection state. Exactly one series is 1.0 at any instant; the rest are 0.0.", + labelNames: ["state"] as const, + registers: [this.registry], + }); + + this.reconnectsCounter = new promClient.Counter({ + name: "phantom_slack_socket_reconnects_total", + help: "Count of Slack Socket Mode reconnection attempts since process start.", + registers: [this.registry], + }); + + this.connectionHistogram = new promClient.Histogram({ + name: "phantom_slack_socket_connection_seconds", + help: "Duration of a single Slack Socket Mode connection from connect to disconnect.", + buckets: CONNECTION_BUCKETS_SECONDS, + registers: [this.registry], + }); + + this.eventDispatchHistogram = new promClient.Histogram({ + name: "phantom_slack_event_dispatch_seconds", + help: "End-to-end Slack event dispatch latency from receive to handler completion.", + labelNames: ["event_type"] as const, + buckets: EVENT_DISPATCH_BUCKETS_SECONDS, + registers: [this.registry], + }); + + // Initialize every state series at 0 so a Prometheus scrape during the + // connecting window still has a complete time series matrix. Without + // this, alerts on `slack_socket_state{state="connected"} == 0` fire + // before the first lifecycle event lands. + for (const s of ALL_STATES) { + this.stateGauge.set({ state: s }, 0); + } + } + + recordState(state: SlackSocketState): void { + for (const s of ALL_STATES) { + this.stateGauge.set({ state: s }, s === state ? 1 : 0); + } + } + + recordReconnect(): void { + this.reconnectsCounter.inc(); + } + + observeConnectionDuration(seconds: number): void { + // Defensive guard: clock skew or a misuse pattern (observe() called + // twice for the same connect window) can yield a negative value. + // prom-client throws on negative input; we drop silently so the + // caller does not have to care. + if (seconds < 0 || !Number.isFinite(seconds)) return; + this.connectionHistogram.observe(seconds); + } + + observeEventDispatch(eventType: string, seconds: number): void { + if (seconds < 0 || !Number.isFinite(seconds)) return; + this.eventDispatchHistogram.observe({ event_type: eventType }, seconds); + } +} + +/** + * No-op emitter used when metrics are disabled (e.g. tests that only care + * about routing) or when the channel is constructed outside the boot path + * (e.g. unit tests of egress helpers). The lifecycle hooks still call + * `record*` so the production emitter is wired the same way; the no-op + * avoids guarding every callsite with `?.`. + */ +export class NoopSlackMetrics implements SlackMetricsEmitter { + recordState(_state: SlackSocketState): void { + /* no-op */ + } + recordReconnect(): void { + /* no-op */ + } + observeConnectionDuration(_seconds: number): void { + /* no-op */ + } + observeEventDispatch(_eventType: string, _seconds: number): void { + /* no-op */ + } +} diff --git a/src/channels/slack.ts b/src/channels/slack.ts index ebbaa7d3..457b5387 100644 --- a/src/channels/slack.ts +++ b/src/channels/slack.ts @@ -1,4 +1,4 @@ -import { App, type LogLevel } from "@slack/bolt"; +import { App, type LogLevel, SocketModeReceiver } from "@slack/bolt"; import type { SlackBlock } from "./feedback.ts"; import { registerSlackActions } from "./slack-actions.ts"; import { @@ -12,6 +12,7 @@ import { egressUpdateMessage, egressUpdateWithFeedback, } from "./slack-egress.ts"; +import { NoopSlackMetrics, type SlackMetricsEmitter } from "./slack-metrics.ts"; import type { Channel, ChannelCapabilities, InboundMessage, OutboundMessage, SentMessage } from "./types.ts"; export type SlackChannelConfig = { @@ -20,10 +21,28 @@ export type SlackChannelConfig = { defaultChannelId?: string; ownerUserId?: string; transport?: "socket"; + /** + * Optional Phase 8a metrics emitter. When omitted, a `NoopSlackMetrics` + * is used so the lifecycle hooks fire the same way in test paths that + * don't care about Prometheus output. Production wires the shared + * `SlackMetrics` instance from `core/server.ts`. + */ + metrics?: SlackMetricsEmitter; }; type ConnectionState = "disconnected" | "connecting" | "connected" | "error"; +/** + * Minimal shape of the underlying `@slack/socket-mode` `SocketModeClient` + * we hook for lifecycle metrics. Bolt's `SocketModeReceiver.client` is + * publicly typed (`receivers/SocketModeReceiver.d.ts:55`), but we depend + * only on the EventEmitter interface here so test mocks do not need to + * carry the full SDK surface. + */ +type SocketModeClientLike = { + on(event: string, listener: (...args: unknown[]) => void): unknown; +}; + type ReactionHandler = (event: { reaction: string; userId: string; @@ -45,6 +64,7 @@ export class SlackChannel implements Channel { }; private app: App; + private receiver: SocketModeReceiver; private messageHandler: ((message: InboundMessage) => Promise) | null = null; private reactionHandler: ReactionHandler | null = null; private connectionState: ConnectionState = "disconnected"; @@ -52,19 +72,109 @@ export class SlackChannel implements Channel { private ownerUserId: string | null; private phantomName: string; private rejectedUsers = new Set(); + private readonly metrics: SlackMetricsEmitter; + /** + * Wall-clock millis when the most recent `connected` event fired. Used to + * observe `phantom_slack_socket_connection_seconds` on the next + * `disconnected` event. Null when no connection is currently up. + */ + private connectedAtMs: number | null = null; constructor(config: SlackChannelConfig) { if (config.transport && config.transport !== "socket") { throw new Error("SlackChannel only supports Socket Mode. Use SlackHttpChannel for HTTP receiver mode."); } + // Construct the receiver explicitly (vs. `socketMode: true` shorthand) + // so we can hook the underlying `SocketModeClient` lifecycle events for + // the Phase 8a metrics surface. The receiver type publicly exposes + // `client: SocketModeClient` (Bolt 4.6 SocketModeReceiver.d.ts:55), so + // the cast below is type-safe at the package boundary; we narrow to a + // minimal `SocketModeClientLike` to keep test mocks cheap. + this.receiver = new SocketModeReceiver({ + appToken: config.appToken, + logLevel: "ERROR" as LogLevel, + }); this.app = new App({ token: config.botToken, - socketMode: true, - appToken: config.appToken, + receiver: this.receiver, logLevel: "ERROR" as LogLevel, }); + this.metrics = config.metrics ?? new NoopSlackMetrics(); this.ownerUserId = config.ownerUserId ?? null; this.phantomName = "Phantom"; + + // Initialize the gauge to `disconnected` so a Prometheus scrape that + // lands before `connect()` returns sees a complete time series matrix. + this.metrics.recordState("disconnected"); + + this.attachLifecycleHooks(); + this.attachDispatchTimer(); + } + + /** + * Subscribe the metrics emitter to every lifecycle event the underlying + * `SocketModeClient` emits. The state enum is verbatim from + * `@slack/socket-mode/dist/src/SocketModeClient.js` (the State enum). + * `unable_to_socket_mode_start` is NOT an emitted event in the current + * SDK; that path surfaces as a thrown `UnrecoverableSocketModeStartError` + * from `start()` which we map to the `error` state in `connect()`. + */ + private attachLifecycleHooks(): void { + const client = this.receiver.client as unknown as SocketModeClientLike; + + client.on("connecting", () => { + this.metrics.recordState("connecting"); + }); + client.on("authenticated", () => { + this.metrics.recordState("authenticated"); + }); + client.on("connected", () => { + this.connectedAtMs = Date.now(); + this.metrics.recordState("connected"); + }); + client.on("reconnecting", () => { + this.metrics.recordReconnect(); + this.metrics.recordState("reconnecting"); + // Reconnect implies the prior connection just dropped; observe its + // lifetime if we have one in flight. + this.observeConnectionLifetime(); + }); + client.on("disconnecting", () => { + this.metrics.recordState("disconnecting"); + }); + client.on("disconnected", () => { + this.metrics.recordState("disconnected"); + this.observeConnectionLifetime(); + }); + } + + /** + * Wraps Bolt's global middleware to time end-to-end event dispatch. The + * `next()` await returns when the matching listener (or the absence of + * one) has finished, including any async work the user's handler does. + * Captures the high-level event type from `body.event.type` when + * available; falls back to the envelope `body.type` for slash commands + * and interactive payloads, then to `unknown` for shapes Bolt did not + * route to a discriminated handler. + */ + private attachDispatchTimer(): void { + this.app.use(async ({ body, next }) => { + const startMs = Date.now(); + const eventType = extractEventType(body); + try { + await next(); + } finally { + const seconds = (Date.now() - startMs) / 1000; + this.metrics.observeEventDispatch(eventType, seconds); + } + }); + } + + private observeConnectionLifetime(): void { + if (this.connectedAtMs == null) return; + const seconds = (Date.now() - this.connectedAtMs) / 1000; + this.metrics.observeConnectionDuration(seconds); + this.connectedAtMs = null; } setPhantomName(name: string): void { @@ -126,6 +236,12 @@ export class SlackChannel implements Channel { console.log("[slack] Socket Mode connected"); } catch (err: unknown) { this.connectionState = "error"; + // `app.start()` throws `UnrecoverableSocketModeStartError` for the + // auth-revoked / invalid-app-token / connections:write-missing + // failure cases. Mark the gauge so the fleet view reflects the + // down tenant even though no `disconnected` event ever fired + // (the SocketModeClient never made it past handshake). + this.metrics.recordState("error"); const msg = err instanceof Error ? err.message : String(err); console.error(`[slack] Failed to connect: ${msg}`); throw err; @@ -327,3 +443,28 @@ export class SlackChannel implements Channel { function buildConversationId(channel: string, threadTs: string): string { return `slack:${channel}:${threadTs}`; } + +/** + * Pull a stable, low-cardinality string out of Bolt's middleware `body` so + * the `event_type` label on `phantom_slack_event_dispatch_seconds` does + * not blow up cardinality. Order: + * 1. `body.event.type` (the event-API envelope, e.g. `app_mention`, + * `message`, `reaction_added`). + * 2. `body.type` (slash-commands carry `command`, interactivity carries + * `block_actions` / `view_submission`). + * 3. `unknown` fallback so a payload Bolt has not classified still gets + * a series. + */ +export function extractEventType(body: unknown): string { + if (!body || typeof body !== "object") return "unknown"; + const b = body as Record; + const event = b.event; + if (event && typeof event === "object") { + const ev = event as Record; + const t = ev.type; + if (typeof t === "string" && t.length > 0) return t; + } + const top = b.type; + if (typeof top === "string" && top.length > 0) return top; + return "unknown"; +} diff --git a/src/core/server.ts b/src/core/server.ts index 7e363fc3..98fe2e1b 100644 --- a/src/core/server.ts +++ b/src/core/server.ts @@ -26,6 +26,19 @@ type OnboardingStatusProvider = () => string; type WebhookHandler = (req: Request) => Promise; type PeerHealthProvider = () => Record; type SchedulerHealthProvider = () => SchedulerHealthSummary | null; +/** + * Phase 8a: provider-shape for the Prometheus registry that backs + * `/metrics`. The provider returns an object with the prom-client `Registry` + * surface we depend on (`metrics()` for the text-format dump, + * `contentType` for the response header). Keeping the surface minimal + * means future emitters (Telegram, email) can plug in without taking a + * direct dependency on prom-client at this layer. + */ +type MetricsRegistryLike = { + metrics(): Promise; + contentType: string; +}; +type MetricsRegistryProvider = () => MetricsRegistryLike | null; type TriggerDeps = { runtime: AgentRuntime; slackChannel?: SlackTransport; @@ -41,6 +54,7 @@ let onboardingStatusProvider: OnboardingStatusProvider | null = null; let webhookHandler: WebhookHandler | null = null; let peerHealthProvider: PeerHealthProvider | null = null; let schedulerHealthProvider: SchedulerHealthProvider | null = null; +let metricsRegistryProvider: MetricsRegistryProvider | null = null; let triggerDeps: TriggerDeps | null = null; let chatHandler: ChatHandler | null = null; @@ -80,6 +94,16 @@ export function setSchedulerHealthProvider(provider: SchedulerHealthProvider): v schedulerHealthProvider = provider; } +/** + * Phase 8a: register the Prometheus registry that backs `/metrics`. Wired + * from `index.ts` boot with the `SlackMetrics.registry`. Returns null from + * the provider to disable the route entirely (e.g. tests that do not + * exercise the metrics path). + */ +export function setMetricsRegistryProvider(provider: MetricsRegistryProvider): void { + metricsRegistryProvider = provider; +} + export function setTriggerDeps(deps: TriggerDeps): void { triggerDeps = deps; } @@ -155,6 +179,28 @@ export function startServer(config: PhantomConfig, startedAt: number): ReturnTyp return Response.json(payload); } + // Phase 8a: Prometheus metrics surface. Unauthenticated by design, + // matching the existing `/health` precedent: this server's tenant + // isolation comes from the per-tenant URL behind Caddy, not from + // per-route auth. Returns 503 when no registry is wired (the + // process started without a metrics provider). + if (url.pathname === "/metrics" && req.method === "GET") { + const registry = metricsRegistryProvider?.(); + if (!registry) { + return new Response("metrics registry not configured", { + status: 503, + headers: { "Content-Type": "text/plain; charset=utf-8" }, + }); + } + const body = await registry.metrics(); + return new Response(body, { + headers: { + "Content-Type": registry.contentType, + "Cache-Control": "no-store", + }, + }); + } + if (url.pathname === "/mcp") { const mcpServer = mcpServerProvider?.(); if (!mcpServer) { diff --git a/src/index.ts b/src/index.ts index bfd54b16..39f25474 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,6 +15,7 @@ import { setActionFollowUpHandler } from "./channels/slack-actions.ts"; import { createSlackChannel, readSlackTransportFromEnv } from "./channels/slack-channel-factory.ts"; import { SlackHttpChannel } from "./channels/slack-http-receiver.ts"; import { setSlackHttpChannelProvider } from "./channels/slack-http-routes.ts"; +import { SlackMetrics } from "./channels/slack-metrics.ts"; import type { SlackTransport } from "./channels/slack-transport.ts"; import { createStatusReactionController } from "./channels/status-reactions.ts"; import { TelegramChannel } from "./channels/telegram.ts"; @@ -28,6 +29,7 @@ import { setEvolutionVersionProvider, setMcpServerProvider, setMemoryHealthProvider, + setMetricsRegistryProvider, setOnboardingStatusProvider, setPeerHealthProvider, setRoleInfoProvider, @@ -314,10 +316,19 @@ async function main(): Promise { // factory throws so a mis-provisioned tenant fails loudly. const channelsConfig = loadChannelsConfig(); const slackTransport = readSlackTransportFromEnv(); + // Phase 8a: a process-global SlackMetrics owns its prom-client Registry. + // `core/server.ts` exposes `slackMetrics.registry` at /metrics. We + // construct the emitter unconditionally (even when no Slack channel is + // configured) so the /metrics endpoint always emits the zero-state + // matrix; this keeps Prometheus scrape parity across multi-tenant pools + // and prevents alert flapping when Phantoms boot/reboot. + const slackMetrics = new SlackMetrics(); + setMetricsRegistryProvider(() => slackMetrics.registry); const slackChannel: SlackTransport | null = await createSlackChannel({ transport: slackTransport, channelsConfig, metadataBaseUrl: process.env.METADATA_BASE_URL, + metrics: slackMetrics, }); if (slackChannel) {