Skip to content

feat(http): stream HTTP bodies end-to-end#5305

Open
NathanFlurry wants to merge 8 commits into
mainfrom
http-body-streaming
Open

feat(http): stream HTTP bodies end-to-end#5305
NathanFlurry wants to merge 8 commits into
mainfrom
http-body-streaming

Conversation

@NathanFlurry

Copy link
Copy Markdown
Member

Streams HTTP request/response bodies through rivetkit instead of buffering them, so actors can serve and proxy long-lived/streaming responses (SSE, chunked, long-poll) without materializing the whole body.

Changes

  • protocol: add v6 HTTP abort reasons
  • http: shared streaming body primitives
  • gateway: stream HTTP bodies over the envoy tunnel
  • core: stream HTTP bodies through rivetkit core; surface streaming request bodies
  • napi: stream HTTP bodies through the TypeScript runtime
  • envoy-client: finalize HTTP stream safety
  • test: cover streaming requests and responses

Why

Consumers forwarding fetch Request/Response through an actor (e.g. the Flue Rivet target) currently must serialize the entire body to JSON, which breaks SSE/streaming (buffers until the stream closes) and mishandles null-body responses. End-to-end body streaming removes that limitation.

🤖 Generated with Claude Code

@railway-app

railway-app Bot commented Jun 18, 2026

Copy link
Copy Markdown

🚅 Deployed to the rivet-pr-5305 environment in rivet-frontend

Service Status Web Updated (UTC)
kitchen-sink 😴 Sleeping (View Logs) Web Jun 18, 2026 at 8:34 pm
website 😴 Sleeping (View Logs) Web Jun 18, 2026 at 8:33 pm
frontend-cloud 😴 Sleeping (View Logs) Web Jun 18, 2026 at 8:32 pm
frontend-inspector 😴 Sleeping (View Logs) Web Jun 18, 2026 at 8:29 pm
ladle ✅ Success (View Logs) Web Jun 18, 2026 at 8:23 pm
mcp-hub ✅ Success (View Logs) Web Jun 18, 2026 at 8:20 pm

@claude

claude Bot commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Code Review: feat(http): stream HTTP bodies end-to-end

This is a substantial, well-structured PR that adds end-to-end HTTP body streaming through the actor stack. The protocol versioning approach, backward compat tests, and layered architecture follow project conventions well. Below are findings across the diff.


Protocol / BARE schema

  • Good: v5→v6 upgrade correctly promotes void abort variants to { reason: { kind: Unknown, detail: None } }. v6→v5 downgrade drops the reason entirely — appropriate lossy behavior, and the round-trip tests validate both directions.
  • #[allow(dead_code, unused_variables)] on v5_to_v6.rs and v6_to_v5.rs: These should be removed before merge. Scaffold lint suppressions that stay in production code hide real dead-code regressions in future changes.

pegboard-gateway2 / streaming lifecycle

Idle timeout vs. start timeout interaction:
drain_http_response_stream arms an idle timer on the first response chunk, but before the first chunk arrives the only guard is gateway_response_start_timeout_ms. If a slow actor buffers its entire response body before sending a single chunk, the idle timer never ticks and the start timeout becomes the only backstop. Confirm that intent is correct (likely fine, but worth a comment).

Metrics gap — handle_streaming_request egress bytes:
res.size_hint() is sampled before the channel is drained, so egress byte metrics are always 0 for ResponseBody::Channel. The byte count should be accumulated inside drain_http_response_stream and recorded after the channel closes.

advance_http_stream_message_index and gap detection:
The gap abort correctly sends InternalError upstream, but there is no log line on this path. A tracing::error! with expected, got, and actor_id would be invaluable when debugging mis-sequenced chunks in production.


envoy-client / ActorContext streaming

Early chunk race — two buffering layers:
early_request_chunks buffers exist independently in both ActorContext (actor.rs) and EnvoyContext (envoy.rs). If ReqData chunks arrive before ReqStart at both levels simultaneously, the drain ordering is controlled by two separate draining loops. A brief comment explaining which layer "owns" the buffer and when the other becomes active would reduce the chance of a future mis-merge.

send_http_response_body_bytes — partial-chunk failure:
In the chunking loop, if a mid-iteration send_response_data_chunks call returns false (client disconnected), the loop breaks and send_http_request_abort + stop() are called — correct. However the return value false is returned to the caller, but the outer call site in gateway2 does not inspect this return on the final finish chunk send. Verify the finish=true send path is also checked.

