blob: 743ad8c7467bdb8ede224946872ff66b797b4508 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(fabric_doc_open).
-export([go/3]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-record(acc, {
dbname,
workers,
r,
state,
replies,
node_revs = [],
q_reply
}).
go(DbName, Id, Options) ->
Handler = case proplists:get_value(doc_info, Options) of
true -> get_doc_info;
full -> get_full_doc_info;
undefined -> open_doc
end,
Workers = fabric_util:submit_jobs(mem3:shards(DbName,Id), Handler,
[Id, [deleted|Options]]),
SuppressDeletedDoc = not lists:member(deleted, Options),
N = mem3:n(DbName),
R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))),
Acc0 = #acc{
dbname = DbName,
workers = Workers,
r = erlang:min(N, list_to_integer(R)),
state = r_not_met,
replies = []
},
RexiMon = fabric_util:create_monitors(Workers),
try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
{ok, #acc{}=Acc} when Handler =:= open_doc ->
Reply = handle_response(Acc),
format_reply(Reply, SuppressDeletedDoc);
{ok, #acc{state = r_not_met}} ->
{error, quorum_not_met};
{ok, #acc{q_reply = QuorumReply}} ->
format_reply(QuorumReply, SuppressDeletedDoc);
{timeout, #acc{workers=DefunctWorkers}} ->
fabric_util:log_timeout(DefunctWorkers, atom_to_list(Handler)),
{error, timeout};
Error ->
Error
after
rexi_monitor:stop(RexiMon)
end.
handle_message({rexi_DOWN, _, {_, Node}, _}, _Worker, Acc) ->
NewWorkers = [W || #shard{node=N}=W <- Acc#acc.workers, N /= Node],
case NewWorkers of
[] ->
{stop, Acc#acc{workers=[]}};
_ ->
{ok, Acc#acc{workers=NewWorkers}}
end;
handle_message({rexi_EXIT, _Reason}, Worker, Acc) ->
NewWorkers = lists:delete(Worker, Acc#acc.workers),
case NewWorkers of
[] ->
{stop, Acc#acc{workers=[]}};
_ ->
{ok, Acc#acc{workers=NewWorkers}}
end;
handle_message(Reply, Worker, Acc) ->
NewReplies = fabric_util:update_counter(Reply, 1, Acc#acc.replies),
NewNodeRevs = case Reply of
{ok, #doc{revs = {Pos, [Rev | _]}}} ->
[{Worker#shard.node, [{Pos, Rev}]} | Acc#acc.node_revs];
_ ->
Acc#acc.node_revs
end,
NewAcc = Acc#acc{replies = NewReplies, node_revs = NewNodeRevs},
case is_r_met(Acc#acc.workers, NewReplies, Acc#acc.r) of
{true, QuorumReply} ->
fabric_util:cleanup(lists:delete(Worker, Acc#acc.workers)),
{stop, NewAcc#acc{workers=[], state=r_met, q_reply=QuorumReply}};
wait_for_more ->
NewWorkers = lists:delete(Worker, Acc#acc.workers),
{ok, NewAcc#acc{workers=NewWorkers}};
no_more_workers ->
{stop, NewAcc#acc{workers=[]}}
end.
handle_response(#acc{state=r_met, replies=Replies, q_reply=QuorumReply}=Acc) ->
case {Replies, fabric_util:remove_ancestors(Replies, [])} of
{[_], [_]} ->
% Complete agreement amongst all copies
QuorumReply;
{[_|_], [{_, {QuorumReply, _}}]} ->
% Any divergent replies are ancestors of the QuorumReply,
% repair the document asynchronously
spawn(fun() -> read_repair(Acc) end),
QuorumReply;
_Else ->
% real disagreement amongst the workers, block for the repair
read_repair(Acc)
end;
handle_response(Acc) ->
read_repair(Acc).
is_r_met(Workers, Replies, R) ->
case lists:dropwhile(fun({_,{_, Count}}) -> Count < R end, Replies) of
[{_,{QuorumReply, _}} | _] ->
{true, QuorumReply};
[] when length(Workers) > 1 ->
wait_for_more;
[] ->
no_more_workers
end.
read_repair(#acc{dbname=DbName, replies=Replies, node_revs=NodeRevs}) ->
Docs = [Doc || {_, {{ok, #doc{}=Doc}, _}} <- Replies],
case Docs of
% omit local docs from read repair
[#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>} | _] ->
choose_reply(Docs);
[#doc{id=Id} | _] ->
Opts = [?ADMIN_CTX, replicated_changes, {read_repair, NodeRevs}],
Res = fabric:update_docs(DbName, Docs, Opts),
case Res of
{ok, []} ->
couch_stats:increment_counter([fabric, read_repairs, success]);
_ ->
couch_stats:increment_counter([fabric, read_repairs, failure]),
couch_log:notice("read_repair ~s ~s ~p", [DbName, Id, Res])
end,
choose_reply(Docs);
[] ->
% Try hard to return some sort of information
% to the client.
Values = [V || {_, {V, _}} <- Replies],
case lists:member({not_found, missing}, Values) of
true ->
{not_found, missing};
false when length(Values) > 0 ->
% Sort for stability in responses in
% case we have some weird condition
hd(lists:sort(Values));
false ->
{error, read_failure}
end
end.
choose_reply(Docs) ->
% Sort descending by {not deleted, rev}. This should match
% the logic of couch_doc:to_doc_info/1.
[Winner | _] = lists:sort(fun(DocA, DocB) ->
InfoA = {not DocA#doc.deleted, DocA#doc.revs},
InfoB = {not DocB#doc.deleted, DocB#doc.revs},
InfoA > InfoB
end, Docs),
{ok, Winner}.
format_reply({ok, #full_doc_info{deleted=true}}, true) ->
{not_found, deleted};
format_reply({ok, #doc{deleted=true}}, true) ->
{not_found, deleted};
format_reply(not_found, _) ->
{not_found, missing};
format_reply(Else, _) ->
Else.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
setup() ->
meck:new([
couch_log,
couch_stats,
fabric,
fabric_util,
mem3,
rexi,
rexi_monitor
], [passthrough]).
teardown(_) ->
meck:unload().
open_doc_test_() ->
{
foreach,
fun setup/0,
fun teardown/1,
[
t_is_r_met(),
t_handle_message_down(),
t_handle_message_exit(),
t_handle_message_reply(),
t_store_node_revs(),
t_read_repair(),
t_handle_response_quorum_met(),
t_get_doc_info()
]
}.
t_is_r_met() ->
?_test(begin
Workers0 = [],
Workers1 = [nil],
Workers2 = [nil, nil],
SuccessCases = [
{{true, foo}, [fabric_util:kv(foo, 2)], 2},
{{true, foo}, [fabric_util:kv(foo, 3)], 2},
{{true, foo}, [fabric_util:kv(foo, 1)], 1},
{{true, foo}, [fabric_util:kv(foo, 2), fabric_util:kv(bar, 1)], 2},
{{true, bar}, [fabric_util:kv(bar, 1), fabric_util:kv(bar, 2)], 2},
{{true, bar}, [fabric_util:kv(bar, 2), fabric_util:kv(foo, 1)], 2}
],
lists:foreach(fun({Expect, Replies, Q}) ->
?assertEqual(Expect, is_r_met(Workers0, Replies, Q))
end, SuccessCases),
WaitForMoreCases = [
{[fabric_util:kv(foo, 1)], 2},
{[fabric_util:kv(foo, 2)], 3},
{[fabric_util:kv(foo, 1), fabric_util:kv(bar, 1)], 2}
],
lists:foreach(fun({Replies, Q}) ->
?assertEqual(wait_for_more, is_r_met(Workers2, Replies, Q))
end, WaitForMoreCases),
FailureCases = [
{Workers0, [fabric_util:kv(foo, 1)], 2},
{Workers1, [fabric_util:kv(foo, 1)], 2},
{Workers1, [fabric_util:kv(foo, 1), fabric_util:kv(bar, 1)], 2},
{Workers1, [fabric_util:kv(foo, 2)], 3}
],
lists:foreach(fun({Workers, Replies, Q}) ->
?assertEqual(no_more_workers, is_r_met(Workers, Replies, Q))
end, FailureCases)
end).
t_handle_message_down() ->
Node0 = 'foo@localhost',
Node1 = 'bar@localhost',
Down0 = {rexi_DOWN, nil, {nil, Node0}, nil},
Down1 = {rexi_DOWN, nil, {nil, Node1}, nil},
Workers0 = [#shard{node=Node0} || _ <- [a, b]],
Worker1 = #shard{node=Node1},
Workers1 = Workers0 ++ [Worker1],
?_test(begin
% Stop when no more workers are left
?assertEqual(
{stop, #acc{workers=[]}},
handle_message(Down0, nil, #acc{workers=Workers0})
),
% Continue when we have more workers
?assertEqual(
{ok, #acc{workers=[Worker1]}},
handle_message(Down0, nil, #acc{workers=Workers1})
),
% A second DOWN removes the remaining workers
?assertEqual(
{stop, #acc{workers=[]}},
handle_message(Down1, nil, #acc{workers=[Worker1]})
)
end).
t_handle_message_exit() ->
Exit = {rexi_EXIT, nil},
Worker0 = #shard{ref=erlang:make_ref()},
Worker1 = #shard{ref=erlang:make_ref()},
?_test(begin
% Only removes the specified worker
?assertEqual(
{ok, #acc{workers=[Worker1]}},
handle_message(Exit, Worker0, #acc{workers=[Worker0, Worker1]})
),
?assertEqual(
{ok, #acc{workers=[Worker0]}},
handle_message(Exit, Worker1, #acc{workers=[Worker0, Worker1]})
),
% We bail if it was the last worker
?assertEqual(
{stop, #acc{workers=[]}},
handle_message(Exit, Worker0, #acc{workers=[Worker0]})
)
end).
t_handle_message_reply() ->
Worker0 = #shard{ref=erlang:make_ref()},
Worker1 = #shard{ref=erlang:make_ref()},
Worker2 = #shard{ref=erlang:make_ref()},
Workers = [Worker0, Worker1, Worker2],
Acc0 = #acc{workers=Workers, r=2, replies=[]},
?_test(begin
meck:expect(rexi, kill_all, fun(_) -> ok end),
% Test that we continue when we haven't met R yet
?assertMatch(
{ok, #acc{
workers=[Worker0, Worker1],
replies=[{foo, {foo, 1}}]
}},
handle_message(foo, Worker2, Acc0)
),
?assertMatch(
{ok, #acc{
workers=[Worker0, Worker1],
replies=[{bar, {bar, 1}}, {foo, {foo, 1}}]
}},
handle_message(bar, Worker2, Acc0#acc{
replies=[{foo, {foo, 1}}]
})
),
% Test that we don't get a quorum when R isn't met. q_reply
% isn't set and state remains unchanged and {stop, NewAcc}
% is returned. Bit subtle on the assertions here.
?assertMatch(
{stop, #acc{workers=[], replies=[{foo, {foo, 1}}]}},
handle_message(foo, Worker0, Acc0#acc{workers=[Worker0]})
),
?assertMatch(
{stop, #acc{
workers=[],
replies=[{bar, {bar, 1}}, {foo, {foo, 1}}]
}},
handle_message(bar, Worker0, Acc0#acc{
workers=[Worker0],
replies=[{foo, {foo, 1}}]
})
),
% Check that when R is met we stop with a new state and
% a q_reply.
?assertMatch(
{stop, #acc{
workers=[],
replies=[{foo, {foo, 2}}],
state=r_met,
q_reply=foo
}},
handle_message(foo, Worker1, Acc0#acc{
workers=[Worker0, Worker1],
replies=[{foo, {foo, 1}}]
})
),
?assertEqual(
{stop, #acc{
workers=[],
r=1,
replies=[{foo, {foo, 1}}],
state=r_met,
q_reply=foo
}},
handle_message(foo, Worker0, Acc0#acc{r=1})
),
?assertMatch(
{stop, #acc{
workers=[],
replies=[{bar, {bar, 1}}, {foo, {foo, 2}}],
state=r_met,
q_reply=foo
}},
handle_message(foo, Worker0, Acc0#acc{
workers=[Worker0],
replies=[{bar, {bar, 1}}, {foo, {foo, 1}}]
})
)
end).
t_store_node_revs() ->
W1 = #shard{node = w1, ref = erlang:make_ref()},
W2 = #shard{node = w2, ref = erlang:make_ref()},
W3 = #shard{node = w3, ref = erlang:make_ref()},
Foo1 = {ok, #doc{id = <<"bar">>, revs = {1, [<<"foo">>]}}},
Foo2 = {ok, #doc{id = <<"bar">>, revs = {2, [<<"foo2">>, <<"foo">>]}}},
NFM = {not_found, missing},
InitAcc = #acc{workers = [W1, W2, W3], replies = [], r = 2},
?_test(begin
meck:expect(rexi, kill_all, fun(_) -> ok end),
% Simple case
{ok, #acc{node_revs = NodeRevs1}} = handle_message(Foo1, W1, InitAcc),
?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs1),
% Make sure we only hold the head rev
{ok, #acc{node_revs = NodeRevs2}} = handle_message(Foo2, W1, InitAcc),
?assertEqual([{w1, [{2, <<"foo2">>}]}], NodeRevs2),
% Make sure we don't capture anything on error
{ok, #acc{node_revs = NodeRevs3}} = handle_message(NFM, W1, InitAcc),
?assertEqual([], NodeRevs3),
% Make sure we accumulate node revs
Acc1 = InitAcc#acc{node_revs = [{w1, [{1, <<"foo">>}]}]},
{ok, #acc{node_revs = NodeRevs4}} = handle_message(Foo2, W2, Acc1),
?assertEqual(
[{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
NodeRevs4
),
% Make sure rexi_DOWN doesn't modify node_revs
Down = {rexi_DOWN, nil, {nil, w1}, nil},
{ok, #acc{node_revs = NodeRevs5}} = handle_message(Down, W2, Acc1),
?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs5),
% Make sure rexi_EXIT doesn't modify node_revs
Exit = {rexi_EXIT, reason},
{ok, #acc{node_revs = NodeRevs6}} = handle_message(Exit, W2, Acc1),
?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs6),
% Make sure an error doesn't remove any node revs
{ok, #acc{node_revs = NodeRevs7}} = handle_message(NFM, W2, Acc1),
?assertEqual([{w1, [{1, <<"foo">>}]}], NodeRevs7),
% Make sure we have all of our node_revs when meeting
% quorum
{ok, Acc2} = handle_message(Foo1, W1, InitAcc),
{ok, Acc3} = handle_message(Foo2, W2, Acc2),
{stop, Acc4} = handle_message(NFM, W3, Acc3),
?assertEqual(
[{w2, [{2, <<"foo2">>}]}, {w1, [{1, <<"foo">>}]}],
Acc4#acc.node_revs
)
end).
t_read_repair() ->
Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
NFM = {not_found, missing},
?_test(begin
meck:expect(couch_log, notice, fun(_, _) -> ok end),
meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
% Test when we have actual doc data to repair
meck:expect(fabric, update_docs, fun(_, [_], _) -> {ok, []} end),
Acc0 = #acc{
dbname = <<"name">>,
replies = [fabric_util:kv(Foo1,1)]
},
?assertEqual(Foo1, read_repair(Acc0)),
meck:expect(fabric, update_docs, fun(_, [_, _], _) -> {ok, []} end),
Acc1 = #acc{
dbname = <<"name">>,
replies = [fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,1)]
},
?assertEqual(Foo2, read_repair(Acc1)),
% Test when we have nothing but errors
Acc2 = #acc{replies=[fabric_util:kv(NFM, 1)]},
?assertEqual(NFM, read_repair(Acc2)),
Acc3 = #acc{replies=[fabric_util:kv(NFM,1), fabric_util:kv(foo,2)]},
?assertEqual(NFM, read_repair(Acc3)),
Acc4 = #acc{replies=[fabric_util:kv(foo,1), fabric_util:kv(bar,1)]},
?assertEqual(bar, read_repair(Acc4))
end).
t_handle_response_quorum_met() ->
Foo1 = {ok, #doc{revs = {1,[<<"foo">>]}}},
Foo2 = {ok, #doc{revs = {2,[<<"foo2">>,<<"foo">>]}}},
Bar1 = {ok, #doc{revs = {1,[<<"bar">>]}}},
?_test(begin
meck:expect(couch_log, notice, fun(_, _) -> ok end),
meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, []} end),
meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
BasicOkAcc = #acc{
state=r_met,
replies=[fabric_util:kv(Foo1,2)],
q_reply=Foo1
},
?assertEqual(Foo1, handle_response(BasicOkAcc)),
WithAncestorsAcc = #acc{
state=r_met,
replies=[fabric_util:kv(Foo1,1), fabric_util:kv(Foo2,2)],
q_reply=Foo2
},
?assertEqual(Foo2, handle_response(WithAncestorsAcc)),
% This also checks when the quorum isn't the most recent
% revision.
DeeperWinsAcc = #acc{
state=r_met,
replies=[fabric_util:kv(Foo1,2), fabric_util:kv(Foo2,1)],
q_reply=Foo1
},
?assertEqual(Foo2, handle_response(DeeperWinsAcc)),
% Check that we return the proper doc based on rev
% (ie, pos is equal)
BiggerRevWinsAcc = #acc{
state=r_met,
replies=[fabric_util:kv(Foo1,1), fabric_util:kv(Bar1,2)],
q_reply=Bar1
},
?assertEqual(Foo1, handle_response(BiggerRevWinsAcc))
% r_not_met is a proxy to read_repair so we rely on
% read_repair_test for those conditions.
end).
t_get_doc_info() ->
?_test(begin
meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, []} end),
meck:expect(couch_stats, increment_counter, fun(_) -> ok end),
meck:expect(fabric_util, submit_jobs, fun(_, _, _) -> ok end),
meck:expect(fabric_util, create_monitors, fun(_) -> ok end),
meck:expect(rexi_monitor, stop, fun(_) -> ok end),
meck:expect(mem3, shards, fun(_, _) -> ok end),
meck:expect(mem3, n, fun(_) -> 3 end),
meck:expect(mem3, quorum, fun(_) -> 2 end),
meck:expect(fabric_util, recv, fun(_, _, _, _) ->
{ok, #acc{state = r_not_met}}
end),
Rsp1 = fabric_doc_open:go("test", "one", [doc_info]),
?assertEqual({error, quorum_not_met}, Rsp1),
Rsp2 = fabric_doc_open:go("test", "one", [{doc_info, full}]),
?assertEqual({error, quorum_not_met}, Rsp2),
meck:expect(fabric_util, recv, fun(_, _, _, _) ->
{ok, #acc{state = r_met, q_reply = not_found}}
end),
MissingRsp1 = fabric_doc_open:go("test", "one", [doc_info]),
?assertEqual({not_found, missing}, MissingRsp1),
MissingRsp2 = fabric_doc_open:go("test", "one", [{doc_info, full}]),
?assertEqual({not_found, missing}, MissingRsp2),
meck:expect(fabric_util, recv, fun(_, _, _, _) ->
A = #doc_info{},
{ok, #acc{state = r_met, q_reply = {ok, A}}}
end),
{ok, Rec1} = fabric_doc_open:go("test", "one", [doc_info]),
?assert(is_record(Rec1, doc_info)),
meck:expect(fabric_util, recv, fun(_, _, _, _) ->
A = #full_doc_info{deleted = true},
{ok, #acc{state = r_met, q_reply = {ok, A}}}
end),
Rsp3 = fabric_doc_open:go("test", "one", [{doc_info, full}]),
?assertEqual({not_found, deleted}, Rsp3),
{ok, Rec2} = fabric_doc_open:go("test", "one", [{doc_info, full},deleted]),
?assert(is_record(Rec2, full_doc_info))
end).
-endif.