From 9b0f2a338570fcbdce1491bbd0083c95663b64b5 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Wed, 17 Jun 2026 08:59:04 +0200 Subject: [PATCH] Fix intermittent read hangs on reused HTTP/2 and pooled HTTP/1.1 connections HTTP/2: a response that signals END_STREAM with a trailing HEADERS frame (trailers, or an empty trailing HEADERS, as proxies emit for no-content-length streamed bodies) left the body reader parked forever. The h2 lib delivers it as a {trailers,...} event with no terminal DATA frame, and handle_h2_event ignored it, so the read blocked until the connection died (~30s). Route trailers to the END_STREAM path so the read completes on fresh and reused connections. Add a per-stream recv_timeout watchdog for sync h2 reads, which previously parked on an infinity gen_statem call with no timer, so any other lost-frame condition fails fast with {error, timeout}. HTTP/1.1: at checkout, refuse to reuse a pooled connection that received unsolicited data while idle instead of discarding the bytes. hackney does not pipeline, so idle data cannot belong to the next response; reusing the socket stranded or corrupted the next read. Drain the mailbox and peek the socket buffer, and drop the connection on any pending data, close, or error. Healthy idle connections still reuse normally, preserving keep-alive and the #544 stale-connection detection. --- src/hackney_conn.erl | 164 +++++++++++++++++++++----- test/hackney_http2_trailers_tests.erl | 132 +++++++++++++++++++++ test/hackney_pool_tests.erl | 39 ++++++ 3 files changed, 303 insertions(+), 32 deletions(-) create mode 100644 test/hackney_http2_trailers_tests.erl diff --git a/src/hackney_conn.erl b/src/hackney_conn.erl index e951a444..15dab47d 100644 --- a/src/hackney_conn.erl +++ b/src/hackney_conn.erl @@ -204,6 +204,11 @@ h2_streams = #{} :: #{pos_integer() => {term(), tuple()}}, %% Current HTTP/2 stream ID for streaming body mode (body = stream) h2_stream_id :: pos_integer() | undefined, + %% Per-stream recv_timeout watchdog timers (sync one-shot reads): + %% StreamId => timer ref. Fires {timeout, TRef, {h2_recv_timeout, StreamId}} + %% if no progress within recv_timeout, so a lost frame fails fast instead of + %% blocking until the connection dies. Re-armed on each DATA frame. + h2_timers = #{} :: #{pos_integer() => reference()}, %% HTTP/3 support (QUIC) %% HTTP/3 connection reference from hackney_h3 @@ -834,20 +839,22 @@ connected({call, From}, is_ready, #conn_data{socket = undefined} = Data) -> %% Socket not connected {next_state, closed, Data, [{reply, From, {ok, closed}}]}; connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket} = Data) -> - %% Check for pending close message first (from active mode) - case has_pending_close(Socket) of + %% Stop active-mode delivery before reconciling the socket for checkout, so + %% no further {tcp,_}/{ssl,_} messages can land after we inspect it. + _ = Transport:setopts(Socket, [{active, false}]), + %% A pooled connection is only reusable if nothing arrived while it was idle. + %% #544: a server-initiated close (tcp_closed/ssl_closed) means drop it. + %% Unsolicited data is just as disqualifying: hackney does not pipeline, so + %% bytes that arrived while idle cannot belong to the next response. Reusing + %% such a socket would strand them (passive recv blocks on an empty buffer) + %% or corrupt the next read. Drop it and let the pool dial a fresh one. + case has_pending_close(Socket) orelse has_pending_data(Transport, Socket) of true -> - %% Server closed the connection - transition to closed state - {next_state, closed, Data#conn_data{socket = undefined}, [{reply, From, {ok, closed}}]}; + {next_state, closed, Data#conn_data{socket = undefined}, + [{reply, From, {ok, closed}}]}; false -> - %% No close message pending - check socket health case check_socket_health(Transport, Socket) of ok -> - %% Socket is healthy - set to passive mode for checkout - %% This ensures the socket is ready for blocking recv operations - _ = Transport:setopts(Socket, [{active, false}]), - %% Flush any data messages that arrived while in active mode - flush_socket_messages(Socket), {keep_state_and_data, [{reply, From, {ok, connected}}]}; {error, _} -> {keep_state_and_data, [{reply, From, {ok, closed}}]} @@ -1044,6 +1051,9 @@ connected({call, From}, {send_headers, Method, Path, Headers}, Data) -> %% HTTP/2 owner messages from h2 library connected(info, {h2, H2Conn, Event}, #conn_data{h2_conn = H2Conn} = Data) -> handle_h2_event(Event, Data); +%% HTTP/2 per-stream recv_timeout watchdog (see arm_h2_timer/2). +connected(info, {timeout, TRef, {h2_recv_timeout, StreamId}}, Data) -> + handle_h2_recv_timeout(StreamId, TRef, Data); %% h2_connection is linked via start_link; trap_exit surfaces its termination %% as an 'EXIT' signal. Convert to the same cleanup path as the monitor DOWN. connected(info, {'EXIT', H2Conn, Reason}, #conn_data{h2_conn = H2Conn} = Data) -> @@ -1340,6 +1350,8 @@ streaming_body(info, {ssl_closed, Socket}, #conn_data{socket = Socket} = Data) - %% can surface it. Mirrors the connected-state h2 handlers. streaming_body(info, {h2, H2Conn, Event}, #conn_data{h2_conn = H2Conn} = Data) -> handle_h2_event(Event, Data); +streaming_body(info, {timeout, TRef, {h2_recv_timeout, StreamId}}, Data) -> + handle_h2_recv_timeout(StreamId, TRef, Data); streaming_body(info, {'EXIT', H2Conn, Reason}, #conn_data{h2_conn = H2Conn} = Data) -> h2_on_closed(Reason, Data#conn_data{h2_conn = undefined, h2_mon = undefined}); streaming_body(info, {'DOWN', Mon, process, _Pid, Reason}, #conn_data{h2_mon = Mon} = Data) -> @@ -2383,15 +2395,30 @@ has_pending_close(Socket) -> false end. -%% @private Flush any socket data messages that arrived while in active mode -%% This is called after setting the socket to passive mode. -%% Note: Close messages are checked separately via has_pending_close/1. -flush_socket_messages(Socket) -> +%% @private Detect unsolicited data that arrived while the socket was idle in +%% active mode. Returns true when any data is pending — such a connection is not +%% safe to reuse (hackney does not pipeline, so the bytes cannot belong to the +%% next response). Drains the mailbox so a dropped connection leaves nothing +%% behind, and peeks the socket buffer to close the active->passive race where a +%% {tcp,_}/{ssl,_} message has not yet landed. Must run after the socket is set +%% passive. Close messages are checked separately via has_pending_close/1. +has_pending_data(Transport, Socket) -> + HadMailbox = drain_socket_mailbox(Socket), + HadBuffer = case Transport:recv(Socket, 0, 0) of + {ok, _Bytes} -> true; %% bytes already buffered on the socket + {error, timeout} -> false; %% nothing pending - healthy idle socket + {error, _} -> true %% closed/other - not reusable + end, + HadMailbox orelse HadBuffer. + +%% @private Remove any {tcp/ssl, Socket, Data} messages from the mailbox, +%% returning true if at least one was present. +drain_socket_mailbox(Socket) -> receive - {tcp, Socket, _Data} -> flush_socket_messages(Socket); - {ssl, Socket, _Data} -> flush_socket_messages(Socket) + {tcp, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true; + {ssl, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true after 0 -> - ok + false end. %% @private Notify pool that connection is available for reuse (async) @@ -2670,6 +2697,65 @@ h2_start_failure(after_upgrade, From, Reason) -> close_h2(H2Conn) -> try h2_connection:close(H2Conn) catch _:_ -> ok end. +%% @private Arm a per-stream recv_timeout watchdog for a sync HTTP/2 read so a +%% lost frame fails fast with {error, timeout} instead of blocking until the +%% connection dies. No-op when recv_timeout is infinity. +arm_h2_timer(StreamId, #conn_data{recv_timeout = Timeout, h2_timers = Timers} = Data) -> + case Timeout of + infinity -> + Data; + _ -> + TRef = erlang:start_timer(Timeout, self(), {h2_recv_timeout, StreamId}), + Data#conn_data{h2_timers = maps:put(StreamId, TRef, Timers)} + end. + +%% @private Cancel and forget a stream's recv_timeout watchdog, if any. +cancel_h2_timer(StreamId, #conn_data{h2_timers = Timers} = Data) -> + case maps:take(StreamId, Timers) of + {TRef, Timers2} -> + _ = erlang:cancel_timer(TRef), + Data#conn_data{h2_timers = Timers2}; + error -> + Data + end. + +%% @private Reset a stream's recv_timeout watchdog after progress (headers or a +%% DATA frame). Keeps the deadline relative to the last byte received, matching +%% HTTP/1.1 per-recv timeout semantics. No-op for streams without a timer. +rearm_h2_timer(StreamId, #conn_data{h2_timers = Timers} = Data) -> + case maps:is_key(StreamId, Timers) of + true -> arm_h2_timer(StreamId, cancel_h2_timer(StreamId, Data)); + false -> Data + end. + +%% @private Cancel every outstanding recv_timeout watchdog (connection gone). +cancel_all_h2_timers(#conn_data{h2_timers = Timers} = Data) -> + _ = maps:fold(fun(_StreamId, TRef, _Acc) -> erlang:cancel_timer(TRef) end, + ok, Timers), + Data#conn_data{h2_timers = #{}}. + +%% @private A recv_timeout watchdog fired: if it is still the live timer for the +%% stream and a sync reader is parked, fail that reader and drop the stream. +%% A stale timer (re-armed or already completed) is ignored. +handle_h2_recv_timeout(StreamId, TRef, + #conn_data{h2_streams = Streams, h2_timers = Timers} = Data) -> + case maps:get(StreamId, Timers, undefined) of + TRef -> + Timers2 = maps:remove(StreamId, Timers), + case maps:get(StreamId, Streams, undefined) of + {From, Inner} when is_tuple(Inner), element(1, Inner) =:= sync -> + Streams2 = maps:remove(StreamId, Streams), + {keep_state, + Data#conn_data{h2_streams = Streams2, h2_timers = Timers2, + request_from = undefined}, + [{reply, From, {error, timeout}}]}; + _ -> + {keep_state, Data#conn_data{h2_timers = Timers2}} + end; + _ -> + {keep_state, Data} + end. + %% @private Send an HTTP/2 request via the h2 library. do_h2_request(From, Method, Path, Headers, Body, Data) -> do_h2_send(From, Method, Path, Headers, Body, @@ -2723,7 +2809,9 @@ do_h2_send(From, Method, Path, Headers, Body, StreamState, Mode, Data) -> }, NewData = case Mode of sync -> - NewData0#conn_data{request_from = From}; + %% Watchdog the response so a lost frame fails fast rather + %% than blocking on the infinity gen_statem:call. + arm_h2_timer(StreamId, NewData0#conn_data{request_from = From}); {async, Ref, StreamTo, AsyncMode} -> NewData0#conn_data{ async = AsyncMode, @@ -2912,8 +3000,12 @@ handle_h2_event({response, StreamId, Status, Headers}, Data) -> h2_on_response(StreamId, Status, Headers, Data); handle_h2_event({data, StreamId, Body, EndStream}, Data) -> h2_on_data(StreamId, Body, EndStream, Data); -handle_h2_event({trailers, _StreamId, _Headers}, Data) -> - {keep_state, Data}; +handle_h2_event({trailers, StreamId, _Headers}, Data) -> + %% RFC 9113 §5.1: a trailing HEADERS frame carries END_STREAM. The h2 lib + %% delivers it as a {trailers,...} event with no terminal DATA frame, so + %% treat it as end-of-stream to release a reader parked on END_STREAM + %% (otherwise the body read hangs until the connection dies). + h2_on_data(StreamId, <<>>, true, Data); handle_h2_event({stream_reset, StreamId, ErrorCode}, Data) -> h2_on_stream_reset(StreamId, ErrorCode, Data); handle_h2_event({goaway, _LastStreamId, ErrorCode}, Data) -> @@ -2933,9 +3025,11 @@ h2_on_response(StreamId, Status, Headers, Data) -> Streams2 = maps:put(StreamId, {From, {sync, body, Status, Headers, <<>>}}, Streams), - {keep_state, Data#conn_data{h2_streams = Streams2, - status = Status, - response_headers = Headers}}; + Data2 = rearm_h2_timer(StreamId, + Data#conn_data{h2_streams = Streams2, + status = Status, + response_headers = Headers}), + {keep_state, Data2}; {StreamTo, {async, AsyncMode, StreamTo, Ref, waiting_headers}} -> StreamTo ! {hackney_response, Ref, {status, Status, <<>>}}, StreamTo ! {hackney_response, Ref, {headers, Headers}}, @@ -2976,15 +3070,18 @@ h2_on_data(StreamId, Body, EndStream, Data) -> case EndStream of true -> Streams2 = maps:remove(StreamId, Streams), - {keep_state, - Data#conn_data{h2_streams = Streams2, - request_from = undefined}, + Data2 = cancel_h2_timer(StreamId, + Data#conn_data{h2_streams = Streams2, + request_from = undefined}), + {keep_state, Data2, [{reply, From, {ok, Status, Headers, NewAcc}}]}; false -> Streams2 = maps:put(StreamId, {From, {sync, body, Status, Headers, NewAcc}}, Streams), - {keep_state, Data#conn_data{h2_streams = Streams2}} + Data2 = rearm_h2_timer(StreamId, + Data#conn_data{h2_streams = Streams2}), + {keep_state, Data2} end; {StreamTo, {async, AsyncMode, StreamTo, Ref, streaming, Status, Headers}} -> _ = case byte_size(Body) of @@ -3055,10 +3152,12 @@ h2_on_data(StreamId, Body, EndStream, Data) -> h2_on_stream_reset(StreamId, ErrorCode, Data) -> #conn_data{h2_streams = Streams} = Data, case maps:get(StreamId, Streams, undefined) of - {From, {sync, _}} -> + {From, Inner} when is_tuple(Inner), element(1, Inner) =:= sync -> Streams2 = maps:remove(StreamId, Streams), - {keep_state, - Data#conn_data{h2_streams = Streams2, request_from = undefined}, + Data2 = cancel_h2_timer(StreamId, + Data#conn_data{h2_streams = Streams2, + request_from = undefined}), + {keep_state, Data2, [{reply, From, {error, {stream_error, ErrorCode}}}]}; {StreamTo, {async, _, StreamTo, Ref, _, _, _}} -> StreamTo ! {hackney_response, Ref, {error, {stream_error, ErrorCode}}}, @@ -3091,11 +3190,12 @@ h2_stream_parked_from(_) -> undefined. h2_on_goaway(ErrorCode, Data) -> {Replies, Data1} = collect_h2_aborts({goaway, ErrorCode}, Data), - {keep_state, Data1, Replies}. + {keep_state, cancel_all_h2_timers(Data1), Replies}. h2_on_closed(Reason, Data) -> {Replies, Data1} = collect_h2_aborts({closed, Reason}, Data), - Stripped = Data1#conn_data{h2_conn = undefined, h2_mon = undefined, + Data2 = cancel_all_h2_timers(Data1), + Stripped = Data2#conn_data{h2_conn = undefined, h2_mon = undefined, socket = undefined}, %% Transition to closed. For pooled conns, closed(enter,...) keeps the %% process alive for ?CLOSED_GRACE_MS so calls from workers that raced diff --git a/test/hackney_http2_trailers_tests.erl b/test/hackney_http2_trailers_tests.erl new file mode 100644 index 00000000..7caedd75 --- /dev/null +++ b/test/hackney_http2_trailers_tests.erl @@ -0,0 +1,132 @@ +%%% Tests for HTTP/2 responses that signal end-of-stream with a trailing +%%% HEADERS frame (trailers) instead of an END_STREAM DATA flag, and for the +%%% per-stream recv_timeout watchdog. +%%% +%%% Regression for the production hang: an h2 response with no content-length +%%% whose body is closed by trailers (what proxies/ALBs emit for streamed +%%% bodies) left the body reader parked forever, because hackney ignored the +%%% {trailers,...} event and waited for an END_STREAM DATA frame that never +%%% came. The reader must complete as soon as the peer signals end-of-stream. +-module(hackney_http2_trailers_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(BODY, (binary:copy(<<"x">>, 14000))). + +cert_dir() -> + BeamDir = filename:dirname(code:which(?MODULE)), + Root = filename:join([BeamDir, "..", "..", "..", "..", ".."]), + filename:join([filename:absname(Root), "test", "certs"]). + +%%==================================================================== +%% Fixture +%%==================================================================== + +http2_trailers_test_() -> + {setup, + fun setup/0, + fun cleanup/1, + fun(Ctx) -> + [{"body closed by trailers completes promptly", + {timeout, 30, fun() -> t_trailered_response(Ctx) end}}, + {"trailered + END_STREAM-on-DATA both work across a reused pooled conn", + {timeout, 60, fun() -> t_pooled_reuse(Ctx) end}}, + {"a never-ending response fails fast on recv_timeout", + {timeout, 30, fun() -> t_recv_timeout_safety_net(Ctx) end}}] + end}. + +setup() -> + _ = application:ensure_all_started(hackney), + _ = application:ensure_all_started(h2), + Certs = cert_dir(), + {ok, Server} = h2:start_server(0, #{ + cert => filename:join(Certs, "server.pem"), + key => filename:join(Certs, "server.key"), + handler => fun server_handler/5, + settings => #{max_concurrent_streams => unlimited} + }), + Port = h2:server_port(Server), + #{server => Server, port => Port}. + +cleanup(#{server := Server}) -> + try h2:stop_server(Server) catch _:_ -> ok end, + ok. + +%%==================================================================== +%% Server handler +%%==================================================================== + +%% /trailered : body, then END_STREAM via a trailing HEADERS frame. +%% /normal : body with END_STREAM on the final DATA frame. +%% /stall : headers + a partial body, then never end the stream. +server_handler(Conn, Sid, _Method, Path, _Headers) -> + Json = [{<<"content-type">>, <<"application/json">>}], + case Path of + <<"/trailered">> -> + ok = h2:send_response(Conn, Sid, 200, Json), + ok = h2:send_data(Conn, Sid, ?BODY, false), + ok = h2:send_trailers(Conn, Sid, [{<<"x-trailer">>, <<"end">>}]); + <<"/normal">> -> + ok = h2:send_response(Conn, Sid, 200, Json), + ok = h2:send_data(Conn, Sid, ?BODY, true); + <<"/stall">> -> + ok = h2:send_response(Conn, Sid, 200, Json), + ok = h2:send_data(Conn, Sid, <<"partial">>, false), + %% Leave the stream open well past the client's recv_timeout. + timer:sleep(5000), + try h2:send_data(Conn, Sid, <<>>, true) catch _:_ -> ok end + end. + +%%==================================================================== +%% Tests +%%==================================================================== + +%% Bug 1a: a response closed by trailers must return the full body, not hang. +t_trailered_response(#{port := Port}) -> + {ok, 200, _Headers, Body} = + hackney:request(get, url(Port, <<"/trailered">>), [], <<>>, opts(false)), + ?assertEqual(?BODY, Body). + +%% Reuse the single pooled h2 connection for many sequential requests, +%% alternating trailered and END_STREAM-on-DATA responses; every read must +%% return the full body with no lost bytes and no recv_timeout. +t_pooled_reuse(#{port := Port}) -> + Pool = hackney_h2_trailers_pool, + _ = hackney_pool:start_pool(Pool, [{max_connections, 5}]), + try + lists:foreach(fun(N) -> + Path = case N rem 2 of + 0 -> <<"/trailered">>; + 1 -> <<"/normal">> + end, + {ok, 200, _H, Body} = + hackney:request(get, url(Port, Path), [], <<>>, opts(Pool)), + ?assertEqual(?BODY, Body) + end, lists:seq(1, 40)) + after + (try hackney_pool:stop_pool(Pool) catch _:_ -> ok end) + end. + +%% Bug 1b: with no trailers and no END_STREAM, the watchdog must surface +%% {error, timeout} quickly instead of blocking until the connection dies. +t_recv_timeout_safety_net(#{port := Port}) -> + Opts = [{pool, false}, {protocols, [http2]}, {recv_timeout, 500}, + {ssl_options, [{insecure, true}, {verify, verify_none}]}], + Started = erlang:monotonic_time(millisecond), + Result = hackney:request(get, url(Port, <<"/stall">>), [], <<>>, Opts), + Elapsed = erlang:monotonic_time(millisecond) - Started, + ?assertEqual({error, timeout}, Result), + ?assert(Elapsed < 3000). + +%%==================================================================== +%% Helpers +%%==================================================================== + +opts(Pool) -> + [{pool, Pool}, + {protocols, [http2]}, + {recv_timeout, 5000}, + {ssl_options, [{insecure, true}, {verify, verify_none}]}]. + +url(Port, Path) -> + iolist_to_binary([<<"https://localhost:">>, integer_to_list(Port), Path]). diff --git a/test/hackney_pool_tests.erl b/test/hackney_pool_tests.erl index 47300537..8a0b17f5 100644 --- a/test/hackney_pool_tests.erl +++ b/test/hackney_pool_tests.erl @@ -59,6 +59,8 @@ hackney_pool_integration_test_() -> {"queue timeout", {timeout, 120, fun test_queue_timeout/0}}, {"checkout timeout", {timeout, 120, fun test_checkout_timeout/0}}, {"server close detected when idle (issue #544)", {timeout, 30, fun test_server_close_detected/0}}, + {"unsolicited idle data refuses reuse; clean conn stays reusable", + {timeout, 30, fun test_idle_data_refuses_reuse/0}}, {"checkout survives a connection dying mid-liveness-check (PR #869)", {timeout, 30, fun test_checkout_survives_dying_connection/0}}, {"checkout_ssl without pooled conns returns needs_upgrade", @@ -825,3 +827,40 @@ test_server_close_detected() -> ?assertEqual(0, FreeCount2), ok = hackney_pool:stop_pool(test_pool_server_close). + +%% Bug 2: a pooled HTTP/1.1 connection that received unsolicited bytes while +%% idle must not be reused. hackney does not pipeline, so bytes arriving while +%% idle can never belong to the next response; reusing the socket would strand +%% them (passive recv blocks on an empty buffer) or corrupt the next read. A +%% clean idle connection is still reusable, so keep-alive is preserved. +test_idle_data_refuses_reuse() -> + Self = self(), + {ok, ListenSock} = gen_tcp:listen(0, [binary, {active, false}, {reuseaddr, true}]), + {ok, ServerPort} = inet:port(ListenSock), + ServerPid = spawn_link(fun() -> + {ok, ClientSock} = gen_tcp:accept(ListenSock, 5000), + Self ! {server, accepted}, + receive {write, Bytes} -> gen_tcp:send(ClientSock, Bytes) end, + receive stop -> ok end, + gen_tcp:close(ClientSock), + gen_tcp:close(ListenSock) + end), + + ok = hackney_pool:start_pool(test_pool_idle_data, [{pool_size, 5}]), + Opts = [{pool, test_pool_idle_data}], + {ok, _PoolInfo, ConnPid} = + hackney_pool:checkout("127.0.0.1", ServerPort, hackney_tcp, Opts), + receive {server, accepted} -> ok after 5000 -> error(timeout_accept) end, + + %% Clean connection: is_ready sets the socket passive and keeps it reusable. + ?assertEqual({ok, connected}, hackney_conn:is_ready(ConnPid)), + + %% Server pushes unsolicited bytes into the now-passive socket buffer. + ServerPid ! {write, <<"unsolicited">>}, + timer:sleep(50), + + %% The connection must now be refused rather than reused with stale bytes. + ?assertEqual({ok, closed}, hackney_conn:is_ready(ConnPid)), + + ServerPid ! stop, + ok = hackney_pool:stop_pool(test_pool_idle_data).