From a9e41612dc497a4358f5e18d260b6ef4f5e1771b Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Mon, 27 Apr 2026 21:53:37 -0400 Subject: [PATCH] Fix cluster index and process cleanup Previously, as described in #5980, we didn't perform a thorough index cleanup when ddocs changed. We only cleaned up on nodes where the design docs were located. That was true for an n=3 db in a 3-node cluster, but may not be true in general in a cluster. To fix the issue, run a small gen_server responsible for performing cluster index cleanup. To avoid spawning Q*N jobs, deduplicate the requests by delaying for up to 30 seconds per clustered db. For cleanup, reuse and call the already existing fabric index file cleanup machinery. That accomplishes two things: - Starts a quicker index file cleanup. Previously we only did this during smoosh compaction runs. The view files could linger for a while until compaction in smoosh would be triggered. - Cleaning search index files also stops indexes on their (Java) side, so index file clean-up does "double duty" so to speak when it comes to index shut down. 
Fix https://github.com/apache/couchdb/issues/5980 --- rel/overlay/etc/default.ini | 4 + src/couch/src/couch_secondary_sup.erl | 3 +- src/couch_index/src/couch_index_cleanup.erl | 98 +++++++ src/couch_index/src/couch_index_server.erl | 68 ++--- .../eunit/couch_index_ddoc_updated_tests.erl | 178 ------------- src/couch_mrview/src/couch_mrview_cleanup.erl | 40 ++- src/couch_mrview/src/couch_mrview_util.erl | 12 +- .../test/eunit/couch_mrview_cleanup_tests.erl | 252 ++++++++++++++++++ .../test/eunit/couch_mrview_util_tests.erl | 6 +- 9 files changed, 427 insertions(+), 234 deletions(-) create mode 100644 src/couch_index/src/couch_index_cleanup.erl delete mode 100644 src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl create mode 100644 src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 17b5eee48fe..79026dea35a 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -158,6 +158,10 @@ view_index_dir = {{view_index_dir}} ; ;time_seq_min_time = 1754006400 +; Clustered index cleanup deduplication hold-off. How long to wait before +; running clean up per clustered db. +;index_cleanup_delay_msec = 30000 + [bt_engine_cache] ; Memory used for btree engine cache. This is a cache for top levels of ; database btrees (id tree, seq tree) and a few terms from the db header. 
Value diff --git a/src/couch/src/couch_secondary_sup.erl b/src/couch/src/couch_secondary_sup.erl index 766235d5d03..7ee9cab7848 100644 --- a/src/couch/src/couch_secondary_sup.erl +++ b/src/couch/src/couch_secondary_sup.erl @@ -28,7 +28,8 @@ init([]) -> {query_servers, {couch_proc_manager, start_link, []}}, {vhosts, {couch_httpd_vhost, start_link, []}}, {uuids, {couch_uuids, start, []}}, - {disk_manager, {couch_disk_monitor, start_link, []}} + {disk_manager, {couch_disk_monitor, start_link, []}}, + {couch_index_cleanup, {couch_index_cleanup, start_link, []}} ] ++ couch_index_servers(), MaybeHttp = diff --git a/src/couch_index/src/couch_index_cleanup.erl b/src/couch_index/src/couch_index_cleanup.erl new file mode 100644 index 00000000000..be37c565479 --- /dev/null +++ b/src/couch_index/src/couch_index_cleanup.erl @@ -0,0 +1,98 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_index_cleanup). +-behaviour(gen_server). + +-export([ + start_link/0, + schedule/1 +]). + +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2 +]). + +-export([ + handle_db_event/3 +]). + +-define(DEFAULT_DELAY_MSEC, 30000). + +-record(st, { + pending = #{} :: #{binary() => reference()} +}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +schedule(DbName) when is_binary(DbName) -> + gen_server:cast(?MODULE, {schedule, DbName, fanout}). 
+ +init([]) -> + {ok, _} = couch_event:link_listener(?MODULE, handle_db_event, nil, [all_dbs]), + {ok, #st{}}. + +handle_call(Msg, _From, #st{} = St) -> + {stop, {invalid_call, Msg}, {invalid_call, Msg}, St}. + +handle_cast({schedule, DbName, Mode}, #st{pending = Pending} = St) -> + case maps:is_key(DbName, Pending) of + true -> + {noreply, St}; + false -> + case Mode of + fanout -> fanout(DbName); + no_fanout -> ok + end, + TRef = erlang:send_after(delay_msec(), self(), {run_cleanup, DbName}), + {noreply, St#st{pending = Pending#{DbName => TRef}}} + end; +handle_cast(Msg, St) -> + {stop, {invalid_cast, Msg}, St}. + +handle_info({run_cleanup, DbName}, #st{pending = Pending} = St) -> + spawn(fun() -> + try + fabric:cleanup_index_files_this_node(DbName) + catch + Class:Reason:Stack -> + WArgs = [?MODULE, DbName, Class, Reason, Stack], + couch_log:warning("~p: cleanup ~s failed ~p:~p~n~p", WArgs) + end + end), + {noreply, St#st{pending = maps:remove(DbName, Pending)}}; +handle_info(Msg, St) -> + {stop, {invalid_info, Msg}, St}. + +handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, _DDocId}, St) -> + % Clustered dbs only + schedule(mem3:dbname(DbName)), + {ok, St}; +handle_db_event(_DbName, _Event, St) -> + {ok, St}. + +fanout(DbName) -> + try mem3:shards(DbName) of + Shards -> + Nodes = lists:usort([mem3:node(S) || S <- Shards]) -- [node()], + Args = {schedule, DbName, no_fanout}, + lists:foreach(fun(N) -> gen_server:cast({?MODULE, N}, Args) end, Nodes) + catch + _:_ -> ok + end. + +delay_msec() -> + config:get_integer("couchdb", "index_cleanup_delay_msec", ?DEFAULT_DELAY_MSEC). diff --git a/src/couch_index/src/couch_index_server.erl b/src/couch_index/src/couch_index_server.erl index d4593ee0d59..82ceb154128 100644 --- a/src/couch_index/src/couch_index_server.erl +++ b/src/couch_index/src/couch_index_server.erl @@ -23,6 +23,9 @@ -export([num_servers/0, server_name/1, by_sig/1, by_pid/1, by_db/1, openers/1]). -export([aggregate_queue_len/0, names/0]). 
+% Cluster cleanup helpers (used by couch_mrview_cleanup) +-export([shard_entries/1, shard_index_pid/2, forget_ddoc_binding/3]). + % Exported for callbacks -export([ handle_config_change/5, @@ -358,51 +361,9 @@ handle_db_event(DbName, created, St) -> handle_db_event(DbName, deleted, St) -> gen_server:cast(St#st.server_name, {reset_indexes, DbName}), {ok, St}; -handle_db_event(<<"shards/", _/binary>> = DbName, {ddoc_updated, DDocId}, St) -> - %% this handle_db_event function must not crash (or it takes down the couch_index_server) - try - DDocResult = couch_util:with_db(DbName, fun(Db) -> - couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) - end), - LocalShards = mem3:local_shards(mem3:dbname(DbName)), - DbShards = [mem3:name(Sh) || Sh <- LocalShards], - lists:foreach( - fun(DbShard) -> - lists:foreach( - fun({_DbShard, {_DDocId, Sig}}) -> - % check if there are other ddocs with the same Sig for the same db - SigDDocs = ets:match_object(St#st.by_db, {DbShard, {'$1', Sig}}), - if - length(SigDDocs) > 1 -> - % remove records from by_db for this DDoc - Args = [DbShard, DDocId, Sig], - gen_server:cast(St#st.server_name, {rem_from_ets, Args}); - true -> - % single DDoc with this Sig - close couch_index processes - case ets:lookup(St#st.by_sig, {DbShard, Sig}) of - [{_, IndexPid}] -> - (catch gen_server:cast( - IndexPid, {ddoc_updated, DDocResult} - )); - [] -> - [] - end - end - end, - ets:match_object(St#st.by_db, {DbShard, {DDocId, '$1'}}) - ) - end, - DbShards - ), - {ok, St} - catch - Class:Reason:Stack -> - couch_log:warning("~p: handle_db_event ~p for db ~p, reason ~p, stack ~p", [ - ?MODULE, Class, DbName, Reason, Stack - ]), - gen_server:cast(St#st.server_name, {rem_from_ets, [DbName, Reason]}), - {ok, St} - end; +handle_db_event(<<"shards/", _/binary>>, {ddoc_updated, _DDocId}, St) -> + %% Cluster dbs cleanup is handled by couch_index_cleanup + {ok, St}; handle_db_event(DbName, {ddoc_updated, DDocId}, St) -> lists:foreach( fun({_DbName, {_DDocId, Sig}}) -> 
@@ -437,6 +398,23 @@ by_db(Arg) -> openers(Arg) -> name("couchdb_indexes_openers", Arg). +% Return {DDocId, Sig} entries for a shard. Used by cluster cleanup +shard_entries(ShardName) when is_binary(ShardName) -> + Rows = ets:match_object(by_db(ShardName), {ShardName, '_'}), + [Entry || {_ShardName, Entry} <- Rows]. + +% Return indexer Pid for {ShardName, Sig} or not_found +shard_index_pid(ShardName, Sig) when is_binary(ShardName) -> + case ets:lookup(by_sig(ShardName), {ShardName, Sig}) of + [{_, Pid}] when is_pid(Pid) -> {ok, Pid}; + _ -> not_found + end. + +% Remove {ShardName, {DDocId, Sig}} row from by_db. The indexer process is left +% as is. This is for removing one of the ddocs pointing to the same sig +forget_ddoc_binding(ShardName, DDocId, Sig) when is_binary(ShardName) -> + gen_server:cast(server_name(ShardName), {rem_from_ets, [ShardName, DDocId, Sig]}). + name(BaseName, Arg) when is_list(Arg) -> name(BaseName, ?l2b(Arg)); name(BaseName, Arg) when is_binary(Arg) -> diff --git a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl b/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl deleted file mode 100644 index 6b7fe5a4a3f..00000000000 --- a/src/couch_index/test/eunit/couch_index_ddoc_updated_tests.erl +++ /dev/null @@ -1,178 +0,0 @@ -% Licensed under the Apache License, Version 2.0 (the "License"); you may not -% use this file except in compliance with the License. You may obtain a copy of -% the License at -% -% http://www.apache.org/licenses/LICENSE-2.0 -% -% Unless required by applicable law or agreed to in writing, software -% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -% License for the specific language governing permissions and limitations under -% the License. - --module(couch_index_ddoc_updated_tests). - --include_lib("couch/include/couch_eunit.hrl"). --include_lib("couch/include/couch_db.hrl"). 
- -start() -> - fake_index(), - Ctx = test_util:start_couch([mem3, fabric]), - DbName = ?tempdb(), - ok = fabric:create_db(DbName, [?ADMIN_CTX]), - {Ctx, DbName}. - -stop({Ctx, DbName}) -> - meck:unload(test_index), - ok = fabric:delete_db(DbName, [?ADMIN_CTX]), - DbDir = config:get("couchdb", "database_dir", "."), - WaitFun = fun() -> - filelib:fold_files( - DbDir, - <<".*", DbName/binary, "\.[0-9]+.*">>, - true, - fun(_F, _A) -> wait end, - ok - ) - end, - ok = test_util:wait(WaitFun), - test_util:stop_couch(Ctx), - ok. - -ddoc_update_test_() -> - { - "Check ddoc update actions", - { - setup, - fun start/0, - fun stop/1, - fun check_all_indexers_exit_on_ddoc_change/1 - } - }. - -check_all_indexers_exit_on_ddoc_change({_Ctx, DbName}) -> - ?_test(begin - [DbShard1 | RestDbShards] = lists:map( - fun(Sh) -> - {ok, ShardDb} = couch_db:open(mem3:name(Sh), []), - ShardDb - end, - mem3:local_shards(mem3:dbname(DbName)) - ), - - % create a DDoc on Db1 - DDocID = <<"idx_name">>, - DDocJson = couch_doc:from_json_obj( - {[ - {<<"_id">>, DDocID}, - {<<"value">>, 1} - ]} - ), - {ok, _Rev} = couch_db:update_doc(DbShard1, DDocJson, []), - {ok, DbShard} = couch_db:reopen(DbShard1), - {ok, DDoc} = couch_db:open_doc( - DbShard, DDocID, [ejson_body, ?ADMIN_CTX] - ), - DbShards = [DbShard | RestDbShards], - N = length(DbShards), - - % run couch_index process for each shard database - ok = meck:reset(test_index), - lists:foreach( - fun(ShardDb) -> - couch_index_server:get_index(test_index, ShardDb, DDoc) - end, - DbShards - ), - - IndexesBefore = get_indexes_by_ddoc(DDocID, N), - ?assertEqual(N, length(IndexesBefore)), - - AliveBefore = lists:filter(fun is_process_alive/1, IndexesBefore), - ?assertEqual(N, length(AliveBefore)), - - % update ddoc - DDocJson2 = couch_doc:from_json_obj( - {[ - {<<"_id">>, DDocID}, - {<<"value">>, 2}, - {<<"_rev">>, couch_doc:rev_to_str(DDoc#doc.revs)} - ]} - ), - {ok, _} = couch_db:update_doc(DbShard, DDocJson2, []), - - % assert that all index processes 
exit after ddoc updated - ok = meck:reset(test_index), - lists:foreach( - fun(I) -> - couch_index_server:handle_db_event( - couch_db:name(DbShard), - {ddoc_updated, DDocID}, - {st, "", couch_index_server:server_name(I), couch_index_server:by_sig(I), - couch_index_server:by_pid(I), couch_index_server:by_db(I), - couch_index_server:openers(I)} - ) - end, - seq() - ), - - ok = meck:wait(N, test_index, init, ['_', '_'], 5000), - IndexesAfter = get_indexes_by_ddoc(DDocID, 0), - ?assertEqual(0, length(IndexesAfter)), - - %% assert that previously running indexes are gone - AliveAfter = lists:filter(fun is_process_alive/1, IndexesBefore), - ?assertEqual(0, length(AliveAfter)), - ok - end). - -fake_index() -> - ok = meck:new([test_index], [non_strict]), - ok = meck:expect(test_index, init, fun(Db, DDoc) -> - {ok, {couch_db:name(Db), DDoc}} - end), - ok = meck:expect(test_index, open, fun(_Db, State) -> - {ok, State} - end), - ok = meck:expect(test_index, get, fun - (db_name, {DbName, _DDoc}) -> - DbName; - (idx_name, {_DbName, DDoc}) -> - DDoc#doc.id; - (signature, {_DbName, DDoc}) -> - couch_hash:md5_hash(term_to_binary(DDoc)); - (update_seq, Seq) -> - Seq - end), - ok = meck:expect(test_index, shutdown, ['_'], ok). - -get_indexes_by_ddoc(DDocID, N) -> - Indexes = test_util:wait(fun() -> - Indxs = lists:flatmap( - fun(I) -> - ets:match_object( - couch_index_server:by_db(I), {'$1', {DDocID, '$2'}} - ) - end, - seq() - ), - case length(Indxs) == N of - true -> - Indxs; - false -> - wait - end - end), - lists:foldl( - fun({DbName, {_DDocID, Sig}}, Acc) -> - case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of - [{_, Pid}] -> [Pid | Acc]; - _ -> Acc - end - end, - [], - Indexes - ). - -seq() -> - lists:seq(1, couch_index_server:num_servers()). 
diff --git a/src/couch_mrview/src/couch_mrview_cleanup.erl b/src/couch_mrview/src/couch_mrview_cleanup.erl index e8a2833a7ca..fab449ee585 100644 --- a/src/couch_mrview/src/couch_mrview_cleanup.erl +++ b/src/couch_mrview/src/couch_mrview_cleanup.erl @@ -14,7 +14,8 @@ -export([ run/1, - cleanup/2 + cleanup/2, + cleanup_processes/2 ]). run(Db) -> @@ -23,7 +24,8 @@ run(Db) -> {ok, Db1} = couch_db:reopen(Db), Sigs = couch_mrview_util:get_signatures(Db1), ok = cleanup_purges(Db1, Sigs, Checkpoints), - ok = cleanup_indices(Sigs, Indices). + ok = cleanup_indices(Sigs, Indices), + ok = cleanup_processes(Db1, Sigs). % erpc endpoint for fabric_index_cleanup:cleanup_indexes/2 % @@ -34,7 +36,8 @@ cleanup(Dbs, #{} = Sigs) -> Indices = couch_mrview_util:get_index_files(Db), Checkpoints = couch_mrview_util:get_purge_checkpoints(Db), ok = cleanup_purges(Db, Sigs, Checkpoints), - ok = cleanup_indices(Sigs, Indices) + ok = cleanup_indices(Sigs, Indices), + ok = cleanup_processes(Db, Sigs) end, Dbs ) @@ -43,6 +46,37 @@ cleanup(Dbs, #{} = Sigs) -> ok end. +% Clean up indexer processes whose signature is no longer in the valid set. +% +cleanup_processes(ShardName, #{} = Sigs) when is_binary(ShardName) -> + Entries = couch_index_server:shard_entries(ShardName), + lists:foreach( + fun({DDocId, Sig}) -> + HexSig = couch_util:to_hex_bin(Sig), + case maps:find(HexSig, Sigs) of + {ok, ValidDDocs} -> + % Sig in use. If DDocId doesn't reference it any + % longer drop the stale by_db row + case maps:is_key(DDocId, ValidDDocs) of + true -> + ok; + false -> + couch_index_server:forget_ddoc_binding(ShardName, DDocId, Sig) + end; + error -> + case couch_index_server:shard_index_pid(ShardName, Sig) of + {ok, IndexPid} -> + (catch gen_server:cast(IndexPid, {ddoc_updated, {not_found, deleted}})); + not_found -> + ok + end + end + end, + Entries + ); +cleanup_processes(Db, #{} = Sigs) -> + cleanup_processes(couch_db:name(Db), Sigs). 
+ cleanup_purges(Db, Sigs, Checkpoints) -> couch_index_util:cleanup_purges(Db, Sigs, Checkpoints). diff --git a/src/couch_mrview/src/couch_mrview_util.erl b/src/couch_mrview/src/couch_mrview_util.erl index 48138bf76f8..17f6db34d24 100644 --- a/src/couch_mrview/src/couch_mrview_util.erl +++ b/src/couch_mrview/src/couch_mrview_util.erl @@ -112,17 +112,19 @@ get_signatures(Db) -> DDocs1 = lists:foldl(FoldFun, [], DDocs), get_signatures_from_ddocs(DbName, DDocs1). -% From a list of design #doc{} records returns signatures map: #{Sig => true} -% This will be valid signatures of views we expect to run and build on this -% node. +% From a list of design #doc{} records returns the map +% #{Sig => #{DDocId => true}}. The keys are the valid sig of views +% and inner maps are ddocs referencing those sigs (we can have multiple +% ddocs referencing the same sig). get_signatures_from_ddocs(DbName, DDocs) when is_list(DDocs) -> - FoldFun = fun(#doc{} = Doc, Acc) -> + FoldFun = fun(#doc{id = DDocId} = Doc, Acc) -> try ddoc_to_mrst(DbName, Doc) of {ok, Mrst} -> case couch_mrview_util:mrst_has_valid_views(Mrst) of true -> Sig = couch_util:to_hex_bin(Mrst#mrst.sig), - Acc#{Sig => true}; + Inner = maps:get(Sig, Acc, #{}), + Acc#{Sig => Inner#{DDocId => true}}; false -> Acc end diff --git a/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl new file mode 100644 index 00000000000..52aee133290 --- /dev/null +++ b/src/couch_mrview/test/eunit/couch_mrview_cleanup_tests.erl @@ -0,0 +1,252 @@ +% Licensed under the Apache License, Version 2.0 (the "License"); you may not +% use this file except in compliance with the License. 
You may obtain a copy of +% the License at +% +% http://www.apache.org/licenses/LICENSE-2.0 +% +% Unless required by applicable law or agreed to in writing, software +% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +% License for the specific language governing permissions and limitations under +% the License. + +-module(couch_mrview_cleanup_tests). + +-include_lib("couch/include/couch_eunit.hrl"). +-include_lib("couch/include/couch_db.hrl"). + +-define(TEST_INDEX, test_index). +-define(DDOC_ID, <<"idx_name">>). + +start() -> + fake_index(), + Ctx = test_util:start_couch([mem3, fabric]), + config:set("couchdb", "index_cleanup_delay_msec", "60000", false), + DbName = ?tempdb(), + ok = fabric:create_db(DbName, [?ADMIN_CTX]), + {Ctx, DbName}. + +stop({Ctx, DbName}) -> + meck:unload(?TEST_INDEX), + ok = fabric:delete_db(DbName, [?ADMIN_CTX]), + DbDir = config:get("couchdb", "database_dir", "."), + WaitFun = fun() -> + filelib:fold_files( + DbDir, + <<".*", DbName/binary, "\.[0-9]+.*">>, + true, + fun(_F, _A) -> wait end, + ok + ) + end, + ok = test_util:wait(WaitFun), + config:delete("couchdb", "index_cleanup_delay_msec", false), + test_util:stop_couch(Ctx), + ok. + +cleanup_test_() -> + { + "couch_mrview_cleanup", + { + foreach, + fun start/0, + fun stop/1, + [ + ?TDEF_FE(t_orphan_sigs_are_reaped), + ?TDEF_FE(t_valid_sigs_survive), + ?TDEF_FE(t_shared_sig_drops_stale_ddoc_row), + ?TDEF_FE(t_schedule_dedupes_within_window) + ] + } + }. + +% Sig is now invalid, reap it! 
+t_orphan_sigs_are_reaped({_Ctx, DbName}) -> + [DbShard1 | RestDbShards] = open_shards(DbName), + {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID), + DbShards = [DbShard | RestDbShards], + N = length(DbShards), + spawn_indexers(DbShards, DDoc), + IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N), + ?assertEqual(N, length(IndexesBefore)), + + ok = meck:reset(?TEST_INDEX), + lists:foreach( + fun(DbShardName) -> + couch_mrview_cleanup:cleanup_processes(DbShardName, #{}) + end, + [couch_db:name(S) || S <- DbShards] + ), + + wait_until_dead(IndexesBefore), + ?assertEqual(0, length(get_indexes_by_ddoc(?DDOC_ID, 0))), + ?assertEqual(0, length(lists:filter(fun is_process_alive/1, IndexesBefore))). + +% Sig is valid, leave it alone +t_valid_sigs_survive({_Ctx, DbName}) -> + [DbShard1 | RestDbShards] = open_shards(DbName), + {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID), + DbShards = [DbShard | RestDbShards], + N = length(DbShards), + spawn_indexers(DbShards, DDoc), + IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N), + ?assertEqual(N, length(IndexesBefore)), + + % All test indexes share the same sig (as mocked) + [{_, {_, RawSig}} | _] = ets:match_object( + couch_index_server:by_db(couch_db:name(DbShard)), + {couch_db:name(DbShard), {?DDOC_ID, '$1'}} + ), + ValidSigs = #{couch_util:to_hex_bin(RawSig) => #{?DDOC_ID => true}}, + + lists:foreach( + fun(DbShardName) -> + couch_mrview_cleanup:cleanup_processes(DbShardName, ValidSigs) + end, + [couch_db:name(S) || S <- DbShards] + ), + + timer:sleep(100), + ?assertEqual(N, length(get_indexes_by_ddoc(?DDOC_ID, N))), + ?assertEqual(N, length(lists:filter(fun is_process_alive/1, IndexesBefore))). + +% Two ddocs share the same sig. 
Indexer has to stay alive old ddoc removed +t_shared_sig_drops_stale_ddoc_row({_Ctx, DbName}) -> + [DbShard1 | RestDbShards] = open_shards(DbName), + {DDoc, DbShard} = create_ddoc(DbShard1, ?DDOC_ID), + DbShards = [DbShard | RestDbShards], + N = length(DbShards), + spawn_indexers(DbShards, DDoc), + IndexesBefore = get_indexes_by_ddoc(?DDOC_ID, N), + ?assertEqual(N, length(IndexesBefore)), + + [{_, {_, RawSig}} | _] = ets:match_object( + couch_index_server:by_db(couch_db:name(DbShard)), + {couch_db:name(DbShard), {?DDOC_ID, '$1'}} + ), + OtherDDocId = <<"some_other_ddoc">>, + ValidSigs = #{couch_util:to_hex_bin(RawSig) => #{OtherDDocId => true}}, + + lists:foreach( + fun(DbShardName) -> + couch_mrview_cleanup:cleanup_processes(DbShardName, ValidSigs) + end, + [couch_db:name(S) || S <- DbShards] + ), + + test_util:wait(fun() -> + Stale = lists:flatmap( + fun(I) -> + ets:match_object( + couch_index_server:by_db(I), {'$1', {?DDOC_ID, '$2'}} + ) + end, + seq() + ), + case Stale of + [] -> ok; + _ -> wait + end + end), + ?assertEqual(N, length(lists:filter(fun is_process_alive/1, IndexesBefore))). + +% Three schedule calls should dedup +t_schedule_dedupes_within_window({_Ctx, DbName}) -> + ClusteredDbName = mem3:dbname(DbName), + ok = couch_index_cleanup:schedule(ClusteredDbName), + ok = couch_index_cleanup:schedule(ClusteredDbName), + ok = couch_index_cleanup:schedule(ClusteredDbName), + ?assertEqual([ClusteredDbName], pending_dbnames()). + +% Helpers. Some copied from other tests + +open_shards(DbName) -> + lists:map( + fun(Sh) -> + {ok, ShardDb} = couch_db:open(mem3:name(Sh), []), + ShardDb + end, + mem3:local_shards(mem3:dbname(DbName)) + ). + +create_ddoc(Db, DDocID) -> + DDocJson = couch_doc:from_json_obj( + {[ + {<<"_id">>, DDocID}, + {<<"value">>, 1} + ]} + ), + {ok, _Rev} = couch_db:update_doc(Db, DDocJson, []), + {ok, Db1} = couch_db:reopen(Db), + {ok, DDoc} = couch_db:open_doc(Db1, DDocID, [ejson_body, ?ADMIN_CTX]), + {DDoc, Db1}. 
+ +spawn_indexers(DbShards, DDoc) -> + ok = meck:reset(?TEST_INDEX), + lists:foreach( + fun(ShardDb) -> + couch_index_server:get_index(?TEST_INDEX, ShardDb, DDoc) + end, + DbShards + ). + +fake_index() -> + ok = meck:new([?TEST_INDEX], [non_strict]), + ok = meck:expect(?TEST_INDEX, init, fun(Db, DDoc) -> + {ok, {couch_db:name(Db), DDoc}} + end), + ok = meck:expect(?TEST_INDEX, open, fun(_Db, State) -> + {ok, State} + end), + ok = meck:expect(?TEST_INDEX, get, fun + (db_name, {DbName, _DDoc}) -> + DbName; + (idx_name, {_DbName, DDoc}) -> + DDoc#doc.id; + (signature, {_DbName, DDoc}) -> + couch_hash:md5_hash(term_to_binary(DDoc)); + (update_seq, Seq) -> + Seq + end), + ok = meck:expect(?TEST_INDEX, shutdown, ['_'], ok). + +get_indexes_by_ddoc(DDocID, N) -> + Indexes = test_util:wait(fun() -> + Indxs = lists:flatmap( + fun(I) -> + ets:match_object( + couch_index_server:by_db(I), {'$1', {DDocID, '$2'}} + ) + end, + seq() + ), + case length(Indxs) == N of + true -> Indxs; + false -> wait + end + end), + lists:foldl( + fun({DbName, {_DDocID, Sig}}, Acc) -> + case ets:lookup(couch_index_server:by_sig(DbName), {DbName, Sig}) of + [{_, Pid}] -> [Pid | Acc]; + _ -> Acc + end + end, + [], + Indexes + ). + +wait_until_dead(Pids) -> + test_util:wait(fun() -> + case lists:filter(fun is_process_alive/1, Pids) of + [] -> ok; + _ -> wait + end + end). + +pending_dbnames() -> + {st, Pending} = sys:get_state(couch_index_cleanup), + lists:sort(maps:keys(Pending)). + +seq() -> + lists:seq(1, couch_index_server:num_servers()). 
diff --git a/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl b/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl index c304dcdad39..75614b72803 100644 --- a/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl +++ b/src/couch_mrview/test/eunit/couch_mrview_util_tests.erl @@ -98,7 +98,8 @@ t_get_signatures_local({_, Db}) -> Sigs = couch_mrview_util:get_signatures(DbName), ?assert(is_map(Sigs)), ?assertEqual(1, map_size(Sigs)), - [{Sig, true}] = maps:to_list(Sigs), + [{Sig, DDocs}] = maps:to_list(Sigs), + ?assertEqual(#{?DDOC_ID => true}, DDocs), {ok, Info} = couch_mrview:get_info(Db, ?DDOC_ID), ?assertEqual(proplists:get_value(signature, Info), Sig), @@ -115,7 +116,8 @@ t_get_signatures_clustered({DbName, _Db}) -> ?assertEqual(Sigs, couch_mrview_util:get_signatures(ShardName2)), ?assert(is_map(Sigs)), ?assertEqual(1, map_size(Sigs)), - [{Sig, true}] = maps:to_list(Sigs), + [{Sig, DDocs}] = maps:to_list(Sigs), + ?assertEqual(#{?DDOC_ID => true}, DDocs), {ok, Info} = couch_mrview:get_info(ShardName1, ?DDOC_ID), ?assertEqual(proplists:get_value(signature, Info), Sig),