blob: 874bd8196b3730c63b72561aba597e3f595ba89f [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(mem3_seeds).
-behaviour(gen_server).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2
]).
-export([
start_link/0,
get_seeds/0,
get_status/0
]).
-record(st, {
ready = false,
seeds = [],
jobref = nil,
% map keyed on node name
status = #{}
}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_seeds() ->
case config:get("cluster", "seedlist") of
undefined ->
[];
List ->
Nodes = string:tokens(List, ","),
Seeds = [list_to_atom(Node) || Node <- Nodes] -- [node()],
mem3_util:rotate_list(node(), Seeds)
end.
get_status() ->
gen_server:call(?MODULE, get_status).
init([]) ->
Seeds = get_seeds(),
Ready =
case Seeds of
[] -> true;
_ -> false
end,
St = #st{
seeds = Seeds,
ready = Ready,
jobref = start_replication(Seeds),
status = maps:from_keys(Seeds, #{})
},
{ok, St}.
handle_call(get_status, _From, St) ->
Ready =
case St#st.ready of
true -> ok;
false -> seeding
end,
Status = #{status => Ready, seeds => St#st.status},
{reply, {ok, Status}, St}.
handle_cast(_Msg, St) ->
{noreply, St}.
handle_info(start_replication, #st{jobref = nil} = St) ->
JobRef = start_replication(St#st.seeds),
{noreply, St#st{jobref = JobRef}};
handle_info({'DOWN', Ref, _, Pid, Output}, #st{jobref = {Pid, Ref}} = St) ->
{noreply, update_state(St, Output)};
handle_info(_Msg, St) ->
{noreply, St}.
% internal functions
start_replication([]) ->
nil;
start_replication([Seed | _]) ->
spawn_monitor(fun() ->
Reply = mem3_rpc:pull_replication(Seed),
exit({ok, Reply})
end).
update_state(#st{} = St, {ok, Data}) ->
#st{seeds = [Current | Tail], status = #{} = Status} = St,
Report = #{
timestamp => list_to_binary(mem3_util:iso8601_timestamp()),
pending_updates => lists:foldl(fun pending_updates/2, #{}, Data),
last_replication_status => ok
},
Ready = is_ready(St#st.ready, Data),
case Ready of
true ->
Seeds = Tail ++ [Current],
Job = nil;
false ->
% Try to progress this same seed again
Seeds = [Current | Tail],
Job = start_replication([Current | Tail])
end,
St#st{
seeds = Seeds,
jobref = Job,
ready = Ready,
status = Status#{Current => Report}
};
update_state(#st{} = St, {_Error, _Stack}) ->
#st{seeds = [Current | Tail], status = #{} = Status} = St,
Report = #{
timestamp => list_to_binary(mem3_util:iso8601_timestamp()),
last_replication_status => error
},
Seeds = Tail ++ [Current],
if
not St#st.ready ->
erlang:send_after(1000, self(), start_replication);
true ->
ok
end,
St#st{
seeds = Seeds,
jobref = nil,
status = Status#{Current => Report}
}.
is_ready(true, _) ->
true;
is_ready(false, Data) ->
lists:all(fun({_DbName, Pending}) -> Pending =:= {ok, 0} end, Data).
pending_updates({DbName, Status}, #{} = Acc) ->
case Status of
{ok, Pending} when is_number(Pending) ->
Acc#{DbName => Pending};
{error, Tag} ->
Acc#{DbName => list_to_binary(io_lib:format("~p", [Tag]))};
_Else ->
Acc#{DbName => unknown_error}
end.