blob: 0d3377db7ccb054d3b6f02ae465bd4e05ba2a89b [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(mem3_reshard_api).
-export([
create_jobs/5,
get_jobs/0,
get_job/1,
get_summary/0,
resume_job/1,
stop_job/2,
start_shard_splitting/0,
stop_shard_splitting/1,
get_shard_splitting_state/0
]).
% Start a split job for every shard copy selected by pick_shards/4.
%
% Only the `split` job type is accepted. Returns one entry per selected
% shard: `{ok, JobId, Node, ShardName}` when the job was started, or
% `{error, Reason, Node, ShardName}` when the remote call failed or the
% remote node rejected the job.
create_jobs(Node, Shard, Db, Range, split) ->
    lists:map(fun start_split_job_on_owner/1,
        pick_shards(Node, Shard, Db, Range)).

% Ask the node that owns the given shard copy to start splitting it.
start_split_job_on_owner(Target) ->
    Owner = mem3:node(Target),
    ShardName = mem3:name(Target),
    Outcome = rpc:call(Owner, mem3_reshard, start_split_job, [ShardName]),
    case Outcome of
        {ok, JobId} ->
            {ok, JobId, Owner, ShardName};
        % rpc failures and job-level errors are reported in the same shape
        {Tag, Reason} when Tag =:= badrpc; Tag =:= error ->
            {error, Reason, Owner, ShardName}
    end.
% Collect the resharding jobs known to every live node into one flat list.
%
% rpc:multicall/4 reports a call that failed on a remote node as a
% `{badrpc, Reason}` element of the reply list. Such elements are not jobs,
% so they are dropped instead of being handed to the caller as job entries.
get_jobs() ->
    Nodes = mem3_util:live_nodes(),
    {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, jobs, []),
    lists:filter(fun
        ({badrpc, _Reason}) -> false;
        (_Job) -> true
    end, lists:flatten(Replies)).
% Look a job up by id on every live node.
%
% Returns `{ok, JobInfo}` for the first node reply of the form
% `{ok, JobInfo}`, or `{error, not_found}` when no reply has that form.
get_job(JobId) ->
    LiveNodes = mem3_util:live_nodes(),
    {NodeReplies, _Unreachable} = rpc:multicall(LiveNodes, mem3_reshard, job,
        [JobId]),
    first_job_reply(NodeReplies).

% Walk the replies in order and stop at the first successful one.
first_job_reply([{ok, JobInfo} | _]) ->
    {ok, JobInfo};
first_job_reply([_Other | Rest]) ->
    first_job_reply(Rest);
first_job_reply([]) ->
    {error, not_found}.
% Aggregate the resharding state reported by every live node.
%
% Returns an EJSON object `{Props}` whose first two entries are `state` and
% `state_reason` (as computed by state_and_reason/1), followed by the summed
% `completed`, `failed`, `running`, `stopped` and `total` counters sorted by
% key.
get_summary() ->
    Nodes = mem3_util:live_nodes(),
    {Replies0, _Bad} = rpc:multicall(Nodes, mem3_reshard, get_state, []),
    % rpc:multicall/4 reports a failed remote call as a `{badrpc, Reason}`
    % element of the reply list. Keep only `{Props}` replies so that a single
    % failing node does not crash the fold below with a function_clause error.
    Replies = [Reply || {Props} = Reply <- Replies0, is_list(Props)],
    Stats0 = #{running => 0, total => 0, completed => 0, failed => 0,
        stopped => 0},
    StatsF = lists:foldl(fun({Res}, Stats) ->
        % A counter missing from a node's reply contributes 0.
        maps:map(fun(Stat, OldVal) ->
            OldVal + couch_util:get_value(Stat, Res, 0)
        end, Stats)
    end, Stats0, Replies),
    {State, Reason} = state_and_reason(Replies),
    StateReasonProps = [{state, State}, {state_reason, Reason}],
    {StateReasonProps ++ lists:sort(maps:to_list(StatsF))}.
% Ask every live node to resume the job with the given id.
%
% Replies of `{error, not_found}` are ignored. Returns:
%   `ok`                 - every remaining reply was `ok`
%   `{error, not_found}` - no node knew the job
%   `{error, {Props}}`   - at least one node reported a failure; Props holds
%                          the first failure (in term order) as a binary
%
% Previously a `{badrpc, _}` reply, or a mix of `ok` and error replies,
% matched no case clause and crashed with case_clause. Any non-`ok` reply is
% now reported as an error.
resume_job(JobId) ->
    Nodes = mem3_util:live_nodes(),
    {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, resume_job,
        [JobId]),
    Unique = lists:usort([R || R <- Replies, R =/= {error, not_found}]),
    Failures = [R || R <- Unique, R =/= ok],
    case {Unique, Failures} of
        {[], _} ->
            {error, not_found};
        {_, []} ->
            ok;
        {_, [{error, Error} | _]} ->
            {error, {[{error, couch_util:to_binary(Error)}]}};
        {_, [Other | _]} ->
            % e.g. {badrpc, Reason}: report the whole term
            {error, {[{error, couch_util:to_binary(Other)}]}}
    end.
