% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
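
%% mem3_sync implements CouchDB's internal replication scheduler. It pushes
%% local shard copies and the node-local databases to other nodes in the
%% cluster, deduplicating jobs per {DbName, Node} pair and running at most
%% "sync_concurrency" replications at a time.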
-module(mem3_sync).
-behaviour(gen_server).
-vsn(1).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start_link/0, get_active/0, get_queue/0, push/1, push/2,
remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0,
shards_db/0, users_db/0, find_next_node/0]).
-export([
local_dbs/0
]).
-import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-record(state, {
active = [],
count = 0,
limit,
dict = dict:new(),
waiting = queue:new()
}).
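%% A single push replication task. `count` is the number of pending changes
%% reported by the last attempt; `pid` holds the worker pid once the job is
%% running, or the gen_server caller (if any) while it is queued.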
-record(job, {name, node, count=nil, pid=nil}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
get_active() ->
gen_server:call(?MODULE, get_active).
get_queue() ->
gen_server:call(?MODULE, get_queue).
get_backlog() ->
gen_server:call(?MODULE, get_backlog).
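
%% Queue a push replication of a shard or database name to the given node.
%% Pushes targeting the local node are ignored.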
push(#shard{name = Name}, Target) ->
push(Name, Target);
push(Name, #shard{node=Node}) ->
push(Name, Node);
push(Name, Node) ->
push(#job{name = Name, node = Node}).
push(#job{node = Node} = Job) when Node =/= node() ->
gen_server:cast(?MODULE, {push, Job});
push(_) ->
ok.
remove_node(Node) ->
gen_server:cast(?MODULE, {remove_node, Node}).
remove_shard(Shard) ->
gen_server:cast(?MODULE, {remove_shard, Shard}).
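
%% Trap exits so worker terminations arrive in handle_info/2, register the
%% mem3_sync_event handler with the mem3_events manager, and start the initial
%% sync. The concurrency limit comes from [mem3] sync_concurrency (default 10).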
init([]) ->
process_flag(trap_exit, true),
Concurrency = config:get("mem3", "sync_concurrency", "10"),
gen_event:add_handler(mem3_events, mem3_sync_event, []),
initial_sync(),
{ok, #state{limit = list_to_integer(Concurrency)}}.
handle_call({push, Job}, From, State) ->
handle_cast({push, Job#job{pid = From}}, State);
handle_call(get_active, _From, State) ->
{reply, State#state.active, State};
handle_call(get_queue, _From, State) ->
{reply, to_list(State#state.waiting), State};
handle_call(get_backlog, _From, #state{active=A, waiting=WQ} = State) ->
CA = lists:sum([C || #job{count=C} <- A, is_integer(C)]),
CW = lists:sum([C || #job{count=C} <- to_list(WQ), is_integer(C)]),
{reply, CA+CW, State}.
handle_cast({push, DbName, Node}, State) ->
handle_cast({push, #job{name = DbName, node = Node}}, State);
handle_cast({push, Job}, #state{count=Count, limit=Limit} = State)
when Count >= Limit ->
{noreply, add_to_queue(State, Job)};
handle_cast({push, Job}, State) ->
#state{active = L, count = C} = State,
#job{name = DbName, node = Node} = Job,
case is_running(DbName, Node, L) of
true ->
{noreply, add_to_queue(State, Job)};
false ->
Pid = start_push_replication(Job),
{noreply, State#state{active=[Job#job{pid=Pid}|L], count=C+1}}
end;
handle_cast({remove_node, Node}, #state{waiting = W0} = State) ->
{Alive, Dead} = lists:partition(fun(#job{node=N}) -> N =/= Node end, to_list(W0)),
Dict = remove_entries(State#state.dict, Dead),
[exit(Pid, die_now) || #job{node=N, pid=Pid} <- State#state.active,
N =:= Node],
{noreply, State#state{dict = Dict, waiting = from_list(Alive)}};
handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
{Alive, Dead} = lists:partition(fun(#job{name=S}) ->
S =/= Shard end, to_list(W0)),
Dict = remove_entries(State#state.dict, Dead),
[exit(Pid, die_now) || #job{name=S, pid=Pid} <- State#state.active,
S =:= Shard],
{noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.
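
%% Worker exit handling: normal completions, forced `die_now` exits and
%% missing-target exits are simply dropped from the active list. Any other
%% exit either goes through maybe_resubmit/2 (when the worker reported pending
%% changes) or is retried after five seconds, provided the database still
%% exists.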
handle_info({'EXIT', Active, normal}, State) ->
handle_replication_exit(State, Active);
handle_info({'EXIT', Active, die_now}, State) ->
% we forced this one ourselves, do not retry
handle_replication_exit(State, Active);
handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
% target doesn't exist, do not retry
handle_replication_exit(State, Active);
handle_info({'EXIT', Active, Reason}, State) ->
NewState = case lists:keyfind(Active, #job.pid, State#state.active) of
#job{name=OldDbName, node=OldNode} = Job ->
couch_log:warning("~s ~s ~s ~w", [?MODULE, OldDbName, OldNode, Reason]),
            case Reason of
                {pending_changes, Count} ->
                    maybe_resubmit(State, Job#job{pid = nil, count = Count});
                _ ->
                    case mem3:db_is_current(Job#job.name) of
                        true ->
                            timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]);
                        false ->
                            % no need to retry (db deleted or recreated)
                            ok
                    end,
                    State
            end;
        false ->
            State
    end,
handle_replication_exit(NewState, Active);
handle_info(Msg, State) ->
couch_log:notice("unexpected msg at replication manager ~p", [Msg]),
{noreply, State}.
terminate(_Reason, State) ->
[exit(Pid, shutdown) || #job{pid=Pid} <- State#state.active],
ok.
code_change(_, #state{waiting = WaitingList} = State, _) when is_list(WaitingList) ->
{ok, State#state{waiting = from_list(WaitingList)}};
code_change(_, State, _) ->
{ok, State}.
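
%% A job for one of the node-local databases is only re-queued if this node's
%% replication target is still the job's node; shard jobs are always re-queued.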
maybe_resubmit(State, #job{name=DbName, node=Node} = Job) ->
case lists:member(DbName, local_dbs()) of
true ->
case find_next_node() of
Node ->
add_to_queue(State, Job);
_ ->
State % don't resubmit b/c we have a new replication target
end;
false ->
add_to_queue(State, Job)
end.
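
%% Drop the finished worker from the active list and, if there is spare
%% capacity, start the next waiting job that is not already running.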
handle_replication_exit(State, Pid) ->
#state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State,
Active1 = lists:keydelete(Pid, #job.pid, Active),
case is_empty(Waiting) of
true ->
{noreply, State#state{active=Active1, count=length(Active1)}};
_ ->
Count = length(Active1),
NewState = if Count < Limit ->
case next_replication(Active1, Waiting, queue:new()) of
nil -> % all waiting replications are also active
State#state{active = Active1, count = Count};
{#job{name=DbName, node=Node} = Job, StillWaiting} ->
NewPid = start_push_replication(Job),
State#state{
active = [Job#job{pid = NewPid} | Active1],
count = Count+1,
dict = dict:erase({DbName,Node}, D),
waiting = StillWaiting
}
end;
true ->
State#state{active = Active1, count=Count}
end,
{noreply, NewState}
end.
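
%% Reply to a synchronous caller (if any), then run mem3_rep:go/2 in a linked
%% process. A worker that finishes with changes still pending exits with
%% {pending_changes, N} so the job can be rescheduled.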
start_push_replication(#job{name=Name, node=Node, pid=From}) ->
if From =/= nil -> gen_server:reply(From, ok); true -> ok end,
spawn_link(fun() ->
case mem3_rep:go(Name, maybe_redirect(Node)) of
{ok, Pending} when Pending > 0 ->
exit({pending_changes, Pending});
_ ->
ok
end
end).
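
%% Enqueue a job unless an identical {DbName, Node} pair is already waiting.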
add_to_queue(State, #job{name=DbName, node=Node, pid=From} = Job) ->
#state{dict=D, waiting=WQ} = State,
case dict:is_key({DbName, Node}, D) of
true ->
if From =/= nil -> gen_server:reply(From, ok); true -> ok end,
State;
false ->
couch_log:debug("adding ~s -> ~p to mem3_sync queue", [DbName, Node]),
State#state{
dict = dict:store({DbName,Node}, ok, D),
waiting = in(Job, WQ)
}
end.
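
%% Push the node-local databases to this node's replication target.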
sync_nodes_and_dbs() ->
Node = find_next_node(),
[push(Db, Node) || Db <- local_dbs()].
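
%% Connect to the other cluster nodes known to mem3 and hand the currently
%% connected ones to mem3_sync_nodes.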
initial_sync() ->
[net_kernel:connect_node(Node) || Node <- mem3:nodes()],
mem3_sync_nodes:add(nodes()).
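
%% Full sync against the given list of live nodes: push the node-local
%% databases, then walk all shards and push each locally hosted copy to the
%% live nodes that also host it.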
initial_sync(Live) ->
sync_nodes_and_dbs(),
Acc = {node(), Live, []},
{_, _, Shards} = mem3_shards:fold(fun initial_sync_fold/2, Acc),
submit_replication_tasks(node(), Live, Shards).
initial_sync_fold(#shard{dbname = Db} = Shard, {LocalNode, Live, AccShards}) ->
case AccShards of
[#shard{dbname = AccDb} | _] when Db =/= AccDb ->
submit_replication_tasks(LocalNode, Live, AccShards),
{LocalNode, Live, [Shard]};
_ ->
{LocalNode, Live, [Shard|AccShards]}
end.
submit_replication_tasks(LocalNode, Live, Shards) ->
SplitFun = fun(#shard{node = Node}) -> Node =:= LocalNode end,
{Local, Remote} = lists:partition(SplitFun, Shards),
lists:foreach(fun(#shard{name = ShardName}) ->
[sync_push(ShardName, N) || #shard{node=N, name=Name} <- Remote,
Name =:= ShardName, lists:member(N, Live)]
end, Local).
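
%% Synchronous push used during the initial sync; the call does not return
%% until the job is started (or found to be a duplicate), which throttles the
%% caller against the concurrency limit.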
sync_push(ShardName, N) ->
gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity).
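
%% Pick the next live cluster node after this one in sorted order, wrapping
%% around to the first node at the end of the list.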
find_next_node() ->
LiveNodes = [node()|nodes()],
AllNodes0 = lists:sort(mem3:nodes()),
AllNodes1 = [X || X <- AllNodes0, lists:member(X, LiveNodes)],
AllNodes = AllNodes1 ++ [hd(AllNodes1)],
[_Self, Next| _] = lists:dropwhile(fun(N) -> N =/= node() end, AllNodes),
Next.
%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
%% which does not correspond to an already running replication
-spec next_replication([#job{}], queue:queue(_), queue:queue(_)) ->
{#job{}, queue:queue(_)} | nil.
next_replication(Active, Waiting, WaitingAndRunning) ->
case is_empty(Waiting) of
true ->
nil;
false ->
{{value, #job{name=S, node=N} = Job}, RemQ} = out(Waiting),
case is_running(S,N,Active) of
true ->
next_replication(Active, RemQ, in(Job, WaitingAndRunning));
false ->
{Job, join(RemQ, WaitingAndRunning)}
end
end.
is_running(DbName, Node, ActiveList) ->
[] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node].
remove_entries(Dict, Entries) ->
lists:foldl(fun(#job{name=S, node=N}, D) ->
dict:erase({S, N}, D)
end, Dict, Entries).
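
%% The node-local databases that are replicated to every node; the names are
%% configurable and default to _nodes, _dbs and _users.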
local_dbs() ->
[nodes_db(), shards_db(), users_db()].
nodes_db() ->
?l2b(config:get("mem3", "nodes_db", "_nodes")).
shards_db() ->
?l2b(config:get("mem3", "shards_db", "_dbs")).
users_db() ->
?l2b(config:get("couch_httpd_auth", "authentication_db", "_users")).
maybe_redirect(Node) ->
case config:get("mem3.redirects", atom_to_list(Node)) of
undefined ->
Node;
Redirect ->
couch_log:debug("Redirecting push from ~p to ~p", [Node, Redirect]),
list_to_existing_atom(Redirect)
end.