Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,43 @@ Version 2.3.0

To be released.

### @fedify/fedify

- Added optional `MessageQueue.getDepth()` support, using the new
`MessageQueueDepth` return type, for reporting queue backlog depth.
`InProcessMessageQueue` can now report queued messages, including ready
and delayed counts, and `ParallelMessageQueue` delegates depth reporting
to its wrapped queue when supported. [[#735], [#748]]

[#735]: https://github.com/fedify-dev/fedify/issues/735
[#748]: https://github.com/fedify-dev/fedify/pull/748

### @fedify/amqp

- Added `AmqpMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. Delayed counts include queues created or tracked
by the same `AmqpMessageQueue` instance. [[#735], [#748]]

### @fedify/mysql

- Added `MysqlMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. [[#735], [#748]]

### @fedify/postgres

- Added `PostgresMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. [[#735], [#748]]

### @fedify/redis

- Added `RedisMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. [[#735], [#748]]

### @fedify/sqlite

- Added `SqliteMessageQueue.getDepth()` for reporting queued, ready, and
delayed message counts. [[#735], [#748]]


Version 2.2.0
-------------
Expand Down
80 changes: 80 additions & 0 deletions docs/manual/mq.md
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ the `~MessageQueue.enqueue()` and `~MessageQueue.listen()` methods:
~~~~ typescript twoslash
import type {
MessageQueue,
MessageQueueDepth,
MessageQueueEnqueueOptions,
MessageQueueListenOptions,
} from "@fedify/fedify";
Expand All @@ -693,6 +694,11 @@ class CustomMessageQueue implements MessageQueue {
): Promise<void> {
// Implementation here
}

// Optional: implement only if your backend can report real counts.
// async getDepth(): Promise<MessageQueueDepth> {
// return { queued, ready, delayed };
// }
}
~~~~

Expand Down Expand Up @@ -747,6 +753,21 @@ you can set the `nativeRetrial` property to `true` to indicate this.
When this property is `true`, Fedify will skip its own retry logic and rely
on your backend to handle retries, avoiding duplicate retry mechanisms.

### Implement `~MessageQueue.getDepth()` method (optional)

*This API is available since Fedify 2.3.0.*

This optional method should return the number of messages still waiting in the
backend queue. It should not include messages that have already been handed to
a worker for processing. Return `queued` for the total waiting messages. If
your backend can cheaply distinguish scheduled messages, also return `ready`
for messages eligible for immediate processing and `delayed` for messages
scheduled for later delivery.

Implement this method if your queue backend exposes an efficient count
operation. If the platform does not expose reliable counts, omit the method
rather than returning an approximate value that could mislead monitoring.


Parallel message processing
---------------------------
Expand Down Expand Up @@ -951,6 +972,65 @@ Optimized performance
: Backend-specific optimizations for retry logic.


Queue depth reporting
---------------------

*This API is available since Fedify 2.3.0.*

Some message queue implementations expose `~MessageQueue.getDepth()` for
observability. Queue depth means messages still waiting in the backend queue:

`queued`
: Total waiting messages. This excludes messages currently being handled by
a worker.

`ready`
: Waiting messages eligible for immediate processing. This value is omitted
when the backend cannot distinguish ready and delayed messages cheaply.

`delayed`
: Waiting messages scheduled for later delivery. This value is omitted when
the backend cannot distinguish ready and delayed messages cheaply.

For example:

~~~~ typescript twoslash
import type { MessageQueue } from "@fedify/fedify";
declare const queue: MessageQueue;
// ---cut-before---
const depth = await queue.getDepth?.();
if (depth != null) {
console.log("Queued messages:", depth.queued);
}
~~~~

### Implementation support

| Implementation | Queue Depth Support |
| ------------------------ | ----------------------------------------- |
| `InProcessMessageQueue` | `queued`, `ready`, `delayed` |
| [`DenoKvMessageQueue`] | No reliable platform count |
| [`RedisMessageQueue`] | `queued`, `ready`, `delayed` |
| [`PostgresMessageQueue`] | `queued`, `ready`, `delayed` |
| [`MysqlMessageQueue`] | `queued`, `ready`, `delayed` |
| [`AmqpMessageQueue`] | `queued`, `ready`, `delayed`[^amqp-depth] |
| [`SqliteMessageQueue`] | `queued`, `ready`, `delayed` |
| `WorkersMessageQueue` | No reliable platform count |
Comment thread
coderabbitai[bot] marked this conversation as resolved.
| `ParallelMessageQueue` | Same as wrapped queue |

If you pass the same `MessageQueue` instance as the shared queue for inbox,
outbox, and fanout work, observability code should report that queue once as a
shared queue. Reporting the same `getDepth()` result separately for each
logical role would double- or triple-count the backlog.

[^amqp-depth]: `AmqpMessageQueue` can count the configured ready queues and
delayed queues created by the same `AmqpMessageQueue` instance.
AMQP 0-9-1 does not provide a portable queue-listing API, so
delayed queues created by another process before this instance
starts are not included until this instance creates or tracks
them.


Ordering guarantees
-------------------

Expand Down
Loading
Loading