% blob: 58e7a5ea064486b7dd822981b859bd27cac4beba [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_doc_purge).
-export([go/3]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
% Purge the given revisions on every shard copy of the affected documents.
%
% AllIdsRevs is a list of {Id, Revs} pairs. Each pair is tagged with a fresh
% UUID, the tagged requests are grouped per shard, and one
% fabric_rpc:purge_docs worker is cast per shard through rexi. Replies are
% folded by handle_message/3.
%
% Returns:
%   {ok, []}                      - when there is nothing to purge
%   {Health, {PurgeSeq, Results}} - Results follow the order of the request,
%                                   with `noreply` entries removed
%   anything else rexi_utils:recv/6 returns is passed through unchanged.
% The rexi monitor is always stopped once receiving has finished.
go(_, [], _) ->
    {ok, []};
go(DbName, AllIdsRevs, Opts) ->
    % tag each purge request with a UUID
    {AllUUIDs, TaggedIdsRevs, DocCount} = tag_docs(AllIdsRevs),
    Options = lists:delete(all_or_nothing, Opts),
    % starts one worker for a shard and records which UUIDs it is handling
    StartWorker = fun(Shard, ShardIdsRevs, {CountersAcc, WorkersAcc}) ->
        UUIDs = [UUID || {UUID, _Id, _Revs} <- ShardIdsRevs],
        #shard{name = Name, node = Node} = Shard,
        Ref = rexi:cast(Node, {fabric_rpc, purge_docs, [Name, ShardIdsRevs, Options]}),
        Worker = Shard#shard{ref = Ref},
        {[{Worker, UUIDs} | CountersAcc], [Worker | WorkersAcc]}
    end,
    ByShard = group_idrevs_by_shard(DbName, TaggedIdsRevs),
    % Counters -> [{Worker, UUIDs}]
    {Counters, Workers} = dict:fold(StartWorker, {[], []}, ByShard),
    RexiMon = fabric_util:create_monitors(Workers),
    % the w option is handled as a string, hence the conversions
    DefaultW = integer_to_list(mem3:quorum(DbName)),
    W = couch_util:get_value(w, Options, DefaultW),
    % Acc        -> {WaitingCount, DocCount, W, Counters, ResultsAcc}
    % ResultsAcc -> {PSeqsCs, DocsDict}
    % PSeqsCs    -> [{Shard, PurgeSeq}]
    % DocsDict   -> UUID -> [{ok, PurgedRevs}]
    EmptyResults = {[], dict:new()},
    Acc = {length(Workers), DocCount, list_to_integer(W), Counters, EmptyResults},
    Timeout = fabric_util:request_timeout(),
    % puts per-UUID results back into request order and drops `noreply`
    InRequestOrder = fun(Unordered) ->
        [R || R <- couch_util:reorder_results(AllUUIDs, Unordered), R =/= noreply]
    end,
    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc, infinity, Timeout) of
        {ok, {Health, PSeq, Results}} when Health =:= ok; Health =:= accepted ->
            % Results -> [{UUID, {ok, Revs}}]
            {Health, {PSeq, InRequestOrder(Results)}};
        {timeout, TimedOutAcc} ->
            % answer with whatever replies were collected before the timeout
            {_, _, W1, PendingCounters, {PSeqsCs, DocsDict}} = TimedOutAcc,
            {DefunctWorkers, _} = lists:unzip(PendingCounters),
            fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
            {Health, _, Forced} = dict:fold(fun force_reply/3, {ok, W1, []}, DocsDict),
            FinalPSeq = fabric_view_changes:pack_seqs(PSeqsCs),
            {Health, {FinalPSeq, InRequestOrder(Forced)}};
        Other ->
            Other
    after
        rexi_monitor:stop(RexiMon)
    end.
