Streams HTTP request/response bodies through rivetkit instead of buffering them, so actors can serve and proxy long-lived/streaming responses (SSE, chunked, long-poll) without materializing the whole body.
napi: stream HTTP bodies through the TypeScript runtime
envoy-client: finalize HTTP stream safety
test: cover streaming requests and responses
Why
Consumers forwarding fetch Request/Response through an actor (e.g. the Flue Rivet target) currently must serialize the entire body to JSON, which breaks SSE/streaming (buffers until the stream closes) and mishandles null-body responses. End-to-end body streaming removes that limitation.
This is a substantial, well-structured PR that adds end-to-end HTTP body streaming through the actor stack. The protocol versioning approach, backward compat tests, and layered architecture follow project conventions well. Below are findings across the diff.
Protocol / BARE schema
Good: v5→v6 upgrade correctly promotes void abort variants to { reason: { kind: Unknown, detail: None } }. v6→v5 downgrade drops the reason entirely — appropriate lossy behavior, and the round-trip tests validate both directions.
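For reference, the upgrade/downgrade behavior described above can be sketched as follows. This is a minimal illustration only: `MessageV5`, `MessageV6`, `AbortReason`, and `AbortKind` are hypothetical stand-ins, not the generated protocol types, and the real converters may cover more variants.

```rust
/// Hypothetical stand-ins for the generated v5/v6 protocol types.
#[derive(Debug, Clone, PartialEq)]
pub enum AbortKind {
    Unknown,
    InternalError,
}

#[derive(Debug, Clone, PartialEq)]
pub struct AbortReason {
    pub kind: AbortKind,
    pub detail: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum MessageV5 {
    Chunk(Vec<u8>),
    /// In v5 the abort variant carries no payload (void).
    Abort,
}

#[derive(Debug, Clone, PartialEq)]
pub enum MessageV6 {
    Chunk(Vec<u8>),
    Abort { reason: AbortReason },
}

/// Upgrade: a void abort is promoted to an abort with an Unknown reason.
pub fn v5_to_v6(msg: MessageV5) -> MessageV6 {
    match msg {
        MessageV5::Chunk(bytes) => MessageV6::Chunk(bytes),
        MessageV5::Abort => MessageV6::Abort {
            reason: AbortReason {
                kind: AbortKind::Unknown,
                detail: None,
            },
        },
    }
}

/// Downgrade: the reason is dropped entirely (lossy by design).
pub fn v6_to_v5(msg: MessageV6) -> MessageV5 {
    match msg {
        MessageV6::Chunk(bytes) => MessageV5::Chunk(bytes),
        MessageV6::Abort { .. } => MessageV5::Abort,
    }
}
```

Under these assumptions a v5 message survives a v5→v6→v5 round trip unchanged, while a v6 abort with a specific reason comes back from v6→v5→v6 with `kind: Unknown`.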
#[allow(dead_code, unused_variables)] on v5_to_v6.rs and v6_to_v5.rs: These should be removed before merge. Scaffold lint suppressions that stay in production code hide real dead-code regressions in future changes.
pegboard-gateway2 / streaming lifecycle
Idle timeout vs. start timeout interaction: drain_http_response_stream arms an idle timer on the first response chunk, but before the first chunk arrives the only guard is gateway_response_start_timeout_ms. If a slow actor buffers its entire response body before sending a single chunk, the idle timer never ticks and the start timeout becomes the only backstop. Confirm that intent is correct (likely fine, but worth a comment).
Metrics gap — handle_streaming_request egress bytes: res.size_hint() is sampled before the channel is drained, so egress byte metrics are always 0 for ResponseBody::Channel. The byte count should be accumulated inside drain_http_response_stream and recorded after the channel closes.
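A rough sketch of the suggested fix, counting bytes while the channel is drained rather than sampling a size hint up front. It uses a blocking `std::sync::mpsc` channel as a stand-in for whatever async channel the gateway actually uses, and `drain_and_count` is a hypothetical name, not the real `drain_http_response_stream` signature.

```rust
use std::sync::mpsc::Receiver;

/// Forwards every chunk from `rx` to `sink` and returns the total number of
/// body bytes seen. The loop ends once all senders have been dropped.
pub fn drain_and_count<F>(rx: Receiver<Vec<u8>>, mut sink: F) -> u64
where
    F: FnMut(&[u8]),
{
    let mut egress_bytes: u64 = 0;
    for chunk in rx {
        // Accumulate as chunks pass through instead of relying on a size
        // hint taken before any data has arrived.
        egress_bytes += chunk.len() as u64;
        sink(&chunk);
    }
    egress_bytes
}
```

The caller would then record the returned total in the egress metric after the channel closes.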
advance_http_stream_message_index and gap detection:
The gap abort correctly sends InternalError upstream, but there is no log line on this path. A tracing::error! with expected, got, and actor_id would be invaluable when debugging mis-sequenced chunks in production.
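One possible shape for this, sketched with the standard library only. `advance_index`, `MessageGap`, and `describe_gap` are hypothetical names, and the wrapping increment is an assumption about the index type; in the real code the formatted fields would go to `tracing::error!` instead of a `String`.

```rust
/// Describes a sequencing gap between the expected and received index.
#[derive(Debug, PartialEq)]
pub struct MessageGap {
    pub expected: u32,
    pub got: u32,
}

/// Advances `next_expected` when `got` matches it; otherwise reports the gap
/// and leaves the counter untouched so the caller can abort the stream.
pub fn advance_index(next_expected: &mut u32, got: u32) -> Result<(), MessageGap> {
    if got != *next_expected {
        return Err(MessageGap {
            expected: *next_expected,
            got,
        });
    }
    // Assumption: the index wraps rather than panicking on overflow.
    *next_expected = next_expected.wrapping_add(1);
    Ok(())
}

/// Builds the diagnostic line that the abort path would log.
pub fn describe_gap(actor_id: &str, gap: &MessageGap) -> String {
    format!(
        "http stream message index gap: actor_id={} expected={} got={}",
        actor_id, gap.expected, gap.got
    )
}
```

Returning the gap as a value keeps the logging decision at the call site, next to the code that sends InternalError upstream.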
envoy-client / ActorContext streaming
Early chunk race — two buffering layers: early_request_chunks buffers exist independently in both ActorContext (actor.rs) and EnvoyContext (envoy.rs). If ReqData chunks arrive before ReqStart at both levels simultaneously, the drain ordering is controlled by two separate draining loops. A brief comment explaining which layer "owns" the buffer and when the other becomes active would reduce the chance of a future mis-merge.
send_http_response_body_bytes — partial-chunk failure:
In the chunking loop, if a mid-iteration send_response_data_chunks call returns false (client disconnected), the loop breaks and send_http_request_abort and stop() are called, which is correct. However, although false is propagated to the caller, the outer call site in gateway2 does not inspect the return value of the final finish chunk send. Verify that the finish=true send path is also checked.
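To make the concern concrete, here is a minimal sketch of a chunking loop in which every send, including the final finish=true one, feeds into the return value. `send_body_in_chunks` and the `send` callback are hypothetical; the real function also handles abort and stop, which are omitted here.

```rust
/// Splits `body` into chunks of at most `max_chunk_size` bytes and passes
/// each to `send` together with a `finish` flag set on the last chunk.
/// Returns false as soon as any send fails, including the final one.
pub fn send_body_in_chunks<F>(body: &[u8], max_chunk_size: usize, mut send: F) -> bool
where
    F: FnMut(&[u8], bool) -> bool,
{
    assert!(max_chunk_size > 0, "max_chunk_size must be non-zero");
    if body.is_empty() {
        // An empty body still needs one finishing message.
        return send(body, true);
    }
    let mut chunks = body.chunks(max_chunk_size).peekable();
    while let Some(chunk) = chunks.next() {
        let finish = chunks.peek().is_none();
        if !send(chunk, finish) {
            // The caller is expected to abort and stop on false.
            return false;
        }
    }
    true
}
```

With a 10-byte body and a 4-byte limit, `send` is called with 4, 4, and 2 bytes, and only the last call has `finish == true`.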
Unbounded early_request_chunks growth: pending_request_chunks in EnvoyContext is a Vec<Vec<u8>> with no size cap. A misbehaving or slow actor that never sends ReqStart will accumulate incoming request chunks in memory indefinitely until the client disconnects. Consider bounding this buffer or applying a timeout after which the connection is aborted with BodyTooLarge.
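A byte cap could look roughly like the sketch below. `BoundedChunkBuffer` and `BufferError` are hypothetical names, the limit value is left to the caller, and the timeout variant of the suggestion is not shown.

```rust
#[derive(Debug, PartialEq)]
pub enum BufferError {
    /// Accepting the chunk would exceed the configured byte cap.
    BodyTooLarge,
}

/// Buffers early request chunks up to a fixed total byte budget.
pub struct BoundedChunkBuffer {
    chunks: Vec<Vec<u8>>,
    buffered_bytes: usize,
    max_bytes: usize,
}

impl BoundedChunkBuffer {
    pub fn new(max_bytes: usize) -> Self {
        Self {
            chunks: Vec::new(),
            buffered_bytes: 0,
            max_bytes,
        }
    }

    /// Stores the chunk, or rejects it without modifying the buffer when
    /// the cap would be exceeded.
    pub fn push(&mut self, chunk: Vec<u8>) -> Result<(), BufferError> {
        let new_total = self.buffered_bytes.saturating_add(chunk.len());
        if new_total > self.max_bytes {
            return Err(BufferError::BodyTooLarge);
        }
        self.buffered_bytes = new_total;
        self.chunks.push(chunk);
        Ok(())
    }

    /// Hands the buffered chunks over in arrival order and resets the budget.
    pub fn drain(&mut self) -> Vec<Vec<u8>> {
        self.buffered_bytes = 0;
        std::mem::take(&mut self.chunks)
    }
}
```

On `Err(BufferError::BodyTooLarge)` the caller would abort the connection instead of continuing to buffer.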
rivetkit-napi bridge
HttpResponseBodyStream::write clone-vs-take: write clones the Sender out of the Option instead of taking it, while end/error call take_sender. As a result, write cannot itself tell that the stream has already ended: a write issued after end still fails with Err from send (correct), but the error message is "channel closed" rather than the more useful "stream already ended". Low-severity UX issue.
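One way to surface the clearer error, sketched with `std::sync::mpsc` and a `Mutex<Option<Sender>>`. `BodyStream` and `WriteError` are hypothetical and only approximate the shape of HttpResponseBodyStream; the real type uses its own channel and error types.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
pub enum WriteError {
    /// end() was already called on this stream.
    AlreadyEnded,
    /// The receiving side was dropped (for example, client disconnected).
    ReceiverGone,
}

pub struct BodyStream {
    sender: Mutex<Option<Sender<Vec<u8>>>>,
}

impl BodyStream {
    pub fn new() -> (Self, Receiver<Vec<u8>>) {
        let (tx, rx) = channel();
        (
            Self {
                sender: Mutex::new(Some(tx)),
            },
            rx,
        )
    }

    /// Checks the Option first so a write after end() reports AlreadyEnded
    /// instead of a generic channel error.
    pub fn write(&self, chunk: Vec<u8>) -> Result<(), WriteError> {
        let guard = self.sender.lock().unwrap();
        match guard.as_ref() {
            None => Err(WriteError::AlreadyEnded),
            Some(tx) => tx.send(chunk).map_err(|_| WriteError::ReceiverGone),
        }
    }

    /// Takes the sender so the channel closes and later writes are rejected.
    pub fn end(&self) {
        self.sender.lock().unwrap().take();
    }
}
```

The two error variants let the caller distinguish a programming error (writing after end) from a disconnected client.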
Channel always created for non-streaming responses:
In call_request, the (body_tx, body_rx) channel is unconditionally allocated. For buffered responses (response.stream !== true), body_rx is dropped immediately. This is harmless but adds allocation overhead on every HTTP request. Consider gating channel creation behind if streams_response.
take_body_stream sync lock in async context: Request::take_body_stream calls try_lock() (sync) on a TokioMutex. This is fine today because it is called from the NAPI dispatch path before the async runtime runs handlers. If this is ever called from an async context, the sync try-lock will fail under contention. A comment noting the sync precondition would prevent accidental misuse.
TypeScript (native.ts) — streaming detection
toRuntimeHttpResponse streaming heuristic is fragile:
The code races secondRead against Promise.resolve({ kind: "pending" }) to infer whether more data is incoming. Because Promise.resolve resolves in the same microtask queue turn, the "pending" outcome is taken only when secondRead has not yet settled at that checkpoint — this is a timing-dependent heuristic that relies on Promise scheduling behavior. This could produce incorrect results (buffering a streaming response, or treating a single-chunk response as streaming) depending on the JavaScript engine's scheduling.
A more robust design: have the actor signal streaming intent explicitly (e.g., the actor sets response.stream = true before resolving the handler promise, and the NAPI bridge always checks this flag). The first chunk is then retrieved unconditionally, and the rest are pumped. This avoids the race entirely.
Test coverage
streaming-http.test.ts is native-only (runtimes: ["native"]). The wasm runtime path for streaming is not exercised. Consider adding a follow-up issue or a TODO comment.
204/HEAD drop behavior is tested in Rust (registry_http.rs) — good. No TypeScript-level test covers the case where an actor returns stream: true with status 204 and the stream silently closes. The actor would observe its end() call succeed with no error, but the client gets an empty body. A test documenting this behavior (even as a known limitation) would prevent future confusion.
Upload streaming chunk-count assertion (chunkCount >= 2) is correct but sensitive to chunk size tuning. If HTTP_BODY_MAX_CHUNK_SIZE is ever raised above 80 KiB, the upload can fit in a single chunk, so chunkCount >= 2 stops being a reliable check of the splitting logic. Assert the max chunk size directly as well: expect(Math.max(...sizes)).toBeLessThanOrEqual(HTTP_BODY_MAX_CHUNK_SIZE).
Minor / style
Log messages with structured fields follow tracing::info!(?field, "message") convention throughout — good.
The gateway_response_chunk_idle_timeout_ms default of 30 s matches a reasonable SSE keep-alive window. Document the unit in the config accessor comment (currently says "between chunks" — clarify it resets on each chunk arrival, not on total response duration).
Generated BARE TypeScript codecs (v6.ts) contain exhaustive switch statements without default fallthrough on the write* functions. This is consistent with existing generated codecs and is correct for enums, since the encoder controls the input.
Summary
The overall approach is sound. The main items to address before merge:
1. Remove #[allow(dead_code)] on the converter files.
2. Fix egress byte metrics for streaming responses (count inside drain_http_response_stream).
3. Cap or timeout pending_request_chunks to prevent unbounded memory growth.
4. Replace the Promise.resolve race in toRuntimeHttpResponse with an explicit stream: true flag from the actor.
5. Add a tracing::error! on the message-gap abort path.
Items 1–3 are correctness/safety concerns; items 4–5 are robustness. Everything else noted above is lower priority.
🤖 Generated with Claude Code