% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

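% mem3_sync is the node-local replication manager: it schedules push
% replications of local shard copies (and of the node-local _nodes, _dbs and
% _users databases) to other cluster nodes, keeps at most sync_concurrency
% replications running at once, and deduplicates queued jobs per
% {DbName, Node} pair.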
-module(mem3_sync).
-behaviour(gen_server).
-vsn(1).
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).

-export([
    start_link/0,
    get_active/0,
    get_queue/0,
    push/1, push/2,
    remove_node/1,
    remove_shard/1,
    initial_sync/1,
    get_backlog/0,
    nodes_db/0,
    shards_db/0,
    users_db/0,
    find_next_node/0
]).
-export([
    local_dbs/0
]).

-import(queue, [in/2, out/1, to_list/1, join/2, from_list/1, is_empty/1]).

-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-define(DEFAULT_CONCURRENCY, 10).
-define(DEFAULT_BATCH_SIZE, 500).
-define(DEFAULT_BATCH_COUNT, 5).

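% The defaults above can be overridden in the node's .ini configuration. A
% minimal sketch (illustrative values, matching the config:get_integer/3
% lookups used in this module):
%
%   [mem3]
%   sync_concurrency = 10
%   sync_batch_size = 500
%   sync_batch_count = 5

% Server state: the currently running #job{}s, how many of them there are, the
% concurrency limit, a dict keyed by {DbName, Node} used to deduplicate waiting
% jobs, and the FIFO queue of waiting jobs.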
-record(state, {
    active = [],
    count = 0,
    limit = ?DEFAULT_CONCURRENCY,
    dict = dict:new(),
    waiting = queue:new()
}).

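% A #job{} describes one push replication: the shard (or db) name, the target
% node, the last reported number of pending changes, and either the worker pid
% (while active) or the gen_server caller to reply to (while queued via
% handle_call).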
-record(job, {name, node, count = nil, pid = nil}).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

get_active() ->
    gen_server:call(?MODULE, get_active).

get_queue() ->
    gen_server:call(?MODULE, get_queue).

get_backlog() ->
    gen_server:call(?MODULE, get_backlog).

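%% @doc Queue a push replication of a shard (or db name) to a target node.
%% Pushes whose target is the local node are ignored. Illustrative call, with
%% made-up shard and node names:
%%   mem3_sync:push(<<"shards/00000000-1fffffff/db1.1525663363">>, 'node2@127.0.0.1').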
push(#shard{name = Name}, Target) ->
    push(Name, Target);
push(Name, #shard{node = Node}) ->
    push(Name, Node);
push(Name, Node) ->
    push(#job{name = Name, node = Node}).

push(#job{node = Node} = Job) when Node =/= node() ->
    gen_server:cast(?MODULE, {push, Job});
push(_) ->
    ok.

remove_node(Node) ->
    gen_server:cast(?MODULE, {remove_node, Node}).

remove_shard(Shard) ->
    gen_server:cast(?MODULE, {remove_shard, Shard}).

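% gen_server callbacks. trap_exit is enabled so that exits of the spawned
% replication workers arrive as {'EXIT', Pid, Reason} messages in
% handle_info/2 below.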
init([]) ->
    process_flag(trap_exit, true),
    Concurrency = config:get_integer("mem3", "sync_concurrency", ?DEFAULT_CONCURRENCY),
    gen_event:add_handler(mem3_events, mem3_sync_event, []),
    initial_sync(),
    {ok, #state{limit = Concurrency}}.

handle_call({push, Job}, From, State) ->
    handle_cast({push, Job#job{pid = From}}, State);
handle_call(get_active, _From, State) ->
    {reply, State#state.active, State};
handle_call(get_queue, _From, State) ->
    {reply, to_list(State#state.waiting), State};
handle_call(get_backlog, _From, #state{active = A, waiting = WQ} = State) ->
    CA = lists:sum([C || #job{count = C} <- A, is_integer(C)]),
    CW = lists:sum([C || #job{count = C} <- to_list(WQ), is_integer(C)]),
    {reply, CA + CW, State}.

handle_cast({push, DbName, Node}, State) ->
    handle_cast({push, #job{name = DbName, node = Node}}, State);
handle_cast({push, Job}, #state{count = Count, limit = Limit} = State) when
    Count >= Limit
->
    {noreply, add_to_queue(State, Job)};
handle_cast({push, Job}, State) ->
    #state{active = L, count = C} = State,
    #job{name = DbName, node = Node} = Job,
    case is_running(DbName, Node, L) of
        true ->
            {noreply, add_to_queue(State, Job)};
        false ->
            Pid = start_push_replication(Job),
            {noreply, State#state{active = [Job#job{pid = Pid} | L], count = C + 1}}
    end;
handle_cast({remove_node, Node}, #state{waiting = W0} = State) ->
    {Alive, Dead} = lists:partition(fun(#job{node = N}) -> N =/= Node end, to_list(W0)),
    Dict = remove_entries(State#state.dict, Dead),
    [
        exit(Pid, die_now)
     || #job{node = N, pid = Pid} <- State#state.active,
        N =:= Node
    ],
    {noreply, State#state{dict = Dict, waiting = from_list(Alive)}};
handle_cast({remove_shard, Shard}, #state{waiting = W0} = State) ->
    {Alive, Dead} = lists:partition(
        fun(#job{name = S}) ->
            S =/= Shard
        end,
        to_list(W0)
    ),
    Dict = remove_entries(State#state.dict, Dead),
    [
        exit(Pid, die_now)
     || #job{name = S, pid = Pid} <- State#state.active,
        S =:= Shard
    ],
    {noreply, State#state{dict = Dict, waiting = from_list(Alive)}}.

handle_info({'EXIT', Active, normal}, State) ->
    handle_replication_exit(State, Active);
handle_info({'EXIT', Active, die_now}, State) ->
    % we forced this one ourselves, do not retry
    handle_replication_exit(State, Active);
handle_info({'EXIT', Active, {{not_found, no_db_file}, _Stack}}, State) ->
    % target doesn't exist, do not retry
    handle_replication_exit(State, Active);
handle_info({'EXIT', Active, Reason}, State) ->
    NewState =
        case lists:keyfind(Active, #job.pid, State#state.active) of
            #job{name = OldDbName, node = OldNode} = Job ->
                couch_log:warning("~s ~s ~s ~w", [?MODULE, OldDbName, OldNode, Reason]),
                case Reason of
                    {pending_changes, Count} ->
                        maybe_resubmit(State, Job#job{pid = nil, count = Count});
                    _ ->
                        case mem3:db_is_current(Job#job.name) of
                            true ->
                                timer:apply_after(5000, ?MODULE, push, [Job#job{pid = nil}]);
                            false ->
                                % no need to retry (db deleted or recreated)
                                ok
                        end,
                        State
                end;
            false ->
                State
        end,
    handle_replication_exit(NewState, Active);
handle_info(Msg, State) ->
    couch_log:notice("unexpected msg at replication manager ~p", [Msg]),
    {noreply, State}.

terminate(_Reason, State) ->
    [exit(Pid, shutdown) || #job{pid = Pid} <- State#state.active],
    ok.

code_change(_, #state{waiting = WaitingList} = State, _) when is_list(WaitingList) ->
    {ok, State#state{waiting = from_list(WaitingList)}};
code_change(_, State, _) ->
    {ok, State}.

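% A worker that exited with {pending_changes, Count} is requeued, except when
% it was pushing one of the node-local dbs and find_next_node/0 now points at
% a different neighbour; in that case the job is dropped because there is a
% new replication target.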
maybe_resubmit(State, #job{name = DbName, node = Node} = Job) ->
    case lists:member(DbName, local_dbs()) of
        true ->
            case find_next_node() of
                Node ->
                    add_to_queue(State, Job);
                _ ->
                    % don't resubmit b/c we have a new replication target
                    State
            end;
        false ->
            add_to_queue(State, Job)
    end.

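% Drop the finished worker from the active list and, if there is spare
% capacity, start the next waiting job that is not already running.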
handle_replication_exit(State, Pid) ->
    #state{active = Active, limit = Limit, dict = D, waiting = Waiting} = State,
    Active1 = lists:keydelete(Pid, #job.pid, Active),
    case is_empty(Waiting) of
        true ->
            {noreply, State#state{active = Active1, count = length(Active1)}};
        _ ->
            Count = length(Active1),
            NewState =
                if
                    Count < Limit ->
                        case next_replication(Active1, Waiting, queue:new()) of
                            % all waiting replications are also active
                            nil ->
                                State#state{active = Active1, count = Count};
                            {#job{name = DbName, node = Node} = Job, StillWaiting} ->
                                NewPid = start_push_replication(Job),
                                State#state{
                                    active = [Job#job{pid = NewPid} | Active1],
                                    count = Count + 1,
                                    dict = dict:erase({DbName, Node}, D),
                                    waiting = StillWaiting
                                }
                        end;
                    true ->
                        State#state{active = Active1, count = Count}
                end,
            {noreply, NewState}
    end.

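% Acknowledge a blocked caller (when the job arrived via handle_call) and
% spawn a linked worker that runs mem3_rep:go/3; the worker exits with
% {pending_changes, N} if changes remain after the configured batches.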
start_push_replication(#job{name = Name, node = Node, pid = From}) ->
    if
        From =/= nil -> gen_server:reply(From, ok);
        true -> ok
    end,
    spawn_link(fun() ->
        BatchSize = config:get_integer("mem3", "sync_batch_size", ?DEFAULT_BATCH_SIZE),
        BatchCount = config:get_integer("mem3", "sync_batch_count", ?DEFAULT_BATCH_COUNT),
        Opts = [{batch_size, BatchSize}, {batch_count, BatchCount}],
        case mem3_rep:go(Name, maybe_redirect(Node), Opts) of
            {ok, Pending} when Pending > 0 ->
                exit({pending_changes, Pending});
            _ ->
                ok
        end
    end).

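% Enqueue a job unless an identical {DbName, Node} job is already waiting;
% duplicate pushes that arrived via handle_call are acknowledged immediately.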
add_to_queue(State, #job{name = DbName, node = Node, pid = From} = Job) ->
    #state{dict = D, waiting = WQ} = State,
    case dict:is_key({DbName, Node}, D) of
        true ->
            if
                From =/= nil -> gen_server:reply(From, ok);
                true -> ok
            end,
            State;
        false ->
            couch_log:debug("adding ~s -> ~p to mem3_sync queue", [DbName, Node]),
            State#state{
                dict = dict:store({DbName, Node}, ok, D),
                waiting = in(Job, WQ)
            }
    end.

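% Initial synchronization: push the node-local dbs to the next node in the
% ring, then walk all shards and push each local copy to the live nodes that
% also host a copy of the same shard.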
sync_nodes_and_dbs() ->
    Node = find_next_node(),
    [push(Db, Node) || Db <- local_dbs()].

initial_sync() ->
    mem3_sync_nodes:add(nodes()).

initial_sync(Live) ->
    sync_nodes_and_dbs(),
    Acc = {node(), Live, []},
    {_, _, Shards} = mem3_shards:fold(fun initial_sync_fold/2, Acc),
    submit_replication_tasks(node(), Live, Shards).

initial_sync_fold(#shard{dbname = Db} = Shard, {LocalNode, Live, AccShards}) ->
    case AccShards of
        [#shard{dbname = AccDb} | _] when Db =/= AccDb ->
            submit_replication_tasks(LocalNode, Live, AccShards),
            {LocalNode, Live, [Shard]};
        _ ->
            {LocalNode, Live, [Shard | AccShards]}
    end.

submit_replication_tasks(LocalNode, Live, Shards) ->
    SplitFun = fun(#shard{node = Node}) -> Node =:= LocalNode end,
    {Local, Remote} = lists:partition(SplitFun, Shards),
    lists:foreach(
        fun(#shard{name = ShardName}) ->
            [
                sync_push(ShardName, N)
             || #shard{node = N, name = Name} <- Remote,
                Name =:= ShardName,
                lists:member(N, Live)
            ]
        end,
        Local
    ).

sync_push(ShardName, N) ->
    gen_server:call(mem3_sync, {push, #job{name = ShardName, node = N}}, infinity).

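% Picks the neighbour that follows this node in the sorted list of live
% cluster nodes, wrapping around at the end of the list.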
find_next_node() ->
    LiveNodes = [node() | nodes()],
    AllNodes0 = lists:sort(mem3:nodes()),
    AllNodes1 = [X || X <- AllNodes0, lists:member(X, LiveNodes)],
    AllNodes = AllNodes1 ++ [hd(AllNodes1)],
    [_Self, Next | _] = lists:dropwhile(fun(N) -> N =/= node() end, AllNodes),
    Next.

%% @doc Finds the next {DbName,Node} pair in the list of waiting replications
%% which does not correspond to an already running replication
-spec next_replication([#job{}], queue:queue(_), queue:queue(_)) ->
    {#job{}, queue:queue(_)} | nil.
next_replication(Active, Waiting, WaitingAndRunning) ->
    case is_empty(Waiting) of
        true ->
            nil;
        false ->
            {{value, #job{name = S, node = N} = Job}, RemQ} = out(Waiting),
            case is_running(S, N, Active) of
                true ->
                    next_replication(Active, RemQ, in(Job, WaitingAndRunning));
                false ->
                    {Job, join(RemQ, WaitingAndRunning)}
            end
    end.

is_running(DbName, Node, ActiveList) ->
    [] =/= [true || #job{name = S, node = N} <- ActiveList, S =:= DbName, N =:= Node].

remove_entries(Dict, Entries) ->
    lists:foldl(
        fun(#job{name = S, node = N}, D) ->
            dict:erase({S, N}, D)
        end,
        Dict,
        Entries
    ).

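% The node-local databases that every node pushes to its next neighbour.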
local_dbs() ->
    UsersDb = users_db(),
    % users db might not have been created so don't include it unless it exists
    case couch_server:exists(UsersDb) of
        true -> [nodes_db(), shards_db(), UsersDb];
        false -> [nodes_db(), shards_db()]
    end.

nodes_db() ->
    ?l2b(config:get("mem3", "nodes_db", "_nodes")).

shards_db() ->
    ?l2b(config:get("mem3", "shards_db", "_dbs")).

users_db() ->
    ?l2b(config:get("couch_httpd_auth", "authentication_db", "_users")).

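% A push target can be transparently redirected to another node via entries in
% the [mem3.redirects] config section (keys are source node names, values the
% replacement node).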
maybe_redirect(Node) ->
    case config:get("mem3.redirects", atom_to_list(Node)) of
        undefined ->
            Node;
        Redirect ->
            couch_log:debug("Redirecting push from ~p to ~p", [Node, Redirect]),
            list_to_existing_atom(Redirect)
    end.