% rexi_utils:recv/6 callback that folds worker messages into the accumulator
%   {WaitingCount, DocCount, W, Counters, {PSeqsCs, DocsDict}}
% where Counters is [{Worker, UUIDs}] for workers that have not answered yet,
% PSeqsCs is [{Worker, PurgeSeq}] and DocsDict maps UUID -> [Reply].
%
% Returns {ok, NewAcc} to keep receiving or {stop, {Health, PSeq, Replies}}
% once a final answer can be given. A {bad_request, Msg} message is thrown.
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, Acc0) ->
    {_, DocCount, W, Counters, {PSeqsCs, DocsDict0}} = Acc0,
    % every pending worker on the node that went down is considered failed
    OnDownNode = fun({#shard{node = N}, _}) -> N == NodeRef end,
    {Failed, Remaining} = lists:partition(OnDownNode, Counters),
    % record an error reply for each document those workers were handling
    DocsDict = lists:foldl(fun({_FailedWorker, UUIDs}, DictAcc) ->
        Errors = [{error, internal_server_error} || _UUID <- UUIDs],
        append_update_replies(UUIDs, Errors, DictAcc)
    end, DocsDict0, Failed),
    skip_message({length(Remaining), DocCount, W, Remaining, {PSeqsCs, DocsDict}});
handle_message({rexi_EXIT, _}, Worker, Acc0) ->
    fail_worker(Worker, {error, internal_server_error}, Acc0);
handle_message({ok, {PSeq, Replies0}}, Worker, Acc0) ->
    {WCount, DocCount, W, Counters, {PSeqsCs0, DocsDict0}} = Acc0,
    {value, {_Worker, UUIDs}, Remaining} = lists:keytake(Worker, 1, Counters),
    DocsDict = append_update_replies(UUIDs, Replies0, DocsDict0),
    PSeqsCs = [{Worker, PSeq} | PSeqsCs0],
    KeepWaiting = {ok, {WCount - 1, DocCount, W, Remaining, {PSeqsCs, DocsDict}}},
    case {WCount, dict:size(DocsDict)} of
        {1, _} ->
            % that was the last outstanding worker: conclude with what we have
            {Health, W, Replies} =
                dict:fold(fun force_reply/3, {ok, W, []}, DocsDict),
            {stop, {Health, fabric_view_changes:pack_seqs(PSeqsCs), Replies}};
        {_, DocCount} ->
            % every document has at least one reply; stop early if all of
            % them already meet the quorum
            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocsDict) of
                continue ->
                    KeepWaiting;
                {stop, W, Replies} ->
                    {stop, {ok, fabric_view_changes:pack_seqs(PSeqsCs), Replies}}
            end;
        _ ->
            KeepWaiting
    end;
handle_message({error, purged_docs_limit_exceeded} = Error, Worker, Acc0) ->
    fail_worker(Worker, Error, Acc0);
handle_message({bad_request, Msg}, _, _) ->
    throw({bad_request, Msg}).

% Removes Worker from the pending counters, records Error as its reply for
% every document it was handling and decrements the waiting count.
fail_worker(Worker, Error, Acc0) ->
    {WaitingCount, DocCount, W, Counters, {PSeqsCs, DocsDict0}} = Acc0,
    {value, {_Worker, UUIDs}, Remaining} = lists:keytake(Worker, 1, Counters),
    Errors = [Error || _UUID <- UUIDs],
    DocsDict = append_update_replies(UUIDs, Errors, DocsDict0),
    skip_message({WaitingCount - 1, DocCount, W, Remaining, {PSeqsCs, DocsDict}}).
% Tags every {Id, Revs} purge request with a new UUID from couch_uuids:new/0.
%
% Returns {UUIDs, [{UUID, Id, Revs}], Count}; both lists keep the order of the
% input and Count is the number of requests. UUIDs are generated starting from
% the last request, exactly as a right fold would do.
tag_docs([]) ->
    {[], [], 0};
tag_docs([{Id, Revs} | Rest]) ->
    {UUIDs, Tagged, Count} = tag_docs(Rest),
    UUID = couch_uuids:new(),
    {[UUID | UUIDs], [{UUID, Id, Revs} | Tagged], Count + 1}.
% dict:fold/3 callback that produces a final reply for Doc even when the write
% quorum W was not reached.
%
% The accumulator is {Health, W, [{Doc, Reply}]}:
%   - quorum met                 -> merged {ok, Revs} reply, Health unchanged
%   - some, but too few, ok's    -> {accepted, Revs}; Health ok becomes accepted
%   - only internal_server_error -> that error, Health becomes error
%   - one distinct other reply   -> that reply, Health unchanged
%   - several distinct replies   -> the sorted unique list, Health becomes error
force_reply(Doc, Replies, {Health, W, Acc}) ->
    {NewHealth, DocReply} =
        case update_quorum_met(W, Replies) of
            {true, QuorumReply} -> {Health, QuorumReply};
            false -> below_quorum_reply(Health, Replies)
        end,
    {NewHealth, W, [{Doc, DocReply} | Acc]}.

% Reply and health for a document whose replies did not reach the quorum.
below_quorum_reply(Health, Replies) ->
    case [Revs || {ok, Revs} <- Replies] of
        [] ->
            no_success_reply(Health, lists:usort(Replies));
        PurgedRevs ->
            {downgrade_health(Health), {accepted, lists:usort(lists:flatten(PurgedRevs))}}
    end.

% Reply and health for a document without a single successful reply; the
% argument is the sorted list of distinct replies.
no_success_reply(_Health, [{error, internal_server_error}]) ->
    {error, {error, internal_server_error}};
no_success_reply(Health, [OnlyReply]) ->
    % all replies are identical, so the overall health is inherited
    {Health, OnlyReply};
no_success_reply(_Health, DistinctReplies) ->
    {error, DistinctReplies}.

% A partially successful purge turns ok into accepted and leaves any other
% health value untouched.
downgrade_health(ok) -> accepted;
downgrade_health(Health) -> Health.
% dict:fold/3 callback used to decide whether the request can be answered
% before all workers have replied.
%
% The accumulator starts as {stop, W, []} and collects {Doc, Reply} for each
% document meeting the quorum. The first document that does not meet it turns
% the accumulator into `continue`, which every later step just passes along.
maybe_reply(Doc, Replies, {stop, W, Acc}) ->
    case update_quorum_met(W, Replies) of
        false ->
            continue;
        {true, Reply} ->
            {stop, W, [{Doc, Reply} | Acc]}
    end;
maybe_reply(_Doc, _Replies, continue) ->
    % quorum already failed for an earlier document: fast-forward the fold
    continue.
% Checks whether at least W of Replies are successful ({ok, PurgedRevs}).
%
% Returns false when they are not, otherwise {true, {ok, Revs}} where Revs is
% the sorted, duplicate-free union of the purged revisions of all successful
% replies.
update_quorum_met(W, Replies) ->
    Purged = [PurgedRevs || {ok, PurgedRevs} <- Replies],
    case length(Purged) >= W of
        true ->
            % make a union of PurgedRevs
            {true, {ok, lists:usort(lists:flatten(Purged))}};
        false ->
            false
    end.
% Groups tagged purge requests by the shards that host each document.
%
% Returns a dict mapping every shard reported by mem3:shards(DbName, Id) to
% the list of {UUID, Id, Revs} requests it has to process, in request order.
group_idrevs_by_shard(DbName, UUIDsIdsRevs) ->
    AddRequest = fun({_UUID, Id, _Revs} = Request, ByShard0) ->
        AddToShard = fun(Shard, ByShard) ->
            dict:append(Shard, Request, ByShard)
        end,
        lists:foldl(AddToShard, ByShard0, mem3:shards(DbName, Id))
    end,
    lists:foldl(AddRequest, dict:new(), UUIDsIdsRevs).
% Appends the N-th reply to the reply list stored under the N-th document key.
%
% Docs and Replies must have the same length; a mismatch crashes instead of
% silently dropping replies. Returns the updated dict.
append_update_replies(Docs, Replies, DocReplyDict) ->
    lists:foldl(fun({Doc, Reply}, DictAcc) ->
        dict:append(Doc, Reply, DictAcc)
    end, DocReplyDict, lists:zip(Docs, Replies)).
% Called after a worker was dropped without a usable reply.
%
% While workers are still pending the accumulator is handed back as
% {ok, Acc}. Once the waiting count is 0 the collected replies are forced
% into a final {stop, {Health, PurgeSeq, Replies}} answer.
skip_message(Acc) ->
    case Acc of
        {0, _DocCount, W, _Counters, {PSeqsCs, DocsDict}} ->
            {Health, W, Replies} =
                dict:fold(fun force_reply/3, {ok, W, []}, DocsDict),
            {stop, {Health, fabric_view_changes:pack_seqs(PSeqsCs), Replies}};
        _ ->
            {ok, Acc}
    end.
% eunits
% Drives handle_message/3 by hand through scenarios that must end with an
% overall `ok` health: W = 2, W = 3, one worker exiting, and two kinds of
% error reply returned by all workers.
%
% couch_log is mocked for the duration of the test. The mock is removed in an
% `after` section so that a failing assertion cannot leave couch_log mocked
% for the tests that run afterwards.
doc_purge_ok_test() ->
    meck:new(couch_log),
    try
        meck:expect(couch_log, warning, fun(_,_) -> ok end),
        meck:expect(couch_log, notice, fun(_,_) -> ok end),

        Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
        UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
        Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
        UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
        UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
        Shards =
            mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
        Counters = dict:to_list(
            group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
        DocsDict = dict:new(),

        % ***test for W = 2
        AccW2 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
            Counters, {[], DocsDict}},
        {ok, {WaitingCountW2_1,_,_,_,_} = AccW2_1} =
            handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), AccW2),
        ?assertEqual(2, WaitingCountW2_1),
        {stop, FinalReplyW2 } =
            handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
                lists:nth(2,Shards), AccW2_1),
        ?assertMatch(
            {ok, _PSeq, Replies} when Replies ==
                [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
            FinalReplyW2
        ),

        % ***test for W = 3
        AccW3 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
            Counters, {[], DocsDict}},
        {ok, {WaitingCountW3_1,_,_,_,_} = AccW3_1} =
            handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), AccW3),
        ?assertEqual(2, WaitingCountW3_1),
        {ok, {WaitingCountW3_2,_,_,_,_} = AccW3_2} =
            handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
                lists:nth(2,Shards), AccW3_1),
        ?assertEqual(1, WaitingCountW3_2),
        {stop, FinalReplyW3 } =
            handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}},
                lists:nth(3,Shards), AccW3_2),
        ?assertMatch(
            {ok, _PSeq, Replies} when Replies ==
                [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
            FinalReplyW3
        ),

        % *** test rexi_exit on 1 node
        Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
            Counters, {[], DocsDict}},
        {ok, {WaitingCount1,_,_,_,_} = Acc1} =
            handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), Acc0),
        ?assertEqual(2, WaitingCount1),
        {ok, {WaitingCount2,_,_,_,_} = Acc2} =
            handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
        ?assertEqual(1, WaitingCount2),
        {stop, Reply} =
            handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, lists:nth(3,Shards), Acc2),
        ?assertMatch(
            {ok, _PSeq, Replies} when Replies ==
                [{UUID1, {ok, Revs1}}, {UUID2, {ok, Revs2}}],
            Reply
        ),

        % *** test {error, purge_during_compaction_exceeded_limit} on all nodes
        % *** still should return ok reply for the request
        ErrPDCEL = {error, purge_during_compaction_exceeded_limit},
        Acc20 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
            Counters, {[], DocsDict}},
        {ok, {WaitingCount21,_,_,_,_} = Acc21} =
            handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}}, hd(Shards), Acc20),
        ?assertEqual(2, WaitingCount21),
        {ok, {WaitingCount22,_,_,_,_} = Acc22} =
            handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}},
                lists:nth(2,Shards), Acc21),
        ?assertEqual(1, WaitingCount22),
        {stop, Reply2 } =
            handle_message({ok,{0,[ErrPDCEL, ErrPDCEL]}},
                lists:nth(3,Shards), Acc22),
        ?assertMatch(
            {ok, _PSeq, Replies2} when Replies2 ==
                [{UUID1, ErrPDCEL}, {UUID2, ErrPDCEL}],
            Reply2
        ),

        % *** test {error, purged_docs_limit_exceeded} on all nodes
        % *** still should return ok reply for the request
        ErrPDLE = {error, purged_docs_limit_exceeded},
        Acc30 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("3"),
            Counters, {[], DocsDict}},
        {ok, {WaitingCount31,_,_,_,_} = Acc31} =
            handle_message({ok,{0,[ErrPDLE, ErrPDLE]}}, hd(Shards), Acc30),
        ?assertEqual(2, WaitingCount31),
        {ok, {WaitingCount32,_,_,_,_} = Acc32} =
            handle_message({ok,{0,[ErrPDLE, ErrPDLE]}},
                lists:nth(2,Shards), Acc31),
        ?assertEqual(1, WaitingCount32),
        {stop, Reply3 } =
            handle_message({ok,{0,[ErrPDLE, ErrPDLE]}},
                lists:nth(3,Shards), Acc32),
        ?assertMatch(
            {ok, _PSeq, Replies3} when Replies3 ==
                [{UUID1, ErrPDLE}, {UUID2, ErrPDLE}],
            Reply3
        )
    after
        meck:unload(couch_log)
    end.
