Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ext/source/Ruby/Protocol/HTTP3/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,11 @@ namespace Ruby::Protocol::HTTP3 {

void submit_request_with_body(::Protocol::QUIC::StreamID stream_id, const nghttp3_nv *headers, std::size_t count, VALUE body)
{
auto stream = stream_for(stream_id);

if (NIL_P(body)) {
submit_request(stream_id, headers, count);
submit_request(stream_id, headers, count, nullptr, stream);
} else {
auto stream = stream_for(stream_id);
submit_request(stream_id, headers, count, stream->reader(), stream);
}
}
Expand Down
62 changes: 62 additions & 0 deletions fixtures/protocol/http3/client_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,66 @@ def self.exchange(configuration: Protocol::QUIC::Configuration.new, request_body

return result
end

def self.exchange_reusing_client(configuration: Protocol::QUIC::Configuration.new, count: 2)
server_socket = bound_socket(address)
local_address = server_socket.local_address
results = []

requests = Async::Queue.new
responses = Async::Queue.new

dispatcher = Protocol::HTTP3::TestDispatcher.new(configuration, server_context)
dispatcher.requests = requests
dispatcher.server_options[:report_all_requests] = true

Async do |task|
server_task = task.async do
loop do
dispatcher.receive(server_socket)
Async::Task.current.yield
end
end
server_task.transient = true

client_socket = Protocol::QUIC::Socket.new(local_address.family, ::Socket::SOCK_DGRAM, ::Socket::IPPROTO_UDP)
client_socket.connect(local_address)

client = Protocol::HTTP3::TestClient.new(configuration, client_context, client_socket, local_address, 1)
client.responses = responses
client.close_after_response = false

client_task = task.async do
client.send_packets

loop do
break if client.receive(client_socket) == false
Async::Task.current.yield
end
end
client_task.transient = true

task.with_timeout(10) do
count.times do |index|
request = requests.dequeue
response = responses.dequeue

results << {
request_headers: request[:headers],
request_body: request[:body],
response_headers: response[:headers],
response_body: response[:body],
}

client.submit_test_request("/#{index + 1}") if index < count - 1
end
end
ensure
client_task&.stop
server_task&.stop
client&.close unless client_task&.failed?
end.wait

return results
end
end
35 changes: 22 additions & 13 deletions fixtures/protocol/http3/test_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,15 @@ def initialize(...)
@response_headers = Hash.new{|hash, key| hash[key] = []}
@response_body = Hash.new{|hash, key| hash[key] = String.new}
@body_tasks = []
@close_after_response = true
end

attr_accessor :responses, :reported_response
attr_accessor :request_body
attr_accessor :close_after_response

def handshake_completed
stream = submit_request([
[":method", "GET"],
[":scheme", "https"],
[":authority", "localhost"],
[":path", "/"],
], request_body)

write_body(stream, request_body) if request_body
submit_test_request("/")
end

def header_received(stream_id, name, value)
Expand All @@ -35,9 +30,10 @@ def header_received(stream_id, name, value)
def headers_finished(stream_id, is_final)
return unless is_final

report_response(stream_id)

close
if close_after_response
report_response(stream_id)
close
end
end

def data_received(stream_id, chunk)
Expand All @@ -46,11 +42,11 @@ def data_received(stream_id, chunk)

def stream_finished(stream_id)
report_response(stream_id)
close
close if close_after_response
end

def report_response(stream_id)
return if reported_response
return if reported_response && close_after_response

self.reported_response = true
responses&.push({
Expand All @@ -64,5 +60,18 @@ def write_body(stream, body, parent: Async::Task.current)
stream.write_body(body)
end
end

def submit_test_request(path, body = request_body)
stream = submit_request([
[":method", "GET"],
[":scheme", "https"],
[":authority", "localhost"],
[":path", path],
], body)

write_body(stream, body) if body

return stream
end
end
end
1 change: 1 addition & 0 deletions fixtures/protocol/http3/test_dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def create_server(socket, address, packet_header)
server = TestServer.new(self, configuration, tls_context, socket, address, packet_header, nil)
server.requests = requests
server.response_body = server_options[:response_body]
server.report_all_requests = server_options[:report_all_requests]
return server
end
end
Expand Down
4 changes: 3 additions & 1 deletion fixtures/protocol/http3/test_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ def initialize(...)
@request_body = Hash.new{|hash, key| hash[key] = String.new}
@responded = {}
@body_tasks = []
@report_all_requests = false
end

attr_accessor :requests, :reported_request
attr_accessor :response_body
attr_accessor :report_all_requests

def header_received(stream_id, name, value)
@request_headers[stream_id] << [name, value]
Expand All @@ -41,7 +43,7 @@ def respond(stream_id)

@responded[stream_id] = true

unless reported_request
if report_all_requests || !reported_request
self.reported_request = true
requests&.push({
headers: @request_headers[stream_id],
Expand Down
8 changes: 8 additions & 0 deletions test/protocol/http3/client_server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ def send_packets
expect(body.reads).to be == 4
end

it "can exchange sequential requests on one HTTP/3 connection" do
results = Protocol::HTTP3::Fixtures.exchange_reusing_client(count: 2)

expect(results.size).to be == 2
expect(results[0][:request_headers]).to be(:include?, [":path", "/"])
expect(results[1][:request_headers]).to be(:include?, [":path", "/1"])
end

it "duplicates output chunks before retaining them" do
body = ReusedStringBody.new("Hello", " ", "World!")
result = Protocol::HTTP3::Fixtures.exchange(response_body: body)
Expand Down
Loading