From 34427088f7d45e3e0a9e1e4fe1be5dd60960643d Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 01:18:01 +0900 Subject: [PATCH 01/20] Report message queue depth Add optional MessageQueue.getDepth() support so queue implementations can report backlog size for observability. The core in-process queue reports ready and delayed messages, and the parallel queue delegates to its wrapped queue when available. Implement depth reporting for the Redis, PostgreSQL, MySQL, SQLite, and AMQP queue adapters. Document the API, backend support, and AMQP delayed queue tracking limits, and update the changelog for each affected package. Fixes https://github.com/fedify-dev/fedify/issues/735 Assisted-by: Codex:gpt-5.5 --- CHANGES.md | 36 ++++++++++ docs/manual/mq.md | 79 ++++++++++++++++++++++ packages/amqp/src/mq.test.ts | 41 ++++++++++++ packages/amqp/src/mq.ts | 47 +++++++++++++ packages/fedify/src/federation/mq.test.ts | 80 +++++++++++++++++++++++ packages/fedify/src/federation/mq.ts | 70 +++++++++++++++++++- packages/mysql/src/mq.test.ts | 26 ++++++++ packages/mysql/src/mq.ts | 22 +++++++ packages/postgres/src/mq.test.ts | 28 ++++++++ packages/postgres/src/mq.ts | 20 ++++++ packages/redis/src/mq.test.ts | 31 +++++++++ packages/redis/src/mq.ts | 14 ++++ packages/sqlite/src/mq.test.ts | 27 ++++++++ packages/sqlite/src/mq.ts | 22 +++++++ 14 files changed, 541 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f4add3089..ddc62d239 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,42 @@ Version 2.3.0 To be released. +### @fedify/fedify + + - Added optional `MessageQueue.getDepth()` support for reporting queue + backlog depth. `InProcessMessageQueue` can now report queued messages, + including ready and delayed counts, and `ParallelMessageQueue` delegates + depth reporting to its wrapped queue when supported. [[#735], [#748]] + +[#735]: https://github.com/fedify-dev/fedify/issues/735 +[#748]: https://github.com/fedify-dev/fedify/pull/748 + +### @fedify/amqp + + - Added `AmqpMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. Delayed counts include queues created or tracked + by the same `AmqpMessageQueue` instance. [[#735], [#748]] + +### @fedify/mysql + + - Added `MysqlMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. [[#735], [#748]] + +### @fedify/postgres + + - Added `PostgresMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. [[#735], [#748]] + +### @fedify/redis + + - Added `RedisMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. [[#735], [#748]] + +### @fedify/sqlite + + - Added `SqliteMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. [[#735], [#748]] + Version 2.2.0 ------------- diff --git a/docs/manual/mq.md b/docs/manual/mq.md index 4adf552ca..807c81f1e 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -672,6 +672,7 @@ the `~MessageQueue.enqueue()` and `~MessageQueue.listen()` methods: ~~~~ typescript twoslash import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -693,6 +694,11 @@ class CustomMessageQueue implements MessageQueue { ): Promise { // Implementation here } + + async getDepth(): Promise { + // Optional: return queue depth for observability + return { queued: 0, ready: 0, delayed: 0 }; + } } ~~~~ @@ -747,6 +753,21 @@ you can set the `nativeRetrial` property to `true` to indicate this. When this property is `true`, Fedify will skip its own retry logic and rely on your backend to handle retries, avoiding duplicate retry mechanisms. +### Implement `~MessageQueue.getDepth()` method (optional) + +*This API is available since Fedify 2.3.0.* + +This optional method should return the number of messages still waiting in the +backend queue. It should not include messages that have already been handed to +a worker for processing. Return `queued` for the total waiting messages. If +your backend can cheaply distinguish scheduled messages, also return `ready` +for messages eligible for immediate processing and `delayed` for messages +scheduled for later delivery. + +Implement this method if your queue backend exposes an efficient count +operation. If the platform does not expose reliable counts, omit the method +rather than returning an approximate value that could mislead monitoring. + Parallel message processing --------------------------- @@ -951,6 +972,64 @@ Optimized performance : Backend-specific optimizations for retry logic. +Queue depth reporting +--------------------- + +*This API is available since Fedify 2.3.0.* + +Some message queue implementations expose `~MessageQueue.getDepth()` for +observability. Queue depth means messages still waiting in the backend queue: + +`queued` +: Total waiting messages. This excludes messages currently being handled by + a worker. + +`ready` +: Waiting messages eligible for immediate processing. This value is omitted + when the backend cannot distinguish ready and delayed messages cheaply. + +`delayed` +: Waiting messages scheduled for later delivery. This value is omitted when + the backend cannot distinguish ready and delayed messages cheaply. + +For example: + +~~~~ typescript twoslash +import type { MessageQueue } from "@fedify/fedify"; +declare const queue: MessageQueue; +// ---cut-before--- +const depth = await queue.getDepth?.(); +if (depth != null) { + console.log("Queued messages:", depth.queued); +} +~~~~ + +### Implementation support + +| Implementation | Queue Depth Support | +| ------------------------ | ----------------------------------------- | +| `InProcessMessageQueue` | `queued`, `ready`, `delayed` | +| [`DenoKvMessageQueue`] | No reliable platform count | +| [`RedisMessageQueue`] | `queued`, `ready`, `delayed` | +| [`PostgresMessageQueue`] | `queued`, `ready`, `delayed` | +| [`MysqlMessageQueue`] | `queued`, `ready`, `delayed` | +| [`AmqpMessageQueue`] | `queued`, `ready`, `delayed`[^amqp-depth] | +| [`SqliteMessageQueue`] | `queued`, `ready`, `delayed` | +| `WorkersMessageQueue` | No reliable platform count | + +If you pass the same `MessageQueue` instance as the shared queue for inbox, +outbox, and fanout work, observability code should report that queue once as a +shared queue. Reporting the same `getDepth()` result separately for each +logical role would double- or triple-count the backlog. + +[^amqp-depth]: `AmqpMessageQueue` can count the configured ready queues and + delayed queues created by the same `AmqpMessageQueue` instance. + AMQP 0-9-1 does not provide a portable queue-listing API, so + delayed queues created by another process before this instance + starts are not included until this instance creates or tracks + them. + + Ordering guarantees ------------------- diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index f8499bf66..908b45fb2 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -37,6 +37,47 @@ test( ), ); +test( + "AmqpMessageQueue.getDepth()", + { sanitizeOps: false, sanitizeExit: false, sanitizeResources: false }, + async () => { + const conn = await getConnection(); + const queue = getRandomKey("depth_queue"); + const delayedQueuePrefix = getRandomKey("depth_delayed") + "_"; + const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix }); + try { + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ seconds: 60 }), + }); + const started = Date.now(); + while (Date.now() - started < 15_000) { + const depth = await mq.getDepth(); + if (depth.queued === 2 && depth.ready === 1 && depth.delayed === 1) { + break; + } + await delay(100); + } + assertEquals(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + const channel = await conn.createChannel(); + await channel.deleteQueue(queue); + await channel.deleteQueue(`${delayedQueuePrefix}60000`).catch(() => {}); + await channel.close(); + await conn.close(); + } + }, +); + // Test with ordering key support (requires rabbitmq_consistent_hash_exchange plugin) const orderingConnections: ChannelModel[] = []; const orderingQueue = getRandomKey("ordering_queue"); diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index 641e01a16..0a7fb0d4c 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -1,5 +1,6 @@ import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -127,6 +128,7 @@ export class AmqpMessageQueue implements MessageQueue { queuePrefix: string; partitions: number; }; + #delayedQueues: Set = new Set(); #orderingPrepared: boolean = false; readonly nativeRetrial: boolean; @@ -263,6 +265,7 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterRoutingKey, messageTtl: delay, }); + this.#delayedQueues.add(queue); } channel.sendToQueue( queue, @@ -345,6 +348,7 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterRoutingKey, messageTtl: delay, }); + this.#delayedQueues.add(queue); } for (const message of messages) { @@ -359,6 +363,49 @@ export class AmqpMessageQueue implements MessageQueue { } } + async getDepth(): Promise { + let channel = await this.#connection.createChannel(); + try { + await this.#prepareQueue(channel); + await this.#prepareOrdering(channel); + + let ready = (await channel.checkQueue(this.#queue)).messageCount; + if (this.#ordering != null) { + for (let i = 0; i < this.#ordering.partitions; i++) { + ready += (await channel.checkQueue(this.#getOrderingQueueName(i))) + .messageCount; + } + } + + let delayed = 0; + for (const queue of [...this.#delayedQueues]) { + try { + delayed += (await channel.checkQueue(queue)).messageCount; + } catch { + this.#delayedQueues.delete(queue); + try { + await channel.close(); + } catch { + // The channel is usually already closed after a failed checkQueue(). + } + channel = await this.#connection.createChannel(); + } + } + + return { + queued: ready + delayed, + ready, + delayed, + }; + } finally { + try { + await channel.close(); + } catch { + // The channel can already be closed if a tracked delayed queue vanished. + } + } + } + async listen( // deno-lint-ignore no-explicit-any handler: (message: any) => void | Promise, diff --git a/packages/fedify/src/federation/mq.test.ts b/packages/fedify/src/federation/mq.test.ts index 8c7adc3de..1120507a6 100644 --- a/packages/fedify/src/federation/mq.test.ts +++ b/packages/fedify/src/federation/mq.test.ts @@ -34,6 +34,14 @@ test("InProcessMessageQueue", async (t) => { assertFalse(mq.nativeRetrial); }); + await t.step("getDepth() [empty]", async () => { + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + }); + const messages: string[] = []; const controller = new AbortController(); const listening = mq.listen((message: string) => { @@ -118,6 +126,70 @@ test("InProcessMessageQueue", async (t) => { await listening; }); +test("InProcessMessageQueue.getDepth()", async () => { + const mq = new InProcessMessageQueue(); + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + + await mq.enqueue("Ready message"); + await mq.enqueue("Delayed message", { + delay: Temporal.Duration.from({ seconds: 1 }), + }); + assertEquals(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + + const messages: string[] = []; + const controller = new AbortController(); + const listening = mq.listen((message: string) => { + messages.push(message); + if (messages.length >= 2) controller.abort(); + }, { signal: controller.signal }); + + await waitFor(() => messages.length >= 2, 15_000); + await listening; + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); +}); + +test("InProcessMessageQueue.getDepth() excludes in-flight messages", async () => { + const mq = new InProcessMessageQueue(); + let resolveHandler: (() => void) | undefined; + const controller = new AbortController(); + const handled = new Promise((resolve) => { + resolveHandler = resolve; + }); + // Resolved after the message has been removed from the queue and handed + // to the handler. + let notifyStarted: () => void = () => {}; + const handlerStarted = new Promise((resolve) => { + notifyStarted = resolve; + }); + const listening = mq.listen(async () => { + notifyStarted(); + await handled; + controller.abort(); + }, { signal: controller.signal }); + + await mq.enqueue("in-flight"); + await handlerStarted; + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + resolveHandler?.(); + await listening; +}); + test("InProcessMessageQueue orderingKey", async (t) => { const mq = new InProcessMessageQueue(); @@ -277,6 +349,14 @@ for (const mqName in queues) { assertEquals(workers.nativeRetrial, mq.nativeRetrial); }); + await t.step("getDepth() delegation", async () => { + if (mq.getDepth == null) { + assertEquals(workers.getDepth, undefined); + } else { + assertEquals(await workers.getDepth?.(), await mq.getDepth()); + } + }); + const messages: string[] = []; const controller = new AbortController(); const listening = workers.listen(async (message: string) => { diff --git a/packages/fedify/src/federation/mq.ts b/packages/fedify/src/federation/mq.ts index 7f32ab007..60ad40ecf 100644 --- a/packages/fedify/src/federation/mq.ts +++ b/packages/fedify/src/federation/mq.ts @@ -39,6 +39,37 @@ export interface MessageQueueListenOptions { signal?: AbortSignal; } +/** + * The number of messages waiting in a message queue. + * + * @since 2.3.0 + */ +export interface MessageQueueDepth { + /** + * The total number of messages still waiting in the backend queue. + * + * This does not include messages that have already been handed to a worker + * for processing. + */ + readonly queued: number; + + /** + * The number of queued messages eligible for immediate processing. + * + * Queue backends that cannot cheaply distinguish ready and delayed messages + * may omit this field. + */ + readonly ready?: number; + + /** + * The number of queued messages scheduled for later delivery. + * + * Queue backends that cannot cheaply distinguish ready and delayed messages + * may omit this field. + */ + readonly delayed?: number; +} + /** * An abstract interface for a message queue. * @@ -88,6 +119,17 @@ export interface MessageQueue { handler: (message: any) => Promise | void, options?: MessageQueueListenOptions, ): Promise; + + /** + * Gets the number of messages waiting in the queue. + * + * This operation is optional, and may not be supported by all + * implementations. The returned counts exclude messages currently being + * handled by a worker. + * + * @since 2.3.0 + */ + getDepth?(): Promise; } /** @@ -121,6 +163,7 @@ export class InProcessMessageQueue implements MessageQueue { #messages: QueuedMessage[]; #monitors: Record, () => void>; #pollIntervalMs: number; + #delayedMessages: number; /** * Tracks which ordering keys are currently being processed to ensure * sequential processing for messages with the same key. @@ -143,6 +186,7 @@ export class InProcessMessageQueue implements MessageQueue { this.#pollIntervalMs = Temporal.Duration.from( options.pollInterval ?? { seconds: 5 }, ).total("millisecond"); + this.#delayedMessages = 0; this.#processingKeys = new Set(); } @@ -151,8 +195,12 @@ export class InProcessMessageQueue implements MessageQueue { ? 0 : Math.max(options.delay.total("millisecond"), 0); if (delay > 0) { + this.#delayedMessages++; setTimeout( - () => this.enqueue(message, { ...options, delay: undefined }), + () => { + this.#delayedMessages--; + void this.enqueue(message, { ...options, delay: undefined }); + }, delay, ); return Promise.resolve(); @@ -174,8 +222,12 @@ export class InProcessMessageQueue implements MessageQueue { ? 0 : Math.max(options.delay.total("millisecond"), 0); if (delay > 0) { + this.#delayedMessages += messages.length; setTimeout( - () => this.enqueueMany(messages, { ...options, delay: undefined }), + () => { + this.#delayedMessages -= messages.length; + void this.enqueueMany(messages, { ...options, delay: undefined }); + }, delay, ); return Promise.resolve(); @@ -227,6 +279,16 @@ export class InProcessMessageQueue implements MessageQueue { } } + getDepth(): Promise { + const ready = this.#messages.length; + const delayed = this.#delayedMessages; + return Promise.resolve({ + queued: ready + delayed, + ready, + delayed, + }); + } + #wait(ms: number, signal?: AbortSignal): Promise { let timer: ReturnType | null = null; return Promise.any([ @@ -288,6 +350,7 @@ export class ParallelMessageQueue implements MessageQueue { * @since 1.7.0 */ readonly nativeRetrial?: boolean; + readonly getDepth?: () => Promise; /** * Tracks which ordering keys are currently being processed to ensure @@ -320,6 +383,9 @@ export class ParallelMessageQueue implements MessageQueue { this.queue = queue; this.workers = workers; this.nativeRetrial = queue.nativeRetrial; + if (queue.getDepth != null) { + this.getDepth = () => queue.getDepth!(); + } } enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise { diff --git a/packages/mysql/src/mq.test.ts b/packages/mysql/src/mq.test.ts index 0712c5899..f96a99f99 100644 --- a/packages/mysql/src/mq.test.ts +++ b/packages/mysql/src/mq.test.ts @@ -97,6 +97,32 @@ test("MysqlMessageQueue", { skip: dbUrl == null }, () => { ); }); +test("MysqlMessageQueue.getDepth()", { skip: dbUrl == null }, async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl); + const tableName = randomTableName("depth"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + assert.deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + assert.deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + await mq.drop(); + await pool.end(); + } +}); + // --------------------------------------------------------------------------- // initialize() and drop() // --------------------------------------------------------------------------- diff --git a/packages/mysql/src/mq.ts b/packages/mysql/src/mq.ts index 9b43f6313..a9ab14260 100644 --- a/packages/mysql/src/mq.ts +++ b/packages/mysql/src/mq.ts @@ -1,5 +1,6 @@ import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -305,6 +306,27 @@ export class MysqlMessageQueue implements MessageQueue { }); } + /** + * {@inheritDoc MessageQueue.getDepth} + * @since 2.3.0 + */ + async getDepth(): Promise { + await this.initialize(); + const [rows] = await this.#pool.query( + `SELECT + COUNT(*) AS queued, + COALESCE(SUM(\`deliver_after\` <= NOW(6)), 0) AS ready + FROM \`${this.#tableName}\``, + ); + const queued = Number(rows[0].queued); + const ready = Number(rows[0].ready); + return { + queued, + ready, + delayed: queued - ready, + }; + } + /** * {@inheritDoc MessageQueue.listen} * @since 2.1.0 diff --git a/packages/postgres/src/mq.test.ts b/packages/postgres/src/mq.test.ts index 29c02f829..97216ce69 100644 --- a/packages/postgres/src/mq.test.ts +++ b/packages/postgres/src/mq.test.ts @@ -42,6 +42,34 @@ test("PostgresMessageQueue", { ignore: dbUrl == null }, () => { ); }); +test("PostgresMessageQueue.getDepth()", { ignore: dbUrl == null }, async () => { + if (dbUrl == null) return; // Bun does not support skip option + + const tableName = getRandomKey("message_depth"); + const channelName = getRandomKey("channel_depth"); + const sql = postgres(dbUrl); + const mq = new PostgresMessageQueue(sql, { tableName, channelName }); + try { + deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + await mq.drop(); + await sql.end(); + } +}); + // Regression test for advisory lock not being fully released after processing // a message with an ordering key. This test verifies that after processing // a message through PostgresMessageQueue.listen(), the advisory lock is fully diff --git a/packages/postgres/src/mq.ts b/packages/postgres/src/mq.ts index 4fed2db8d..bee030246 100644 --- a/packages/postgres/src/mq.ts +++ b/packages/postgres/src/mq.ts @@ -1,5 +1,6 @@ import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -227,6 +228,25 @@ export class PostgresMessageQueue implements MessageQueue { }); } + async getDepth(): Promise { + await this.initialize(); + const result = await this.#sql` + SELECT + COUNT(*) AS queued, + COUNT(*) FILTER ( + WHERE created + delay < CURRENT_TIMESTAMP + ) AS ready + FROM ${this.#sql(this.#tableName)} + `; + const queued = Number(result[0].queued); + const ready = Number(result[0].ready); + return { + queued, + ready, + delayed: queued - ready, + }; + } + async listen( // deno-lint-ignore no-explicit-any handler: (message: any) => void | Promise, diff --git a/packages/redis/src/mq.test.ts b/packages/redis/src/mq.test.ts index fad3a7dc3..793642a8a 100644 --- a/packages/redis/src/mq.test.ts +++ b/packages/redis/src/mq.test.ts @@ -1,6 +1,7 @@ import { test } from "@fedify/fixture"; import { RedisMessageQueue } from "@fedify/redis/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import assert from "node:assert/strict"; import process from "node:process"; import { Redis } from "ioredis"; @@ -41,3 +42,33 @@ test("RedisMessageQueue", { ignore: dbUrl == null }, () => { { testOrderingKey: true }, ); }); + +test("RedisMessageQueue.getDepth()", { ignore: dbUrl == null }, async () => { + if (dbUrl == null) return; // Bun does not support skip option + const channelKey = getRandomKey("channel_depth"); + const queueKey = getRandomKey("queue_depth"); + const lockKey = getRandomKey("lock_depth"); + const mq = new RedisMessageQueue(() => new Redis(dbUrl), { + channelKey, + queueKey, + lockKey, + }); + try { + assert.deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + assert.deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + await disposeMessageQueue(mq); + } +}); diff --git a/packages/redis/src/mq.ts b/packages/redis/src/mq.ts index 32e4e352a..031a706f0 100644 --- a/packages/redis/src/mq.ts +++ b/packages/redis/src/mq.ts @@ -1,6 +1,7 @@ // deno-lint-ignore-file no-explicit-any import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -201,6 +202,19 @@ export class RedisMessageQueue implements MessageQueue, Disposable { if (baseTs <= now) this.#redis.publish(this.#channelKey, ""); } + async getDepth(): Promise { + const now = Temporal.Now.instant().epochMilliseconds; + const [queued, ready] = await Promise.all([ + this.#redis.zcard(this.#queueKey), + this.#redis.zcount(this.#queueKey, "-inf", now), + ]); + return { + queued, + ready, + delayed: queued - ready, + }; + } + /** * Returns the Redis key used to lock a specific ordering key. */ diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts index 1846ed1af..626d9f1c3 100644 --- a/packages/sqlite/src/mq.test.ts +++ b/packages/sqlite/src/mq.test.ts @@ -2,6 +2,7 @@ import { PlatformDatabase } from "#sqlite"; import { test } from "@fedify/fixture"; import { SqliteMessageQueue } from "@fedify/sqlite/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import assert from "node:assert/strict"; import { mkdtemp } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; @@ -22,3 +23,29 @@ test("SqliteMessageQueue", () => }, { testOrderingKey: true }, )); + +test("SqliteMessageQueue.getDepth()", async () => { + const dbPath = join(dbDir, `${getRandomKey("sqlite_depth")}.db`); + const db = new PlatformDatabase(dbPath); + const tableName = getRandomKey("message_depth").replaceAll("-", "_"); + const mq = new SqliteMessageQueue(db, { tableName }); + try { + assert.deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + assert.deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + mq.drop(); + mq[Symbol.dispose](); + } +}); diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 48199eca8..1f2b6e040 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -1,6 +1,7 @@ import { type PlatformDatabase, SqliteDatabase } from "#sqlite"; import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -260,6 +261,27 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { }); } + /** + * {@inheritDoc MessageQueue.getDepth} + */ + getDepth(): Promise { + this.initialize(); + const now = Temporal.Now.instant().epochMilliseconds; + const row = this.#db + .prepare( + `SELECT + COUNT(*) AS queued, + COALESCE(SUM(CASE WHEN scheduled <= ? THEN 1 ELSE 0 END), 0) AS ready + FROM "${this.#tableName}"`, + ) + .get(now) as { queued: number; ready: number }; + return Promise.resolve({ + queued: row.queued, + ready: row.ready, + delayed: row.queued - row.ready, + }); + } + /** * {@inheritDoc MessageQueue.listen} */ From 6bfd11604b348516fdf0a1b5595446147020f075 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 01:55:29 +0900 Subject: [PATCH 02/20] Tighten queue depth accounting Snapshot delayed in-process message batches before scheduling them so later caller mutations cannot affect queued depth or delivery. Read Redis queued and ready counts in one Lua script so concurrent writes cannot produce a negative delayed count, and count PostgreSQL messages scheduled exactly at CURRENT_TIMESTAMP as ready. https://github.com/fedify-dev/fedify/pull/748#discussion_r3155713403 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155699831 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155713426 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/mq.test.ts | 25 +++++++++++++++++++++++ packages/fedify/src/federation/mq.ts | 11 +++++++--- packages/postgres/src/mq.ts | 2 +- packages/redis/src/mq.ts | 17 +++++++++++---- 4 files changed, 47 insertions(+), 8 deletions(-) diff --git a/packages/fedify/src/federation/mq.test.ts b/packages/fedify/src/federation/mq.test.ts index 1120507a6..4924b0680 100644 --- a/packages/fedify/src/federation/mq.test.ts +++ b/packages/fedify/src/federation/mq.test.ts @@ -160,6 +160,31 @@ test("InProcessMessageQueue.getDepth()", async () => { }); }); +test("InProcessMessageQueue.getDepth() snapshots delayed batches", async () => { + const mq = new InProcessMessageQueue(); + const messages = ["first", "second"]; + await mq.enqueueMany(messages, { + delay: Temporal.Duration.from({ milliseconds: 10 }), + }); + messages.length = 0; + assertEquals(await mq.getDepth(), { + queued: 2, + ready: 0, + delayed: 2, + }); + + const handled: string[] = []; + const controller = new AbortController(); + const listening = mq.listen((message: string) => { + handled.push(message); + if (handled.length >= 2) controller.abort(); + }, { signal: controller.signal }); + + await waitFor(() => handled.length >= 2, 15_000); + await listening; + assertEquals(handled, ["first", "second"]); +}); + test("InProcessMessageQueue.getDepth() excludes in-flight messages", async () => { const mq = new InProcessMessageQueue(); let resolveHandler: (() => void) | undefined; diff --git a/packages/fedify/src/federation/mq.ts b/packages/fedify/src/federation/mq.ts index 60ad40ecf..cd5e4d924 100644 --- a/packages/fedify/src/federation/mq.ts +++ b/packages/fedify/src/federation/mq.ts @@ -222,11 +222,16 @@ export class InProcessMessageQueue implements MessageQueue { ? 0 : Math.max(options.delay.total("millisecond"), 0); if (delay > 0) { - this.#delayedMessages += messages.length; + const delayedCount = messages.length; + const deferredMessages = [...messages]; + this.#delayedMessages += delayedCount; setTimeout( () => { - this.#delayedMessages -= messages.length; - void this.enqueueMany(messages, { ...options, delay: undefined }); + this.#delayedMessages -= delayedCount; + void this.enqueueMany(deferredMessages, { + ...options, + delay: undefined, + }); }, delay, ); diff --git a/packages/postgres/src/mq.ts b/packages/postgres/src/mq.ts index bee030246..b207193c6 100644 --- a/packages/postgres/src/mq.ts +++ b/packages/postgres/src/mq.ts @@ -234,7 +234,7 @@ export class PostgresMessageQueue implements MessageQueue { SELECT COUNT(*) AS queued, COUNT(*) FILTER ( - WHERE created + delay < CURRENT_TIMESTAMP + WHERE created + delay <= CURRENT_TIMESTAMP ) AS ready FROM ${this.#sql(this.#tableName)} `; diff --git a/packages/redis/src/mq.ts b/packages/redis/src/mq.ts index 031a706f0..a6300ff2b 100644 --- a/packages/redis/src/mq.ts +++ b/packages/redis/src/mq.ts @@ -204,10 +204,19 @@ export class RedisMessageQueue implements MessageQueue, Disposable { async getDepth(): Promise { const now = Temporal.Now.instant().epochMilliseconds; - const [queued, ready] = await Promise.all([ - this.#redis.zcard(this.#queueKey), - this.#redis.zcount(this.#queueKey, "-inf", now), - ]); + const [queuedCount, readyCount] = await this.#redis.eval( + ` + return { + redis.call("ZCARD", KEYS[1]), + redis.call("ZCOUNT", KEYS[1], "-inf", ARGV[1]) + } + `, + 1, + this.#queueKey, + now, + ) as [number, number]; + const queued = Number(queuedCount); + const ready = Number(readyCount); return { queued, ready, From da0335b63146e8640440510f97e85fd2aea3142a Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 01:57:38 +0900 Subject: [PATCH 03/20] Handle missing AMQP delayed queues Treat only AMQP 404 queue-not-found errors as evidence that a tracked delayed queue has disappeared. Other passive queue-check failures now propagate instead of silently dropping queue tracking, and the temporary channel is closed through a single helper that tolerates server-closed channels. https://github.com/fedify-dev/fedify/pull/748#discussion_r3155697626 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155713391 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.ts | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index 0a7fb0d4c..ad42de7ee 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -8,6 +8,11 @@ import type { import type { Channel, ChannelModel, ConsumeMessage } from "amqplib"; import { Buffer } from "node:buffer"; +function isQueueNotFoundError(error: unknown): boolean { + return typeof error === "object" && error != null && + "code" in error && error.code === 404; +} + /** * Options for ordering key support in {@link AmqpMessageQueue}. * @@ -364,7 +369,17 @@ export class AmqpMessageQueue implements MessageQueue { } async getDepth(): Promise { - let channel = await this.#connection.createChannel(); + let channel: Channel | undefined = await this.#connection.createChannel(); + const closeChannel = async () => { + if (channel == null) return; + const currentChannel = channel; + channel = undefined; + try { + await currentChannel.close(); + } catch { + // The channel can already be closed by a failed passive queue check. + } + }; try { await this.#prepareQueue(channel); await this.#prepareOrdering(channel); @@ -381,13 +396,12 @@ export class AmqpMessageQueue implements MessageQueue { for (const queue of [...this.#delayedQueues]) { try { delayed += (await channel.checkQueue(queue)).messageCount; - } catch { - this.#delayedQueues.delete(queue); - try { - await channel.close(); - } catch { - // The channel is usually already closed after a failed checkQueue(). + } catch (error) { + if (!isQueueNotFoundError(error)) { + throw error; } + this.#delayedQueues.delete(queue); + await closeChannel(); channel = await this.#connection.createChannel(); } } @@ -398,11 +412,7 @@ export class AmqpMessageQueue implements MessageQueue { delayed, }; } finally { - try { - await channel.close(); - } catch { - // The channel can already be closed if a tracked delayed queue vanished. - } + await closeChannel(); } } From 9d5cd364c8e18bf9fbb756f2628d7eacaa9db89e Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:00:29 +0900 Subject: [PATCH 04/20] Use fixture for MySQL depth test Run the new MysqlMessageQueue.getDepth() coverage through the @fedify/fixture test adapter so the package test remains runtime-agnostic. https://github.com/fedify-dev/fedify/pull/748#discussion_r3155713416 Assisted-by: Codex:gpt-5.5 --- packages/mysql/src/mq.test.ts | 55 +++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/packages/mysql/src/mq.test.ts b/packages/mysql/src/mq.test.ts index f96a99f99..3ca67fbc3 100644 --- a/packages/mysql/src/mq.test.ts +++ b/packages/mysql/src/mq.test.ts @@ -1,3 +1,4 @@ +import { test as fixtureTest } from "@fedify/fixture"; import { MysqlMessageQueue } from "@fedify/mysql/mq"; import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing"; import * as temporal from "@js-temporal/polyfill"; @@ -97,31 +98,35 @@ test("MysqlMessageQueue", { skip: dbUrl == null }, () => { ); }); -test("MysqlMessageQueue.getDepth()", { skip: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option - const pool = mysql.createPool(dbUrl); - const tableName = randomTableName("depth"); - const mq = new MysqlMessageQueue(pool, { tableName }); - try { - assert.deepStrictEqual(await mq.getDepth(), { - queued: 0, - ready: 0, - delayed: 0, - }); - await mq.enqueue("ready"); - await mq.enqueue("delayed", { - delay: Temporal.Duration.from({ hours: 1 }), - }); - assert.deepStrictEqual(await mq.getDepth(), { - queued: 2, - ready: 1, - delayed: 1, - }); - } finally { - await mq.drop(); - await pool.end(); - } -}); +fixtureTest( + "MysqlMessageQueue.getDepth()", + { ignore: dbUrl == null }, + async () => { + if (dbUrl == null) return; // Bun does not support skip option + const pool = mysql.createPool(dbUrl); + const tableName = randomTableName("depth"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + assert.deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + assert.deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + await mq.drop(); + await pool.end(); + } + }, +); // --------------------------------------------------------------------------- // initialize() and drop() From 5b4c2bacebd20d1ca88042d6c706a9df61791cd8 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:02:30 +0900 Subject: [PATCH 05/20] Clarify queue depth documentation Mention the MessageQueueDepth return type in the changelog, avoid showing a zero-filled custom getDepth() fallback, and document that ParallelMessageQueue reports the same depth support as its wrapped queue. https://github.com/fedify-dev/fedify/pull/748#discussion_r3155713362 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155713374 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155713389 Assisted-by: Codex:gpt-5.5 --- CHANGES.md | 9 +++++---- docs/manual/mq.md | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ddc62d239..a101698bc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,10 +10,11 @@ To be released. ### @fedify/fedify - - Added optional `MessageQueue.getDepth()` support for reporting queue - backlog depth. `InProcessMessageQueue` can now report queued messages, - including ready and delayed counts, and `ParallelMessageQueue` delegates - depth reporting to its wrapped queue when supported. [[#735], [#748]] + - Added optional `MessageQueue.getDepth()` support, using the new + `MessageQueueDepth` return type, for reporting queue backlog depth. + `InProcessMessageQueue` can now report queued messages, including ready + and delayed counts, and `ParallelMessageQueue` delegates depth reporting + to its wrapped queue when supported. [[#735], [#748]] [#735]: https://github.com/fedify-dev/fedify/issues/735 [#748]: https://github.com/fedify-dev/fedify/pull/748 diff --git a/docs/manual/mq.md b/docs/manual/mq.md index 807c81f1e..db78998d3 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -695,10 +695,10 @@ class CustomMessageQueue implements MessageQueue { // Implementation here } - async getDepth(): Promise { - // Optional: return queue depth for observability - return { queued: 0, ready: 0, delayed: 0 }; - } + // Optional: implement only if your backend can report real counts. + // async getDepth(): Promise { + // return { queued, ready, delayed }; + // } } ~~~~ @@ -1016,6 +1016,7 @@ if (depth != null) { | [`AmqpMessageQueue`] | `queued`, `ready`, `delayed`[^amqp-depth] | | [`SqliteMessageQueue`] | `queued`, `ready`, `delayed` | | `WorkersMessageQueue` | No reliable platform count | +| `ParallelMessageQueue` | Same as wrapped queue | If you pass the same `MessageQueue` instance as the shared queue for inbox, outbox, and fanout work, observability code should report that queue once as a From bc8fac01a7ceea90054b2bf19e39e2bb264606e8 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:12:57 +0900 Subject: [PATCH 06/20] Use Temporal polyfill in depth tests Mirror the existing PostgreSQL and MySQL test pattern so Redis, AMQP, and SQLite depth tests can construct Temporal durations when running under Node versions without a native Temporal global. Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.test.ts | 3 +++ packages/redis/src/mq.test.ts | 3 +++ packages/sqlite/src/mq.test.ts | 3 +++ 3 files changed, 9 insertions(+) diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index 908b45fb2..cc29e5e47 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -1,12 +1,15 @@ import { suite } from "@alinea/suite"; import { AmqpMessageQueue } from "@fedify/amqp/mq"; import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing"; +import * as temporal from "@js-temporal/polyfill"; import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert"; import { delay } from "@std/async/delay"; // @deno-types="npm:@types/amqplib" import { type ChannelModel, connect } from "amqplib"; import process from "node:process"; +const Temporal = globalThis.Temporal ?? temporal.Temporal; + const AMQP_URL = process.env.AMQP_URL; const test = AMQP_URL ? suite(import.meta) : suite(import.meta).skip; diff --git a/packages/redis/src/mq.test.ts b/packages/redis/src/mq.test.ts index 793642a8a..a2069e954 100644 --- a/packages/redis/src/mq.test.ts +++ b/packages/redis/src/mq.test.ts @@ -1,11 +1,14 @@ import { test } from "@fedify/fixture"; import { RedisMessageQueue } from "@fedify/redis/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import * as temporal from "@js-temporal/polyfill"; import assert from "node:assert/strict"; import process from "node:process"; import { Redis } from "ioredis"; +const Temporal = globalThis.Temporal ?? temporal.Temporal; + const dbUrl = process.env.REDIS_URL; async function disposeMessageQueue(mq: object): Promise { diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts index 626d9f1c3..e1dc2059d 100644 --- a/packages/sqlite/src/mq.test.ts +++ b/packages/sqlite/src/mq.test.ts @@ -2,11 +2,14 @@ import { PlatformDatabase } from "#sqlite"; import { test } from "@fedify/fixture"; import { SqliteMessageQueue } from "@fedify/sqlite/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import * as temporal from "@js-temporal/polyfill"; import assert from "node:assert/strict"; import { mkdtemp } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +const Temporal = globalThis.Temporal ?? temporal.Temporal; + const dbDir = await mkdtemp(join(tmpdir(), "fedify-sqlite-")); const dbPath = join(dbDir, `${getRandomKey("sqlite")}.db`); const db = new PlatformDatabase(dbPath); From 7ee9b24e921eaac9f1b6617f740d42fc90cb952b Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:25:13 +0900 Subject: [PATCH 07/20] Harden queue depth counts Clamp delayed queue depth counts defensively across database adapters and route SQLite depth reads through the existing busy retry path. https://github.com/fedify-dev/fedify/pull/748#discussion_r3155951927 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155951940 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155951947 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155951961 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155952352 Assisted-by: Codex:gpt-5.5 --- packages/mysql/src/mq.ts | 2 +- packages/postgres/src/mq.ts | 2 +- packages/redis/src/mq.ts | 2 +- packages/sqlite/src/mq.ts | 28 +++++++++++++++------------- 4 files changed, 18 insertions(+), 16 deletions(-) diff --git a/packages/mysql/src/mq.ts b/packages/mysql/src/mq.ts index a9ab14260..e954d8ff6 100644 --- a/packages/mysql/src/mq.ts +++ b/packages/mysql/src/mq.ts @@ -323,7 +323,7 @@ export class MysqlMessageQueue implements MessageQueue { return { queued, ready, - delayed: queued - ready, + delayed: Math.max(0, queued - ready), }; } diff --git a/packages/postgres/src/mq.ts b/packages/postgres/src/mq.ts index b207193c6..6418f77b7 100644 --- a/packages/postgres/src/mq.ts +++ b/packages/postgres/src/mq.ts @@ -243,7 +243,7 @@ export class PostgresMessageQueue implements MessageQueue { return { queued, ready, - delayed: queued - ready, + delayed: Math.max(0, queued - ready), }; } diff --git a/packages/redis/src/mq.ts b/packages/redis/src/mq.ts index a6300ff2b..d658d04f3 100644 --- a/packages/redis/src/mq.ts +++ b/packages/redis/src/mq.ts @@ -220,7 +220,7 @@ export class RedisMessageQueue implements MessageQueue, Disposable { return { queued, ready, - delayed: queued - ready, + delayed: Math.max(0, queued - ready), }; } diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 1f2b6e040..30332b784 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -264,22 +264,24 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { /** * {@inheritDoc MessageQueue.getDepth} */ - getDepth(): Promise { + async getDepth(): Promise { this.initialize(); - const now = Temporal.Now.instant().epochMilliseconds; - const row = this.#db - .prepare( - `SELECT - COUNT(*) AS queued, - COALESCE(SUM(CASE WHEN scheduled <= ? THEN 1 ELSE 0 END), 0) AS ready - FROM "${this.#tableName}"`, - ) - .get(now) as { queued: number; ready: number }; - return Promise.resolve({ + const row = await this.#retryOnBusy(() => { + const now = Temporal.Now.instant().epochMilliseconds; + return this.#db + .prepare( + `SELECT + COUNT(*) AS queued, + COALESCE(SUM(CASE WHEN scheduled <= ? THEN 1 ELSE 0 END), 0) AS ready + FROM "${this.#tableName}"`, + ) + .get(now) as { queued: number; ready: number }; + }); + return { queued: row.queued, ready: row.ready, - delayed: row.queued - row.ready, - }); + delayed: Math.max(0, row.queued - row.ready), + }; } /** From c8d1b576e572f0d477c078e23ca6583e757305f0 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:27:42 +0900 Subject: [PATCH 08/20] Harden AMQP depth checks Prune tracked delayed queues after their TTL window and add an error listener to temporary depth channels so passive check failures cannot surface as unhandled channel errors. https://github.com/fedify-dev/fedify/pull/748#discussion_r3155952359 https://github.com/fedify-dev/fedify/pull/748#discussion_r3155960444 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.ts | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index ad42de7ee..fa2577df6 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -133,7 +133,7 @@ export class AmqpMessageQueue implements MessageQueue { queuePrefix: string; partitions: number; }; - #delayedQueues: Set = new Set(); + #delayedQueues: Map = new Map(); #orderingPrepared: boolean = false; readonly nativeRetrial: boolean; @@ -270,7 +270,7 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterRoutingKey, messageTtl: delay, }); - this.#delayedQueues.add(queue); + this.#trackDelayedQueue(queue, delay); } channel.sendToQueue( queue, @@ -353,7 +353,7 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterRoutingKey, messageTtl: delay, }); - this.#delayedQueues.add(queue); + this.#trackDelayedQueue(queue, delay); } for (const message of messages) { @@ -368,8 +368,26 @@ export class AmqpMessageQueue implements MessageQueue { } } + #trackDelayedQueue(queue: string, delay: number): void { + this.#delayedQueues.set(queue, Date.now() + Math.max(0, delay) + 60_000); + this.#pruneDelayedQueues(); + } + + #pruneDelayedQueues(): void { + const now = Date.now(); + for (const [queue, expiresAt] of this.#delayedQueues) { + if (expiresAt <= now) this.#delayedQueues.delete(queue); + } + } + + async #createDepthChannel(): Promise { + const channel = await this.#connection.createChannel(); + channel.on("error", () => undefined); + return channel; + } + async getDepth(): Promise { - let channel: Channel | undefined = await this.#connection.createChannel(); + let channel: Channel | undefined = await this.#createDepthChannel(); const closeChannel = async () => { if (channel == null) return; const currentChannel = channel; @@ -393,7 +411,8 @@ export class AmqpMessageQueue implements MessageQueue { } let delayed = 0; - for (const queue of [...this.#delayedQueues]) { + this.#pruneDelayedQueues(); + for (const queue of [...this.#delayedQueues.keys()]) { try { delayed += (await channel.checkQueue(queue)).messageCount; } catch (error) { @@ -402,7 +421,7 @@ export class AmqpMessageQueue implements MessageQueue { } this.#delayedQueues.delete(queue); await closeChannel(); - channel = await this.#connection.createChannel(); + channel = await this.#createDepthChannel(); } } From b14f2ef48c8a8e539c28ce0444005c36f2192849 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:27:45 +0900 Subject: [PATCH 09/20] Reduce delayed depth test flakiness Increase the delayed batch snapshot window so busy CI workers do not turn the delayed messages ready before the depth assertion runs. https://github.com/fedify-dev/fedify/pull/748#discussion_r3155960461 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/mq.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/fedify/src/federation/mq.test.ts b/packages/fedify/src/federation/mq.test.ts index 4924b0680..1c5b807af 100644 --- a/packages/fedify/src/federation/mq.test.ts +++ b/packages/fedify/src/federation/mq.test.ts @@ -164,7 +164,7 @@ test("InProcessMessageQueue.getDepth() snapshots delayed batches", async () => { const mq = new InProcessMessageQueue(); const messages = ["first", "second"]; await mq.enqueueMany(messages, { - delay: Temporal.Duration.from({ milliseconds: 10 }), + delay: Temporal.Duration.from({ milliseconds: 250 }), }); messages.length = 0; assertEquals(await mq.getDepth(), { From 3459af26a889fb1ff4aadc6bdc7609428cb4767f Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:30:37 +0900 Subject: [PATCH 10/20] Make AMQP depth test cleanup best-effort Ensure a teardown failure while deleting one queue does not skip the remaining queue cleanup or channel and connection close calls. https://github.com/fedify-dev/fedify/pull/748#discussion_r3155997102 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.test.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index cc29e5e47..27dc76279 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -72,11 +72,15 @@ test( delayed: 1, }); } finally { - const channel = await conn.createChannel(); - await channel.deleteQueue(queue); - await channel.deleteQueue(`${delayedQueuePrefix}60000`).catch(() => {}); - await channel.close(); - await conn.close(); + const channel = await conn.createChannel().catch(() => undefined); + try { + await channel?.deleteQueue(queue).catch(() => {}); + await channel?.deleteQueue(`${delayedQueuePrefix}60000`).catch(() => { + }); + } finally { + await channel?.close().catch(() => {}); + await conn.close().catch(() => {}); + } } }, ); From 07173a5570c157082d928777b25a98feab61b81e Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:46:34 +0900 Subject: [PATCH 11/20] Keep AMQP depth probes passive Avoid declaring queues or ordering exchanges while reporting AMQP queue depth. Missing queues now count as zero while preserving the existing channel recreation path for passive checks, and delayed queue tracking is iterated without copying all keys first. https://github.com/fedify-dev/fedify/pull/748#discussion_r3156067173 https://github.com/fedify-dev/fedify/pull/748#discussion_r3156100077 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.ts | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index fa2577df6..c9125da8e 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -398,30 +398,37 @@ export class AmqpMessageQueue implements MessageQueue { // The channel can already be closed by a failed passive queue check. } }; + const checkQueue = async ( + queueName: string, + ): Promise => { + if (channel == null) channel = await this.#createDepthChannel(); + try { + return (await channel.checkQueue(queueName)).messageCount; + } catch (error) { + if (!isQueueNotFoundError(error)) { + throw error; + } + await closeChannel(); + channel = await this.#createDepthChannel(); + return undefined; + } + }; try { - await this.#prepareQueue(channel); - await this.#prepareOrdering(channel); - - let ready = (await channel.checkQueue(this.#queue)).messageCount; + let ready = (await checkQueue(this.#queue)) ?? 0; if (this.#ordering != null) { for (let i = 0; i < this.#ordering.partitions; i++) { - ready += (await channel.checkQueue(this.#getOrderingQueueName(i))) - .messageCount; + ready += (await checkQueue(this.#getOrderingQueueName(i))) ?? 0; } } let delayed = 0; this.#pruneDelayedQueues(); - for (const queue of [...this.#delayedQueues.keys()]) { - try { - delayed += (await channel.checkQueue(queue)).messageCount; - } catch (error) { - if (!isQueueNotFoundError(error)) { - throw error; - } + for (const queue of this.#delayedQueues.keys()) { + const messageCount = await checkQueue(queue); + if (messageCount == null) { this.#delayedQueues.delete(queue); - await closeChannel(); - channel = await this.#createDepthChannel(); + } else { + delayed += messageCount; } } From 20b46d59420311970f972c561e1182a7a5a3f3ff Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Wed, 29 Apr 2026 02:46:38 +0900 Subject: [PATCH 12/20] Clean up in-flight depth listener Make the in-flight depth test release its blocked listener even when the assertion fails, so the test cannot leave a pending listen promise behind. https://github.com/fedify-dev/fedify/pull/748#discussion_r3156067178 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/mq.test.ts | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/packages/fedify/src/federation/mq.test.ts b/packages/fedify/src/federation/mq.test.ts index 1c5b807af..9c2dcdba9 100644 --- a/packages/fedify/src/federation/mq.test.ts +++ b/packages/fedify/src/federation/mq.test.ts @@ -204,15 +204,19 @@ test("InProcessMessageQueue.getDepth() excludes in-flight messages", async () => controller.abort(); }, { signal: controller.signal }); - await mq.enqueue("in-flight"); - await handlerStarted; - assertEquals(await mq.getDepth(), { - queued: 0, - ready: 0, - delayed: 0, - }); - resolveHandler?.(); - await listening; + try { + await mq.enqueue("in-flight"); + await handlerStarted; + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + } finally { + resolveHandler?.(); + controller.abort(); + await listening; + } }); test("InProcessMessageQueue orderingKey", async (t) => { From ae22d1c59d76be7a64dd4c0559577c182a4d0b89 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sat, 2 May 2026 15:19:01 +0900 Subject: [PATCH 13/20] Use fixture for MySQL MQ tests Route the full MySQL message queue test file through the runtime-agnostic fixture test adapter and use fixture ignore options for database-gated cases. Clear the long-poll deadline timer so Deno fixture leak checks stay clean. https://github.com/fedify-dev/fedify/pull/748#discussion_r3176187578 Assisted-by: Codex:gpt-5.5 --- packages/mysql/src/mq.test.ts | 122 ++++++++++++++++++---------------- 1 file changed, 63 insertions(+), 59 deletions(-) diff --git a/packages/mysql/src/mq.test.ts b/packages/mysql/src/mq.test.ts index 3ca67fbc3..9e93451b7 100644 --- a/packages/mysql/src/mq.test.ts +++ b/packages/mysql/src/mq.test.ts @@ -1,10 +1,9 @@ -import { test as fixtureTest } from "@fedify/fixture"; +import { test } from "@fedify/fixture"; import { MysqlMessageQueue } from "@fedify/mysql/mq"; import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing"; import * as temporal from "@js-temporal/polyfill"; import assert from "node:assert/strict"; import process from "node:process"; -import { test } from "node:test"; import mysql from "mysql2/promise"; const Temporal = globalThis.Temporal ?? temporal.Temporal; @@ -75,8 +74,8 @@ test("MysqlMessageQueue uses default options when none are provided", () => { // Standard shared test suite // --------------------------------------------------------------------------- -test("MysqlMessageQueue", { skip: dbUrl == null }, () => { - if (dbUrl == null) return; // Bun does not support skip option +test("MysqlMessageQueue", { ignore: dbUrl == null }, () => { + if (dbUrl == null) return; const tableName = randomTableName("mq"); const pools: mysql.Pool[] = []; @@ -98,11 +97,11 @@ test("MysqlMessageQueue", { skip: dbUrl == null }, () => { ); }); -fixtureTest( +test( "MysqlMessageQueue.getDepth()", { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl); const tableName = randomTableName("depth"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -132,8 +131,8 @@ fixtureTest( // initialize() and drop() // --------------------------------------------------------------------------- -test("MysqlMessageQueue.initialize()", { skip: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option +test("MysqlMessageQueue.initialize()", { ignore: dbUrl == null }, async () => { + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("init"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -197,9 +196,9 @@ test("MysqlMessageQueue.initialize()", { skip: dbUrl == null }, async () => { test( "MysqlMessageQueue.initialize() is idempotent", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("idem"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -222,8 +221,8 @@ test( }, ); -test("MysqlMessageQueue.drop()", { skip: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option +test("MysqlMessageQueue.drop()", { ignore: dbUrl == null }, async () => { + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("drop"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -245,9 +244,9 @@ test("MysqlMessageQueue.drop()", { skip: dbUrl == null }, async () => { test( "MysqlMessageQueue.drop() resets initialized flag so re-initialize works", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("reinit"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -273,9 +272,9 @@ test( test( "MysqlMessageQueue.drop() waits for in-flight initialize() before dropping", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("droprace"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -308,9 +307,9 @@ test( test( "MysqlMessageQueue concurrent initialization does not throw", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pools: mysql.Pool[] = []; const tableName = randomTableName("concinit"); try { @@ -334,9 +333,9 @@ test( test( "MysqlMessageQueue enqueue() and listen() racing on initialize() is safe", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("race"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -369,9 +368,9 @@ test( test( "MysqlMessageQueue processes messages enqueued before listen() starts", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("preq"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -410,9 +409,9 @@ test( test( "MysqlMessageQueue delayed message is not delivered early", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("delay"); const mq = new MysqlMessageQueue(pool, { @@ -467,9 +466,9 @@ test( test( "MysqlMessageQueue handles 30 concurrent enqueue() calls", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("stress"); const mq = new MysqlMessageQueue(pool, { @@ -511,9 +510,9 @@ test( test( "MysqlMessageQueue.enqueueMany() with empty array is a no-op", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("emptyb"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -540,9 +539,9 @@ test( test( "MysqlMessageQueue.enqueueMany() inserts all messages atomically", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("batcha"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -563,9 +562,9 @@ test( test( "MysqlMessageQueue.enqueueMany() delivers all 100 messages via single INSERT", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("bulk"); const mq = new MysqlMessageQueue(pool, { @@ -602,9 +601,9 @@ test( test( "MysqlMessageQueue.enqueueMany() preserves insertion order for same ordering key", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("bord"); const mq = new MysqlMessageQueue(pool, { @@ -648,9 +647,9 @@ test( test( "MysqlMessageQueue listener survives handler errors", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("hderr"); const mq = new MysqlMessageQueue(pool, { @@ -692,9 +691,9 @@ test( test( "MysqlMessageQueue handlerTimeout prevents hung handler from blocking queue", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("hdto"); const mq = new MysqlMessageQueue(pool, { @@ -737,9 +736,9 @@ test( test( "MysqlMessageQueue handlerTimeout with ordering key releases the lock", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("hdtolk"); const mq = new MysqlMessageQueue(pool, { @@ -785,9 +784,9 @@ test( test( "MysqlMessageQueue advisory lock is released after processing (regression for lock-leak)", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; // Use a pool with a small max to make lock leaks visible const pool = mysql.createPool({ uri: dbUrl!, connectionLimit: 3 }); const tableName = randomTableName("lockleak"); @@ -830,9 +829,9 @@ test( test( "MysqlMessageQueue GET_LOCK succeeds with a very long ordering key", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; // A 200-char ordering key combined with a 20-char table name produces a // raw lock name that is 221 chars — well over MySQL's 64-char limit. // The lock name hashing logic must shorten it before calling GET_LOCK, @@ -874,9 +873,9 @@ test( test( "MysqlMessageQueue delivers each message to exactly one worker", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool1 = mysql.createPool(dbUrl!); const pool2 = mysql.createPool(dbUrl!); const tableName = randomTableName("onceonly"); @@ -934,9 +933,9 @@ test( test( "MysqlMessageQueue two workers preserve ordering-key order", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool1 = mysql.createPool(dbUrl!); const pool2 = mysql.createPool(dbUrl!); const tableName = randomTableName("twowork"); @@ -993,9 +992,9 @@ test( test( "MysqlMessageQueue listen() resolves promptly when aborted during poll interval", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("clrtmo"); // Use a very long poll interval so the test would hang if clearTimeout @@ -1014,13 +1013,18 @@ test( controller.abort(); // listen() must resolve well within the 60-second poll interval. - const deadline = new Promise((_, reject) => - setTimeout( + let deadlineTimer: ReturnType | undefined; + const deadline = new Promise((_, reject) => { + deadlineTimer = setTimeout( () => reject(new Error("listen() did not resolve in time")), 3_000, - ) - ); - await Promise.race([listening, deadline]); + ); + }); + try { + await Promise.race([listening, deadline]); + } finally { + clearTimeout(deadlineTimer); + } } finally { await mq.drop(); await pool.end(); @@ -1034,9 +1038,9 @@ test( test( "MysqlMessageQueue works with getRandomKey() from @fedify/testing", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; // getRandomKey returns names like "fedify_test_mq_" which may contain // hyphens — unsuitable for MySQL identifiers. Users must replace hyphens. const rawKey = getRandomKey("mq"); @@ -1071,9 +1075,9 @@ test( test( "MysqlMessageQueue with initialized: true skips DDL on first use", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("preini"); // Create table manually first From 51ff7a0a9fdc7170dca28b793c022c06897ced04 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sat, 2 May 2026 16:06:14 +0900 Subject: [PATCH 14/20] Probe AMQP queue depths concurrently Use bounded parallel passive queue checks for AMQP queue depth probes. Each probe uses its own short-lived channel so a missing delayed queue can close that channel without affecting other checks. https://github.com/fedify-dev/fedify/pull/748#discussion_r3176249380 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.test.ts | 82 ++++++++++++++++++++++++++- packages/amqp/src/mq.ts | 107 +++++++++++++++++++++-------------- 2 files changed, 143 insertions(+), 46 deletions(-) diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index 27dc76279..3e5bc2dae 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -5,13 +5,91 @@ import * as temporal from "@js-temporal/polyfill"; import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert"; import { delay } from "@std/async/delay"; // @deno-types="npm:@types/amqplib" -import { type ChannelModel, connect } from "amqplib"; +import { type Channel, type ChannelModel, connect } from "amqplib"; import process from "node:process"; const Temporal = globalThis.Temporal ?? temporal.Temporal; const AMQP_URL = process.env.AMQP_URL; -const test = AMQP_URL ? suite(import.meta) : suite(import.meta).skip; +const unitTest = suite(import.meta); +const test = AMQP_URL ? unitTest : unitTest.skip; + +class FakeDepthChannel { + constructor(private readonly connection: FakeDepthConnection) { + } + + on(): void { + } + + assertQueue(queue: string): Promise { + this.connection.queues.add(queue); + return Promise.resolve(); + } + + sendToQueue(queue: string): boolean { + this.connection.messageCounts.set( + queue, + (this.connection.messageCounts.get(queue) ?? 0) + 1, + ); + return true; + } + + async checkQueue(queue: string): Promise<{ messageCount: number }> { + this.connection.activeChecks++; + this.connection.maxActiveChecks = Math.max( + this.connection.maxActiveChecks, + this.connection.activeChecks, + ); + try { + await delay(25); + return { messageCount: this.connection.messageCounts.get(queue) ?? 0 }; + } finally { + this.connection.activeChecks--; + } + } + + async close(): Promise { + } +} + +class FakeDepthConnection { + readonly queues = new Set(); + readonly messageCounts = new Map(); + activeChecks = 0; + maxActiveChecks = 0; + + createChannel(): Promise { + return Promise.resolve(new FakeDepthChannel(this) as unknown as Channel); + } +} + +unitTest( + "AmqpMessageQueue.getDepth() probes delayed queues concurrently", + async () => { + const conn = new FakeDepthConnection(); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + await mq.enqueue("first", { + delay: Temporal.Duration.from({ milliseconds: 1_000 }), + }); + await mq.enqueue("second", { + delay: Temporal.Duration.from({ milliseconds: 2_000 }), + }); + await mq.enqueue("third", { + delay: Temporal.Duration.from({ milliseconds: 3_000 }), + }); + + assertEquals(await mq.getDepth(), { + queued: 3, + ready: 0, + delayed: 3, + }); + assertGreater(conn.maxActiveChecks, 1); + }, +); function getConnection(): Promise { return connect(AMQP_URL!); diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index c9125da8e..a2146d8af 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -13,6 +13,8 @@ function isQueueNotFoundError(error: unknown): boolean { "code" in error && error.code === 404; } +const depthProbeConcurrency = 8; + /** * Options for ordering key support in {@link AmqpMessageQueue}. * @@ -386,60 +388,77 @@ export class AmqpMessageQueue implements MessageQueue { return channel; } - async getDepth(): Promise { - let channel: Channel | undefined = await this.#createDepthChannel(); - const closeChannel = async () => { - if (channel == null) return; - const currentChannel = channel; - channel = undefined; + async #checkQueueDepth(queueName: string): Promise { + const channel = await this.#createDepthChannel(); + try { + return (await channel.checkQueue(queueName)).messageCount; + } catch (error) { + if (!isQueueNotFoundError(error)) { + throw error; + } + return undefined; + } finally { try { - await currentChannel.close(); + await channel.close(); } catch { // The channel can already be closed by a failed passive queue check. } - }; - const checkQueue = async ( - queueName: string, - ): Promise => { - if (channel == null) channel = await this.#createDepthChannel(); - try { - return (await channel.checkQueue(queueName)).messageCount; - } catch (error) { - if (!isQueueNotFoundError(error)) { - throw error; - } - await closeChannel(); - channel = await this.#createDepthChannel(); - return undefined; + } + } + + async #checkQueueDepths( + queueNames: readonly string[], + ): Promise { + const results = new Array( + queueNames.length, + ); + let nextIndex = 0; + const worker = async () => { + while (nextIndex < queueNames.length) { + const index = nextIndex++; + const queue = queueNames[index]; + results[index] = [queue, await this.#checkQueueDepth(queue)]; } }; - try { - let ready = (await checkQueue(this.#queue)) ?? 0; - if (this.#ordering != null) { - for (let i = 0; i < this.#ordering.partitions; i++) { - ready += (await checkQueue(this.#getOrderingQueueName(i))) ?? 0; - } - } + const workers = Array.from( + { length: Math.min(depthProbeConcurrency, queueNames.length) }, + () => worker(), + ); + await Promise.all(workers); + return results; + } - let delayed = 0; - this.#pruneDelayedQueues(); - for (const queue of this.#delayedQueues.keys()) { - const messageCount = await checkQueue(queue); - if (messageCount == null) { - this.#delayedQueues.delete(queue); - } else { - delayed += messageCount; - } + async getDepth(): Promise { + const readyQueues = [this.#queue]; + if (this.#ordering != null) { + for (let i = 0; i < this.#ordering.partitions; i++) { + readyQueues.push(this.#getOrderingQueueName(i)); } + } - return { - queued: ready + delayed, - ready, - delayed, - }; - } finally { - await closeChannel(); + let ready = 0; + for (const [, messageCount] of await this.#checkQueueDepths(readyQueues)) { + ready += messageCount ?? 0; } + + let delayed = 0; + this.#pruneDelayedQueues(); + const delayedQueues = [...this.#delayedQueues.keys()]; + for ( + const [queue, messageCount] of await this.#checkQueueDepths(delayedQueues) + ) { + if (messageCount == null) { + this.#delayedQueues.delete(queue); + } else { + delayed += messageCount; + } + } + + return { + queued: ready + delayed, + ready, + delayed, + }; } async listen( From efd5ccc7a2e53070c7bbdde2b2f91da2c8d6b0a3 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sat, 2 May 2026 16:06:19 +0900 Subject: [PATCH 15/20] Avoid deferred in-process re-enqueue Deliver delayed in-process messages through private ready-queue helpers instead of recursively calling the public enqueue methods from timer callbacks. This avoids ignored promise rejections if a subclass changes those public methods to perform asynchronous work. https://github.com/fedify-dev/fedify/pull/748#discussion_r3176249382 https://github.com/fedify-dev/fedify/pull/748#discussion_r3176249384 Assisted-by: Codex:gpt-5.5 --- packages/fedify/src/federation/mq.test.ts | 72 +++++++++++++++++++++++ packages/fedify/src/federation/mq.ts | 30 ++++++---- 2 files changed, 92 insertions(+), 10 deletions(-) diff --git a/packages/fedify/src/federation/mq.test.ts b/packages/fedify/src/federation/mq.test.ts index 9c2dcdba9..e7c402908 100644 --- a/packages/fedify/src/federation/mq.test.ts +++ b/packages/fedify/src/federation/mq.test.ts @@ -219,6 +219,78 @@ test("InProcessMessageQueue.getDepth() excludes in-flight messages", async () => } }); +test("InProcessMessageQueue delayed enqueue uses the internal ready path", async () => { + class RejectingReadyQueue extends InProcessMessageQueue { + override enqueue( + message: unknown, + options?: { delay?: Temporal.Duration }, + ): Promise { + if (options?.delay == null) { + return Promise.reject(new Error("ready enqueue should not be called")); + } + return super.enqueue(message, options); + } + } + + const mq = new RejectingReadyQueue({ + pollInterval: { milliseconds: 10 }, + }); + const messages: string[] = []; + const controller = new AbortController(); + const listening = mq.listen((message: string) => { + messages.push(message); + controller.abort(); + }, { signal: controller.signal }); + + try { + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds: 10 }), + }); + await waitFor(() => messages.length > 0, 2_000); + assertEquals(messages, ["delayed"]); + } finally { + controller.abort(); + await listening; + } +}); + +test("InProcessMessageQueue delayed enqueueMany uses the internal ready path", async () => { + class RejectingReadyQueue extends InProcessMessageQueue { + override enqueueMany( + messages: readonly unknown[], + options?: { delay?: Temporal.Duration }, + ): Promise { + if (options?.delay == null) { + return Promise.reject( + new Error("ready enqueueMany should not be called"), + ); + } + return super.enqueueMany(messages, options); + } + } + + const mq = new RejectingReadyQueue({ + pollInterval: { milliseconds: 10 }, + }); + const messages: string[] = []; + const controller = new AbortController(); + const listening = mq.listen((message: string) => { + messages.push(message); + if (messages.length >= 2) controller.abort(); + }, { signal: controller.signal }); + + try { + await mq.enqueueMany(["first", "second"], { + delay: Temporal.Duration.from({ milliseconds: 10 }), + }); + await waitFor(() => messages.length >= 2, 2_000); + assertEquals(messages, ["first", "second"]); + } finally { + controller.abort(); + await listening; + } +}); + test("InProcessMessageQueue orderingKey", async (t) => { const mq = new InProcessMessageQueue(); diff --git a/packages/fedify/src/federation/mq.ts b/packages/fedify/src/federation/mq.ts index cd5e4d924..36ba9900c 100644 --- a/packages/fedify/src/federation/mq.ts +++ b/packages/fedify/src/federation/mq.ts @@ -199,18 +199,26 @@ export class InProcessMessageQueue implements MessageQueue { setTimeout( () => { this.#delayedMessages--; - void this.enqueue(message, { ...options, delay: undefined }); + this.#enqueueReady(message, options); }, delay, ); return Promise.resolve(); } + this.#enqueueReady(message, options); + return Promise.resolve(); + } + + #enqueueReady(message: any, options?: MessageQueueEnqueueOptions): void { const orderingKey = options?.orderingKey ?? null; this.#messages.push({ message, orderingKey }); + this.#notifyMonitors(); + } + + #notifyMonitors(): void { for (const monitorId in this.#monitors) { this.#monitors[monitorId as ReturnType](); } - return Promise.resolve(); } enqueueMany( @@ -228,23 +236,25 @@ export class InProcessMessageQueue implements MessageQueue { setTimeout( () => { this.#delayedMessages -= delayedCount; - void this.enqueueMany(deferredMessages, { - ...options, - delay: undefined, - }); + this.#enqueueManyReady(deferredMessages, options); }, delay, ); return Promise.resolve(); } + this.#enqueueManyReady(messages, options); + return Promise.resolve(); + } + + #enqueueManyReady( + messages: readonly any[], + options?: MessageQueueEnqueueOptions, + ): void { const orderingKey = options?.orderingKey ?? null; for (const message of messages) { this.#messages.push({ message, orderingKey }); } - for (const monitorId in this.#monitors) { - this.#monitors[monitorId as ReturnType](); - } - return Promise.resolve(); + this.#notifyMonitors(); } async listen( From 07d70b2782a4e6a42f30e8ec08cb772ecd183454 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sat, 2 May 2026 16:15:58 +0900 Subject: [PATCH 16/20] Defer AMQP delayed queue pruning Keep AMQP delayed queue tracking cheap on enqueue by recording the expiry without scanning the whole map. Stale delayed queue names are still pruned before getDepth() probes them. https://github.com/fedify-dev/fedify/pull/748#discussion_r3176301051 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index a2146d8af..a04223cb8 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -372,7 +372,6 @@ export class AmqpMessageQueue implements MessageQueue { #trackDelayedQueue(queue: string, delay: number): void { this.#delayedQueues.set(queue, Date.now() + Math.max(0, delay) + 60_000); - this.#pruneDelayedQueues(); } #pruneDelayedQueues(): void { From 442ab0a440cd72f25f2b06d5d931f821d618dd7a Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sat, 2 May 2026 16:27:25 +0900 Subject: [PATCH 17/20] Keep AMQP delayed queues tracked Track AMQP delayed queue names until the broker reports that the queue is missing. This avoids under-reporting depth when RabbitMQ delays TTL cleanup or dead-letter transfer beyond a local wall-clock estimate. https://github.com/fedify-dev/fedify/pull/748#discussion_r3176309090 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.test.ts | 29 +++++++++++++++++++++++++++++ packages/amqp/src/mq.ts | 20 ++++++-------------- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index 3e5bc2dae..31288679a 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -91,6 +91,35 @@ unitTest( }, ); +unitTest( + "AmqpMessageQueue.getDepth() keeps delayed queues past local expiry", + async () => { + const now = Date.now; + const started = now(); + Date.now = () => started; + try { + const conn = new FakeDepthConnection(); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds: 1_000 }), + }); + Date.now = () => started + 62_000; + + assertEquals(await mq.getDepth(), { + queued: 1, + ready: 0, + delayed: 1, + }); + } finally { + Date.now = now; + } + }, +); + function getConnection(): Promise { return connect(AMQP_URL!); } diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index a04223cb8..1fe9043c8 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -135,7 +135,7 @@ export class AmqpMessageQueue implements MessageQueue { queuePrefix: string; partitions: number; }; - #delayedQueues: Map = new Map(); + #delayedQueues: Set = new Set(); #orderingPrepared: boolean = false; readonly nativeRetrial: boolean; @@ -272,7 +272,7 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterRoutingKey, messageTtl: delay, }); - this.#trackDelayedQueue(queue, delay); + this.#trackDelayedQueue(queue); } channel.sendToQueue( queue, @@ -355,7 +355,7 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterRoutingKey, messageTtl: delay, }); - this.#trackDelayedQueue(queue, delay); + this.#trackDelayedQueue(queue); } for (const message of messages) { @@ -370,15 +370,8 @@ export class AmqpMessageQueue implements MessageQueue { } } - #trackDelayedQueue(queue: string, delay: number): void { - this.#delayedQueues.set(queue, Date.now() + Math.max(0, delay) + 60_000); - } - - #pruneDelayedQueues(): void { - const now = Date.now(); - for (const [queue, expiresAt] of this.#delayedQueues) { - if (expiresAt <= now) this.#delayedQueues.delete(queue); - } + #trackDelayedQueue(queue: string): void { + this.#delayedQueues.add(queue); } async #createDepthChannel(): Promise { @@ -441,8 +434,7 @@ export class AmqpMessageQueue implements MessageQueue { } let delayed = 0; - this.#pruneDelayedQueues(); - const delayedQueues = [...this.#delayedQueues.keys()]; + const delayedQueues = [...this.#delayedQueues]; for ( const [queue, messageCount] of await this.#checkQueueDepths(delayedQueues) ) { From 4b16dcfde4e1771032d3f79429616b041f7b8880 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sat, 2 May 2026 17:15:56 +0900 Subject: [PATCH 18/20] Expire stale AMQP delayed queues Declare AMQP delayed queues with an expiry margin beyond their message TTL so stale empty delay-specific queues are deleted by the broker. This keeps the tracked delayed queue set from growing without bound while preserving depth accounting until the broker reports the queue as gone. https://github.com/fedify-dev/fedify/pull/748#discussion_r3176324779 https://github.com/fedify-dev/fedify/pull/748#discussion_r3176325597 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.test.ts | 23 ++++++++++++++++++++++- packages/amqp/src/mq.ts | 3 +++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index 31288679a..409ebde24 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -21,8 +21,14 @@ class FakeDepthChannel { on(): void { } - assertQueue(queue: string): Promise { + assertQueue( + queue: string, + options?: { expires?: number }, + ): Promise { this.connection.queues.add(queue); + if (options?.expires != null) { + this.connection.queueExpires.set(queue, options.expires); + } return Promise.resolve(); } @@ -54,6 +60,7 @@ class FakeDepthChannel { class FakeDepthConnection { readonly queues = new Set(); + readonly queueExpires = new Map(); readonly messageCounts = new Map(); activeChecks = 0; maxActiveChecks = 0; @@ -91,6 +98,20 @@ unitTest( }, ); +unitTest("AmqpMessageQueue sets delayed queue expiry", async () => { + const conn = new FakeDepthConnection(); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds: 1_000 }), + }); + + assertEquals(conn.queueExpires.get("delayed_1000"), 61_000); +}); + unitTest( "AmqpMessageQueue.getDepth() keeps delayed queues past local expiry", async () => { diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index 1fe9043c8..b4aad20f7 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -14,6 +14,7 @@ function isQueueNotFoundError(error: unknown): boolean { } const depthProbeConcurrency = 8; +const delayedQueueExpiryMargin = 60_000; /** * Options for ordering key support in {@link AmqpMessageQueue}. @@ -270,6 +271,7 @@ export class AmqpMessageQueue implements MessageQueue { durable: this.#durable, deadLetterExchange, deadLetterRoutingKey, + expires: delay + delayedQueueExpiryMargin, messageTtl: delay, }); this.#trackDelayedQueue(queue); @@ -353,6 +355,7 @@ export class AmqpMessageQueue implements MessageQueue { durable: this.#durable, deadLetterExchange, deadLetterRoutingKey, + expires: delay + delayedQueueExpiryMargin, messageTtl: delay, }); this.#trackDelayedQueue(queue); From b9b41a65d2ca823248e36a4267005edf0b4dc963 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sat, 2 May 2026 17:26:28 +0900 Subject: [PATCH 19/20] Harden AMQP delayed queue tracking Fall back to the legacy delayed queue declaration when RabbitMQ rejects redeclaring an existing delay queue with x-expires. Cap the local tracking set so services that never call getDepth() do not keep every unique delay queue name forever. https://github.com/fedify-dev/fedify/pull/748#discussion_r3176382979 https://github.com/fedify-dev/fedify/pull/748#discussion_r3176383592 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.test.ts | 71 +++++++++++++++++++++++++++++++++++- packages/amqp/src/mq.ts | 70 +++++++++++++++++++++++++++++------ 2 files changed, 127 insertions(+), 14 deletions(-) diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index 409ebde24..0ec1d7592 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -15,6 +15,8 @@ const unitTest = suite(import.meta); const test = AMQP_URL ? unitTest : unitTest.skip; class FakeDepthChannel { + #closed = false; + constructor(private readonly connection: FakeDepthConnection) { } @@ -25,6 +27,19 @@ class FakeDepthChannel { queue: string, options?: { expires?: number }, ): Promise { + if (this.#closed) { + return Promise.reject(new Error("Channel is closed")); + } + if ( + options?.expires != null && + this.connection.preconditionOnExpires.has(queue) + ) { + this.#closed = true; + this.connection.preconditionOnExpires.delete(queue); + return Promise.reject( + Object.assign(new Error("PRECONDITION_FAILED"), { code: 406 }), + ); + } this.connection.queues.add(queue); if (options?.expires != null) { this.connection.queueExpires.set(queue, options.expires); @@ -33,6 +48,7 @@ class FakeDepthChannel { } sendToQueue(queue: string): boolean { + if (this.#closed) throw new Error("Channel is closed"); this.connection.messageCounts.set( queue, (this.connection.messageCounts.get(queue) ?? 0) + 1, @@ -41,20 +57,25 @@ class FakeDepthChannel { } async checkQueue(queue: string): Promise<{ messageCount: number }> { + if (this.#closed) throw new Error("Channel is closed"); this.connection.activeChecks++; this.connection.maxActiveChecks = Math.max( this.connection.maxActiveChecks, this.connection.activeChecks, ); try { - await delay(25); + if (this.connection.checkDelayMs > 0) { + await delay(this.connection.checkDelayMs); + } return { messageCount: this.connection.messageCounts.get(queue) ?? 0 }; } finally { this.connection.activeChecks--; } } - async close(): Promise { + close(): Promise { + this.#closed = true; + return Promise.resolve(); } } @@ -62,10 +83,16 @@ class FakeDepthConnection { readonly queues = new Set(); readonly queueExpires = new Map(); readonly messageCounts = new Map(); + readonly preconditionOnExpires = new Set(); activeChecks = 0; + channelCount = 0; maxActiveChecks = 0; + constructor(readonly checkDelayMs: number = 25) { + } + createChannel(): Promise { + this.channelCount++; return Promise.resolve(new FakeDepthChannel(this) as unknown as Channel); } } @@ -112,6 +139,46 @@ unitTest("AmqpMessageQueue sets delayed queue expiry", async () => { assertEquals(conn.queueExpires.get("delayed_1000"), 61_000); }); +unitTest( + "AmqpMessageQueue falls back for existing delayed queues without expiry", + async () => { + const conn = new FakeDepthConnection(); + conn.preconditionOnExpires.add("delayed_1000"); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds: 1_000 }), + }); + + assertEquals(conn.messageCounts.get("delayed_1000"), 1); + assertEquals(conn.queueExpires.get("delayed_1000"), undefined); + assertGreater(conn.channelCount, 1); + }, +); + +unitTest("AmqpMessageQueue caps delayed queue tracking", async () => { + const conn = new FakeDepthConnection(0); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + for (let milliseconds = 1; milliseconds <= 4097; milliseconds++) { + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds }), + }); + } + + assertEquals(await mq.getDepth(), { + queued: 4096, + ready: 0, + delayed: 4096, + }); +}); + unitTest( "AmqpMessageQueue.getDepth() keeps delayed queues past local expiry", async () => { diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index b4aad20f7..8f0fc1a5a 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -13,8 +13,14 @@ function isQueueNotFoundError(error: unknown): boolean { "code" in error && error.code === 404; } +function isPreconditionFailedError(error: unknown): boolean { + return typeof error === "object" && error != null && + "code" in error && error.code === 406; +} + const depthProbeConcurrency = 8; const delayedQueueExpiryMargin = 60_000; +const delayedQueueTrackingLimit = 4096; /** * Options for ordering key support in {@link AmqpMessageQueue}. @@ -204,6 +210,15 @@ export class AmqpMessageQueue implements MessageQueue { return channel; } + async #dropSenderChannel(channel: Channel): Promise { + if (this.#senderChannel === channel) this.#senderChannel = undefined; + try { + await channel.close(); + } catch { + // The channel may already have been closed by an AMQP exception. + } + } + /** * Enqueues a message to be processed. * @@ -224,7 +239,7 @@ export class AmqpMessageQueue implements MessageQueue { message: any, options?: MessageQueueEnqueueOptions, ): Promise { - const channel = await this.#getSenderChannel(); + let channel = await this.#getSenderChannel(); const delay = options?.delay?.total("millisecond"); const orderingKey = options?.orderingKey; @@ -266,13 +281,10 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterExchange = ""; deadLetterRoutingKey = this.#queue; } - await channel.assertQueue(queue, { - autoDelete: true, - durable: this.#durable, + channel = await this.#assertDelayedQueue(channel, queue, { deadLetterExchange, deadLetterRoutingKey, - expires: delay + delayedQueueExpiryMargin, - messageTtl: delay, + delay, }); this.#trackDelayedQueue(queue); } @@ -306,7 +318,7 @@ export class AmqpMessageQueue implements MessageQueue { messages: readonly any[], options?: MessageQueueEnqueueOptions, ): Promise { - const channel = await this.#getSenderChannel(); + let channel = await this.#getSenderChannel(); const delay = options?.delay?.total("millisecond"); const orderingKey = options?.orderingKey; @@ -350,13 +362,10 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterExchange = ""; deadLetterRoutingKey = this.#queue; } - await channel.assertQueue(queue, { - autoDelete: true, - durable: this.#durable, + channel = await this.#assertDelayedQueue(channel, queue, { deadLetterExchange, deadLetterRoutingKey, - expires: delay + delayedQueueExpiryMargin, - messageTtl: delay, + delay, }); this.#trackDelayedQueue(queue); } @@ -375,6 +384,43 @@ export class AmqpMessageQueue implements MessageQueue { #trackDelayedQueue(queue: string): void { this.#delayedQueues.add(queue); + while (this.#delayedQueues.size > delayedQueueTrackingLimit) { + const oldestQueue = this.#delayedQueues.values().next().value; + if (oldestQueue == null) break; + this.#delayedQueues.delete(oldestQueue); + } + } + + async #assertDelayedQueue( + channel: Channel, + queue: string, + options: { + deadLetterExchange?: string; + deadLetterRoutingKey?: string; + delay: number; + }, + ): Promise { + const assertOptions = { + autoDelete: true, + durable: this.#durable, + deadLetterExchange: options.deadLetterExchange, + deadLetterRoutingKey: options.deadLetterRoutingKey, + messageTtl: options.delay, + }; + try { + await channel.assertQueue(queue, { + ...assertOptions, + expires: options.delay + delayedQueueExpiryMargin, + }); + return channel; + } catch (error) { + if (!isPreconditionFailedError(error)) throw error; + await this.#dropSenderChannel(channel); + } + + const fallbackChannel = await this.#getSenderChannel(); + await fallbackChannel.assertQueue(queue, assertOptions); + return fallbackChannel; } async #createDepthChannel(): Promise { From 595d35724aeecc2d70ab21505c4d269016ccdd07 Mon Sep 17 00:00:00 2001 From: Hong Minhee Date: Sat, 2 May 2026 17:40:34 +0900 Subject: [PATCH 20/20] Preserve AMQP delayed depth accuracy Keep delayed queues tracked when the cleanup threshold is exceeded and prune only queues that RabbitMQ reports as missing. Reuse depth-probe channels per worker so checking many tracked queues no longer opens one channel for every queue in the common case. https://github.com/fedify-dev/fedify/pull/748#discussion_r3176391822 https://github.com/fedify-dev/fedify/pull/748#discussion_r3176392549 Assisted-by: Codex:gpt-5.5 --- packages/amqp/src/mq.test.ts | 36 +++++++++++++++-- packages/amqp/src/mq.ts | 78 +++++++++++++++++++++++------------- 2 files changed, 83 insertions(+), 31 deletions(-) diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index 0ec1d7592..39deb4dcf 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -159,26 +159,54 @@ unitTest( }, ); -unitTest("AmqpMessageQueue caps delayed queue tracking", async () => { +unitTest("AmqpMessageQueue reuses depth probe channels", async () => { const conn = new FakeDepthConnection(0); const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { queue: "ready", delayedQueuePrefix: "delayed_", }); - for (let milliseconds = 1; milliseconds <= 4097; milliseconds++) { + for (let milliseconds = 1; milliseconds <= 12; milliseconds++) { await mq.enqueue("delayed", { delay: Temporal.Duration.from({ milliseconds }), }); } + conn.channelCount = 0; assertEquals(await mq.getDepth(), { - queued: 4096, + queued: 12, ready: 0, - delayed: 4096, + delayed: 12, }); + assert( + conn.channelCount <= 9, + `expected at most 9 depth probe channels, got ${conn.channelCount}`, + ); }); +unitTest( + "AmqpMessageQueue keeps delayed queues past cleanup threshold", + async () => { + const conn = new FakeDepthConnection(0); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + for (let milliseconds = 1; milliseconds <= 4097; milliseconds++) { + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds }), + }); + } + + assertEquals(await mq.getDepth(), { + queued: 4097, + ready: 0, + delayed: 4097, + }); + }, +); + unitTest( "AmqpMessageQueue.getDepth() keeps delayed queues past local expiry", async () => { diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index 8f0fc1a5a..ef27026c8 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -20,7 +20,7 @@ function isPreconditionFailedError(error: unknown): boolean { const depthProbeConcurrency = 8; const delayedQueueExpiryMargin = 60_000; -const delayedQueueTrackingLimit = 4096; +const delayedQueueCleanupThreshold = 4096; /** * Options for ordering key support in {@link AmqpMessageQueue}. @@ -143,6 +143,7 @@ export class AmqpMessageQueue implements MessageQueue { partitions: number; }; #delayedQueues: Set = new Set(); + #delayedQueueCleanup?: Promise; #orderingPrepared: boolean = false; readonly nativeRetrial: boolean; @@ -384,10 +385,15 @@ export class AmqpMessageQueue implements MessageQueue { #trackDelayedQueue(queue: string): void { this.#delayedQueues.add(queue); - while (this.#delayedQueues.size > delayedQueueTrackingLimit) { - const oldestQueue = this.#delayedQueues.values().next().value; - if (oldestQueue == null) break; - this.#delayedQueues.delete(oldestQueue); + if ( + this.#delayedQueues.size > delayedQueueCleanupThreshold && + this.#delayedQueueCleanup == null + ) { + this.#delayedQueueCleanup = this.#pruneMissingDelayedQueues() + .catch(() => undefined) + .finally(() => { + this.#delayedQueueCleanup = undefined; + }); } } @@ -429,24 +435,6 @@ export class AmqpMessageQueue implements MessageQueue { return channel; } - async #checkQueueDepth(queueName: string): Promise { - const channel = await this.#createDepthChannel(); - try { - return (await channel.checkQueue(queueName)).messageCount; - } catch (error) { - if (!isQueueNotFoundError(error)) { - throw error; - } - return undefined; - } finally { - try { - await channel.close(); - } catch { - // The channel can already be closed by a failed passive queue check. - } - } - } - async #checkQueueDepths( queueNames: readonly string[], ): Promise { @@ -455,10 +443,37 @@ export class AmqpMessageQueue implements MessageQueue { ); let nextIndex = 0; const worker = async () => { - while (nextIndex < queueNames.length) { - const index = nextIndex++; - const queue = queueNames[index]; - results[index] = [queue, await this.#checkQueueDepth(queue)]; + let channel: Channel | undefined; + const closeChannel = async () => { + if (channel == null) return; + const currentChannel = channel; + channel = undefined; + try { + await currentChannel.close(); + } catch { + // The channel can already be closed by a failed passive queue check. + } + }; + const checkQueue = async ( + queue: string, + ): Promise => { + channel ??= await this.#createDepthChannel(); + try { + return (await channel.checkQueue(queue)).messageCount; + } catch (error) { + await closeChannel(); + if (!isQueueNotFoundError(error)) throw error; + return undefined; + } + }; + try { + while (nextIndex < queueNames.length) { + const index = nextIndex++; + const queue = queueNames[index]; + results[index] = [queue, await checkQueue(queue)]; + } + } finally { + await closeChannel(); } }; const workers = Array.from( @@ -469,6 +484,15 @@ export class AmqpMessageQueue implements MessageQueue { return results; } + async #pruneMissingDelayedQueues(): Promise { + const delayedQueues = [...this.#delayedQueues]; + for ( + const [queue, messageCount] of await this.#checkQueueDepths(delayedQueues) + ) { + if (messageCount == null) this.#delayedQueues.delete(queue); + } + } + async getDepth(): Promise { const readyQueues = [this.#queue]; if (this.#ordering != null) {