% blob: 4febe1329504032f7844f64babbbf59cde3cf1b2 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_doc_update).
-export([go/3]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
% Update a batch of documents on the shards that own them and collect the
% per-document results.
%
% An empty batch short-circuits to {ok, []}. Otherwise every document goes
% through before_doc_update/2, is tagged with a unique ref, grouped by shard
% and sent to the shard's node as a fabric_rpc:update_docs request. Replies
% are folded by handle_message/3 until it stops or the request times out.
%
% Returns {ok, Results} or {accepted, Results}, where Results has been passed
% through couch_util:reorder_results/2 and stripped of noreply entries. On a
% timeout the replies gathered so far are forced into a result the same way.
% Any other value produced by rexi_utils:recv/6 is returned untouched.
% Throws {aborted, Failures} when all_or_nothing is present in Opts.
go(_, [], _) ->
    {ok, []};
go(DbName, AllDocs0, Opts) ->
    AllDocs = tag_docs(before_doc_update(DbName, AllDocs0)),
    validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
    Options = lists:delete(all_or_nothing, Opts),
    % The copy sent to the worker has its ref tag removed; the tagged docs
    % stay next to the shard so replies can be matched back to them.
    Dispatch = fun({#shard{name = Name, node = Node} = Shard, TaggedDocs}) ->
        Request = {fabric_rpc, update_docs, [Name, untag_docs(TaggedDocs), Options]},
        {Shard#shard{ref = rexi:cast(Node, Request)}, TaggedDocs}
    end,
    GroupedDocs = lists:map(Dispatch, group_docs_by_shard(DbName, AllDocs)),
    Workers = element(1, lists:unzip(GroupedDocs)),
    RexiMon = fabric_util:create_monitors(Workers),
    % The w option is carried as a string; default to the database quorum.
    DefaultW = integer_to_list(mem3:quorum(DbName)),
    W = list_to_integer(couch_util:get_value(w, Options, DefaultW)),
    Acc0 = {length(Workers), length(AllDocs), W, GroupedDocs, dict:new()},
    Timeout = fabric_util:request_timeout(),
    Finish = fun(Status, Replies) ->
        Ordered = couch_util:reorder_results(AllDocs, Replies),
        {Status, [R || R <- Ordered, R =/= noreply]}
    end,
    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
        {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
            Finish(Health, Results);
        {timeout, TimedOutAcc} ->
            {_, _, W1, Pending, ReplyDict} = TimedOutAcc,
            fabric_util:log_timeout(element(1, lists:unzip(Pending)), "update_docs"),
            {ForcedHealth, _, Forced} =
                dict:fold(fun force_reply/3, {ok, W1, []}, ReplyDict),
            Finish(ForcedHealth, Forced);
        Else ->
            Else
    after
        rexi_monitor:stop(RexiMon)
    end.
% Fold callback handed to rexi_utils:recv/6 by go/3.
%
% The accumulator is {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict}:
% the number of workers still expected, the number of documents in the
% request, the write quorum, the remaining {Shard, Docs} pairs and a dict
% mapping each document to the replies received for it so far.
%
% Returns {ok, Acc} to keep receiving or {stop, {Health, Replies}} once a
% final answer is available. Throws on missing_stub and bad_request messages.
handle_message({rexi_DOWN, _, {_, DownNode}, _}, _Worker, Acc0) ->
    {_, DocCount, W, GroupedDocs, ReplyDict} = Acc0,
    % Every worker on the dead node is gone, not just the one that reported.
    Remaining = [Entry || {#shard{node = Node}, _} = Entry <- GroupedDocs,
                          Node =/= DownNode],
    skip_message({length(Remaining), DocCount, W, Remaining, ReplyDict});
handle_message({rexi_EXIT, _}, Worker, Acc0) ->
    skip_message(forget_worker(Worker, Acc0));
handle_message(internal_server_error, Worker, Acc0) ->
    % happens when we fail to load validation functions in an RPC worker
    skip_message(forget_worker(Worker, Acc0));
handle_message(attachment_chunk_received, _Worker, Acc0) ->
    {ok, Acc0};
handle_message({ok, Replies}, Worker, Acc0) ->
    {WaitingCount, DocCount, W, GroupedDocs, ReplyDict0} = Acc0,
    {value, {_, Docs}, Remaining} = lists:keytake(Worker, 1, GroupedDocs),
    ReplyDict = append_update_replies(Docs, Replies, ReplyDict0),
    EveryDocSeen = dict:size(ReplyDict) =:= DocCount,
    KeepWaiting = {ok, {WaitingCount - 1, DocCount, W, Remaining, ReplyDict}},
    if
        WaitingCount =:= 1 ->
            % last message has arrived, we need to conclude things
            {Health, W, Reply} =
                dict:fold(fun force_reply/3, {ok, W, []}, ReplyDict),
            {stop, {Health, Reply}};
        EveryDocSeen ->
            % we've got at least one reply for each document, let's take a look
            case dict:fold(fun maybe_reply/3, {stop, W, []}, ReplyDict) of
                continue ->
                    KeepWaiting;
                {stop, W, FinalReplies} ->
                    {stop, {ok, FinalReplies}}
            end;
        true ->
            KeepWaiting
    end;
handle_message({missing_stub, _} = Error, _, _) ->
    throw(Error);
handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
    {_, _, _, GroupedDocs, _} = Acc0,
    Docs = couch_util:get_value(Worker, GroupedDocs),
    % Treat it as an ordinary reply carrying the same error for every doc
    % this worker was responsible for.
    handle_message({ok, [X || _D <- Docs]}, Worker, Acc0);
handle_message({bad_request, _} = Error, _, _) ->
    throw(Error).

% Remove a failed worker from the accumulator and lower the waiting count.
forget_worker(Worker, Acc0) ->
    {WaitingCount, DocCount, W, GroupedDocs, ReplyDict} = Acc0,
    Remaining = lists:keydelete(Worker, 1, GroupedDocs),
    {WaitingCount - 1, DocCount, W, Remaining, ReplyDict}.
% Run the database-specific before_doc_update hook over Docs.
%
% Replicator databases use couch_replicator_manager:before_doc_update/1,
% users databases use couch_users_db:before_doc_update/1 (the replicator
% check wins if both match); for any other database Docs is returned as is.
before_doc_update(DbName, Docs) ->
    IsReplicatorDb = is_replicator_db(DbName),
    IsUsersDb = is_users_db(DbName),
    if
        IsReplicatorDb =:= true ->
            lists:map(fun couch_replicator_manager:before_doc_update/1, Docs);
        IsUsersDb =:= true ->
            lists:map(fun couch_users_db:before_doc_update/1, Docs);
        true ->
            Docs
    end.
% True when DbName equals the configured replicator database name
% ("replicator"/"db", default "_replicator") or when the last path segment
% of the database name is "_replicator".
is_replicator_db(DbName) ->
    Configured = config:get("replicator", "db", "_replicator"),
    case list_to_binary(Configured) of
        DbName ->
            true;
        _ ->
            path_ends_with(DbName, <<"_replicator">>)
    end.
% True when DbName equals the configured authentication database name
% ("couch_httpd_auth"/"authentication_db", default "_users") or when the
% last path segment of the database name is "_users".
is_users_db(DbName) ->
    Configured = config:get("couch_httpd_auth", "authentication_db", "_users"),
    case list_to_binary(Configured) of
        DbName ->
            true;
        _ ->
            path_ends_with(DbName, <<"_users">>)
    end.
% True when the last "/"-separated segment of mem3:dbname(Path) equals Suffix.
path_ends_with(Path, Suffix) ->
    Segments = binary:split(mem3:dbname(Path), <<"/">>, [global]),
    lists:last(Segments) == Suffix.
% Prepend a {ref, Ref} entry with a fresh reference to the meta list of
% every document, so each document in the batch is distinguishable.
tag_docs(Docs) ->
    lists:map(fun(#doc{meta = Meta} = Doc) ->
        Doc#doc{meta = [{ref, make_ref()} | Meta]}
    end, Docs).
% Remove the first ref entry from the meta list of every document,
% undoing what tag_docs/1 added.
untag_docs(Docs) ->
    lists:map(fun(#doc{meta = Meta} = Doc) ->
        Doc#doc{meta = lists:keydelete(ref, 1, Meta)}
    end, Docs).
% dict:fold/3 callback that produces a final answer for Doc from whatever
% replies have arrived, whether or not the write quorum W was met.
%
% The accumulator is {Health, W, Acc}; each call conses {Doc, Result} onto
% Acc and may downgrade Health:
%   - no replies at all          -> {error, internal_server_error}, Health = error
%   - quorum met                 -> the agreed reply, Health unchanged
%   - some {ok, Rev} but no quorum -> {accepted, Rev}, ok becomes accepted
%   - only errors, all identical -> that error, Health unchanged
%   - only errors, mixed         -> the first error, Health = error
% Every quorum failure is logged and counted in couch_stats.
force_reply(Doc, [], {_, W, Acc}) ->
    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) ->
    case update_quorum_met(W, Replies) of
        {true, Reply} ->
            {Health, W, [{Doc, Reply} | Acc]};
        false ->
            couch_log:warning("write quorum (~p) failed for ~s", [W, Doc#doc.id]),
            {Counter, NewHealth, Result} =
                quorum_failure(Health, FirstReply, Replies),
            couch_stats:increment_counter([fabric, doc_update, Counter]),
            {NewHealth, W, [{Doc, Result} | Acc]}
    end.

% Classify a quorum failure: returns {CounterName, NewHealth, Result}.
quorum_failure(Health, FirstReply, Replies) ->
    case [Rev || {ok, Rev} <- Replies] of
        [AcceptedRev | _] when Health =:= ok ->
            {write_quorum_errors, accepted, {accepted, AcceptedRev}};
        [AcceptedRev | _] ->
            {write_quorum_errors, Health, {accepted, AcceptedRev}};
        [] ->
            % check if all errors are identical, if so inherit health
            case [E || E <- Replies, E =/= FirstReply] of
                [] ->
                    {errors, Health, FirstReply};
                _ ->
                    {mismatched_errors, error, FirstReply}
            end
    end.
% dict:fold/3 callback that collects a reply for every document only if each
% one has already met the write quorum W.
%
% The accumulator starts as {stop, W, []}. As soon as one document misses
% quorum it collapses to the atom continue and stays that way for the rest
% of the fold.
maybe_reply(Doc, Replies, {stop, W, Collected}) ->
    case update_quorum_met(W, Replies) of
        false ->
            continue;
        {true, Reply} ->
            {stop, W, [{Doc, Reply} | Collected]}
    end;
maybe_reply(_Doc, _Replies, continue) ->
    % we didn't meet quorum for all docs, so we're fast-forwarding the fold
    continue.
% Decide whether some acceptable reply has been received at least W times.
%
% Identical replies are tallied in an orddict; the first entry (in orddict
% key order) that passes good_reply/1 and has a count of at least W wins.
% Returns {true, Reply} for that entry, or false when there is none.
update_quorum_met(W, Replies) ->
    Tally = lists:foldl(
        fun(Reply, Counts) -> orddict:update_counter(Reply, 1, Counts) end,
        orddict:new(),
        Replies
    ),
    Winners = [Reply || {Reply, Count} = Entry <- Tally,
                        good_reply(Entry),
                        Count >= W],
    case Winners of
        [] ->
            false;
        [FinalReply | _] ->
            {true, FinalReply}
    end.
% Predicate over {Reply, Count} tally entries: true when Reply is an
% {ok, _} tuple or the atom noreply, false for anything else.
good_reply({Reply, _Count}) ->
    case Reply of
        {ok, _} -> true;
        noreply -> true;
        _ -> false
    end;
good_reply(_) ->
    false.
% Group Docs by the shards returned by mem3:shards(DbName, DocId).
% A document appears under every shard returned for its id.
-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
group_docs_by_shard(DbName, Docs) ->
    AddDoc = fun(#doc{id = DocId} = Doc, ByShard) ->
        AddToShard = fun(Shard, Acc) -> dict:append(Shard, Doc, Acc) end,
        lists:foldl(AddToShard, ByShard, mem3:shards(DbName, DocId))
    end,
    dict:to_list(lists:foldl(AddDoc, dict:new(), Docs)).
% Pair each document with the reply in the same position and append that
% reply to the document's entry in the dict. Documents left over once the
% replies run out are recorded as noreply. More replies than documents is
% not handled and fails with function_clause.
append_update_replies(Docs, [], Dict0) ->
    % icky, if replicated_changes only errors show up in result
    lists:foldl(
        fun(Doc, Dict) -> dict:append(Doc, noreply, Dict) end,
        Dict0,
        Docs
    );
append_update_replies([Doc | Docs], [Reply | Replies], Dict0) ->
    append_update_replies(Docs, Replies, dict:append(Doc, Reply, Dict0)).
% Continue after a worker has dropped out without replying.
%
% When the waiting count has reached 0 nobody is left to answer, so a final
% result is forced from the replies collected so far and the receive loop is
% told to stop. Otherwise the accumulator is handed back unchanged.
skip_message(Acc0) ->
    case Acc0 of
        {0, _, W, _, DocReplyDict} ->
            {Health, W, Reply} =
                dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
            {stop, {Health, Reply}};
        _ ->
            {ok, Acc0}
    end.
% Reject all_or_nothing updates, which are not implemented.
%
% With the flag false this is a no-op returning ok. With the flag true it
% throws {aborted, Failures}, where Failures holds one
% {{Id, {Pos, RevId}}, Error} entry per document; RevId is the first element
% of the document's rev list, or <<>> when that list is empty.
validate_atomic_update(_, _, false) ->
    ok;
validate_atomic_update(_DbName, AllDocs, true) ->
    % TODO actually perform the validation. This requires some hackery, we need
    % to basically extract the prep_and_validate_updates function from couch_db
    % and only run that, without actually writing in case of a success.
    Error = {not_implemented, <<"all_or_nothing is not supported yet">>},
    ToFailure = fun(#doc{id = Id, revs = {Pos, Revs}}) ->
        RevId = case Revs of
            [] -> <<>>;
            [Latest | _] -> Latest
        end,
        {{Id, {Pos, RevId}}, Error}
    end,
    throw({aborted, lists:map(ToFailure, AllDocs)}).
% eunits
% Exercises handle_message/3 for a single document:
%   - W = 2 over three shards stops after the second ok reply;
%   - W = 3 over three shards stops only after the third ok reply;
%   - W larger than the number of shards stops on the only reply;
%   - a document whose workers all exit ends up as
%     {error, internal_server_error}, and one with a single ok reply as
%     {accepted, Rev}.
% couch_stats is mocked because force_reply/3 increments counters whenever
% the quorum is missed; the mock is unloaded even if an assertion fails.
doc_update1_test() ->
    meck:new(couch_stats),
    try
        meck:expect(couch_stats, increment_counter, fun(_) -> ok end),

        Doc1 = #doc{revs = {1,[<<"foo">>]}},
        Doc2 = #doc{revs = {1,[<<"bar">>]}},
        Docs = [Doc1],
        Docs2 = [Doc2, Doc1],
        Dict = dict:from_list([{Doc,[]} || Doc <- Docs]),
        Dict2 = dict:from_list([{Doc,[]} || Doc <- Docs2]),

        Shards =
            mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
        GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),

        % test for W = 2
        AccW2 = {length(Shards), length(Docs), list_to_integer("2"),
            GroupedDocs, Dict},

        {ok,{WaitingCountW2_1,_,_,_,_}=AccW2_1} =
            handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW2),
        ?assertEqual(2, WaitingCountW2_1),
        {stop, FinalReplyW2} =
            handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW2_1),
        ?assertEqual({ok, [{Doc1, {ok,Doc1}}]}, FinalReplyW2),

        % test for W = 3
        AccW3 = {length(Shards), length(Docs), list_to_integer("3"),
            GroupedDocs, Dict},

        {ok,{WaitingCountW3_1,_,_,_,_}=AccW3_1} =
            handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW3),
        ?assertEqual(2, WaitingCountW3_1),

        {ok,{WaitingCountW3_2,_,_,_,_}=AccW3_2} =
            handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW3_1),
        ?assertEqual(1, WaitingCountW3_2),

        {stop, FinalReplyW3} =
            handle_message({ok, [{ok, Doc1}]},lists:nth(3,Shards),AccW3_2),
        ?assertEqual({ok, [{Doc1, {ok,Doc1}}]}, FinalReplyW3),

        % test w quorum > # shards, which should fail immediately
        Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
        GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>,Shards2,Docs),

        AccW4 = {length(Shards2), length(Docs), list_to_integer("2"),
            GroupedDocs2, Dict},
        ?assertMatch(
            {stop, _},
            handle_message({ok, [{ok, Doc1}]},hd(Shards2),AccW4)
        ),

        % Docs with no replies should end up as {error, internal_server_error}
        SA1 = #shard{node=a, range=1},
        SB1 = #shard{node=b, range=1},
        SA2 = #shard{node=a, range=2},
        SB2 = #shard{node=b, range=2},
        GroupedDocs3 = [{SA1,[Doc1]}, {SB1,[Doc1]}, {SA2,[Doc2]}, {SB2,[Doc2]}],
        StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2},
        {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0),
        {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1),
        {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2),
        {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3),
        ?assertEqual(
            {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}]},
            ReplyW5
        )
    after
        meck:unload(couch_stats)
    end.
% Two documents, W = 2, three shards: one worker answers ok for both
% documents and the other two exit. Quorum is missed, so both documents are
% reported as accepted and the overall health is accepted.
% couch_stats is mocked because force_reply/3 increments counters on quorum
% failures; the mock is unloaded even if an assertion fails.
doc_update2_test() ->
    meck:new(couch_stats),
    try
        meck:expect(couch_stats, increment_counter, fun(_) -> ok end),

        Doc1 = #doc{revs = {1,[<<"foo">>]}},
        Doc2 = #doc{revs = {1,[<<"bar">>]}},
        Docs = [Doc2, Doc1],
        Shards =
            mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
        GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
        Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
            dict:from_list([{Doc,[]} || Doc <- Docs])},

        {ok,{WaitingCount1,_,_,_,_}=Acc1} =
            handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0),
        ?assertEqual(2, WaitingCount1),

        {ok,{WaitingCount2,_,_,_,_}=Acc2} =
            handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1),
        ?assertEqual(1, WaitingCount2),

        {stop, Reply} =
            handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2),

        ?assertEqual(
            {accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}]},
            Reply
        )
    after
        meck:unload(couch_stats)
    end.
