From 17434b79043c13df828bb97cb829072ad80d412e Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Thu, 18 Jun 2026 09:12:36 +0200 Subject: [PATCH] Fix HTTP/1.1 keepalive correctness in connection pooling Honor Connection: close on the sync body path, which was checking the connection back into the pool. The close decision now lives in a single hackney_keepalive predicate following RFC 7230 (1.1 keep-alive default, 1.0 default close, Connection: keep-alive, response and request Connection: close) and is used by both the sync and async checkin paths. Gate pool checkin on a proven-ready socket and the keepalive decision, defaulting unknown flags to close, so the pool only holds reusable connections. Discard a closed pool entry at checkout instead of redialing the same pid from inside the pool process. Add regression tests for the predicate and the pool behavior, plus a concurrent same-URI stress suite. Also fix a copy-paste bug in the async long-headers test server loop that the readiness gate exposed, and add a /maybe-close test route. --- src/hackney_conn.erl | 78 ++++++++--- src/hackney_keepalive.erl | 85 ++++++++++++ src/hackney_pool.erl | 79 +++++++----- ...y_integration_tests_async_long_headers.erl | 5 +- test/hackney_keepalive_tests.erl | 119 +++++++++++++++++ test/hackney_pool_stress_tests.erl | 63 +++++++++ test/hackney_pool_tests.erl | 99 +++++++++++++- test/stress_pool_concurrency.erl | 122 ++++++++++++++++++ test/test_http_resource.erl | 11 ++ 9 files changed, 610 insertions(+), 51 deletions(-) create mode 100644 src/hackney_keepalive.erl create mode 100644 test/hackney_keepalive_tests.erl create mode 100644 test/hackney_pool_stress_tests.erl create mode 100644 test/stress_pool_concurrency.erl diff --git a/src/hackney_conn.erl b/src/hackney_conn.erl index 5c3ca8a5..083e7782 100644 --- a/src/hackney_conn.erl +++ b/src/hackney_conn.erl @@ -157,6 +157,10 @@ request_from :: {pid(), reference()} | undefined, method :: binary() | undefined, path :: binary() | undefined, + %% Whether the current request carried Connection: close (the caller asked + %% the server to close). Folded into the keepalive decision at checkin so a + %% requested close is never reused. Reset on every request send. + request_close = false :: boolean(), %% Response state version :: {integer(), integer()} | undefined, @@ -583,9 +587,12 @@ is_no_reuse(Pid) -> %% @doc Get the flags the pool needs for its checkin decision in one call: %% whether the connection was SSL upgraded, opted into SSL pooling, must not -%% be reused (proxy tunnels), and the negotiated protocol. +%% be reused (proxy tunnels), the negotiated protocol, whether the response +%% requires the connection to close (keepalive), and whether the socket is +%% proven ready to pool. -spec checkin_info(pid()) -> #{upgraded_ssl := boolean(), no_reuse := boolean(), - pool_ssl := boolean(), protocol := atom()}. + pool_ssl := boolean(), protocol := atom(), + should_close := boolean(), ready := boolean()}. checkin_info(Pid) -> gen_statem:call(Pid, checkin_info). @@ -921,7 +928,9 @@ connected({call, From}, is_no_reuse, #conn_data{no_reuse = NoReuse}) -> {keep_state_and_data, [{reply, From, NoReuse}]}; connected({call, From}, checkin_info, Data) -> - {keep_state_and_data, [{reply, From, checkin_info_map(Data)}]}; + %% In connected state: prove the socket is ready before the pool may keep it. + Map = (checkin_info_map(Data))#{ready => socket_ready(Data)}, + {keep_state_and_data, [{reply, From, Map}]}; connected({call, From}, {request, Method, Path, Headers, Body, ReqOpts}, #conn_data{protocol = http2} = Data) -> %% HTTP/2 request - use h2_machine (1xx not applicable for HTTP/2) @@ -1232,10 +1241,15 @@ streaming_body(internal, {send_headers_only, Method, Path, Headers}, Data) -> RequestLine = build_request_line(Method, Path), HeaderLines = [[Name, <<": ">>, Value, <<"\r\n">>] || {Name, Value} <- HeadersList], HeadersData = [RequestLine, HeaderLines, <<"\r\n">>], + %% Record whether the caller asked the server to close (the request line is + %% built here, not via do_send_request/5), for the keepalive decision. + RequestClose = hackney_keepalive:request_closes(HeadersWithTE), case Transport:send(Socket, HeadersData) of ok -> From = Data#conn_data.request_from, - {keep_state, Data#conn_data{request_from = undefined}, [{reply, From, ok}]}; + {keep_state, Data#conn_data{request_from = undefined, + request_close = RequestClose}, + [{reply, From, ok}]}; {error, Reason} -> From = Data#conn_data.request_from, {next_state, closed, Data, [{reply, From, {error, Reason}}]} @@ -1840,7 +1854,9 @@ handle_common({call, From}, is_no_reuse, _State, #conn_data{no_reuse = NoReuse}) {keep_state_and_data, [{reply, From, NoReuse}]}; handle_common({call, From}, checkin_info, _State, Data) -> - {keep_state_and_data, [{reply, From, checkin_info_map(Data)}]}; + %% Any non-connected state (e.g. already closed) is not poolable. + Map = (checkin_info_map(Data))#{ready => false}, + {keep_state_and_data, [{reply, From, Map}]}; handle_common({call, From}, get_protocol, _State, #conn_data{protocol = Protocol}) -> {keep_state_and_data, [{reply, From, Protocol}]}; @@ -1894,14 +1910,18 @@ reset_async(Data) -> stream_to = undefined }. -%% @private Check if connection should be closed based on response headers -should_close_connection(#conn_data{response_headers = undefined}) -> +%% @private Whether the connection must close (not be reused) for keepalive +%% reasons. Distinguishes "no response observed yet" (a fresh or idle pooled +%% conn, which stays poolable) from a parsed response, whose close decision +%% follows RFC 7230 via hackney_keepalive:should_close/3. +should_close_connection(#conn_data{response_headers = undefined, version = undefined}) -> + %% No request/response cycle has run on this connection: keep it poolable so + %% a plain checkout/checkin (no request) still returns the live conn to the + %% pool. false; -should_close_connection(#conn_data{response_headers = Headers}) -> - case hackney_headers:get_value(<<"connection">>, Headers) of - undefined -> false; - Value -> hackney_bstr:to_lower(Value) =:= <<"close">> - end. +should_close_connection(#conn_data{version = Version, response_headers = Headers, + request_close = RequestClose}) -> + hackney_keepalive:should_close(Version, Headers, RequestClose). %% @private Finish async streaming - close or return to connected based on Connection header finish_async_streaming(Data) -> @@ -1960,6 +1980,10 @@ do_send_request(Method, Path, Headers, Body, Data) -> %% Build request headers FinalHeaders = build_headers(Method, Headers, Body, Netloc), + %% Record whether the caller asked the server to close, for the keepalive + %% decision at checkin. Reset per request. + Data1 = Data#conn_data{request_close = hackney_keepalive:request_closes(FinalHeaders)}, + %% Build request line and headers Path1 = case Path of <<>> -> <<"/">>; @@ -1975,7 +1999,7 @@ do_send_request(Method, Path, Headers, Body, Data) -> %% Send body if present case send_body(Transport, Socket, Body) of ok -> - {ok, Data}; + {ok, Data1}; {error, Reason} -> {error, Reason} end; @@ -2428,6 +2452,16 @@ has_pending_close(Socket) -> false end. +%% @private Whether the socket is proven ready to pool: present, no queued close, +%% and still connected. Runs in the conn process (peername + receive-after-0, +%% both fast); consuming a queued close here makes a server-closed idle conn +%% report not-ready. Does not disable active-once, so no stranded-byte regression. +socket_ready(#conn_data{socket = undefined}) -> + false; +socket_ready(#conn_data{transport = Transport, socket = Socket}) -> + not has_pending_close(Socket) andalso + check_socket_health(Transport, Socket) =:= ok. + %% @private Drain and return any {tcp/ssl, Socket, Data} bytes queued in the %% mailbox. #544 puts idle pooled sockets in {active, once}, which delivers the %% next inbound bytes (the start of the response on a reused connection) as a @@ -2478,16 +2512,24 @@ notify_pool_available_sync(#conn_data{pool_pid = undefined}) -> ok; notify_pool_available_sync(#conn_data{pool_pid = PoolPid, upgraded_ssl = UpgradedSsl, no_reuse = NoReuse, pool_ssl = PoolSsl, - protocol = Protocol}) -> + protocol = Protocol} = Data) -> + %% do_checkin_with_close_flag/3 pools whenever this flag is false, with no + %% further check, so also close on a keepalive-close response and on an + %% unready socket - matching the async-cast checkin gate. ShouldClose = NoReuse orelse (UpgradedSsl andalso not PoolSsl) - orelse Protocol =/= http1, + orelse Protocol =/= http1 + orelse should_close_connection(Data) + orelse not socket_ready(Data), gen_server:call(PoolPid, {checkin_sync, self(), ShouldClose}, 5000). -%% @private Flags for the pool's checkin decision, see checkin_info/1. +%% @private Flags for the pool's checkin decision, see checkin_info/1. The +%% `ready' flag is added per-state by the checkin_info handlers, since it +%% depends on the conn being in `connected' with a healthy socket. checkin_info_map(#conn_data{upgraded_ssl = UpgradedSsl, no_reuse = NoReuse, - pool_ssl = PoolSsl, protocol = Protocol}) -> + pool_ssl = PoolSsl, protocol = Protocol} = Data) -> #{upgraded_ssl => UpgradedSsl, no_reuse => NoReuse, - pool_ssl => PoolSsl, protocol => Protocol}. + pool_ssl => PoolSsl, protocol => Protocol, + should_close => should_close_connection(Data)}. %% @private Start an async request do_request_async(From, Method, Path, Headers, Body, AsyncMode, StreamTo, FollowRedirect, Data) -> diff --git a/src/hackney_keepalive.erl b/src/hackney_keepalive.erl new file mode 100644 index 00000000..6d68e409 --- /dev/null +++ b/src/hackney_keepalive.erl @@ -0,0 +1,85 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of hackney released under the Apache 2 license. +%%% See the NOTICE for more information. +%%% +%%% Copyright (c) 2012-2024, BenoƮt Chesneau + +%% @doc HTTP/1.x keepalive semantics. +%% +%% Single source of truth for deciding whether an HTTP/1.x connection must be +%% closed (not reused) after a response, per RFC 7230. The `Connection' header +%% is a list-valued, hop-by-hop field (RFC 7230 3.2.2, 6.1): a recipient may get +%% it as several header lines or as one comma-joined value, and both are +%% equivalent. It is forbidden in HTTP/2 and HTTP/3, so these rules apply only to +%% HTTP/1.x; multiplexed conns are never pooled in `available'. +%% +%% Every function tolerates undefined or malformed header objects so a bad header +%% can never crash the keepalive decision. +-module(hackney_keepalive). + +-export([should_close/3, + request_closes/1, + connection_tokens/1]). + +%% @doc Whether a parsed HTTP/1.x response means the connection must close. +%% +%% The caller (hackney_conn:should_close_connection/1) guards the "no response +%% observed yet" case; here `Version'/`RespHeaders' describe a response that was +%% actually parsed. Order matters: an explicit `close' wins over version default. +-spec should_close(Version, RespHeaders, RequestClose) -> boolean() when + Version :: {integer(), integer()} | undefined, + RespHeaders :: term(), + RequestClose :: boolean(). +should_close(_Version, _RespHeaders, true) -> + %% We asked the server to close (request carried Connection: close). + true; +should_close(Version, RespHeaders, false) -> + Tokens = connection_tokens(RespHeaders), + case lists:member(<<"close">>, Tokens) of + true -> + true; + false -> + case Version of + {1, 1} -> + %% HTTP/1.1 default is keep-alive; an absent Connection header + %% stays persistent and poolable. + false; + {1, 0} -> + %% HTTP/1.0 default is close unless it opts into keep-alive. + not lists:member(<<"keep-alive">>, Tokens); + _ -> + %% Unknown version on a parsed response: close on the safe side. + true + end + end. + +%% @doc Whether request headers carry `Connection: close'. +-spec request_closes(term()) -> boolean(). +request_closes(ReqHeaders) -> + lists:member(<<"close">>, connection_tokens(ReqHeaders)). + +%% @doc Lower-cased, trimmed tokens from every `Connection' header. +%% +%% Defensive at each layer: an undefined or malformed header object yields `[]', +%% and a value that does not convert to a binary is skipped rather than crashing. +-spec connection_tokens(term()) -> [binary()]. +connection_tokens(undefined) -> + []; +connection_tokens(Headers) -> + Values = try hackney_headers:lookup(<<"connection">>, Headers) + catch _:_ -> [] + end, + lists:flatmap(fun({_Key, Value}) -> value_tokens(Value) end, Values). + +%% @private +value_tokens(Value) -> + case (try hackney_bstr:to_binary(Value) catch _:_ -> error end) of + error -> + []; + Bin -> + Lower = hackney_bstr:to_lower(Bin), + Parts = binary:split(Lower, <<",">>, [global]), + Trimmed = [hackney_bstr:trim(P) || P <- Parts], + [T || T <- Trimmed, T =/= <<>>] + end. diff --git a/src/hackney_pool.erl b/src/hackney_pool.erl index 4886e548..e4d831d5 100644 --- a/src/hackney_pool.erl +++ b/src/hackney_pool.erl @@ -874,17 +874,12 @@ h3_connection_key(Host0, Port, Transport, Options) -> stop_conn(Pid) -> try hackney_conn:stop(Pid) catch _:_ -> ok end. +%% @private Find a reusable idle connection for `Key', discarding any that are +%% no longer keepalive-ready. Only a conn that is_ready reports `{ok, connected}' +%% is handed out; a closed conn is stopped and dropped (never reanimated). Fresh +%% dialing for an empty bucket is the caller's `none' branch, off the pool's hot +%% path. The SSL alias exists only to mark intent at the SSL checkout site. find_available(Key, Available) -> - find_available(Key, Available, true). - -%% @private Like find_available/2 but never redials a closed connection. -%% Used for SSL buckets: hackney_conn:connect/1 on an upgraded conn would -%% reconnect the raw transport without the upgrade handshake options, so a -%% closed conn is stopped and dropped instead. -find_available_ssl(Key, Available) -> - find_available(Key, Available, false). - -find_available(Key, Available, Redial) -> case maps:find(Key, Available) of {ok, [Pid | Rest]} -> Available2 = case Rest of @@ -896,27 +891,23 @@ find_available(Key, Available, Redial) -> true -> %% is_ready checks both state and socket health in one call. %% The connection can die between is_process_alive/1 above - %% and these gen_statem calls (flaky network); the resulting + %% and this gen_statem call (flaky network); the resulting %% noproc exit must not crash the pool, so skip and move on. try hackney_conn:is_ready(Pid) of - {ok, connected} -> {ok, Pid, Available2}; - {ok, closed} when Redial -> - %% Connection closed, try reconnect - try hackney_conn:connect(Pid) of - ok -> {ok, Pid, Available2}; - _ -> find_available(Key, Available2, Redial) - catch - _:_ -> find_available(Key, Available2, Redial) - end; - {ok, closed} -> + {ok, connected} -> + {ok, Pid, Available2}; + _ -> + %% Closed or unusable: discard it rather than redial + %% from inside the pool. Reanimating a closed pid would + %% break the "only keepalive conns are reused" invariant + %% and a redial here would block the pool on connect. stop_conn(Pid), - find_available(Key, Available2, Redial); - _ -> find_available(Key, Available2, Redial) + find_available(Key, Available2) catch - _:_ -> find_available(Key, Available2, Redial) + _:_ -> find_available(Key, Available2) end; false -> - find_available(Key, Available2, Redial) + find_available(Key, Available2) end; {ok, []} -> none; @@ -924,6 +915,12 @@ find_available(Key, Available, Redial) -> none end. +%% @private SSL-bucket variant. Now identical to find_available/2 (closed conns +%% are always dropped, never redialed); kept as a named alias to mark intent at +%% the SSL checkout site. +find_available_ssl(Key, Available) -> + find_available(Key, Available). + %% @private SSL checkout miss: reuse or dial a TCP connection for the caller %% to upgrade. It is recorded in in_use under the SSL key so the checkin %% decision can tell it apart from plain TCP checkouts. @@ -1031,9 +1028,14 @@ do_checkin(Pid, State) -> %% Check if connection is still alive case is_process_alive(Pid) of true -> - %% One call fetches every flag the decision needs - Info = try hackney_conn:checkin_info(Pid) catch _:_ -> #{} end, - case checkin_poolable(Key, Info) andalso pool_has_idle_room(State) of + %% One call fetches every flag the decision needs. A failed + %% checkin_info means we cannot prove the conn keepalive/ready, + %% so treat it as not poolable (close) rather than pooling blind. + Poolable = case checkin_info(Pid) of + {ok, Info} -> checkin_poolable(Key, Info); + error -> false + end, + case Poolable andalso pool_has_idle_room(State) of true -> checkin_pool(Pid, Key, InUse2, State); false -> @@ -1060,10 +1062,27 @@ do_checkin(Pid, State) -> checkin_poolable({_Host, _Port, hackney_ssl, _TlsKey}, Info) -> maps:get(no_reuse, Info, true) =:= false andalso maps:get(upgraded_ssl, Info, false) =:= true andalso - maps:get(protocol, Info, undefined) =:= http1; + maps:get(protocol, Info, undefined) =:= http1 andalso + keepalive_ready(Info); checkin_poolable(_TcpKey, Info) -> maps:get(no_reuse, Info, false) =:= false andalso - maps:get(upgraded_ssl, Info, false) =:= false. + maps:get(upgraded_ssl, Info, false) =:= false andalso + keepalive_ready(Info). + +%% @private Fetch the conn's checkin flags, or `error' if the call fails (the +%% conn died between is_process_alive/1 and here). Caller treats `error' as +%% not poolable. +checkin_info(Pid) -> + try {ok, hackney_conn:checkin_info(Pid)} + catch _:_ -> error + end. + +%% @private Shared keepalive/readiness gate for checkin: only pool a conn whose +%% response left it reusable and whose socket is proven ready. Defaults are the +%% safe side so an unknown flag closes rather than pools. +keepalive_ready(Info) -> + maps:get(should_close, Info, true) =:= false andalso + maps:get(ready, Info, false) =:= true. %% @private Close branch of a checkin: drop the monitor and keep the host's %% TCP prewarm warm (the replacement for a closed conn is a TCP conn that diff --git a/test/hackney_integration_tests_async_long_headers.erl b/test/hackney_integration_tests_async_long_headers.erl index a8077d00..8068bd90 100644 --- a/test/hackney_integration_tests_async_long_headers.erl +++ b/test/hackney_integration_tests_async_long_headers.erl @@ -84,7 +84,10 @@ dummy_server_loop(LSock, Port, StatusCode) -> ]), send(Sock, Response), ok = gen_tcp:shutdown(Sock, read_write), - dummy_server_loop(LSock, RedirectUrl, StatusCode). + %% Recurse with Port (not RedirectUrl): the loop must keep accepting. Pool + %% prewarm reconnects to this host after a closed conn is checked in, so a + %% second accept happens; passing the binary URL here crashed integer_to_list. + dummy_server_loop(LSock, Port, StatusCode). send(Sock, << Data :128/binary, Rest/binary>>) -> ok = gen_tcp:send(Sock, Data), diff --git a/test/hackney_keepalive_tests.erl b/test/hackney_keepalive_tests.erl new file mode 100644 index 00000000..6d4def0e --- /dev/null +++ b/test/hackney_keepalive_tests.erl @@ -0,0 +1,119 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of hackney released under the Apache 2 license. +%%% See the NOTICE for more information. + +-module(hackney_keepalive_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%% Build a hackney_headers object from a {Key, Value} list. +h(List) -> hackney_headers:from_list(List). + +%%==================================================================== +%% should_close/3 +%%==================================================================== + +http11_no_header_keeps_alive_test() -> + %% HTTP/1.1 default is keep-alive: an absent Connection header is poolable. + ?assertEqual(false, hackney_keepalive:should_close({1, 1}, h([]), false)). + +http11_connection_close_closes_test() -> + ?assertEqual(true, + hackney_keepalive:should_close( + {1, 1}, h([{<<"Connection">>, <<"close">>}]), false)). + +http11_connection_keep_alive_keeps_alive_test() -> + ?assertEqual(false, + hackney_keepalive:should_close( + {1, 1}, h([{<<"Connection">>, <<"keep-alive">>}]), false)). + +http10_no_header_closes_test() -> + %% HTTP/1.0 default is close. + ?assertEqual(true, hackney_keepalive:should_close({1, 0}, h([]), false)). + +http10_keep_alive_keeps_alive_test() -> + ?assertEqual(false, + hackney_keepalive:should_close( + {1, 0}, h([{<<"Connection">>, <<"keep-alive">>}]), false)). + +request_close_closes_test() -> + %% Caller asked the server to close: never reuse, regardless of response. + ?assertEqual(true, hackney_keepalive:should_close({1, 1}, h([]), true)), + ?assertEqual(true, + hackney_keepalive:should_close( + {1, 1}, h([{<<"Connection">>, <<"keep-alive">>}]), true)). + +token_list_without_close_keeps_alive_test() -> + %% "keep-alive, Upgrade" is a token list with no close token. + ?assertEqual(false, + hackney_keepalive:should_close( + {1, 1}, h([{<<"Connection">>, <<"keep-alive, Upgrade">>}]), false)). + +token_list_with_close_closes_test() -> + ?assertEqual(true, + hackney_keepalive:should_close( + {1, 1}, h([{<<"Connection">>, <<"close, Foo">>}]), false)). + +multiple_connection_headers_close_closes_test() -> + %% Connection split across two header lines (RFC 7230 list field). + ?assertEqual(true, + hackney_keepalive:should_close( + {1, 1}, + h([{<<"Connection">>, <<"keep-alive">>}, + {<<"Connection">>, <<"close">>}]), + false)). + +mixed_case_close_closes_test() -> + ?assertEqual(true, + hackney_keepalive:should_close( + {1, 1}, h([{<<"Connection">>, <<"Close">>}]), false)). + +whitespace_around_token_test() -> + ?assertEqual(true, + hackney_keepalive:should_close( + {1, 1}, h([{<<"Connection">>, <<" close ">>}]), false)). + +unknown_version_parsed_closes_test() -> + %% Anomalous: a parsed response with an unknown version closes on the safe side. + ?assertEqual(true, hackney_keepalive:should_close(undefined, h([]), false)). + +%%==================================================================== +%% request_closes/1 +%%==================================================================== + +request_closes_true_test() -> + ?assertEqual(true, + hackney_keepalive:request_closes(h([{<<"Connection">>, <<"close">>}]))). + +request_closes_false_test() -> + ?assertEqual(false, + hackney_keepalive:request_closes(h([{<<"Connection">>, <<"keep-alive">>}]))), + ?assertEqual(false, hackney_keepalive:request_closes(h([]))). + +request_closes_token_list_test() -> + ?assertEqual(true, + hackney_keepalive:request_closes( + h([{<<"Connection">>, <<"keep-alive, close">>}]))). + +%%==================================================================== +%% connection_tokens/1 - defensive +%%==================================================================== + +tokens_undefined_test() -> + ?assertEqual([], hackney_keepalive:connection_tokens(undefined)). + +tokens_malformed_object_test() -> + %% Not a hackney_headers {_, dict} object: must not crash. + ?assertEqual([], hackney_keepalive:connection_tokens(not_a_headers_object)), + ?assertEqual([], hackney_keepalive:connection_tokens({bad})). + +tokens_list_value_test() -> + %% A header value supplied as a string (list) is converted, not crashed on. + ?assertEqual([<<"close">>], + hackney_keepalive:connection_tokens(h([{<<"Connection">>, "close"}]))). + +tokens_empty_value_test() -> + %% Empty / comma-only values drop to no tokens. + ?assertEqual([], hackney_keepalive:connection_tokens(h([{<<"Connection">>, <<>>}]))), + ?assertEqual([], hackney_keepalive:connection_tokens(h([{<<"Connection">>, <<" , ">>}]))). diff --git a/test/hackney_pool_stress_tests.erl b/test/hackney_pool_stress_tests.erl new file mode 100644 index 00000000..086f66cb --- /dev/null +++ b/test/hackney_pool_stress_tests.erl @@ -0,0 +1,63 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of hackney released under the Apache 2 license. +%%% See the NOTICE for more information. +%%% +%%% Concurrent same-URI pool stress: many workers hit one URI through a small +%%% warm pool; every request must return a 2xx. Covers keep-alive, Connection: +%%% close, and a random mix on the same URI. +-module(hackney_pool_stress_tests). + +-include_lib("eunit/include/eunit.hrl"). + +-define(PORT, 8138). +-define(POOL, stress_test_pool). +-define(WORKERS, 50). +-define(REQS, 20). + +stress_test_() -> + {setup, + fun setup/0, + fun teardown/1, + [ + {"concurrent same-URI keep-alive requests all return ok", + {timeout, 120, fun all_keepalive/0}}, + {"concurrent same-URI Connection: close requests all return ok", + {timeout, 120, fun all_close/0}}, + {"concurrent same-URI random keep-alive/close requests all return ok", + {timeout, 120, fun random_close/0}} + ]}. + +setup() -> + error_logger:tty(false), + {ok, _} = application:ensure_all_started(cowboy), + {ok, _} = application:ensure_all_started(hackney), + Dispatch = cowboy_router:compile([{'_', [{"/[...]", test_http_resource, []}]}]), + {ok, _} = cowboy:start_clear(stress_test_server, [{port, ?PORT}], + #{env => #{dispatch => Dispatch}}), + %% Small warm pool on purpose: the burst is served by overflow connections. + ok = hackney_pool:start_pool(?POOL, [{pool_size, 4}, {prewarm_count, 0}]), + ok. + +teardown(_) -> + try hackney_pool:stop_pool(?POOL) catch _:_ -> ok end, + try cowboy:stop_listener(stress_test_server) catch _:_ -> ok end, + application:stop(cowboy), + application:stop(hackney), + error_logger:tty(true), + ok. + +all_keepalive() -> assert_all_ok(<<"/get">>). + +all_close() -> assert_all_ok(<<"/connection-close">>). + +random_close() -> assert_all_ok(<<"/maybe-close">>). + +assert_all_ok(Path) -> + Url = iolist_to_binary([<<"http://127.0.0.1:">>, integer_to_list(?PORT), Path]), + #{ok := Ok, errors := Errors} = + stress_pool_concurrency:hammer(?POOL, Url, ?WORKERS, ?REQS), + ?assertEqual([], Errors), + ?assertEqual(?WORKERS * ?REQS, Ok), + %% Pool is still healthy after the burst. + ?assert(is_list(hackney_pool:get_stats(?POOL))). diff --git a/test/hackney_pool_tests.erl b/test/hackney_pool_tests.erl index 51bd33a0..465c9c1c 100644 --- a/test/hackney_pool_tests.erl +++ b/test/hackney_pool_tests.erl @@ -76,7 +76,15 @@ hackney_pool_integration_test_() -> {"checkin closes an SSL-keyed non-http1 conn", fun test_checkin_ssl_wrong_protocol_stub/0}, {"count/2 aggregates legacy 3-tuples and host_stats spans buckets", - fun test_count_mixed_buckets/0} + fun test_count_mixed_buckets/0}, + {"keep-alive response pools the conn and reuses it", + {timeout, 30, fun test_keepalive_response_pools/0}}, + {"Connection: close response is not pooled (sync body path)", + {timeout, 30, fun test_connection_close_not_pooled/0}}, + {"many sequential Connection: close requests all return ok", + {timeout, 60, fun test_connection_close_sequential_ok/0}}, + {"a closed pool entry is discarded at checkout, not reanimated", + {timeout, 30, fun test_closed_pool_entry_not_reanimated/0}} ]}. %% HTTPS/1.1 ssl_pooling integration tests - require a TLS server @@ -471,7 +479,11 @@ swap_one(P, _Old, _New) -> P. %% also ignores casts (set_owner_async, stop) and cooperates with %% gen_statem:stop/1 (system terminate) so the pool's close path never %% blocks on it. -stub_conn(Info) -> +stub_conn(Info0) -> + %% checkin_info now also carries should_close/ready. Default them to the + %% poolable side so existing flag maps keep their meaning: the no_reuse and + %% non-http1 cases still close for their own reason, the poolable case passes. + Info = maps:merge(#{should_close => false, ready => true}, Info0), spawn(fun() -> stub_conn_loop(Info) end). stub_conn_loop(Info) -> @@ -867,3 +879,86 @@ test_idle_data_consumed_not_dropped() -> ServerPid ! stop, ok = hackney_pool:stop_pool(test_pool_idle_data). + +%% A keep-alive response (HTTP/1.1, no Connection: close) leaves the connection +%% poolable: after the body is read on the sync path the conn auto-releases and +%% the next checkout reuses the same pid. +test_keepalive_response_pools() -> + ok = hackney_pool:start_pool(test_pool_keepalive, [{pool_size, 5}, {prewarm_count, 0}]), + Opts = [{pool, test_pool_keepalive}], + {ok, _PoolInfo, Pid1} = hackney_pool:checkout("127.0.0.1", ?PORT, hackney_tcp, Opts), + {ok, 200, _H} = hackney_conn:request(Pid1, <<"GET">>, <<"/get">>, [], <<>>), + {ok, _Body} = hackney_conn:body(Pid1), + %% Body fully read -> conn auto-releases to the pool (async cast). + timer:sleep(50), + Stats = hackney_pool:get_stats(test_pool_keepalive), + ?assertEqual(0, proplists:get_value(in_use_count, Stats)), + ?assertEqual(1, proplists:get_value(free_count, Stats)), + %% Reused on next checkout. + {ok, _PoolInfo2, Pid2} = hackney_pool:checkout("127.0.0.1", ?PORT, hackney_tcp, Opts), + ?assertEqual(Pid1, Pid2), + hackney_conn:stop(Pid2), + ok = hackney_pool:stop_pool(test_pool_keepalive). + +%% A Connection: close response read on the sync body path must NOT be pooled. +%% The next checkout gets a fresh, different, working connection. +test_connection_close_not_pooled() -> + ok = hackney_pool:start_pool(test_pool_conn_close, [{pool_size, 5}, {prewarm_count, 0}]), + Opts = [{pool, test_pool_conn_close}], + {ok, _PoolInfo, Pid1} = hackney_pool:checkout("127.0.0.1", ?PORT, hackney_tcp, Opts), + {ok, 200, _H} = hackney_conn:request(Pid1, <<"GET">>, <<"/connection-close">>, [], <<>>), + {ok, _Body} = hackney_conn:body(Pid1), + %% Body read -> conn auto-releases; the pool must close it, not pool it. + timer:sleep(50), + Stats = hackney_pool:get_stats(test_pool_conn_close), + ?assertEqual(0, proplists:get_value(in_use_count, Stats)), + ?assertEqual(0, proplists:get_value(free_count, Stats)), + %% Next checkout dials a fresh conn (different pid) and works. + {ok, _PoolInfo2, Pid2} = hackney_pool:checkout("127.0.0.1", ?PORT, hackney_tcp, Opts), + ?assertNotEqual(Pid1, Pid2), + {ok, 200, _H2} = hackney_conn:request(Pid2, <<"GET">>, <<"/get">>, [], <<>>), + {ok, _Body2} = hackney_conn:body(Pid2), + hackney_conn:stop(Pid2), + ok = hackney_pool:stop_pool(test_pool_conn_close). + +%% Hammer the same Connection: close URI sequentially through a small pool: every +%% request must succeed (a fresh conn each time; no hang from a wrongly-pooled +%% close connection). +test_connection_close_sequential_ok() -> + ok = hackney_pool:start_pool(test_pool_conn_close_seq, [{pool_size, 2}]), + Opts = [{pool, test_pool_conn_close_seq}], + lists:foreach( + fun(_) -> + {ok, _PoolInfo, Pid} = + hackney_pool:checkout("127.0.0.1", ?PORT, hackney_tcp, Opts), + ?assertMatch({ok, 200, _}, + hackney_conn:request(Pid, <<"GET">>, <<"/connection-close">>, [], <<>>)), + ?assertMatch({ok, _}, hackney_conn:body(Pid)) + end, lists:seq(1, 20)), + ok = hackney_pool:stop_pool(test_pool_conn_close_seq). + +%% A pooled TCP conn whose socket is dead while the process is still alive (and +%% the entry still in `available') must be discarded at checkout, never redialed +%% into the same pid. Guards the find_available reanimation removal: closing the +%% socket locally invalidates peername without delivering a tcp_closed, so the +%% conn stays in `connected' and is_ready reports {ok, closed} via the health +%% check - exactly the case the old Redial path reanimated. +test_closed_pool_entry_not_reanimated() -> + ok = hackney_pool:start_pool(test_pool_no_reanimate, [{pool_size, 5}]), + Opts = [{pool, test_pool_no_reanimate}], + {ok, PoolInfo, Pid1} = hackney_pool:checkout("127.0.0.1", ?PORT, hackney_tcp, Opts), + ok = hackney_pool:checkin(PoolInfo, Pid1), + timer:sleep(20), + ?assertEqual(1, proplists:get_value(free_count, + hackney_pool:get_stats(test_pool_no_reanimate))), + {connected, ConnData} = sys:get_state(Pid1), + Socket = element(8, ConnData), %% #conn_data.socket + ok = gen_tcp:close(Socket), + %% Next checkout must NOT hand back the reanimated Pid1; it dials fresh. + {ok, _PoolInfo2, Pid2} = hackney_pool:checkout("127.0.0.1", ?PORT, hackney_tcp, Opts), + ?assertNotEqual(Pid1, Pid2), + ?assert(is_process_alive(Pid2)), + {ok, 200, _H} = hackney_conn:request(Pid2, <<"GET">>, <<"/get">>, [], <<>>), + {ok, _B} = hackney_conn:body(Pid2), + hackney_conn:stop(Pid2), + ok = hackney_pool:stop_pool(test_pool_no_reanimate). diff --git a/test/stress_pool_concurrency.erl b/test/stress_pool_concurrency.erl new file mode 100644 index 00000000..0b8bbccd --- /dev/null +++ b/test/stress_pool_concurrency.erl @@ -0,0 +1,122 @@ +%%% -*- erlang -*- +%%% +%%% This file is part of hackney released under the Apache 2 license. +%%% See the NOTICE for more information. +%%% +%%% Concurrent same-URI pool stress harness. +%%% +%%% Hammers a single URI from many workers through a small warm pool and checks +%%% that every request returns a 2xx. The warm pool is deliberately small; the +%%% burst is served by overflow connections, so this exercises the checkin +%%% keepalive/readiness gate and the checkout path under contention. +%%% +%%% Standalone (starts its own cowboy server + pool): +%%% rebar3 as test shell +%%% stress_pool_concurrency:run(). +%%% stress_pool_concurrency:run(#{workers => 200, reqs => 50, pool_size => 4}). +%%% +%%% Against an already-running server/pool (used by the eunit stress suite): +%%% stress_pool_concurrency:hammer(Pool, Url, Workers, Reqs). +-module(stress_pool_concurrency). + +-export([run/0, run/1, hammer/4]). + +-define(DEFAULT_PORT, 8138). +-define(SERVER_REF, stress_pool_concurrency_server). +-define(POOL, stress_pool_concurrency_pool). + +%% @doc Standalone stress run with default settings. +run() -> run(#{}). + +%% @doc Standalone stress run. Starts a local cowboy server and a small pool, +%% hammers each scenario URI concurrently, then tears everything down. Returns +%% `ok' only if every request returned a 2xx, else `{error, Errors}'. +%% Opts: workers (100), reqs (20), pool_size (4), port (8138), +%% scenarios ([<<"/get">>, <<"/connection-close">>, <<"/maybe-close">>]). +run(Opts) when is_map(Opts) -> + Workers = maps:get(workers, Opts, 100), + Reqs = maps:get(reqs, Opts, 20), + PoolSize = maps:get(pool_size, Opts, 4), + Port = maps:get(port, Opts, ?DEFAULT_PORT), + Scenarios = maps:get(scenarios, Opts, + [<<"/get">>, <<"/connection-close">>, <<"/maybe-close">>]), + {ok, _} = application:ensure_all_started(cowboy), + {ok, _} = application:ensure_all_started(hackney), + Dispatch = cowboy_router:compile([{'_', [{"/[...]", test_http_resource, []}]}]), + {ok, _} = cowboy:start_clear(?SERVER_REF, [{port, Port}], + #{env => #{dispatch => Dispatch}}), + ok = hackney_pool:start_pool(?POOL, [{pool_size, PoolSize}, {prewarm_count, 0}]), + try + Results = [{Path, hammer(?POOL, url(Port, Path), Workers, Reqs)} + || Path <- Scenarios], + report(Results) + after + try hackney_pool:stop_pool(?POOL) catch _:_ -> ok end, + try cowboy:stop_listener(?SERVER_REF) catch _:_ -> ok end + end. + +%% @doc Spawn `Workers' processes, each making `Reqs' sequential requests to +%% `Url' through `Pool'. Returns a summary `#{ok => N, errors => [term()]}'. +%% A worker never crashes the run: any exception is captured as an error entry. +hammer(Pool, Url, Workers, Reqs) -> + Parent = self(), + _ = [spawn(fun() -> Parent ! {worker_done, worker(Pool, Url, Reqs)} end) + || _ <- lists:seq(1, Workers)], + gather(Workers, #{ok => 0, errors => []}). + +%%==================================================================== +%% Internal +%%==================================================================== + +worker(Pool, Url, Reqs) -> + %% Generous concurrency cap and timeouts: the warm pool is small on purpose, + %% but the per-host concurrency cap and checkout/recv timeouts must not be + %% what makes a request fail - a healthy run returns ok for every request. + ReqOpts = [{pool, Pool}, + {max_per_host, 256}, + {checkout_timeout, 30000}, + {connect_timeout, 30000}, + {recv_timeout, 30000}], + lists:foldl( + fun(_, Acc) -> + Res = try hackney:request(get, Url, [], <<>>, ReqOpts) + catch Class:Reason -> {caught, Class, Reason} end, + classify(Res, Acc) + end, #{ok => 0, errors => []}, lists:seq(1, Reqs)). + +classify({ok, Status, _H, _B}, Acc) when Status >= 200, Status < 300 -> + bump_ok(Acc); +classify(Other, Acc) -> + add_err(Acc, Other). + +gather(0, Acc) -> + Acc; +gather(N, Acc) -> + receive + {worker_done, R} -> gather(N - 1, merge_summary(Acc, R)) + after 120000 -> + add_err(Acc, {timeout_waiting_for_workers, N}) + end. + +bump_ok(#{ok := N} = A) -> A#{ok := N + 1}. + +add_err(#{errors := Es} = A, E) -> A#{errors := [E | Es]}. + +merge_summary(#{ok := O1, errors := E1}, #{ok := O2, errors := E2}) -> + #{ok => O1 + O2, errors => E1 ++ E2}. + +url(Port, Path) -> + iolist_to_binary([<<"http://127.0.0.1:">>, integer_to_list(Port), Path]). + +report(Results) -> + Total = lists:sum([summary_total(S) || {_, S} <- Results]), + Errors = lists:append([[{Path, E} || E <- maps:get(errors, S)] + || {Path, S} <- Results]), + io:format("stress: ~p requests across ~p scenarios, ~p errors~n", + [Total, length(Results), length(Errors)]), + case Errors of + [] -> ok; + _ -> {error, lists:sublist(Errors, 20)} + end. + +summary_total(#{ok := Ok, errors := Es}) -> Ok + length(Es). diff --git a/test/test_http_resource.erl b/test/test_http_resource.erl index 82ff7a30..8ff0e840 100644 --- a/test/test_http_resource.erl +++ b/test/test_http_resource.erl @@ -130,6 +130,17 @@ handle_request(<<"GET">>, <<"/connection-close">>, Req, State) -> }, <<"{\"connection\": \"close\"}">>, Req), {ok, Req2, State}; +%% GET /maybe-close - randomly close the connection (~50%), to stress the pool +%% with a mix of keep-alive and Connection: close responses on the same URI. +handle_request(<<"GET">>, <<"/maybe-close">>, Req, State) -> + Headers = case rand:uniform(2) of + 1 -> #{<<"content-type">> => <<"application/json">>, + <<"connection">> => <<"close">>}; + 2 -> #{<<"content-type">> => <<"application/json">>} + end, + Req2 = cowboy_req:reply(200, Headers, <<"{\"maybe\": \"close\"}">>, Req), + {ok, Req2, State}; + %% GET /connection-close/:size - return large body with Connection: close %% For testing issue #439 - responses with connection close sometimes lost handle_request(<<"GET">>, <<"/connection-close/", SizeBin/binary>>, Req, State) ->