% Checks that one successful reply followed by two worker exits (with W = 2)
% yields an overall `accepted` health and {accepted, Revs} for each document.
%
% couch_log is mocked for the duration of the test. The mock is removed in an
% `after` section so that a failing assertion cannot leave couch_log mocked
% for the tests that run afterwards.
doc_purge_accepted_test() ->
    meck:new(couch_log),
    try
        meck:expect(couch_log, warning, fun(_,_) -> ok end),
        meck:expect(couch_log, notice, fun(_,_) -> ok end),

        Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
        UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
        Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
        UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
        UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
        Shards =
            mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
        Counters = dict:to_list(
            group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
        DocsDict = dict:new(),

        % *** test rexi_exit on 2 nodes
        Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
            Counters, {[], DocsDict}},
        {ok, {WaitingCount1,_,_,_,_} = Acc1} =
            handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards), Acc0),
        ?assertEqual(2, WaitingCount1),
        {ok, {WaitingCount2,_,_,_,_} = Acc2} =
            handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
        ?assertEqual(1, WaitingCount2),
        {stop, Reply} =
            handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
        ?assertMatch(
            {accepted, _PSeq, Replies} when Replies ==
                [{UUID1, {accepted, Revs1}}, {UUID2, {accepted, Revs2}}],
            Reply
        )
    after
        meck:unload(couch_log)
    end.