% Ask every live node to stop the job with the given id, passing Reason
% through to mem3_reshard:stop_job/2.
%
% Replies of `{error, not_found}` are ignored. Returns:
%   `ok`                 - every remaining reply was `ok`
%   `{error, not_found}` - no node knew the job
%   `{error, {Props}}`   - at least one node reported a failure; Props holds
%                          the first failure (in term order) as a binary
%
% Previously a `{badrpc, _}` reply, or a mix of `ok` and error replies,
% matched no case clause and crashed with case_clause. Any non-`ok` reply is
% now reported as an error.
stop_job(JobId, Reason) ->
    Nodes = mem3_util:live_nodes(),
    {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, stop_job,
        [JobId, Reason]),
    Unique = lists:usort([R || R <- Replies, R =/= {error, not_found}]),
    Failures = [R || R <- Unique, R =/= ok],
    case {Unique, Failures} of
        {[], _} ->
            {error, not_found};
        {_, []} ->
            ok;
        {_, [{error, Error} | _]} ->
            {error, {[{error, couch_util:to_binary(Error)}]}};
        {_, [Other | _]} ->
            % e.g. {badrpc, Reason}: report the whole term
            {error, {[{error, couch_util:to_binary(Other)}]}}
    end.
% Call mem3_reshard:start/0 on every node reached by rpc:multicall/3.
%
% Returns `{ok, {[{ok, true}]}}` when every reply was `ok`, otherwise
% `{error, {[{error, Binary}]}}` describing the first non-`ok` reply in term
% order.
start_shard_splitting() ->
    {Replies, _Bad} = rpc:multicall(mem3_reshard, start, []),
    Unique = lists:usort(lists:flatten(Replies)),
    % Atoms sort before tuples, so with mixed results `ok` is the head of
    % Unique. Pick the error from the non-`ok` replies; matching on the head
    % alone would report the error as <<"ok">>.
    case {Unique, [R || R <- Unique, R =/= ok]} of
        {[ok], []} ->
            {ok, {[{ok, true}]}};
        {_, [Error | _]} ->
            {error, {[{error, couch_util:to_binary(Error)}]}}
    end.
% Call mem3_reshard:stop/1 with Reason on every node reached by
% rpc:multicall/3.
%
% Returns `{ok, {[{ok, true}]}}` when every reply was `ok`, otherwise
% `{error, {[{error, Binary}]}}` describing the first non-`ok` reply in term
% order.
stop_shard_splitting(Reason) ->
    {Replies, _Bad} = rpc:multicall(mem3_reshard, stop, [Reason]),
    Unique = lists:usort(lists:flatten(Replies)),
    % Atoms sort before tuples, so with mixed results `ok` is the head of
    % Unique. Pick the error from the non-`ok` replies; matching on the head
    % alone would report the error as <<"ok">>.
    case {Unique, [R || R <- Unique, R =/= ok]} of
        {[ok], []} ->
            {ok, {[{ok, true}]}};
        {_, [Error | _]} ->
            {error, {[{error, couch_util:to_binary(Error)}]}}
    end.
% Query mem3_reshard:get_state/0 on every live node and reduce the replies
% to a single `{running | stopped, Reason}` pair via state_and_reason/1.
get_shard_splitting_state() ->
    LiveNodes = mem3_util:live_nodes(),
    {NodeStates, _Unreachable} = rpc:multicall(LiveNodes, mem3_reshard,
        get_state, []),
    state_and_reason(NodeStates).
% Reduce per-node state replies to one cluster-wide `{State, Reason}` pair.
%
% The result is `running` if at least one reply reports <<"running">>,
% otherwise `stopped`. Reason is chosen by pick_reason/1 from the reasons
% collected for the winning state.
%
% Replies that are not `{Props}` objects (for example `{badrpc, Reason}`
% entries produced by rpc:multicall) are skipped; previously they crashed
% the fold with a function_clause error.
state_and_reason(StateReplies) ->
    AccF = lists:foldl(fun
        ({ResProps}, Acc) when is_list(ResProps) ->
            Reason = get_reason(ResProps),
            case couch_util:get_value(state, ResProps) of
                <<"running">> -> orddict:append(running, Reason, Acc);
                <<"stopped">> -> orddict:append(stopped, Reason, Acc);
                undefined -> Acc
            end;
        (_NotAStateReply, Acc) ->
            Acc
    end, orddict:from_list([{running, []}, {stopped, []}]), StateReplies),
    % Match on list shape instead of calling length/1 to test for emptiness.
    case orddict:fetch(running, AccF) of
        [_ | _] = Running ->
            {running, pick_reason(Running)};
        [] ->
            {stopped, pick_reason(orddict:fetch(stopped, AccF))}
    end.
