Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
# NEWS

4.4.4 - 2026-06-17
------------------

### Fixed

- HTTP/2: a connection is no longer reused after the peer sends `GOAWAY` while
keeping the socket open (as AWS ALB does to recycle connections). The
connection is retired so the pool dials a fresh one, instead of being handed
out again with new streams the peer ignores until `recv_timeout`.
- HTTP/2: when the per-stream `recv_timeout` watchdog fires, the stalled stream
is cancelled (`RST_STREAM`) so the peer stops sending and the connection is not
reused with an orphaned stream.
- HTTP/1.1: bytes delivered to the connection mailbox while a pooled connection
idles in `{active, once}` mode (the idle close detection from #544) are now
buffered and fed to the next request instead of dropping the connection
(refines the 4.4.3 behavior below), so a reused request no longer blocks until
`recv_timeout` while the response sits stranded as an unread message. The idle
buffer is bounded, and a connection the server has closed is still never
reused (#544).

4.4.3 - 2026-06-17
------------------

Expand Down
186 changes: 127 additions & 59 deletions src/hackney_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@
%% late-arriving calls race the pool DOWN cleanup and still get a proper
%% error reply instead of exit:{normal, _}. See issue #836.
-define(CLOSED_GRACE_MS, 50).
%% Cap on bytes buffered from an idle HTTP/1.1 connection via #544 {active,
%% once}. A well-behaved peer sends nothing while idle; the next response's
%% stranded prefix is small. Past this, treat the peer as misbehaving (flooding
%% an idle connection) and drop it rather than buffer unboundedly.
-define(MAX_IDLE_BUFFER, 65536).

%% State data record
-record(conn_data, {
Expand Down Expand Up @@ -838,24 +843,26 @@ connected({call, From}, verify_socket, #conn_data{transport = Transport, socket
connected({call, From}, is_ready, #conn_data{socket = undefined} = Data) ->
%% Socket not connected
{next_state, closed, Data, [{reply, From, {ok, closed}}]};
connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket} = Data) ->
connected({call, From}, is_ready, #conn_data{transport = Transport, socket = Socket,
buffer = Buffer} = Data) ->
%% Stop active-mode delivery before reconciling the socket for checkout, so
%% no further {tcp,_}/{ssl,_} messages can land after we inspect it.
_ = Transport:setopts(Socket, [{active, false}]),
%% A pooled connection is only reusable if nothing arrived while it was idle.
%% #544: a server-initiated close (tcp_closed/ssl_closed) means drop it.
%% Unsolicited data is just as disqualifying: hackney does not pipeline, so
%% bytes that arrived while idle cannot belong to the next response. Reusing
%% such a socket would strand them (passive recv blocks on an empty buffer)
%% or corrupt the next read. Drop it and let the pool dial a fresh one.
case has_pending_close(Socket) orelse has_pending_data(Transport, Socket) of
case has_pending_close(Socket) of
true ->
%% #544: the server closed the idle connection - never reuse it.
{next_state, closed, Data#conn_data{socket = undefined},
[{reply, From, {ok, closed}}]};
false ->
case check_socket_health(Transport, Socket) of
ok ->
{keep_state_and_data, [{reply, From, {ok, connected}}]};
%% Bytes delivered to the mailbox while idle in {active, once}
%% are the start of the response; keep them in the read buffer
%% so the next request consumes them instead of stranding them.
Drained = drain_socket_data(Socket),
{keep_state,
Data#conn_data{buffer = <<Buffer/binary, Drained/binary>>},
[{reply, From, {ok, connected}}]};
{error, _} ->
{keep_state_and_data, [{reply, From, {ok, closed}}]}
end
Expand Down Expand Up @@ -965,7 +972,10 @@ connected({call, From}, {request, Method, Path, Headers, Body, ReqOpts}, Data) -
status = undefined,
reason = undefined,
response_headers = undefined,
buffer = <<>>,
%% NOTE: buffer is intentionally preserved (not reset to <<>>). It is
%% empty after any complete response, but may hold response bytes that
%% #544 {active, once} stranded into the mailbox and connected(info,...)
%% buffered; the next request must consume them.
async = false,
async_ref = undefined,
stream_to = undefined,
Expand Down Expand Up @@ -1040,7 +1050,8 @@ connected({call, From}, {send_headers, Method, Path, Headers}, Data) ->
status = undefined,
reason = undefined,
response_headers = undefined,
buffer = <<>>,
%% buffer preserved (see the {request,...} handler): may hold stranded
%% response bytes buffered from #544 {active, once}.
async = false,
async_ref = undefined,
stream_to = undefined
Expand Down Expand Up @@ -1076,10 +1087,13 @@ connected(info, {ssl_error, Socket, _Reason}, #conn_data{socket = Socket} = Data

%% Unexpected data received while idle - HTTP/1.1 only (H/2 socket is owned
%% by h2_connection; H/3 uses QUIC messages).
connected(info, {tcp, Socket, _UnexpectedData}, #conn_data{socket = Socket} = Data) ->
{next_state, closed, Data#conn_data{socket = undefined}};
connected(info, {ssl, Socket, _UnexpectedData}, #conn_data{socket = Socket} = Data) ->
{next_state, closed, Data#conn_data{socket = undefined}};
%% Bytes delivered by #544 {active, once} while idle are the start of the next
%% response on a reused connection. Buffer them (do NOT treat as a broken
%% connection) and re-arm close detection, so the next request consumes them.
connected(info, {tcp, Socket, Data}, #conn_data{socket = Socket} = D) ->
buffer_idle_data(Data, D);
connected(info, {ssl, Socket, Data}, #conn_data{socket = Socket} = D) ->
buffer_idle_data(Data, D);

%% HTTP/3 message handling
connected(info, {h3, ConnRef, {stream_headers, StreamId, Headers, Fin}},
Expand Down Expand Up @@ -1131,15 +1145,20 @@ connected(EventType, Event, Data) ->
%% State: sending - Sending request data
%%====================================================================

sending(enter, connected, #conn_data{transport = Transport, socket = Socket}) ->
%% Set socket to passive mode for blocking send/recv operations
%% (socket was in active mode while idle in connected state)
%% Note: socket may be undefined for HTTP/3 (QUIC) connections
_ = case Socket of
undefined -> ok;
_ -> Transport:setopts(Socket, [{active, false}])
end,
keep_state_and_data;
sending(enter, connected, #conn_data{transport = Transport, socket = Socket,
buffer = Buffer} = Data) ->
%% Deterministically leave {active, once} before sending the request, and
%% drain any bytes already delivered to the mailbox into the read buffer so
%% the request/response cycle never runs with stranded data (the reuse hang).
%% Note: socket may be undefined for HTTP/3 (QUIC) connections.
case Socket of
undefined ->
keep_state_and_data;
_ ->
_ = Transport:setopts(Socket, [{active, false}]),
Drained = drain_socket_data(Socket),
{keep_state, Data#conn_data{buffer = <<Buffer/binary, Drained/binary>>}}
end;

sending(internal, {send_request, Method, Path, Headers, Body}, Data) ->
case do_send_request(Method, Path, Headers, Body, Data) of
Expand Down Expand Up @@ -1186,15 +1205,18 @@ streaming_body(enter, connected, #conn_data{protocol = http2}) ->
%% mode. hackney_conn must NOT flip it to passive or the h2 lib stops
%% receiving frames (the response would never arrive).
keep_state_and_data;
streaming_body(enter, connected, #conn_data{transport = Transport, socket = Socket}) ->
%% Set socket to passive mode for blocking send/recv operations
%% (socket was in active mode while idle in connected state)
%% Note: socket may be undefined for HTTP/3 (QUIC) connections
_ = case Socket of
undefined -> ok;
_ -> Transport:setopts(Socket, [{active, false}])
end,
keep_state_and_data;
streaming_body(enter, connected, #conn_data{transport = Transport, socket = Socket,
buffer = Buffer} = Data) ->
%% Same as sending(enter): go passive and un-strand any mailbox bytes before
%% the request/response cycle (socket may be undefined for HTTP/3 QUIC).
case Socket of
undefined ->
keep_state_and_data;
_ ->
_ = Transport:setopts(Socket, [{active, false}]),
Drained = drain_socket_data(Socket),
{keep_state, Data#conn_data{buffer = <<Buffer/binary, Drained/binary>>}}
end;

streaming_body(internal, {send_headers_only, Method, Path, Headers}, Data) ->
%% Send only headers, then return ok and wait for body chunks
Expand Down Expand Up @@ -2333,7 +2355,18 @@ stream_body_chunk_result({error, Reason}, _Data) ->

%% @private Receive data from socket
recv_data(#conn_data{transport = Transport, socket = Socket, recv_timeout = Timeout}) ->
Transport:recv(Socket, 0, Timeout).
%% Consume any bytes stranded in the mailbox by #544 {active, once} before
%% falling back to a passive socket read, so a reused connection never blocks
%% on an empty socket buffer while the response sits unread as a message.
case drain_socket_data(Socket) of
<<>> ->
case has_pending_close(Socket) of
true -> {error, closed};
false -> Transport:recv(Socket, 0, Timeout)
end;
Bytes ->
{ok, Bytes}
end.

%% @private Determine if we should enable active mode when entering connected state
%% We only want active mode for close detection when the connection is truly idle
Expand Down Expand Up @@ -2395,30 +2428,36 @@ has_pending_close(Socket) ->
false
end.

%% @private Detect unsolicited data that arrived while the socket was idle in
%% active mode. Returns true when any data is pending — such a connection is not
%% safe to reuse (hackney does not pipeline, so the bytes cannot belong to the
%% next response). Drains the mailbox so a dropped connection leaves nothing
%% behind, and peeks the socket buffer to close the active->passive race where a
%% {tcp,_}/{ssl,_} message has not yet landed. Must run after the socket is set
%% passive. Close messages are checked separately via has_pending_close/1.
has_pending_data(Transport, Socket) ->
HadMailbox = drain_socket_mailbox(Socket),
HadBuffer = case Transport:recv(Socket, 0, 0) of
{ok, _Bytes} -> true; %% bytes already buffered on the socket
{error, timeout} -> false; %% nothing pending - healthy idle socket
{error, _} -> true %% closed/other - not reusable
end,
HadMailbox orelse HadBuffer.
%% @private Drain and return any {tcp/ssl, Socket, Data} bytes queued in the
%% mailbox. #544 puts idle pooled sockets in {active, once}, which delivers the
%% next inbound bytes (the start of the response on a reused connection) as a
%% mailbox message and reverts the socket to passive. Those bytes are real
%% response data and must reach the parser, so we drain-and-return them rather
%% than discard them. Close/error messages are handled via has_pending_close/1.
drain_socket_data(Socket) ->
drain_socket_data(Socket, <<>>).

%% @private Remove any {tcp/ssl, Socket, Data} messages from the mailbox,
%% returning true if at least one was present.
drain_socket_mailbox(Socket) ->
drain_socket_data(Socket, Acc) ->
receive
{tcp, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true;
{ssl, Socket, _Data} -> _ = drain_socket_mailbox(Socket), true
{tcp, Socket, Data} -> drain_socket_data(Socket, <<Acc/binary, Data/binary>>);
{ssl, Socket, Data} -> drain_socket_data(Socket, <<Acc/binary, Data/binary>>)
after 0 ->
false
Acc
end.

%% @private Handle a {tcp, _, _}/{ssl, _, _} payload that reached the mailbox
%% while the HTTP/1.1 connection sat idle in {active, once}. The payload is
%% appended to the read buffer held in #conn_data{} so the next request can
%% consume it. If the combined buffer would exceed ?MAX_IDLE_BUFFER bytes the
%% connection moves to `closed` instead of buffering any further.
buffer_idle_data(Data, #conn_data{buffer = Buffer} = D) ->
    Combined = <<Buffer/binary, Data/binary>>,
    if
        byte_size(Combined) =< ?MAX_IDLE_BUFFER ->
            #conn_data{socket = Socket, transport = Transport} = D,
            %% {active, once} reverts the socket to passive after delivering
            %% one message; re-arm it so close detection keeps working.
            _ = Transport:setopts(Socket, [{active, once}]),
            {keep_state, D#conn_data{buffer = Combined}};
        true ->
            %% Over the cap: stop buffering and retire the connection.
            {next_state, closed, D#conn_data{socket = undefined}}
    end.

%% @private Notify pool that connection is available for reuse (async)
Expand Down Expand Up @@ -2476,7 +2515,8 @@ do_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowR
status = undefined,
reason = undefined,
response_headers = undefined,
buffer = <<>>,
%% buffer preserved (see the {request,...} handler): may hold stranded
%% response bytes buffered from #544 {active, once}.
async = AsyncMode,
async_ref = Ref,
stream_to = StreamTo,
Expand Down Expand Up @@ -2738,12 +2778,18 @@ cancel_all_h2_timers(#conn_data{h2_timers = Timers} = Data) ->
%% stream and a sync reader is parked, fail that reader and drop the stream.
%% A stale timer (re-armed or already completed) is ignored.
handle_h2_recv_timeout(StreamId, TRef,
#conn_data{h2_streams = Streams, h2_timers = Timers} = Data) ->
#conn_data{h2_streams = Streams, h2_timers = Timers,
h2_conn = H2Conn} = Data) ->
case maps:get(StreamId, Timers, undefined) of
TRef ->
Timers2 = maps:remove(StreamId, Timers),
case maps:get(StreamId, Streams, undefined) of
{From, Inner} when is_tuple(Inner), element(1, Inner) =:= sync ->
%% RST_STREAM(CANCEL) the stalled stream so the peer stops
%% sending for it and the h2 layer drops it; otherwise the
%% pooled connection would be reused with an orphaned stream
%% still open (h2_conn_usable only checks the conn state).
_ = cancel_h2_stream(H2Conn, StreamId),
Streams2 = maps:remove(StreamId, Streams),
{keep_state,
Data#conn_data{h2_streams = Streams2, h2_timers = Timers2,
Expand All @@ -2756,6 +2802,11 @@ handle_h2_recv_timeout(StreamId, TRef,
{keep_state, Data}
end.

%% @private Best-effort cancel of a stalled HTTP/2 stream through
%% h2_connection:cancel_stream/2. With no h2 connection (`undefined`) this is a
%% no-op returning `ok`; any exception raised by the cancel call is swallowed
%% and reported as `ok`, so a dead connection never crashes the caller.
cancel_h2_stream(H2Conn, StreamId) when H2Conn =/= undefined ->
    try
        h2_connection:cancel_stream(H2Conn, StreamId)
    catch
        _:_ -> ok
    end;
cancel_h2_stream(undefined, _StreamId) ->
    ok.

%% @private Send an HTTP/2 request via the h2 library.
do_h2_request(From, Method, Path, Headers, Body, Data) ->
do_h2_send(From, Method, Path, Headers, Body,
Expand Down Expand Up @@ -3188,9 +3239,26 @@ h2_stream_parked_from({stream, headers, _, _, _, From}) -> From;
h2_stream_parked_from({stream, body_full, _, _, _, From}) -> From;
h2_stream_parked_from(_) -> undefined.

h2_on_goaway(ErrorCode, Data) ->
h2_on_goaway(ErrorCode, #conn_data{h2_conn = H2Conn, h2_mon = H2Mon} = Data) ->
%% A GOAWAY means the peer will not service new streams on this connection.
%% AWS ALBs recycle connections this way, sending GOAWAY but keeping the
%% socket open for a drain window. Leaving the conn `connected` and pooled
%% made checkout_h2/h2_conn_usable keep handing it out, so every reused
%% request opened a stream past last_stream_id that the peer ignored and hung
%% to recv_timeout. Tear the connection down and transition to `closed` (like
%% h2_on_closed/2): the pool then stops reusing it (h2_conn_usable requires
%% `connected`) and new requests dial a fresh connection. In-flight streams
%% are aborted with the goaway error as before.
{Replies, Data1} = collect_h2_aborts({goaway, ErrorCode}, Data),
{keep_state, cancel_all_h2_timers(Data1), Replies}.
Data2 = cancel_all_h2_timers(Data1),
_ = case H2Mon of
undefined -> ok;
_ -> erlang:demonitor(H2Mon, [flush])
end,
close_h2(H2Conn),
Stripped = Data2#conn_data{h2_conn = undefined, h2_mon = undefined,
socket = undefined, no_reuse = true},
{next_state, closed, Stripped, Replies}.

h2_on_closed(Reason, Data) ->
{Replies, Data1} = collect_h2_aborts({closed, Reason}, Data),
Expand Down
Loading
Loading