| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(mem3_sync). |
| -behaviour(gen_server). |
| -vsn(1). |
| -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, |
| code_change/3]). |
| |
| -export([start_link/0, get_active/0, get_queue/0, push/1, push/2, |
| remove_node/1, remove_shard/1, initial_sync/1, get_backlog/0, nodes_db/0, |
| shards_db/0, users_db/0, find_next_node/0]). |
| -export([ |
| local_dbs/0 |
| ]). |
| |
| -import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]). |
| |
| -include_lib("mem3/include/mem3.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| -record(state, { |
| active = [], |
| count = 0, |
| limit, |
| dict = dict:new(), |
| waiting = queue:new() |
| }). |
| |
| -record(job, {name, node, count=nil, pid=nil}). |
| |
| start_link() -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| |
| get_active() -> |
| gen_server:call(?MODULE, get_active). |
| |
| get_queue() -> |
| gen_server:call(?MODULE, get_queue). |
| |
| get_backlog() -> |
| gen_server:call(?MODULE, get_backlog). |
| |
| push(#shard{name = Name}, Target) -> |
| push(Name, Target); |
| push(Name, #shard{node=Node}) -> |
| push(Name, Node); |
| push(Name, Node) -> |
| push(#job{name = Name, node = Node}). |
| |
| push(#job{node = Node} = Job) when Node =/= node() -> |
| gen_server:cast(?MODULE, {push, Job}); |
| push(_) -> |
| ok. |
| |
| remove_node(Node) -> |
| gen_server:cast(?MODULE, {remove_node, Node}). |
| |
| remove_shard(Shard) -> |
| gen_server:cast(?MODULE, {remove_shard, Shard}). |
| |
| init([]) -> |
| process_flag(trap_exit, true), |
| Concurrency = config:get("mem3", "sync_concurrency", "10"), |
| gen_event:add_handler(mem3_events, mem3_sync_event, []), |
| initial_sync(), |
| {ok, #state{limit = list_to_integer(Concurrency)}}. |
| |
| handle_call({push, Job}, From, State) -> |
| handle_cast({push, Job#job{pid = From}}, State); |
| |
| handle_call(get_active, _From, State) -> |
| {reply, State#state.active, State}; |
| |
| handle_call(get_queue, _From, State) -> |
| {reply, to_list(State#state.waiting), State}; |
| |
| handle_call(get_backlog, _From, #state{active=A, waiting=WQ} = State) -> |
| CA = lists:sum([C || #job{count=C} <- A, is_integer(C)]), |
| CW = lists:sum([C || #job{count=C} <- to_list(WQ), is_integer(C)]), |
| {reply, CA+CW, State}. |
| |
| handle_cast({push, DbName, Node}, State) -> |
| handle_cast({push, #job{name = DbName, node = Node}}, State); |
| |
| handle_cast({push, Job}, #state{count=Count, limit=Limit} = State) |
| when Count >= Limit -> |
| {noreply, add_to_queue(State, Job)}; |
| |
| handle_cast({push, Job}, State) -> |
| #state{active = L, count = C} = State, |
| #job{name = DbName, node = Node} = Job, |
| case is_running(DbName, Node, L) of |
| true -> |
| {noreply, add_to_queue(State, Job)}; |
| false -> |
| Pid = start_push_replication(Job), |
| {noreply, State#state{active=[Job#job{pid=Pid}|L], count=C+1}} |
| end; |
| |
| handle_cast({remove_node, Node}, #state{waiting = W0} = State) -> |
| {Alive, Dead} = lists:partition(fun(#job{node=N}) -> N =/= Node end, to_list(W0)), |
| Dict = remove_entries(State#state.dict, Dead), |
| [exit(Pid, die_now) || #job{node=N, pid=Pid} <- State#state.active, |
| N =:= Node], |
| {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}; |
| |
| handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) -> |
| {Alive, Dead} = lists:partition(fun(#job{name=S}) -> |
| S =/= Shard end, to_list(W0)), |
| Dict = remove_entries(State#state.dict, Dead), |
| [exit(Pid, die_now) || #job{name=S, pid=Pid} <- State#state.active, |
| S =:= Shard], |
| {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}. |
| |
| handle_info({'EXIT', Active, normal}, State) -> |
| handle_replication_exit(State, Active); |
| |
| handle_info({'EXIT', Active, die_now}, State) -> |
| % we forced this one ourselves, do not retry |
| handle_replication_exit(State, Active); |
| |
| handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) -> |
| % target doesn't exist, do not retry |
| handle_replication_exit(State, Active); |
| |
| handle_info({'EXIT', Active, Reason}, State) -> |
| NewState = case lists:keyfind(Active, #job.pid, State#state.active) of |
| #job{name=OldDbName, node=OldNode} = Job -> |
| couch_log:warning("~s ~s ~s ~w", [?MODULE, OldDbName, OldNode, Reason]), |
| case Reason of {pending_changes, Count} -> |
| maybe_resubmit(State, Job#job{pid = nil, count = Count}); |
| _ -> |
| case mem3:db_is_current(Job#job.name) of |
| true -> |
| timer:apply_after(5000, ?MODULE, push, [Job#job{pid=nil}]); |
| false -> |
| % no need to retry (db deleted or recreated) |
| ok |
| end, |
| State |
| end; |
| false -> State end, |
| handle_replication_exit(NewState, Active); |
| |
| handle_info(Msg, State) -> |
| couch_log:notice("unexpected msg at replication manager ~p", [Msg]), |
| {noreply, State}. |
| |
| terminate(_Reason, State) -> |
| [exit(Pid, shutdown) || #job{pid=Pid} <- State#state.active], |
| ok. |
| |
| code_change(_, #state{waiting = WaitingList} = State, _) when is_list(WaitingList) -> |
| {ok, State#state{waiting = from_list(WaitingList)}}; |
| |
| code_change(_, State, _) -> |
| {ok, State}. |
| |
| maybe_resubmit(State, #job{name=DbName, node=Node} = Job) -> |
| case lists:member(DbName, local_dbs()) of |
| true -> |
| case find_next_node() of |
| Node -> |
| add_to_queue(State, Job); |
| _ -> |
| State % don't resubmit b/c we have a new replication target |
| end; |
| false -> |
| add_to_queue(State, Job) |
| end. |
| |
| handle_replication_exit(State, Pid) -> |
| #state{active=Active, limit=Limit, dict=D, waiting=Waiting} = State, |
| Active1 = lists:keydelete(Pid, #job.pid, Active), |
| case is_empty(Waiting) of |
| true -> |
| {noreply, State#state{active=Active1, count=length(Active1)}}; |
| _ -> |
| Count = length(Active1), |
| NewState = if Count < Limit -> |
| case next_replication(Active1, Waiting, queue:new()) of |
| nil -> % all waiting replications are also active |
| State#state{active = Active1, count = Count}; |
| {#job{name=DbName, node=Node} = Job, StillWaiting} -> |
| NewPid = start_push_replication(Job), |
| State#state{ |
| active = [Job#job{pid = NewPid} | Active1], |
| count = Count+1, |
| dict = dict:erase({DbName,Node}, D), |
| waiting = StillWaiting |
| } |
| end; |
| true -> |
| State#state{active = Active1, count=Count} |
| end, |
| {noreply, NewState} |
| end. |
| |
| start_push_replication(#job{name=Name, node=Node, pid=From}) -> |
| if From =/= nil -> gen_server:reply(From, ok); true -> ok end, |
| spawn_link(fun() -> |
| case mem3_rep:go(Name, maybe_redirect(Node)) of |
| {ok, Pending} when Pending > 0 -> |
| exit({pending_changes, Pending}); |
| _ -> |
| ok |
| end |
| end). |
| |
| add_to_queue(State, #job{name=DbName, node=Node, pid=From} = Job) -> |
| #state{dict=D, waiting=WQ} = State, |
| case dict:is_key({DbName, Node}, D) of |
| true -> |
| if From =/= nil -> gen_server:reply(From, ok); true -> ok end, |
| State; |
| false -> |
| couch_log:debug("adding ~s -> ~p to mem3_sync queue", [DbName, Node]), |
| State#state{ |
| dict = dict:store({DbName,Node}, ok, D), |
| waiting = in(Job, WQ) |
| } |
| end. |
| |
| sync_nodes_and_dbs() -> |
| Node = find_next_node(), |
| [push(Db, Node) || Db <- local_dbs()]. |
| |
| initial_sync() -> |
| [net_kernel:connect_node(Node) || Node <- mem3:nodes()], |
| mem3_sync_nodes:add(nodes()). |
| |
| initial_sync(Live) -> |
| sync_nodes_and_dbs(), |
| Acc = {node(), Live, []}, |
| {_, _, Shards} = mem3_shards:fold(fun initial_sync_fold/2, Acc), |
| submit_replication_tasks(node(), Live, Shards). |
| |
| initial_sync_fold(#shard{dbname = Db} = Shard, {LocalNode, Live, AccShards}) -> |
| case AccShards of |
| [#shard{dbname = AccDb} | _] when Db =/= AccDb -> |
| submit_replication_tasks(LocalNode, Live, AccShards), |
| {LocalNode, Live, [Shard]}; |
| _ -> |
| {LocalNode, Live, [Shard|AccShards]} |
| end. |
| |
| submit_replication_tasks(LocalNode, Live, Shards) -> |
| SplitFun = fun(#shard{node = Node}) -> Node =:= LocalNode end, |
| {Local, Remote} = lists:partition(SplitFun, Shards), |
| lists:foreach(fun(#shard{name = ShardName}) -> |
| [sync_push(ShardName, N) || #shard{node=N, name=Name} <- Remote, |
| Name =:= ShardName, lists:member(N, Live)] |
| end, Local). |
| |
| sync_push(ShardName, N) -> |
| gen_server:call(mem3_sync, {push, #job{name=ShardName, node=N}}, infinity). |
| |
| |
| |
| find_next_node() -> |
| LiveNodes = [node()|nodes()], |
| AllNodes0 = lists:sort(mem3:nodes()), |
| AllNodes1 = [X || X <- AllNodes0, lists:member(X, LiveNodes)], |
| AllNodes = AllNodes1 ++ [hd(AllNodes1)], |
| [_Self, Next| _] = lists:dropwhile(fun(N) -> N =/= node() end, AllNodes), |
| Next. |
| |
| %% @doc Finds the next {DbName,Node} pair in the list of waiting replications |
| %% which does not correspond to an already running replication |
| -spec next_replication([#job{}], queue:queue(_), queue:queue(_)) -> |
| {#job{}, queue:queue(_)} | nil. |
| next_replication(Active, Waiting, WaitingAndRunning) -> |
| case is_empty(Waiting) of |
| true -> |
| nil; |
| false -> |
| {{value, #job{name=S, node=N} = Job}, RemQ} = out(Waiting), |
| case is_running(S,N,Active) of |
| true -> |
| next_replication(Active, RemQ, in(Job, WaitingAndRunning)); |
| false -> |
| {Job, join(RemQ, WaitingAndRunning)} |
| end |
| end. |
| |
| is_running(DbName, Node, ActiveList) -> |
| [] =/= [true || #job{name=S, node=N} <- ActiveList, S=:=DbName, N=:=Node]. |
| |
| remove_entries(Dict, Entries) -> |
| lists:foldl(fun(#job{name=S, node=N}, D) -> |
| dict:erase({S, N}, D) |
| end, Dict, Entries). |
| |
| local_dbs() -> |
| [nodes_db(), shards_db(), users_db()]. |
| |
| nodes_db() -> |
| ?l2b(config:get("mem3", "nodes_db", "_nodes")). |
| |
| shards_db() -> |
| ?l2b(config:get("mem3", "shards_db", "_dbs")). |
| |
| users_db() -> |
| ?l2b(config:get("couch_httpd_auth", "authentication_db", "_users")). |
| |
| maybe_redirect(Node) -> |
| case config:get("mem3.redirects", atom_to_list(Node)) of |
| undefined -> |
| Node; |
| Redirect -> |
| couch_log:debug("Redirecting push from ~p to ~p", [Node, Redirect]), |
| list_to_existing_atom(Redirect) |
| end. |