blob: bda9039ba06a7ea8f64a49e4c6aff3af685ff19c [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_doc_purge).
-export([
go/3
]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-record(acc, {
worker_uuids,
resps,
uuid_counts,
w
}).
go(_, [], _) ->
{ok, []};
go(DbName, IdsRevs, Options) ->
% Generate our purge requests of {UUID, DocId, Revs}
{UUIDs, Reqs} = create_reqs(IdsRevs, [], []),
% Fire off rexi workers for each shard.
{Workers, WorkerUUIDs} = dict:fold(fun(Shard, ShardReqs, {Ws, WUUIDs}) ->
#shard{name = ShardDbName, node = Node} = Shard,
Args = [ShardDbName, ShardReqs, Options],
Ref = rexi:cast(Node, {fabric_rpc, purge_docs, Args}),
Worker = Shard#shard{ref=Ref},
ShardUUIDs = [UUID || {UUID, _Id, _Revs} <- ShardReqs],
{[Worker | Ws], [{Worker, ShardUUIDs} | WUUIDs]}
end, {[], []}, group_reqs_by_shard(DbName, Reqs)),
UUIDCounts = lists:foldl(fun({_Worker, WUUIDs}, CountAcc) ->
lists:foldl(fun(UUID, InnerCountAcc) ->
dict:update_counter(UUID, 1, InnerCountAcc)
end, CountAcc, WUUIDs)
end, dict:new(), WorkerUUIDs),
RexiMon = fabric_util:create_monitors(Workers),
Timeout = fabric_util:request_timeout(),
Acc0 = #acc{
worker_uuids = WorkerUUIDs,
resps = dict:from_list([{UUID, []} || UUID <- UUIDs]),
uuid_counts = UUIDCounts,
w = w(DbName, Options)
},
Acc2 = try rexi_utils:recv(Workers, #shard.ref,
fun handle_message/3, Acc0, infinity, Timeout) of
{ok, Acc1} ->
Acc1;
{timeout, Acc1} ->
#acc{
worker_uuids = WorkerUUIDs,
resps = Resps
} = Acc1,
DefunctWorkers = [Worker || {Worker, _} <- WorkerUUIDs],
fabric_util:log_timeout(DefunctWorkers, "purge_docs"),
NewResps = append_errors(timeout, WorkerUUIDs, Resps),
Acc1#acc{worker_uuids = [], resps = NewResps};
Else ->
Else
after
rexi_monitor:stop(RexiMon)
end,
FinalResps = format_resps(UUIDs, Acc2),
{resp_health(FinalResps), FinalResps}.
handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
#acc{
worker_uuids = WorkerUUIDs,
resps = Resps
} = Acc,
Pred = fun({#shard{node = N}, _}) -> N == Node end,
{Failed, Rest} = lists:partition(Pred, WorkerUUIDs),
NewResps = append_errors(internal_server_error, Failed, Resps),
maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
handle_message({rexi_EXIT, _}, Worker, Acc) ->
#acc{
worker_uuids = WorkerUUIDs,
resps = Resps
} = Acc,
{value, WorkerPair, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
NewResps = append_errors(internal_server_error, [WorkerPair], Resps),
maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
handle_message({ok, Replies}, Worker, Acc) ->
#acc{
worker_uuids = WorkerUUIDs,
resps = Resps
} = Acc,
{value, {_W, UUIDs}, Rest} = lists:keytake(Worker, 1, WorkerUUIDs),
NewResps = append_resps(UUIDs, Replies, Resps),
maybe_stop(Acc#acc{worker_uuids = Rest, resps = NewResps});
handle_message({bad_request, Msg}, _, _) ->
throw({bad_request, Msg}).
create_reqs([], UUIDs, Reqs) ->
{lists:reverse(UUIDs), lists:reverse(Reqs)};
create_reqs([{Id, Revs} | RestIdsRevs], UUIDs, Reqs) ->
UUID = couch_uuids:new(),
NewUUIDs = [UUID | UUIDs],
NewReqs = [{UUID, Id, Revs} | Reqs],
create_reqs(RestIdsRevs, NewUUIDs, NewReqs).
group_reqs_by_shard(DbName, Reqs) ->
lists:foldl(fun({_UUID, Id, _Revs} = Req, D0) ->
lists:foldl(fun(Shard, D1) ->
dict:append(Shard, Req, D1)
end, D0, mem3:shards(DbName, Id))
end, dict:new(), Reqs).
w(DbName, Options) ->
try
list_to_integer(couch_util:get_value(w, Options))
catch _:_ ->
mem3:quorum(DbName)
end.
append_errors(Type, WorkerUUIDs, Resps) ->
lists:foldl(fun({_Worker, UUIDs}, RespAcc) ->
Errors = [{error, Type} || _UUID <- UUIDs],
append_resps(UUIDs, Errors, RespAcc)
end, Resps, WorkerUUIDs).
append_resps([], [], Resps) ->
Resps;
append_resps([UUID | RestUUIDs], [Reply | RestReplies], Resps) ->
NewResps = dict:append(UUID, Reply, Resps),
append_resps(RestUUIDs, RestReplies, NewResps).
maybe_stop(#acc{worker_uuids = []} = Acc) ->
{stop, Acc};
maybe_stop(#acc{resps = Resps, uuid_counts = Counts, w = W} = Acc) ->
try
dict:fold(fun(UUID, UUIDResps, _) ->
UUIDCount = dict:fetch(UUID, Counts),
case has_quorum(UUIDResps, UUIDCount, W) of
true -> ok;
false -> throw(keep_going)
end
end, nil, Resps),
{stop, Acc}
catch throw:keep_going ->
{ok, Acc}
end.
format_resps(UUIDs, #acc{} = Acc) ->
#acc{
resps = Resps,
w = W
} = Acc,
FoldFun = fun(UUID, Replies, ReplyAcc) ->
OkReplies = [Reply || {ok, Reply} <- Replies],
case OkReplies of
[] ->
[Error | _] = lists:usort(Replies),
[{UUID, Error} | ReplyAcc];
_ ->
AllRevs = lists:usort(lists:flatten(OkReplies)),
IsOk = length(OkReplies) >= W
andalso length(lists:usort(OkReplies)) == 1,
Health = if IsOk -> ok; true -> accepted end,
[{UUID, {Health, AllRevs}} | ReplyAcc]
end
end,
FinalReplies = dict:fold(FoldFun, [], Resps),
couch_util:reorder_results(UUIDs, FinalReplies);
format_resps(_UUIDs, Else) ->
Else.
resp_health(Resps) ->
Healths = lists:usort([H || {H, _} <- Resps]),
HasError = lists:member(error, Healths),
HasAccepted = lists:member(accepted, Healths),
AllOk = Healths == [ok],
if
HasError -> error;
HasAccepted -> accepted;
AllOk -> ok;
true -> error
end.
has_quorum(Resps, Count, W) ->
OkResps = [R || {ok, _} = R <- Resps],
OkCounts = lists:foldl(fun(R, Acc) ->
orddict:update_counter(R, 1, Acc)
end, orddict:new(), OkResps),
MaxOk = lists:max([0 | element(2, lists:unzip(OkCounts))]),
if
MaxOk >= W -> true;
length(Resps) >= Count -> true;
true -> false
end.
%% -ifdef(TEST).
%% -include_lib("eunit/include/eunit.hrl").
%%
%% purge_test_() ->
%% {
%% setup,
%% fun setup/0,
%% fun teardown/1,
%% [
%% t_w2_ok(),
%% t_w3_ok(),
%%
%% t_w2_mixed_accepted(),
%% t_w3_mixed_accepted(),
%%
%% t_w2_exit1_ok(),
%% t_w2_exit2_accepted(),
%% t_w2_exit3_error(),
%%
%% t_w4_accepted(),
%%
%% t_mixed_ok_accepted(),
%% t_mixed_errors()
%% ]
%% }.
%%
%%
%% setup() ->
%% meck:new(couch_log),
%% meck:expect(couch_log, warning, fun(_, _) -> ok end),
%% meck:expect(couch_log, notice, fun(_, _) -> ok end).
%%
%%
%% teardown(_) ->
%% meck:unload().
%%
%%
%% t_w2_ok() ->
%% ?_test(begin
%% Acc0 = create_init_acc(2),
%% Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
%%
%% {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
%% ?assertEqual(2, length(Acc1#acc.worker_uuids)),
%% check_quorum(Acc1, false),
%%
%% {stop, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
%% ?assertEqual(1, length(Acc2#acc.worker_uuids)),
%% check_quorum(Acc2, true),
%%
%% Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(ok, resp_health(Resps))
%% end).
%%
%%
%% t_w3_ok() ->
%% ?_test(begin
%% Acc0 = create_init_acc(3),
%% Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
%%
%% {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
%% check_quorum(Acc1, false),
%%
%% {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
%% ?assertEqual(1, length(Acc2#acc.worker_uuids)),
%% check_quorum(Acc2, false),
%%
%% {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
%% ?assertEqual(0, length(Acc3#acc.worker_uuids)),
%% check_quorum(Acc3, true),
%%
%% Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(ok, resp_health(Resps))
%% end).
%%
%%
%% t_w2_mixed_accepted() ->
%% ?_test(begin
%% Acc0 = create_init_acc(2),
%% Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
%% Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
%%
%% {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
%% ?assertEqual(2, length(Acc1#acc.worker_uuids)),
%% check_quorum(Acc1, false),
%%
%% {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
%% ?assertEqual(1, length(Acc2#acc.worker_uuids)),
%% check_quorum(Acc2, false),
%%
%% {stop, Acc3} = handle_message(Msg1, worker(3, Acc0), Acc2),
%% ?assertEqual(0, length(Acc3#acc.worker_uuids)),
%% check_quorum(Acc3, true),
%%
%% Expect = [
%% {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
%% {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
%% ],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(accepted, resp_health(Resps))
%% end).
%%
%%
%% t_w3_mixed_accepted() ->
%% ?_test(begin
%% Acc0 = create_init_acc(3),
%% Msg1 = {ok, [{ok, [{1, <<"foo1">>}]}, {ok, [{2, <<"bar1">>}]}]},
%% Msg2 = {ok, [{ok, [{1, <<"foo2">>}]}, {ok, [{2, <<"bar2">>}]}]},
%%
%% {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
%% ?assertEqual(2, length(Acc1#acc.worker_uuids)),
%% check_quorum(Acc1, false),
%%
%% {ok, Acc2} = handle_message(Msg2, worker(2, Acc0), Acc1),
%% ?assertEqual(1, length(Acc2#acc.worker_uuids)),
%% check_quorum(Acc2, false),
%%
%% {stop, Acc3} = handle_message(Msg2, worker(3, Acc0), Acc2),
%% ?assertEqual(0, length(Acc3#acc.worker_uuids)),
%% check_quorum(Acc3, true),
%%
%% Expect = [
%% {accepted, [{1, <<"foo1">>}, {1, <<"foo2">>}]},
%% {accepted, [{2, <<"bar1">>}, {2, <<"bar2">>}]}
%% ],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc2),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(accepted, resp_health(Resps))
%% end).
%%
%%
%% t_w2_exit1_ok() ->
%% ?_test(begin
%% Acc0 = create_init_acc(2),
%% Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
%% ExitMsg = {rexi_EXIT, blargh},
%%
%% {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
%% ?assertEqual(2, length(Acc1#acc.worker_uuids)),
%% check_quorum(Acc1, false),
%%
%% {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
%% ?assertEqual(1, length(Acc2#acc.worker_uuids)),
%% check_quorum(Acc2, false),
%%
%% {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
%% ?assertEqual(0, length(Acc3#acc.worker_uuids)),
%% check_quorum(Acc3, true),
%%
%% Expect = [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(ok, resp_health(Resps))
%% end).
%%
%%
%% t_w2_exit2_accepted() ->
%% ?_test(begin
%% Acc0 = create_init_acc(2),
%% Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
%% ExitMsg = {rexi_EXIT, blargh},
%%
%% {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
%% ?assertEqual(2, length(Acc1#acc.worker_uuids)),
%% check_quorum(Acc1, false),
%%
%% {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
%% ?assertEqual(1, length(Acc2#acc.worker_uuids)),
%% check_quorum(Acc2, false),
%%
%% {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
%% ?assertEqual(0, length(Acc3#acc.worker_uuids)),
%% check_quorum(Acc3, true),
%%
%% Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(accepted, resp_health(Resps))
%% end).
%%
%%
%% t_w2_exit3_error() ->
%% ?_test(begin
%% Acc0 = create_init_acc(2),
%% ExitMsg = {rexi_EXIT, blargh},
%%
%% {ok, Acc1} = handle_message(ExitMsg, worker(1, Acc0), Acc0),
%% ?assertEqual(2, length(Acc1#acc.worker_uuids)),
%% check_quorum(Acc1, false),
%%
%% {ok, Acc2} = handle_message(ExitMsg, worker(2, Acc0), Acc1),
%% ?assertEqual(1, length(Acc2#acc.worker_uuids)),
%% check_quorum(Acc2, false),
%%
%% {stop, Acc3} = handle_message(ExitMsg, worker(3, Acc0), Acc2),
%% ?assertEqual(0, length(Acc3#acc.worker_uuids)),
%% check_quorum(Acc3, true),
%%
%% Expect = [
%% {error, internal_server_error},
%% {error, internal_server_error}
%% ],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(error, resp_health(Resps))
%% end).
%%
%%
%% t_w4_accepted() ->
%% % Make sure we return when all workers have responded
%% % rather than wait around for a timeout if a user asks
%% % for a qourum with more than the available number of
%% % shards.
%% ?_test(begin
%% Acc0 = create_init_acc(4),
%% Msg = {ok, [{ok, [{1, <<"foo">>}]}, {ok, [{2, <<"bar">>}]}]},
%%
%% {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
%% ?assertEqual(2, length(Acc1#acc.worker_uuids)),
%% check_quorum(Acc1, false),
%%
%% {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
%% ?assertEqual(1, length(Acc2#acc.worker_uuids)),
%% check_quorum(Acc2, false),
%%
%% {stop, Acc3} = handle_message(Msg, worker(3, Acc0), Acc2),
%% ?assertEqual(0, length(Acc3#acc.worker_uuids)),
%% check_quorum(Acc3, true),
%%
%% Expect = [{accepted, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc3),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(accepted, resp_health(Resps))
%% end).
%%
%%
%% t_mixed_ok_accepted() ->
%% ?_test(begin
%% WorkerUUIDs = [
%% {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
%% {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
%% {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
%%
%% {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
%% {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
%% {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
%% ],
%%
%% Acc0 = #acc{
%% worker_uuids = WorkerUUIDs,
%% resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
%% uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
%% w = 2
%% },
%%
%% Msg1 = {ok, [{ok, [{1, <<"foo">>}]}]},
%% Msg2 = {ok, [{ok, [{2, <<"bar">>}]}]},
%% ExitMsg = {rexi_EXIT, blargh},
%%
%% {ok, Acc1} = handle_message(Msg1, worker(1, Acc0), Acc0),
%% {ok, Acc2} = handle_message(Msg1, worker(2, Acc0), Acc1),
%% {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
%% {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
%% {stop, Acc5} = handle_message(Msg2, worker(6, Acc0), Acc4),
%%
%% Expect = [{ok, [{1, <<"foo">>}]}, {accepted, [{2, <<"bar">>}]}],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(accepted, resp_health(Resps))
%% end).
%%
%%
%% t_mixed_errors() ->
%% ?_test(begin
%% WorkerUUIDs = [
%% {#shard{node = a, range = [1, 2]}, [<<"uuid1">>]},
%% {#shard{node = b, range = [1, 2]}, [<<"uuid1">>]},
%% {#shard{node = c, range = [1, 2]}, [<<"uuid1">>]},
%%
%% {#shard{node = a, range = [3, 4]}, [<<"uuid2">>]},
%% {#shard{node = b, range = [3, 4]}, [<<"uuid2">>]},
%% {#shard{node = c, range = [3, 4]}, [<<"uuid2">>]}
%% ],
%%
%% Acc0 = #acc{
%% worker_uuids = WorkerUUIDs,
%% resps = dict:from_list([{<<"uuid1">>, []}, {<<"uuid2">>, []}]),
%% uuid_counts = dict:from_list([{<<"uuid1">>, 3}, {<<"uuid2">>, 3}]),
%% w = 2
%% },
%%
%% Msg = {ok, [{ok, [{1, <<"foo">>}]}]},
%% ExitMsg = {rexi_EXIT, blargh},
%%
%% {ok, Acc1} = handle_message(Msg, worker(1, Acc0), Acc0),
%% {ok, Acc2} = handle_message(Msg, worker(2, Acc0), Acc1),
%% {ok, Acc3} = handle_message(ExitMsg, worker(4, Acc0), Acc2),
%% {ok, Acc4} = handle_message(ExitMsg, worker(5, Acc0), Acc3),
%% {stop, Acc5} = handle_message(ExitMsg, worker(6, Acc0), Acc4),
%%
%% Expect = [{ok, [{1, <<"foo">>}]}, {error, internal_server_error}],
%% Resps = format_resps([<<"uuid1">>, <<"uuid2">>], Acc5),
%% ?assertEqual(Expect, Resps),
%% ?assertEqual(error, resp_health(Resps))
%% end).
%%
%%
%% create_init_acc(W) ->
%% UUID1 = <<"uuid1">>,
%% UUID2 = <<"uuid2">>,
%%
%% Nodes = [node1, node2, node3],
%% Shards = mem3_util:create_partition_map(<<"foo">>, 3, 1, Nodes),
%%
%% % Create our worker_uuids. We're relying on the fact that
%% % we're using a fake Q=1 db so we don't have to worry
%% % about any hashing here.
%% WorkerUUIDs = lists:map(fun(Shard) ->
%% {Shard#shard{ref = erlang:make_ref()}, [UUID1, UUID2]}
%% end, Shards),
%%
%% #acc{
%% worker_uuids = WorkerUUIDs,
%% resps = dict:from_list([{UUID1, []}, {UUID2, []}]),
%% uuid_counts = dict:from_list([{UUID1, 3}, {UUID2, 3}]),
%% w = W
%% }.
%%
%%
%% worker(N, #acc{worker_uuids = WorkerUUIDs}) ->
%% {Worker, _} = lists:nth(N, WorkerUUIDs),
%% Worker.
%%
%%
%% check_quorum(Acc, Expect) ->
%% dict:fold(fun(_Shard, Resps, _) ->
%% ?assertEqual(Expect, has_quorum(Resps, 3, Acc#acc.w))
%% end, nil, Acc#acc.resps).
%%
%% -endif.