Skip to content

Commit 75e1358

Browse files
committed
feat(outbox): transactional outbox helper — Node core (ADR-0029)
Ports the producer-side transactional outbox: Outbox.write encodes the frozen envelope and persists it via a caller-bound OutboxStore inside the caller's own DB transaction (no dual-write), and OutboxRelay.flush/drain forwards stored rows verbatim through an OutboxTransport — mark-published only after publish resolves, a rejecting publish -> markFailed + linear backoff (row stays pending), one poison row never blocks the batch. At-least-once handoff; consumers dedupe on meta.id (the Idempotent mirror). Frozen bytes ride verbatim (GR-1/4/5); store+transport are interfaces so the core stays zero-dep (GR-7). v1.6.0.
1 parent 406be28 commit 75e1358

7 files changed

Lines changed: 686 additions & 4 deletions

File tree

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,36 @@ The envelope wire format is versioned separately by `meta.schema_version`
99

1010
## [Unreleased]
1111

12+
## [1.6.0] - 2026-06-21
13+
14+
### Added
15+
- **Transactional outbox — an optional producer-side helper that removes the dual
16+
write (ADR-0029).** A plain producer must commit its business row **and** publish to
17+
the broker — two systems that disagree on a crash. The outbox persists the encoded
18+
envelope into the **same DB transaction** as the business write, and a separate relay
19+
publishes the durable rows afterwards: no distributed transaction, exactly-once
20+
*handoff*, then at-least-once on the wire as always.
21+
- New `Outbox` writer — `write(envelope)` encodes via the frozen `EnvelopeCodec` and
22+
delegates to the store. **The caller owns the transaction boundary**; the helper
23+
never begins/commits anything (GR-7).
24+
- New `OutboxStore` interface (`save`, `fetchUnpublished` oldest-first, `markPublished`,
25+
`markFailed`) — the persistence seam the caller binds to their own DB; the core ships
26+
no DB driver. New `OutboxTransport` interface (`publish(body, queue)`) — the
27+
publish-only seam the relay forwards through, bound to the caller's broker.
28+
- New `OutboxRelay``flush()` publishes one batch (marking each row published only
29+
**after** the transport resolves; a rejecting publish → `markFailed` + bounded linear
30+
backoff, the row stays pending, the batch continues) and `drain(maxPasses?)` loops
31+
until no progress, with a safety ceiling. The sleeper is injectable so tests are
32+
instant.
33+
- New `InMemoryOutboxStore` reference store (tests / single-process demos; no real
34+
transaction). New types `OutboxRecord`, `OutboxRelayResult`, `OutboxRelayOptions`,
35+
`Sleeper`.
36+
- The relay publishes the **stored bytes verbatim** — never decoding, rebuilding or
37+
re-encoding the envelope — so `trace_id` is preserved end-to-end (GR-4) and the body
38+
is byte-identical before store and after relay (GR-1/GR-5). `schema_version` stays
39+
**1**: the outbox's bookkeeping lives *around* the envelope, never *on* the wire.
40+
Entirely opt-in and backward compatible.
41+
1242
## [1.5.0] - 2026-06-21
1343

1444
### Added

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,42 @@ stays **1**, GR-1) — the same out-of-band `HeaderCarrier` seam as the `tracepa
129129
header. It takes effect only when the `RedriveIO` implements `publishWithHeaders`;
130130
otherwise `bypass` is a no-op (`bypassed: false`) and the message is still redriven.
131131

132+
### Transactional outbox (optional)
133+
134+
A plain producer does two things that must both happen or neither — commit the
135+
business row and publish the message — across two systems that can disagree on a
136+
crash (the **dual write**). The outbox removes it: `outbox.write(env)` persists the
137+
**encoded envelope into the same DB transaction** as your business write, and a
138+
separate `OutboxRelay` publishes the durable rows afterwards (ADR-0029).
139+
140+
```ts
141+
import { Outbox, OutboxRelay, InMemoryOutboxStore, EnvelopeCodec } from "@babelqueue/core";
142+
143+
// 1) WRITE — the caller owns the transaction boundary (this is the whole point).
144+
const outbox = new Outbox(store); // your OutboxStore, bound to your DB
145+
await db.transaction(async (tx) => {
146+
await tx.insertOrder(order); // the business write
147+
const env = EnvelopeCodec.make("urn:babel:orders:created", { order_id }, { queue: "orders" });
148+
await outbox.write(env); // same connection, same tx
149+
}); // both commit, or neither
150+
151+
// 2) RELAY — drain the durable rows onto the broker (a worker loop / cron).
152+
const relay = new OutboxRelay(transport, store); // your OutboxTransport
153+
await relay.drain(); // publishes verbatim, marks published
154+
```
155+
156+
The store and the transport are **interfaces you bind** to your own DB and broker —
157+
the core ships no DB driver (GR-7) and only an `InMemoryOutboxStore` reference for
158+
tests/demos. The relay publishes the **stored bytes verbatim** — it never decodes,
159+
rebuilds or re-encodes the envelope — so `trace_id` is preserved end-to-end (GR-4)
160+
and the body is byte-identical before store and after relay (GR-1/GR-5). It is
161+
**at-least-once handoff**: a crash between publish and mark-published re-publishes the
162+
row, so consumers must stay idempotent (the `Wrap` helper is the consumer-side mirror).
163+
164+
Implement `OutboxStore` over your DB (`save`, `fetchUnpublished` oldest-first — your
165+
adapter SHOULD claim/lock rows so two relays don't double-publish — `markPublished`,
166+
`markFailed`) and `OutboxTransport` (`publish(body, queue)`) over your broker.
167+
132168
## What this core is (and isn't)
133169

134170
It enforces the **contract**: the envelope shape, URN identity, trace propagation,

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@babelqueue/core",
3-
"version": "1.5.0",
3+
"version": "1.6.0",
44
"description": "Polyglot Queues, Simplified — the Node/TypeScript core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers.",
55
"keywords": [
66
"queue",
@@ -59,7 +59,7 @@
5959
"build": "tsup",
6060
"typecheck": "tsc --noEmit",
6161
"lint": "eslint src test",
62-
"test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts test/otel.test.ts test/redrive.test.ts test/replay.test.ts",
62+
"test": "node --import tsx --test test/codec.test.ts test/dead-letter.test.ts test/conformance.test.ts test/overhead.test.ts test/idempotency.test.ts test/schema.test.ts test/otel.test.ts test/redrive.test.ts test/replay.test.ts test/outbox.test.ts",
6363
"coverage": "c8 --check-coverage --lines 90 --functions 90 --branches 85 --reporter=text npm test",
6464
"prepublishOnly": "npm run build"
6565
},

src/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,13 @@ export type {
4949
} from "./redrive.js";
5050

5151
export { HEADER_REPLAY_BYPASS, isReplay, bypassExternalEffects } from "./replay.js";
52+
53+
export { Outbox, OutboxRelay, InMemoryOutboxStore } from "./outbox.js";
54+
export type {
55+
OutboxStore,
56+
OutboxTransport,
57+
OutboxRecord,
58+
OutboxRelayResult,
59+
OutboxRelayOptions,
60+
Sleeper,
61+
} from "./outbox.js";

0 commit comments

Comments
 (0)