Unbounded early_request_chunks growth:
pending_request_chunks in EnvoyContext is a Vec<Vec<u8>> with no size cap. A misbehaving or slow actor that never sends ReqStart will accumulate incoming request chunks in memory indefinitely until the client disconnects. Consider bounding this buffer or applying a timeout after which the connection is aborted with BodyTooLarge.


rivetkit-napi bridge

HttpResponseBodyStream::write clone-vs-take:
write clones the Sender out of the Option instead of taking it, while end/error call take_sender. This means calling write does not prevent a subsequent write after end — the second write will get Err from send (correct) but the error message is "channel closed" rather than the more useful "stream already ended". Low-severity UX issue.

Channel always created for non-streaming responses:
In call_request, the (body_tx, body_rx) channel is unconditionally allocated. For buffered responses (response.stream !== true), body_rx is dropped immediately. This is harmless but adds allocation overhead on every HTTP request. Consider gating channel creation behind if streams_response.

take_body_stream sync lock in async context:
Request::take_body_stream calls try_lock() (sync) on a TokioMutex. This is fine today because it is called from the NAPI dispatch path before the async runtime runs handlers. If this is ever called from an async context, the sync try-lock will fail under contention. A comment noting the sync precondition would prevent accidental misuse.


TypeScript (native.ts) — streaming detection

toRuntimeHttpResponse streaming heuristic is fragile:
The code races secondRead against Promise.resolve({ kind: "pending" }) to infer whether more data is incoming. Because Promise.resolve resolves in the same microtask queue turn, the "pending" outcome is taken only when secondRead has not yet settled at that checkpoint — this is a timing-dependent heuristic that relies on Promise scheduling behavior. This could produce incorrect results (buffering a streaming response, or treating a single-chunk response as streaming) depending on the JavaScript engine's scheduling.

A more robust design: have the actor signal streaming intent explicitly (e.g., the actor sets response.stream = true before resolving the handler promise, and the NAPI bridge always checks this flag). The first chunk is then retrieved unconditionally, and the rest are pumped. This avoids the race entirely.


Test coverage

  • streaming-http.test.ts is native-only (runtimes: ["native"]). The wasm runtime path for streaming is not exercised. Consider adding a follow-up issue or a TODO comment.
  • 204/HEAD drop behavior is tested in Rust (registry_http.rs) — good. No TypeScript-level test covers the case where an actor returns stream: true with status 204 and the stream silently closes. The actor would observe its end() call succeed with no error, but the client gets an empty body. A test documenting this behavior (even as a known limitation) would prevent future confusion.
  • Upload streaming chunk-count assertion (chunkCount >= 2) is correct but sensitive to chunk size tuning. If HTTP_BODY_MAX_CHUNK_SIZE is ever raised above 80 KiB, the test will pass with chunkCount === 1 and the assertion will silently pass without exercising the splitting logic. Assert the max chunk size directly: expect(Math.max(...sizes)).toBeLessThanOrEqual(HTTP_BODY_MAX_CHUNK_SIZE).

Minor / style

  • Log messages with structured fields follow tracing::info!(?field, "message") convention throughout — good.
  • The gateway_response_chunk_idle_timeout_ms default of 30 s matches a reasonable SSE keep-alive window. Document the unit in the config accessor comment (currently says "between chunks" — clarify it resets on each chunk arrival, not on total response duration).
  • Generated BARE TypeScript codecs (v6.ts) contain exhaustive switch statements without default fallthrough on the write* functions. This is consistent with existing generated codecs and is correct for enums, since the encoder controls the input.

Summary

The overall approach is sound. The main items to address before merge:

  1. Remove #[allow(dead_code)] on the converter files.
  2. Fix egress byte metrics for streaming responses (count inside drain_http_response_stream).
  3. Cap or timeout pending_request_chunks to prevent unbounded memory growth.
  4. Replace the Promise.resolve race in toRuntimeHttpResponse with an explicit stream: true flag from the actor.
  5. Add a tracing::error! on the message-gap abort path.

Items 1–3 are correctness/safety concerns; items 4–5 are robustness. Everything else noted above is lower priority.

🤖 Generated with Claude Code

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant