| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(mem3_reshard_test). |
| |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("mem3/src/mem3_reshard.hrl"). |
| -include_lib("couch_mrview/include/couch_mrview.hrl"). % for all_docs function |
| |
| -define(ID, <<"_id">>). |
| -define(TIMEOUT, 60). |
| |
| setup() -> |
| HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}, |
| case HaveDreyfus of false -> ok; true -> |
| mock_dreyfus_indices() |
| end, |
| |
| HaveHastings = code:lib_dir(hastings) /= {error, bad_name}, |
| case HaveHastings of false -> ok; true -> |
| mock_hastings_indices() |
| end, |
| {Db1, Db2} = {?tempdb(), ?tempdb()}, |
| create_db(Db1, [{q, 1}, {n, 1}]), |
| PartProps = [{partitioned, true}, {hash, [couch_partition, hash, []]}], |
| create_db(Db2, [{q, 1}, {n, 1}, {props, PartProps}]), |
| config:set("reshard", "retry_interval_sec", "0", _Persist=false), |
| #{db1 => Db1, db2 => Db2}. |
| |
| |
| teardown(#{} = Dbs) -> |
| mem3_reshard:reset_state(), |
| maps:map(fun(_, Db) -> delete_db(Db) end, Dbs), |
| config:delete("reshard", "retry_interval_sec", _Persist=false), |
| meck:unload(). |
| |
| |
| start_couch() -> |
| test_util:start_couch(?CONFIG_CHAIN, [mem3, fabric]). |
| |
| |
| stop_couch(Ctx) -> |
| test_util:stop_couch(Ctx). |
| |
| |
| mem3_reshard_db_test_() -> |
| { |
| "mem3 shard split db tests", |
| { |
| setup, |
| fun start_couch/0, fun stop_couch/1, |
| { |
| foreach, |
| fun setup/0, fun teardown/1, |
| [ |
| fun split_one_shard/1, |
| fun update_docs_before_topoff1/1, |
| fun indices_are_built/1, |
| fun split_partitioned_db/1, |
| fun split_twice/1, |
| fun couch_events_are_emitted/1, |
| fun retries_work/1, |
| fun target_reset_in_initial_copy/1, |
| fun split_an_incomplete_shard_map/1, |
| fun target_shards_are_locked/1 |
| ] |
| } |
| } |
| }. |
| |
| |
| % This is a basic test to check that shard splitting preserves documents, and |
| % db meta props like revs limits and security. |
| split_one_shard(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 1}, |
| add_test_docs(Db, DocSpec), |
| |
| % Save documents before the split |
| Docs0 = get_all_docs(Db), |
| Local0 = get_local_docs(Db), |
| |
| % Set some custom metadata properties |
| set_revs_limit(Db, 942), |
| set_purge_infos_limit(Db, 943), |
| SecObj = {[{<<"foo">>, <<"bar">>}]}, |
| set_security(Db, SecObj), |
| |
| % DbInfo is saved after setting metadata bits |
| % as those could bump the update sequence |
| DbInfo0 = get_db_info(Db), |
| |
| % Split the one shard |
| [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId} = mem3_reshard:start_split_job(Shard), |
| wait_state(JobId, completed), |
| |
| % Perform some basic checks that the shard was split |
| Shards1 = lists:sort(mem3:local_shards(Db)), |
| ?assertEqual(2, length(Shards1)), |
| [#shard{range = R1}, #shard{range = R2}] = Shards1, |
| ?assertEqual([16#00000000, 16#7fffffff], R1), |
| ?assertEqual([16#80000000, 16#ffffffff], R2), |
| |
| % Check metadata bits after the split |
| ?assertEqual(942, get_revs_limit(Db)), |
| ?assertEqual(943, get_purge_infos_limit(Db)), |
| ?assertEqual(SecObj, get_security(Db)), |
| |
| DbInfo1 = get_db_info(Db), |
| Docs1 = get_all_docs(Db), |
| Local1 = get_local_docs(Db), |
| |
| % When comparing db infos, ignore update sequences they won't be the |
| % same since they are more shards involved after the split |
| ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), |
| |
| % Update seq prefix number is a sum of all shard update sequences |
| #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), |
| #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), |
| ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), |
| |
| % Finally compare that the documents are still there after the split |
| ?assertEqual(Docs0, Docs1), |
| |
| % Don't forget about the local but don't include internal checkpoints |
| % as some of those are munged and transformed during the split |
| ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)) |
| end)}. |
| |
| |
| % This test checks that document added while the shard is being split are not |
| % lost. Topoff1 state happens before indices are built |
| update_docs_before_topoff1(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| add_test_docs(Db, #{docs => 10}), |
| |
| intercept_state(topoff1), |
| |
| [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId} = mem3_reshard:start_split_job(Shard), |
| |
| receive {JobPid, topoff1} -> ok end, |
| add_test_docs(Db, #{docs => [10, 19], local => 1}), |
| Docs0 = get_all_docs(Db), |
| Local0 = get_local_docs(Db), |
| DbInfo0 = get_db_info(Db), |
| JobPid ! continue, |
| |
| wait_state(JobId, completed), |
| |
| % Perform some basic checks that the shard was split |
| Shards1 = lists:sort(mem3:local_shards(Db)), |
| ?assertEqual(2, length(Shards1)), |
| |
| DbInfo1 = get_db_info(Db), |
| Docs1 = get_all_docs(Db), |
| Local1 = get_local_docs(Db), |
| |
| ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), |
| |
| % Update sequence after initial copy with 10 docs would be 10 on each |
| % target shard (to match the source) and the total update sequence |
| % would have been 20. But then 10 more docs were added (3 might have |
| % ended up on one target and 7 on another) so the final update sequence |
| % would then be 20 + 10 = 30. |
| ?assertMatch(#{<<"update_seq">> := 30}, update_seq_to_num(DbInfo1)), |
| |
| ?assertEqual(Docs0, Docs1), |
| ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)) |
| end)}. |
| |
| |
| % This test that indices are built during shard splitting. |
| indices_are_built(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}, |
| HaveHastings = code:lib_dir(hastings) /= {error, bad_name}, |
| |
| add_test_docs(Db, #{docs => 10, mrview => 2, search => 2, geo => 2}), |
| [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId} = mem3_reshard:start_split_job(Shard), |
| wait_state(JobId, completed), |
| Shards1 = lists:sort(mem3:local_shards(Db)), |
| ?assertEqual(2, length(Shards1)), |
| MRViewGroupInfo = get_group_info(Db, <<"_design/mrview00000">>), |
| ?assertMatch(#{<<"update_seq">> := 32}, MRViewGroupInfo), |
| |
| HaveDreyfus = code:lib_dir(dreyfus) /= {error, bad_name}, |
| case HaveDreyfus of false -> ok; true -> |
| % 4 because there are 2 indices and 2 target shards |
| ?assertEqual(4, meck:num_calls(dreyfus_index, await, 2)) |
| end, |
| |
| HaveHastings = code:lib_dir(hastings) /= {error, bad_name}, |
| case HaveHastings of false -> ok; true -> |
| % 4 because there are 2 indices and 2 target shards |
| ?assertEqual(4, meck:num_calls(hastings_index, await, 2)) |
| end |
| end)}. |
| |
| |
| mock_dreyfus_indices() -> |
| meck:expect(dreyfus_index, design_doc_to_indexes, fun(Doc) -> |
| #doc{body = {BodyProps}} = Doc, |
| case couch_util:get_value(<<"indexes">>, BodyProps) of |
| undefined -> |
| []; |
| {[_]} -> |
| [{dreyfus, <<"db">>, dreyfus_index1}] |
| end |
| end), |
| meck:expect(dreyfus_index_manager, get_index, fun(_, _) -> {ok, pid} end), |
| meck:expect(dreyfus_index, await, fun(_, _) -> ok end). |
| |
| |
| mock_hastings_indices() -> |
| meck:expect(hastings_index, design_doc_to_indexes, fun(Doc) -> |
| #doc{body = {BodyProps}} = Doc, |
| case couch_util:get_value(<<"st_indexes">>, BodyProps) of |
| undefined -> |
| []; |
| {[_]} -> |
| [{hastings, <<"db">>, hastings_index1}] |
| end |
| end), |
| meck:expect(hastings_index_manager, get_index, fun(_, _) -> {ok, pid} end), |
| meck:expect(hastings_index, await, fun(_, _) -> ok end). |
| |
| % Split partitioned database |
| split_partitioned_db(#{db2 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| DocSpec = #{ |
| pdocs => #{ |
| <<"PX">> => 5, |
| <<"PY">> => 5 |
| }, |
| mrview => 1, |
| local => 1 |
| }, |
| add_test_docs(Db, DocSpec), |
| |
| % Save documents before the split |
| Docs0 = get_all_docs(Db), |
| Local0 = get_local_docs(Db), |
| |
| % Set some custom metadata properties |
| set_revs_limit(Db, 942), |
| set_purge_infos_limit(Db, 943), |
| SecObj = {[{<<"foo">>, <<"bar">>}]}, |
| set_security(Db, SecObj), |
| |
| % DbInfo is saved after setting metadata bits |
| % as those could bump the update sequence |
| DbInfo0 = get_db_info(Db), |
| PX0 = get_partition_info(Db, <<"PX">>), |
| PY0 = get_partition_info(Db, <<"PY">>), |
| |
| % Split the one shard |
| [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId} = mem3_reshard:start_split_job(Shard), |
| wait_state(JobId, completed), |
| |
| % Perform some basic checks that the shard was split |
| Shards1 = lists:sort(mem3:local_shards(Db)), |
| ?assertEqual(2, length(Shards1)), |
| [#shard{range = R1}, #shard{range = R2}] = Shards1, |
| ?assertEqual([16#00000000, 16#7fffffff], R1), |
| ?assertEqual([16#80000000, 16#ffffffff], R2), |
| |
| % Check metadata bits after the split |
| ?assertEqual(942, get_revs_limit(Db)), |
| ?assertEqual(943, get_purge_infos_limit(Db)), |
| ?assertEqual(SecObj, get_security(Db)), |
| |
| DbInfo1 = get_db_info(Db), |
| Docs1 = get_all_docs(Db), |
| Local1 = get_local_docs(Db), |
| |
| % When comparing db infos, ignore update sequences they won't be the |
| % same since they are more shards involved after the split |
| ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), |
| |
| % Update seq prefix number is a sum of all shard update sequences |
| #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), |
| #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), |
| ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), |
| |
| % Finally compare that documents are still there after the split |
| ?assertEqual(Docs0, Docs1), |
| |
| ?assertEqual(PX0, get_partition_info(Db, <<"PX">>)), |
| ?assertEqual(PY0, get_partition_info(Db, <<"PY">>)), |
| |
| % Don't forget about the local but don't include internal checkpoints |
| % as some of those are munged and transformed during the split |
| ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)) |
| end)}. |
| |
| |
| % Make sure a shard can be split again after it was split once. This checks that |
| % too many got added to some range, such that on next split they'd fail to fit |
| % in to any of the new target ranges. |
| split_twice(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| DocSpec = #{docs => 100, delete => [80, 99], mrview => 2, local => 100}, |
| add_test_docs(Db, DocSpec), |
| |
| % Save documents before the split |
| Docs0 = get_all_docs(Db), |
| Local0 = get_local_docs(Db), |
| |
| % Set some custom metadata properties |
| set_revs_limit(Db, 942), |
| set_purge_infos_limit(Db, 943), |
| SecObj = {[{<<"foo">>, <<"bar">>}]}, |
| set_security(Db, SecObj), |
| |
| % DbInfo is saved after setting metadata bits |
| % as those could bump the update sequence |
| DbInfo0 = get_db_info(Db), |
| |
| % Split the one shard |
| [#shard{name=Shard1}] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId1} = mem3_reshard:start_split_job(Shard1), |
| wait_state(JobId1, completed), |
| |
| % Perform some basic checks that the shard was split |
| Shards1 = lists:sort(mem3:local_shards(Db)), |
| ?assertEqual(2, length(Shards1)), |
| [#shard{range = R1}, #shard{range = R2}] = Shards1, |
| ?assertEqual([16#00000000, 16#7fffffff], R1), |
| ?assertEqual([16#80000000, 16#ffffffff], R2), |
| |
| % Check metadata bits after the split |
| ?assertEqual(942, get_revs_limit(Db)), |
| ?assertEqual(943, get_purge_infos_limit(Db)), |
| ?assertEqual(SecObj, get_security(Db)), |
| |
| DbInfo1 = get_db_info(Db), |
| Docs1 = get_all_docs(Db), |
| Local1 = get_local_docs(Db), |
| |
| % When comparing db infos, ignore update sequences they won't be the |
| % same since they are more shards involved after the split |
| ?assertEqual(without_seqs(DbInfo0), without_seqs(DbInfo1)), |
| |
| % Update seq prefix number is a sum of all shard update sequences |
| #{<<"update_seq">> := UpdateSeq0} = update_seq_to_num(DbInfo0), |
| #{<<"update_seq">> := UpdateSeq1} = update_seq_to_num(DbInfo1), |
| ?assertEqual(UpdateSeq0 * 2, UpdateSeq1), |
| |
| ?assertEqual(Docs0, Docs1), |
| ?assertEqual(without_meta_locals(Local0), without_meta_locals(Local1)), |
| |
| % Split the first range again |
| [#shard{name=Shard2}, _] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId2} = mem3_reshard:start_split_job(Shard2), |
| wait_state(JobId2, completed), |
| |
| Shards2 = lists:sort(mem3:local_shards(Db)), |
| ?assertEqual(3, length(Shards2)), |
| [R3, R4, R5] = [R || #shard{range = R} <- Shards2], |
| ?assertEqual([16#00000000, 16#3fffffff], R3), |
| ?assertEqual([16#40000000, 16#7fffffff], R4), |
| ?assertEqual([16#80000000, 16#ffffffff], R5), |
| |
| % Check metadata bits after the second split |
| ?assertEqual(942, get_revs_limit(Db)), |
| ?assertEqual(943, get_purge_infos_limit(Db)), |
| ?assertEqual(SecObj, get_security(Db)), |
| |
| DbInfo2 = get_db_info(Db), |
| Docs2 = get_all_docs(Db), |
| Local2 = get_local_docs(Db), |
| |
| ?assertEqual(without_seqs(DbInfo1), without_seqs(DbInfo2)), |
| % Update seq prefix number is a sum of all shard update sequences |
| % But only 1 shard out of 2 was split |
| #{<<"update_seq">> := UpdateSeq2} = update_seq_to_num(DbInfo2), |
| ?assertEqual(trunc(UpdateSeq1 * 1.5), UpdateSeq2), |
| ?assertEqual(Docs1, Docs2), |
| ?assertEqual(without_meta_locals(Local1), without_meta_locals(Local2)) |
| end)}. |
| |
| |
| couch_events_are_emitted(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| couch_event:register_all(self()), |
| |
| % Split the one shard |
| [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId} = mem3_reshard:start_split_job(Shard), |
| wait_state(JobId, completed), |
| |
| % Perform some basic checks that the shard was split |
| Shards1 = lists:sort(mem3:local_shards(Db)), |
| ?assertEqual(2, length(Shards1)), |
| [#shard{range = R1}, #shard{range = R2}] = Shards1, |
| ?assertEqual([16#00000000, 16#7fffffff], R1), |
| ?assertEqual([16#80000000, 16#ffffffff], R2), |
| |
| Flush = fun F(Events) -> |
| receive |
| {'$couch_event', DbName, Event} when Event =:= deleted |
| orelse Event =:= updated -> |
| case binary:match(DbName, Db) of |
| nomatch -> F(Events); |
| {_, _} -> F([Event | Events]) |
| end |
| after 0 -> |
| lists:reverse(Events) |
| end |
| end, |
| Events = Flush([]), |
| StartAtDeleted = lists:dropwhile(fun(E) -> E =/= deleted end, Events), |
| ?assertMatch([deleted, deleted, updated, updated | _], StartAtDeleted), |
| couch_event:unregister(self()) |
| end)}. |
| |
| |
| retries_work(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| meck:expect(couch_db_split, split, fun(_, _, _) -> |
| error(kapow) |
| end), |
| |
| [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId} = mem3_reshard:start_split_job(Shard), |
| |
| wait_state(JobId, failed), |
| ?assertEqual(3, meck:num_calls(couch_db_split, split, 3)) |
| end)}. |
| |
| |
| target_reset_in_initial_copy(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| [#shard{} = Src] = lists:sort(mem3:local_shards(Db)), |
| Job = #job{ |
| source = Src, |
| target = [#shard{name= <<"t1">>}, #shard{name = <<"t2">>}], |
| job_state = running, |
| split_state = initial_copy |
| }, |
| meck:expect(couch_db_split, cleanup_target, 2, ok), |
| meck:expect(couch_server, exists, fun |
| (<<"t1">>) -> true; |
| (<<"t2">>) -> true; |
| (DbName) -> meck:passthrough([DbName]) |
| end), |
| JobPid = spawn(fun() -> mem3_reshard_job:initial_copy_impl(Job) end), |
| meck:wait(2, couch_db_split, cleanup_target, ['_', '_'], 5000), |
| exit(JobPid, kill), |
| ?assertEqual(2, meck:num_calls(couch_db_split, cleanup_target, 2)) |
| end)}. |
| |
| |
| split_an_incomplete_shard_map(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| [#shard{} = Src] = lists:sort(mem3:local_shards(Db)), |
| [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), |
| meck:expect(mem3_util, calculate_max_n, 1, 0), |
| ?assertMatch({error, {not_enough_shard_copies, _}}, |
| mem3_reshard:start_split_job(Shard)) |
| end)}. |
| |
| |
| % Opening a db target db in initial copy phase will throw an error |
| target_shards_are_locked(#{db1 := Db}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| add_test_docs(Db, #{docs => 10}), |
| |
| % Make the job stops right when it was about to copy the docs |
| TestPid = self(), |
| meck:new(couch_db, [passthrough]), |
| meck:expect(couch_db, start_link, fun(Engine, TName, FilePath, Opts) -> |
| TestPid ! {start_link, self(), TName}, |
| receive |
| continue -> |
| meck:passthrough([Engine, TName, FilePath, Opts]) |
| end |
| end), |
| |
| [#shard{name=Shard}] = lists:sort(mem3:local_shards(Db)), |
| {ok, JobId} = mem3_reshard:start_split_job(Shard), |
| {Target0, JobPid} = receive |
| {start_link, Pid, TName} -> {TName, Pid} |
| end, |
| ?assertEqual({error, {locked, <<"shard splitting">>}}, |
| couch_db:open_int(Target0, [])), |
| |
| % Send two continues for two targets |
| JobPid ! continue, |
| JobPid ! continue, |
| |
| wait_state(JobId, completed) |
| end)}. |
| |
| |
| intercept_state(State) -> |
| TestPid = self(), |
| meck:new(mem3_reshard_job, [passthrough]), |
| meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) -> |
| case Job#job.split_state of |
| State -> |
| TestPid ! {self(), State}, |
| receive |
| continue -> meck:passthrough([Job]); |
| cancel -> ok |
| end; |
| _ -> |
| meck:passthrough([Job]) |
| end |
| end). |
| |
| |
| wait_state(JobId, State) -> |
| test_util:wait(fun() -> |
| case mem3_reshard:job(JobId) of |
| {ok, {Props}} -> |
| case couch_util:get_value(job_state, Props) of |
| State -> ok; |
| _ -> timer:sleep(100), wait |
| end; |
| {error, not_found} -> timer:sleep(100), wait |
| end |
| end, 30000). |
| |
| |
| set_revs_limit(DbName, Limit) -> |
| with_proc(fun() -> fabric:set_revs_limit(DbName, Limit, [?ADMIN_CTX]) end). |
| |
| |
| get_revs_limit(DbName) -> |
| with_proc(fun() -> fabric:get_revs_limit(DbName) end). |
| |
| |
| get_purge_infos_limit(DbName) -> |
| with_proc(fun() -> fabric:get_purge_infos_limit(DbName) end). |
| |
| |
| set_purge_infos_limit(DbName, Limit) -> |
| with_proc(fun() -> |
| fabric:set_purge_infos_limit(DbName, Limit, [?ADMIN_CTX]) |
| end). |
| |
| |
| set_security(DbName, SecObj) -> |
| with_proc(fun() -> fabric:set_security(DbName, SecObj) end). |
| |
| |
| get_security(DbName) -> |
| with_proc(fun() -> fabric:get_security(DbName, [?ADMIN_CTX]) end). |
| |
| |
| get_db_info(DbName) -> |
| with_proc(fun() -> |
| {ok, Info} = fabric:get_db_info(DbName), |
| maps:with([ |
| <<"db_name">>, <<"doc_count">>, <<"props">>, <<"doc_del_count">>, |
| <<"update_seq">>, <<"purge_seq">>, <<"disk_format_version">> |
| ], to_map(Info)) |
| end). |
| |
| |
| get_group_info(DbName, DesignId) -> |
| with_proc(fun() -> |
| {ok, GInfo} = fabric:get_view_group_info(DbName, DesignId), |
| maps:with([ |
| <<"language">>, <<"purge_seq">>, <<"signature">>, <<"update_seq">> |
| ], to_map(GInfo)) |
| end). |
| |
| |
| get_partition_info(DbName, Partition) -> |
| with_proc(fun() -> |
| {ok, PInfo} = fabric:get_partition_info(DbName, Partition), |
| maps:with([ |
| <<"db_name">>, <<"doc_count">>, <<"doc_del_count">>, <<"partition">> |
| ], to_map(PInfo)) |
| end). |
| |
| |
| get_all_docs(DbName) -> |
| get_all_docs(DbName, #mrargs{}). |
| |
| |
| get_all_docs(DbName, #mrargs{} = QArgs0) -> |
| GL = erlang:group_leader(), |
| with_proc(fun() -> |
| Cb = fun |
| ({row, Props}, Acc) -> |
| Doc = to_map(couch_util:get_value(doc, Props)), |
| #{?ID := Id} = Doc, |
| {ok, Acc#{Id => Doc}}; |
| ({meta, _}, Acc) -> {ok, Acc}; |
| (complete, Acc) -> {ok, Acc} |
| end, |
| QArgs = QArgs0#mrargs{include_docs = true}, |
| {ok, Docs} = fabric:all_docs(DbName, Cb, #{}, QArgs), |
| Docs |
| end, GL). |
| |
| |
| get_local_docs(DbName) -> |
| LocalNS = {namespace, <<"_local">>}, |
| maps:map(fun(_, Doc) -> |
| maps:without([<<"_rev">>], Doc) |
| end, get_all_docs(DbName, #mrargs{extra = [LocalNS]})). |
| |
| |
| without_seqs(#{} = InfoMap) -> |
| maps:without([<<"update_seq">>, <<"purge_seq">>], InfoMap). |
| |
| |
| without_meta_locals(#{} = Local) -> |
| maps:filter(fun |
| (<<"_local/purge-mrview-", _/binary>>, _) -> false; |
| (<<"_local/shard-sync-", _/binary>>, _) -> false; |
| (_, _) -> true |
| end, Local). |
| |
| |
| update_seq_to_num(#{} = InfoMap) -> |
| maps:map(fun |
| (<<"update_seq">>, Seq) -> seq_to_num(Seq); |
| (<<"purge_seq">>, PSeq) -> seq_to_num(PSeq); |
| (_, V) -> V |
| end, InfoMap). |
| |
| |
| seq_to_num(Seq) -> |
| [SeqNum, _] = binary:split(Seq, <<"-">>), |
| binary_to_integer(SeqNum). |
| |
| |
| to_map([_ | _] = Props) -> |
| to_map({Props}); |
| |
| to_map({[_ | _]} = EJson) -> |
| jiffy:decode(jiffy:encode(EJson), [return_maps]). |
| |
| |
| create_db(DbName, Opts) -> |
| GL = erlang:group_leader(), |
| with_proc(fun() -> fabric:create_db(DbName, Opts) end, GL). |
| |
| |
| delete_db(DbName) -> |
| GL = erlang:group_leader(), |
| with_proc(fun() -> fabric:delete_db(DbName, [?ADMIN_CTX]) end, GL). |
| |
| |
| with_proc(Fun) -> |
| with_proc(Fun, undefined, 30000). |
| |
| |
| with_proc(Fun, GroupLeader) -> |
| with_proc(Fun, GroupLeader, 30000). |
| |
| |
| with_proc(Fun, GroupLeader, Timeout) -> |
| {Pid, Ref} = spawn_monitor(fun() -> |
| case GroupLeader of |
| undefined -> ok; |
| _ -> erlang:group_leader(GroupLeader, self()) |
| end, |
| exit({with_proc_res, Fun()}) |
| end), |
| receive |
| {'DOWN', Ref, process, Pid, {with_proc_res, Res}} -> |
| Res; |
| {'DOWN', Ref, process, Pid, Error} -> |
| error(Error) |
| after Timeout -> |
| erlang:demonitor(Ref, [flush]), |
| exit(Pid, kill), |
| error({with_proc_timeout, Fun, Timeout}) |
| end. |
| |
| |
| add_test_docs(DbName, #{} = DocSpec) -> |
| Docs = docs(maps:get(docs, DocSpec, [])) |
| ++ pdocs(maps:get(pdocs, DocSpec, #{})) |
| ++ ddocs(mrview, maps:get(mrview, DocSpec, [])) |
| ++ ddocs(search, maps:get(search, DocSpec, [])) |
| ++ ddocs(geo, maps:get(geo, DocSpec, [])) |
| ++ ldocs(maps:get(local, DocSpec, [])), |
| Res = update_docs(DbName, Docs), |
| Docs1 = lists:map(fun({Doc, {ok, {RevPos, Rev}}}) -> |
| Doc#doc{revs = {RevPos, [Rev]}} |
| end, lists:zip(Docs, Res)), |
| case delete_docs(maps:get(delete, DocSpec, []), Docs1) of |
| [] -> ok; |
| [_ | _] = Deleted -> update_docs(DbName, Deleted) |
| end, |
| ok. |
| |
| |
| update_docs(DbName, Docs) -> |
| with_proc(fun() -> |
| case fabric:update_docs(DbName, Docs, [?ADMIN_CTX]) of |
| {accepted, Res} -> Res; |
| {ok, Res} -> Res |
| end |
| end). |
| |
| |
| delete_docs([S, E], Docs) when E >= S -> |
| ToDelete = [doc_id(<<"">>, I) || I <- lists:seq(S, E)], |
| lists:filtermap(fun(#doc{id = Id} = Doc) -> |
| case lists:member(Id, ToDelete) of |
| true -> {true, Doc#doc{deleted = true}}; |
| false -> false |
| end |
| end, Docs); |
| delete_docs(_, _) -> |
| []. |
| |
| |
| pdocs(#{} = PMap) -> |
| maps:fold(fun(Part, DocSpec, DocsAcc) -> |
| docs(DocSpec, <<Part/binary, ":">>) ++ DocsAcc |
| end, [], PMap). |
| |
| |
| docs(DocSpec) -> |
| docs(DocSpec, <<"">>). |
| |
| |
| docs(N, Prefix) when is_integer(N), N > 0 -> |
| docs([0, N - 1], Prefix); |
| docs([S, E], Prefix) when E >= S -> |
| [doc(Prefix, I) || I <- lists:seq(S, E)]; |
| docs(_, _) -> |
| []. |
| |
| ddocs(Type, N) when is_integer(N), N > 0 -> |
| ddocs(Type, [0, N - 1]); |
| ddocs(Type, [S, E]) when E >= S -> |
| Body = ddprop(Type), |
| BType = atom_to_binary(Type, utf8), |
| [doc(<<"_design/", BType/binary>>, I, Body, 0) || I <- lists:seq(S, E)]; |
| ddocs(_, _) -> |
| []. |
| |
| |
| ldocs(N) when is_integer(N), N > 0 -> |
| ldocs([0, N - 1]); |
| ldocs([S, E]) when E >= S -> |
| [doc(<<"_local/">>, I, bodyprops(), 0) || I <- lists:seq(S, E)]; |
| ldocs(_) -> |
| []. |
| |
| |
| |
| doc(Pref, Id) -> |
| Body = bodyprops(), |
| doc(Pref, Id, Body, 42). |
| |
| |
| doc(Pref, Id, BodyProps, AttSize) -> |
| #doc{ |
| id = doc_id(Pref, Id), |
| body = {BodyProps}, |
| atts = atts(AttSize) |
| }. |
| |
| |
| doc_id(Pref, Id) -> |
| IdBin = iolist_to_binary(io_lib:format("~5..0B", [Id])), |
| <<Pref/binary, IdBin/binary>>. |
| |
| |
| ddprop(mrview) -> |
| [ |
| {<<"views">>, {[ |
| {<<"v1">>, {[ |
| {<<"map">>, <<"function(d){emit(d);}">>} |
| ]}} |
| ]}} |
| ]; |
| |
| ddprop(geo) -> |
| [ |
| {<<"st_indexes">>, {[ |
| {<<"area">>, {[ |
| {<<"analyzer">>, <<"standard">>}, |
| {<<"index">>, <<"function(d){if(d.g){st_index(d.g)}}">> } |
| ]}} |
| ]}} |
| ]; |
| |
| ddprop(search) -> |
| [ |
| {<<"indexes">>, {[ |
| {<<"types">>, {[ |
| {<<"index">>, <<"function(d){if(d.g){st_index(d.g.type)}}">>} |
| ]}} |
| ]}} |
| ]. |
| |
| |
| bodyprops() -> |
| [ |
| {<<"g">>, {[ |
| {<<"type">>, <<"Polygon">>}, |
| {<<"coordinates">>, [[[-71.0, 48.4], [-70.0, 48.4], [-71.0, 48.4]]]} |
| ]}} |
| ]. |
| |
| |
| atts(0) -> |
| []; |
| |
| atts(Size) when is_integer(Size), Size >= 1 -> |
| Data = << <<"x">> || _ <- lists:seq(1, Size) >>, |
| [couch_att:new([ |
| {name, <<"att">>}, |
| {type, <<"app/binary">>}, |
| {att_len, Size}, |
| {data, Data} |
| ])]. |