| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(mem3_rep_test). |
| |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("couch_mrview/include/couch_mrview.hrl"). |
| -include_lib("mem3/include/mem3.hrl"). |
| |
| |
| -define(ID, <<"_id">>). |
| -define(TIMEOUT, 60). % seconds |
| |
| setup() -> |
| {AllSrc, AllTgt} = {?tempdb(), ?tempdb()}, |
| {PartSrc, PartTgt} = {?tempdb(), ?tempdb()}, |
| create_db(AllSrc, [{q, 1}, {n, 1}]), |
| create_db(AllTgt, [{q, 2}, {n, 1}]), |
| PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}], |
| create_db(PartSrc, [{q, 1}, {n, 1}, {props, PartProps}]), |
| create_db(PartTgt, [{q, 2}, {n, 1}, {props, PartProps}]), |
| #{allsrc => AllSrc, alltgt => AllTgt, partsrc => PartSrc, parttgt => PartTgt}. |
| |
| |
| teardown(#{} = Dbs) -> |
| maps:map(fun(_, Db) -> delete_db(Db) end, Dbs). |
| |
| |
| start_couch() -> |
| test_util:start_couch([mem3, fabric]). |
| |
| |
| stop_couch(Ctx) -> |
| test_util:stop_couch(Ctx). |
| |
| |
| mem3_reshard_db_test_() -> |
| { |
| "mem3 rep db tests", |
| { |
| setup, |
| fun start_couch/0, fun stop_couch/1, |
| { |
| foreach, |
| fun setup/0, fun teardown/1, |
| [ |
| fun replicate_basics/1, |
| fun replicate_small_batches/1, |
| fun replicate_low_batch_count/1, |
| fun replicate_with_partitions/1 |
| ] |
| } |
| } |
| }. |
| |
| |
| replicate_basics(#{allsrc := AllSrc, alltgt := AllTgt}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| DocSpec = #{docs => 10, delete => [5, 9]}, |
| add_test_docs(AllSrc, DocSpec), |
| SDocs = get_all_docs(AllSrc), |
| |
| [Src] = lists:sort(mem3:local_shards(AllSrc)), |
| [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)), |
| #shard{range = R1} = Tgt1, |
| #shard{range = R2} = Tgt2, |
| TMap = #{R1 => Tgt1, R2 => Tgt2}, |
| Opts = [{batch_size, 1000}, {batch_count, all}], |
| ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts)), |
| |
| ?assertEqual(SDocs, get_all_docs(AllTgt)) |
| end)}. |
| |
| |
| replicate_small_batches(#{allsrc := AllSrc, alltgt := AllTgt}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| DocSpec = #{docs => 10, delete => [5, 9]}, |
| add_test_docs(AllSrc, DocSpec), |
| SDocs = get_all_docs(AllSrc), |
| |
| [Src] = lists:sort(mem3:local_shards(AllSrc)), |
| [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)), |
| #shard{range = R1} = Tgt1, |
| #shard{range = R2} = Tgt2, |
| TMap = #{R1 => Tgt1, R2 => Tgt2}, |
| Opts = [{batch_size, 2}, {batch_count, all}], |
| ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts)), |
| |
| ?assertEqual(SDocs, get_all_docs(AllTgt)) |
| end)}. |
| |
| |
| replicate_low_batch_count(#{allsrc := AllSrc, alltgt := AllTgt}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| DocSpec = #{docs => 10, delete => [5, 9]}, |
| add_test_docs(AllSrc, DocSpec), |
| SDocs = get_all_docs(AllSrc), |
| |
| [Src] = lists:sort(mem3:local_shards(AllSrc)), |
| [Tgt1, Tgt2] = lists:sort(mem3:local_shards(AllTgt)), |
| #shard{range = R1} = Tgt1, |
| #shard{range = R2} = Tgt2, |
| TMap = #{R1 => Tgt1, R2 => Tgt2}, |
| |
| Opts1 = [{batch_size, 2}, {batch_count, 1}], |
| ?assertMatch({ok, 8}, mem3_rep:go(Src, TMap, Opts1)), |
| |
| Opts2 = [{batch_size, 1}, {batch_count, 2}], |
| ?assertMatch({ok, 6}, mem3_rep:go(Src, TMap, Opts2)), |
| |
| Opts3 = [{batch_size, 1000}, {batch_count, all}], |
| ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts3)), |
| |
| ?assertEqual(SDocs, get_all_docs(AllTgt)) |
| end)}. |
| |
| |
| replicate_with_partitions(#{partsrc := PartSrc, parttgt := PartTgt}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| DocSpec = #{ |
| pdocs => #{ |
| <<"PX">> => 15, |
| <<"PY">> => 19 |
| } |
| }, |
| add_test_docs(PartSrc, DocSpec), |
| SDocs = get_all_docs(PartSrc), |
| PXSrc = get_partition_info(PartSrc, <<"PX">>), |
| PYSrc = get_partition_info(PartSrc, <<"PY">>), |
| |
| [Src] = lists:sort(mem3:local_shards(PartSrc)), |
| [Tgt1, Tgt2] = lists:sort(mem3:local_shards(PartTgt)), |
| #shard{range = R1} = Tgt1, |
| #shard{range = R2} = Tgt2, |
| TMap = #{R1 => Tgt1, R2 => Tgt2}, |
| Opts = [{batch_size, 1000}, {batch_count, all}], |
| ?assertMatch({ok, 0}, mem3_rep:go(Src, TMap, Opts)), |
| |
| ?assertEqual(PXSrc, get_partition_info(PartTgt, <<"PX">>)), |
| ?assertEqual(PYSrc, get_partition_info(PartTgt, <<"PY">>)), |
| ?assertEqual(SDocs, get_all_docs(PartTgt)) |
| end)}. |
| |
| |
| get_partition_info(DbName, Partition) -> |
| with_proc(fun() -> |
| {ok, PInfo} = fabric:get_partition_info(DbName, Partition), |
| maps:with([ |
| <<"doc_count">>, <<"doc_del_count">>, <<"partition">> |
| ], to_map(PInfo)) |
| end). |
| |
| |
| get_all_docs(DbName) -> |
| get_all_docs(DbName, #mrargs{}). |
| |
| |
| get_all_docs(DbName, #mrargs{} = QArgs0) -> |
| GL = erlang:group_leader(), |
| with_proc(fun() -> |
| Cb = fun |
| ({row, Props}, Acc) -> |
| Doc = to_map(couch_util:get_value(doc, Props)), |
| #{?ID := Id} = Doc, |
| {ok, Acc#{Id => Doc}}; |
| ({meta, _}, Acc) -> {ok, Acc}; |
| (complete, Acc) -> {ok, Acc} |
| end, |
| QArgs = QArgs0#mrargs{include_docs = true}, |
| {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs), |
| Docs |
| end, GL). |
| |
| |
| to_map([_ | _] = Props) -> |
| to_map({Props}); |
| |
| to_map({[_ | _]} = EJson) -> |
| jiffy:decode(jiffy:encode(EJson), [return_maps]). |
| |
| |
| create_db(DbName, Opts) -> |
| GL = erlang:group_leader(), |
| with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL). |
| |
| |
| delete_db(DbName) -> |
| GL = erlang:group_leader(), |
| with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL). |
| |
| |
| with_proc(Fun) -> |
| with_proc(Fun, undefined, 30000). |
| |
| |
| with_proc(Fun, GroupLeader) -> |
| with_proc(Fun, GroupLeader, 30000). |
| |
| |
| with_proc(Fun, GroupLeader, Timeout) -> |
| {Pid, Ref} = spawn_monitor(fun() -> |
| case GroupLeader of |
| undefined -> ok; |
| _ -> erlang:group_leader(GroupLeader, self()) |
| end, |
| exit({with_proc_res, Fun()}) |
| end), |
| receive |
| {'DOWN', Ref, process, Pid, {with_proc_res, Res}} -> |
| Res; |
| {'DOWN', Ref, process, Pid, Error} -> |
| error(Error) |
| after Timeout -> |
| erlang:demonitor(Ref, [flush]), |
| exit(Pid, kill), |
| error({with_proc_timeout, Fun, Timeout}) |
| end. |
| |
| |
| add_test_docs(DbName, #{} = DocSpec) -> |
| Docs = docs(maps:get(docs, DocSpec, [])) |
| ++ pdocs(maps:get(pdocs, DocSpec, #{})), |
| Res = update_docs(DbName, Docs), |
| Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) -> |
| Doc#doc{revs = {RevPos, [Rev]}} |
| end, lists:zip(Docs, Res)), |
| case delete_docs(maps:get(delete, DocSpec, []), Docs1) of |
| [] -> ok; |
| [_ | _] = Deleted -> update_docs(DbName, Deleted) |
| end, |
| ok. |
| |
| |
| update_docs(DbName, Docs) -> |
| with_proc(fun() -> |
| case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of |
| {accepted, Res} -> Res; |
| {ok, Res} -> Res |
| end |
| end). |
| |
| |
| delete_docs([S, E], Docs) when E >= S -> |
| ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)], |
| lists:filtermap(fun(#doc{id = Id} = Doc) -> |
| case lists:member(Id, ToDelete) of |
| true -> {true, Doc#doc{deleted = true}}; |
| false -> false |
| end |
| end, Docs); |
| delete_docs(_, _) -> |
| []. |
| |
| |
| pdocs(#{} = PMap) -> |
| maps:fold(fun(Part, DocSpec, DocsAcc) -> |
| docs(DocSpec, <<Part/binary, ":">>) ++ DocsAcc |
| end, [], PMap). |
| |
| |
| docs(DocSpec) -> |
| docs(DocSpec, <<"">>). |
| |
| |
| docs(N, Prefix) when is_integer(N), N > 0 -> |
| docs([0, N - 1], Prefix); |
| docs([S, E], Prefix) when E >= S -> |
| [doc(Prefix, I) || I <- lists:seq(S, E)]; |
| docs(_, _) -> |
| []. |
| |
| |
| doc(Pref, Id) -> |
| Body = bodyprops(), |
| doc(Pref, Id, Body, 42). |
| |
| |
| doc(Pref, Id, BodyProps, AttSize) -> |
| #doc{ |
| id = doc_id(Pref, Id), |
| body = {BodyProps}, |
| atts = atts(AttSize) |
| }. |
| |
| |
| doc_id(Pref, Id) -> |
| IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])), |
| <<Pref/binary, IdBin/binary>>. |
| |
| |
| bodyprops() -> |
| [ |
| {<<"g">>, {[ |
| {<<"type">>, <<"Polygon">>}, |
| {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]} |
| ]}} |
| ]. |
| |
| |
| atts(0) -> |
| []; |
| |
| atts(Size) when is_integer(Size), Size >= 1 -> |
| Data = << <<"x">> || _ <- lists:seq(1, Size) >>, |
| [couch_att:new([ |
| {name, <<"att">>}, |
| {type, <<"app/binary">>}, |
| {att_len, Size}, |
| {data, Data} |
| ])]. |