% Covers the scenarios that must end with an overall `error` health or an
% immediate stop: all workers exiting, a quorum larger than the number of
% shard copies, and a document for which no worker produced a reply.
%
% couch_log is mocked for the duration of the test. The mock is removed in an
% `after` section so that a failing assertion cannot leave couch_log mocked
% for the tests that run afterwards.
doc_purge_error_test() ->
    meck:new(couch_log),
    try
        meck:expect(couch_log, warning, fun(_,_) -> ok end),
        meck:expect(couch_log, notice, fun(_,_) -> ok end),

        Revs1 = [{1, <<"rev11">>}], UUID1 = <<"3de03c5f4c2cd34cc515a9d1ea000abd">>,
        UUIDIdRevs1 = {UUID1, <<"id1">>, Revs1},
        Revs2 = [{1, <<"rev12">>}], UUID2 = <<"4de03c5f4c2cd34cc515a9d1ea000abc">>,
        UUIDIdRevs2 = {UUID2, <<"id2">>, Revs2},
        UUIDsIDdsRevs = [UUIDIdRevs1, UUIDIdRevs2],
        Shards =
            mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
        Counters = dict:to_list(
            group_idrevs_by_shard_hack(<<"foo">>, Shards, UUIDsIDdsRevs)),
        DocsDict = dict:new(),

        % *** test rexi_exit on all 3 nodes
        Acc0 = {length(Shards), length(UUIDsIDdsRevs), list_to_integer("2"),
            Counters, {[], DocsDict}},
        {ok, {WaitingCount1,_,_,_,_} = Acc1} =
            handle_message({rexi_EXIT, nil}, hd(Shards), Acc0),
        ?assertEqual(2, WaitingCount1),
        {ok, {WaitingCount2,_,_,_,_} = Acc2} =
            handle_message({rexi_EXIT, nil}, lists:nth(2,Shards), Acc1),
        ?assertEqual(1, WaitingCount2),
        {stop, Reply} =
            handle_message({rexi_EXIT, nil}, lists:nth(3,Shards), Acc2),
        ?assertMatch(
            {error, _PSeq, Replies} when Replies ==
                [{UUID1, {error, internal_server_error}},
                    {UUID2, {error, internal_server_error}}],
            Reply
        ),

        % ***test w quorum > # shards, which should fail immediately
        Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
        Counters2 = dict:to_list(
            group_idrevs_by_shard_hack(<<"foo">>, Shards2, UUIDsIDdsRevs)),
        AccW4 = {length(Shards2), length(UUIDsIDdsRevs), list_to_integer("2"),
            Counters2, {[], DocsDict}},
        % the replying worker must come from Shards2, the partition map the
        % counters above were built from
        Bool =
            case handle_message({ok,{2,[{ok, Revs1}, {ok, Revs2}]}}, hd(Shards2), AccW4) of
                {stop, _Reply} ->
                    true;
                _ -> false
            end,
        ?assertEqual(true, Bool),

        % *** test Docs with no replies should end up as {error, internal_server_error}
        SA1 = #shard{node=a, range=1},
        SA2 = #shard{node=a, range=2},
        SB1 = #shard{node=b, range=1},
        SB2 = #shard{node=b, range=2},
        Counters3 = [{SA1,[UUID1]}, {SB1,[UUID1]},
            {SA2,[UUID2]}, {SB2,[UUID2]}],
        Acc30 = {length(Counters3), length(UUIDsIDdsRevs), 2,
            Counters3, {[], DocsDict}},
        {ok, Acc31} = handle_message({ok,{1,[{ok, Revs1}]}}, SA1, Acc30),
        {ok, Acc32} = handle_message({rexi_EXIT, nil}, SB1, Acc31),
        {ok, Acc33} = handle_message({rexi_EXIT, nil}, SA2, Acc32),
        {stop, Acc34} = handle_message({rexi_EXIT, nil}, SB2, Acc33),
        ?assertMatch(
            {error, _PSeq, Replies3} when Replies3 ==
                [{UUID1, {accepted, Revs1}},
                    {UUID2, {error, internal_server_error}}],
            Acc34
        )
    after
        meck:unload(couch_log)
    end.
% Test-only replacement for group_idrevs_by_shard/2 that avoids having to
% start the mem3 application: the shard list is passed in and every document
% is assigned to every shard.
%
% Returns a dict mapping each shard to the UUIDs of all requests, in request
% order. Note that only the UUID is stored, not the whole {UUID, Id, Revs}.
group_idrevs_by_shard_hack(_DbName, Shards, UUIDsIdsRevs) ->
    AddRequest = fun({UUID, _Id, _Revs}, ByShard0) ->
        AddToShard = fun(Shard, ByShard) ->
            dict:append(Shard, UUID, ByShard)
        end,
        lists:foldl(AddToShard, ByShard0, Shards)
    end,
    lists:foldl(AddRequest, dict:new(), UUIDsIdsRevs).