From 651714549eb2765114c44b238edb20dbed17e18a Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 3 Jun 2026 15:37:30 +0900 Subject: [PATCH] Fix HTTP/3 client request stream reuse --- ext/source/Ruby/Protocol/HTTP3/Client.cpp | 5 +- fixtures/protocol/http3/client_server.rb | 62 ++++++++++++++++++++++ fixtures/protocol/http3/test_client.rb | 35 +++++++----- fixtures/protocol/http3/test_dispatcher.rb | 1 + fixtures/protocol/http3/test_server.rb | 4 +- test/protocol/http3/client_server.rb | 8 +++ 6 files changed, 99 insertions(+), 16 deletions(-) diff --git a/ext/source/Ruby/Protocol/HTTP3/Client.cpp b/ext/source/Ruby/Protocol/HTTP3/Client.cpp index da5e024..80bd49f 100644 --- a/ext/source/Ruby/Protocol/HTTP3/Client.cpp +++ b/ext/source/Ruby/Protocol/HTTP3/Client.cpp @@ -254,10 +254,11 @@ namespace Ruby::Protocol::HTTP3 { void submit_request_with_body(::Protocol::QUIC::StreamID stream_id, const nghttp3_nv *headers, std::size_t count, VALUE body) { + auto stream = stream_for(stream_id); + if (NIL_P(body)) { - submit_request(stream_id, headers, count); + submit_request(stream_id, headers, count, nullptr, stream); } else { - auto stream = stream_for(stream_id); submit_request(stream_id, headers, count, stream->reader(), stream); } } diff --git a/fixtures/protocol/http3/client_server.rb b/fixtures/protocol/http3/client_server.rb index 5f8fe22..b52d021 100644 --- a/fixtures/protocol/http3/client_server.rb +++ b/fixtures/protocol/http3/client_server.rb @@ -142,4 +142,66 @@ def self.exchange(configuration: Protocol::QUIC::Configuration.new, request_body return result end + + def self.exchange_reusing_client(configuration: Protocol::QUIC::Configuration.new, count: 2) + server_socket = bound_socket(address) + local_address = server_socket.local_address + results = [] + + requests = Async::Queue.new + responses = Async::Queue.new + + dispatcher = Protocol::HTTP3::TestDispatcher.new(configuration, server_context) + dispatcher.requests = requests + dispatcher.server_options[:report_all_requests] = true + + Async do |task| + server_task = task.async do + loop do + dispatcher.receive(server_socket) + Async::Task.current.yield + end + end + server_task.transient = true + + client_socket = Protocol::QUIC::Socket.new(local_address.family, ::Socket::SOCK_DGRAM, ::Socket::IPPROTO_UDP) + client_socket.connect(local_address) + + client = Protocol::HTTP3::TestClient.new(configuration, client_context, client_socket, local_address, 1) + client.responses = responses + client.close_after_response = false + + client_task = task.async do + client.send_packets + + loop do + break if client.receive(client_socket) == false + Async::Task.current.yield + end + end + client_task.transient = true + + task.with_timeout(10) do + count.times do |index| + request = requests.dequeue + response = responses.dequeue + + results << { + request_headers: request[:headers], + request_body: request[:body], + response_headers: response[:headers], + response_body: response[:body], + } + + client.submit_test_request("/#{index + 1}") if index < count - 1 + end + end + ensure + client_task&.stop + server_task&.stop + client&.close unless client_task&.failed? + end.wait + + return results + end end diff --git a/fixtures/protocol/http3/test_client.rb b/fixtures/protocol/http3/test_client.rb index 9877006..45c1a7e 100644 --- a/fixtures/protocol/http3/test_client.rb +++ b/fixtures/protocol/http3/test_client.rb @@ -12,20 +12,15 @@ def initialize(...) @response_headers = Hash.new{|hash, key| hash[key] = []} @response_body = Hash.new{|hash, key| hash[key] = String.new} @body_tasks = [] + @close_after_response = true end attr_accessor :responses, :reported_response attr_accessor :request_body + attr_accessor :close_after_response def handshake_completed - stream = submit_request([ - [":method", "GET"], - [":scheme", "https"], - [":authority", "localhost"], - [":path", "/"], - ], request_body) - - write_body(stream, request_body) if request_body + submit_test_request("/") end def header_received(stream_id, name, value) @@ -35,9 +30,10 @@ def header_received(stream_id, name, value) def headers_finished(stream_id, is_final) return unless is_final - report_response(stream_id) - - close + if close_after_response + report_response(stream_id) + close + end end def data_received(stream_id, chunk) @@ -46,11 +42,11 @@ def data_received(stream_id, chunk) def stream_finished(stream_id) report_response(stream_id) - close + close if close_after_response end def report_response(stream_id) - return if reported_response + return if reported_response && close_after_response self.reported_response = true responses&.push({ @@ -64,5 +60,18 @@ def write_body(stream, body, parent: Async::Task.current) stream.write_body(body) end end + + def submit_test_request(path, body = request_body) + stream = submit_request([ + [":method", "GET"], + [":scheme", "https"], + [":authority", "localhost"], + [":path", path], + ], body) + + write_body(stream, body) if body + + return stream + end end end diff --git a/fixtures/protocol/http3/test_dispatcher.rb b/fixtures/protocol/http3/test_dispatcher.rb index 9f9f5cd..fea76ae 100644 --- a/fixtures/protocol/http3/test_dispatcher.rb +++ b/fixtures/protocol/http3/test_dispatcher.rb @@ -20,6 +20,7 @@ def create_server(socket, address, packet_header) server = TestServer.new(self, configuration, tls_context, socket, address, packet_header, nil) server.requests = requests server.response_body = server_options[:response_body] + server.report_all_requests = server_options[:report_all_requests] return server end end diff --git a/fixtures/protocol/http3/test_server.rb b/fixtures/protocol/http3/test_server.rb index e9fe68d..56adb0e 100644 --- a/fixtures/protocol/http3/test_server.rb +++ b/fixtures/protocol/http3/test_server.rb @@ -13,10 +13,12 @@ def initialize(...) @request_body = Hash.new{|hash, key| hash[key] = String.new} @responded = {} @body_tasks = [] + @report_all_requests = false end attr_accessor :requests, :reported_request attr_accessor :response_body + attr_accessor :report_all_requests def header_received(stream_id, name, value) @request_headers[stream_id] << [name, value] @@ -41,7 +43,7 @@ def respond(stream_id) @responded[stream_id] = true - unless reported_request + if report_all_requests || !reported_request self.reported_request = true requests&.push({ headers: @request_headers[stream_id], diff --git a/test/protocol/http3/client_server.rb b/test/protocol/http3/client_server.rb index 1efb3c6..7a7b2d1 100644 --- a/test/protocol/http3/client_server.rb +++ b/test/protocol/http3/client_server.rb @@ -120,6 +120,14 @@ def send_packets expect(body.reads).to be == 4 end + it "can exchange sequential requests on one HTTP/3 connection" do + results = Protocol::HTTP3::Fixtures.exchange_reusing_client(count: 2) + + expect(results.size).to be == 2 + expect(results[0][:request_headers]).to be(:include?, [":path", "/"]) + expect(results[1][:request_headers]).to be(:include?, [":path", "/1"]) + end + it "duplicates output chunks before retaining them" do body = ReusedStringBody.new("Hello", " ", "World!") result = Protocol::HTTP3::Fixtures.exchange(response_body: body)