| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(fabric_doc_open_revs). |
| |
| -export([go/4]). |
| |
| -include_lib("mem3/include/mem3.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| -record(state, { |
| dbname, |
| worker_count, |
| workers, |
| reply_count = 0, |
| reply_error_count = 0, |
| r, |
| revs, |
| latest, |
| replies = [], |
| node_revs = [], |
| repair = false |
| }). |
| |
| go(DbName, Id, Revs, Options) -> |
| Workers = fabric_util:submit_jobs( |
| mem3:shards(DbName, Id), |
| open_revs, |
| [Id, Revs, Options] |
| ), |
| R = couch_util:get_value(r, Options, integer_to_list(mem3:quorum(DbName))), |
| State = #state{ |
| dbname = DbName, |
| worker_count = length(Workers), |
| workers = Workers, |
| r = list_to_integer(R), |
| revs = Revs, |
| latest = lists:member(latest, Options), |
| replies = [] |
| }, |
| RexiMon = fabric_util:create_monitors(Workers), |
| try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of |
| {ok, all_workers_died} -> |
| {error, all_workers_died}; |
| {ok, Replies} -> |
| {ok, Replies}; |
| {timeout, #state{workers = DefunctWorkers}} -> |
| fabric_util:log_timeout(DefunctWorkers, "open_revs"), |
| {error, timeout}; |
| Else -> |
| Else |
| after |
| rexi_monitor:stop(RexiMon) |
| end. |
| |
| handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, #state{workers = Workers} = State) -> |
| NewState = State#state{ |
| workers = lists:keydelete(NodeRef, #shard.node, Workers), |
| reply_error_count = State#state.reply_error_count + 1 |
| }, |
| handle_message({ok, []}, nil, NewState); |
| handle_message({rexi_EXIT, _}, Worker, #state{workers = Workers} = State) -> |
| NewState = State#state{ |
| workers = lists:delete(Worker, Workers), |
| reply_error_count = State#state.reply_error_count + 1 |
| }, |
| handle_message({ok, []}, nil, NewState); |
| handle_message({ok, RawReplies}, Worker, State) -> |
| #state{ |
| dbname = DbName, |
| reply_count = ReplyCount, |
| worker_count = WorkerCount, |
| workers = Workers, |
| replies = PrevReplies, |
| node_revs = PrevNodeRevs, |
| r = R, |
| revs = Revs, |
| latest = Latest, |
| repair = InRepair, |
| reply_error_count = ReplyErrorCount |
| } = State, |
| |
| IsTree = Revs == all orelse Latest, |
| |
| NewReplyCount = ReplyCount + 1, |
| |
| % Do not count error replies when checking quorum |
| RealReplyCount = NewReplyCount - ReplyErrorCount, |
| QuorumReplies = RealReplyCount >= R, |
| {NewReplies, QuorumMet, Repair} = |
| case IsTree of |
| true -> |
| {NewReplies0, AllInternal, Repair0} = |
| tree_replies(PrevReplies, tree_sort(RawReplies)), |
| NumLeafs = couch_key_tree:count_leafs(PrevReplies), |
| SameNumRevs = length(RawReplies) == NumLeafs, |
| QMet = AllInternal andalso SameNumRevs andalso QuorumReplies, |
| % Don't set repair=true on the first reply |
| {NewReplies0, QMet, (ReplyCount > 0) and Repair0}; |
| false -> |
| {NewReplies0, MinCount} = dict_replies(PrevReplies, RawReplies), |
| {NewReplies0, MinCount >= R, false} |
| end, |
| |
| NewNodeRevs = update_node_revs(Worker, RawReplies, PrevNodeRevs), |
| |
| Complete = (ReplyCount =:= (WorkerCount - 1)), |
| |
| case QuorumMet orelse Complete of |
| true -> |
| fabric_util:cleanup(lists:delete(Worker, Workers)), |
| maybe_read_repair( |
| DbName, |
| IsTree, |
| NewReplies, |
| NewNodeRevs, |
| NewReplyCount, |
| InRepair orelse Repair |
| ), |
| {stop, format_reply(IsTree, NewReplies, RealReplyCount)}; |
| false -> |
| {ok, State#state{ |
| replies = NewReplies, |
| node_revs = NewNodeRevs, |
| reply_count = NewReplyCount, |
| workers = lists:delete(Worker, Workers), |
| repair = InRepair orelse Repair |
| }} |
| end. |
| |
| % Update node revs from the latest replies |
| % |
| update_node_revs(nil, _, PrevNodeRevs) -> |
| % This handles the error cases when Worker is explicitly set to nil. (See |
| % rexi_DOWN and rexi_EXIT clauses). |
| PrevNodeRevs; |
| update_node_revs(Worker, RawReplies, PrevNodeRevs) -> |
| FoldFun = fun |
| ({ok, #doc{revs = {Pos, [Rev | _]}}}, Acc) -> |
| [{Pos, Rev} | Acc]; |
| (_, Acc) -> |
| Acc |
| end, |
| case lists:foldl(FoldFun, [], RawReplies) of |
| [] -> |
| PrevNodeRevs; |
| [_ | _] = IdRevs -> |
| [{Worker#shard.node, IdRevs} | PrevNodeRevs] |
| end. |
| |
| tree_replies(RevTree, []) -> |
| {RevTree, true, false}; |
| tree_replies(RevTree0, [{ok, Doc} | Rest]) -> |
| {RevTree1, Done, Repair} = tree_replies(RevTree0, Rest), |
| Path = couch_doc:to_path(Doc), |
| case couch_key_tree:merge(RevTree1, Path) of |
| {RevTree2, internal_node} -> |
| {RevTree2, Done, Repair}; |
| {RevTree2, new_leaf} -> |
| {RevTree2, Done, true}; |
| {RevTree2, _} -> |
| {RevTree2, false, true} |
| end; |
| tree_replies(RevTree0, [{{not_found, missing}, {Pos, Rev}} | Rest]) -> |
| {RevTree1, Done, Repair} = tree_replies(RevTree0, Rest), |
| Node = {Rev, ?REV_MISSING, []}, |
| Path = {Pos, Node}, |
| case couch_key_tree:merge(RevTree1, Path) of |
| {RevTree2, internal_node} -> |
| {RevTree2, Done, true}; |
| {RevTree2, _} -> |
| {RevTree2, false, Repair} |
| end. |
| |
| tree_sort(Replies) -> |
| SortFun = fun(A, B) -> sort_key(A) =< sort_key(B) end, |
| lists:sort(SortFun, Replies). |
| |
| sort_key({ok, #doc{revs = {Pos, [Rev | _]}}}) -> |
| {Pos, Rev}; |
| sort_key({{not_found, _}, {Pos, Rev}}) -> |
| {Pos, Rev}. |
| |
| dict_replies(Dict, []) -> |
| case [Count || {_Key, {_Reply, Count}} <- Dict] of |
| [] -> {Dict, 0}; |
| Counts -> {Dict, lists:min(Counts)} |
| end; |
| dict_replies(Dict, [Reply | Rest]) -> |
| NewDict = fabric_util:update_counter(Reply, 1, Dict), |
| dict_replies(NewDict, Rest). |
| |
| maybe_read_repair(Db, IsTree, Replies, NodeRevs, ReplyCount, DoRepair) -> |
| Docs = |
| case IsTree of |
| true -> tree_repair_docs(Replies, DoRepair); |
| false -> dict_repair_docs(Replies, ReplyCount) |
| end, |
| case Docs of |
| [] -> |
| ok; |
| _ -> |
| erlang:spawn(fun() -> read_repair(Db, Docs, NodeRevs) end) |
| end. |
| |
| tree_repair_docs(_Replies, false) -> |
| []; |
| tree_repair_docs(Replies, true) -> |
| Leafs = couch_key_tree:get_all_leafs(Replies), |
| [Doc || {Doc, {_Pos, _}} <- Leafs, is_record(Doc, doc)]. |
| |
| dict_repair_docs(Replies, ReplyCount) -> |
| NeedsRepair = lists:any(fun({_, {_, C}}) -> C < ReplyCount end, Replies), |
| if |
| not NeedsRepair -> []; |
| true -> [Doc || {_, {{ok, Doc}, _}} <- Replies] |
| end. |
| |
| read_repair(Db, Docs, NodeRevs) -> |
| Opts = [?ADMIN_CTX, ?REPLICATED_CHANGES, {read_repair, NodeRevs}], |
| Res = fabric:update_docs(Db, Docs, Opts), |
| case Res of |
| {ok, []} -> |
| couch_stats:increment_counter([fabric, read_repairs, success]); |
| _ -> |
| couch_stats:increment_counter([fabric, read_repairs, failure]), |
| [#doc{id = Id} | _] = Docs, |
| couch_log:notice("read_repair ~s ~s ~p", [Db, Id, Res]) |
| end. |
| |
| format_reply(_, _, RealReplyCount) when RealReplyCount =< 0 -> |
| all_workers_died; |
| format_reply(true, Replies, _) -> |
| tree_format_replies(Replies); |
| format_reply(false, Replies, _) -> |
| dict_format_replies(Replies). |
| |
| tree_format_replies(RevTree) -> |
| Leafs = couch_key_tree:get_all_leafs(RevTree), |
| lists:sort( |
| lists:map( |
| fun(Reply) -> |
| case Reply of |
| {?REV_MISSING, {Pos, [Rev]}} -> |
| {{not_found, missing}, {Pos, Rev}}; |
| {Doc, _} when is_record(Doc, doc) -> |
| {ok, Doc} |
| end |
| end, |
| Leafs |
| ) |
| ). |
| |
| dict_format_replies(Dict) -> |
| Replies0 = [Reply || {_, {Reply, _}} <- Dict], |
| |
| AllFoundRevs = lists:foldl( |
| fun(Reply, Acc) -> |
| case Reply of |
| {ok, #doc{revs = {Pos, [RevId | _]}}} -> |
| [{Pos, RevId} | Acc]; |
| _ -> |
| Acc |
| end |
| end, |
| [], |
| Replies0 |
| ), |
| |
| %% Drop any not_found replies for which we |
| %% found the revision on a different node. |
| Replies1 = lists:filter( |
| fun(Reply) -> |
| case Reply of |
| {{not_found, missing}, Rev} -> |
| not lists:member(Rev, AllFoundRevs); |
| _ -> |
| true |
| end |
| end, |
| Replies0 |
| ), |
| |
| % Remove replies with shorter revision |
| % paths for a given revision. |
| collapse_duplicate_revs(Replies1). |
| |
| collapse_duplicate_revs(Replies) -> |
| % The collapse logic requires that replies are |
| % sorted so that shorter rev paths are in |
| % the list just before longer lists. |
| % |
| % This somewhat implicitly relies on Erlang's |
| % sorting of [A, B] < [A, B, C] for all values |
| % of C. |
| collapse_duplicate_revs_int(lists:sort(Replies)). |
| |
| collapse_duplicate_revs_int([]) -> |
| []; |
| collapse_duplicate_revs_int([{ok, Doc1}, {ok, Doc2} | Rest]) -> |
| {D1, R1} = Doc1#doc.revs, |
| {D2, R2} = Doc2#doc.revs, |
| Head = |
| case D1 == D2 andalso lists:prefix(R1, R2) of |
| true -> []; |
| false -> [{ok, Doc1}] |
| end, |
| Head ++ collapse_duplicate_revs([{ok, Doc2} | Rest]); |
| collapse_duplicate_revs_int([Reply | Rest]) -> |
| [Reply | collapse_duplicate_revs(Rest)]. |
| |
| -ifdef(TEST). |
| -include_lib("couch/include/couch_eunit.hrl"). |
| |
| setup_all() -> |
| config:start_link([]), |
| meck:new([fabric, couch_stats, couch_log]), |
| meck:new(fabric_util, [passthrough]), |
| meck:expect(fabric, update_docs, fun(_, _, _) -> {ok, nil} end), |
| meck:expect(couch_stats, increment_counter, fun(_) -> ok end), |
| meck:expect(couch_log, notice, fun(_, _) -> ok end), |
| meck:expect(fabric_util, cleanup, fun(_) -> ok end). |
| |
| teardown_all(_) -> |
| meck:unload(), |
| config:stop(). |
| |
| setup() -> |
| meck:reset([ |
| couch_log, |
| couch_stats, |
| fabric, |
| fabric_util |
| ]). |
| |
| teardown(_) -> |
| ok. |
| |
| state0(Revs, Latest) -> |
| #state{ |
| worker_count = 3, |
| workers = |
| [#shard{node = 'node1'}, #shard{node = 'node2'}, #shard{node = 'node3'}], |
| r = 2, |
| revs = Revs, |
| latest = Latest |
| }. |
| |
| revs() -> [{1, <<"foo">>}, {1, <<"bar">>}, {1, <<"baz">>}]. |
| |
| foo1() -> {ok, #doc{revs = {1, [<<"foo">>]}}}. |
| foo2() -> {ok, #doc{revs = {2, [<<"foo2">>, <<"foo">>]}}}. |
| foo2stemmed() -> {ok, #doc{revs = {2, [<<"foo2">>]}}}. |
| fooNF() -> {{not_found, missing}, {1, <<"foo">>}}. |
| foo2NF() -> {{not_found, missing}, {2, <<"foo2">>}}. |
| bar1() -> {ok, #doc{revs = {1, [<<"bar">>]}}}. |
| barNF() -> {{not_found, missing}, {1, <<"bar">>}}. |
| bazNF() -> {{not_found, missing}, {1, <<"baz">>}}. |
| baz1() -> {ok, #doc{revs = {1, [<<"baz">>]}}}. |
| |
| open_doc_revs_test_() -> |
| { |
| setup, |
| fun setup_all/0, |
| fun teardown_all/1, |
| { |
| foreach, |
| fun setup/0, |
| fun teardown/1, |
| [ |
| ?TDEF_FE(check_empty_response_not_quorum), |
| ?TDEF_FE(check_basic_response), |
| ?TDEF_FE(check_finish_quorum), |
| ?TDEF_FE(check_finish_quorum_newer), |
| ?TDEF_FE(check_no_quorum_on_second), |
| ?TDEF_FE(check_done_on_third), |
| ?TDEF_FE(check_specific_revs_first_msg), |
| ?TDEF_FE(check_revs_done_on_agreement), |
| ?TDEF_FE(check_latest_true), |
| ?TDEF_FE(check_ancestor_counted_in_quorum), |
| ?TDEF_FE(check_not_found_counts_for_descendant), |
| ?TDEF_FE(check_worker_error_skipped), |
| ?TDEF_FE(check_quorum_only_counts_valid_responses), |
| ?TDEF_FE(check_empty_list_when_no_workers_reply), |
| ?TDEF_FE(check_node_rev_stored), |
| ?TDEF_FE(check_node_rev_store_head_only), |
| ?TDEF_FE(check_node_rev_store_multiple), |
| ?TDEF_FE(check_node_rev_dont_store_errors), |
| ?TDEF_FE(check_node_rev_store_non_errors), |
| ?TDEF_FE(check_node_rev_store_concatenate), |
| ?TDEF_FE(check_node_rev_store_concantenate_multiple), |
| ?TDEF_FE(check_node_rev_unmodified_on_down_or_exit), |
| ?TDEF_FE(check_not_found_replies_are_removed_when_doc_found), |
| ?TDEF_FE(check_not_found_returned_when_one_of_docs_not_found), |
| ?TDEF_FE(check_not_found_returned_when_doc_not_found), |
| ?TDEF_FE(check_longer_rev_list_returned), |
| ?TDEF_FE(check_longer_rev_list_not_combined), |
| ?TDEF_FE(check_not_found_removed_and_longer_rev_list) |
| ] |
| } |
| }. |
| |
| % Tests for revs=all |
| |
| check_empty_response_not_quorum(_) -> |
| % Simple smoke test that we don't think we're |
| % done with a first empty response |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| W3 = #shard{node = 'node3'}, |
| ?assertMatch( |
| {ok, #state{workers = [W2, W3]}}, |
| handle_message({ok, []}, W1, state0(all, false)) |
| ). |
| |
| check_basic_response(_) -> |
| % Check that we've handle a response |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| W3 = #shard{node = 'node3'}, |
| ?assertMatch( |
| {ok, #state{reply_count = 1, workers = [W2, W3]}}, |
| handle_message({ok, [foo1(), bar1()]}, W1, state0(all, false)) |
| ). |
| |
| check_finish_quorum(_) -> |
| % Two messages with the same revisions means we're done |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| S0 = state0(all, false), |
| {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), |
| Expect = {stop, [bar1(), foo1()]}, |
| ?assertEqual(Expect, handle_message({ok, [foo1(), bar1()]}, W2, S1)). |
| |
| check_finish_quorum_newer(_) -> |
| % We count a descendant of a revision for quorum so |
| % foo1 should count for foo2 which means we're finished. |
| % We also validate that read_repair was triggered. |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| S0 = state0(all, false), |
| {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), |
| Expect = {stop, [bar1(), foo2()]}, |
| ok = meck:reset(fabric), |
| ?assertEqual(Expect, handle_message({ok, [foo2(), bar1()]}, W2, S1)), |
| ok = meck:wait(fabric, update_docs, '_', 5000), |
| ?assertMatch( |
| [{_, {fabric, update_docs, [_, _, _]}, _}], |
| meck:history(fabric) |
| ). |
| |
| check_no_quorum_on_second(_) -> |
| % Quorum not yet met for the foo revision so we |
| % would wait for w3 |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| W3 = #shard{node = 'node3'}, |
| S0 = state0(all, false), |
| {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), |
| ?assertMatch( |
| {ok, #state{workers = [W3]}}, |
| handle_message({ok, [bar1()]}, W2, S1) |
| ). |
| |
| check_done_on_third(_) -> |
| % The third message of three means we're done no matter |
| % what. Every revision seen in this pattern should be |
| % included. |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| W3 = #shard{node = 'node3'}, |
| S0 = state0(all, false), |
| {ok, S1} = handle_message({ok, [foo1(), bar1()]}, W1, S0), |
| {ok, S2} = handle_message({ok, [bar1()]}, W2, S1), |
| Expect = {stop, [bar1(), foo1()]}, |
| ?assertEqual(Expect, handle_message({ok, [bar1()]}, W3, S2)). |
| |
| % Tests for a specific list of revs |
| |
| check_specific_revs_first_msg(_) -> |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| W3 = #shard{node = 'node3'}, |
| S0 = state0(revs(), false), |
| ?assertMatch( |
| {ok, #state{reply_count = 1, workers = [W2, W3]}}, |
| handle_message({ok, [foo1(), bar1(), bazNF()]}, W1, S0) |
| ). |
| |
| check_revs_done_on_agreement(_) -> |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| S0 = state0(revs(), false), |
| Msg = {ok, [foo1(), bar1(), bazNF()]}, |
| {ok, S1} = handle_message(Msg, W1, S0), |
| Expect = {stop, [bar1(), foo1(), bazNF()]}, |
| ?assertEqual(Expect, handle_message(Msg, W2, S1)). |
| |
| check_latest_true(_) -> |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| S0 = state0(revs(), true), |
| Msg1 = {ok, [foo2(), bar1(), bazNF()]}, |
| Msg2 = {ok, [foo2(), bar1(), bazNF()]}, |
| {ok, S1} = handle_message(Msg1, W1, S0), |
| Expect = {stop, [bar1(), foo2(), bazNF()]}, |
| ?assertEqual(Expect, handle_message(Msg2, W2, S1)). |
| |
| check_ancestor_counted_in_quorum(_) -> |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| S0 = state0(revs(), true), |
| Msg1 = {ok, [foo1(), bar1(), bazNF()]}, |
| Msg2 = {ok, [foo2(), bar1(), bazNF()]}, |
| Expect = {stop, [bar1(), foo2(), bazNF()]}, |
| |
| % Older first |
| {ok, S1} = handle_message(Msg1, W1, S0), |
| ?assertEqual(Expect, handle_message(Msg2, W2, S1)), |
| |
| % Newer first |
| {ok, S2} = handle_message(Msg2, W2, S0), |
| ?assertEqual(Expect, handle_message(Msg1, W1, S2)). |
| |
| check_not_found_counts_for_descendant(_) -> |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| S0 = state0(revs(), true), |
| Msg1 = {ok, [foo1(), bar1(), bazNF()]}, |
| Msg2 = {ok, [foo1(), bar1(), baz1()]}, |
| Expect = {stop, [bar1(), baz1(), foo1()]}, |
| |
| % not_found first |
| {ok, S1} = handle_message(Msg1, W1, S0), |
| ?assertEqual(Expect, handle_message(Msg2, W2, S1)), |
| |
| % not_found second |
| {ok, S2} = handle_message(Msg2, W2, S0), |
| ?assertEqual(Expect, handle_message(Msg1, W1, S2)). |
| |
| check_worker_error_skipped(_) -> |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| W3 = #shard{node = 'node3'}, |
| S0 = state0(revs(), true), |
| Msg1 = {ok, [foo1(), bar1(), baz1()]}, |
| Msg2 = {rexi_EXIT, reason}, |
| Msg3 = {ok, [foo1(), bar1(), baz1()]}, |
| Expect = {stop, [bar1(), baz1(), foo1()]}, |
| |
| {ok, S1} = handle_message(Msg1, W1, S0), |
| {ok, S2} = handle_message(Msg2, W2, S1), |
| ?assertEqual(Expect, handle_message(Msg3, W3, S2)). |
| |
| check_quorum_only_counts_valid_responses(_) -> |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| W3 = #shard{node = 'node3'}, |
| S0 = state0(revs(), true), |
| Msg1 = {rexi_EXIT, reason}, |
| Msg2 = {rexi_EXIT, reason}, |
| Msg3 = {ok, [foo1(), bar1(), baz1()]}, |
| Expect = {stop, [bar1(), baz1(), foo1()]}, |
| |
| {ok, S1} = handle_message(Msg1, W1, S0), |
| {ok, S2} = handle_message(Msg2, W2, S1), |
| ?assertEqual(Expect, handle_message(Msg3, W3, S2)). |
| |
| check_empty_list_when_no_workers_reply(_) -> |
| W1 = #shard{node = 'node1'}, |
| W2 = #shard{node = 'node2'}, |
| W3 = #shard{node = 'node3'}, |
| S0 = state0(revs(), true), |
| Msg1 = {rexi_EXIT, reason}, |
| Msg2 = {rexi_EXIT, reason}, |
| Msg3 = {rexi_DOWN, nodedown, {nil, node()}, nil}, |
| Expect = {stop, all_workers_died}, |
| |
| {ok, S1} = handle_message(Msg1, W1, S0), |
| {ok, S2} = handle_message(Msg2, W2, S1), |
| ?assertEqual(Expect, handle_message(Msg3, W3, S2)). |
| |
| check_node_rev_stored(_) -> |
| W1 = #shard{node = node1}, |
| S0 = state0([], true), |
| |
| {ok, S1} = handle_message({ok, [foo1()]}, W1, S0), |
| ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs). |
| |
| check_node_rev_store_head_only(_) -> |
| W1 = #shard{node = node1}, |
| S0 = state0([], true), |
| |
| {ok, S1} = handle_message({ok, [foo2()]}, W1, S0), |
| ?assertEqual([{node1, [{2, <<"foo2">>}]}], S1#state.node_revs). |
| |
| check_node_rev_store_multiple(_) -> |
| W1 = #shard{node = node1}, |
| S0 = state0([], true), |
| |
| {ok, S1} = handle_message({ok, [foo1(), foo2()]}, W1, S0), |
| ?assertEqual( |
| [{node1, [{2, <<"foo2">>}, {1, <<"foo">>}]}], |
| S1#state.node_revs |
| ). |
| |
| check_node_rev_dont_store_errors(_) -> |
| W1 = #shard{node = node1}, |
| S0 = state0([], true), |
| |
| {ok, S1} = handle_message({ok, [barNF()]}, W1, S0), |
| ?assertEqual([], S1#state.node_revs). |
| |
| check_node_rev_store_non_errors(_) -> |
| W1 = #shard{node = node1}, |
| S0 = state0([], true), |
| |
| {ok, S1} = handle_message({ok, [foo1(), barNF()]}, W1, S0), |
| ?assertEqual([{node1, [{1, <<"foo">>}]}], S1#state.node_revs). |
| |
| check_node_rev_store_concatenate(_) -> |
| W2 = #shard{node = node2}, |
| S0 = state0([], true), |
| S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, |
| |
| {ok, S2} = handle_message({ok, [foo2()]}, W2, S1), |
| ?assertEqual( |
| [{node2, [{2, <<"foo2">>}]}, {node1, [{1, <<"foo">>}]}], |
| S2#state.node_revs |
| ). |
| |
| check_node_rev_store_concantenate_multiple(_) -> |
| W2 = #shard{node = node2}, |
| S0 = state0([], true), |
| S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, |
| |
| {ok, S2} = handle_message({ok, [foo2(), bar1()]}, W2, S1), |
| ?assertEqual( |
| [ |
| {node2, [{1, <<"bar">>}, {2, <<"foo2">>}]}, |
| {node1, [{1, <<"foo">>}]} |
| ], |
| S2#state.node_revs |
| ). |
| |
| check_node_rev_unmodified_on_down_or_exit(_) -> |
| W2 = #shard{node = node2}, |
| S0 = state0([], true), |
| S1 = S0#state{node_revs = [{node1, [{1, <<"foo">>}]}]}, |
| |
| Down = {rexi_DOWN, nodedown, {nil, node()}, nil}, |
| {ok, S2} = handle_message(Down, W2, S1), |
| ?assertEqual( |
| [{node1, [{1, <<"foo">>}]}], |
| S2#state.node_revs |
| ), |
| |
| Exit = {rexi_EXIT, reason}, |
| {ok, S3} = handle_message(Exit, W2, S1), |
| ?assertEqual( |
| [{node1, [{1, <<"foo">>}]}], |
| S3#state.node_revs |
| ). |
| |
| check_not_found_replies_are_removed_when_doc_found(_) -> |
| Replies = replies_to_dict([foo1(), bar1(), fooNF()]), |
| Expect = [bar1(), foo1()], |
| ?assertEqual(Expect, dict_format_replies(Replies)). |
| |
| check_not_found_returned_when_one_of_docs_not_found(_) -> |
| Replies = replies_to_dict([foo1(), foo2(), barNF()]), |
| Expect = [foo1(), foo2(), barNF()], |
| ?assertEqual(Expect, dict_format_replies(Replies)). |
| |
| check_not_found_returned_when_doc_not_found(_) -> |
| Replies = replies_to_dict([fooNF(), barNF(), bazNF()]), |
| Expect = [barNF(), bazNF(), fooNF()], |
| ?assertEqual(Expect, dict_format_replies(Replies)). |
| |
| check_longer_rev_list_returned(_) -> |
| Replies = replies_to_dict([foo2(), foo2stemmed()]), |
| Expect = [foo2()], |
| ?assertEqual(2, length(Replies)), |
| ?assertEqual(Expect, dict_format_replies(Replies)). |
| |
| check_longer_rev_list_not_combined(_) -> |
| Replies = replies_to_dict([foo2(), foo2stemmed(), bar1()]), |
| Expect = [bar1(), foo2()], |
| ?assertEqual(3, length(Replies)), |
| ?assertEqual(Expect, dict_format_replies(Replies)). |
| |
| check_not_found_removed_and_longer_rev_list(_) -> |
| Replies = replies_to_dict([foo2(), foo2stemmed(), foo2NF()]), |
| Expect = [foo2()], |
| ?assertEqual(3, length(Replies)), |
| ?assertEqual(Expect, dict_format_replies(Replies)). |
| |
| replies_to_dict(Replies) -> |
| [reply_to_element(R) || R <- Replies]. |
| |
| reply_to_element({ok, #doc{revs = Revs}} = Reply) -> |
| {_, [Rev | _]} = Revs, |
| {{Rev, Revs}, {Reply, 1}}; |
| reply_to_element(Reply) -> |
| {Reply, {Reply, 1}}. |
| |
| -endif. |