A plain producer makes a dual write: it commits the business row (INSERT INTO orders …) and publishes to the broker. Those are two systems that can disagree on a
crash — the order is saved but the message is lost, or the message is sent but the order
rolled back. The transactional outbox (ADR-0029) removes the
dual write: the message is written into the same database, inside the same transaction
as the business data, so they commit or roll back atomically. A separate relay
publishes the stored rows afterwards. The message is recorded exactly once, atomically
with the business data, and is then handed to the broker at least once, with no
distributed transaction.
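
To make the atomicity claim concrete, here is a minimal sketch using plain PDO and an in-memory SQLite database rather than InitORM or the SDK helper. The `orders` and `outbox` tables, `placeOrder()`, and `countRows()` are hypothetical stand-ins for illustration only; the point is that a failure between the two writes leaves neither row behind.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in tables; the real schema and adapter live in the example itself.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE orders (id INTEGER PRIMARY KEY, total INTEGER NOT NULL)');
$pdo->exec('CREATE TABLE outbox (id INTEGER PRIMARY KEY, body TEXT NOT NULL)');

/**
 * Writes the order and its outbox message in one transaction.
 * $failBeforeCommit simulates a crash between the two writes.
 */
function placeOrder(PDO $pdo, int $total, bool $failBeforeCommit): bool
{
    $pdo->beginTransaction();
    try {
        $pdo->prepare('INSERT INTO orders (total) VALUES (?)')->execute([$total]);
        if ($failBeforeCommit) {
            throw new RuntimeException('simulated crash');
        }
        $body = json_encode(['total' => $total], JSON_THROW_ON_ERROR);
        $pdo->prepare('INSERT INTO outbox (body) VALUES (?)')->execute([$body]);
        $pdo->commit();
        return true;
    } catch (Throwable $e) {
        // Undo the order insert as well: both rows, or neither.
        $pdo->rollBack();
        return false;
    }
}

function countRows(PDO $pdo, string $table): int
{
    // $table is a fixed literal in this sketch, never user input.
    return (int) $pdo->query("SELECT COUNT(*) FROM {$table}")->fetchColumn();
}

$ok = placeOrder($pdo, 100, false);    // commits order + message
$failed = placeOrder($pdo, 200, true); // rolls back; nothing is stored
```

After both calls, each table holds exactly one row: the failed attempt left no orphaned order and no orphaned message.
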
BabelQueue's php-sdk ships the dependency-free helper
(BabelQueue\Outbox); this
example binds it to a real database with InitORM
(initorm/database). The SDK core stays ext-json-only (GR-7) — the InitORM glue lives
here, in the src/InitOrmOutboxStore.php adapter.

              produce.php                                  relay.php
    ┌──────────────────────────────┐           ┌──────────────────────────────┐
    │ $db->transaction(function(){ │           │ $rows = fetchUnpublished()   │
    │   INSERT INTO orders …       │           │ foreach: transport.publish   │
    │   $outbox->write($envelope)  │ ── DB ──▶ │ markPublished() / markFailed │
    │ }); // both, or neither      │           │ (at-least-once, retry)       │
    └──────────────────────────────┘           └──────────────────────────────┘
             atomic commit                        drains the durable outbox

- Write side (produce.php): the caller owns the transaction boundary. We open one $db->transaction(…), do the business INSERT INTO orders and $outbox->write($envelope) inside it, and InitORM commits both together. The InitOrmOutboxStore::save() adapter runs its INSERT on the connection's already-open transaction; it never begins or commits one itself.
- Read side (relay.php): OutboxRelay::drain() SELECTs pending rows oldest-first, publishes each through a BabelQueue Transport (here RedisTransport), and marks it published or, on a publish failure, bumps attempts + last_error and leaves it pending for the next pass. Delivery is at-least-once: a downstream consumer dedupes on the canonical meta.id (the php-sdk's Idempotent::wrap, ADR-0022, is the consumer-side mirror).
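
The read side can be sketched with plain PDO as follows. This is not the SDK's OutboxRelay; `drain()`, the trimmed-down `outbox` table, and the `$publish` callable are assumptions made for illustration. It shows the behaviour described above: oldest-first selection, mark on success, and on failure a bumped `attempts` with the row left pending.

```php
<?php
declare(strict_types=1);

$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// Trimmed-down, hypothetical outbox table for the sketch.
$pdo->exec("CREATE TABLE outbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue TEXT NOT NULL,
    body TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    attempts INTEGER NOT NULL DEFAULT 0,
    last_error TEXT NULL
)");

/**
 * Drains up to $limit pending rows, oldest first.
 * $publish is a hypothetical callable(string $queue, string $body): void that throws on failure.
 * Returns the number of rows published in this pass.
 */
function drain(PDO $pdo, callable $publish, int $limit = 100): int
{
    $select = $pdo->prepare(
        "SELECT id, queue, body FROM outbox WHERE status = 'pending' ORDER BY id LIMIT :n"
    );
    $select->bindValue(':n', $limit, PDO::PARAM_INT);
    $select->execute();
    $rows = $select->fetchAll(PDO::FETCH_ASSOC);

    $ok = $pdo->prepare("UPDATE outbox SET status = 'published' WHERE id = ?");
    $fail = $pdo->prepare('UPDATE outbox SET attempts = attempts + 1, last_error = ? WHERE id = ?');

    $published = 0;
    foreach ($rows as $row) {
        try {
            $publish($row['queue'], $row['body']); // the stored bytes, untouched
            $ok->execute([$row['id']]);
            $published++;
        } catch (Throwable $e) {
            // Row stays pending; the next pass retries it.
            $fail->execute([substr($e->getMessage(), 0, 500), $row['id']]);
        }
    }
    return $published;
}

$insert = $pdo->prepare('INSERT INTO outbox (queue, body) VALUES (?, ?)');
$insert->execute(['orders', '{"n":1}']);
$insert->execute(['orders', '{"n":2}']);

$sent = [];
$failOnce = true;
$publish = function (string $queue, string $body) use (&$sent, &$failOnce): void {
    if ($failOnce && $body === '{"n":2}') {
        $failOnce = false;
        throw new RuntimeException('broker unavailable');
    }
    $sent[] = $body;
};

$first = drain($pdo, $publish);  // row 1 published, row 2 fails and stays pending
$second = drain($pdo, $publish); // row 2 is retried and published
```

If the process dies after `$publish` returns but before the status update, the row is published again on the next pass, which is why the consumer side needs to dedupe.
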
The envelope is the frozen schema_version: 1 wire format. The outbox stores it
verbatim in the body column and the relay publishes those exact bytes — it never
decodes or rebuilds the envelope — so trace_id and every field reach the broker
byte-for-byte as written (GR-1 / GR-4 / GR-5). The outbox table's own columns (id,
status, attempts, …) are bookkeeping around the envelope; they never travel on the
wire.
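
A short illustration of why the relay forwards the stored bytes instead of decoding and re-encoding them. The envelope below is a made-up example, not the canonical schema; the only point is that a JSON round trip preserves meaning but not bytes, because PHP's `json_encode` escapes `/` as `\/` unless told otherwise.

```php
<?php
declare(strict_types=1);

// Hypothetical envelope; only the shape matters for this sketch.
$stored = '{"schema_version":1,"meta":{"id":"a1"},"trace_id":"t-1","data":{"url":"https://example.test/o/1"}}';

// Round-tripping through decode/encode yields equivalent JSON but different bytes.
$decoded = json_decode($stored, true, 512, JSON_THROW_ON_ERROR);
$rebuilt = json_encode($decoded, JSON_THROW_ON_ERROR);

$sameBytes = ($rebuilt === $stored);                           // false: slashes got escaped
$sameMeaning = (json_decode($rebuilt, true) === $decoded);     // true: same data

// What the relay does instead: hand the stored string to the transport unchanged.
$toPublish = $stored;
```

Any consumer that hashes, signs, or byte-compares the payload would see the rebuilt version as a different message, so passing `$stored` through untouched is the safer choice.
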
Production DDL (MySQL / PostgreSQL / SQLite variants) is in schema.sql.
The demo's bootstrap.php creates a SQLite version automatically so it runs with no
server.

    CREATE TABLE babelqueue_outbox (
      id           BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
      queue        VARCHAR(255)    NOT NULL,
      body         LONGTEXT        NOT NULL,              -- encoded envelope, verbatim
      status       VARCHAR(16)     NOT NULL DEFAULT 'pending',
      attempts     INT UNSIGNED    NOT NULL DEFAULT 0,
      last_error   VARCHAR(500)    NULL,
      created_at   DATETIME        NOT NULL,
      published_at DATETIME        NULL,
      failed_at    DATETIME        NULL,
      PRIMARY KEY (id),
      KEY idx_outbox_pending (status, id)                 -- the relay's hot query
    );

Concurrency note: a multi-worker relay should reserve rows with SELECT … FOR UPDATE SKIP LOCKED (or a picked_at claim) so two relays never publish the same row. At-least-once tolerates the rare overlap regardless. The adapter's fetchUnpublished() marks the spot.
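
One way the claim variant could look, sketched with plain PDO on SQLite (which has no SKIP LOCKED). The `picked_by` and `picked_at` columns and `claimRows()` are hypothetical and not part of the schema above; the idea is that a single UPDATE stamps a batch of unclaimed rows with the worker's name, so a second worker's UPDATE no longer matches them. The subquery-with-LIMIT form shown here is accepted by SQLite; other engines may need a different statement.

```php
<?php
declare(strict_types=1);

$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// picked_by / picked_at are hypothetical claim columns added for this sketch.
$pdo->exec("CREATE TABLE outbox (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    body TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    picked_by TEXT NULL,
    picked_at TEXT NULL
)");

/**
 * Claims up to $limit unclaimed pending rows for $worker and returns
 * the ids of all pending rows this worker currently holds.
 */
function claimRows(PDO $pdo, string $worker, int $limit): array
{
    $claim = $pdo->prepare(
        "UPDATE outbox
            SET picked_by = :worker, picked_at = :now
          WHERE picked_by IS NULL
            AND id IN (
                SELECT id FROM outbox
                 WHERE status = 'pending' AND picked_by IS NULL
                 ORDER BY id LIMIT :n
            )"
    );
    $claim->bindValue(':worker', $worker);
    $claim->bindValue(':now', gmdate('Y-m-d H:i:s'));
    $claim->bindValue(':n', $limit, PDO::PARAM_INT);
    $claim->execute();

    $mine = $pdo->prepare(
        "SELECT id FROM outbox WHERE picked_by = ? AND status = 'pending' ORDER BY id"
    );
    $mine->execute([$worker]);
    return array_map('intval', $mine->fetchAll(PDO::FETCH_COLUMN));
}

$insert = $pdo->prepare('INSERT INTO outbox (body) VALUES (?)');
foreach (['a', 'b', 'c'] as $body) {
    $insert->execute([$body]);
}

$first = claimRows($pdo, 'relay-1', 2);  // claims ids 1 and 2
$second = claimRows($pdo, 'relay-2', 2); // only id 3 is still unclaimed
```

A real relay would also need to release or expire claims held by a worker that died, for example by treating a sufficiently old `picked_at` as unclaimed; that part is left out here.
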

    composer install

    # 1) write 3 orders, each committed atomically with its outbox message (SQLite, no server)
    DB_DSN="sqlite:/tmp/babelqueue-outbox.sqlite" php produce.php

    # 2) start Redis, then relay the durable rows onto the broker
    docker run -d -p 6379:6379 redis:7
    DB_DSN="sqlite:/tmp/babelqueue-outbox.sqlite" BROKER_URL="redis://localhost:6379/0" php relay.php

Any other-language BabelQueue consumer (Go, Java, Python, …) on the orders queue reads
the relayed envelopes natively: same canonical envelope, same trace_id.
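
Because relaying is at-least-once, a consumer on the orders queue may occasionally receive the same envelope twice. The sketch below shows one way to dedupe on `meta.id` with plain PDO; it is not the SDK's Idempotent::wrap. The `processed` table, `handleOnce()`, and the sample envelope are hypothetical, and `INSERT OR IGNORE` is SQLite syntax.

```php
<?php
declare(strict_types=1);

$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// Hypothetical dedupe table keyed by the envelope's meta.id.
$pdo->exec('CREATE TABLE processed (message_id TEXT PRIMARY KEY)');

/**
 * Runs $handler at most once per meta.id.
 * Returns true if the handler ran, false if the message was a duplicate.
 */
function handleOnce(PDO $pdo, string $rawEnvelope, callable $handler): bool
{
    $envelope = json_decode($rawEnvelope, true, 512, JSON_THROW_ON_ERROR);
    $id = (string) $envelope['meta']['id'];

    $pdo->beginTransaction();
    try {
        $mark = $pdo->prepare('INSERT OR IGNORE INTO processed (message_id) VALUES (?)');
        $mark->execute([$id]);
        if ($mark->rowCount() === 0) {
            $pdo->rollBack(); // id already recorded: duplicate delivery
            return false;
        }
        $handler($envelope);
        $pdo->commit();       // the id is recorded only if the handler succeeded
        return true;
    } catch (Throwable $e) {
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }
        throw $e;
    }
}

$raw = '{"schema_version":1,"meta":{"id":"msg-1"},"trace_id":"t-1"}';
$calls = 0;
$handler = function (array $envelope) use (&$calls): void {
    $calls++;
};

$firstDelivery = handleOnce($pdo, $raw, $handler);  // handler runs
$secondDelivery = handleOnce($pdo, $raw, $handler); // duplicate, skipped
```

Recording the id in the same transaction as the handler's own database work keeps the dedupe marker and the side effects consistent, mirroring what the outbox does on the producer side.
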

    composer install
    vendor/bin/phpunit

The test drives InitOrmOutboxStore against an in-memory SQLite database: save →
fetch (oldest-first, limited) → markPublished; markFailed bumps attempts and keeps
the row pending; a full Outbox write → OutboxRelay flush preserves trace_id; and a
throwing transport marks the row failed. It skips cleanly if pdo_sqlite is unavailable.

MIT © Muhammet Şafak