diff --git a/CHANGES.md b/CHANGES.md index f4add3089..a101698bc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,43 @@ Version 2.3.0 To be released. +### @fedify/fedify + + - Added optional `MessageQueue.getDepth()` support, using the new + `MessageQueueDepth` return type, for reporting queue backlog depth. + `InProcessMessageQueue` can now report queued messages, including ready + and delayed counts, and `ParallelMessageQueue` delegates depth reporting + to its wrapped queue when supported. [[#735], [#748]] + +[#735]: https://github.com/fedify-dev/fedify/issues/735 +[#748]: https://github.com/fedify-dev/fedify/pull/748 + +### @fedify/amqp + + - Added `AmqpMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. Delayed counts include queues created or tracked + by the same `AmqpMessageQueue` instance. [[#735], [#748]] + +### @fedify/mysql + + - Added `MysqlMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. [[#735], [#748]] + +### @fedify/postgres + + - Added `PostgresMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. [[#735], [#748]] + +### @fedify/redis + + - Added `RedisMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. [[#735], [#748]] + +### @fedify/sqlite + + - Added `SqliteMessageQueue.getDepth()` for reporting queued, ready, and + delayed message counts. [[#735], [#748]] + Version 2.2.0 ------------- diff --git a/docs/manual/mq.md b/docs/manual/mq.md index 4adf552ca..db78998d3 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -672,6 +672,7 @@ the `~MessageQueue.enqueue()` and `~MessageQueue.listen()` methods: ~~~~ typescript twoslash import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -693,6 +694,11 @@ class CustomMessageQueue implements MessageQueue { ): Promise { // Implementation here } + + // Optional: implement only if your backend can report real counts. + // async getDepth(): Promise { + // return { queued, ready, delayed }; + // } } ~~~~ @@ -747,6 +753,21 @@ you can set the `nativeRetrial` property to `true` to indicate this. When this property is `true`, Fedify will skip its own retry logic and rely on your backend to handle retries, avoiding duplicate retry mechanisms. +### Implement `~MessageQueue.getDepth()` method (optional) + +*This API is available since Fedify 2.3.0.* + +This optional method should return the number of messages still waiting in the +backend queue. It should not include messages that have already been handed to +a worker for processing. Return `queued` for the total waiting messages. If +your backend can cheaply distinguish scheduled messages, also return `ready` +for messages eligible for immediate processing and `delayed` for messages +scheduled for later delivery. + +Implement this method if your queue backend exposes an efficient count +operation. If the platform does not expose reliable counts, omit the method +rather than returning an approximate value that could mislead monitoring. + Parallel message processing --------------------------- @@ -951,6 +972,65 @@ Optimized performance : Backend-specific optimizations for retry logic. +Queue depth reporting +--------------------- + +*This API is available since Fedify 2.3.0.* + +Some message queue implementations expose `~MessageQueue.getDepth()` for +observability. Queue depth means messages still waiting in the backend queue: + +`queued` +: Total waiting messages. This excludes messages currently being handled by + a worker. + +`ready` +: Waiting messages eligible for immediate processing. This value is omitted + when the backend cannot distinguish ready and delayed messages cheaply. + +`delayed` +: Waiting messages scheduled for later delivery. This value is omitted when + the backend cannot distinguish ready and delayed messages cheaply. + +For example: + +~~~~ typescript twoslash +import type { MessageQueue } from "@fedify/fedify"; +declare const queue: MessageQueue; +// ---cut-before--- +const depth = await queue.getDepth?.(); +if (depth != null) { + console.log("Queued messages:", depth.queued); +} +~~~~ + +### Implementation support + +| Implementation | Queue Depth Support | +| ------------------------ | ----------------------------------------- | +| `InProcessMessageQueue` | `queued`, `ready`, `delayed` | +| [`DenoKvMessageQueue`] | No reliable platform count | +| [`RedisMessageQueue`] | `queued`, `ready`, `delayed` | +| [`PostgresMessageQueue`] | `queued`, `ready`, `delayed` | +| [`MysqlMessageQueue`] | `queued`, `ready`, `delayed` | +| [`AmqpMessageQueue`] | `queued`, `ready`, `delayed`[^amqp-depth] | +| [`SqliteMessageQueue`] | `queued`, `ready`, `delayed` | +| `WorkersMessageQueue` | No reliable platform count | +| `ParallelMessageQueue` | Same as wrapped queue | + +If you pass the same `MessageQueue` instance as the shared queue for inbox, +outbox, and fanout work, observability code should report that queue once as a +shared queue. Reporting the same `getDepth()` result separately for each +logical role would double- or triple-count the backlog. + +[^amqp-depth]: `AmqpMessageQueue` can count the configured ready queues and + delayed queues created by the same `AmqpMessageQueue` instance. + AMQP 0-9-1 does not provide a portable queue-listing API, so + delayed queues created by another process before this instance + starts are not included until this instance creates or tracks + them. + + Ordering guarantees ------------------- diff --git a/packages/amqp/src/mq.test.ts b/packages/amqp/src/mq.test.ts index f8499bf66..39deb4dcf 100644 --- a/packages/amqp/src/mq.test.ts +++ b/packages/amqp/src/mq.test.ts @@ -1,14 +1,240 @@ import { suite } from "@alinea/suite"; import { AmqpMessageQueue } from "@fedify/amqp/mq"; import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing"; +import * as temporal from "@js-temporal/polyfill"; import { assert, assertEquals, assertFalse, assertGreater } from "@std/assert"; import { delay } from "@std/async/delay"; // @deno-types="npm:@types/amqplib" -import { type ChannelModel, connect } from "amqplib"; +import { type Channel, type ChannelModel, connect } from "amqplib"; import process from "node:process"; +const Temporal = globalThis.Temporal ?? temporal.Temporal; + const AMQP_URL = process.env.AMQP_URL; -const test = AMQP_URL ? suite(import.meta) : suite(import.meta).skip; +const unitTest = suite(import.meta); +const test = AMQP_URL ? unitTest : unitTest.skip; + +class FakeDepthChannel { + #closed = false; + + constructor(private readonly connection: FakeDepthConnection) { + } + + on(): void { + } + + assertQueue( + queue: string, + options?: { expires?: number }, + ): Promise { + if (this.#closed) { + return Promise.reject(new Error("Channel is closed")); + } + if ( + options?.expires != null && + this.connection.preconditionOnExpires.has(queue) + ) { + this.#closed = true; + this.connection.preconditionOnExpires.delete(queue); + return Promise.reject( + Object.assign(new Error("PRECONDITION_FAILED"), { code: 406 }), + ); + } + this.connection.queues.add(queue); + if (options?.expires != null) { + this.connection.queueExpires.set(queue, options.expires); + } + return Promise.resolve(); + } + + sendToQueue(queue: string): boolean { + if (this.#closed) throw new Error("Channel is closed"); + this.connection.messageCounts.set( + queue, + (this.connection.messageCounts.get(queue) ?? 0) + 1, + ); + return true; + } + + async checkQueue(queue: string): Promise<{ messageCount: number }> { + if (this.#closed) throw new Error("Channel is closed"); + this.connection.activeChecks++; + this.connection.maxActiveChecks = Math.max( + this.connection.maxActiveChecks, + this.connection.activeChecks, + ); + try { + if (this.connection.checkDelayMs > 0) { + await delay(this.connection.checkDelayMs); + } + return { messageCount: this.connection.messageCounts.get(queue) ?? 0 }; + } finally { + this.connection.activeChecks--; + } + } + + close(): Promise { + this.#closed = true; + return Promise.resolve(); + } +} + +class FakeDepthConnection { + readonly queues = new Set(); + readonly queueExpires = new Map(); + readonly messageCounts = new Map(); + readonly preconditionOnExpires = new Set(); + activeChecks = 0; + channelCount = 0; + maxActiveChecks = 0; + + constructor(readonly checkDelayMs: number = 25) { + } + + createChannel(): Promise { + this.channelCount++; + return Promise.resolve(new FakeDepthChannel(this) as unknown as Channel); + } +} + +unitTest( + "AmqpMessageQueue.getDepth() probes delayed queues concurrently", + async () => { + const conn = new FakeDepthConnection(); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + await mq.enqueue("first", { + delay: Temporal.Duration.from({ milliseconds: 1_000 }), + }); + await mq.enqueue("second", { + delay: Temporal.Duration.from({ milliseconds: 2_000 }), + }); + await mq.enqueue("third", { + delay: Temporal.Duration.from({ milliseconds: 3_000 }), + }); + + assertEquals(await mq.getDepth(), { + queued: 3, + ready: 0, + delayed: 3, + }); + assertGreater(conn.maxActiveChecks, 1); + }, +); + +unitTest("AmqpMessageQueue sets delayed queue expiry", async () => { + const conn = new FakeDepthConnection(); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds: 1_000 }), + }); + + assertEquals(conn.queueExpires.get("delayed_1000"), 61_000); +}); + +unitTest( + "AmqpMessageQueue falls back for existing delayed queues without expiry", + async () => { + const conn = new FakeDepthConnection(); + conn.preconditionOnExpires.add("delayed_1000"); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds: 1_000 }), + }); + + assertEquals(conn.messageCounts.get("delayed_1000"), 1); + assertEquals(conn.queueExpires.get("delayed_1000"), undefined); + assertGreater(conn.channelCount, 1); + }, +); + +unitTest("AmqpMessageQueue reuses depth probe channels", async () => { + const conn = new FakeDepthConnection(0); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + for (let milliseconds = 1; milliseconds <= 12; milliseconds++) { + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds }), + }); + } + conn.channelCount = 0; + + assertEquals(await mq.getDepth(), { + queued: 12, + ready: 0, + delayed: 12, + }); + assert( + conn.channelCount <= 9, + `expected at most 9 depth probe channels, got ${conn.channelCount}`, + ); +}); + +unitTest( + "AmqpMessageQueue keeps delayed queues past cleanup threshold", + async () => { + const conn = new FakeDepthConnection(0); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + for (let milliseconds = 1; milliseconds <= 4097; milliseconds++) { + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds }), + }); + } + + assertEquals(await mq.getDepth(), { + queued: 4097, + ready: 0, + delayed: 4097, + }); + }, +); + +unitTest( + "AmqpMessageQueue.getDepth() keeps delayed queues past local expiry", + async () => { + const now = Date.now; + const started = now(); + Date.now = () => started; + try { + const conn = new FakeDepthConnection(); + const mq = new AmqpMessageQueue(conn as unknown as ChannelModel, { + queue: "ready", + delayedQueuePrefix: "delayed_", + }); + + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds: 1_000 }), + }); + Date.now = () => started + 62_000; + + assertEquals(await mq.getDepth(), { + queued: 1, + ready: 0, + delayed: 1, + }); + } finally { + Date.now = now; + } + }, +); function getConnection(): Promise { return connect(AMQP_URL!); @@ -37,6 +263,51 @@ test( ), ); +test( + "AmqpMessageQueue.getDepth()", + { sanitizeOps: false, sanitizeExit: false, sanitizeResources: false }, + async () => { + const conn = await getConnection(); + const queue = getRandomKey("depth_queue"); + const delayedQueuePrefix = getRandomKey("depth_delayed") + "_"; + const mq = new AmqpMessageQueue(conn, { queue, delayedQueuePrefix }); + try { + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ seconds: 60 }), + }); + const started = Date.now(); + while (Date.now() - started < 15_000) { + const depth = await mq.getDepth(); + if (depth.queued === 2 && depth.ready === 1 && depth.delayed === 1) { + break; + } + await delay(100); + } + assertEquals(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + const channel = await conn.createChannel().catch(() => undefined); + try { + await channel?.deleteQueue(queue).catch(() => {}); + await channel?.deleteQueue(`${delayedQueuePrefix}60000`).catch(() => { + }); + } finally { + await channel?.close().catch(() => {}); + await conn.close().catch(() => {}); + } + } + }, +); + // Test with ordering key support (requires rabbitmq_consistent_hash_exchange plugin) const orderingConnections: ChannelModel[] = []; const orderingQueue = getRandomKey("ordering_queue"); diff --git a/packages/amqp/src/mq.ts b/packages/amqp/src/mq.ts index 641e01a16..ef27026c8 100644 --- a/packages/amqp/src/mq.ts +++ b/packages/amqp/src/mq.ts @@ -1,5 +1,6 @@ import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -7,6 +8,20 @@ import type { import type { Channel, ChannelModel, ConsumeMessage } from "amqplib"; import { Buffer } from "node:buffer"; +function isQueueNotFoundError(error: unknown): boolean { + return typeof error === "object" && error != null && + "code" in error && error.code === 404; +} + +function isPreconditionFailedError(error: unknown): boolean { + return typeof error === "object" && error != null && + "code" in error && error.code === 406; +} + +const depthProbeConcurrency = 8; +const delayedQueueExpiryMargin = 60_000; +const delayedQueueCleanupThreshold = 4096; + /** * Options for ordering key support in {@link AmqpMessageQueue}. * @@ -127,6 +142,8 @@ export class AmqpMessageQueue implements MessageQueue { queuePrefix: string; partitions: number; }; + #delayedQueues: Set = new Set(); + #delayedQueueCleanup?: Promise; #orderingPrepared: boolean = false; readonly nativeRetrial: boolean; @@ -194,6 +211,15 @@ export class AmqpMessageQueue implements MessageQueue { return channel; } + async #dropSenderChannel(channel: Channel): Promise { + if (this.#senderChannel === channel) this.#senderChannel = undefined; + try { + await channel.close(); + } catch { + // The channel may already have been closed by an AMQP exception. + } + } + /** * Enqueues a message to be processed. * @@ -214,7 +240,7 @@ export class AmqpMessageQueue implements MessageQueue { message: any, options?: MessageQueueEnqueueOptions, ): Promise { - const channel = await this.#getSenderChannel(); + let channel = await this.#getSenderChannel(); const delay = options?.delay?.total("millisecond"); const orderingKey = options?.orderingKey; @@ -256,13 +282,12 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterExchange = ""; deadLetterRoutingKey = this.#queue; } - await channel.assertQueue(queue, { - autoDelete: true, - durable: this.#durable, + channel = await this.#assertDelayedQueue(channel, queue, { deadLetterExchange, deadLetterRoutingKey, - messageTtl: delay, + delay, }); + this.#trackDelayedQueue(queue); } channel.sendToQueue( queue, @@ -294,7 +319,7 @@ export class AmqpMessageQueue implements MessageQueue { messages: readonly any[], options?: MessageQueueEnqueueOptions, ): Promise { - const channel = await this.#getSenderChannel(); + let channel = await this.#getSenderChannel(); const delay = options?.delay?.total("millisecond"); const orderingKey = options?.orderingKey; @@ -338,13 +363,12 @@ export class AmqpMessageQueue implements MessageQueue { deadLetterExchange = ""; deadLetterRoutingKey = this.#queue; } - await channel.assertQueue(queue, { - autoDelete: true, - durable: this.#durable, + channel = await this.#assertDelayedQueue(channel, queue, { deadLetterExchange, deadLetterRoutingKey, - messageTtl: delay, + delay, }); + this.#trackDelayedQueue(queue); } for (const message of messages) { @@ -359,6 +383,148 @@ export class AmqpMessageQueue implements MessageQueue { } } + #trackDelayedQueue(queue: string): void { + this.#delayedQueues.add(queue); + if ( + this.#delayedQueues.size > delayedQueueCleanupThreshold && + this.#delayedQueueCleanup == null + ) { + this.#delayedQueueCleanup = this.#pruneMissingDelayedQueues() + .catch(() => undefined) + .finally(() => { + this.#delayedQueueCleanup = undefined; + }); + } + } + + async #assertDelayedQueue( + channel: Channel, + queue: string, + options: { + deadLetterExchange?: string; + deadLetterRoutingKey?: string; + delay: number; + }, + ): Promise { + const assertOptions = { + autoDelete: true, + durable: this.#durable, + deadLetterExchange: options.deadLetterExchange, + deadLetterRoutingKey: options.deadLetterRoutingKey, + messageTtl: options.delay, + }; + try { + await channel.assertQueue(queue, { + ...assertOptions, + expires: options.delay + delayedQueueExpiryMargin, + }); + return channel; + } catch (error) { + if (!isPreconditionFailedError(error)) throw error; + await this.#dropSenderChannel(channel); + } + + const fallbackChannel = await this.#getSenderChannel(); + await fallbackChannel.assertQueue(queue, assertOptions); + return fallbackChannel; + } + + async #createDepthChannel(): Promise { + const channel = await this.#connection.createChannel(); + channel.on("error", () => undefined); + return channel; + } + + async #checkQueueDepths( + queueNames: readonly string[], + ): Promise { + const results = new Array( + queueNames.length, + ); + let nextIndex = 0; + const worker = async () => { + let channel: Channel | undefined; + const closeChannel = async () => { + if (channel == null) return; + const currentChannel = channel; + channel = undefined; + try { + await currentChannel.close(); + } catch { + // The channel can already be closed by a failed passive queue check. + } + }; + const checkQueue = async ( + queue: string, + ): Promise => { + channel ??= await this.#createDepthChannel(); + try { + return (await channel.checkQueue(queue)).messageCount; + } catch (error) { + await closeChannel(); + if (!isQueueNotFoundError(error)) throw error; + return undefined; + } + }; + try { + while (nextIndex < queueNames.length) { + const index = nextIndex++; + const queue = queueNames[index]; + results[index] = [queue, await checkQueue(queue)]; + } + } finally { + await closeChannel(); + } + }; + const workers = Array.from( + { length: Math.min(depthProbeConcurrency, queueNames.length) }, + () => worker(), + ); + await Promise.all(workers); + return results; + } + + async #pruneMissingDelayedQueues(): Promise { + const delayedQueues = [...this.#delayedQueues]; + for ( + const [queue, messageCount] of await this.#checkQueueDepths(delayedQueues) + ) { + if (messageCount == null) this.#delayedQueues.delete(queue); + } + } + + async getDepth(): Promise { + const readyQueues = [this.#queue]; + if (this.#ordering != null) { + for (let i = 0; i < this.#ordering.partitions; i++) { + readyQueues.push(this.#getOrderingQueueName(i)); + } + } + + let ready = 0; + for (const [, messageCount] of await this.#checkQueueDepths(readyQueues)) { + ready += messageCount ?? 0; + } + + let delayed = 0; + const delayedQueues = [...this.#delayedQueues]; + for ( + const [queue, messageCount] of await this.#checkQueueDepths(delayedQueues) + ) { + if (messageCount == null) { + this.#delayedQueues.delete(queue); + } else { + delayed += messageCount; + } + } + + return { + queued: ready + delayed, + ready, + delayed, + }; + } + async listen( // deno-lint-ignore no-explicit-any handler: (message: any) => void | Promise, diff --git a/packages/fedify/src/federation/mq.test.ts b/packages/fedify/src/federation/mq.test.ts index 8c7adc3de..e7c402908 100644 --- a/packages/fedify/src/federation/mq.test.ts +++ b/packages/fedify/src/federation/mq.test.ts @@ -34,6 +34,14 @@ test("InProcessMessageQueue", async (t) => { assertFalse(mq.nativeRetrial); }); + await t.step("getDepth() [empty]", async () => { + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + }); + const messages: string[] = []; const controller = new AbortController(); const listening = mq.listen((message: string) => { @@ -118,6 +126,171 @@ test("InProcessMessageQueue", async (t) => { await listening; }); +test("InProcessMessageQueue.getDepth()", async () => { + const mq = new InProcessMessageQueue(); + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + + await mq.enqueue("Ready message"); + await mq.enqueue("Delayed message", { + delay: Temporal.Duration.from({ seconds: 1 }), + }); + assertEquals(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + + const messages: string[] = []; + const controller = new AbortController(); + const listening = mq.listen((message: string) => { + messages.push(message); + if (messages.length >= 2) controller.abort(); + }, { signal: controller.signal }); + + await waitFor(() => messages.length >= 2, 15_000); + await listening; + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); +}); + +test("InProcessMessageQueue.getDepth() snapshots delayed batches", async () => { + const mq = new InProcessMessageQueue(); + const messages = ["first", "second"]; + await mq.enqueueMany(messages, { + delay: Temporal.Duration.from({ milliseconds: 250 }), + }); + messages.length = 0; + assertEquals(await mq.getDepth(), { + queued: 2, + ready: 0, + delayed: 2, + }); + + const handled: string[] = []; + const controller = new AbortController(); + const listening = mq.listen((message: string) => { + handled.push(message); + if (handled.length >= 2) controller.abort(); + }, { signal: controller.signal }); + + await waitFor(() => handled.length >= 2, 15_000); + await listening; + assertEquals(handled, ["first", "second"]); +}); + +test("InProcessMessageQueue.getDepth() excludes in-flight messages", async () => { + const mq = new InProcessMessageQueue(); + let resolveHandler: (() => void) | undefined; + const controller = new AbortController(); + const handled = new Promise((resolve) => { + resolveHandler = resolve; + }); + // Resolved after the message has been removed from the queue and handed + // to the handler. + let notifyStarted: () => void = () => {}; + const handlerStarted = new Promise((resolve) => { + notifyStarted = resolve; + }); + const listening = mq.listen(async () => { + notifyStarted(); + await handled; + controller.abort(); + }, { signal: controller.signal }); + + try { + await mq.enqueue("in-flight"); + await handlerStarted; + assertEquals(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + } finally { + resolveHandler?.(); + controller.abort(); + await listening; + } +}); + +test("InProcessMessageQueue delayed enqueue uses the internal ready path", async () => { + class RejectingReadyQueue extends InProcessMessageQueue { + override enqueue( + message: unknown, + options?: { delay?: Temporal.Duration }, + ): Promise { + if (options?.delay == null) { + return Promise.reject(new Error("ready enqueue should not be called")); + } + return super.enqueue(message, options); + } + } + + const mq = new RejectingReadyQueue({ + pollInterval: { milliseconds: 10 }, + }); + const messages: string[] = []; + const controller = new AbortController(); + const listening = mq.listen((message: string) => { + messages.push(message); + controller.abort(); + }, { signal: controller.signal }); + + try { + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ milliseconds: 10 }), + }); + await waitFor(() => messages.length > 0, 2_000); + assertEquals(messages, ["delayed"]); + } finally { + controller.abort(); + await listening; + } +}); + +test("InProcessMessageQueue delayed enqueueMany uses the internal ready path", async () => { + class RejectingReadyQueue extends InProcessMessageQueue { + override enqueueMany( + messages: readonly unknown[], + options?: { delay?: Temporal.Duration }, + ): Promise { + if (options?.delay == null) { + return Promise.reject( + new Error("ready enqueueMany should not be called"), + ); + } + return super.enqueueMany(messages, options); + } + } + + const mq = new RejectingReadyQueue({ + pollInterval: { milliseconds: 10 }, + }); + const messages: string[] = []; + const controller = new AbortController(); + const listening = mq.listen((message: string) => { + messages.push(message); + if (messages.length >= 2) controller.abort(); + }, { signal: controller.signal }); + + try { + await mq.enqueueMany(["first", "second"], { + delay: Temporal.Duration.from({ milliseconds: 10 }), + }); + await waitFor(() => messages.length >= 2, 2_000); + assertEquals(messages, ["first", "second"]); + } finally { + controller.abort(); + await listening; + } +}); + test("InProcessMessageQueue orderingKey", async (t) => { const mq = new InProcessMessageQueue(); @@ -277,6 +450,14 @@ for (const mqName in queues) { assertEquals(workers.nativeRetrial, mq.nativeRetrial); }); + await t.step("getDepth() delegation", async () => { + if (mq.getDepth == null) { + assertEquals(workers.getDepth, undefined); + } else { + assertEquals(await workers.getDepth?.(), await mq.getDepth()); + } + }); + const messages: string[] = []; const controller = new AbortController(); const listening = workers.listen(async (message: string) => { diff --git a/packages/fedify/src/federation/mq.ts b/packages/fedify/src/federation/mq.ts index 7f32ab007..36ba9900c 100644 --- a/packages/fedify/src/federation/mq.ts +++ b/packages/fedify/src/federation/mq.ts @@ -39,6 +39,37 @@ export interface MessageQueueListenOptions { signal?: AbortSignal; } +/** + * The number of messages waiting in a message queue. + * + * @since 2.3.0 + */ +export interface MessageQueueDepth { + /** + * The total number of messages still waiting in the backend queue. + * + * This does not include messages that have already been handed to a worker + * for processing. + */ + readonly queued: number; + + /** + * The number of queued messages eligible for immediate processing. + * + * Queue backends that cannot cheaply distinguish ready and delayed messages + * may omit this field. + */ + readonly ready?: number; + + /** + * The number of queued messages scheduled for later delivery. + * + * Queue backends that cannot cheaply distinguish ready and delayed messages + * may omit this field. + */ + readonly delayed?: number; +} + /** * An abstract interface for a message queue. * @@ -88,6 +119,17 @@ export interface MessageQueue { handler: (message: any) => Promise | void, options?: MessageQueueListenOptions, ): Promise; + + /** + * Gets the number of messages waiting in the queue. + * + * This operation is optional, and may not be supported by all + * implementations. The returned counts exclude messages currently being + * handled by a worker. + * + * @since 2.3.0 + */ + getDepth?(): Promise; } /** @@ -121,6 +163,7 @@ export class InProcessMessageQueue implements MessageQueue { #messages: QueuedMessage[]; #monitors: Record, () => void>; #pollIntervalMs: number; + #delayedMessages: number; /** * Tracks which ordering keys are currently being processed to ensure * sequential processing for messages with the same key. @@ -143,6 +186,7 @@ export class InProcessMessageQueue implements MessageQueue { this.#pollIntervalMs = Temporal.Duration.from( options.pollInterval ?? { seconds: 5 }, ).total("millisecond"); + this.#delayedMessages = 0; this.#processingKeys = new Set(); } @@ -151,18 +195,30 @@ export class InProcessMessageQueue implements MessageQueue { ? 0 : Math.max(options.delay.total("millisecond"), 0); if (delay > 0) { + this.#delayedMessages++; setTimeout( - () => this.enqueue(message, { ...options, delay: undefined }), + () => { + this.#delayedMessages--; + this.#enqueueReady(message, options); + }, delay, ); return Promise.resolve(); } + this.#enqueueReady(message, options); + return Promise.resolve(); + } + + #enqueueReady(message: any, options?: MessageQueueEnqueueOptions): void { const orderingKey = options?.orderingKey ?? null; this.#messages.push({ message, orderingKey }); + this.#notifyMonitors(); + } + + #notifyMonitors(): void { for (const monitorId in this.#monitors) { this.#monitors[monitorId as ReturnType](); } - return Promise.resolve(); } enqueueMany( @@ -174,20 +230,31 @@ export class InProcessMessageQueue implements MessageQueue { ? 0 : Math.max(options.delay.total("millisecond"), 0); if (delay > 0) { + const delayedCount = messages.length; + const deferredMessages = [...messages]; + this.#delayedMessages += delayedCount; setTimeout( - () => this.enqueueMany(messages, { ...options, delay: undefined }), + () => { + this.#delayedMessages -= delayedCount; + this.#enqueueManyReady(deferredMessages, options); + }, delay, ); return Promise.resolve(); } + this.#enqueueManyReady(messages, options); + return Promise.resolve(); + } + + #enqueueManyReady( + messages: readonly any[], + options?: MessageQueueEnqueueOptions, + ): void { const orderingKey = options?.orderingKey ?? null; for (const message of messages) { this.#messages.push({ message, orderingKey }); } - for (const monitorId in this.#monitors) { - this.#monitors[monitorId as ReturnType](); - } - return Promise.resolve(); + this.#notifyMonitors(); } async listen( @@ -227,6 +294,16 @@ export class InProcessMessageQueue implements MessageQueue { } } + getDepth(): Promise { + const ready = this.#messages.length; + const delayed = this.#delayedMessages; + return Promise.resolve({ + queued: ready + delayed, + ready, + delayed, + }); + } + #wait(ms: number, signal?: AbortSignal): Promise { let timer: ReturnType | null = null; return Promise.any([ @@ -288,6 +365,7 @@ export class ParallelMessageQueue implements MessageQueue { * @since 1.7.0 */ readonly nativeRetrial?: boolean; + readonly getDepth?: () => Promise; /** * Tracks which ordering keys are currently being processed to ensure @@ -320,6 +398,9 @@ export class ParallelMessageQueue implements MessageQueue { this.queue = queue; this.workers = workers; this.nativeRetrial = queue.nativeRetrial; + if (queue.getDepth != null) { + this.getDepth = () => queue.getDepth!(); + } } enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise { diff --git a/packages/mysql/src/mq.test.ts b/packages/mysql/src/mq.test.ts index 0712c5899..9e93451b7 100644 --- a/packages/mysql/src/mq.test.ts +++ b/packages/mysql/src/mq.test.ts @@ -1,9 +1,9 @@ +import { test } from "@fedify/fixture"; import { MysqlMessageQueue } from "@fedify/mysql/mq"; import { getRandomKey, testMessageQueue, waitFor } from "@fedify/testing"; import * as temporal from "@js-temporal/polyfill"; import assert from "node:assert/strict"; import process from "node:process"; -import { test } from "node:test"; import mysql from "mysql2/promise"; const Temporal = globalThis.Temporal ?? temporal.Temporal; @@ -74,8 +74,8 @@ test("MysqlMessageQueue uses default options when none are provided", () => { // Standard shared test suite // --------------------------------------------------------------------------- -test("MysqlMessageQueue", { skip: dbUrl == null }, () => { - if (dbUrl == null) return; // Bun does not support skip option +test("MysqlMessageQueue", { ignore: dbUrl == null }, () => { + if (dbUrl == null) return; const tableName = randomTableName("mq"); const pools: mysql.Pool[] = []; @@ -97,12 +97,42 @@ test("MysqlMessageQueue", { skip: dbUrl == null }, () => { ); }); +test( + "MysqlMessageQueue.getDepth()", + { ignore: dbUrl == null }, + async () => { + if (dbUrl == null) return; + const pool = mysql.createPool(dbUrl); + const tableName = randomTableName("depth"); + const mq = new MysqlMessageQueue(pool, { tableName }); + try { + assert.deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + assert.deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + await mq.drop(); + await pool.end(); + } + }, +); + // --------------------------------------------------------------------------- // initialize() and drop() // --------------------------------------------------------------------------- -test("MysqlMessageQueue.initialize()", { skip: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option +test("MysqlMessageQueue.initialize()", { ignore: dbUrl == null }, async () => { + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("init"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -166,9 +196,9 @@ test("MysqlMessageQueue.initialize()", { skip: dbUrl == null }, async () => { test( "MysqlMessageQueue.initialize() is idempotent", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("idem"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -191,8 +221,8 @@ test( }, ); -test("MysqlMessageQueue.drop()", { skip: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option +test("MysqlMessageQueue.drop()", { ignore: dbUrl == null }, async () => { + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("drop"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -214,9 +244,9 @@ test("MysqlMessageQueue.drop()", { skip: dbUrl == null }, async () => { test( "MysqlMessageQueue.drop() resets initialized flag so re-initialize works", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("reinit"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -242,9 +272,9 @@ test( test( "MysqlMessageQueue.drop() waits for in-flight initialize() before dropping", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("droprace"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -277,9 +307,9 @@ test( test( "MysqlMessageQueue concurrent initialization does not throw", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pools: mysql.Pool[] = []; const tableName = randomTableName("concinit"); try { @@ -303,9 +333,9 @@ test( test( "MysqlMessageQueue enqueue() and listen() racing on initialize() is safe", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("race"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -338,9 +368,9 @@ test( test( "MysqlMessageQueue processes messages enqueued before listen() starts", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("preq"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -379,9 +409,9 @@ test( test( "MysqlMessageQueue delayed message is not delivered early", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("delay"); const mq = new MysqlMessageQueue(pool, { @@ -436,9 +466,9 @@ test( test( "MysqlMessageQueue handles 30 concurrent enqueue() calls", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("stress"); const mq = new MysqlMessageQueue(pool, { @@ -480,9 +510,9 @@ test( test( "MysqlMessageQueue.enqueueMany() with empty array is a no-op", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("emptyb"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -509,9 +539,9 @@ test( test( "MysqlMessageQueue.enqueueMany() inserts all messages atomically", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("batcha"); const mq = new MysqlMessageQueue(pool, { tableName }); @@ -532,9 +562,9 @@ test( test( "MysqlMessageQueue.enqueueMany() delivers all 100 messages via single INSERT", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("bulk"); const mq = new MysqlMessageQueue(pool, { @@ -571,9 +601,9 @@ test( test( "MysqlMessageQueue.enqueueMany() preserves insertion order for same ordering key", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("bord"); const mq = new MysqlMessageQueue(pool, { @@ -617,9 +647,9 @@ test( test( "MysqlMessageQueue listener survives handler errors", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("hderr"); const mq = new MysqlMessageQueue(pool, { @@ -661,9 +691,9 @@ test( test( "MysqlMessageQueue handlerTimeout prevents hung handler from blocking queue", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("hdto"); const mq = new MysqlMessageQueue(pool, { @@ -706,9 +736,9 @@ test( test( "MysqlMessageQueue handlerTimeout with ordering key releases the lock", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("hdtolk"); const mq = new MysqlMessageQueue(pool, { @@ -754,9 +784,9 @@ test( test( "MysqlMessageQueue advisory lock is released after processing (regression for lock-leak)", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; // Use a pool with a small max to make lock leaks visible const pool = mysql.createPool({ uri: dbUrl!, connectionLimit: 3 }); const tableName = randomTableName("lockleak"); @@ -799,9 +829,9 @@ test( test( "MysqlMessageQueue GET_LOCK succeeds with a very long ordering key", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; // A 200-char ordering key combined with a 20-char table name produces a // raw lock name that is 221 chars — well over MySQL's 64-char limit. // The lock name hashing logic must shorten it before calling GET_LOCK, @@ -843,9 +873,9 @@ test( test( "MysqlMessageQueue delivers each message to exactly one worker", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool1 = mysql.createPool(dbUrl!); const pool2 = mysql.createPool(dbUrl!); const tableName = randomTableName("onceonly"); @@ -903,9 +933,9 @@ test( test( "MysqlMessageQueue two workers preserve ordering-key order", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool1 = mysql.createPool(dbUrl!); const pool2 = mysql.createPool(dbUrl!); const tableName = randomTableName("twowork"); @@ -962,9 +992,9 @@ test( test( "MysqlMessageQueue listen() resolves promptly when aborted during poll interval", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("clrtmo"); // Use a very long poll interval so the test would hang if clearTimeout @@ -983,13 +1013,18 @@ test( controller.abort(); // listen() must resolve well within the 60-second poll interval. - const deadline = new Promise((_, reject) => - setTimeout( + let deadlineTimer: ReturnType | undefined; + const deadline = new Promise((_, reject) => { + deadlineTimer = setTimeout( () => reject(new Error("listen() did not resolve in time")), 3_000, - ) - ); - await Promise.race([listening, deadline]); + ); + }); + try { + await Promise.race([listening, deadline]); + } finally { + clearTimeout(deadlineTimer); + } } finally { await mq.drop(); await pool.end(); @@ -1003,9 +1038,9 @@ test( test( "MysqlMessageQueue works with getRandomKey() from @fedify/testing", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; // getRandomKey returns names like "fedify_test_mq_" which may contain // hyphens — unsuitable for MySQL identifiers. Users must replace hyphens. const rawKey = getRandomKey("mq"); @@ -1040,9 +1075,9 @@ test( test( "MysqlMessageQueue with initialized: true skips DDL on first use", - { skip: dbUrl == null }, + { ignore: dbUrl == null }, async () => { - if (dbUrl == null) return; // Bun does not support skip option + if (dbUrl == null) return; const pool = mysql.createPool(dbUrl!); const tableName = randomTableName("preini"); // Create table manually first diff --git a/packages/mysql/src/mq.ts b/packages/mysql/src/mq.ts index 9b43f6313..e954d8ff6 100644 --- a/packages/mysql/src/mq.ts +++ b/packages/mysql/src/mq.ts @@ -1,5 +1,6 @@ import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -305,6 +306,27 @@ export class MysqlMessageQueue implements MessageQueue { }); } + /** + * {@inheritDoc MessageQueue.getDepth} + * @since 2.3.0 + */ + async getDepth(): Promise { + await this.initialize(); + const [rows] = await this.#pool.query( + `SELECT + COUNT(*) AS queued, + COALESCE(SUM(\`deliver_after\` <= NOW(6)), 0) AS ready + FROM \`${this.#tableName}\``, + ); + const queued = Number(rows[0].queued); + const ready = Number(rows[0].ready); + return { + queued, + ready, + delayed: Math.max(0, queued - ready), + }; + } + /** * {@inheritDoc MessageQueue.listen} * @since 2.1.0 diff --git a/packages/postgres/src/mq.test.ts b/packages/postgres/src/mq.test.ts index 29c02f829..97216ce69 100644 --- a/packages/postgres/src/mq.test.ts +++ b/packages/postgres/src/mq.test.ts @@ -42,6 +42,34 @@ test("PostgresMessageQueue", { ignore: dbUrl == null }, () => { ); }); +test("PostgresMessageQueue.getDepth()", { ignore: dbUrl == null }, async () => { + if (dbUrl == null) return; // Bun does not support skip option + + const tableName = getRandomKey("message_depth"); + const channelName = getRandomKey("channel_depth"); + const sql = postgres(dbUrl); + const mq = new PostgresMessageQueue(sql, { tableName, channelName }); + try { + deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + await mq.drop(); + await sql.end(); + } +}); + // Regression test for advisory lock not being fully released after processing // a message with an ordering key. This test verifies that after processing // a message through PostgresMessageQueue.listen(), the advisory lock is fully diff --git a/packages/postgres/src/mq.ts b/packages/postgres/src/mq.ts index 4fed2db8d..6418f77b7 100644 --- a/packages/postgres/src/mq.ts +++ b/packages/postgres/src/mq.ts @@ -1,5 +1,6 @@ import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -227,6 +228,25 @@ export class PostgresMessageQueue implements MessageQueue { }); } + async getDepth(): Promise { + await this.initialize(); + const result = await this.#sql` + SELECT + COUNT(*) AS queued, + COUNT(*) FILTER ( + WHERE created + delay <= CURRENT_TIMESTAMP + ) AS ready + FROM ${this.#sql(this.#tableName)} + `; + const queued = Number(result[0].queued); + const ready = Number(result[0].ready); + return { + queued, + ready, + delayed: Math.max(0, queued - ready), + }; + } + async listen( // deno-lint-ignore no-explicit-any handler: (message: any) => void | Promise, diff --git a/packages/redis/src/mq.test.ts b/packages/redis/src/mq.test.ts index fad3a7dc3..a2069e954 100644 --- a/packages/redis/src/mq.test.ts +++ b/packages/redis/src/mq.test.ts @@ -1,10 +1,14 @@ import { test } from "@fedify/fixture"; import { RedisMessageQueue } from "@fedify/redis/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import * as temporal from "@js-temporal/polyfill"; +import assert from "node:assert/strict"; import process from "node:process"; import { Redis } from "ioredis"; +const Temporal = globalThis.Temporal ?? temporal.Temporal; + const dbUrl = process.env.REDIS_URL; async function disposeMessageQueue(mq: object): Promise { @@ -41,3 +45,33 @@ test("RedisMessageQueue", { ignore: dbUrl == null }, () => { { testOrderingKey: true }, ); }); + +test("RedisMessageQueue.getDepth()", { ignore: dbUrl == null }, async () => { + if (dbUrl == null) return; // Bun does not support skip option + const channelKey = getRandomKey("channel_depth"); + const queueKey = getRandomKey("queue_depth"); + const lockKey = getRandomKey("lock_depth"); + const mq = new RedisMessageQueue(() => new Redis(dbUrl), { + channelKey, + queueKey, + lockKey, + }); + try { + assert.deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + assert.deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + await disposeMessageQueue(mq); + } +}); diff --git a/packages/redis/src/mq.ts b/packages/redis/src/mq.ts index 32e4e352a..d658d04f3 100644 --- a/packages/redis/src/mq.ts +++ b/packages/redis/src/mq.ts @@ -1,6 +1,7 @@ // deno-lint-ignore-file no-explicit-any import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -201,6 +202,28 @@ export class RedisMessageQueue implements MessageQueue, Disposable { if (baseTs <= now) this.#redis.publish(this.#channelKey, ""); } + async getDepth(): Promise { + const now = Temporal.Now.instant().epochMilliseconds; + const [queuedCount, readyCount] = await this.#redis.eval( + ` + return { + redis.call("ZCARD", KEYS[1]), + redis.call("ZCOUNT", KEYS[1], "-inf", ARGV[1]) + } + `, + 1, + this.#queueKey, + now, + ) as [number, number]; + const queued = Number(queuedCount); + const ready = Number(readyCount); + return { + queued, + ready, + delayed: Math.max(0, queued - ready), + }; + } + /** * Returns the Redis key used to lock a specific ordering key. */ diff --git a/packages/sqlite/src/mq.test.ts b/packages/sqlite/src/mq.test.ts index 1846ed1af..e1dc2059d 100644 --- a/packages/sqlite/src/mq.test.ts +++ b/packages/sqlite/src/mq.test.ts @@ -2,10 +2,14 @@ import { PlatformDatabase } from "#sqlite"; import { test } from "@fedify/fixture"; import { SqliteMessageQueue } from "@fedify/sqlite/mq"; import { getRandomKey, testMessageQueue } from "@fedify/testing"; +import * as temporal from "@js-temporal/polyfill"; +import assert from "node:assert/strict"; import { mkdtemp } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +const Temporal = globalThis.Temporal ?? temporal.Temporal; + const dbDir = await mkdtemp(join(tmpdir(), "fedify-sqlite-")); const dbPath = join(dbDir, `${getRandomKey("sqlite")}.db`); const db = new PlatformDatabase(dbPath); @@ -22,3 +26,29 @@ test("SqliteMessageQueue", () => }, { testOrderingKey: true }, )); + +test("SqliteMessageQueue.getDepth()", async () => { + const dbPath = join(dbDir, `${getRandomKey("sqlite_depth")}.db`); + const db = new PlatformDatabase(dbPath); + const tableName = getRandomKey("message_depth").replaceAll("-", "_"); + const mq = new SqliteMessageQueue(db, { tableName }); + try { + assert.deepStrictEqual(await mq.getDepth(), { + queued: 0, + ready: 0, + delayed: 0, + }); + await mq.enqueue("ready"); + await mq.enqueue("delayed", { + delay: Temporal.Duration.from({ hours: 1 }), + }); + assert.deepStrictEqual(await mq.getDepth(), { + queued: 2, + ready: 1, + delayed: 1, + }); + } finally { + mq.drop(); + mq[Symbol.dispose](); + } +}); diff --git a/packages/sqlite/src/mq.ts b/packages/sqlite/src/mq.ts index 48199eca8..30332b784 100644 --- a/packages/sqlite/src/mq.ts +++ b/packages/sqlite/src/mq.ts @@ -1,6 +1,7 @@ import { type PlatformDatabase, SqliteDatabase } from "#sqlite"; import type { MessageQueue, + MessageQueueDepth, MessageQueueEnqueueOptions, MessageQueueListenOptions, } from "@fedify/fedify"; @@ -260,6 +261,29 @@ export class SqliteMessageQueue implements MessageQueue, Disposable { }); } + /** + * {@inheritDoc MessageQueue.getDepth} + */ + async getDepth(): Promise { + this.initialize(); + const row = await this.#retryOnBusy(() => { + const now = Temporal.Now.instant().epochMilliseconds; + return this.#db + .prepare( + `SELECT + COUNT(*) AS queued, + COALESCE(SUM(CASE WHEN scheduled <= ? THEN 1 ELSE 0 END), 0) AS ready + FROM "${this.#tableName}"`, + ) + .get(now) as { queued: number; ready: number }; + }); + return { + queued: row.queued, + ready: row.ready, + delayed: Math.max(0, row.queued - row.ready), + }; + } + /** * {@inheritDoc MessageQueue.listen} */