| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(mem3_reshard_api_test). |
| |
| |
| -include_lib("couch/include/couch_eunit.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| -include_lib("mem3/src/mem3_reshard.hrl"). |
| |
| |
| % Credentials of the temporary admin account registered in setup/0, and the |
| % matching basic-auth option passed along with every request. |
| -define(USER, "mem3_reshard_api_test_admin"). |
| -define(PASS, "pass"). |
| -define(AUTH, {basic_auth, {?USER, ?PASS}}). |
| % Content-Type header sent with requests that carry a JSON body. |
| -define(JSON, {"Content-Type", "application/json"}). |
| % URL path suffixes appended to the base URL returned by setup/0. |
| -define(RESHARD, "_reshard/"). |
| -define(JOBS, "_reshard/jobs/"). |
| -define(STATE, "_reshard/state"). |
| % Keys matched in the decoded JSON response maps. |
| -define(ID, <<"id">>). |
| -define(OK, <<"ok">>). |
| % Value used for the EUnit {timeout, ...} wrapper of every test case. |
| -define(TIMEOUT, 60). % seconds |
| |
| |
| % Per-test fixture: registers a temporary admin user, builds the base URL of |
| % the chttpd listener and creates three temp databases (two with q=1, one with |
| % q=2, all with n=1). Returns {BaseUrl, {Db1, Db2, Db3}}. |
| setup() -> |
|     PassHash = couch_passwords:hash_admin_password(?PASS), |
|     ok = config:set("admins", ?USER, ?b2l(PassHash), false), |
|     BindAddr = config:get("chttpd", "bind_address", "127.0.0.1"), |
|     BindPort = mochiweb_socket_server:get(chttpd, port), |
|     Top = lists:concat(["http://", BindAddr, ":", BindPort, "/"]), |
|     Db1 = ?tempdb(), |
|     Db2 = ?tempdb(), |
|     Db3 = ?tempdb(), |
|     lists:foreach(fun({Db, QArgs}) -> create_db(Top, Db, QArgs) end, [ |
|         {Db1, "?q=1&n=1"}, |
|         {Db2, "?q=1&n=1"}, |
|         {Db3, "?q=2&n=1"} |
|     ]), |
|     {Top, {Db1, Db2, Db3}}. |
| |
| |
| % Per-test cleanup: resets the resharding state, clears the "disabled" flag, |
| % removes the three test databases, drops the config entries the tests may |
| % have set (including the temporary admin) and unloads all meck mocks. |
| teardown({Top, {Db1, Db2, Db3}}) -> |
|     mem3_reshard:reset_state(), |
|     application:unset_env(mem3, reshard_disabled), |
|     lists:foreach(fun(Db) -> delete_db(Top, Db) end, [Db1, Db2, Db3]), |
|     lists:foreach(fun({Section, Key}) -> |
|         ok = config:delete(Section, Key, false) |
|     end, [ |
|         {"reshard", "max_jobs"}, |
|         {"reshard", "require_node_param"}, |
|         {"reshard", "require_range_param"}, |
|         {"admins", ?USER} |
|     ]), |
|     meck:unload(). |
| |
| |
| % Suite-level setup: boots couch with the extra applications the tests need. |
| start_couch() -> |
|     ExtraApps = [mem3, chttpd], |
|     test_util:start_couch(ExtraApps). |
| |
| |
| % Suite-level cleanup: stops what start_couch/0 started, using the context it |
| % returned. |
| stop_couch(TestCtx) -> |
|     test_util:stop_couch(TestCtx). |
| |
| |
| % EUnit generator. Couch is started once for the whole suite; every case in |
| % the list below gets its own setup/0 + teardown/1 pair. |
| mem3_reshard_api_test_() -> |
|     Cases = [ |
|         fun basics/1, |
|         fun create_job_basic/1, |
|         fun create_two_jobs/1, |
|         fun create_multiple_jobs_from_one_post/1, |
|         fun start_stop_cluster_basic/1, |
|         fun test_disabled/1, |
|         fun start_stop_cluster_with_a_job/1, |
|         fun individual_job_start_stop/1, |
|         fun individual_job_stop_when_cluster_stopped/1, |
|         fun create_job_with_invalid_arguments/1, |
|         fun create_job_with_db/1, |
|         fun create_job_with_shard_name/1, |
|         fun completed_job_handling/1, |
|         fun handle_db_deletion_in_initial_copy/1, |
|         fun handle_db_deletion_in_topoff1/1, |
|         fun handle_db_deletion_in_copy_local_docs/1, |
|         fun handle_db_deletion_in_build_indices/1, |
|         fun handle_db_deletion_in_update_shard_map/1, |
|         fun handle_db_deletion_in_wait_source_close/1, |
|         fun recover_in_initial_copy/1, |
|         fun recover_in_topoff1/1, |
|         fun recover_in_copy_local_docs/1, |
|         fun recover_in_build_indices/1, |
|         fun recover_in_update_shard_map/1, |
|         fun recover_in_wait_source_close/1, |
|         fun recover_in_topoff3/1, |
|         fun recover_in_source_delete/1, |
|         fun check_max_jobs/1, |
|         fun check_node_and_range_required_params/1, |
|         fun cleanup_completed_jobs/1 |
|     ], |
|     PerCase = {foreach, fun setup/0, fun teardown/1, Cases}, |
|     Suite = {setup, fun start_couch/0, fun stop_couch/1, PerCase}, |
|     {"mem3 shard split api tests", Suite}. |
| |
| |
| % Checks the responses of the top level _reshard endpoints when no jobs |
| % exist, plus a few requests that must be rejected. |
| basics({Top, _}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         ReshardUrl = Top ++ ?RESHARD, |
| |
|         % Summary: running, no reason given, every counter at zero |
|         Summary = req(get, ReshardUrl), |
|         ?assertMatch({200, #{ |
|             <<"state">> := <<"running">>, |
|             <<"state_reason">> := null, |
|             <<"completed">> := 0, |
|             <<"failed">> := 0, |
|             <<"running">> := 0, |
|             <<"stopped">> := 0, |
|             <<"total">> := 0 |
|         }}, Summary), |
| |
|         % Cluster state endpoint |
|         ClusterState = req(get, Top ++ ?STATE), |
|         ?assertMatch({200, #{<<"state">> := <<"running">>}}, ClusterState), |
| |
|         % Job listing is empty |
|         Listing = req(get, Top ++ ?JOBS), |
|         ?assertMatch({200, #{ |
|             <<"jobs">> := [], |
|             <<"offset">> := 0, |
|             <<"total_rows">> := 0 |
|         }}, Listing), |
| |
|         % Unknown path is a 404, unsupported methods are a 405 |
|         BadPath = req(get, ReshardUrl ++ "/invalidpath"), |
|         ?assertMatch({404, _}, BadPath), |
|         ?assertMatch({405, _}, req(put, ReshardUrl, #{dont => thinkso})), |
|         ?assertMatch({405, _}, req(post, ReshardUrl, #{nope => nope})) |
|     end)}. |
| |
| |
| % Walks a single split job on Db1 through the API: create it, find it in the |
| % job listing, inspect the job document and its state endpoint, delete it and |
| % finally check that both GET and DELETE on the removed job return 404. |
| create_job_basic({Top, {Db1, _, _}}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| % POST /_reshard/jobs |
| {C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}), |
| ?assertEqual(201, C1), |
| ?assertMatch([#{?OK := true, ?ID := J, <<"shard">> := S}] |
| when is_binary(J) andalso is_binary(S), R1), |
| [#{?ID := Id, <<"shard">> := Shard}] = R1, |
| |
| % GET /_reshard/jobs |
| ?assertMatch({200, #{ |
| <<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}], |
| <<"offset">> := 0, |
| <<"total_rows">> := 1 |
| }}, req(get, Top ++ ?JOBS)), |
| |
| % GET /_reshard/jobs/$jobid |
| {C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)), |
| ?assertEqual(200, C2), |
| ThisNode = atom_to_binary(node(), utf8), |
| ?assertMatch(#{?ID := Id}, R2), |
| ?assertMatch(#{<<"type">> := <<"split">>}, R2), |
| ?assertMatch(#{<<"source">> := Shard}, R2), |
| ?assertMatch(#{<<"history">> := History} when length(History) > 1, R2), |
| ?assertMatch(#{<<"node">> := ThisNode}, R2), |
| ?assertMatch(#{<<"split_state">> := SSt} when is_binary(SSt), R2), |
| ?assertMatch(#{<<"job_state">> := JSt} when is_binary(JSt), R2), |
| ?assertMatch(#{<<"state_info">> := #{}}, R2), |
| ?assertMatch(#{<<"target">> := Target} when length(Target) == 2, R2), |
| |
| % GET /_reshard/jobs/$jobid/state |
| ?assertMatch({200, #{<<"state">> := S, <<"reason">> := R}} |
| when is_binary(S) andalso (is_binary(R) orelse R =:= null), |
| req(get, Top ++ ?JOBS ++ ?b2l(Id) ++ "/state")), |
| |
| % GET /_reshard |
| ?assertMatch({200, #{<<"state">> := <<"running">>, <<"total">> := 1}}, |
| req(get, Top ++ ?RESHARD)), |
| |
| % DELETE /_reshard/jobs/$jobid |
| ?assertMatch({200, #{?OK := true}}, |
| req(delete, Top ++ ?JOBS ++ ?b2l(Id))), |
| |
| % GET _reshard/jobs |
| ?assertMatch({200, #{<<"jobs">> := [], <<"total_rows">> := 0}}, |
| req(get, Top ++ ?JOBS)), |
| |
| % GET /_reshard/jobs/$jobid should be a 404 |
| ?assertMatch({404, #{}}, req(get, Top ++ ?JOBS ++ ?b2l(Id))), |
| |
| % DELETE /_reshard/jobs/$jobid should be a 404 as well |
| ?assertMatch({404, #{}}, req(delete, Top ++ ?JOBS ++ ?b2l(Id))) |
| end)}. |
| |
| |
| % Creates one split job for each of two databases, checks that both show up |
| % with distinct ids, then deletes them one at a time while watching the |
| % summary total drop. |
| create_two_jobs({Top, {Db1, Db2, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         JobsUrl = Top ++ ?JOBS, |
|         SummaryUrl = Top ++ ?RESHARD, |
| |
|         % One split job per database |
|         Created1 = req(post, JobsUrl, #{type => split, db => Db1}), |
|         ?assertMatch({201, [#{?OK := true}]}, Created1), |
|         Created2 = req(post, JobsUrl, #{type => split, db => Db2}), |
|         ?assertMatch({201, [#{?OK := true}]}, Created2), |
| |
|         ?assertMatch({200, #{<<"total">> := 2}}, req(get, SummaryUrl)), |
| |
|         % The listing holds both jobs and their ids differ |
|         ?assertMatch({200, #{ |
|             <<"jobs">> := [#{?ID := A}, #{?ID := B}], |
|             <<"offset">> := 0, |
|             <<"total_rows">> := 2 |
|         }} when A =/= B, req(get, JobsUrl)), |
| |
|         % Fetch the listing again to bind the two ids for the deletes below |
|         {200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} = |
|             req(get, JobsUrl), |
| |
|         {200, #{?OK := true}} = req(delete, JobsUrl ++ ?b2l(Id1)), |
|         ?assertMatch({200, #{<<"total">> := 1}}, req(get, SummaryUrl)), |
|         {200, #{?OK := true}} = req(delete, JobsUrl ++ ?b2l(Id2)), |
|         ?assertMatch({200, #{<<"total">> := 0}}, req(get, SummaryUrl)) |
|     end)}. |
| |
| |
| % A single POST naming the q=2 database (Db3) must yield two job entries, and |
| % the summary must then count two jobs. |
| create_multiple_jobs_from_one_post({Top, {_, _, Db3}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         PostResult = req(post, Top ++ ?JOBS, #{type => split, db => Db3}), |
|         ?assertMatch({201, [#{?OK := true}, #{?OK := true}]}, PostResult), |
|         Summary = req(get, Top ++ ?RESHARD), |
|         ?assertMatch({200, #{<<"total">> := 2}}, Summary) |
|     end)}. |
| |
| |
| % Toggles the cluster-wide resharding state through _reshard/state and checks |
| % that the state and its reason are reported back, both by the state endpoint |
| % and by the top level summary. |
| start_stop_cluster_basic({Top, _}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         StateUrl = Top ++ ?STATE, |
|         SetState = fun(Body) -> req(put, StateUrl, Body) end, |
| |
|         % Initially running with no reason recorded |
|         ?assertMatch({200, #{ |
|             <<"state">> := <<"running">>, |
|             <<"reason">> := null |
|         }}, req(get, StateUrl)), |
| |
|         % Stopping without a reason still reports a binary reason |
|         ?assertMatch({200, _}, SetState(#{state => stopped})), |
|         ?assertMatch({200, #{ |
|             <<"state">> := <<"stopped">>, |
|             <<"reason">> := Why |
|         }} when is_binary(Why), req(get, StateUrl)), |
| |
|         ?assertMatch({200, _}, SetState(#{state => running})), |
| |
|         % An explicit reason is echoed by the state endpoint ... |
|         Reason = <<"somereason">>, |
|         ?assertMatch({200, _}, |
|             SetState(#{state => stopped, reason => Reason})), |
|         ?assertMatch({200, #{ |
|             <<"state">> := <<"stopped">>, |
|             <<"reason">> := Reason |
|         }}, req(get, StateUrl)), |
| |
|         % ... and by the top level summary |
|         ?assertMatch({200, #{ |
|             <<"state">> := <<"stopped">>, |
|             <<"state_reason">> := Reason |
|         }}, req(get, Top ++ ?RESHARD)), |
| |
|         % Back to running |
|         ?assertMatch({200, _}, SetState(#{state => running})), |
|         ?assertMatch({200, #{<<"state">> := <<"running">>}}, |
|             req(get, StateUrl)) |
|     end)}. |
| |
| |
| % With the mem3 `reshard_disabled` application env set to true the API must |
| % answer 501; once the setting is removed it must work again. |
| test_disabled({Top, _}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         SummaryUrl = Top ++ ?RESHARD, |
|         StateUrl = Top ++ ?STATE, |
|         Resume = #{state => running}, |
| |
|         application:set_env(mem3, reshard_disabled, true), |
|         ?assertMatch({501, _}, req(get, SummaryUrl)), |
|         ?assertMatch({501, _}, req(put, StateUrl, Resume)), |
| |
|         application:unset_env(mem3, reshard_disabled), |
|         ?assertMatch({200, _}, req(get, SummaryUrl)), |
|         ?assertMatch({200, _}, req(put, StateUrl, Resume)) |
|     end)}. |
| |
| |
| % Exercises job creation and deletion while resharding is stopped cluster |
| % wide, then checks that a job created in that state leaves "stopped" once |
| % the cluster state is set back to running. |
| start_stop_cluster_with_a_job({Top, {Db1, _, _}}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| Url = Top ++ ?STATE, |
| |
| ?assertMatch({200, _}, req(put, Url, #{state => stopped})), |
| ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, Url)), |
| |
| % Can add jobs with global state stopped, they just won't be running |
| {201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}), |
| ?assertMatch([#{?OK := true}], R1), |
| [#{?ID := Id1}] = R1, |
| {200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)), |
| ?assertMatch(#{?ID := Id1, <<"job_state">> := <<"stopped">>}, J1), |
| % Check summary stats |
| ?assertMatch({200, #{ |
| <<"state">> := <<"stopped">>, |
| <<"running">> := 0, |
| <<"stopped">> := 1, |
| <<"total">> := 1 |
| }}, req(get, Top ++ ?RESHARD)), |
| |
| % Can delete the job when stopped |
| {200, #{?OK := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id1)), |
| ?assertMatch({200, #{ |
| <<"state">> := <<"stopped">>, |
| <<"running">> := 0, |
| <<"stopped">> := 0, |
| <<"total">> := 0 |
| }}, req(get, Top ++ ?RESHARD)), |
| |
| % Add same job again |
| {201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split, |
| db => Db1}), |
| ?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}}, |
| req(get, Top ++ ?JOBS ++ ?b2l(Id2))), |
| |
| % Job should start after resharding is started on the cluster |
| ?assertMatch({200, _}, req(put, Url, #{state => running})), |
| ?assertMatch({200, #{?ID := Id2, <<"job_state">> := JSt}} |
| when JSt =/= <<"stopped">>, req(get, Top ++ ?JOBS ++ ?b2l(Id2))) |
| end)}. |
| |
| |
| % Stops and resumes one job through its own state endpoint. The job is held |
| % at the topoff1 checkpoint with intercept_state/1 so that it is still |
| % running when the stop request arrives; the test process is told about the |
| % interception by a {JobPid, topoff1} message. |
| individual_job_start_stop({Top, {Db1, _, _}}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| intercept_state(topoff1), |
| |
| Body = #{type => split, db => Db1}, |
| {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body), |
| |
| JobUrl = Top ++ ?JOBS ++ ?b2l(Id), |
| StUrl = JobUrl ++ "/state", |
| |
| % Wait for the job to start running and intercept it in topoff1 state |
| receive {JobPid, topoff1} -> ok end, |
| % Tell the intercept to never finish checkpointing so job is left hanging |
| % forever in running state |
| JobPid ! cancel, |
| ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)), |
| |
| {200, _} = req(put, StUrl, #{state => stopped}), |
| wait_state(StUrl, <<"stopped">>), |
| |
| % Stop/start resharding globally and job should still stay stopped |
| ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), |
| ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), |
| ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)), |
| |
| % Start the job again |
| ?assertMatch({200, _}, req(put, StUrl, #{state => running})), |
| % Wait for the job to start running and intercept it in topoff1 state |
| receive {JobPid2, topoff1} -> ok end, |
| ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)), |
| % Let it continue running and it should complete eventually |
| JobPid2 ! continue, |
| wait_state(StUrl, <<"completed">>) |
| end)}. |
| |
| |
| % Stops a job individually after resharding has already been stopped cluster |
| % wide, and checks that restarting the cluster alone does not resume it: the |
| % job only runs again after an explicit per-job resume. The job is held at |
| % the topoff1 checkpoint with intercept_state/1. |
| individual_job_stop_when_cluster_stopped({Top, {Db1, _, _}}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| intercept_state(topoff1), |
| |
| Body = #{type => split, db => Db1}, |
| {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body), |
| |
| JobUrl = Top ++ ?JOBS ++ ?b2l(Id), |
| StUrl = JobUrl ++ "/state", |
| |
| % Wait for the job to start running and intercept in topoff1 |
| receive {JobPid, topoff1} -> ok end, |
| % Tell the intercept to never finish checkpointing so job is left |
| % hanging forever in running state |
| JobPid ! cancel, |
| ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)), |
| |
| % Stop resharding globally |
| ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), |
| wait_state(StUrl, <<"stopped">>), |
| |
| % Stop the job specifically |
| {200, _} = req(put, StUrl, #{state => stopped}), |
| % Job stays stopped |
| ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)), |
| |
| % Set cluster to running again |
| ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), |
| |
| % The job should stay stopped |
| ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)), |
| |
| % It should be possible to resume job and it should complete |
| ?assertMatch({200, _}, req(put, StUrl, #{state => running})), |
| |
| % Wait for the job to start running and intercept in topoff1 state |
| receive {JobPid2, topoff1} -> ok end, |
| ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)), |
| |
| % Let it continue running and it should complete eventually |
| JobPid2 ! continue, |
| wait_state(StUrl, <<"completed">>) |
| end)}. |
| |
| |
| % Posts a series of malformed job requests and checks the error status |
| % returned for each of them. |
| create_job_with_invalid_arguments({Top, {Db1, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Post = fun(Body) -> req(post, Top ++ ?JOBS, Body) end, |
|         SplitDb1 = #{type => split, db => Db1}, |
| |
|         % Empty body |
|         ?assertMatch({400, _}, Post(#{})), |
| |
|         % Db without a type |
|         ?assertMatch({400, _}, Post(#{db => Db1})), |
| |
|         % Type without a db or a shard |
|         ?assertMatch({400, _}, Post(#{type => split})), |
| |
|         % Type with a db that is not valid |
|         ?assertMatch({400, _}, Post(#{type => split, db => <<"baddb">>})), |
| |
|         % Type with a shard that is not an existing database |
|         MissingShard = <<"shards/80000000-ffffffff/baddb.1549492084">>, |
|         ?assertMatch({404, _}, |
|             Post(#{type => split, shard => MissingShard})), |
| |
|         % Bad ranges: wrong type, not a range, inverted, too large |
|         ?assertMatch({400, _}, Post(SplitDb1#{range => 42})), |
|         ?assertMatch({400, _}, Post(SplitDb1#{range => <<"x">>})), |
|         ?assertMatch({400, _}, |
|             Post(SplitDb1#{range => <<"ffffffff-80000000">>})), |
|         ?assertMatch({400, _}, |
|             Post(SplitDb1#{range => <<"00000000-fffffffff">>})), |
| |
|         % Db and shard are mutually exclusive |
|         ?assertMatch({400, _}, Post(SplitDb1#{shard => <<"blah">>})) |
|     end)}. |
| |
| |
| % Splits Db1 three times, selecting the source by db name combined with a |
| % node, a range, and both. After the three jobs have completed the database |
| % is expected to consist of four equally sized ranges. |
| create_job_with_db({Top, {Db1, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         JobsUrl = Top ++ ?JOBS, |
|         Node = atom_to_binary(node(), utf8), |
|         Base = #{type => split, db => Db1}, |
|         % Submit one job, expect exactly one entry back, wait for it to |
|         % complete and remove it |
|         RunSplit = fun(Body) -> |
|             {Code, Res} = req(post, JobsUrl, Body), |
|             ?assertMatch({201, [#{?OK := true}]}, {Code, Res}), |
|             wait_to_complete_then_cleanup(Top, Res) |
|         end, |
| |
|         % Db and node |
|         RunSplit(Base#{node => Node}), |
| |
|         % Db and range |
|         RunSplit(Base#{range => <<"00000000-7fffffff">>}), |
| |
|         % Db, range and node |
|         RunSplit(Base#{range => <<"80000000-ffffffff">>, node => Node}), |
| |
|         Ranges = [mem3:range(S) || S <- lists:sort(mem3:shards(Db1))], |
|         ?assertMatch([ |
|             [16#00000000, 16#3fffffff], |
|             [16#40000000, 16#7fffffff], |
|             [16#80000000, 16#bfffffff], |
|             [16#c0000000, 16#ffffffff] |
|         ], Ranges) |
|     end)}. |
| |
| |
| % Splits both shards of the q=2 database (Db3) by naming the shard directly, |
| % once on its own and once together with a node. Afterwards the database is |
| % expected to consist of four equally sized ranges. |
| create_job_with_shard_name({Top, {_, _, Db3}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         JobsUrl = Top ++ ?JOBS, |
|         [Shard1, Shard2] = |
|             [mem3:name(S) || S <- lists:sort(mem3:shards(Db3))], |
|         % Submit one job, expect exactly one entry back, wait for it to |
|         % complete and remove it |
|         RunSplit = fun(Body) -> |
|             {Code, Res} = req(post, JobsUrl, Body), |
|             ?assertMatch({201, [#{?OK := true}]}, {Code, Res}), |
|             wait_to_complete_then_cleanup(Top, Res) |
|         end, |
| |
|         % Shard name alone |
|         RunSplit(#{type => split, shard => Shard1}), |
| |
|         % Shard name together with a node |
|         Node = atom_to_binary(node(), utf8), |
|         RunSplit(#{type => split, shard => Shard2, node => Node}), |
| |
|         Ranges = [mem3:range(S) || S <- lists:sort(mem3:shards(Db3))], |
|         ?assertMatch([ |
|             [16#00000000, 16#3fffffff], |
|             [16#40000000, 16#7fffffff], |
|             [16#80000000, 16#bfffffff], |
|             [16#c0000000, 16#ffffffff] |
|         ], Ranges) |
|     end)}. |
| |
| |
| % Runs a split job on Db1 to completion and checks that it is reported as |
| % completed everywhere, that neither per-job nor cluster-wide stop/start |
| % requests move it out of that state, and that it can still be deleted. |
| completed_job_handling({Top, {Db1, _, _}}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| Jobs = Top ++ ?JOBS, |
| |
| % Run job to completion |
| {C1, R1} = req(post, Jobs, #{type => split, db => Db1}), |
| ?assertMatch({201, [#{?OK := true}]}, {C1, R1}), |
| [#{?ID := Id}] = R1, |
| wait_to_complete(Top, R1), |
| |
| % Check top level stats |
| ?assertMatch({200, #{ |
| <<"state">> := <<"running">>, |
| <<"state_reason">> := null, |
| <<"completed">> := 1, |
| <<"failed">> := 0, |
| <<"running">> := 0, |
| <<"stopped">> := 0, |
| <<"total">> := 1 |
| }}, req(get, Top ++ ?RESHARD)), |
| |
| % Job state itself |
| JobUrl = Jobs ++ ?b2l(Id), |
| ?assertMatch({200, #{ |
| <<"split_state">> := <<"completed">>, |
| <<"job_state">> := <<"completed">> |
| }}, req(get, JobUrl)), |
| |
| % Job's state endpoint |
| StUrl = Jobs ++ ?b2l(Id) ++ "/state", |
| ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), |
| |
| % Try to stop it and it should stay completed |
| {200, _} = req(put, StUrl, #{state => stopped}), |
| ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), |
| |
| % Try to resume it and it should stay completed |
| {200, _} = req(put, StUrl, #{state => running}), |
| ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), |
| |
| % Stop resharding globally and job should still stay completed |
| ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), |
| ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), |
| |
| % Start resharding and job stays completed |
| ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), |
| ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)), |
| |
| ?assertMatch({200, #{?OK := true}}, req(delete, JobUrl)) |
| end)}. |
| |
| |
| % Deletes the source db while the job is held at the topoff1 checkpoint, then |
| % waits for the job's state endpoint to report "failed". |
| handle_db_deletion_in_topoff1({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = delete_source_in_state(Top, SrcDb, topoff1), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"failed">>) |
|     end)}. |
| |
| |
| % Deletes the source db while the job is held at the initial_copy checkpoint, |
| % then waits for the job's state endpoint to report "failed". |
| handle_db_deletion_in_initial_copy({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = delete_source_in_state(Top, SrcDb, initial_copy), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"failed">>) |
|     end)}. |
| |
| |
| % Deletes the source db while the job is held at the copy_local_docs |
| % checkpoint, then waits for the job's state endpoint to report "failed". |
| handle_db_deletion_in_copy_local_docs({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = delete_source_in_state(Top, SrcDb, copy_local_docs), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"failed">>) |
|     end)}. |
| |
| |
| % Deletes the source db while the job is held at the build_indices |
| % checkpoint, then waits for the job's state endpoint to report "failed". |
| handle_db_deletion_in_build_indices({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = delete_source_in_state(Top, SrcDb, build_indices), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"failed">>) |
|     end)}. |
| |
| |
| % Deletes the source db while the job is held at the update_shardmap |
| % checkpoint, then waits for the job's state endpoint to report "failed". |
| handle_db_deletion_in_update_shard_map({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = delete_source_in_state(Top, SrcDb, update_shardmap), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"failed">>) |
|     end)}. |
| |
| |
| % Deletes the source db while the job is held at the wait_source_close |
| % checkpoint, then waits for the job's state endpoint to report "failed". |
| handle_db_deletion_in_wait_source_close({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = delete_source_in_state(Top, SrcDb, wait_source_close), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"failed">>) |
|     end)}. |
| |
| |
| % Interrupts a job at the topoff1 checkpoint, restarts resharding and waits |
| % for the job's state endpoint to report "completed". |
| recover_in_topoff1({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = recover_in_state(Top, SrcDb, topoff1), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end)}. |
| |
| |
| % Interrupts a job at the initial_copy checkpoint, restarts resharding and |
| % waits for the job's state endpoint to report "completed". |
| recover_in_initial_copy({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = recover_in_state(Top, SrcDb, initial_copy), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end)}. |
| |
| |
| % Interrupts a job at the copy_local_docs checkpoint, restarts resharding and |
| % waits for the job's state endpoint to report "completed". |
| recover_in_copy_local_docs({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = recover_in_state(Top, SrcDb, copy_local_docs), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end)}. |
| |
| |
| % Interrupts a job at the build_indices checkpoint, restarts resharding and |
| % waits for the job's state endpoint to report "completed". |
| recover_in_build_indices({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = recover_in_state(Top, SrcDb, build_indices), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end)}. |
| |
| |
| % Interrupts a job at the update_shardmap checkpoint, restarts resharding and |
| % waits for the job's state endpoint to report "completed". |
| recover_in_update_shard_map({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = recover_in_state(Top, SrcDb, update_shardmap), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end)}. |
| |
| |
| % Interrupts a job at the wait_source_close checkpoint, restarts resharding |
| % and waits for the job's state endpoint to report "completed". |
| recover_in_wait_source_close({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = recover_in_state(Top, SrcDb, wait_source_close), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end)}. |
| |
| |
| % Interrupts a job at the topoff3 checkpoint, restarts resharding and waits |
| % for the job's state endpoint to report "completed". |
| recover_in_topoff3({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = recover_in_state(Top, SrcDb, topoff3), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end)}. |
| |
| |
| % Interrupts a job at the source_delete checkpoint, restarts resharding and |
| % waits for the job's state endpoint to report "completed". |
| recover_in_source_delete({Top, {SrcDb, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         Id = recover_in_state(Top, SrcDb, source_delete), |
|         StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end)}. |
| |
| |
| % Checks enforcement of the "reshard" / "max_jobs" config setting: job |
| % creation is refused with a max_jobs_exceeded error once the limit is |
| % reached (also while resharding is stopped), and a job created under a |
| % higher limit still completes after the limit has been lowered again. |
| check_max_jobs({Top, {Db1, Db2, _}}) -> |
| {timeout, ?TIMEOUT, ?_test(begin |
| Jobs = Top ++ ?JOBS, |
| |
| % With a limit of 0 no job can be created at all |
| config:set("reshard", "max_jobs", "0", _Persist=false), |
| {C1, R1} = req(post, Jobs, #{type => split, db => Db1}), |
| ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, {C1, R1}), |
| |
| % A limit of 1 admits a single job; let it run to completion |
| config:set("reshard", "max_jobs", "1", _Persist=false), |
| {201, R2} = req(post, Jobs, #{type => split, db => Db1}), |
| wait_to_complete(Top, R2), |
| |
| % Stop clustering so jobs are not started anymore and ensure max jobs |
| % is enforced even if jobs are stopped |
| ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})), |
| |
| {C3, R3} = req(post, Jobs, #{type => split, db => Db2}), |
| ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, |
| {C3, R3}), |
| |
| % Allow the job to be created by raising max_jobs |
| config:set("reshard", "max_jobs", "2", _Persist=false), |
| |
| {C4, R4} = req(post, Jobs, #{type => split, db => Db2}), |
| ?assertEqual(201, C4), |
| |
| % Lower max_jobs after job is created but it's not running |
| config:set("reshard", "max_jobs", "1", _Persist=false), |
| |
| % Start resharding again |
| ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})), |
| |
| % Jobs that have been created already are not removed if max jobs is lowered |
| % so make sure the job completes |
| wait_to_complete(Top, R4) |
| end)}. |
| |
| |
| % With the "reshard" config options require_node_param and |
| % require_range_param enabled, job requests lacking the corresponding field |
| % must be rejected with a bad_request error; a request carrying both fields |
| % must be accepted and run to completion. |
| check_node_and_range_required_params({Top, {Db1, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         JobsUrl = Top ++ ?JOBS, |
|         ThisNode = atom_to_binary(node(), utf8), |
|         FullRange = <<"00000000-ffffffff">>, |
|         % Exact response expected for a rejected request |
|         BadRequest = fun(Reason) -> |
|             {400, #{ |
|                 <<"error">> => <<"bad_request">>, |
|                 <<"reason">> => Reason |
|             }} |
|         end, |
| |
|         % Node is now mandatory |
|         config:set("reshard", "require_node_param", "true", false), |
|         NoNode = req(post, JobsUrl, #{type => split, db => Db1}), |
|         ?assertEqual(BadRequest(<<"`node` prameter is required">>), NoNode), |
| |
|         % Range is now mandatory as well |
|         config:set("reshard", "require_range_param", "true", false), |
|         NoRange = req(post, JobsUrl, |
|             #{type => split, db => Db1, node => ThisNode}), |
|         ?assertEqual(BadRequest(<<"`range` prameter is required">>), NoRange), |
| |
|         % Supplying both satisfies the checks |
|         Complete = #{ |
|             type => split, |
|             db => Db1, |
|             range => FullRange, |
|             node => ThisNode |
|         }, |
|         {Code, Created} = req(post, JobsUrl, Complete), |
|         ?assertMatch({201, [#{?OK := true}]}, {Code, Created}), |
|         wait_to_complete_then_cleanup(Top, Created) |
|     end)}. |
| |
| |
| % Runs a job on Db1 to completion, deletes Db1 and then waits for the job's |
| % URL to start answering 404. |
| cleanup_completed_jobs({Top, {Db1, _, _}}) -> |
|     {timeout, ?TIMEOUT, ?_test(begin |
|         JobsUrl = Top ++ ?JOBS, |
|         {201, [#{?ID := JobId}]} = |
|             req(post, JobsUrl, #{type => split, db => Db1}), |
|         JobUrl = JobsUrl ++ ?b2l(JobId), |
|         StateUrl = JobUrl ++ "/state", |
|         wait_state(StateUrl, <<"completed">>), |
|         delete_db(Top, Db1), |
|         wait_for_http_code(JobUrl, 404) |
|     end)}. |
| |
| |
| % Test helper functions |
| |
| % For each job entry in Jobs (maps carrying an <<"id">> key, as returned by a |
| % job POST): wait until the job reports "completed", then delete it. Jobs are |
| % handled one after another, in list order. |
| wait_to_complete_then_cleanup(Top, Jobs) -> |
|     Base = Top ++ ?JOBS, |
|     FinishAndRemove = fun(#{?ID := JobId}) -> |
|         JobUrl = Base ++ ?b2l(JobId), |
|         wait_state(JobUrl ++ "/state", <<"completed">>), |
|         {200, _} = req(delete, JobUrl) |
|     end, |
|     lists:foreach(FinishAndRemove, Jobs). |
| |
| |
| % For each job entry in Jobs (maps carrying an <<"id">> key): wait until the |
| % job's state endpoint reports "completed". The jobs are left in place. |
| wait_to_complete(Top, Jobs) -> |
|     Base = Top ++ ?JOBS, |
|     AwaitOne = fun(#{?ID := JobId}) -> |
|         StateUrl = Base ++ ?b2l(JobId) ++ "/state", |
|         wait_state(StateUrl, <<"completed">>) |
|     end, |
|     lists:foreach(AwaitOne, Jobs). |
| |
| |
| % Mocks mem3_reshard_job:checkpoint_done/1 so that a job whose split_state |
| % equals State is paused there. The calling (test) process is sent |
| % {JobPid, State}; the paused process then blocks until it receives either |
| % `continue` (run the real checkpoint_done/1) or `cancel` (return ok without |
| % running it). Every other split state is passed straight through. |
| intercept_state(State) -> |
|     Owner = self(), |
|     Hook = fun(Job) -> |
|         Intercepted = Job#job.split_state =:= State, |
|         if |
|             Intercepted -> |
|                 Owner ! {self(), State}, |
|                 receive |
|                     cancel -> ok; |
|                     continue -> meck:passthrough([Job]) |
|                 end; |
|             true -> |
|                 meck:passthrough([Job]) |
|         end |
|     end, |
|     meck:new(mem3_reshard_job, [passthrough]), |
|     meck:expect(mem3_reshard_job, checkpoint_done, Hook). |
| |
| |
| % Replaces the checkpoint_done/1 expectation with one that always calls the |
| % real implementation, so jobs are no longer paused. |
| cancel_intercept() -> |
|     Passthrough = fun(Job) -> meck:passthrough([Job]) end, |
|     meck:expect(mem3_reshard_job, checkpoint_done, Passthrough). |
| |
| |
| % Polls Url until the <<"state">> field of its JSON body equals State, for up |
| % to 30 seconds. Returns ok on success. |
| % |
| % Raises {wait_state_timeout, Url, State} if the state is not reached in |
| % time. test_util:wait/2 signals an expired deadline by returning the atom |
| % `timeout` instead of raising, so the result has to be checked here; |
| % otherwise a test ending in wait_state/2 would pass without ever observing |
| % the expected state. Any response other than a 200 still crashes the poll |
| % with a case_clause error. |
| wait_state(Url, State) -> |
|     Poll = fun() -> |
|         case req(get, Url) of |
|             {200, #{<<"state">> := State}} -> ok; |
|             {200, #{}} -> timer:sleep(100), wait |
|         end |
|     end, |
|     case test_util:wait(Poll, 30000) of |
|         ok -> ok; |
|         timeout -> erlang:error({wait_state_timeout, Url, State}) |
|     end. |
| |
| |
| % Polls Url with GET until the response status equals Code, for up to 30 |
| % seconds. Returns ok on success. |
| % |
| % Raises {wait_for_http_code_timeout, Url, Code} if the status is not seen in |
| % time. test_util:wait/2 signals an expired deadline by returning the atom |
| % `timeout` instead of raising, so the result has to be checked here or the |
| % calling test would pass regardless. |
| wait_for_http_code(Url, Code) when is_integer(Code) -> |
|     Poll = fun() -> |
|         case req(get, Url) of |
|             {Code, _} -> ok; |
|             {_, _} -> timer:sleep(100), wait |
|         end |
|     end, |
|     case test_util:wait(Poll, 30000) of |
|         ok -> ok; |
|         timeout -> erlang:error({wait_for_http_code_timeout, Url, Code}) |
|     end. |
| |
| |
| % Starts a split job on Db, pauses it when it reaches the State checkpoint, |
| % deletes Db (including its local shard files) while the job is paused and |
| % then lets the job carry on. Returns the job id. |
| delete_source_in_state(Top, Db, State) when is_atom(State), is_binary(Db) -> |
|     intercept_state(State), |
|     {201, [#{?ID := JobId}]} = |
|         req(post, Top ++ ?JOBS, #{type => split, db => Db}), |
|     % Blocks until the mocked checkpoint_done/1 reports the paused job |
|     PausedPid = receive {Pid, State} -> Pid end, |
|     sync_delete_db(Top, Db), |
|     PausedPid ! continue, |
|     JobId. |
| |
| |
| % Starts a split job on Db, interrupts it at the State checkpoint without |
| % letting the checkpoint finish, then restarts resharding cluster wide with |
| % the intercept removed. Returns the job id. |
| recover_in_state(Top, Db, State) when is_atom(State) -> |
|     ClusterUrl = Top ++ ?STATE, |
|     intercept_state(State), |
|     {201, [#{?ID := JobId}]} = |
|         req(post, Top ++ ?JOBS, #{type => split, db => Db}), |
|     % Cancel the checkpoint so the job stays reported as running without |
|     % ever executing the given state |
|     PausedPid = receive {Pid, State} -> Pid end, |
|     PausedPid ! cancel, |
|     % Stop resharding, drop the intercept, start resharding again |
|     ?assertMatch({200, _}, req(put, ClusterUrl, #{state => stopped})), |
|     cancel_intercept(), |
|     ?assertMatch({200, _}, req(put, ClusterUrl, #{state => running})), |
|     JobId. |
| |
| |
| % Creates database Db under base URL Top. QArgs is the query string (with its |
| % leading "?") appended to the database URL. Asserts a 201 or 202 response. |
| create_db(Top, Db, QArgs) when is_binary(Db) -> |
|     DbUrl = lists:append([Top, binary_to_list(Db), QArgs]), |
|     {ok, Code, _, _} = test_request:put(DbUrl, [?JSON, ?AUTH], "{}"), |
|     ?assert(lists:member(Code, [201, 202])). |
| |
| |
| % Deletes database Db under base URL Top. |
| % |
| % Returns ok when the database was deleted and not_found when it did not |
| % exist. Any other response crashes with a case_clause error. |
| % |
| % The DELETE is issued directly, without probing with GET first: this saves |
| % a round trip and removes the window in which the database could vanish |
| % between the probe and the delete. 202 is accepted alongside 200, matching |
| % the 201/202 tolerance of create_db/3. |
| delete_db(Top, Db) when is_binary(Db) -> |
|     Url = Top ++ binary_to_list(Db), |
|     case test_request:delete(Url, [?AUTH]) of |
|         {ok, 404, _, _} -> |
|             not_found; |
|         {ok, Code, _, _} when Code =:= 200; Code =:= 202 -> |
|             ok |
|     end. |
| |
| |
| % Deletes Db through the HTTP API and then also deletes whatever local shards |
| % mem3 still lists for it directly through couch_server. A |
| % database_does_not_exist error raised along the way is treated as success. |
| sync_delete_db(Top, Db) when is_binary(Db) -> |
|     delete_db(Top, Db), |
|     try |
|         Names = lists:map(fun mem3:name/1, mem3:local_shards(Db)), |
|         lists:foreach(fun(Name) -> |
|             couch_server:delete(Name, [?ADMIN_CTX]) |
|         end, Names) |
|     catch |
|         error:database_does_not_exist -> |
|             ok |
|     end. |
| |
| |
| % Sends an authenticated request without a body and returns |
| % {StatusCode, DecodedJson}, with JSON objects decoded as maps. |
| req(Method, Url) -> |
|     {ok, Status, _, RespBody} = test_request:request(Method, Url, [?AUTH]), |
|     Decoded = jiffy:decode(RespBody, [return_maps]), |
|     {Status, Decoded}. |
| |
| |
| % Sends an authenticated request with a JSON body and returns |
| % {StatusCode, DecodedJson}, with JSON objects decoded as maps. A map body is |
| % JSON-encoded first; anything else is sent as given. |
| req(Method, Url, Body) when is_map(Body) -> |
|     req(Method, Url, jiffy:encode(Body)); |
| |
| req(Method, Url, Payload) -> |
|     ReqHeaders = [?JSON, ?AUTH], |
|     {ok, Status, _, RespBody} = |
|         test_request:request(Method, Url, ReqHeaders, Payload), |
|     Decoded = jiffy:decode(RespBody, [return_maps]), |
|     {Status, Decoded}. |