% Two documents, W = 2, three shards: the first and third workers answer ok
% for both documents and the second one exits. Quorum is met for both
% documents, so the final health is ok.
doc_update3_test() ->
    Doc1 = #doc{revs = {1,[<<"foo">>]}},
    Doc2 = #doc{revs = {1,[<<"bar">>]}},
    Docs = [Doc2, Doc1],
    Shards =
        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
    Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
        dict:from_list([{Doc,[]} || Doc <- Docs])},

    {ok,{WaitingCount1,_,_,_,_}=Acc1} =
        handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0),
    % ?assertEqual takes the expected value first.
    ?assertEqual(2, WaitingCount1),

    {ok,{WaitingCount2,_,_,_,_}=Acc2} =
        handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1),
    ?assertEqual(1, WaitingCount2),

    {stop, Reply} =
        handle_message({ok, [{ok, Doc1},{ok, Doc2}]},lists:nth(3,Shards),Acc2),

    ?assertEqual({ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}]}, Reply).
% needed for testing to avoid having to start the mem3 application
%
% Test stand-in for group_docs_by_shard/2: instead of asking mem3 which
% shards own a document, every document is appended to every shard in Shards.
group_docs_by_shard_hack(_DbName, Shards, Docs) ->
    AddDoc = fun(#doc{id = _Id} = Doc, ByShard) ->
        AddToShard = fun(Shard, Acc) -> dict:append(Shard, Doc, Acc) end,
        lists:foldl(AddToShard, ByShard, Shards)
    end,
    dict:to_list(lists:foldl(AddDoc, dict:new(), Docs)).