feat(ilp): QWiP store-and-forward client buffer#17

Open
bluestreak01 wants to merge 50 commits into main from vi_sf

bluestreak01 commented Apr 26, 2026

Summary

Opt-in store-and-forward (SF) durability for the QWP WebSocket ingest client. Outgoing batches are persisted to disk before they go out on the wire; the server's cumulative ACK trims sealed segments; on transient failure or process restart the I/O thread silently reconnects and replays whatever is still on disk. User code does not see transient disconnects.

Architecture

  • CursorSendEngine — central engine. Producer (user thread) appends encoded QWP frames into mmap'd ring segments; an I/O thread (CursorWebSocketSendLoop) walks the cursor and sends frames; ACKs trim sealed segments.
  • MmapSegment — fixed-size mmap'd file holding [u32 crc32c | u32 frame_len | frame bytes] envelopes. Torn tails and silent bit-rot are detected on recovery and the active segment is truncated to the last good frame.
  • SegmentRing — append/seal/rotate ring of segments. Filenames sf-<baseSeq:016x>.sfa encode the FSN range so trim and recovery don't have to scan a sealed segment to know its bounds.
  • SegmentManager — provisions hot spares ahead of the producer, enforces the per-engine sf_max_total_bytes cap, and unmaps trimmed segments.
  • SlotLock — advisory exclusive lock on <sf_dir>/<sender_id>/.lock. Released on engine.close() or kernel-on-process-exit.
  • OrphanScanner + BackgroundDrainerPool — opt-in (drain_orphans=true) at foreground sender startup: scan <sf_dir>/*/ for sibling slots that are unlocked and contain unacked segments, lock each one, drain it on its own connection, release. Capped at max_background_drainers=4. Failed drains drop a .failed sentinel so future scans skip the slot until the user clears it.
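The envelope layout and torn-tail scan described for MmapSegment can be sketched roughly as follows. This is an illustration only, not the PR's code: the class and method names are hypothetical, it uses a heap ByteBuffer rather than an mmap'd region, it borrows java.util.zip.CRC32C for brevity (the PR deliberately uses its own native CRC32C), and it assumes the CRC covers the frame bytes only and that zero-length frames are never written.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Hypothetical helper for illustration; names are not from the PR.
final class SfEnvelopeSketch {
    static final int HEADER_BYTES = 8; // u32 crc32c + u32 frame_len

    // Appends [crc32c | frame_len | frame bytes] at the buffer's current position.
    static void append(ByteBuffer seg, byte[] frame) {
        CRC32C crc = new CRC32C();
        crc.update(frame, 0, frame.length);
        seg.putInt((int) crc.getValue());
        seg.putInt(frame.length);
        seg.put(frame);
    }

    // Returns the offset just past the last intact frame in seg[0, limit).
    // Anything after that offset is treated as a torn or corrupt tail.
    static int scanValidPrefix(ByteBuffer seg, int limit) {
        CRC32C crc = new CRC32C();
        int pos = 0;
        while (limit - pos >= HEADER_BYTES) {
            int storedCrc = seg.getInt(pos);
            int len = seg.getInt(pos + 4);
            if (len <= 0 || len > limit - pos - HEADER_BYTES) {
                break; // implausible or truncated length
            }
            ByteBuffer payload = seg.duplicate();
            payload.limit(pos + HEADER_BYTES + len);
            payload.position(pos + HEADER_BYTES);
            crc.reset();
            crc.update(payload);
            if ((int) crc.getValue() != storedCrc) {
                break; // checksum mismatch
            }
            pos += HEADER_BYTES + len;
        }
        return pos;
    }
}
```

On recovery, a caller would truncate the active segment to the returned offset. Whether the real implementation distinguishes a torn tail from mid-stream corruption at this point is not something this sketch models.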

Wire/disk decoupling

Wire messageSequence always starts at 0 each connect; SF's persistent FSN is monotonic across restarts. fsnAtZero is pinned at connect time so server ACKs translate back to FSN for trim. Invariant: fsn = fsnAtZero + wireSeq. Cursor frames are self-sufficient — every frame carries full schema + full symbol-dict delta from id 0, so replay against any fresh server connection (post-reconnect, post-restart, drainer adopting an orphan slot) is correct.
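As a small illustration of the invariant fsn = fsnAtZero + wireSeq, the sketch below pins the offset once per connection and translates in both directions. The class name is hypothetical, and it assumes fsnAtZero is the FSN of the first frame that will be sent on the new connection; the PR only states that the value is pinned at connect time.

```java
// Hypothetical per-connection mapping between wire sequences and persistent FSNs.
final class FsnMappingSketch {
    private final long fsnAtZero; // FSN that wire sequence 0 stands for on this connection

    FsnMappingSketch(long fsnAtZero) {
        this.fsnAtZero = fsnAtZero;
    }

    // Server ACKs carry wire sequences; trim needs FSNs.
    long toFsn(long wireSeq) {
        return fsnAtZero + wireSeq;
    }

    // Replay walks FSNs on disk; the wire needs per-connection sequences.
    long toWireSeq(long fsn) {
        return fsn - fsnAtZero;
    }
}
```

A fresh mapping is created on every reconnect, so the same FSN can correspond to different wire sequences on different connections.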

Reconnect & close

  • I/O loop catches any wire error and enters reconnect with exponential backoff capped at reconnect_max_backoff_millis. Per-outage time budget reconnect_max_duration_millis (default 5 min); auth failures (401, 403, non-101 upgrade reject) are immediately terminal.
  • Initial-connect failures are terminal by default; opt-in initial_connect_retry=true uses the same backoff loop.
  • flush() returns once data is published into the engine (in-RAM for memory mode, on-disk for SF) — never waits for server ACK.
  • close() blocks up to close_flush_timeout_millis (default 5 s) waiting for ackedFsn >= publishedFsn; 0 or -1 skips the drain.
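The interaction between the backoff cap and the per-outage budget can be sketched as below. The doubling factor and the absence of jitter are assumptions, since the PR only says "exponential backoff"; the class and method names are hypothetical, and time spent inside each connect attempt is ignored.

```java
// Hypothetical backoff arithmetic; not the PR's reconnect loop.
final class ReconnectBackoffSketch {
    // Delay before the given 0-based attempt: doubles from initialMillis, capped at maxMillis.
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        long delay = Math.min(initialMillis, maxMillis);
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay = Math.min(maxMillis, delay * 2);
        }
        return delay;
    }

    // Counts how many backoff sleeps fit inside the per-outage budget.
    static int attemptsWithinBudget(long initialMillis, long maxMillis, long budgetMillis) {
        long elapsed = 0;
        int attempts = 0;
        while (true) {
            long delay = delayMillis(attempts, initialMillis, maxMillis);
            if (elapsed + delay > budgetMillis) {
                return attempts;
            }
            elapsed += delay;
            attempts++;
        }
    }
}
```

Under these assumptions and the defaults listed in this PR (100 ms initial, 30 s cap, 5 min budget), the delay reaches the cap on the tenth attempt and stays there for the rest of the outage.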

Backpressure

Per-engine sf_max_total_bytes cap. When full, appendBlocking spins for sf_append_deadline_millis (default 30 s); ACK arrival → trim → space frees → append succeeds. If the deadline fires the call throws.
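A minimal sketch of this backpressure loop, assuming a simple byte counter in place of the real segment accounting. All names here are hypothetical, and IllegalStateException stands in for whatever exception the client actually throws when the deadline fires.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the engine's byte accounting; not the PR's code.
final class ByteBudgetSketch {
    private final long capBytes; // plays the role of sf_max_total_bytes
    private final AtomicLong used = new AtomicLong();

    ByteBudgetSketch(long capBytes) {
        this.capBytes = capBytes;
    }

    // Reserves n bytes if they fit under the cap; never blocks.
    boolean tryReserve(long n) {
        while (true) {
            long cur = used.get();
            if (cur + n > capBytes) {
                return false;
            }
            if (used.compareAndSet(cur, cur + n)) {
                return true;
            }
        }
    }

    // Called when an ACK lets trim reclaim n bytes.
    void release(long n) {
        used.addAndGet(-n);
    }

    long usedBytes() {
        return used.get();
    }

    // Spins until the reservation fits or the deadline passes, then throws.
    void reserveBlocking(long n, long deadlineMillis) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(deadlineMillis);
        while (!tryReserve(n)) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new IllegalStateException("append deadline exceeded");
            }
            Thread.onSpinWait();
        }
    }
}
```

In this model the I/O thread would call release() after trimming acknowledged segments, which is what lets a spinning producer make progress.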

Connect-string knobs

ws::addr=...
   ;sf_dir=<group root>            slot lives at <sf_dir>/<sender_id>/
   ;sender_id=<id>                 default "default"
   ;sf_max_bytes=<bytes>           per-segment rotation threshold
   ;sf_max_total_bytes=<bytes>     hard cap → backpressure
   ;sf_durability=memory           (flush/append deferred)
   ;sf_append_deadline_millis=<n>  default 30000
   ;reconnect_max_duration_millis=<n>     default 300000
   ;reconnect_initial_backoff_millis=<n>  default 100
   ;reconnect_max_backoff_millis=<n>      default 30000
   ;initial_connect_retry=on/off          default off
   ;close_flush_timeout_millis=<n>        default 5000; 0 or -1 = fast close
   ;drain_orphans=on/off                  default off
   ;max_background_drainers=<n>           default 4

All file I/O goes through a new native Files layer (POSIX + Win32) and software CRC32C (Castagnoli, slice-by-8) — no java.nio.FileChannel, java.util.zip.CRC32C, or MappedByteBuffer.

Test plan

  • Native Files I/O (FilesTest): write/read roundtrip, truncate, allocate, append, rename, dir iteration, exclusive lock, exists/remove, page size.
  • CRC32C (Crc32cTest): known vector, chaining, property fuzz over random inputs and split points, single-bit-flip detection.
  • Segment primitives (MmapSegmentTest, SegmentRingTest, SegmentManagerTest): append/replay, multi-segment rotation, trim, torn-tail recovery, CRC mismatch, oversized length defense, FSN-gap detection, disk-cap enforcement.
  • Slot model (SlotLockTest, OrphanScannerTest, OrphanScanIntegrationTest): lock collision, .failed sentinel, scanner skip-rules, end-to-end orphan drain.
  • Engine & I/O loop (CursorSendEngineTest, CursorWebSocketSendLoopCloseTest, CursorWebSocketSendLoopReconnectLeakTest, SegmentManagerCloseRaceTest, SegmentManagerRecoveryCapTest, SegmentRingRecoveryUnlinkTest, BackgroundDrainerPoolRaceTest).
  • Connect-string parsing (SfFromConfigTest): all knobs, validation rejections, disk-full backpressure, sender takes ownership of engine.
  • End-to-end (ReconnectTest, RecoveryReplayTest, ServerErrorAckTerminalTest, InitialConnectRetryTest, IoThreadErrorSurfacedOnRowApiTest, CloseDrainTest, CleanShutdownNoReplayTest, SelfSufficientFramesTest, BackgroundDrainerEndToEndTest): reconnect cap, terminal upgrade error, replay-on-connect, close-drain timeout, fast-close, recovery replay against fresh server.
  • All SF tests wrapped in TestUtils.assertMemoryLeak() so native-memory leaks surface in CI.
  • 800 QWP-client tests pass on darwin-aarch64. Linux and Windows native bodies were not tested locally; CI covers them.

Known follow-ups (deliberately deferred)

  • sf_durability=flush / =append (per-flush / per-append fsync). Cursor accepts only memory today.
  • Multi-host failover: connect-string parses comma-separated hosts but Sender.build() only uses the first.
  • Reconnect cap exhaustion test against a drainer specifically.
  • Process-kill (SIGKILL) torture harness — needs a separate CLI tool, not unit tests.
  • Per-segment partial-ack tracking would let trim drop partially-acked sealed segments without re-replay.

🤖 Generated with Claude Code

bluestreak01 and others added 30 commits April 26, 2026 02:15
Opt-in durable buffer for the QWP WebSocket ingest client. Outgoing batches
are persisted to disk before they go out on the wire; the server's cumulative
ACK trims sealed segments; on restart or transient failure, the I/O thread
silently reconnects and replays whatever is still on disk.

On-disk format is the QWP wire frame captured verbatim, wrapped in an
8-byte SF envelope (CRC32C + length) so torn tails and silent bit-rot are
caught on recovery. Filenames encode (baseSeq, lastSeq) so trim and
recovery don't have to scan sealed segments.

Auto-reconnect absorbs all transient connection failures with exponential
backoff (capped at 30s); only fatal SF storage errors (corruption, frame
larger than segment cap) propagate to the user. flush() under SF returns
once data is on disk, not on server ACK — natural backpressure when SF
total disk cap is reached makes flush() block until ACKs free space.

All file I/O goes through a new native Files layer ported from the
upstream QuestDB server repo: open/read/write/fsync/truncate/allocate/
length/lock/mkdir/exists/remove/rename/dir-iteration. Software CRC32C
implementation (Castagnoli, polynomial 0x1EDC6F41) added alongside.

Configured via connect string:

  ws::addr=...
     ;store_and_forward=on
     ;sf_dir=/var/lib/qdb/sf
     ;sf_max_bytes=67108864       (per-segment rotation; default 64 MiB)
     ;sf_max_total_bytes=4G       (hard cap → backpressure; default unlimited)
     ;sf_fsync=on                 (fsync after every append; default off)

49 new tests across 6 files: native Files (9), CRC32C (7 incl. property-
fuzz + bit-flip), SegmentLog (14), SegmentLog torture (7 incl. randomized
op-sequence fuzzer + multi-crash), SF integration (10 incl. multi-
reconnect + replay-during-replay + stress), connect-string from-config
(11). 1956 tests pass total.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Multiple correctness fixes to the QWiP store-and-forward client that
were silently losing data under realistic outage scenarios.

Critical:
- markSending() moved after segmentLog.append; nextBatchSequence only
  advances on append success. A disk-full retry no longer crashes with
  IllegalStateException + drift exception, which used to recycle the
  buffer without persisting it (C1, C2).
- doReconnectCycle no longer drops pendingBuffer on every reconnect
  attempt. Buffer survives across attempts and is persisted by the
  post-reconnect ACTIVE state (C3).
- createActive closes fd in try/catch on writeHeader/fsync failure;
  no more fd leak on every failed rotation under disk pressure (C4).
- scanActive/replaySegment reject Files.length(fd) == -1 instead of
  treating it as "empty segment" (C5).

Moderate:
- scanActive distinguishes torn tail from mid-stream CRC mismatch;
  bit-rot followed by trailing bytes throws instead of silently
  truncating (M1).
- Files.close accepts any fd >= 0 (was refusing 0/1/2, leaking lock
  fd in containers where stdin/stdout/stderr were pre-closed) (M2).
- Connect-string sf_max_bytes / sf_max_total_bytes parsed as long;
  was capped at ~2 GB by parseIntValue (M3).
- WebSocketSendQueue.client made volatile so close-during-reconnect
  reads the live ref, not a stale one (M4).
- SegmentLog uses ObjList instead of java.util.ArrayList; bytesOnDisk
  is cached and updated incrementally so append() is O(1) zero-alloc
  on the I/O hot path (M6, N3).
- Each Segment caches a native UTF-8 path pointer; remove(String) is
  no longer called per-trim, eliminating the byte[] alloc on the I/O
  thread per ACK (M7).
- retryStalled always re-flags interrupt status (M8).

Cleanup:
- Dead WebSocketSendQueue.safeSendBatch removed (N1).
- @FunctionalInterface on Reconnector (N2).
- Inline FQNs in QwpWebSocketSender / Sender replaced with imports (N6).
- setSegmentLog overload pair co-located with cleaner doc (N8).
- Javadoc added to Files.java public surface and Crc32c.update (N9).
- Single-arg failConnection overload removed; every call site is now
  explicit about fatal vs non-fatal (N10).

Infrastructure:
- New FilesFacade interface + DefaultFilesFacade impl in
  io.questdb.client.std. SegmentLog refactored to use the facade so
  tests can inject OS-level failures (short writes, fstat -1, fsync
  EIO) without filesystem-level tricks.

Tests:
- 12 new red regression tests for the bug fixes above (now green).
- 5 coverage-gap tests for previously-untested error paths (M9):
  unsupported version header, baseSeq mismatch, multi-active
  rejection, oldestSeq edge cases, short-write recovery via
  fault-injection.
- Full SF + Files suite: 70 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three correctness fixes from a follow-up review of the QWiP store-and-
forward client. Each is paired with a regression test that fails on the
unfixed code and passes after the change.

Critical:
- WebSocketSendQueue.retryStalled split its catch ladder so a fatal
  SfException during stall-retry (corruption, oversized frame, fsync
  EIO) is classified the same as the main-loop sendBatch catch:
  failConnection(_, true) terminal, not (_, false) reconnect. The old
  behaviour silently reconnected and recycled the buffer as if sent,
  hiding storage failures and risking infinite loops on persistent
  errors. (C1)

- SegmentLog.createActive registers the freshly-opened fd into the
  Segment before calling allocNativePath, and the try block now wraps
  the path-allocation call. The catch closes the fd and best-effort
  removes the orphan .sfa file. The previous order leaked one fd per
  failed rotation under OOM pressure. (C2)

- ResponseHandler.onBinaryMessage error branch now fails the connection
  fatally. A server-side per-batch error (parse, schema mismatch, write,
  security, internal) is a protocol-level rejection of specific bytes;
  reconnecting and re-sending the same payload produces the same error.
  Under SF the rejected frame sits on disk and replay-on-reconnect
  shipped it again, so the previous transient classification turned any
  poisoned frame into an unbounded reconnect loop. (C4)

Infrastructure:
- FilesFacade gains allocNativePath / freeNativePath. SegmentLog now
  routes all path-pointer alloc/free through the facade so tests can
  inject OOM at the exact moment between openCleanRW and the try block
  in createActive. Required for the C2 regression test.

Tests:
- testCreateActiveDoesNotLeakFdOnAllocNativePathOom (SegmentLogTest)
- testRetryStalledTreatsSfStorageErrorAsTerminal (SfIntegrationTest)
- testPoisonedFrameInSfDoesNotLoopForever (SfIntegrationTest)
- Full suite: 1971 tests pass (was 1968), zero regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SegmentLog.scanDirectory used insertion sort over the segments list. At
the documented sf_max_total_bytes / sf_max_bytes ceiling (1 TiB / 64
MiB ≈ 16K segments) that is ~268M comparisons + array shifts → multi-
second wall time before the I/O thread can start.

Replaced with an in-place quicksort with median-of-three pivot. O(N log
N) average, no allocation (matching the surrounding code's discipline),
recursion depth bounded by ~2 log₂(N) by always recursing into the
smaller partition and looping on the larger.

Median-of-three is required because the insertion sort's only saving
grace was O(N) on already-sorted input, which is the common case from
readdir on filesystems that return entries in lexicographic (and
therefore baseSeq-hex) order. A naive first-element-pivot quicksort
would degrade back to O(N²) in exactly that scenario.
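The sort described above can be sketched as follows, here over a plain long[] of baseSeq values rather than the segment list. The class name is hypothetical and the Lomuto partition scheme is an assumption; the commit message specifies only median-of-three pivoting, in-place operation, and recursing into the smaller partition.

```java
// Hypothetical illustration of the technique; not the PR's scanDirectory code.
final class SegmentSortSketch {
    static void sort(long[] a) {
        sort(a, 0, a.length - 1);
    }

    private static void sort(long[] a, int lo, int hi) {
        while (lo < hi) {
            int p = partition(a, lo, hi);
            // Recurse into the smaller side and loop on the larger one,
            // which keeps the stack depth logarithmic in the input size.
            if (p - lo < hi - p) {
                sort(a, lo, p - 1);
                lo = p + 1;
            } else {
                sort(a, p + 1, hi);
                hi = p - 1;
            }
        }
    }

    private static int partition(long[] a, int lo, int hi) {
        int mid = lo + ((hi - lo) >>> 1);
        // Order a[lo] <= a[mid] <= a[hi] so the median of the three sits at mid.
        if (a[mid] < a[lo]) swap(a, mid, lo);
        if (a[hi] < a[lo]) swap(a, hi, lo);
        if (a[hi] < a[mid]) swap(a, hi, mid);
        // Park the median at hi and use it as the pivot.
        swap(a, mid, hi);
        long pivot = a[hi];
        int i = lo;
        for (int j = lo; j < hi; j++) {
            if (a[j] < pivot) {
                swap(a, i, j);
                i++;
            }
        }
        swap(a, i, hi);
        return i;
    }

    private static void swap(long[] a, int i, int j) {
        long t = a[i];
        a[i] = a[j];
        a[j] = t;
    }
}
```

On already-sorted input the median of first, middle and last is the true median of the range, so each partition splits roughly in half instead of peeling off one element at a time.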

Tests:
- testLargeSegmentCountReopensInOrder (SegmentLogTortureTest):
  generates 1024 sealed segments, reopens, asserts the replay returns
  every appended seq exactly once in order, and that reopen+replay
  completes within a generous wall-clock bound that catches a
  regression back to O(N²) at the production ceiling.
- Full SF + adjacent suite: 117/117 pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
SegmentLog.rotate freed old.pathPtrNative and assigned the new sealed-
path pointer in non-atomic order: free → assign-path → alloc → set
sealed. If allocNativePath OOMed mid-sequence the segment was left in
two simultaneously broken states:

- pathPtrNative still held the freed pointer (the assignment never
  ran). On close() the segments-cleanup loop called freeNativePath
  on it again — a native-heap double-free that crashed the JVM with
  malloc free-list corruption (verified via the new red test on the
  unfixed code: surefire reported "The forked VM terminated without
  properly saying goodbye").
- sealed/lastSeqOnDisk were never set, so trim()'s !s.sealed guard
  silently skipped the segment. The .sfs file on disk was never
  reclaimed within the lifetime of the process.

Fix:
- Set old.pathPtrNative=0 immediately after the free so a subsequent
  OOM cannot leave a stale freed pointer in the field.
- Mark sealed=true / lastSeqOnDisk=lastSeq BEFORE allocating the new
  pointer. After OOM the segment is still classified as sealed so
  trim can reclaim it.
- trim() now handles the recovery case where pathPtrNative is 0 by
  falling back to ff.remove(path) (one-time per trim, acceptable —
  these recovery branches only fire after an OOM, not on the hot
  path).

Test: testRotateOomLeavesSegmentInRecoverableSealedState
(SegmentLogTest). Forces rotation under an injected allocNativePath
OOM, then asserts (a) close() does not double-free, (b) trim() reaps
the orphan .sfs file. On the unfixed code the JVM dies; on the fix
118 SF + adjacent tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pre-fix behaviour: SegmentLog.trim only deleted sealed segments. Frames
that the server had acknowledged but lived in the still-open active
segment stayed on disk until the next natural rotation. On restart the
new sender replayed those frames and the public Sender.storeAndForward
contract — "trimmed when the server acknowledges it" — was load-bearing
on server-side seqTxn dedup to avoid duplicate rows. Worst case at the
default 64 MiB segment size: ~640 acked batches re-shipped per restart.

Fix: when every frame in the active segment has been acked, force-rotate
the active (sealing the file) and immediately remove the just-sealed
segment. nextSeq is preserved across the auto-rotate so subsequent
appends keep monotonic FSNs. The only exception is the rotate-OOM
recovery state from the M2 fix: when active.sealed is already true,
the sealed pass above has already trimmed the file, so force-rotate
is skipped.

Tests:
- testTrimRotatesAndDropsFullyAckedActiveSegment (SegmentLogTest):
  unit-level proof that trim with full coverage drops the active
  contents to a fresh empty segment, with nextSeq preserved.
- testTrimPartialAckOfActiveLeavesItIntact (SegmentLogTest, replaces
  testTrimNeverDeletesActive): proves partial ACKs do not seal a
  segment that still contains unacked data.
- testRestartAfterAckedBatchesReplaysNothing (SfIntegrationTest):
  end-to-end. Send 5 batches, wait for trim, close, reopen with a
  fresh sender, send one more, assert server saw exactly 6 frames
  (5 originals + 1 new, no replays).
- testCapturedBytesMatchWireBytes (SfIntegrationTest): updated to use
  a non-acking handler so the test thread's log.replay() doesn't race
  the I/O thread's trim.
- testAutoReconnectAndReplay (SfIntegrationTest): expected frame
  count drops from 5 to 4 (msg1 trimmed before reconnect, no replay).
- testMultiTableSurvivesReconnect (SfIntegrationTest): expected frame
  count drops from 6 to 5 (alpha-1 trimmed before reconnect).

Public API:
- Sender.storeAndForwardDir Javadoc rewritten to honestly describe the
  new contract: acked batches are reclaimed in real time; only batches
  whose ACK had not been received before sender shutdown are replayed
  on the next sender against the same directory.

Full suite: 1975 tests pass, zero regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The CRC32C table was lazily computed by the first thread to call
Java_io_questdb_client_std_Crc32c_update, with a `volatile int
crc32c_table_ready` flag for the once-guard. C's `volatile` does not
provide acquire/release semantics — it only suppresses compiler
reordering. On weakly-ordered platforms (aarch64, the QuestDB ARM
Mac/Linux builds) a second thread could observe `ready == 1` while
still reading partial / zero entries from `crc32c_table`, producing a
silently wrong CRC. The downstream effect would be SegmentLog.scanActive
mis-classifying a valid frame as a torn tail and silently truncating
good frames after restart.

In practice the JNI transition's implicit barriers and x86's TSO
made this benign on the platforms we test on. But the C standard does
not guarantee it, and the bug class is the kind that surfaces only
under load or after a JVM upgrade tightens its barrier semantics.

Fix: drop the lazy init entirely. The table is a deterministic
function of the Castagnoli polynomial — pre-compute it once and embed
the values as a `static const uint32_t[256]` initializer. Zero
runtime cost, zero races, perfectly portable. The polynomial is
documented in a comment so the table can be regenerated if needed.
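For reference, regenerating the 256-entry table from the polynomial can be sketched as below. The native code in this PR is C; this Java version is only an illustration with hypothetical names. It assumes the usual reflected (LSB-first) CRC-32C convention, in which the Castagnoli polynomial 0x1EDC6F41 appears bit-reversed as 0x82F63B78.

```java
// Hypothetical table generator and byte-at-a-time CRC32C for illustration.
final class Crc32cTableSketch {
    // Bit-reversed form of the Castagnoli polynomial 0x1EDC6F41.
    static final int POLY_REFLECTED = 0x82F63B78;

    // Builds the 256-entry lookup table: entry i is the CRC state after
    // shifting the single byte i through eight polynomial division steps.
    static int[] buildTable() {
        int[] table = new int[256];
        for (int i = 0; i < 256; i++) {
            int c = i;
            for (int bit = 0; bit < 8; bit++) {
                c = (c & 1) != 0 ? (c >>> 1) ^ POLY_REFLECTED : c >>> 1;
            }
            table[i] = c;
        }
        return table;
    }

    // Byte-at-a-time CRC32C with the standard initial value and final inversion.
    static int crc32c(int[] table, byte[] data) {
        int crc = 0xFFFFFFFF;
        for (byte b : data) {
            crc = (crc >>> 8) ^ table[(crc ^ b) & 0xFF];
        }
        return ~crc;
    }
}
```

Printing the result of buildTable() as hex literals yields values that can be pasted into a static const initializer; the standard CRC-32C check value for the ASCII string "123456789" is 0xE3069283.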

Tests:
- Existing Crc32cTest (7 tests): empty input, known vector, chaining,
  zeros stable, property-fuzz over 200 random inputs × 5 splits,
  bit-flip-changes-CRC over 256 positions, empty-chaining-idempotent.
  All pass — table values verified correct against the lazy-init
  algorithm by SegmentLog round-trip tests as well.
- Full suite: 1975/1975 pass, zero regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two tests targeting the disk-cap deadlock scenario the reviewer flagged
as a separate "high" severity finding. Both pass under the per-frame
trim that landed in the previous commit; both would deadlock on the
pre-fix code where trim left the active segment alone.

testSingleActiveSegmentDoesNotDeadlockOnFullCap (new):
  Sets sf_max_bytes == sf_max_total_bytes, so no natural rotation
  can ever fire — the append-time projection check raises disk-full
  before rotate() is reached. Pre per-frame trim this state was
  permanent: the active was the only segment on disk, trim couldn't
  touch it, ACKs freed nothing. Force-rotate-on-fully-acked makes
  the active itself reclaimable, so an ACK covering every appended
  frame now restores capacity. The test stresses recovery further
  by refilling to disk-full a second time.

testMaxTotalBytesTriggersDiskFullThenRecoversOnAck (renamed from
  testMaxTotalBytesTriggersDiskFull):
  The "Acceptable: only the active was on disk and active doesn't
  trim" branch in the catch block — which the reviewer specifically
  cited as evidence the deadlock was tolerated by the test suite —
  is gone. The recovery append after trim now must succeed; we
  assert it directly instead of swallowing a second disk-full.

Full suite: 1976 tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sender.storeAndForwardFsync's Javadoc claimed the default sf_fsync=off
"runs fsync on rotation and on explicit flush()". In practice flush()
never called segmentLog.fsync() — the only production fsync paths were
per-append (gated on fsync_each_append, the sf_fsync=on path),
rotation (rare), and new-segment header creation (rare). With default
config a sender that flushes coarsely between rotations was leaving
all bytes in the OS page cache; an OS crash would lose them despite
the docs implying durability.

Two-part fix:

1. Doc honesty (Sender.java):
   storeAndForwardFsync rewritten to spell out exactly what
   sf_fsync=off and sf_fsync=on mean. The default leaves bytes
   between rotations in the page cache — process crashes survive,
   OS crashes don't.

2. Opt-in fsync-on-flush:
   New knob storeAndForwardFsyncOnFlush(boolean) on the builder,
   parsed as sf_fsync_on_flush=on/off in the connect string. When
   enabled, every flush() (and the implicit flush in close()) routes
   a fsync request to the I/O thread before returning. Off by
   default — small-batch + frequent-flush senders pay one disk fsync
   per call, which is unacceptable for high-rate workloads.

   The fsync runs on the I/O thread because SegmentLog is single-
   threaded (the I/O thread owns every read/write/trim/rotate).
   Calling segmentLog.fsync() from the user thread would race
   against an in-flight trim() (which may force-rotate the active
   under per-frame trim) or append() from a concurrent send. The
   signal pattern is the same one used by ping/pong: user sets
   fsyncRequested + waits on fsyncComplete; I/O thread observes the
   flag at the top of its iteration, performs the fsync, publishes
   outcome via fsyncError + fsyncComplete. Concurrent callers are
   serialised by fsyncLock so each gets its own round-trip.

Tests:
- testFlushDoesNotFsyncByDefault (SfIntegrationTest): with
  fsyncOnFlush=false the FsyncCountingFacade observes ZERO fsyncs
  during flush() — proves we did not regress the small-batch
  hot path.
- testFlushFsyncsWhenOptedIn (SfIntegrationTest): with
  fsyncOnFlush=true the same counter observes >= 1 fsync per
  flush() — proves the wiring is end-to-end.
- testSfFsyncOnFlushParses (SfFromConfigTest): connect-string
  round-trip.
- testInvalidSfFsyncOnFlushValueRejected (SfFromConfigTest): bad
  value rejected with a useful message.
- testSfFsyncOnFlushOnTcpRejected (SfFromConfigTest): TCP transport
  rejects the WebSocket-only knob.

Full suite: 1981 tests pass, zero regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pre-fix: flushPendingRows checked schemaResetNeeded once, at the top of
the encode pass. If the I/O thread completed a reconnect AFTER that
check but BEFORE encoder.finishMessage, the encoded bytes carried
stale schema-id refs into the previous connection's id space. Those
bytes then went through segmentLog.append (persisted to SF) and out to
the new server, which rejected them. Pre-C4: silent unbounded
reconnect-replay loop. Post-C4: terminal failure with no self-heal —
the user has to manually clear the SF dir to recover.

Fix (option C from the previous review): connection-generation tag on
each encoded batch.

- New volatile long QwpWebSocketSender.connectionGeneration, bumped
  by performReconnect AFTER schemaResetNeeded is flipped. Order is
  load-bearing: a reader that observes the new generation also sees
  the new schemaResetNeeded (volatile happens-before within the
  writer thread).

- flushPendingRows now wraps the encode in a retry loop:
    long genBefore = connectionGeneration;        // read FIRST
    if (schemaResetNeeded) reset;
    encode...
    if (connectionGeneration != genBefore) discard + retry;

  Re-encoding is cheap because the source rows in QwpTableBuffer are
  not reset until AFTER sealAndSwapBuffer (line 1830 in this commit).
  encoder.beginMessage internally calls buffer.reset(), so the
  discard step is implicit.

- Bounded at MAX_SCHEMA_RACE_RETRIES = 10. Reconnects firing faster
  than a single encode are pathological and surface as a
  LineSenderException to the user rather than a silent infinite
  loop. countNonEmptyTables extracted as a small helper so the
  retry loop reads cleanly.
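The generation-tagged retry loop can be sketched in isolation as follows. This is an illustration under assumptions: the class is hypothetical, the encoder is modelled as a Supplier, IllegalStateException stands in for LineSenderException, a single reconnecting thread is assumed, and the bound is read as ten encode attempts in total.

```java
import java.util.function.Supplier;

// Hypothetical model of the encode-side retry; not QwpWebSocketSender itself.
final class GenerationRetrySketch {
    static final int MAX_SCHEMA_RACE_RETRIES = 10;

    volatile long connectionGeneration;
    volatile boolean schemaResetNeeded;
    int schemaResets; // counts how often encoder state was reset, for inspection

    // I/O-thread side: flag the reset first, then bump the generation, so a
    // reader that sees the new generation also sees the flag.
    void onReconnect() {
        schemaResetNeeded = true;
        connectionGeneration = connectionGeneration + 1; // single writer assumed
    }

    // Producer side: re-encode until no reconnect happened during the encode.
    byte[] encodeBatch(Supplier<byte[]> encoder) {
        for (int attempt = 0; attempt < MAX_SCHEMA_RACE_RETRIES; attempt++) {
            long genBefore = connectionGeneration; // read FIRST
            if (schemaResetNeeded) {
                schemaResetNeeded = false;
                schemaResets++; // stand-in for resetting schema and symbol state
            }
            byte[] encoded = encoder.get();
            if (connectionGeneration == genBefore) {
                return encoded; // no reconnect raced with this encode
            }
            // Otherwise the bytes may reference stale ids; discard and retry.
        }
        throw new IllegalStateException("schema reset raced with encode too many times");
    }
}
```

The important property is that the generation is read before the reset check and compared after the encode, so any reconnect that lands in between forces a re-encode against the fresh state.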

Together with C4 (server-error responses are now terminal) the
schema-reset race goes from "silent data corruption + infinite loop"
to "no poisoned batch ever reaches SF in the first place".

Tests:
- testGenerationBumpBetweenBatchesTriggersSchemaReset (SfIntegrationTest):
  reflectively bumps connectionGeneration + sets schemaResetNeeded
  between batches; asserts the next batch carries a fresh schema
  definition (frame size >= the first batch).
- testSchemaResetRaceUnderConcurrentBumps (SfIntegrationTest, 30s
  timeout): spawns a bumper thread that flips schemaResetNeeded +
  bumps generation on a 50us cadence while the main thread flushes
  200 batches in a tight loop. Asserts every batch either ships
  successfully OR the bounded MAX_SCHEMA_RACE_RETRIES fail-fast
  trips — never a silent escape, never an unexpected exception.

Mid-encode injection without instrumentation is timing-sensitive;
the stress test is a smoke check that the retry loop does not crash
under load. End-to-end "poisoned bytes never reach the server"
verification would need a strict QWP-wire-format-validating server
test handler — left as a future test.

Full suite: 1983 tests pass, zero regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each fix is paired with a red-then-green regression test.

C1 — SegmentLog.rotate() partial failure deadlock
  When rotate's allocNativePath OOMs or createActive fails after the
  rename succeeds, `active` is left pointing at a sealed segment with
  fd=-1. A subsequent small append that fits under the cap bypasses
  the rotate trigger and falls through to ff.write(fd=-1, ...) which
  returns -1 and is wrapped as the recoverable SfDiskFullException.
  The I/O thread retries forever (disk-full backpressure path) and
  the user thread blocks in flush() — silent deadlock. Guard added at
  the top of append() that throws a fatal SfException when active is
  in the post-rotate sealed/fd=-1 state.

C2 — Symbol-delta watermark not reset on reconnect
  resetSchemaStateForNewConnection cleared maxSentSchemaId and
  per-table schema ids but left maxSentSymbolId and
  currentBatchMaxSymbolId untouched. The encoder's first
  post-reconnect batch then shipped a delta dictionary that excluded
  every symbol id <= the old server's high-water mark; column refs
  into the new server's empty dictionary decoded as garbage (or were
  rejected). Both watermarks are now reset alongside the schema
  state.

C3 — trim() forgets sealed segments when remove() fails
  trimSealedSegments discarded the boolean return of ff.remove(). On
  Windows sharing-violation under antivirus, transient NFS errors,
  ESTALE, etc., the file stayed on disk while bytesOnDiskCache was
  decremented and the segment was dropped from the in-memory list.
  Failure modes: (a) bytesOnDisk underreports reality, so
  sf_max_total_bytes stops being an enforceable cap; (b) on next
  process start scanDirectory rediscovers the orphan .sfs and
  re-ships its already-acked frames to the new server. Failed
  removes now keep the segment in the list with a removePending
  flag — bytesOnDiskCache stays honest, replay() skips removePending
  segments (so already-acked frames don't re-ship), the next trim()
  retries naturally, and close() does a last-chance retry too.

C4 — Future server ACK can delete unsent SF data
  InFlightWindow.acknowledgeUpTo clamps incoming server sequence at
  highestSent; ResponseHandler.onBinaryMessage was passing the raw
  uncapped sequence into segmentLog.trim(fsnAtZero + sequence) with
  no symmetric clamp. A buggy/replayed/malformed server ACK with a
  sequence beyond what the client had sent drove SegmentLog.trim
  past every real lastSeq, force-rotating the active segment and
  unlinking every sealed segment whose lastSeq <= the bogus value —
  including frames mid-replay the new server had never seen.
  Permanent silent data loss. The trim path now clamps the sequence
  at nextBatchSequence-1 (mirroring the InFlightWindow cap) and
  emits a WARN when the cap fires.
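The symmetric clamp from C4 amounts to a small piece of arithmetic, sketched below. The class and method names are hypothetical; nextBatchSequence is taken to be the next wire sequence the client would assign on the current connection, as the description above implies.

```java
// Hypothetical helper showing the clamp; not the PR's ResponseHandler code.
final class AckClampSketch {
    // Returns the highest FSN that is safe to trim for a given server ACK.
    // An ACK beyond what was actually sent is capped at the last sent sequence.
    static long safeTrimFsn(long fsnAtZero, long ackedWireSeq, long nextBatchSequence) {
        long highestSent = nextBatchSequence - 1;
        long clamped = Math.min(ackedWireSeq, highestSent);
        return fsnAtZero + clamped;
    }

    // True when the clamp changed the value, which is where a WARN would be logged.
    static boolean wasClamped(long ackedWireSeq, long nextBatchSequence) {
        return ackedWireSeq > nextBatchSequence - 1;
    }
}
```

When nothing has been sent on the connection yet, the result is fsnAtZero - 1, which lies below every frame belonging to this connection, so no frame that the connection still has to send is trimmed.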

C5 — Replay window-wait spin hangs after mid-replay socket drop
  replayPersistedFrames's window-wait spin only called
  tryReceiveAcks while client.isConnected() returned true. When the
  server dropped mid-replay, isConnected went false, no ACKs could
  arrive, hasWindowSpace stayed false, and the spin ran forever —
  preventing the outer state machine from running another
  doReconnectCycle and blocking flush()/close() until the user
  signalled shutdown. Compounding: replayPersistedFrames swallowed
  internal failures via failConnection(non-fatal) and returned
  normally, so doReconnectCycle returned true and ioLoop cleared
  reconnectRequested — losing the failure. The spin now exits on
  !isConnected or reconnectRequested; doReconnectCycle clears the
  stale reconnectRequested before replay (so the freshly-reconnected
  spin doesn't bail) and re-checks it after replay (so internal
  failures propagate to the outer loop's backoff/retry).

Tests:
  SegmentLogTest:
    + testRotateOomThenSmallAppendThrowsFatalNotDiskFull (C1)
    + testTrimRemoveFailureMustNotForgetSealedSegment   (C3)
  SfIntegrationTest:
    + testReconnectResetsSymbolWatermark               (C2)
    + testFutureAckMustNotTrimUnsentSfData             (C4)
    + testReplayMustNotHangWhenConnectionDropsMidReplay (C5)

All 1988 client tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Plain main()-style benchmark (same idiom as StacBenchmarkClient — no JMH
dependency required). Measures the per-frame latency of the SF persist
path: CRC32C over the payload, frame-envelope construction, two pwrite
syscalls (header + payload), bookkeeping, and an optional fsync when
--fsync=each. Reports min / p50 / p90 / p99 / p99.9 / max in nanoseconds
plus throughput in frames/sec and MB/sec.

Smoke run on darwin-aarch64 (APFS):
  --payload-bytes=512 --measure=20000 --fsync=off
    p50 ≈ 4 µs, p99 ≈ 14 µs, ~150K frames/sec, ~74 MB/sec
  --payload-bytes=512 --measure=5000 --fsync=each
    p50 ≈ 28 µs, p99 ≈ 900 µs, ~16K frames/sec, ~8 MB/sec

Run via Maven exec or directly from the IDE; the class lives under
core/src/test so it has free access to the SF code path without adding
to the production classpath.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the byte-at-a-time CRC32C inner loop in the JNI implementation
with a slice-by-8 variant that consumes 8 input bytes per iteration via
eight parallel 256-entry table lookups whose results are XORed.

The seven additional tables (~7 KB of static read-only data) are derived
from the existing crc32c_table at build time using the standard
`table[k][i] = (table[k-1][i] >> 8) ^ table[0][table[k-1][i] & 0xFF]`
recurrence, which corresponds to "advance the input by one more zero
byte". They are emitted as static const initialisers — same rationale as
the original table: hard-coding sidesteps the C-memory-model pitfalls of
lazy initialisation on weakly-ordered platforms.
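
For readers who want to see the recurrence and the 8-byte main loop in one place, here is a minimal Java sketch. It is not the JNI code from this commit: the class name SliceBy8Crc32c is hypothetical, and the tables are built at class-load time purely for illustration (the commit hard-codes them in C). The result can be cross-checked against java.util.zip.CRC32C.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

// Illustrative only: the real change is C code with hard-coded tables.
final class SliceBy8Crc32c {
    private static final int POLY = 0x82F63B78; // reflected Castagnoli polynomial
    private static final int[][] T = new int[8][256];

    static {
        // T[0] is the classic byte-at-a-time table.
        for (int i = 0; i < 256; i++) {
            int crc = i;
            for (int bit = 0; bit < 8; bit++) {
                crc = (crc & 1) != 0 ? (crc >>> 1) ^ POLY : crc >>> 1;
            }
            T[0][i] = crc;
        }
        // T[k] advances T[k-1] by one more zero byte (the recurrence quoted above).
        for (int k = 1; k < 8; k++) {
            for (int i = 0; i < 256; i++) {
                int prev = T[k - 1][i];
                T[k][i] = (prev >>> 8) ^ T[0][prev & 0xFF];
            }
        }
    }

    static int compute(byte[] b, int off, int len) {
        int crc = ~0;
        // Main loop: 8 input bytes per iteration, eight table lookups XORed together.
        while (len >= 8) {
            int lo = crc ^ readIntLE(b, off);
            int hi = readIntLE(b, off + 4);
            crc = T[7][lo & 0xFF] ^ T[6][(lo >>> 8) & 0xFF]
                    ^ T[5][(lo >>> 16) & 0xFF] ^ T[4][lo >>> 24]
                    ^ T[3][hi & 0xFF] ^ T[2][(hi >>> 8) & 0xFF]
                    ^ T[1][(hi >>> 16) & 0xFF] ^ T[0][hi >>> 24];
            off += 8;
            len -= 8;
        }
        // Byte-at-a-time tail for the remaining 0..7 bytes.
        while (len-- > 0) {
            crc = T[0][(crc ^ b[off++]) & 0xFF] ^ (crc >>> 8);
        }
        return ~crc;
    }

    private static int readIntLE(byte[] b, int off) {
        return (b[off] & 0xFF) | (b[off + 1] & 0xFF) << 8
                | (b[off + 2] & 0xFF) << 16 | (b[off + 3] & 0xFF) << 24;
    }

    public static void main(String[] args) {
        byte[] data = "123456789".getBytes(StandardCharsets.US_ASCII);
        CRC32C reference = new CRC32C();
        reference.update(data, 0, data.length);
        System.out.println(compute(data, 0, data.length) == (int) reference.getValue());
    }
}
```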

Measured on darwin-aarch64 with SegmentLogLatencyBenchmark at the typical
512-byte SF frame payload (warmup=50_000, measure=500_000, fsync=off):

  before:  min ~12_000 ns,  p50 ~14_000 ns
  after:   min   2_625 ns,  p50   3_625 ns

That collapses the per-append CRC cost from the dominant term (~85% of
p50) to a small fraction, which is what the store-and-forward layer
needs at high frame rates. The tail (p99/p99.9) is dominated by pwrite
syscalls and OS scheduling, not CRC, and is unchanged.

Correctness is covered by the existing Crc32cTest suite — in particular
testChainingPropertyOverManyRandomInputs (200 random buffers up to 2048
bytes, 5 random split points each) which exercises the slice-8 main loop
plus byte-at-a-time tail across every alignment offset and length class
the SF path can produce.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three independent recovery-time bugs in SegmentLog that all let a
durability layer silently produce or operate on a wrong view of the
on-disk log. Each fix has a red regression test that fails on the
unfixed code and passes after the fix.

1. Mid-rotate crash recovery resets FSN sequence to 0.
   rotate() has a window between ff.rename(.sfa → .sfs) and the
   subsequent createActive(lastSeq + 1) where the process can die or
   createActive can throw (allocNativePath OOM, openCleanRW failure,
   etc.) leaving on disk: one or more sealed .sfs files, no .sfa.
   openInternal saw active==null after scanDirectory and unconditionally
   called createActive(FIRST_SEQ=0), restarting FSN assignment at 0
   even though sealed segments on disk already covered 0..N. The new
   active produced frames whose FSNs collided with sealed FSNs already
   on disk, breaking ACK translation, trim, and replay against data
   the recovery never saw. Fix derives the new active's baseSeq from
   the highest sealed lastSeqOnDisk + 1 (segments is sorted by baseSeq
   and sealed ranges are non-overlapping, so the last entry holds the
   largest lastSeqOnDisk).

   Tests:
   - testMidRotateCrashRecoveryPreservesFsnMonotonicity (fault
     injection: failNextActiveAllocNativePath inside rotate()).
   - testRestartWithOnlySealedSegmentsRecoversCorrectly (independent
     coverage via pure on-disk filesystem manipulation — write frames,
     manually rename .sfa to .sfs — to exercise the open/recovery code
     in isolation from rotate's failure handling, then verify the full
     contract: nextSeq, oldestSeq, replay order, and post-restart
     append).

2. oldestSeq() returned a removePending segment's baseSeq even though
   replay() skips it.
   trim() keeps an undeletable sealed segment in the in-memory list as
   removePending; replay() correctly skips such segments so already-
   acked frames are not re-shipped on reconnect. oldestSeq() returned
   segments.getQuick(0).baseSeq unconditionally — including when the
   first segment was removePending. WebSocketSendQueue pins
   fsnAtZero = oldestSeq() in both the constructor (line 247-248) and
   doReconnectCycle (line 925-926), then asserts fsn == fsnAtZero +
   wireSeq inside the replay visitor (line 974). The mismatch threw
   "SF replay FSN drift" on the first replayed frame; the catch
   triggered failConnection(non-fatal); reconnectRequested fired; the
   I/O loop re-entered doReconnectCycle, called oldestSeq() again with
   the same stale return, and drift fired identically. Permanent
   reconnect loop until either the FS issue cleared AND a non-reconnect
   trim ran (it can't — the I/O thread is stuck reconnecting), or the
   user closed the sender. Fix skips removePending in oldestSeq() the
   same way replay() does.

   Tests:
   - testOldestSeqMustSkipRemovePendingToMatchReplay (unit-level: cross-
     check oldestSeq() against the first FSN replay() actually visits).
   - testReplaySucceedsWithRemovePendingSegmentAtHeadOfList (end-to-end
     integration: real TestWebSocketServer + sender + RemoveFailingSf
     Facade; verified pre-fix to reproduce the reconnect loop with "SF
     replay FSN drift: fsn=2 expected=0", post-fix the 2 unacked frames
     replay successfully and a fresh send reaches the server).

3. Directory scan errors silently treated as EOF / empty log.
   Files.findNext()'s contract is 1=success, 0=EOF, -1=read error.
   scanDirectory's while (rc > 0) loop exited identically on both 0 and
   -1, conflating a real readdir failure (EIO/ESTALE on NFS, etc.) with
   normal end-of-directory. Files.findFirst()==0 means either opendir
   failed (errno set — transient EACCES/EMFILE/ESTALE/ENOMEM) or the
   directory is empty; scanDirectory unconditionally treated it as
   "nothing to scan." By the time scanDirectory runs, openInternal has
   created the directory if missing and successfully opened+locked the
   lock file inside it, so an empty listing is impossible — find==0
   here can only mean opendir failed. The silent fallthrough let
   openInternal proceed to createActive(...) on top of any unscanned
   on-disk segments, aliasing or overwriting still-existing data — the
   exact failure mode a durability layer must guard against. Fix
   throws SfException in both branches; recovery refuses to proceed
   from a partial / unknown view of its own log.

   Tests:
   - testScanDirectoryFailsWhenFindFirstReturnsZero (FilesFacade
     forces findFirst to return 0; pre-fix open silently succeeded
     with empty segments and nextSeq=0 over real on-disk data).
   - testScanDirectoryFailsWhenFindNextReturnsError (FilesFacade
     forces findNext to return -1; same shape, mid-scan readdir
     failure is now fatal).
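
The three fixes above can be sketched against a small in-memory model. This is a simplified illustration, not the SegmentLog code: the Sealed and DirCursor types are hypothetical, the fallback to the active segment's baseSeq when every sealed segment is removePending is an assumption, and only the findNext half of fix 3 is shown.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model; names do not match the real SegmentLog fields.
final class RecoverySketch {
    static final class Sealed {
        final long baseSeq;
        final long lastSeq;
        final boolean removePending;

        Sealed(long baseSeq, long lastSeq, boolean removePending) {
            this.baseSeq = baseSeq;
            this.lastSeq = lastSeq;
            this.removePending = removePending;
        }
    }

    // Stand-in for a directory iterator with the 1=entry, 0=EOF, -1=error contract.
    interface DirCursor {
        int findNext();
    }

    // Fix 1: a new active segment continues after the highest sealed FSN.
    static long nextActiveBaseSeq(List<Sealed> sortedByBaseSeq) {
        if (sortedByBaseSeq.isEmpty()) {
            return 0;
        }
        return sortedByBaseSeq.get(sortedByBaseSeq.size() - 1).lastSeq + 1;
    }

    // Fix 2: oldestSeq skips removePending segments, matching what replay visits.
    static long oldestSeq(List<Sealed> sortedByBaseSeq, long activeBaseSeq) {
        for (Sealed s : sortedByBaseSeq) {
            if (!s.removePending) {
                return s.baseSeq;
            }
        }
        return activeBaseSeq; // assumption: fall back to the active segment
    }

    // Fix 3: a negative return code is a scan failure, not end-of-directory.
    static int countEntries(DirCursor cursor) {
        int count = 0;
        int rc;
        while ((rc = cursor.findNext()) > 0) {
            count++;
        }
        if (rc < 0) {
            throw new IllegalStateException("directory scan failed mid-listing");
        }
        return count;
    }

    public static void main(String[] args) {
        List<Sealed> sealed = Arrays.asList(new Sealed(0, 4, true), new Sealed(5, 9, false));
        System.out.println(nextActiveBaseSeq(sealed)); // 10
        System.out.println(oldestSeq(sealed, 10));     // 5
    }
}
```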

Full module suite: 1994/1994 green (1988 baseline + 6 new tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds the user-facing counterpart to QwpEgressLatencyBenchmark in the
OSS repo. Measures end-to-end wall time of a single row .at()+flush()
against a locally running QuestDB. Default mode is SF on, which
measures user-handover latency: flush() returns when the row is durable
on the local SF segment. -Dsf=false switches to the no-SF path that
blocks for the full server-ACK round-trip (apples-to-apples vs egress).

Pulls in JMH 1.37 as a test-scope dependency, wires the annotation
processor into maven-compiler-plugin, requires jmh.core +
ch.qos.logback.classic in the test module-info. The benchmark's
static initializer downgrades the logback root level to WARN before
any other class loads -- DEBUG-level WS / SF logging would otherwise
emit one log line per flush and inflate measured latency by ~70us.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lays the foundation for a lock-free, mmap-backed alternative to the
current SegmentLog + WebSocketSendQueue + processingLock design. Today
~85% of the user thread's flush() time is spent parked in
__psynch_cvwait waiting for the I/O thread to signal completion (see
QwpIngressLatencyBenchmark async-profiler flamegraph). The cursor design
moves SF.append onto the user thread, making the cross-thread wait
unnecessary -- the user-thread append microbench now reports p50=42ns
vs ~38us in the legacy SF path on the same hardware.

What lands:

  * mmap/munmap/msync ported from QuestDB OSS into client/std/Files
    (both POSIX and Win32). Native rebuild required per-platform; the
    darwin-aarch64 dev lib is the only one rebuilt locally.

  * MmapSegment: one mmap'd file, format-compatible with the legacy
    SegmentLog (same SF01 magic, 24-byte header, [crc | u32 len |
    payload] frame layout). Single-producer cursor (appendCursor plain
    field, publishedCursor volatile). tryAppend is pure memory + CRC.
    openExisting + scanFrames recover from torn tails.

  * SegmentRing: chain of MmapSegments with hot-spare swap and
    ACK-driven trim. Four cursors, all single-writer (no CAS). Rotation
    rebases the spare's baseSeq at promotion time to avoid the
    precompute race.

  * SegmentManager: JVM-wide background thread that pre-creates spares
    and trims fully-acked segments. Moves the open + truncate + fsync +
    rename + unlink sequence (45k samples / 100k I/O thread samples in
    the legacy flamegraph) off the hot path.

  * CursorSendEngine: facade bundling ring + manager with the API a
    future WebSocketSendQueue rewrite will consume.

  * sf_engine=legacy|cursor config option in LineSenderBuilder. Default
    legacy. Selecting cursor at build time fails fast with a clear "not
    yet wired" message -- the WebSocketSendQueue integration that
    actually consumes CursorSendEngine is the next PR.

  * CursorEngineAppendLatencyBenchmark: standalone microbench for the
    user-thread append path (the floor a wired cursor engine would
    inherit).

20 new tests across the cursor/ package, all green. FilesTest gains an
mmap roundtrip test.
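
A minimal sketch of the [crc | u32 len | payload] envelope and a torn-tail scan, using a plain ByteBuffer in place of the mmap'd region. The class name is hypothetical, the 24-byte segment header is omitted, and two details are assumptions rather than facts from this commit: the CRC is taken over the payload only, and a zero length marks the unwritten tail.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

// Illustrative only: a heap buffer stands in for the mmap'd segment.
final class FrameEnvelopeSketch {
    static final int ENVELOPE_BYTES = 8; // u32 crc + u32 len

    // Appends one frame at the buffer's position; returns false when it does not fit.
    static boolean tryAppend(ByteBuffer seg, byte[] payload) {
        if (seg.remaining() < ENVELOPE_BYTES + payload.length) {
            return false;
        }
        CRC32C crc = new CRC32C();
        crc.update(payload, 0, payload.length);
        seg.putInt((int) crc.getValue()).putInt(payload.length).put(payload);
        return true;
    }

    // Walks frames from offset 0 and returns the offset just past the last good frame.
    static int scanFrames(ByteBuffer seg, int limit) {
        int pos = 0;
        while (pos + ENVELOPE_BYTES <= limit) {
            int storedCrc = seg.getInt(pos);
            int len = seg.getInt(pos + 4);
            if (len <= 0 || len > limit - pos - ENVELOPE_BYTES) {
                break; // unwritten tail or torn length field
            }
            ByteBuffer view = seg.duplicate();
            view.limit(pos + ENVELOPE_BYTES + len);
            view.position(pos + ENVELOPE_BYTES);
            CRC32C crc = new CRC32C();
            crc.update(view);
            if ((int) crc.getValue() != storedCrc) {
                break; // torn or corrupted payload
            }
            pos += ENVELOPE_BYTES + len;
        }
        return pos;
    }

    public static void main(String[] args) {
        ByteBuffer seg = ByteBuffer.allocate(256).order(ByteOrder.LITTLE_ENDIAN);
        tryAppend(seg, "a".getBytes(StandardCharsets.US_ASCII));
        tryAppend(seg, "bcd".getBytes(StandardCharsets.US_ASCII));
        System.out.println(scanFrames(seg, seg.capacity()) == seg.position());
    }
}
```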

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two pre-wiring additions on the cursor side that the upcoming
WebSocketSendQueue replacement will need.

1. SegmentRing.openExisting(sfDir, maxBytesPerSegment)
   Walks *.sfa files in the directory, opens each via
   MmapSegment.openExisting (which already validates header + scans
   torn tails), arranges by baseSeq, and returns a ring with the
   newest as active and the rest as sealed. Validates that the
   recovered segments form a contiguous FSN range -- a gap signals
   manual deletion or partial-write damage and aborts recovery
   rather than silently producing duplicate / missing FSNs after
   restart. Stray .sfa files with bad headers are skipped (logged-
   then-ignored), not fatal.

2. SegmentManager maxTotalBytes cap
   Manager tracks total bytes it has provisioned across all rings
   it serves. When provisioning a hot spare would exceed the cap,
   the manager skips the install and the requesting ring stays in
   BACKPRESSURE_NO_SPARE until ACK-driven trim frees space. Default
   is UNLIMITED_TOTAL_BYTES (no behavioural change for existing
   callers). Disk-full state is logged at WARN, throttled to once
   per 30s so a sustained-full state doesn't drown the log. Cap is
   approximate -- it counts only manager-provisioned segments, not
   the engine's initial active per ring (so the effective on-disk
   cap is maxTotalBytes + (rings * segmentSizeBytes)). Acceptable
   for a runaway-growth guard; documented in the constructor.
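
The contiguity validation in item 1 amounts to checking that each recovered segment starts exactly where the previous one ended. A minimal sketch, assuming segments are already sorted by baseSeq and that each one knows its frame count; the class and parameter names are hypothetical.

```java
// Illustrative only: arrays stand in for the recovered segment list.
final class ContiguityCheckSketch {
    // baseSeqs must be sorted ascending; frameCounts[i] is the number of frames in segment i.
    static void validate(long[] baseSeqs, long[] frameCounts) {
        for (int i = 1; i < baseSeqs.length; i++) {
            long expected = baseSeqs[i - 1] + frameCounts[i - 1];
            if (baseSeqs[i] != expected) {
                // Covers both a gap (missing FSNs) and an overlap (duplicate FSNs).
                throw new IllegalStateException("FSN discontinuity: segment " + i
                        + " starts at " + baseSeqs[i] + ", expected " + expected);
            }
        }
    }

    public static void main(String[] args) {
        validate(new long[]{0, 5, 12}, new long[]{5, 7, 3});
        System.out.println("contiguous");
    }
}
```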

Also makes SegmentRing.sealedSegments mutation thread-safe via a
synchronized snapshot path that the I/O loop will use, and marks
SegmentRing.active volatile so cross-thread rotation publication
is correct without a lock.

10 new tests across SegmentRingTest + SegmentManagerTest covering:
recovery happy path, FSN-gap detection, bad-magic skip, cap blocks
provisioning, cap is released by ACK-driven trim. All 2020 tests in
the suite pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CursorWebSocketSendLoop is the cursor-engine equivalent of
WebSocketSendQueue's I/O loop. Owns one I/O thread that:

  * Polls CursorSendEngine.publishedFsn() and walks newly-published
    frames from the engine's segments (active + sealed). Sends each
    frame's payload as one WS binary frame via WebSocketClient.sendBinary
    -- exactly the bytes the legacy WebSocketSendQueue would send,
    minus the 8-byte SF envelope which is engine-internal.

  * Polls the WebSocket for server ACKs via tryReceiveFrame. On each
    successful ACK with cumulative wire seq N, calls
    engine.acknowledge(fsnAtZero + N), which advances ackedFsn so the
    SegmentManager can trim fully-acked sealed segments.

No locks. The producer thread (user) writes into the engine; this
thread reads. publishedFsn is the volatile publish barrier. Sealed-
segment iteration uses the synchronized snapshot accessor added in
the previous commit, so the producer's rotation can't tear the
ObjList underneath us.

PR1 scope is deliberately the happy path. Deferred (TODO PR2):
  * Ping/pong heartbeat
  * fsync-on-flush request channel
  * Per-table seqTxn tracking
  * Reconnect / replay-on-reconnect (walk segments from ackedFsn+1)
  * Disk-full retry (the cap from the previous commit handles the
    upstream signal; PR2 wires the producer-side recovery)
  * Multi-connection failover

Errors are reported via getLastError(); the I/O thread sets it and
exits, producers polling checkError() surface the failure.

Same wireSeq-clamp safety check the legacy path uses (clamp ACK
sequence to nextWireSeq-1 so a malformed/replayed server ACK can't
force trim of segments the new server has never seen).
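
The clamp and the wire-seq to FSN translation can be sketched as a single pure function. This is an assumption-laden illustration (hypothetical class name, -1 as the "nothing to acknowledge" result), not the loop's actual code.

```java
// Illustrative only: translates a cumulative server ACK into an FSN.
final class AckTranslationSketch {
    // Returns the FSN to acknowledge, or -1 when nothing has been sent on this wire yet.
    static long toAckedFsn(long fsnAtZero, long nextWireSeq, long serverAckSeq) {
        // Never trust an ACK beyond what this connection has actually sent.
        long clamped = Math.min(serverAckSeq, nextWireSeq - 1);
        if (clamped < 0) {
            return -1;
        }
        return fsnAtZero + clamped;
    }

    public static void main(String[] args) {
        System.out.println(toAckedFsn(100, 5, 3));  // 103
        System.out.println(toAckedFsn(100, 5, 99)); // 104
    }
}
```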

Companion change: CursorSendEngine.sealedSegmentsSnapshot pass-through
to SegmentRing's thread-safe snapshot accessor.

No new tests in this commit -- the integration test for the wired
end-to-end path lands with the QwpWebSocketSender wiring (next slice).
The class compiles and the 2020-test suite continues to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ring

Wire CursorSendEngine into the public Sender API and collapse the connect-
string surface around it.

* Sender.build(): cursor is the only async ingest path. sfDir present →
  store-and-forward (mmap'd, recoverable); sfDir absent → memory-only ring
  (same lock-free architecture, no disk).
* Connect-string keys reshaped:
  - drop store_and_forward (sf_dir is the on-switch)
  - drop sf_fsync / sf_fsync_on_flush (replaced by sf_durability)
  - drop sf_engine (cursor is unconditional now)
  - sf_durability=memory|flush|append (today only memory works; flush/append
    throw "not yet supported" until cursor learns fsync)
  - size suffixes accepted on sf_max_bytes / sf_max_total_bytes (64m, 4g)
  - default sf_max_bytes 4 MiB; default sf_max_total_bytes 128 MiB
    (memory mode) / 10 GiB (SF mode) — bounded by default rather than the
    previous unlimited foot-gun
* MmapSegment.createInMemory() — memory-backed (Unsafe.malloc) variant for
  the non-SF async path; same on-the-wire layout.
* SegmentManager — when the registered ring's dir is null, provisions
  memory-backed spares and skips file unlink on trim. Producer-thread
  unpark of the worker (eager wakeup) cuts the post-rotation tail by
  preempting the polling tick.
* CursorSendEngine.appendBlocking — bounded backpressure: deadline (default
  30 s) throws LineSenderException; cumulative getTotalBackpressureStalls()
  counter; throttled WARN log every 5 s of sustained backpressure. No more
  silent unbounded waits.
* CursorWebSocketSendLoop.advanceSegment — replaced fixed-size sealed-list
  snapshot with SegmentRing.nextSealedAfter() / firstSealed() lookups.
  Fixes "sealed snapshot grew unexpectedly large" crash when the producer
  outpaces the wire.
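
As an illustration of the size suffixes mentioned for sf_max_bytes / sf_max_total_bytes, here is a minimal parser sketch. Only the m and g suffixes appear in this commit message (64m, 4g); the k suffix, case-insensitivity, and the binary multipliers are assumptions, and the class name is hypothetical.

```java
import java.util.Locale;

// Illustrative only: not the connect-string parser from this PR.
final class SizeSuffixSketch {
    static long parse(String value) {
        String s = value.trim().toLowerCase(Locale.ROOT);
        if (s.isEmpty()) {
            throw new IllegalArgumentException("empty size");
        }
        long multiplier;
        switch (s.charAt(s.length() - 1)) {
            case 'k': multiplier = 1L << 10; break; // assumed suffix
            case 'm': multiplier = 1L << 20; break;
            case 'g': multiplier = 1L << 30; break;
            default:  multiplier = 1; break;        // plain byte count
        }
        String digits = multiplier == 1 ? s : s.substring(0, s.length() - 1);
        // multiplyExact surfaces overflow instead of wrapping silently.
        return Math.multiplyExact(Long.parseLong(digits), multiplier);
    }

    public static void main(String[] args) {
        System.out.println(parse("64m")); // 67108864
        System.out.println(parse("4g"));  // 4294967296
    }
}
```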

Legacy SF and async-queue paths are dead code at the test layer; their
tests are removed and the remaining src files (WebSocketSendQueue,
SegmentLog, InFlightWindow, Reconnector, SfDiskFullException, SfException)
will be deleted in a follow-up that strips QwpWebSocketSender's legacy
fields and connect overloads.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
QwpWebSocketSender now has a single send pipeline — the cursor SF engine.
The legacy WebSocketSendQueue + SegmentLog stack and the sync (window=1)
mode have no remaining callers, so the sender drops:

  * connect() overloads with SegmentLog / fsyncOnFlush parameters
  * setSegmentLog*, setSegmentLogFsyncOnFlush, setRequestDurableAck,
    getTotalSfDiskFullStalls, getHighestAckedSeqTxn,
    getHighestDurableSeqTxn, getMaxSentSymbolId, ping()
  * sync-mode flushSync, syncPing, waitForAck, AckFrameHandler,
    nextBatchSequence, sync*SeqTxns, sawPong/sawBinaryAck
  * SF reconnect machinery (performReconnect, schemaResetNeeded,
    connectionGeneration, MAX_SCHEMA_RACE_RETRIES retry loop)
  * sendQueue, segmentLog, ownsSegmentLog, fsyncOnFlush fields and
    inFlightWindow member

The remaining flow: ensureConnected wires up CursorWebSocketSendLoop;
flush()/close() drain through the cursor I/O thread; sealAndSwapBuffer
hands sealed buffers to engine.appendBlocking on the user thread.

The orphaned legacy source files (WebSocketSendQueue, InFlightWindow,
Reconnector, SegmentLog, SfDiskFullException, SfException) and the
sync-mode QwpWebSocketAckIntegrationTest are deleted in the same commit
since they no longer have any callers.

Net diff: ~4500 lines removed, ~125 added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CursorSendEngine's constructor unconditionally created a fresh
sf-initial.sfa at baseSeq=0 even when the SF directory contained
sealed segments from a prior session, restarting the FSN sequence
at 0 and overlapping with FSNs already on disk. ACK translation,
trim, and replay would then operate on overlapping ranges.

The recovery primitive — SegmentRing.openExisting — already exists
and does the right thing (scans *.sfa, sorts by baseSeq, validates
contiguity, picks the highest-baseSeq segment as the new active).
The constructor just never called it. Now it does, in disk mode,
before falling back to the fresh-create path on an empty dir.

Adds a regression test that writes 5 frames to one engine, closes,
reopens against the same dir, and asserts the next append's FSN
continues at 5 instead of restarting at 0.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Captures the design for closing the WS sender's reliability gap that
landed when we collapsed onto the cursor engine: flush()/close() no
longer wait for ACKs (in either mode), and memory mode can drop data
on close-then-exit.

Spec covers:
- flush()/close() contracts (close gets a 5s drain timeout with
  fast-close opt-out)
- Reconnect with bounded per-outage retry budget (default 5 min) and
  schema-reset machinery (volatile connectionGeneration counter to
  close the encode-mid-reconnect race)
- Slot directory model: sf_dir is the parent, sender_id picks the
  slot, foreground sender + opt-in background drainers for orphan
  recovery
- Server-side dedup contract (assumed)

13 decisions locked, no open items.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…illis

QwpWebSocketSender.close() previously stopped the cursor I/O loop the
moment it was called: any frames already published into the engine but
not yet sent (or sent but not yet ACK'd) were silently dropped on
JVM exit. In memory mode that means data loss; in SF mode the next
sender recovers from disk, but the durability claim of close() was
weaker than the spec promised.

Closes the gap with one knob from the durability spec
(design/qwp-cursor-durability.md, decision #3):

  close_flush_timeout_millis (default 5000)
    > 0:  close() blocks until ackedFsn >= publishedFsn or timeout
    0/-1: fast close — no drain, opt-in to legacy fast-exit behavior

On timeout, log WARN and proceed with shutdown. SF-mode pending data
is recoverable; memory-mode pending data is not.
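
The drain rule can be sketched as a bounded wait on the two FSN counters. This is a simplified illustration with hypothetical names; the 1 ms polling interval is an assumption, and the real close() presumably also tears down the I/O loop afterwards.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Illustrative only: models the wait, not the surrounding shutdown sequence.
final class CloseDrainSketch {
    // Returns true when everything published was acked before the timeout elapsed.
    static boolean awaitDrained(LongSupplier ackedFsn, LongSupplier publishedFsn, long timeoutMillis) {
        if (timeoutMillis <= 0) {
            // Fast close: report the current state without waiting.
            return ackedFsn.getAsLong() >= publishedFsn.getAsLong();
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (ackedFsn.getAsLong() < publishedFsn.getAsLong()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // caller logs WARN and proceeds with shutdown
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(awaitDrained(() -> 7L, () -> 7L, 5000));
    }
}
```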

Wired through:
  - LineSenderBuilder.closeFlushTimeoutMillis(long)
  - connect-string key close_flush_timeout_millis
  - new QwpWebSocketSender.connect overload that takes the timeout

Tests cover all three regimes:
  - delayed-ACK server: close blocks ~ack delay
  - timeout=0: close returns immediately
  - silent server: close times out at the configured cap, logs WARN

This is decision #3 of the spec; subsequent commits add the
connectionGeneration foundation, reconnect/replay, slot dirs,
and background drainers.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Re-adds the volatile generation counter (and its companion retry loop in
flushPendingRows) that the cursor strip had removed. This is the
foundation the reconnect work (#20/#21) builds on — the producer needs a
way to detect that the wire-side actor has rotated state mid-encode so
it can discard now-poisoned schema-ID refs and re-encode with full
schema definitions.

What lands here:

  * QwpWebSocketSender: volatile connectionGeneration + lastSeenGeneration
    pair. Bumped on initial recovery from disk (the recovered FSNs were
    never seen by *this* server connection, so the first batch must
    re-publish full schemas). Reconnect path will bump in subsequent
    work.

  * flushPendingRows: encode-mid-reconnect retry loop. Sample gen before
    encode + after finishMessage; if it changed, discard the encoded
    bytes (table buffers haven't been reset yet — source rows are
    intact) and retry with reset schema state. Bounded at
    MAX_SCHEMA_RACE_RETRIES = 10 so reconnect-faster-than-encode
    surfaces a hard error instead of spinning.

  * CursorSendEngine.wasRecoveredFromDisk(): single-bit accessor the
    sender reads during ensureConnected to decide whether to bump.

  * SegmentRing.openExisting: filter out empty hot-spare leftovers
    (frameCount=0) from prior sessions. Those carry the provisional
    baseSeq=0 and would otherwise collide with the real baseSeq=0
    segment and trip the contiguity check. Surfaced by the new
    recovery test — caught a real bug in the recovery scan.

  * Test hooks bumpConnectionGenerationForTest / accessors for gen and
    maxSent*Id so reconnect-effect tests can run without spinning up
    the (still-not-implemented) reconnect path.
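
The encode-mid-reconnect retry loop can be sketched as follows. The names are hypothetical apart from MAX_SCHEMA_RACE_RETRIES, and whether the bound counts attempts or retries is an assumption (this sketch allows 10 attempts in total). The real flushPendingRows also resets schema state before each retry, which is only marked by a comment here.

```java
import java.util.function.IntSupplier;
import java.util.function.Supplier;

// Illustrative only: generation sampling around an encode step.
final class EncodeRetrySketch {
    static final int MAX_SCHEMA_RACE_RETRIES = 10;

    static byte[] encodeStable(IntSupplier generation, Supplier<byte[]> encode) {
        for (int attempt = 0; attempt < MAX_SCHEMA_RACE_RETRIES; attempt++) {
            int before = generation.getAsInt();
            byte[] bytes = encode.get();
            if (generation.getAsInt() == before) {
                return bytes; // no reconnect happened while encoding
            }
            // Generation moved: discard the bytes; the real code resets schema state here.
        }
        throw new IllegalStateException("connection generation kept changing during encode");
    }

    public static void main(String[] args) {
        byte[] out = encodeStable(() -> 0, () -> new byte[]{1, 2, 3});
        System.out.println(out.length);
    }
}
```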

Tests cover: gen=0 for fresh connect, gen=1 after disk recovery, gen
bump triggers schema-state reset on the next encode and is sticky
(further flushes without bump don't re-reset).

Spec decisions #4 and #5 land here.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The cursor I/O loop previously treated any wire failure as terminal —
first disconnect = sender broken, every subsequent batch threw. Now,
when the sender wires a ReconnectFactory + ReconnectListener, a wire
failure triggers:

  1. WARN log
  2. Build a fresh WebSocketClient via the factory (same auth/TLS/host)
  3. Reset wire state: nextWireSeq=0, fsnAtZero = engine.ackedFsn() + 1
  4. Reposition the cursor at the first unacked FSN (replay)
  5. Notify the listener → producer's connectionGeneration bumps so
     the next encode emits full schema definitions, not refs the new
     server has never seen
  6. Outer ioLoop continues — nextWireSeq=0 starts on the new wire,
     trySendOne picks up at the repositioned cursor and replays every
     unacked frame, then continues with whatever the producer publishes
     next

Added in main:
  * CursorWebSocketSendLoop.ReconnectFactory + .ReconnectListener
    interfaces (both functional, both null-able for legacy "fail-fast"
    semantics)
  * positionCursorAt(fsn) — walks frames inside the segment containing
    fsn to find the byte offset
  * SegmentRing.findSegmentContaining(fsn) + CursorSendEngine
    pass-through — used by the cursor reposition
  * QwpWebSocketSender extracts buildAndConnect(), which serves both as
    the initial connect and as the reconnect factory; onWireReconnect()
    is the listener that bumps connectionGeneration

This commit covers the *mechanics* (one attempt, succeed-or-fail).
The follow-up commit adds policy: exponential backoff with jitter,
per-outage time cap (reconnect_max_duration_millis, default 300s
per spec decision #2), and auth-failure detection (401/403/non-101
treated as terminal so the retry budget isn't wasted on errors that
won't fix themselves).

Two integration tests:
  * testReconnectAfterServerInducedDisconnect — server ACKs then
    closes; sender reconnects, second batch lands on the new wire
  * testReplayResendsUnackedFramesAcrossReconnect — server receives
    the first frame WITHOUT ACKing then closes; sender reconnects
    and replays the unacked frame on the new connection

Spec decisions #5 (encode-mid-reconnect race) and the core of
#1/#2 (reconnect mechanics) land here.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replaces the single-attempt reconnect with a per-outage retry budget:
exponential backoff with jitter, capped duration, terminal classification
for HTTP upgrade failures (401/403/426/...) so misconfig surfaces fast
instead of grinding through the cap.

Knobs (WebSocket only, all settable from the connect string and the builder):
  reconnect_max_duration_millis   default 300_000  (5 min)
  reconnect_initial_backoff_millis default 100
  reconnect_max_backoff_millis     default 5_000
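
A minimal sketch of how the initial and max backoff defaults above could interact. The doubling factor and the "equal jitter" scheme (half fixed, half random) are assumptions; the commit message only says exponential backoff with jitter. Class and method names are hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: the actual growth factor and jitter scheme may differ.
final class BackoffSketch {
    // Doubles the delay and caps it at maxMillis.
    static long nextBackoff(long currentMillis, long maxMillis) {
        return Math.min(maxMillis, currentMillis * 2);
    }

    // Picks a sleep in [backoff/2, backoff] so concurrent senders do not retry in lockstep.
    static long withJitter(long backoffMillis) {
        long half = backoffMillis / 2;
        return half + ThreadLocalRandom.current().nextLong(backoffMillis - half + 1);
    }

    public static void main(String[] args) {
        long backoff = 100; // reconnect_initial_backoff_millis default
        for (int i = 0; i < 8; i++) {
            System.out.println(backoff + " ms nominal, " + withJitter(backoff) + " ms with jitter");
            backoff = nextBackoff(backoff, 5_000); // reconnect_max_backoff_millis default
        }
    }
}
```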

Auth-terminal detection walks the cause chain — the WebSocketClient's
"WebSocket upgrade failed:" sentinel is wrapped at least once by the
connect path, so a top-level message-only check missed it.
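
The cause-chain walk can be sketched as below. The sentinel string is the one quoted above; the class name and the depth limit are hypothetical, and the real classifier may also inspect the HTTP status code, which this sketch does not.

```java
import java.io.IOException;

// Illustrative only: classifies a failure as terminal if any cause carries the sentinel.
final class TerminalErrorSketch {
    static final String SENTINEL = "WebSocket upgrade failed:";
    private static final int MAX_DEPTH = 32; // guards against cyclic cause chains

    static boolean isTerminal(Throwable error) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < MAX_DEPTH; t = t.getCause(), depth++) {
            String message = t.getMessage();
            if (message != null && message.contains(SENTINEL)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("connect failed",
                new IOException(SENTINEL + " 401"));
        System.out.println(isTerminal(wrapped)); // true
    }
}
```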

Tests: testReconnectGivesUpAfterCap exercises the budget exhaustion via
server.close() (TCP refused on every retry);
testTerminalUpgradeErrorAbortsReconnect uses a raw-socket fixture that
101s the first connection then 401s every subsequent one.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
sf_dir is now the *group root*, not a slot. Each sender owns
<sf_dir>/<sender_id>/, taking an exclusive flock on <slot>/.lock for its
lifetime. Two senders pointing at the same slot is the multi-writer
footgun the model exists to prevent — their FSN sequences would
interleave on disk and corrupt recovery. Detected at acquisition time;
second sender fails fast with the holder's PID in the diagnostic.

Knobs:
  sender_id  (default "default")  — slot identity inside the group root

Allowed sender_id chars: letters, digits, _ - (verbatim dir name).
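
A minimal validation sketch for the rule above. Treating "letters" as ASCII letters and rejecting the empty string are assumptions; the class name is hypothetical.

```java
import java.util.regex.Pattern;

// Illustrative only: the real parser may differ in the exact character set.
final class SenderIdSketch {
    private static final Pattern VALID = Pattern.compile("[A-Za-z0-9_-]+");

    static boolean isValid(String senderId) {
        return senderId != null && VALID.matcher(senderId).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("default")); // true
        System.out.println(isValid("../etc"));  // false
    }
}
```

Because the id is used verbatim as a directory name, rejecting separators and dots at parse time also rules out path traversal out of the group root.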

SlotLock writes the holder's PID into the lock file at acquisition; a
contended acquire reads it back so the error message names the
offending process. flock is released by the kernel on hard process
exit, so a crashed sender doesn't leave the slot wedged.

Tests:
- SlotLockTest: acquire creates dir + .lock, second acquire contends,
  close releases, distinct slots coexist.
- SfFromConfigTest: sender_id creates named slot; two senders with
  same id collide on lock; two senders with distinct ids coexist;
  invalid char in sender_id rejected at parse time.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three small things:

1. initial_connect_retry (default false). When true, the startup connect
   goes through the same backoff/cap/auth-terminal loop as in-flight
   reconnect. Default off because a misconfigured host shouldn't sit
   retrying for the cap on startup. Auth failures stay terminal in
   either mode.

2. sf_append_deadline_millis. Was a hardcoded 30s constant; expose so
   tight-SLA users can lower and offline-tolerant pipelines can raise.

3. Two new counters on the cursor I/O loop, exposed on
   QwpWebSocketSender:
   - getTotalReconnectAttempts()  — succeeded + failed (diverges from
     getTotalReconnectsSucceeded() when the server is flapping)
   - getTotalFramesReplayed()     — frames re-sent on the post-reconnect
     catch-up window; non-zero confirms replay actually fired.

Implementation: extracted the reconnect retry-with-jitter loop into a
static CursorWebSocketSendLoop.connectWithRetry helper so ensureConnected
and the I/O loop's fail() path share verbatim semantics (auth-terminal,
backoff, jitter, throttled logs, cap). Replay counter uses a snapshot of
publishedFsn at swapClient time as a target — incremented per frame
sent, cleared once we cross the boundary. Branch is cold on the
steady-state path.

Tests: InitialConnectRetryTest covers the no-retry-fails-fast path, the
retry-succeeds-when-server-comes-up path, and the retry-gives-up-after-cap
path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
bluestreak01 and others added 20 commits April 27, 2026 02:44
Foundation for background-drainer adoption. The scan + visibility piece
lands now; the drainer runtime that actually empties orphan slots is a
follow-up.

A "candidate orphan" is a sibling slot under sf_dir that:
- isn't the foreground sender's own slot (filtered by sender_id)
- contains at least one *.sfa segment file
- doesn't have a .failed sentinel

Lock state intentionally isn't part of the candidate filter — testing it
requires opening + flocking the lock file, which races with concurrent
acquirers. The drainer pool will attempt to lock each candidate in turn
and skip ones that fail.
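
The candidate filter can be sketched with java.nio.file. This is an illustration under assumptions, not the OrphanScanner source: the class name is hypothetical, and a missing group root is treated as "no candidates".

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: lock state is deliberately not checked, as described above.
final class OrphanScanSketch {
    static boolean isCandidateOrphan(Path slot, String ownSenderId) throws IOException {
        if (!Files.isDirectory(slot)) {
            return false;
        }
        if (slot.getFileName().toString().equals(ownSenderId)) {
            return false; // the foreground sender's own slot
        }
        if (Files.exists(slot.resolve(".failed"))) {
            return false; // a drainer already gave up; wait for the operator
        }
        try (DirectoryStream<Path> segments = Files.newDirectoryStream(slot, "*.sfa")) {
            return segments.iterator().hasNext();
        }
    }

    static List<Path> scan(Path groupRoot, String ownSenderId) throws IOException {
        List<Path> candidates = new ArrayList<>();
        if (!Files.isDirectory(groupRoot)) {
            return candidates;
        }
        try (DirectoryStream<Path> slots = Files.newDirectoryStream(groupRoot)) {
            for (Path slot : slots) {
                if (isCandidateOrphan(slot, ownSenderId)) {
                    candidates.add(slot);
                }
            }
        }
        return candidates;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("sf-root");
        Files.createFile(Files.createDirectories(root.resolve("ghost")).resolve("sf-0000000000000000.sfa"));
        System.out.println(scan(root, "me").size());
    }
}
```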

The .failed sentinel is the "bounded automatic retry, then human-in-the-
loop" hop in the spec: drainer gives up after its reconnect cap →
drops .failed → exits. Future scans skip the slot until the operator
clears the file.

Knobs (WS-only, default off):
  drain_orphans=false             — scan + log; future: spawn drainers
  max_background_drainers=4       — cap on concurrent drainers

When drain_orphans=true today, the foreground sender's startup logs the
count + first few orphan paths so operators have visibility while the
drainer runtime is still pending.

Tests:
- OrphanScannerTest: empty group root, missing dir, candidate detection,
  empty-slot exclusion, .failed exclusion, sender_id exclusion, multiple
  candidates, isCandidateOrphan direct.
- OrphanScanIntegrationTest: a "ghost" sender writes data with no ACKs
  and dies; a fresh sender with a different sender_id sees the ghost slot
  as an orphan via OrphanScanner.scan and its own slot is filtered out.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Every frame written through the cursor SF path now carries its full
schema definition and the complete symbol-dictionary delta starting at
id 0. No schema-by-id refs, no incremental delta-dicts.

The bytes persist on disk and get replayed against fresh server
connections — post-reconnect, post-restart, or via background drainers
adopting orphan slots. A frame whose schema ref points at an ID the new
connection has never seen is unrecoverable; the spec's
dedup-by-messageSequence assumption fixes duplicates, not stale refs.
The previous testReplayResendsUnackedFramesAcrossReconnect only covered
single-batch replay (first batch always carries full schemas) so the
gap wasn't caught.

Implementation: encode pass forces confirmedMaxId=-1 (full symbol delta
from 0) and useSchemaRef=false (full schema definition, never a ref).
Producer-side maxSentSchemaId / maxSentSymbolId tracking is now
effectively dead state — left in place as a no-op safety net.

Cost: bytes per batch grow vs the prior delta encoding. Acceptable for
correctness across the entire recovery story.

Test: SelfSufficientFramesTest sends two batches with distinct symbol
values over the same connection and verifies batch 2 carries
deltaStart=0 with deltaCount ≥ 2 — i.e. it redefines the prior batch's
symbols too, instead of starting from where the prior delta left off.

Spec updated with decision #14.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three correlated bugs that together orphaned data on recovery:

1. CursorWebSocketSendLoop.start() began at engine.activeSegment(),
   skipping every sealed segment on disk. After a crash + restart with
   multiple unacked segments, only the active's tail would replay; all
   sealed-segment data sat orphaned. Fixed by positioning at
   engine.ackedFsn() + 1 (same as swapClient does on reconnect) — the
   cursor walks sealed segments oldest-first and falls through to active
   only when sealed is exhausted. Existing replay test only covered
   single-batch replay (first batch always carries full schemas), so
   the gap wasn't caught.

2. CursorSendEngine recovery left ackedFsn = -1 even when earlier
   segments had been trimmed before the crash (lowestBaseSeq > 0). With
   ackedFsn -1, positionCursorAt(0) would land before any segment exists
   and fall through to active.publishedOffset() — same orphan symptom.
   Fixed by seeding ackedFsn = lowestBaseSeq - 1 on recovery; everything
   trimmed must have been acked, so this is a sound lower bound.

3. SegmentManager.fileGeneration started at 0 even when the slot dir
   already contained sf-0000000000000000.sfa from a prior session.
   Manager would then mint its first hot spare at that name —
   openCleanRW truncates the file, scrambling the in-flight mmap of the
   active segment under the I/O loop. Spec called for this fix at
   line 93 ("seeds fileGeneration to max(existing) + 1"); now done in
   register() by scanning the slot dir for sf-<hex>.sfa files.
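
The two recovery seeds from fixes 2 and 3 can be sketched as below. This is an illustration under assumptions: it takes a plain list of file names rather than the native directory scan, and `RecoverySeedSketch` with its methods is a hypothetical stand-in for the real register() and recovery code.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the slot-dir scan; operates on file names only.
final class RecoverySeedSketch {
    private static final Pattern SEGMENT_NAME = Pattern.compile("sf-([0-9a-f]{16})\\.sfa");

    /** Parses the 16-digit hex field of a segment file name, or returns -1 for any other file. */
    static long parseHex(String fileName) {
        Matcher m = SEGMENT_NAME.matcher(fileName);
        return m.matches() ? Long.parseUnsignedLong(m.group(1), 16) : -1L;
    }

    /** Next free generation: max(existing) + 1, or 0 when no segment file exists. */
    static long nextFileGeneration(List<String> fileNames) {
        long max = -1L;
        for (String name : fileNames) {
            max = Math.max(max, parseHex(name));
        }
        return max + 1;
    }

    /** Everything below the lowest surviving base sequence was trimmed, hence acked. */
    static long seedAckedFsn(long lowestBaseSeq) {
        return lowestBaseSeq - 1;
    }
}
```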

Test: RecoveryReplayTest writes 50 multi-segment-spanning rows against a
silent server (no acks), closes fast, then opens a fresh sender against
an ack server pointed at the same slot. Asserts all 50 distinct row
values reach the new server. Without the start() fix, only the active
segment's frames replay (subset). Without the fileGeneration fix, the
in-flight mmap gets clobbered and the cursor walks zero-padded garbage.

Adds getTotalFramesSent / getTotalAcks accessors on QwpWebSocketSender
(used during diagnosis; useful in their own right).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Wires the drainer runtime onto the orphan-scanner foundation. With
drain_orphans=true the foreground sender now actually empties sibling
slots holding unacked data instead of just logging that they exist.

Per-drainer lifecycle:
1. Open CursorSendEngine on the slot — its constructor takes the slot
   lock; if another sender or drainer holds it, the engine throws and
   the drainer exits silently (LOCKED_BY_OTHER, not a failure).
2. Open a fresh WebSocketClient via the foreground sender's connect
   factory — separate connection, same auth/host/port/TLS config.
3. Run a CursorWebSocketSendLoop until ackedFsn catches up to the
   publishedFsn snapshot taken at startup.
4. On terminal failure (auth, recovery, budget), drop a .failed
   sentinel into the slot. Future scans skip it until an operator
   clears it manually — bounded retry, then human-in-the-loop.
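
A rough sketch of the sentinel skip from step 4, assuming the scanner only needs to know whether a slot dir contains a .failed file and at least one sf-*.sfa file. It uses java.nio.file rather than the client's native Files API and omits the slot-lock probe; `OrphanScanSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical scanner; the lock check performed by the real scanner is left out.
final class OrphanScanSketch {
    /** Returns slot directories under groupRoot that hold segment files and no .failed sentinel. */
    static List<Path> candidateSlots(Path groupRoot) throws IOException {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> slots = Files.newDirectoryStream(groupRoot)) {
            for (Path slot : slots) {
                if (!Files.isDirectory(slot)) {
                    continue;
                }
                if (Files.exists(slot.resolve(".failed"))) {
                    continue; // skipped until an operator removes the sentinel
                }
                if (hasSegmentFile(slot)) {
                    result.add(slot);
                }
            }
        }
        return result;
    }

    private static boolean hasSegmentFile(Path slot) throws IOException {
        try (DirectoryStream<Path> files = Files.newDirectoryStream(slot, "sf-*.sfa")) {
            return files.iterator().hasNext();
        }
    }
}
```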

Pool: bounded fixed-thread executor, daemon threads, sized by
max_background_drainers (default 4). Closes via cooperative stop +
3s grace; daemon threads ensure no JVM-exit blocking.

Visibility: QwpWebSocketSender#getBackgroundDrainers returns a snapshot
list of live drainers with {slot, target, acked, outcome, lastError}.

Test: ghost sender writes 30 distinct rows against a silent server and
closes fast — leaves an unacked slot. Foreground sender opens the same
group root with a different sender_id and drain_orphans=true against an
ack server; asserts every distinct payload reaches the new server. Plus
a sentinel-skip test confirming an operator-set .failed file disqualifies
the slot from the next foreground run's scan.

Empty active segments and stale hot spares are left in the slot dir per
spec decision #13 ("no automatic cleanup of empty slot dirs"); the
scanner's no-op behavior on empty slots makes this cheap.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Hand-off doc for the follow-up work on cursor SF durability. Captures
what's done on this branch, what's left (multi-host failover at the
top — needs the server-side repo to validate end-to-end), and where to
look in the code. Spec decisions 1–14 are locked; this is the residual
list.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Multi-issue review pass over the cursor SF stack from PR-17. Each
fix lands with a regression test (or extends an existing one); full
QWP suite green at 1053 tests.

- recovery sort: SegmentRing.openExisting used selection sort, which
  regressed the legacy SegmentLog perf fix (commit 86b6e6f) on the
  cursor path. Replaced with median-of-three in-place quicksort —
  median-of-three needed because lexicographic readdir returns our
  zero-padded hex names already-sorted, the worst case for a naive
  pivot. testLargeSegmentCountReopensInOrder guards against
  regression.

- cap accounting: SegmentManager.register/deregister ignored the
  bytes a recovered or adopted ring already owns, so the documented
  sf_max_total_bytes cap could be exceeded by a full segment-set on
  restart. Now seeds totalBytes from ring.totalSegmentBytes() and
  serviceRing updates the counter under the same lock so concurrent
  register/deregister stays consistent.

- torn-tail observability: openExisting silently truncated on the
  first bad CRC with no log or signal. Now distinguishes attempted-
  but-failed frame writes (non-zero bytes past lastGood) from clean
  unwritten space; emits WARN with byte count + offset and exposes
  tornTailBytes() for diagnostics. No false alarms on fresh hot
  spares or partial fills.

- drainOrphans Javadoc: setter claimed scan/log only with drainer
  runtime "as a follow-up", but build() spawns the
  BackgroundDrainerPool. Rewritten to describe actual adoption.

- BackgroundDrainerPool submit/close race: volatile-boolean gate
  let close() return while submit() was about to register a drainer.
  AtomicInteger CAS gate with sign-bit closed flag closes the window.

- CursorWebSocketSendLoop close hangs and reconnect leak: start()
  try/catches Thread.start() and assigns ioThread only on success;
  close() guards on isAlive() and closes the current client field;
  reconnect closes the previous WebSocketClient before assigning the
  new one.

- row-API methods accepting rows after the I/O loop went terminal:
  table()/longColumn()/atNow() etc. now surface the loop's terminal
  error on the next call instead of swallowing it until the next
  flush().

- non-success ACK not marking the loop fatal: the I/O response
  handler now routes through recordFatal so the loop exits cleanly
  instead of looping on the same rejected frame.

- CursorSendEngine close() left .sfa files when fully drained:
  captures fullyDrained before ring.close() and unlinks all segment
  files via unlinkAllSegmentFiles(dir) when nothing is pending
  replay.

- SegmentRing recovery leaked empty .sfa files: openExisting now
  Files.remove(path) after closing zero-frame segments instead of
  letting them accumulate across restart cycles.

- closeFlushTimeoutMillis sentinel collision: -1 was both the
  documented "no timeout" value and the parameter-not-set sentinel.
  Introduced CLOSE_FLUSH_TIMEOUT_NOT_SET = Long.MIN_VALUE; -1 now
  correctly disables the timeout.

- Windows files.c read/write/append accepted negative lengths and
  silently overflowed DWORD. New clamp_len helper rejects < 0 and
  clamps to MAXDWORD per call.
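
For the BackgroundDrainerPool submit/close item above, one way to build a CAS gate with a sign-bit closed flag is sketched here. It is an illustration of the technique only; `SubmitCloseGateSketch` and its method names are hypothetical and the pool's actual gate may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical gate: the sign bit means "closed", the low 31 bits count in-flight submits.
final class SubmitCloseGateSketch {
    private static final int CLOSED = Integer.MIN_VALUE;
    private final AtomicInteger state = new AtomicInteger();

    /** Enters the gate; returns false once close() has set the sign bit. */
    boolean tryEnter() {
        for (;;) {
            int s = state.get();
            if (s < 0) {
                return false;
            }
            if (state.compareAndSet(s, s + 1)) {
                return true;
            }
        }
    }

    /** Leaves the gate after the submit has finished registering its work. */
    void exit() {
        state.decrementAndGet();
    }

    /** Sets the closed flag, then waits until every in-flight submit has left. */
    void close() {
        for (;;) {
            int s = state.get();
            if (s < 0 || state.compareAndSet(s, s | CLOSED)) {
                break;
            }
        }
        while ((state.get() & ~CLOSED) != 0) {
            Thread.yield();
        }
    }

    boolean isClosed() {
        return state.get() < 0;
    }
}
```

Because entry and close contend on the same word, a submit either enters before the flag is set (and close() waits for it) or observes the flag and backs off; there is no window where close() returns while a submit is half-registered.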

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Batch of PR-17 review-driven fixes touching the cursor SF engine and the
Files API. Each fix has a regression test (committed alongside in the
follow-up test commit; this commit is production-only so the tests in
that follow-up land green under git bisect).

M1 — MmapSegment.frameCount is now volatile. The producer thread writes
it in tryAppend without synchronizing on the ring monitor; the I/O
thread reads it from synchronized accessors (findSegmentContaining,
appendOrFsn). One-sided fencing left no happens-before for the read,
permitting a stale frameCount that would make findSegmentContaining
return null for a published FSN.

M2 — SegmentManager.serviceRing now installs the hot spare and commits
totalBytes += segmentSize atomically under the manager lock, gated on
the ring still being registered. The previous sequence (snapshot,
release lock, create+install spare, re-acquire lock, commit) leaked one
segment of accounting whenever deregister fired between snapshot and
commit: deregister subtracted ring.totalSegmentBytes() (without the
in-flight spare), then the commit added segmentSize with no future
subtractor. Stress reproduces the drift consistently.

M3 — CursorSendEngine.closed is now volatile and close() is
synchronized, so two racing closers cannot both pass the if-closed
gate and double-close the manager / ring / slotLock.

M4 — CursorWebSocketSendLoop.close() is now synchronized on the same
monitor as start(). A close() racing a slow start() previously could
read ioThread==null and skip the latch await while the I/O thread was
mid-sendBinary.

M5 — CursorSendEngine.close() now wraps manager/ring/unlink steps in
try/finally so slotLock.close() always runs. A throw from any earlier
step previously left the kernel-held flock outliving the engine; a
fresh sender for the same slot would then collide on a lock the dead
engine never released.

M6 — close() also unlinks segment files when publishedFsn < 0 (nothing
ever published), not only on full-drain. Without this, a drainer
adopting an empty orphan slot left a fresh empty file behind: recovery
filtered the slot's segments as empty, the engine recreated a fresh
sf-initial.sfa, and close() kept it. The next scanner then re-adopted
the slot, recovery unlinked the file, and the cycle repeated
indefinitely.

M7 — Files.findFirst() now returns -1L on opendir/readdir failure,
distinguishing an inaccessible directory from a (theoretically) empty
one. Native findFirst0 returns 0 in both cases — POSIX/Win32
directories that exist always contain ./.., so 0 in practice always
meant "could not enumerate". Recovery code paths
(SegmentRing.openExisting, OrphanScanner.scan / hasAnySegmentFile,
CursorSendEngine.unlinkAllSegmentFiles, SegmentManager.scanMaxGeneration)
now log WARN on a negative return and fall through, instead of
silently treating an inaccessible slot as empty and overlapping FSN 0
with on-disk segments the JVM could not enumerate.

M9 — strip stale references to deleted classes (SegmentLog,
SfDiskFullException, WebSocketSendQueue, InFlightWindow) from
javadoc/comments across MmapSegment, MmapSegmentException,
CursorSendEngine, CursorWebSocketSendLoop, SegmentManager. Also marks
MmapSegmentException final (consistency with other cursor classes).

M10 — BackgroundDrainerPool.snapshot() returns ObjList<BackgroundDrainer>
instead of java.util.ArrayList. QwpWebSocketSender.getBackgroundDrainers()
follows the same contract.

C3 — drop the dead connectionGeneration retry loop in
QwpWebSocketSender.flushPendingRows. Cursor frames are self-sufficient
(every frame carries full schema + full symbol-dict delta from id 0),
so encode unconditionally passes confirmedMaxId=-1 and useSchemaRef=false
and there is no longer a schema-race window to defend against. Removes
~80 lines: connectionGeneration / lastSeenGeneration fields,
MAX_SCHEMA_RACE_RETRIES, bumpConnectionGenerationForTest /
getConnectionGenerationForTest hooks, onWireReconnect callback, and the
ReconnectListener interface in CursorWebSocketSendLoop. BackgroundDrainer
no longer passes a no-op listener.
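
The close-path shape described in M3 and M5 can be sketched as follows, with Runnables standing in for the manager, ring and slot lock. `EngineCloseSketch` is a hypothetical name; only the volatile flag, the synchronized close and the try/finally come from the description above.

```java
// Hypothetical engine shell: three Runnables stand in for manager, ring and slot lock.
final class EngineCloseSketch {
    private final Runnable closeManager;
    private final Runnable closeRing;
    private final Runnable releaseSlotLock;
    private volatile boolean closed;

    EngineCloseSketch(Runnable closeManager, Runnable closeRing, Runnable releaseSlotLock) {
        this.closeManager = closeManager;
        this.closeRing = closeRing;
        this.releaseSlotLock = releaseSlotLock;
    }

    /** Idempotent close: racing callers serialize, and the slot lock is released even on a throw. */
    synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        try {
            closeManager.run();
            closeRing.run();
        } finally {
            releaseSlotLock.run();
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```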

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… tests

Test-only commit; production fixes are in the previous commit so the
new red-then-green tests below land green under git bisect.

assertMemoryLeak wrapping (C2). Every @Test method across the SF cursor
test surface (15 files, 75 tests) is now wrapped in
TestUtils.assertMemoryLeak(() -> { ... }), so any native-memory leak
introduced anywhere in the cursor codepaths (mmap segment lifecycle,
slot lock fd, drainer pool, native Files calls) surfaces in CI instead
of accumulating silently. Test method signatures changed to declare
throws Exception where needed.

New regression tests:
  * MemoryOrderingFindingsTest — pins MmapSegment.frameCount and
    CursorSendEngine.closed as volatile via reflection. Reflection is
    used because x86's strong memory model masks plain-long staleness in
    practice; a stress test would be flaky, the modifier check is
    deterministic.
  * SegmentManagerTotalBytesRaceTest — concurrent stress with eight
    producer threads doing register/spin/deregister cycles while the
    manager's worker polls at 1us. Reads totalBytes via reflection
    after every ring is deregistered; the field must read 0 (no
    accounting drift). Produced 50-60 KB of drift on the failing run
    pre-fix.
  * EngineCloseSlotLockReleaseTest — opens an engine, reflectively
    nulls the ring field to inject an NPE on engine.close(), then
    asserts a fresh SlotLock.acquire on the same dir succeeds. Pre-fix
    the close-path NPE skipped slotLock.close() and the kernel-held
    flock blocked the second acquire.
  * EmptyOrphanSlotChurnTest — opens-and-closes a CursorSendEngine on a
    fresh slot without writing, asserts no .sfa file remains. Re-opens
    and asserts the same. Pre-fix sf-initial.sfa survived close because
    unlinkAllSegmentFiles was gated only on publishedFsn>=0.
  * FilesFindFirstErrorTest — pins the post-fix contract:
    Files.findFirst on a missing path must return a negative sentinel
    so callers can distinguish "opendir failed" from "directory empty".

Test cleanups:
  * Migrate ~25 test cleanup blocks from `if (find != 0)` /
    `if (find == 0)` to `if (find > 0)` / `if (find <= 0)` so the new
    Files.findFirst -1L sentinel is handled correctly and findClose is
    never called on -1.
  * BackgroundDrainerPoolRaceTest switches its leak check from
    java.util.List.contains to indexed ObjList.getQuick iteration to
    match the new BackgroundDrainerPool.snapshot() contract.
  * ReconnectTest class doc — drop the connectionGeneration mention
    (refs were removed alongside the dead retry loop).
  * Delete ConnectionGenerationTest — every hook it tested
    (bumpConnectionGenerationForTest, getConnectionGenerationForTest,
    onWireReconnect bump on disk recovery) was removed alongside the
    dead retry loop in the previous commit.
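
A reflection-based modifier check of the kind MemoryOrderingFindingsTest is described as using might look like the sketch below. The nested `Segment` class is a stand-in invented for illustration; the real fields live in MmapSegment and CursorSendEngine.

```java
import java.lang.reflect.Modifier;

// Hypothetical pin helper; Segment is a stand-in for the real classes under test.
final class VolatilePinSketch {
    static final class Segment {
        volatile long frameCount;
        long plainCounter;
    }

    /** Returns true when the named field is declared volatile; fails loudly if it is missing. */
    static boolean isVolatile(Class<?> type, String fieldName) {
        try {
            return Modifier.isVolatile(type.getDeclaredField(fieldName).getModifiers());
        } catch (NoSuchFieldException e) {
            throw new AssertionError("field removed or renamed: " + fieldName, e);
        }
    }
}
```

The check is deterministic on every architecture, which is the stated reason for preferring it over a stress test.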

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Critical:
- C1: SegmentRing.openExisting quarantines corrupt-first-frame .sfa
  files to <path>.corrupt instead of unlinking, preventing silent
  loss of valid frames behind a bit-flipped frame[0] CRC.
- C2: clamp wireSeq in CursorWebSocketSendLoop.handleServerRejection
  against nextWireSeq-1 (matches OK-ACK branch); SegmentRing.acknowledge
  also clamps at publishedFsn for defense-in-depth.
- C4: recordFatal now runs before dispatchError at all four HALT sites
  in CursorWebSocketSendLoop so the typed terminal error is latched
  before the user handler is invoked.
- C5: MmapSegment.create removes the on-disk file on mmap-fail catch;
  SegmentManager.serviceRing cleanup removes the path even when the
  spare is null.
- C7: remove stray QWP_CLIENT_REVIEW.md (review notes for vi_egress).
- C8: Sender.build wraps startOrphanDrainers in its own try/catch that
  closes the connected sender on failure; the outer catch (which
  closes cursorEngine directly) only fires for the pre-connect window
  before ownership transfer.

Moderate:
- M1: MmapSegment.openExisting validates baseSeq >= 0 and throws
  MmapSegmentException so SegmentRing's skip-with-log handles it.
- M2: SegmentRing.openExisting catches Throwable per-file and wraps
  the whole recovery body in an outer catch that closes every
  recovered segment on rethrow, plugging fd+mmap leaks.
- M3: POSIX read/write/append reject negative len with EINVAL,
  matching the existing Win32 guard.
- M4: crc32c.c _Static_assert on __BYTE_ORDER__ so big-endian builds
  fail loudly rather than silently miscompiling slice-by-8.

Tests:
- C9: ServerErrorAckTerminalTest and IoThreadErrorSurfacedOnRowApiTest
  flipped from STATUS_SCHEMA_MISMATCH (DROP) to STATUS_PARSE_ERROR
  (HALT) — the terminal-throw contract being asserted is the HALT
  contract per spec. New testDropPolicyNackDoesNotHaltAndAdvancesAck
  pins the DROP_AND_CONTINUE contract.
- C10: MmapSegmentTest.testFirstFrameCrcCorruptionFlagsTornTailAnd
  PreservesFile covers the unit-level contract for corrupt frame[0].
- C11: PrReviewRedTestsE2e adds end-to-end coverage for the central
  user-visible error API contract — flush() after a HALT NACK throws
  LineSenderServerException carrying the typed SenderError.
- SegmentRingTest.testAcknowledgeIsMonotonic publishes frames before
  acking to reflect the new clamp-at-publishedFsn contract.
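
The clamp-at-publishedFsn contract from C2 can be illustrated with a small single-threaded sketch. `AckClampSketch` is a hypothetical class; the real SegmentRing.acknowledge also trims segments and handles concurrency, which is omitted here.

```java
// Hypothetical ring state: only the ack clamp and monotonicity are modelled.
final class AckClampSketch {
    private long publishedFsn = -1;
    private long ackedFsn = -1;

    /** Publishes one frame and returns its FSN. */
    long publish() {
        return ++publishedFsn;
    }

    /** Clamps the ack to publishedFsn and never moves ackedFsn backwards. */
    long acknowledge(long fsn) {
        long clamped = Math.min(fsn, publishedFsn);
        if (clamped > ackedFsn) {
            ackedFsn = clamped;
        }
        return ackedFsn;
    }
}
```

This is why testAcknowledgeIsMonotonic has to publish frames first: an ack for an FSN that was never published is clamped and has no effect.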

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three small changes to QwpWebSocketSender that close a silent-loss
window in the async client:

1. close() now captures the first Throwable raised during the
   flush/drain path, finishes shutdown of the I/O thread, socket
   and microbatch buffers, and then rethrows. Previously a HALT
   policy SenderError surfaced from drainOnClose was swallowed
   into a LOG.error(), so a user who only ever called close()
   (no flush() afterwards) had no way to observe the failure and
   no signal that data was lost. A static rethrowTerminal helper
   wraps any non-RuntimeException Throwable in a LineSenderException
   so close() keeps its existing throws contract.

2. drainOnClose now throws a LineSenderException on timeout
   carrying publishedFsn, ackedFsn, and the count of unacked
   batches, instead of logging a WARN and returning silently.
   The exception propagates through close() via mechanism (1)
   above. SF-mode users can recover the unacked tail by reopening
   on the same SF directory; memory-mode users have no recovery
   path and must treat this as fatal.

3. Adds two accessors so tests and user code can observe
   storage-drained progress without polling internal state:
     - getAckedFsn() returns the highest server-acknowledged FSN
       (or -1 before the I/O loop has started).
     - awaitAckedFsn(targetFsn, timeoutMillis) blocks until
       acked >= target or the timeout elapses, returning a
       boolean and surfacing latched I/O errors via
       cursorSendLoop.checkError(). Pair with
       flushAndGetSequence() to wait for a specific publish
       to land on disk server-side.
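
A minimal sketch of the awaitAckedFsn contract from item 3, assuming a monitor-based wait. `AckWaiterSketch`, `onAck` and `onTerminalError` are hypothetical names; the sketch checks the latched error before the early returns, which is one possible ordering, and reports an interrupt as a timeout for brevity.

```java
// Hypothetical waiter: the I/O thread calls onAck/onTerminalError, user code calls awaitAckedFsn.
final class AckWaiterSketch {
    private long ackedFsn = -1;
    private RuntimeException terminalError;

    synchronized void onAck(long fsn) {
        if (fsn > ackedFsn) {
            ackedFsn = fsn;
            notifyAll();
        }
    }

    synchronized void onTerminalError(RuntimeException e) {
        terminalError = e;
        notifyAll();
    }

    /** Blocks until acked >= target (true) or the timeout elapses (false); throws a latched error. */
    synchronized boolean awaitAckedFsn(long targetFsn, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        try {
            for (;;) {
                if (terminalError != null) {
                    throw terminalError;
                }
                if (ackedFsn >= targetFsn) {
                    return true;
                }
                long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMillis <= 0) {
                    return false;
                }
                wait(remainingMillis);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```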

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
initial_connect_retry already had a SYNC mode that retried on the user
thread up to reconnect_max_duration_millis, but that contradicts SF's
core promise of decoupling the producer from network state: at startup
the application thread is blocked on a wire that may never come up.

This change extends initial_connect_retry with a third value, async,
which:

- Returns from Sender.fromConfig immediately. The producer thread can
  call at()/atNow()/flush() right away; rows accumulate in the cursor
  SF engine while the I/O thread runs the connect retry loop in the
  background.
- Reuses the existing per-outage retry/backoff/cap/auth-terminal
  machinery from CursorWebSocketSendLoop. fail() is refactored into a
  shared connectLoop(initial, phase) helper used by both the in-flight
  reconnect path (phase="reconnect") and the new attemptInitialConnect
  path (phase="initial connect"). The I/O loop's first iteration now
  drives initial connect when the constructor was handed a null client.
- Surfaces terminal failures (auth/upgrade reject, budget exhaustion)
  through the existing SenderError dispatcher rather than throwing
  from the constructor. close() still rethrows the latched terminal so
  users without a custom error_handler still see the failure.

The new InitialConnectMode enum (OFF, SYNC, ASYNC) replaces the
internal boolean. The legacy initialConnectRetry(boolean) builder
method is preserved as a back-compat shim that maps false to OFF and
true to SYNC, and the config string parser accepts off/false, on/true/
sync, and async as values for initial_connect_retry.
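
The value mapping above can be sketched as a small enum. `InitialConnectModeSketch` is a hypothetical name, and exact lowercase matching is an assumption; the real parser's handling of case and unknown values is not stated here.

```java
// Hypothetical mirror of the OFF/SYNC/ASYNC mapping; exact-match parsing is an assumption.
enum InitialConnectModeSketch {
    OFF, SYNC, ASYNC;

    /** Maps an initial_connect_retry config value to a mode. */
    static InitialConnectModeSketch parse(String value) {
        switch (value) {
            case "off":
            case "false":
                return OFF;
            case "on":
            case "true":
            case "sync":
                return SYNC;
            case "async":
                return ASYNC;
            default:
                throw new IllegalArgumentException("invalid initial_connect_retry: " + value);
        }
    }

    /** Back-compat shim for the legacy boolean builder method. */
    static InitialConnectModeSketch fromLegacyBoolean(boolean retry) {
        return retry ? SYNC : OFF;
    }
}
```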

Connectivity classification:

When the connect-retry budget exhausts, the SenderError now tags the
failure by what was actually observed on the wire so users can tell a
config typo apart from a transient blip:

- never-connected-budget-exhausted: ... -- never reached the server
  (check addr/port/firewall) -- when the I/O loop has not once
  installed a live, upgraded WebSocket. Most likely a typo, wrong
  port, firewall block, or server not yet deployed.
- connection-lost-budget-exhausted: ... -- server unreachable since
  last connect (transient) -- when the loop did connect at least
  once and the wire dropped after.

A sticky volatile boolean hasEverConnected on the I/O loop tracks the
distinction; CursorWebSocketSendLoop.hasEverConnected() and
QwpWebSocketSender.wasEverConnected() expose it for handlers that want
to branch programmatically without parsing the message string. The
flag is set in the constructor for SYNC/OFF modes (which are handed a
live client) and inside swapClient on every successful connect for
ASYNC and reconnect paths.

Auth/upgrade rejects keep the existing ws-upgrade-failed message and
SECURITY_ERROR category -- the failure cause is already self-describing
and disambiguation is unnecessary.

Tests:

InitialConnectAsyncTest covers six cases:
- fromConfig returns immediately in async mode with no server reachable;
- buffered rows are delivered once a late-arriving server starts;
- never-connected budget exhaustion uses the never-connected tag and
  wasEverConnected() returns false;
- a 401 upgrade reject delivers SECURITY_ERROR via the inbox before the
  retry cap is reached;
- connect-then-disconnect budget exhaustion uses the connection-lost
  tag and wasEverConnected() remains true;
- OFF/SYNC modes report wasEverConnected()==true the moment the sender
  is visible to the caller.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
When this client repo's branch has a corresponding branch with the same
name in questdb/questdb, clone that branch instead of master. Lets paired
client+server changes be exercised by CI in lockstep — previously this
PR's tests against master failed to compile because they relied on a
QwpWebSocketSender.connect signature that landed in this branch but not
on master.

Falls back to master when no matching branch exists. No override knob
for now — collisions on generic branch names haven't happened in
practice and we can add a `[ci master]` opt-out later if needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Six tests were red after recent source changes; updates them to match
the new behavior:

CloseDrainTest.testCloseDrainTimesOutWhenAcksNeverArrive
ServerErrorAckTerminalTest.testServerErrorAckIsTerminalAndDoesNotBurn...
ReconnectTest.testTerminalUpgradeErrorAbortsReconnect
ReconnectTest.testReconnectGivesUpAfterCap
  close() now rethrows latched terminal errors (commit 052f6ee). The
  drain-timeout test now asserts on the throw + elapsed time. The other
  three replace try-with-resources with try/finally that swallows the
  expected close-time rethrow — the terminal error has already been
  observed and asserted on inside the test body.

EngineCloseSlotLockReleaseTest.testSlotLockReleasedEvenIfRingCloseThrows
  Sabotage (nulling the engine's `ring` field) was orphaning the 4MB
  segment + manager worker thread, tripping assertMemoryLeak (commit
  05c3829). Capture ring + manager refs before the sabotage and release
  them after engine.close() throws. Slot-lock semantics being tested
  are unchanged.

OrphanScanIntegrationTest.testScanFindsOrphanFromPriorSenderUnderSameG...
  drain_orphans=true now actually drains the orphan via the background
  drainer pool (commit c25773f), so the post-Phase-2 scan sees zero
  slots, not one. Updated assertion + comment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three review findings on the close-rethrow contract introduced by
052f6ee, plus one concurrency tightening on the SF segment manager.

QwpWebSocketSender.close():
- Every cleanup step (encoder, table buffers, buffer0/buffer1, client,
  cursorEngine, errorDispatcher, cursorSendLoop, drainerPool) now runs
  in its own try/catch. A new captureCloseError(terminalError, t)
  helper promotes the first error and addSuppressed-chains every
  subsequent one. Previously, a throw from any of the unguarded
  cleanups (encoder, buffers, client) would skip rethrowTerminal and
  silently lose the captured drain-time error -- defeating the whole
  point of the rethrow contract.
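
The capture-and-chain pattern described for close() can be sketched as below. `CloseErrorChainSketch` and `closeAll` are hypothetical; a plain RuntimeException stands in for LineSenderException in the wrap path.

```java
// Hypothetical close helper: every step runs, the first error wins, later ones are suppressed.
final class CloseErrorChainSketch {
    /** Promotes the first error and chains every later one onto it via addSuppressed. */
    static Throwable captureCloseError(Throwable first, Throwable t) {
        if (first == null) {
            return t;
        }
        if (first != t) {
            first.addSuppressed(t);
        }
        return first;
    }

    /** Runs every cleanup step in its own try/catch, then rethrows the captured error. */
    static void closeAll(Runnable... steps) {
        Throwable terminal = null;
        for (Runnable step : steps) {
            try {
                step.run();
            } catch (Throwable t) {
                terminal = captureCloseError(terminal, t);
            }
        }
        if (terminal instanceof RuntimeException) {
            throw (RuntimeException) terminal;
        }
        if (terminal instanceof Error) {
            throw (Error) terminal;
        }
        if (terminal != null) {
            // Stand-in for the LineSenderException wrap; keeps the original as the cause.
            throw new RuntimeException(terminal.getMessage(), terminal);
        }
    }
}
```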

QwpWebSocketSender.awaitAckedFsn():
- cursorSendLoop.checkError() and checkConnectionError() now run
  before the ackedFsn() >= targetFsn and timeoutMillis <= 0 early
  returns. A caller polling with timeoutMillis=0 to drive its own
  retry loop now observes the latched terminal throw immediately
  instead of an indefinite "not yet" stream.

QwpWebSocketSender.rethrowTerminal():
- The wrap path for non-RuntimeException, non-Error throwables now
  passes the original throwable as the cause of LineSenderException,
  preserving stack trace and chained causes. Message text uses
  t.getMessage() instead of t.toString().

SegmentManager:
- close() is now synchronized so it cannot interleave with the
  already-synchronized start(); workerThread is now volatile so the
  unsynchronized read in wakeWorker() sees the latest assignment.
  Removes a cross-thread visibility ambiguity between start() and
  close() under arbitrary call ordering.
BackgroundDrainerPool.close() called requestStop() before shutdown(),
forcing in-flight drainers to exit as STOPPED instead of SUCCESS. Their
engine.close() then saw fullyDrained=false and skipped the .sfa cleanup,
so drain_orphans=true left orphan slots on disk on macOS where the race
went the other way. Now we shutdown() first and let drainers complete
naturally within a grace window; requestStop() is the fallback.
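
The new close ordering can be sketched with a plain ExecutorService: shut down first, wait for a grace window, and only then request a cooperative stop. `DrainerPoolCloseSketch` is a hypothetical helper; the grace value and the stop callback are parameters here, not the pool's actual fields.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical close helper illustrating "shutdown first, requestStop as fallback".
final class DrainerPoolCloseSketch {
    /** Returns true when drainers finished on their own, false when the stop fallback ran. */
    static boolean close(ExecutorService pool, Runnable requestStop, long graceMillis) {
        pool.shutdown(); // reject new submits, let in-flight drainers run to completion
        try {
            if (pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            requestStop.run(); // fallback: ask the remaining drainers to stop cooperatively
            pool.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            requestStop.run();
        }
        return false;
    }
}
```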

SlotLock kept the holder's PID inside the .lock file itself. Windows'
LockFileEx is a mandatory whole-file lock, so a contender reading the
PID got 0 bytes and surfaced "holder=unknown" instead of "pid=<n>".
Move the PID to a sibling .lock.pid sidecar that nobody locks — the
diagnostic now works uniformly on POSIX and Windows.
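
A rough java.nio sketch of the sidecar layout: the lock is taken on .lock while the PID goes into an unlocked .lock.pid next to it. `SlotLockSketch` is a hypothetical class built on FileChannel.tryLock and ProcessHandle (Java 9+); the client's real SlotLock uses its own native Files layer.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical slot lock: exclusive lock on .lock, holder PID in the unlocked .lock.pid sidecar.
final class SlotLockSketch implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;

    private SlotLockSketch(FileChannel channel, FileLock lock) {
        this.channel = channel;
        this.lock = lock;
    }

    static SlotLockSketch acquire(Path slotDir) throws IOException {
        FileChannel ch = FileChannel.open(slotDir.resolve(".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock = null;
        try {
            lock = ch.tryLock(); // null when another process holds the lock
        } finally {
            if (lock == null) {
                ch.close();
            }
        }
        if (lock == null) {
            throw new IOException("slot locked, holder pid=" + readHolderPid(slotDir));
        }
        byte[] pid = Long.toString(ProcessHandle.current().pid()).getBytes(StandardCharsets.UTF_8);
        Files.write(slotDir.resolve(".lock.pid"), pid);
        return new SlotLockSketch(ch, lock);
    }

    /** Reads the sidecar; never touches the locked file, so it also works under mandatory locks. */
    static String readHolderPid(Path slotDir) {
        try {
            byte[] bytes = Files.readAllBytes(slotDir.resolve(".lock.pid"));
            return new String(bytes, StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            return "unknown";
        }
    }

    @Override
    public void close() throws IOException {
        lock.release();
        channel.close();
    }
}
```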

CursorWebSocketSendLoop.recordFatal() logged the full throwable for
server-side rejects like PARSE_ERROR. The structured info is already
in the message and the dispatcher's one-line log; suppress the stack
trace for serverError != null and keep it for genuine client-side
failures.
@mtopolnik
Contributor

[PR Coverage check]

😍 pass : 1712 / 2126 (80.53%)

file detail

| path | covered line | new line | coverage |
|---|---|---|---|
| 🔵 io/questdb/client/std/DefaultFilesFacade.java | 0 | 24 | 00.00% |
| 🔵 io/questdb/client/std/FilesFacade.java | 0 | 1 | 00.00% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/MmapSegmentException.java | 2 | 4 | 50.00% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/BackgroundDrainer.java | 60 | 97 | 61.86% |
| 🔵 io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java | 171 | 263 | 65.02% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/BackgroundDrainerPool.java | 37 | 56 | 66.07% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/CursorSendEngine.java | 111 | 149 | 74.50% |
| 🔵 io/questdb/client/Sender.java | 227 | 295 | 76.95% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/CursorWebSocketSendLoop.java | 328 | 379 | 86.54% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/MmapSegment.java | 136 | 156 | 87.18% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/OrphanScanner.java | 52 | 59 | 88.14% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java | 139 | 156 | 89.10% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/SlotLock.java | 56 | 62 | 90.32% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentRing.java | 189 | 209 | 90.43% |
| 🔵 io/questdb/client/std/Files.java | 63 | 69 | 91.30% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/SenderErrorDispatcher.java | 70 | 76 | 92.11% |
| 🔵 io/questdb/client/LineSenderServerException.java | 19 | 19 | 100.00% |
| 🔵 io/questdb/client/std/Crc32c.java | 2 | 2 | 100.00% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/DefaultSenderErrorHandler.java | 16 | 16 | 100.00% |
| 🔵 io/questdb/client/SenderError.java | 34 | 34 | 100.00% |