% Choose a single reason from a list of candidates.
%
% Returns the smallest candidate in term order that is not `undefined`, or
% `null` when no such candidate exists.
pick_reason(Reasons) ->
    % usort/1 removes duplicates, so at most one `undefined` is left to drop.
    case lists:delete(undefined, lists:usort(Reasons)) of
        [Smallest | _] -> Smallest;
        [] -> null
    end.
% Extract the `reason` value nested under `state_info` in a state proplist.
%
% Returns `undefined` when `state_info` is missing or is an empty list;
% otherwise returns whatever is stored under `reason` in the `{Props}`
% object.
get_reason(StateProps) when is_list(StateProps) ->
    StateInfo = couch_util:get_value(state_info, StateProps),
    case StateInfo of
        {InfoProps} -> couch_util:get_value(reason, InfoProps);
        undefined -> undefined;
        [] -> undefined
    end.
% Select the shard copies a split request applies to.
%
% Arguments are (Node, Shard, Db, Range); `undefined` means "not given".
% Either `Db` (optionally narrowed by node and/or a two-element range) or
% `Shard` (optionally narrowed by node) must be given, but not both.
% Whenever the node or range is omitted, the matching check_*_required/0
% function is called first and may throw `bad_request`.
%
% Clause order matters: `undefined` is itself an atom, so the clauses that
% match a literal `undefined` node must precede the `is_atom(Node)` ones.
pick_shards(undefined, undefined, DbName, undefined) when is_binary(DbName) ->
    check_node_required(),
    check_range_required(),
    mem3:shards(DbName);
pick_shards(OnNode, undefined, DbName, undefined) when is_atom(OnNode),
        is_binary(DbName) ->
    check_range_required(),
    lists:filter(fun(S) -> mem3:node(S) == OnNode end, mem3:shards(DbName));
pick_shards(undefined, undefined, DbName, [_Begin, _End] = Wanted)
        when is_binary(DbName) ->
    check_node_required(),
    lists:filter(fun(S) -> mem3:range(S) == Wanted end, mem3:shards(DbName));
pick_shards(OnNode, undefined, DbName, [_Begin, _End] = Wanted)
        when is_atom(OnNode), is_binary(DbName) ->
    lists:filter(fun(S) ->
        mem3:node(S) == OnNode andalso mem3:range(S) == Wanted
    end, mem3:shards(DbName));
pick_shards(undefined, ShardName, undefined, undefined)
        when is_binary(ShardName) ->
    check_node_required(),
    DbName = mem3:dbname(ShardName),
    lists:filter(fun(S) -> mem3:name(S) == ShardName end,
        mem3:shards(DbName));
pick_shards(OnNode, ShardName, undefined, undefined) when is_atom(OnNode),
        is_binary(ShardName) ->
    DbName = mem3:dbname(ShardName),
    lists:filter(fun(S) ->
        mem3:name(S) == ShardName andalso mem3:node(S) == OnNode
    end, mem3:shards(DbName));
pick_shards(_, undefined, undefined, _) ->
    throw({bad_request, <<"Must specify at least `db` or `shard`">>});
pick_shards(_, ShardName, DbName, _) when is_binary(ShardName),
        is_binary(DbName) ->
    throw({bad_request, <<"`db` and `shard` are mutually exclusive">>}).
% Enforce the `[reshard] require_node_param` setting (default false).
%
% Called when a request omitted the node. Throws `bad_request` when the
% setting is true; otherwise returns `ok`.
check_node_required() ->
    case config:get_boolean("reshard", "require_node_param", false) of
        true ->
            % Message previously misspelled "parameter" as "prameter".
            throw({bad_request, <<"`node` parameter is required">>});
        false ->
            ok
    end.
% Enforce the `[reshard] require_range_param` setting (default false).
%
% Called when a request omitted the range. Throws `bad_request` when the
% setting is true; otherwise returns `ok`.
check_range_required() ->
    case config:get_boolean("reshard", "require_range_param", false) of
        true ->
            % Message previously misspelled "parameter" as "prameter".
            throw({bad_request, <<"`range` parameter is required">>});
        false ->
            ok
    end.