% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
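% Tests for the HTTP resharding API: the /_reshard, /_reshard/state and
% /_reshard/jobs endpoints, covering job creation, the job lifecycle,
% parameter validation and recovery from failures.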
-module(mem3_reshard_api_test).
-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/src/mem3_reshard.hrl").
-define(USER, "mem3_reshard_api_test_admin").
-define(PASS, "pass").
-define(AUTH, {basic_auth, {?USER, ?PASS}}).
-define(JSON, {"Content-Type", "application/json"}).
-define(RESHARD, "_reshard/").
-define(JOBS, "_reshard/jobs/").
-define(STATE, "_reshard/state").
-define(ID, <<"id">>).
-define(OK, <<"ok">>).
-define(TIMEOUT, 60). % seconds
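% Create an admin user and three clustered test databases: Db1 and Db2 with a
% single shard (q=1) and Db3 with two shards (q=2), all with n=1.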
setup() ->
Hashed = couch_passwords:hash_admin_password(?PASS),
ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist=false),
Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
Port = mochiweb_socket_server:get(chttpd, port),
Url = lists:concat(["http://", Addr, ":", Port, "/"]),
{Db1, Db2, Db3} = {?tempdb(), ?tempdb(), ?tempdb()},
create_db(Url, Db1, "?q=1&n=1"),
create_db(Url, Db2, "?q=1&n=1"),
create_db(Url, Db3, "?q=2&n=1"),
{Url, {Db1, Db2, Db3}}.
teardown({Url, {Db1, Db2, Db3}}) ->
mem3_reshard:reset_state(),
application:unset_env(mem3, reshard_disabled),
delete_db(Url, Db1),
delete_db(Url, Db2),
delete_db(Url, Db3),
ok = config:delete("reshard", "max_jobs", _Persist=false),
ok = config:delete("reshard", "require_node_param", _Persist=false),
ok = config:delete("reshard", "require_range_param", _Persist=false),
ok = config:delete("admins", ?USER, _Persist=false),
meck:unload().
start_couch() ->
test_util:start_couch([mem3, chttpd]).
stop_couch(Ctx) ->
test_util:stop_couch(Ctx).
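% EUnit fixture: start couch with mem3 and chttpd once for the whole suite,
% and run setup/teardown around each individual test.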
mem3_reshard_api_test_() ->
{
"mem3 shard split api tests",
{
setup,
fun start_couch/0, fun stop_couch/1,
{
foreach,
fun setup/0, fun teardown/1,
[
fun basics/1,
fun create_job_basic/1,
fun create_two_jobs/1,
fun create_multiple_jobs_from_one_post/1,
fun start_stop_cluster_basic/1,
fun test_disabled/1,
fun start_stop_cluster_with_a_job/1,
fun individual_job_start_stop/1,
fun individual_job_stop_when_cluster_stopped/1,
fun create_job_with_invalid_arguments/1,
fun create_job_with_db/1,
fun create_job_with_shard_name/1,
fun completed_job_handling/1,
fun handle_db_deletion_in_initial_copy/1,
fun handle_db_deletion_in_topoff1/1,
fun handle_db_deletion_in_copy_local_docs/1,
fun handle_db_deletion_in_build_indices/1,
fun handle_db_deletion_in_update_shard_map/1,
fun handle_db_deletion_in_wait_source_close/1,
fun recover_in_initial_copy/1,
fun recover_in_topoff1/1,
fun recover_in_copy_local_docs/1,
fun recover_in_build_indices/1,
fun recover_in_update_shard_map/1,
fun recover_in_wait_source_close/1,
fun recover_in_topoff3/1,
fun recover_in_source_delete/1,
fun check_max_jobs/1,
fun check_node_and_range_required_params/1,
fun cleanup_completed_jobs/1
]
}
}
}.
basics({Top, _}) ->
{timeout, ?TIMEOUT, ?_test(begin
% GET /_reshard
?assertMatch({200, #{
<<"state">> := <<"running">>,
<<"state_reason">> := null,
<<"completed">> := 0,
<<"failed">> := 0,
<<"running">> := 0,
<<"stopped">> := 0,
<<"total">> := 0
}}, req(get, Top ++ ?RESHARD)),
% GET /_reshard/state
?assertMatch({200, #{<<"state">> := <<"running">>}},
req(get, Top ++ ?STATE)),
% GET /_reshard/jobs
?assertMatch({200, #{
<<"jobs">> := [],
<<"offset">> := 0,
<<"total_rows">> := 0
}}, req(get, Top ++ ?JOBS)),
% Some invalid paths and methods
?assertMatch({404, _}, req(get, Top ++ ?RESHARD ++ "/invalidpath")),
?assertMatch({405, _}, req(put, Top ++ ?RESHARD, #{dont => thinkso})),
?assertMatch({405, _}, req(post, Top ++ ?RESHARD, #{nope => nope}))
end)}.
create_job_basic({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
% POST /_reshard/jobs
{C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
?assertEqual(201, C1),
?assertMatch([#{?OK := true, ?ID := J, <<"shard">> := S}]
when is_binary(J) andalso is_binary(S), R1),
[#{?ID := Id, <<"shard">> := Shard}] = R1,
% GET /_reshard/jobs
?assertMatch({200, #{
<<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}],
<<"offset">> := 0,
<<"total_rows">> := 1
}}, req(get, Top ++ ?JOBS)),
% GET /_reshard/jobs/$jobid
{C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)),
?assertEqual(200, C2),
ThisNode = atom_to_binary(node(), utf8),
?assertMatch(#{?ID := Id}, R2),
?assertMatch(#{<<"type">> := <<"split">>}, R2),
?assertMatch(#{<<"source">> := Shard}, R2),
?assertMatch(#{<<"history">> := History} when length(History) > 1, R2),
?assertMatch(#{<<"node">> := ThisNode}, R2),
?assertMatch(#{<<"split_state">> := SSt} when is_binary(SSt), R2),
?assertMatch(#{<<"job_state">> := JSt} when is_binary(JSt), R2),
?assertMatch(#{<<"state_info">> := #{}}, R2),
?assertMatch(#{<<"target">> := Target} when length(Target) == 2, R2),
% GET /_reshard/jobs/$jobid/state
?assertMatch({200, #{<<"state">> := S, <<"reason">> := R}}
when is_binary(S) andalso (is_binary(R) orelse R =:= null),
req(get, Top ++ ?JOBS ++ ?b2l(Id) ++ "/state")),
% GET /_reshard
?assertMatch({200, #{<<"state">> := <<"running">>, <<"total">> := 1}},
req(get, Top ++ ?RESHARD)),
% DELETE /_reshard/jobs/$jobid
?assertMatch({200, #{?OK := true}},
req(delete, Top ++ ?JOBS ++ ?b2l(Id))),
% GET /_reshard/jobs
?assertMatch({200, #{<<"jobs">> := [], <<"total_rows">> := 0}},
req(get, Top ++ ?JOBS)),
% GET /_reshard/jobs/$jobid should be a 404
?assertMatch({404, #{}}, req(get, Top ++ ?JOBS ++ ?b2l(Id))),
% DELETE /_reshard/jobs/$jobid should be a 404 as well
?assertMatch({404, #{}}, req(delete, Top ++ ?JOBS ++ ?b2l(Id)))
end)}.
create_two_jobs({Top, {Db1, Db2, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Jobs = Top ++ ?JOBS,
?assertMatch({201, [#{?OK := true}]},
req(post, Jobs, #{type => split, db => Db1})),
?assertMatch({201, [#{?OK := true}]},
req(post, Jobs, #{type => split, db => Db2})),
?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)),
?assertMatch({200, #{
<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}],
<<"offset">> := 0,
<<"total_rows">> := 2
}} when Id1 =/= Id2, req(get, Jobs)),
{200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} = req(get, Jobs),
{200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id1)),
?assertMatch({200, #{<<"total">> := 1}}, req(get, Top ++ ?RESHARD)),
{200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id2)),
?assertMatch({200, #{<<"total">> := 0}}, req(get, Top ++ ?RESHARD))
end)}.
create_multiple_jobs_from_one_post({Top, {_, _, Db3}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Jobs = Top ++ ?JOBS,
{C1, R1} = req(post, Jobs, #{type => split, db => Db3}),
?assertMatch({201, [#{?OK := true}, #{?OK := true}]}, {C1, R1}),
?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD))
end)}.
start_stop_cluster_basic({Top, _}) ->
{timeout, ?TIMEOUT, ?_test(begin
Url = Top ++ ?STATE,
?assertMatch({200, #{
<<"state">> := <<"running">>,
<<"reason">> := null
}}, req(get, Url)),
?assertMatch({200, _}, req(put, Url, #{state => stopped})),
?assertMatch({200, #{
<<"state">> := <<"stopped">>,
<<"reason">> := R
}} when is_binary(R), req(get, Url)),
?assertMatch({200, _}, req(put, Url, #{state => running})),
% Make sure the reason shows in the state GET request
Reason = <<"somereason">>,
?assertMatch({200, _}, req(put, Url, #{state => stopped,
reason => Reason})),
?assertMatch({200, #{<<"state">> := <<"stopped">>,
<<"reason">> := Reason}}, req(get, Url)),
% Top level summary also shows the reason
?assertMatch({200, #{
<<"state">> := <<"stopped">>,
<<"state_reason">> := Reason
}}, req(get, Top ++ ?RESHARD)),
?assertMatch({200, _}, req(put, Url, #{state => running})),
?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, Url))
end)}.
test_disabled({Top, _}) ->
{timeout, ?TIMEOUT, ?_test(begin
application:set_env(mem3, reshard_disabled, true),
?assertMatch({501, _}, req(get, Top ++ ?RESHARD)),
?assertMatch({501, _}, req(put, Top ++ ?STATE, #{state => running})),
application:unset_env(mem3, reshard_disabled),
?assertMatch({200, _}, req(get, Top ++ ?RESHARD)),
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running}))
end)}.
start_stop_cluster_with_a_job({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Url = Top ++ ?STATE,
?assertMatch({200, _}, req(put, Url, #{state => stopped})),
?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, Url)),
% Jobs can be added while the global state is stopped; they just won't run
{201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
?assertMatch([#{?OK := true}], R1),
[#{?ID := Id1}] = R1,
{200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)),
?assertMatch(#{?ID := Id1, <<"job_state">> := <<"stopped">>}, J1),
% Check summary stats
?assertMatch({200, #{
<<"state">> := <<"stopped">>,
<<"running">> := 0,
<<"stopped">> := 1,
<<"total">> := 1
}}, req(get, Top ++ ?RESHARD)),
% Can delete the job when stopped
{200, #{?OK := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id1)),
?assertMatch({200, #{
<<"state">> := <<"stopped">>,
<<"running">> := 0,
<<"stopped">> := 0,
<<"total">> := 0
}}, req(get, Top ++ ?RESHARD)),
% Add same job again
{201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
db => Db1}),
?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}},
req(get, Top ++ ?JOBS ++ ?b2l(Id2))),
% Job should start after resharding is started on the cluster
?assertMatch({200, _}, req(put, Url, #{state => running})),
?assertMatch({200, #{?ID := Id2, <<"job_state">> := JSt}}
when JSt =/= <<"stopped">>, req(get, Top ++ ?JOBS ++ ?b2l(Id2)))
end)}.
individual_job_start_stop({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
intercept_state(topoff1),
Body = #{type => split, db => Db1},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
StUrl = JobUrl ++ "/state",
% Wait for the job to start running and intercept it in the topoff1 state
receive {JobPid, topoff1} -> ok end,
% Tell the intercept to never finish checkpointing so the job is left hanging
% forever in the running state
JobPid ! cancel,
?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
{200, _} = req(put, StUrl, #{state => stopped}),
wait_state(StUrl, <<"stopped">>),
% Stop/start resharding globally and job should still stay stopped
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
% Start the job again
?assertMatch({200, _}, req(put, StUrl, #{state => running})),
% Wait for the job to start running again and intercept it in topoff1 state
receive {JobPid2, topoff1} -> ok end,
?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
% Let it continue running and it should complete eventually
JobPid2 ! continue,
wait_state(StUrl, <<"completed">>)
end)}.
individual_job_stop_when_cluster_stopped({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
intercept_state(topoff1),
Body = #{type => split, db => Db1},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
StUrl = JobUrl ++ "/state",
% Wait for the job to start running and intercept it in topoff1
receive {JobPid, topoff1} -> ok end,
% Tell the intercept to never finish checkpointing so the job is left
% hanging forever in the running state
JobPid ! cancel,
?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
% Stop resharding globally
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
wait_state(StUrl, <<"stopped">>),
% Stop the job specifically
{200, _} = req(put, StUrl, #{state => stopped}),
% Job stays stopped
?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
% Set cluster to running again
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
% The job should stay stopped
?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),
% It should be possible to resume job and it should complete
?assertMatch({200, _}, req(put, StUrl, #{state => running})),
% Wait for the job to start running and intercept it in the topoff1 state
receive {JobPid2, topoff1} -> ok end,
?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
% Let it continue running and it should complete eventually
JobPid2 ! continue,
wait_state(StUrl, <<"completed">>)
end)}.
create_job_with_invalid_arguments({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Jobs = Top ++ ?JOBS,
% Nothing in the body
?assertMatch({400, _}, req(post, Jobs, #{})),
% Missing type
?assertMatch({400, _}, req(post, Jobs, #{db => Db1})),
% Have type but no db and no shard
?assertMatch({400, _}, req(post, Jobs, #{type => split})),
% Have type and db but db is invalid
?assertMatch({400, _}, req(post, Jobs, #{db => <<"baddb">>,
type => split})),
% Have type and shard but shard is not an existing database
?assertMatch({404, _}, req(post, Jobs, #{type => split,
shard => <<"shards/80000000-ffffffff/baddb.1549492084">>})),
% Bad range values: wrong type, malformed, inverted, too long
?assertMatch({400, _}, req(post, Jobs, #{db => Db1, range => 42,
type => split})),
?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
range => <<"x">>, type => split})),
?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
range => <<"ffffffff-80000000">>, type => split})),
?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
range => <<"00000000-fffffffff">>, type => split})),
% Can't have both db and shard
?assertMatch({400, _}, req(post, Jobs, #{type => split, db => Db1,
shard => <<"blah">>}))
end)}.
create_job_with_db({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Jobs = Top ++ ?JOBS,
Body1 = #{type => split, db => Db1},
% Node with db
N = atom_to_binary(node(), utf8),
{C1, R1} = req(post, Jobs, Body1#{node => N}),
?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
wait_to_complete_then_cleanup(Top, R1),
% Range and db
{C2, R2} = req(post, Jobs, Body1#{range => <<"00000000-7fffffff">>}),
?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
wait_to_complete_then_cleanup(Top, R2),
% Node, range and db
Range = <<"80000000-ffffffff">>,
{C3, R3} = req(post, Jobs, Body1#{range => Range, node => N}),
?assertMatch({201, [#{?OK := true}]}, {C3, R3}),
wait_to_complete_then_cleanup(Top, R3),
?assertMatch([
[16#00000000, 16#3fffffff],
[16#40000000, 16#7fffffff],
[16#80000000, 16#bfffffff],
[16#c0000000, 16#ffffffff]
], [mem3:range(S) || S <- lists:sort(mem3:shards(Db1))])
end)}.
create_job_with_shard_name({Top, {_, _, Db3}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Jobs = Top ++ ?JOBS,
[S1, S2] = [mem3:name(S) || S <- lists:sort(mem3:shards(Db3))],
% Shard only
{C1, R1} = req(post, Jobs, #{type => split, shard => S1}),
?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
wait_to_complete_then_cleanup(Top, R1),
% Shard with a node
N = atom_to_binary(node(), utf8),
{C2, R2} = req(post, Jobs, #{type => split, shard => S2, node => N}),
?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
wait_to_complete_then_cleanup(Top, R2),
?assertMatch([
[16#00000000, 16#3fffffff],
[16#40000000, 16#7fffffff],
[16#80000000, 16#bfffffff],
[16#c0000000, 16#ffffffff]
], [mem3:range(S) || S <- lists:sort(mem3:shards(Db3))])
end)}.
completed_job_handling({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Jobs = Top ++ ?JOBS,
% Run job to completion
{C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
[#{?ID := Id}] = R1,
wait_to_complete(Top, R1),
% Check top level stats
?assertMatch({200, #{
<<"state">> := <<"running">>,
<<"state_reason">> := null,
<<"completed">> := 1,
<<"failed">> := 0,
<<"running">> := 0,
<<"stopped">> := 0,
<<"total">> := 1
}}, req(get, Top ++ ?RESHARD)),
% Job state itself
JobUrl = Jobs ++ ?b2l(Id),
?assertMatch({200, #{
<<"split_state">> := <<"completed">>,
<<"job_state">> := <<"completed">>
}}, req(get, JobUrl)),
% Job's state endpoint
StUrl = Jobs ++ ?b2l(Id) ++ "/state",
?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
% Try to stop it and it should stay completed
{200, _} = req(put, StUrl, #{state => stopped}),
?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
% Try to resume it and it should stay completed
{200, _} = req(put, StUrl, #{state => running}),
?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
% Stop resharding globally and job should still stay completed
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
% Start resharding and job stays completed
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),
?assertMatch({200, #{?OK := true}}, req(delete, JobUrl))
end)}.
handle_db_deletion_in_topoff1({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = delete_source_in_state(Top, Db1, topoff1),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end)}.
handle_db_deletion_in_initial_copy({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = delete_source_in_state(Top, Db1, initial_copy),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end)}.
handle_db_deletion_in_copy_local_docs({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = delete_source_in_state(Top, Db1, copy_local_docs),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end)}.
handle_db_deletion_in_build_indices({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = delete_source_in_state(Top, Db1, build_indices),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end)}.
handle_db_deletion_in_update_shard_map({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = delete_source_in_state(Top, Db1, update_shardmap),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end)}.
handle_db_deletion_in_wait_source_close({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = delete_source_in_state(Top, Db1, wait_source_close),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"failed">>)
end)}.
recover_in_topoff1({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = recover_in_state(Top, Db1, topoff1),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
recover_in_initial_copy({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = recover_in_state(Top, Db1, initial_copy),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
recover_in_copy_local_docs({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = recover_in_state(Top, Db1, copy_local_docs),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
recover_in_build_indices({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = recover_in_state(Top, Db1, build_indices),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
recover_in_update_shard_map({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = recover_in_state(Top, Db1, update_shardmap),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
recover_in_wait_source_close({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = recover_in_state(Top, Db1, wait_source_close),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
recover_in_topoff3({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = recover_in_state(Top, Db1, topoff3),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
recover_in_source_delete({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
JobId = recover_in_state(Top, Db1, source_delete),
wait_state(Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state", <<"completed">>)
end)}.
check_max_jobs({Top, {Db1, Db2, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Jobs = Top ++ ?JOBS,
config:set("reshard", "max_jobs", "0", _Persist=false),
{C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, {C1, R1}),
config:set("reshard", "max_jobs", "1", _Persist=false),
{201, R2} = req(post, Jobs, #{type => split, db => Db1}),
wait_to_complete(Top, R2),
% Stop resharding on the cluster so jobs are not started anymore, and check
% that max_jobs is still enforced even when jobs are stopped
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
{C3, R3} = req(post, Jobs, #{type => split, db => Db2}),
?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]},
{C3, R3}),
% Allow the job to be created by raising max_jobs
config:set("reshard", "max_jobs", "2", _Persist=false),
{C4, R4} = req(post, Jobs, #{type => split, db => Db2}),
?assertEqual(201, C4),
% Lower max_jobs after the job is created but while it is not running
config:set("reshard", "max_jobs", "1", _Persist=false),
% Start resharding again
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
% Jobs that were already created are not removed when max_jobs is lowered,
% so make sure the job completes
wait_to_complete(Top, R4)
end)}.
check_node_and_range_required_params({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Jobs = Top ++ ?JOBS,
Node = atom_to_binary(node(), utf8),
Range = <<"00000000-ffffffff">>,
config:set("reshard", "require_node_param", "true", _Persist=false),
{C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
NodeRequiredErr = <<"`node` prameter is required">>,
?assertEqual({400, #{<<"error">> => <<"bad_request">>,
<<"reason">> => NodeRequiredErr}}, {C1, R1}),
config:set("reshard", "require_range_param", "true", _Persist=false),
{C2, R2} = req(post, Jobs, #{type => split, db => Db1, node => Node}),
RangeRequiredErr = <<"`range` prameter is required">>,
?assertEqual({400, #{<<"error">> => <<"bad_request">>,
<<"reason">> => RangeRequiredErr}}, {C2, R2}),
Body = #{type => split, db => Db1, range => Range, node => Node},
{C3, R3} = req(post, Jobs, Body),
?assertMatch({201, [#{?OK := true}]}, {C3, R3}),
wait_to_complete_then_cleanup(Top, R3)
end)}.
cleanup_completed_jobs({Top, {Db1, _, _}}) ->
{timeout, ?TIMEOUT, ?_test(begin
Body = #{type => split, db => Db1},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
wait_state(JobUrl ++ "/state", <<"completed">>),
delete_db(Top, Db1),
wait_for_http_code(JobUrl, 404)
end)}.
% Test helper functions
wait_to_complete_then_cleanup(Top, Jobs) ->
JobsUrl = Top ++ ?JOBS,
lists:foreach(fun(#{?ID := Id}) ->
wait_state(JobsUrl ++ ?b2l(Id) ++ "/state", <<"completed">>),
{200, _} = req(delete, JobsUrl ++ ?b2l(Id))
end, Jobs).
wait_to_complete(Top, Jobs) ->
JobsUrl = Top ++ ?JOBS,
lists:foreach(fun(#{?ID := Id}) ->
wait_state(JobsUrl ++ ?b2l(Id) ++ "/state", <<"completed">>)
end, Jobs).
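% Mock mem3_reshard_job:checkpoint_done/1 so that when a job reaches the given
% split state it notifies the test process and then waits for a `continue` or
% `cancel` message before proceeding.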
intercept_state(State) ->
TestPid = self(),
meck:new(mem3_reshard_job, [passthrough]),
meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
case Job#job.split_state of
State ->
TestPid ! {self(), State},
receive
continue -> meck:passthrough([Job]);
cancel -> ok
end;
_ ->
meck:passthrough([Job])
end
end).
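% Restore checkpoint_done/1 to a plain passthrough so jobs checkpoint normally.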
cancel_intercept() ->
meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
meck:passthrough([Job])
end).
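% Poll a state endpoint until it reports the expected state.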
wait_state(Url, State) ->
test_util:wait(fun() ->
case req(get, Url) of
{200, #{<<"state">> := State}} -> ok;
{200, #{}} -> timer:sleep(100), wait
end
end, 30000).
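% Poll a URL until it responds with the expected HTTP status code.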
wait_for_http_code(Url, Code) when is_integer(Code) ->
test_util:wait(fun() ->
case req(get, Url) of
{Code, _} -> ok;
{_, _} -> timer:sleep(100), wait
end
end, 30000).
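% Start a split job, wait until it reaches the given split state, delete the
% source database, then let the job continue. Returns the job id.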
delete_source_in_state(Top, Db, State) when is_atom(State), is_binary(Db) ->
intercept_state(State),
Body = #{type => split, db => Db},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
receive {JobPid, State} -> ok end,
sync_delete_db(Top, Db),
JobPid ! continue,
Id.
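% Start a split job and freeze it in the given split state, then stop and
% restart resharding with the intercept removed so the job resumes from its
% last checkpoint. Returns the job id.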
recover_in_state(Top, Db, State) when is_atom(State) ->
intercept_state(State),
Body = #{type => split, db => Db},
{201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
receive {JobPid, State} -> ok end,
% The job is now stuck in the running state since we prevented it from
% checkpointing once it reached the given split state
JobPid ! cancel,
% Stop and restart resharding so the job recovers from its last checkpoint
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
cancel_intercept(),
?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
Id.
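% Create a database over HTTP with the given query string arguments
% (shard and replica counts).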
create_db(Top, Db, QArgs) when is_binary(Db) ->
Url = Top ++ binary_to_list(Db) ++ QArgs,
{ok, Status, _, _} = test_request:put(Url, [?JSON, ?AUTH], "{}"),
?assert(Status =:= 201 orelse Status =:= 202).
delete_db(Top, Db) when is_binary(Db) ->
Url = Top ++ binary_to_list(Db),
case test_request:get(Url, [?AUTH]) of
{ok, 404, _, _} ->
not_found;
{ok, 200, _, _} ->
{ok, 200, _, _} = test_request:delete(Url, [?AUTH]),
ok
end.
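% Delete the clustered database over HTTP, then also delete its local shard
% files directly so the source is gone immediately on this node.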
sync_delete_db(Top, Db) when is_binary(Db) ->
delete_db(Top, Db),
try
Shards = mem3:local_shards(Db),
ShardNames = [mem3:name(S) || S <- Shards],
[couch_server:delete(N, [?ADMIN_CTX]) || N <- ShardNames],
ok
catch
error:database_does_not_exist ->
ok
end.
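% Make an authenticated request and decode the JSON response body into a map.
% The three-argument version JSON-encodes a map body before sending it.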
req(Method, Url) ->
Headers = [?AUTH],
{ok, Code, _, Res} = test_request:request(Method, Url, Headers),
{Code, jiffy:decode(Res, [return_maps])}.
req(Method, Url, #{} = Body) ->
req(Method, Url, jiffy:encode(Body));
req(Method, Url, Body) ->
Headers = [?JSON, ?AUTH],
{ok, Code, _, Res} = test_request:request(Method, Url, Headers, Body),
{Code, jiffy:decode(Res, [return_maps])}.