| % Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| % use this file except in compliance with the License. You may obtain a copy of |
| % the License at |
| % |
| % http://www.apache.org/licenses/LICENSE-2.0 |
| % |
| % Unless required by applicable law or agreed to in writing, software |
| % distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| % WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| % License for the specific language governing permissions and limitations under |
| % the License. |
| |
| -module(couch_replicator_scheduler). |
| |
| -behaviour(gen_server). |
| -behaviour(config_listener). |
| |
| -export([ |
| start_link/0 |
| ]). |
| |
| -export([ |
| init/1, |
| terminate/2, |
| handle_call/3, |
| handle_info/2, |
| handle_cast/2, |
| code_change/3, |
| format_status/2 |
| ]). |
| |
| -export([ |
| add_job/1, |
| remove_job/1, |
| reschedule/0, |
| rep_state/1, |
| find_jobs_by_dbname/1, |
| find_jobs_by_doc/2, |
| job_summary/2, |
| health_threshold/0, |
| jobs/0, |
| job/1, |
| restart_job/1, |
| update_job_stats/2 |
| ]). |
| |
| %% config_listener callbacks |
| -export([ |
| handle_config_change/5, |
| handle_config_terminate/3 |
| ]). |
| |
| %% Exported for the stats updater process to allow hot code loading |
| -export([ |
| stats_updater_loop/1 |
| ]). |
| |
| -include("couch_replicator_scheduler.hrl"). |
| -include("couch_replicator.hrl"). |
| -include_lib("couch_replicator/include/couch_replicator_api_wrap.hrl"). |
| -include_lib("couch/include/couch_db.hrl"). |
| |
| %% types |
| -type event_type() :: added | started | stopped | {crashed, any()}. |
| -type event() :: {Type:: event_type(), When :: erlang:timestamp()}. |
| -type history() :: nonempty_list(event()). |
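| % A history is kept newest-first. For example, a job that was added and then |
| % started once has a history like [{started, T2}, {added, T1}]; see |
| % update_history/4, which prepends new events and truncates to max_history. |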
| |
| %% definitions |
| -define(MAX_BACKOFF_EXPONENT, 10). |
| -define(BACKOFF_INTERVAL_MICROS, 30 * 1000 * 1000). |
| -define(DEFAULT_HEALTH_THRESHOLD_SEC, 2 * 60). |
| -define(RELISTEN_DELAY, 5000). |
| -define(STATS_UPDATE_WAIT, 5000). |
| |
| -define(DEFAULT_MAX_JOBS, 500). |
| -define(DEFAULT_MAX_CHURN, 20). |
| -define(DEFAULT_MAX_HISTORY, 20). |
| -define(DEFAULT_SCHEDULER_INTERVAL, 60000). |
| |
| |
| -record(state, {interval, timer, max_jobs, max_churn, max_history, stats_pid}). |
| -record(job, { |
| id :: job_id() | '$1' | '_', |
| rep :: #rep{} | '_', |
| pid :: undefined | pid() | '$1' | '_', |
| monitor :: undefined | reference() | '_', |
| history :: history() | '_' |
| }). |
| |
| -record(stats_acc, { |
| pending_n = 0 :: non_neg_integer(), |
| running_n = 0 :: non_neg_integer(), |
| crashed_n = 0 :: non_neg_integer() |
| }). |
| |
| |
| %% public functions |
| |
| -spec start_link() -> {ok, pid()} | ignore | {error, term()}. |
| start_link() -> |
| gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). |
| |
| |
| -spec add_job(#rep{}) -> ok. |
| add_job(#rep{} = Rep) when Rep#rep.id /= undefined -> |
| case existing_replication(Rep) of |
| false -> |
| Job = #job{ |
| id = Rep#rep.id, |
| rep = Rep, |
| history = [{added, os:timestamp()}] |
| }, |
| gen_server:call(?MODULE, {add_job, Job}, infinity); |
| true -> |
| ok |
| end. |
| |
| |
| -spec remove_job(job_id()) -> ok. |
| remove_job(Id) -> |
| gen_server:call(?MODULE, {remove_job, Id}, infinity). |
| |
| |
| -spec reschedule() -> ok. |
| % Trigger a manual reschedule. Used for testing and/or ops. |
| reschedule() -> |
| gen_server:call(?MODULE, reschedule, infinity). |
| |
| |
| -spec rep_state(rep_id()) -> #rep{} | nil. |
| rep_state(RepId) -> |
| case (catch ets:lookup_element(?MODULE, RepId, #job.rep)) of |
| {'EXIT',{badarg, _}} -> |
| nil; |
| Rep -> |
| Rep |
| end. |
| |
| |
| -spec job_summary(job_id(), non_neg_integer()) -> [_] | nil. |
| job_summary(JobId, HealthThreshold) -> |
| case job_by_id(JobId) of |
| {ok, #job{pid = Pid, history = History, rep = Rep}} -> |
| ErrorCount = consecutive_crashes(History, HealthThreshold), |
| {State, Info} = case {Pid, ErrorCount} of |
| {undefined, 0} -> |
| case History of |
| [{{crashed, Error}, _When} | _] -> |
| {crashing, crash_reason_json(Error)}; |
| [_ | _] -> |
| {pending, Rep#rep.stats} |
| end; |
| {undefined, ErrorCount} when ErrorCount > 0 -> |
| [{{crashed, Error}, _When} | _] = History, |
| {crashing, crash_reason_json(Error)}; |
| {Pid, ErrorCount} when is_pid(Pid) -> |
| {running, Rep#rep.stats} |
| end, |
| [ |
| {source, iolist_to_binary(ejson_url(Rep#rep.source))}, |
| {target, iolist_to_binary(ejson_url(Rep#rep.target))}, |
| {state, State}, |
| {info, couch_replicator_utils:ejson_state_info(Info)}, |
| {error_count, ErrorCount}, |
| {last_updated, last_updated(History)}, |
| {start_time, |
| couch_replicator_utils:iso8601(Rep#rep.start_time)}, |
| {source_proxy, job_proxy_url(Rep#rep.source)}, |
| {target_proxy, job_proxy_url(Rep#rep.target)} |
| ]; |
| {error, not_found} -> |
| nil % Job might have just completed |
| end. |
| |
| |
| job_proxy_url(#httpdb{proxy_url = ProxyUrl}) when is_list(ProxyUrl) -> |
| list_to_binary(couch_util:url_strip_password(ProxyUrl)); |
| job_proxy_url(_Endpoint) -> |
| null. |
| |
| |
| % Health threshold is the minimum amount of time an unhealthy job has to run |
| % without crashing before it is considered healthy again. HealthThreshold |
| % should not be 0, as jobs could start and immediately crash, and it shouldn't |
| % be infinity, since then consecutive crashes would accumulate forever even if |
| % the job is back to normal. |
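| % |
| % For example, an operator could raise the threshold to five minutes via the |
| % ini configuration (using the "replicator" section and "health_threshold" key |
| % read below); the value shown here is only an illustration: |
| % |
| %   [replicator] |
| %   health_threshold = 300 |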
| -spec health_threshold() -> non_neg_integer(). |
| health_threshold() -> |
| config:get_integer("replicator", "health_threshold", |
| ?DEFAULT_HEALTH_THRESHOLD_SEC). |
| |
| |
| -spec find_jobs_by_dbname(binary()) -> list(job_id()). |
| find_jobs_by_dbname(DbName) -> |
| Rep = #rep{db_name = DbName, _ = '_'}, |
| MatchSpec = #job{id = '$1', rep = Rep, _ = '_'}, |
| [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)]. |
| |
| |
| -spec find_jobs_by_doc(binary(), binary()) -> list(job_id()). |
| find_jobs_by_doc(DbName, DocId) -> |
| Rep = #rep{db_name = DbName, doc_id = DocId, _ = '_'}, |
| MatchSpec = #job{id = '$1', rep = Rep, _ = '_'}, |
| [RepId || [RepId] <- ets:match(?MODULE, MatchSpec)]. |
| |
| |
| -spec restart_job(binary() | list() | rep_id()) -> |
| {ok, {[_]}} | {error, not_found}. |
| restart_job(JobId) -> |
| case rep_state(JobId) of |
| nil -> |
| {error, not_found}; |
| #rep{} = Rep -> |
| ok = remove_job(JobId), |
| ok = add_job(Rep), |
| job(JobId) |
| end. |
| |
| |
| -spec update_job_stats(job_id(), term()) -> ok. |
| update_job_stats(JobId, Stats) -> |
| gen_server:cast(?MODULE, {update_job_stats, JobId, Stats}). |
| |
| |
| %% gen_server functions |
| |
| init(_) -> |
| config:enable_feature('scheduler'), |
| EtsOpts = [named_table, {keypos, #job.id}, {read_concurrency, true}, |
| {write_concurrency, true}], |
| ?MODULE = ets:new(?MODULE, EtsOpts), |
| ok = config:listen_for_changes(?MODULE, nil), |
| Interval = config:get_integer("replicator", "interval", |
| ?DEFAULT_SCHEDULER_INTERVAL), |
| MaxJobs = config:get_integer("replicator", "max_jobs", ?DEFAULT_MAX_JOBS), |
| MaxChurn = config:get_integer("replicator", "max_churn", |
| ?DEFAULT_MAX_CHURN), |
| MaxHistory = config:get_integer("replicator", "max_history", |
| ?DEFAULT_MAX_HISTORY), |
| Timer = erlang:send_after(Interval, self(), reschedule), |
| State = #state{ |
| interval = Interval, |
| max_jobs = MaxJobs, |
| max_churn = MaxChurn, |
| max_history = MaxHistory, |
| timer = Timer, |
| stats_pid = start_stats_updater() |
| }, |
| {ok, State}. |
| |
| |
| handle_call({add_job, Job}, _From, State) -> |
| ok = maybe_remove_job_int(Job#job.id, State), |
| true = add_job_int(Job), |
| ok = maybe_start_newly_added_job(Job, State), |
| couch_stats:increment_counter([couch_replicator, jobs, adds]), |
| TotalJobs = ets:info(?MODULE, size), |
| couch_stats:update_gauge([couch_replicator, jobs, total], TotalJobs), |
| {reply, ok, State}; |
| |
| handle_call({remove_job, Id}, _From, State) -> |
| ok = maybe_remove_job_int(Id, State), |
| {reply, ok, State}; |
| |
| handle_call(reschedule, _From, State) -> |
| ok = reschedule(State), |
| {reply, ok, State}; |
| |
| handle_call(_, _From, State) -> |
| {noreply, State}. |
| |
| |
| handle_cast({set_max_jobs, MaxJobs}, State) when is_integer(MaxJobs), |
| MaxJobs >= 0 -> |
| couch_log:notice("~p: max_jobs set to ~B", [?MODULE, MaxJobs]), |
| {noreply, State#state{max_jobs = MaxJobs}}; |
| |
| handle_cast({set_max_churn, MaxChurn}, State) when is_integer(MaxChurn), |
| MaxChurn > 0 -> |
| couch_log:notice("~p: max_churn set to ~B", [?MODULE, MaxChurn]), |
| {noreply, State#state{max_churn = MaxChurn}}; |
| |
| handle_cast({set_max_history, MaxHistory}, State) when is_integer(MaxHistory), |
| MaxHistory > 0 -> |
| couch_log:notice("~p: max_history set to ~B", [?MODULE, MaxHistory]), |
| {noreply, State#state{max_history = MaxHistory}}; |
| |
| handle_cast({set_interval, Interval}, State) when is_integer(Interval), |
| Interval > 0 -> |
| couch_log:notice("~p: interval set to ~B", [?MODULE, Interval]), |
| {noreply, State#state{interval = Interval}}; |
| |
| handle_cast({update_job_stats, JobId, Stats}, State) -> |
| case rep_state(JobId) of |
| nil -> |
| ok; |
| #rep{} = Rep -> |
| NewRep = Rep#rep{stats = Stats}, |
| true = ets:update_element(?MODULE, JobId, {#job.rep, NewRep}) |
| end, |
| {noreply, State}; |
| |
| handle_cast(UnexpectedMsg, State) -> |
|     couch_log:error("~p: received unexpected cast ~p", [?MODULE, UnexpectedMsg]), |
| {noreply, State}. |
| |
| |
| handle_info(reschedule, State) -> |
| ok = reschedule(State), |
| erlang:cancel_timer(State#state.timer), |
| Timer = erlang:send_after(State#state.interval, self(), reschedule), |
| {noreply, State#state{timer = Timer}}; |
| |
| handle_info({'DOWN', _Ref, process, Pid, normal}, State) -> |
| {ok, Job} = job_by_pid(Pid), |
| couch_log:notice("~p: Job ~p completed normally", [?MODULE, Job#job.id]), |
| remove_job_int(Job), |
| update_running_jobs_stats(State#state.stats_pid), |
| {noreply, State}; |
| |
| handle_info({'DOWN', _Ref, process, Pid, Reason}, State) -> |
| {ok, Job} = job_by_pid(Pid), |
| ok = handle_crashed_job(Job, Reason, State), |
| {noreply, State}; |
| |
| handle_info(restart_config_listener, State) -> |
| ok = config:listen_for_changes(?MODULE, nil), |
| {noreply, State}; |
| |
| handle_info(_, State) -> |
| {noreply, State}. |
| |
| |
| code_change(_OldVsn, State, _Extra) -> |
| {ok, State}. |
| |
| |
| terminate(_Reason, _State) -> |
| ok. |
| |
| |
| format_status(_Opt, [_PDict, State]) -> |
| [ |
| {max_jobs, State#state.max_jobs}, |
| {running_jobs, running_job_count()}, |
| {pending_jobs, pending_job_count()} |
| ]. |
| |
| |
| %% config listener functions |
| |
| handle_config_change("replicator", "max_jobs", V, _, S) -> |
| ok = gen_server:cast(?MODULE, {set_max_jobs, list_to_integer(V)}), |
| {ok, S}; |
| |
| handle_config_change("replicator", "max_churn", V, _, S) -> |
| ok = gen_server:cast(?MODULE, {set_max_churn, list_to_integer(V)}), |
| {ok, S}; |
| |
| handle_config_change("replicator", "interval", V, _, S) -> |
| ok = gen_server:cast(?MODULE, {set_interval, list_to_integer(V)}), |
| {ok, S}; |
| |
| handle_config_change("replicator", "max_history", V, _, S) -> |
| ok = gen_server:cast(?MODULE, {set_max_history, list_to_integer(V)}), |
| {ok, S}; |
| |
| handle_config_change(_, _, _, _, S) -> |
| {ok, S}. |
| |
| |
| handle_config_terminate(_, stop, _) -> |
| ok; |
| |
| handle_config_terminate(_, _, _) -> |
| Pid = whereis(?MODULE), |
| erlang:send_after(?RELISTEN_DELAY, Pid, restart_config_listener). |
| |
| |
| %% Private functions |
| |
| % Handle crashed jobs. Handling differs between transient and permanent jobs. |
| % Transient jobs are those posted to the _replicate endpoint. They don't have a |
| % db associated with them. When those jobs crash, they are not restarted. That |
| % is also consistent with the behavior when the node they run on crashes: they |
| % do not migrate to other nodes. Permanent jobs are those created from |
| % replicator documents. Those jobs, once they pass basic validation and end up |
| % in the scheduler, will be retried indefinitely (with appropriate exponential |
| % backoffs). |
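| % |
| % For example (a sketch of the two clauses below): a crashed job that was |
| % POSTed to /_replicate has db_name = null in its #rep{} record and is simply |
| % removed from the scheduler, while a crashed job defined by a _replicator |
| % document gets a {crashed, Reason} event recorded and is retried later, |
| % subject to backoff_micros/1. |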
| -spec handle_crashed_job(#job{}, any(), #state{}) -> ok. |
| handle_crashed_job(#job{rep = #rep{db_name = null}} = Job, Reason, State) -> |
| Msg = "~p : Transient job ~p failed, removing. Error: ~p", |
| ErrorBinary = couch_replicator_utils:rep_error_to_binary(Reason), |
| couch_log:error(Msg, [?MODULE, Job#job.id, ErrorBinary]), |
| remove_job_int(Job), |
| update_running_jobs_stats(State#state.stats_pid), |
| ok; |
| |
| handle_crashed_job(Job, Reason, State) -> |
| ok = update_state_crashed(Job, Reason, State), |
| case couch_replicator_doc_processor:update_docs() of |
| true -> |
| couch_replicator_docs:update_error(Job#job.rep, Reason); |
| false -> |
| ok |
| end, |
| case ets:info(?MODULE, size) < State#state.max_jobs of |
| true -> |
|             % Starting pending jobs is an O(TotalJobsCount) operation. Only do |
|             % it if there is a relatively small number of jobs. Otherwise the |
|             % scheduler could be blocked if there is a cascade of many jobs |
|             % failing in a row. |
| start_pending_jobs(State), |
| update_running_jobs_stats(State#state.stats_pid), |
| ok; |
| false -> |
| ok |
| end. |
| |
| |
| % Attempt to start a newly added job. First quickly check if total jobs |
| % already exceed max jobs, then do a more expensive check which runs a |
| % select (an O(n) operation) to check pending jobs specifically. |
| -spec maybe_start_newly_added_job(#job{}, #state{}) -> ok. |
| maybe_start_newly_added_job(Job, State) -> |
| MaxJobs = State#state.max_jobs, |
| TotalJobs = ets:info(?MODULE, size), |
| case TotalJobs < MaxJobs andalso running_job_count() < MaxJobs of |
| true -> |
| start_job_int(Job, State), |
| update_running_jobs_stats(State#state.stats_pid), |
| ok; |
| false -> |
| ok |
| end. |
| |
| |
| % Return up to a given number of oldest, not recently crashed jobs. Try to be |
| % memory efficient and use ets:foldl to accumulate jobs. |
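| % |
| % For example (mirroring t_pending_jobs_simple in the tests below): with two |
| % healthy pending jobs whose last start times are 1 and 2, pending_jobs(1) |
| % returns the job started at 1, and pending_jobs(3) returns both, oldest first. |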
| -spec pending_jobs(non_neg_integer()) -> [#job{}]. |
| pending_jobs(0) -> |
|     % Handle this case separately as the user could set max_churn to 0. If 0 |
|     % were passed to the other function clause it would crash, since |
|     % gb_sets:largest assumes the set is not empty. |
| []; |
| |
| pending_jobs(Count) when is_integer(Count), Count > 0 -> |
| Set0 = gb_sets:new(), % [{LastStart, Job},...] |
| Now = os:timestamp(), |
| Acc0 = {Set0, Now, Count, health_threshold()}, |
| {Set1, _, _, _} = ets:foldl(fun pending_fold/2, Acc0, ?MODULE), |
| [Job || {_Started, Job} <- gb_sets:to_list(Set1)]. |
| |
| |
| pending_fold(Job, {Set, Now, Count, HealthThreshold}) -> |
| Set1 = case {not_recently_crashed(Job, Now, HealthThreshold), |
| gb_sets:size(Set) >= Count} of |
| {true, true} -> |
| % Job is healthy but already reached accumulated limit, so might |
| % have to replace one of the accumulated jobs |
| pending_maybe_replace(Job, Set); |
| {true, false} -> |
| % Job is healthy and we haven't reached the limit, so add job |
| % to accumulator |
| gb_sets:add_element({last_started(Job), Job}, Set); |
| {false, _} -> |
| % This job is not healthy (has crashed too recently), so skip it. |
| Set |
| end, |
| {Set1, Now, Count, HealthThreshold}. |
| |
| |
| % Replace Job in the accumulator if it is older than the youngest job there. |
| % "Oldest" here means the one which has been waiting to run the longest, and |
| % "youngest" means the one with the most recent activity. The goal is to keep |
| % up to Count of the oldest jobs during iteration. For example, if the start |
| % times accumulated so far are [5, 7, 11] and the start time of the current |
| % job is 6, then 6 < 11 is true, so 11 (the youngest) is dropped and 6 is |
| % inserted, resulting in [5, 6, 7]. In the end the result might look like |
| % [1, 2, 5], for example. |
| pending_maybe_replace(Job, Set) -> |
| Started = last_started(Job), |
| {Youngest, YoungestJob} = gb_sets:largest(Set), |
| case Started < Youngest of |
| true -> |
| Set1 = gb_sets:delete({Youngest, YoungestJob}, Set), |
| gb_sets:add_element({Started, Job}, Set1); |
| false -> |
| Set |
| end. |
| |
| |
| start_jobs(Count, State) -> |
| [start_job_int(Job, State) || Job <- pending_jobs(Count)], |
| ok. |
| |
| |
| -spec stop_jobs(non_neg_integer(), boolean(), #state{}) -> non_neg_integer(). |
| stop_jobs(Count, _, _) when is_integer(Count), Count =< 0 -> |
| 0; |
| |
| stop_jobs(Count, IsContinuous, State) when is_integer(Count) -> |
| Running0 = running_jobs(), |
| ContinuousPred = fun(Job) -> is_continuous(Job) =:= IsContinuous end, |
| Running1 = lists:filter(ContinuousPred, Running0), |
| Running2 = lists:sort(fun longest_running/2, Running1), |
| Running3 = lists:sublist(Running2, Count), |
| length([stop_job_int(Job, State) || Job <- Running3]). |
| |
| |
| longest_running(#job{} = A, #job{} = B) -> |
| last_started(A) =< last_started(B). |
| |
| |
| not_recently_crashed(#job{history = History}, Now, HealthThreshold) -> |
| case History of |
| [{added, _When}] -> |
| true; |
| [{stopped, _When} | _] -> |
| true; |
| _ -> |
| LatestCrashT = latest_crash_timestamp(History), |
| CrashCount = consecutive_crashes(History, HealthThreshold), |
| timer:now_diff(Now, LatestCrashT) >= backoff_micros(CrashCount) |
| end. |
| |
| |
| % Count consecutive crashes. A crash happens when there is a `crashed` event |
| % within a short period of time (configurable) after any other event. It could |
| % be `crashed, started` for jobs crashing quickly after starting, `crashed, |
| % crashed` or `crashed, stopped` if a job repeatedly failed to start after |
| % being stopped, or `crashed, added` if it crashed immediately after being |
| % added during start. |
| % |
| % A streak of "consecutive crashes" ends when, after a crash, the job is seen |
| % starting and then running without crashing for a period of time. That period |
| % of time is the HealthThreshold. |
| % |
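| % For example (timestamps in seconds, default threshold), the history |
| % [crashed(3), started(2), crashed(1), started(0)] counts as 2 consecutive |
| % crashes, while [stopped(3), started(2), crashed(1), started(0)] counts as 0, |
| % because the `stopped, started` pair ends the streak (see the tests below). |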
| |
| -spec consecutive_crashes(history(), non_neg_integer()) -> non_neg_integer(). |
| consecutive_crashes(History, HealthThreshold) when is_list(History) -> |
| consecutive_crashes(History, HealthThreshold, 0). |
| |
| |
| -spec consecutive_crashes(history(), non_neg_integer(), non_neg_integer()) -> |
| non_neg_integer(). |
| consecutive_crashes([], _HealthThreshold, Count) -> |
| Count; |
| |
| consecutive_crashes([{{crashed, _}, CrashT}, {_, PrevT} = PrevEvent | Rest], |
| HealthThreshold, Count) -> |
| case timer:now_diff(CrashT, PrevT) > HealthThreshold * 1000000 of |
| true -> |
| Count; |
| false -> |
| consecutive_crashes([PrevEvent | Rest], HealthThreshold, Count + 1) |
| end; |
| |
| consecutive_crashes([{stopped, _}, {started, _} | _], _HealthThreshold, |
| Count) -> |
| Count; |
| |
| consecutive_crashes([_ | Rest], HealthThreshold, Count) -> |
| consecutive_crashes(Rest, HealthThreshold, Count). |
| |
| |
| -spec latest_crash_timestamp(history()) -> erlang:timestamp(). |
| latest_crash_timestamp([]) -> |
| {0, 0, 0}; % Used to avoid special-casing "no crash" when doing now_diff |
| |
| latest_crash_timestamp([{{crashed, _Reason}, When} | _]) -> |
| When; |
| |
| latest_crash_timestamp([_Event | Rest]) -> |
| latest_crash_timestamp(Rest). |
| |
| |
| -spec backoff_micros(non_neg_integer()) -> non_neg_integer(). |
| backoff_micros(CrashCount) -> |
|     % When calculating the backoff interval, treat the consecutive crash count |
|     % as the exponent in Base * 2 ^ (CrashCount - 1) to achieve an exponential |
|     % backoff doubling with every consecutive failure, starting with the base |
|     % value of ?BACKOFF_INTERVAL_MICROS and capped at ?MAX_BACKOFF_EXPONENT. |
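|     % For example (a worked sketch using the defaults above): 3 consecutive |
|     % crashes give an exponent of min(3 - 1, 10) = 2, so the backoff is |
|     % (1 bsl 2) * 30 seconds = 120 seconds; from the 11th consecutive crash |
|     % onwards the exponent stays capped at 10, i.e. about 8.5 hours. |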
| BackoffExp = erlang:min(CrashCount - 1, ?MAX_BACKOFF_EXPONENT), |
| (1 bsl BackoffExp) * ?BACKOFF_INTERVAL_MICROS. |
| |
| |
| -spec add_job_int(#job{}) -> boolean(). |
| add_job_int(#job{} = Job) -> |
| ets:insert_new(?MODULE, Job). |
| |
| |
| -spec maybe_remove_job_int(job_id(), #state{}) -> ok. |
| maybe_remove_job_int(JobId, State) -> |
| case job_by_id(JobId) of |
| {ok, Job} -> |
| ok = stop_job_int(Job, State), |
| true = remove_job_int(Job), |
| couch_stats:increment_counter([couch_replicator, jobs, removes]), |
| TotalJobs = ets:info(?MODULE, size), |
| couch_stats:update_gauge([couch_replicator, jobs, total], |
| TotalJobs), |
| update_running_jobs_stats(State#state.stats_pid), |
| ok; |
| {error, not_found} -> |
| ok |
| end. |
| |
| |
| start_job_int(#job{pid = Pid}, _State) when Pid /= undefined -> |
| ok; |
| |
| start_job_int(#job{} = Job0, State) -> |
| Job = maybe_optimize_job_for_rate_limiting(Job0), |
| case couch_replicator_scheduler_sup:start_child(Job#job.rep) of |
| {ok, Child} -> |
| Ref = monitor(process, Child), |
| ok = update_state_started(Job, Child, Ref, State), |
| couch_log:notice("~p: Job ~p started as ~p", |
| [?MODULE, Job#job.id, Child]); |
| {error, {already_started, OtherPid}} when node(OtherPid) =:= node() -> |
| Ref = monitor(process, OtherPid), |
| ok = update_state_started(Job, OtherPid, Ref, State), |
| couch_log:notice("~p: Job ~p already running as ~p. Most likely" |
| " because replicator scheduler was restarted", |
| [?MODULE, Job#job.id, OtherPid]); |
| {error, {already_started, OtherPid}} when node(OtherPid) =/= node() -> |
| CrashMsg = "Duplicate replication running on another node", |
| couch_log:notice("~p: Job ~p already running as ~p. Most likely" |
| " because a duplicate replication is running on another node", |
| [?MODULE, Job#job.id, OtherPid]), |
| ok = update_state_crashed(Job, CrashMsg, State); |
| {error, Reason} -> |
| couch_log:notice("~p: Job ~p failed to start for reason ~p", |
| [?MODULE, Job, Reason]), |
| ok = update_state_crashed(Job, Reason, State) |
| end. |
| |
| |
| -spec stop_job_int(#job{}, #state{}) -> ok | {error, term()}. |
| stop_job_int(#job{pid = undefined}, _State) -> |
| ok; |
| |
| stop_job_int(#job{} = Job, State) -> |
| ok = couch_replicator_scheduler_sup:terminate_child(Job#job.pid), |
| demonitor(Job#job.monitor, [flush]), |
| ok = update_state_stopped(Job, State), |
| couch_log:notice("~p: Job ~p stopped as ~p", |
| [?MODULE, Job#job.id, Job#job.pid]). |
| |
| |
| -spec remove_job_int(#job{}) -> true. |
| remove_job_int(#job{} = Job) -> |
| ets:delete(?MODULE, Job#job.id). |
| |
| |
| -spec running_job_count() -> non_neg_integer(). |
| running_job_count() -> |
| ets:info(?MODULE, size) - pending_job_count(). |
| |
| |
| -spec running_jobs() -> [#job{}]. |
| running_jobs() -> |
| ets:select(?MODULE, [{#job{pid = '$1', _='_'}, [{is_pid, '$1'}], ['$_']}]). |
| |
| |
| -spec pending_job_count() -> non_neg_integer(). |
| pending_job_count() -> |
| ets:select_count(?MODULE, [{#job{pid=undefined, _='_'}, [], [true]}]). |
| |
| |
| -spec job_by_pid(pid()) -> {ok, #job{}} | {error, not_found}. |
| job_by_pid(Pid) when is_pid(Pid) -> |
| case ets:match_object(?MODULE, #job{pid=Pid, _='_'}) of |
| [] -> |
| {error, not_found}; |
| [#job{}=Job] -> |
| {ok, Job} |
| end. |
| |
| |
| -spec job_by_id(job_id()) -> {ok, #job{}} | {error, not_found}. |
| job_by_id(Id) -> |
| case ets:lookup(?MODULE, Id) of |
| [] -> |
| {error, not_found}; |
| [#job{}=Job] -> |
| {ok, Job} |
| end. |
| |
| |
| -spec update_state_stopped(#job{}, #state{}) -> ok. |
| update_state_stopped(Job, State) -> |
| Job1 = reset_job_process(Job), |
| Job2 = update_history(Job1, stopped, os:timestamp(), State), |
| true = ets:insert(?MODULE, Job2), |
| couch_stats:increment_counter([couch_replicator, jobs, stops]), |
| ok. |
| |
| |
| -spec update_state_started(#job{}, pid(), reference(), #state{}) -> ok. |
| update_state_started(Job, Pid, Ref, State) -> |
| Job1 = set_job_process(Job, Pid, Ref), |
| Job2 = update_history(Job1, started, os:timestamp(), State), |
| true = ets:insert(?MODULE, Job2), |
| couch_stats:increment_counter([couch_replicator, jobs, starts]), |
| ok. |
| |
| |
| -spec update_state_crashed(#job{}, any(), #state{}) -> ok. |
| update_state_crashed(Job, Reason, State) -> |
| Job1 = reset_job_process(Job), |
| Job2 = update_history(Job1, {crashed, Reason}, os:timestamp(), State), |
| true = ets:insert(?MODULE, Job2), |
| couch_stats:increment_counter([couch_replicator, jobs, crashes]), |
| ok. |
| |
| |
| -spec set_job_process(#job{}, pid(), reference()) -> #job{}. |
| set_job_process(#job{} = Job, Pid, Ref) when is_pid(Pid), is_reference(Ref) -> |
| Job#job{pid = Pid, monitor = Ref}. |
| |
| |
| -spec reset_job_process(#job{}) -> #job{}. |
| reset_job_process(#job{} = Job) -> |
| Job#job{pid = undefined, monitor = undefined}. |
| |
| |
| -spec reschedule(#state{}) -> ok. |
| reschedule(State) -> |
| StopCount = stop_excess_jobs(State, running_job_count()), |
| rotate_jobs(State, StopCount), |
| update_running_jobs_stats(State#state.stats_pid). |
| |
| |
| -spec stop_excess_jobs(#state{}, non_neg_integer()) -> non_neg_integer(). |
| stop_excess_jobs(State, Running) -> |
| #state{max_jobs=MaxJobs} = State, |
| StopCount = max(0, Running - MaxJobs), |
| Stopped = stop_jobs(StopCount, true, State), |
| OneshotLeft = StopCount - Stopped, |
| stop_jobs(OneshotLeft, false, State), |
| StopCount. |
| |
| |
| start_pending_jobs(State) -> |
| #state{max_jobs=MaxJobs} = State, |
| Running = running_job_count(), |
| Pending = pending_job_count(), |
| if Running < MaxJobs, Pending > 0 -> |
| start_jobs(MaxJobs - Running, State); |
| true -> |
| ok |
| end. |
| |
| |
| -spec rotate_jobs(#state{}, non_neg_integer()) -> ok. |
| rotate_jobs(State, ChurnSoFar) -> |
| #state{max_jobs=MaxJobs, max_churn=MaxChurn} = State, |
| Running = running_job_count(), |
| Pending = pending_job_count(), |
| % Reduce MaxChurn by the number of already stopped jobs in the |
| % current rescheduling cycle. |
| Churn = max(0, MaxChurn - ChurnSoFar), |
| SlotsAvailable = MaxJobs - Running, |
| if SlotsAvailable >= 0 -> |
|         % If there are enough SlotsAvailable, reduce StopCount to avoid |
|         % unnecessarily stopping jobs. `stop_jobs/3` ignores 0 or negative |
|         % values so we don't worry about that here. |
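|         % Worked example (a sketch): with max_jobs = 10, max_churn = 5, no |
|         % jobs stopped yet this cycle, 10 running and 8 pending, SlotsAvailable |
|         % is 0 and StopCount = min(8, 10, 5) = 5, so five of the longest |
|         % running continuous jobs are stopped and up to five pending jobs are |
|         % then started in their place. |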
| StopCount = lists:min([Pending - SlotsAvailable, Running, Churn]), |
| stop_jobs(StopCount, true, State), |
| StartCount = max(0, MaxJobs - running_job_count()), |
| start_jobs(StartCount, State); |
| true -> |
| ok |
| end. |
| |
| |
| -spec last_started(#job{}) -> erlang:timestamp(). |
| last_started(#job{} = Job) -> |
| case lists:keyfind(started, 1, Job#job.history) of |
| false -> |
| {0, 0, 0}; |
| {started, When} -> |
| When |
| end. |
| |
| |
| -spec update_history(#job{}, event_type(), erlang:timestamp(), #state{}) -> |
| #job{}. |
| update_history(Job, Type, When, State) -> |
| History0 = [{Type, When} | Job#job.history], |
| History1 = lists:sublist(History0, State#state.max_history), |
| Job#job{history = History1}. |
| |
| |
| -spec ejson_url(#httpdb{} | binary()) -> binary(). |
| ejson_url(#httpdb{}=Httpdb) -> |
| couch_util:url_strip_password(Httpdb#httpdb.url); |
| ejson_url(DbName) when is_binary(DbName) -> |
| DbName. |
| |
| |
| -spec job_ejson(#job{}) -> {[_ | _]}. |
| job_ejson(Job) -> |
| Rep = Job#job.rep, |
| Source = ejson_url(Rep#rep.source), |
| Target = ejson_url(Rep#rep.target), |
| History = lists:map(fun({Type, When}) -> |
| EventProps = case Type of |
| {crashed, Reason} -> |
| [{type, crashed}, {reason, crash_reason_json(Reason)}]; |
| Type -> |
| [{type, Type}] |
| end, |
| {[{timestamp, couch_replicator_utils:iso8601(When)} | EventProps]} |
| end, Job#job.history), |
| {BaseID, Ext} = Job#job.id, |
| Pid = case Job#job.pid of |
| undefined -> |
| null; |
| P when is_pid(P) -> |
| ?l2b(pid_to_list(P)) |
| end, |
| {[ |
| {id, iolist_to_binary([BaseID, Ext])}, |
| {pid, Pid}, |
| {source, iolist_to_binary(Source)}, |
| {target, iolist_to_binary(Target)}, |
| {database, Rep#rep.db_name}, |
| {user, (Rep#rep.user_ctx)#user_ctx.name}, |
| {doc_id, Rep#rep.doc_id}, |
| {info, couch_replicator_utils:ejson_state_info(Rep#rep.stats)}, |
| {history, History}, |
| {node, node()}, |
| {start_time, couch_replicator_utils:iso8601(Rep#rep.start_time)} |
| ]}. |
| |
| |
| -spec jobs() -> [[tuple()]]. |
| jobs() -> |
| ets:foldl(fun(Job, Acc) -> [job_ejson(Job) | Acc] end, [], ?MODULE). |
| |
| |
| -spec job(job_id()) -> {ok, {[_ | _]}} | {error, not_found}. |
| job(JobId) -> |
| case job_by_id(JobId) of |
| {ok, Job} -> |
| {ok, job_ejson(Job)}; |
| Error -> |
| Error |
| end. |
| |
| |
| crash_reason_json({_CrashType, Info}) when is_binary(Info) -> |
| Info; |
| crash_reason_json(Reason) when is_binary(Reason) -> |
| Reason; |
| crash_reason_json(Error) -> |
| couch_replicator_utils:rep_error_to_binary(Error). |
| |
| |
| -spec last_updated([_]) -> binary(). |
| last_updated([{_Type, When} | _]) -> |
| couch_replicator_utils:iso8601(When). |
| |
| |
| -spec is_continuous(#job{}) -> boolean(). |
| is_continuous(#job{rep = Rep}) -> |
| couch_util:get_value(continuous, Rep#rep.options, false). |
| |
| |
| % If a job crashed last time because it was rate limited, try to |
| % optimize some of its options to help the job make progress. |
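| % |
| % For example (a sketch): a rate-limited job whose options include |
| % {worker_processes, 4} and {http_connections, 20} would have them lowered to |
| % 2 and 5 respectively, while options already at or below the target values |
| % (or not set to an integer) are left unchanged. |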
| -spec maybe_optimize_job_for_rate_limiting(#job{}) -> #job{}. |
| maybe_optimize_job_for_rate_limiting(Job = #job{history = |
| [{{crashed, {shutdown, max_backoff}}, _} | _]}) -> |
| Opts = [ |
| {checkpoint_interval, 5000}, |
| {worker_processes, 2}, |
| {worker_batch_size, 100}, |
| {http_connections, 5} |
| ], |
| Rep = lists:foldl(fun optimize_int_option/2, Job#job.rep, Opts), |
| Job#job{rep = Rep}; |
| maybe_optimize_job_for_rate_limiting(Job) -> |
| Job. |
| |
| |
| -spec optimize_int_option({atom(), any()}, #rep{}) -> #rep{}. |
| optimize_int_option({Key, Val}, #rep{options = Options} = Rep) -> |
| case couch_util:get_value(Key, Options) of |
| CurVal when is_integer(CurVal), CurVal > Val -> |
| Msg = "~p replication ~p : setting ~p = ~p due to rate limiting", |
| couch_log:warning(Msg, [?MODULE, Rep#rep.id, Key, Val]), |
| Options1 = lists:keyreplace(Key, 1, Options, {Key, Val}), |
| Rep#rep{options = Options1}; |
| _ -> |
| Rep |
| end. |
| |
| |
| % Updater is a separate process. It receives `update_stats` messages and |
| % updates scheduler stats from the scheduler jobs table. Updates are |
| % performed no more frequently than once per ?STATS_UPDATE_WAIT milliseconds. |
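| % |
| % For example, a burst of update_stats messages arriving within a single |
| % ?STATS_UPDATE_WAIT window schedules one refresh_stats, resulting in one pass |
| % over the jobs table rather than one pass per message. |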
| |
| update_running_jobs_stats(StatsPid) when is_pid(StatsPid) -> |
| StatsPid ! update_stats, |
| ok. |
| |
| |
| start_stats_updater() -> |
| erlang:spawn_link(?MODULE, stats_updater_loop, [undefined]). |
| |
| |
| stats_updater_loop(Timer) -> |
| receive |
| update_stats when Timer == undefined -> |
| TRef = erlang:send_after(?STATS_UPDATE_WAIT, self(), refresh_stats), |
| ?MODULE:stats_updater_loop(TRef); |
| update_stats when is_reference(Timer) -> |
| ?MODULE:stats_updater_loop(Timer); |
| refresh_stats -> |
| ok = stats_updater_refresh(), |
| ?MODULE:stats_updater_loop(undefined); |
| Else -> |
| erlang:exit({stats_updater_bad_msg, Else}) |
| end. |
| |
| |
| -spec stats_updater_refresh() -> ok. |
| stats_updater_refresh() -> |
| #stats_acc{ |
| pending_n = PendingN, |
| running_n = RunningN, |
| crashed_n = CrashedN |
| } = ets:foldl(fun stats_fold/2, #stats_acc{}, ?MODULE), |
| couch_stats:update_gauge([couch_replicator, jobs, pending], PendingN), |
| couch_stats:update_gauge([couch_replicator, jobs, running], RunningN), |
| couch_stats:update_gauge([couch_replicator, jobs, crashed], CrashedN), |
| ok. |
| |
| |
| -spec stats_fold(#job{}, #stats_acc{}) -> #stats_acc{}. |
| stats_fold(#job{pid = undefined, history = [{added, _}]}, Acc) -> |
| Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1}; |
| stats_fold(#job{pid = undefined, history = [{stopped, _} | _]}, Acc) -> |
| Acc#stats_acc{pending_n = Acc#stats_acc.pending_n + 1}; |
| stats_fold(#job{pid = undefined, history = [{{crashed, _}, _} | _]}, Acc) -> |
|     Acc#stats_acc{crashed_n = Acc#stats_acc.crashed_n + 1}; |
| stats_fold(#job{pid = P, history = [{started, _} | _]}, Acc) when is_pid(P) -> |
| Acc#stats_acc{running_n = Acc#stats_acc.running_n + 1}. |
| |
| |
| -spec existing_replication(#rep{}) -> boolean(). |
| existing_replication(#rep{} = NewRep) -> |
| case job_by_id(NewRep#rep.id) of |
| {ok, #job{rep = CurRep}} -> |
| NormCurRep = couch_replicator_utils:normalize_rep(CurRep), |
| NormNewRep = couch_replicator_utils:normalize_rep(NewRep), |
| NormCurRep == NormNewRep; |
| {error, not_found} -> |
| false |
| end. |
| |
| |
| -ifdef(TEST). |
| |
| -include_lib("eunit/include/eunit.hrl"). |
| |
| |
| backoff_micros_test_() -> |
| BaseInterval = ?BACKOFF_INTERVAL_MICROS, |
| [?_assertEqual(R * BaseInterval, backoff_micros(N)) || {R, N} <- [ |
| {1, 1}, {2, 2}, {4, 3}, {8, 4}, {16, 5}, {32, 6}, {64, 7}, {128, 8}, |
| {256, 9}, {512, 10}, {1024, 11}, {1024, 12} |
| ]]. |
| |
| |
| consecutive_crashes_test_() -> |
| Threshold = ?DEFAULT_HEALTH_THRESHOLD_SEC, |
| [?_assertEqual(R, consecutive_crashes(H, Threshold)) || {R, H} <- [ |
| {0, []}, |
| {0, [added()]}, |
| {0, [stopped()]}, |
| {0, [crashed()]}, |
| {1, [crashed(), added()]}, |
| {1, [crashed(), crashed()]}, |
| {1, [crashed(), stopped()]}, |
| {3, [crashed(), crashed(), crashed(), added()]}, |
| {2, [crashed(), crashed(), stopped()]}, |
| {1, [crashed(), started(), added()]}, |
| {2, [crashed(3), started(2), crashed(1), started(0)]}, |
| {0, [stopped(3), started(2), crashed(1), started(0)]}, |
| {1, [crashed(3), started(2), stopped(1), started(0)]}, |
| {0, [crashed(999), started(0)]}, |
| {1, [crashed(999), started(998), crashed(997), started(0)]} |
| ]]. |
| |
| |
| consecutive_crashes_non_default_threshold_test_() -> |
| [?_assertEqual(R, consecutive_crashes(H, T)) || {R, H, T} <- [ |
| {0, [crashed(11), started(0)], 10}, |
| {1, [crashed(10), started(0)], 10} |
| ]]. |
| |
| |
| latest_crash_timestamp_test_() -> |
| [?_assertEqual({0, R, 0}, latest_crash_timestamp(H)) || {R, H} <- [ |
| {0, [added()]}, |
| {1, [crashed(1)]}, |
| {3, [crashed(3), started(2), crashed(1), started(0)]}, |
| {1, [started(3), stopped(2), crashed(1), started(0)]} |
| ]]. |
| |
| |
| last_started_test_() -> |
| [?_assertEqual({0, R, 0}, last_started(testjob(H))) || {R, H} <- [ |
| {0, [added()]}, |
| {0, [crashed(1)]}, |
| {1, [started(1)]}, |
| {1, [added(), started(1)]}, |
| {2, [started(2), started(1)]}, |
| {2, [crashed(3), started(2), started(1)]} |
| ]]. |
| |
| |
| longest_running_test() -> |
| J0 = testjob([crashed()]), |
| J1 = testjob([started(1)]), |
| J2 = testjob([started(2)]), |
| Sort = fun(Jobs) -> lists:sort(fun longest_running/2, Jobs) end, |
| ?assertEqual([], Sort([])), |
| ?assertEqual([J1], Sort([J1])), |
| ?assertEqual([J1, J2], Sort([J2, J1])), |
| ?assertEqual([J0, J1, J2], Sort([J2, J1, J0])). |
| |
| |
| scheduler_test_() -> |
| { |
| setup, |
| fun setup_all/0, |
| fun teardown_all/1, |
| { |
| foreach, |
| fun setup/0, |
| fun teardown/1, |
| [ |
| t_pending_jobs_simple(), |
| t_pending_jobs_skip_crashed(), |
| t_one_job_starts(), |
| t_no_jobs_start_if_max_is_0(), |
| t_one_job_starts_if_max_is_1(), |
| t_max_churn_does_not_throttle_initial_start(), |
| t_excess_oneshot_only_jobs(), |
| t_excess_continuous_only_jobs(), |
| t_excess_prefer_continuous_first(), |
| t_stop_oldest_first(), |
| t_start_oldest_first(), |
| t_jobs_churn_even_if_not_all_max_jobs_are_running(), |
| t_jobs_dont_churn_if_there_are_available_running_slots(), |
| t_start_only_pending_jobs_do_not_churn_existing_ones(), |
| t_dont_stop_if_nothing_pending(), |
| t_max_churn_limits_number_of_rotated_jobs(), |
| t_existing_jobs(), |
| t_if_pending_less_than_running_start_all_pending(), |
| t_running_less_than_pending_swap_all_running(), |
| t_oneshot_dont_get_rotated(), |
| t_rotate_continuous_only_if_mixed(), |
| t_oneshot_dont_get_starting_priority(), |
| t_oneshot_will_hog_the_scheduler(), |
| t_if_excess_is_trimmed_rotation_still_happens(), |
| t_if_transient_job_crashes_it_gets_removed(), |
| t_if_permanent_job_crashes_it_stays_in_ets(), |
| t_job_summary_running(), |
| t_job_summary_pending(), |
| t_job_summary_crashing_once(), |
| t_job_summary_crashing_many_times(), |
| t_job_summary_proxy_fields() |
| ] |
| } |
| }. |
| |
| |
| t_pending_jobs_simple() -> |
| ?_test(begin |
| Job1 = oneshot(1), |
| Job2 = oneshot(2), |
| setup_jobs([Job2, Job1]), |
| ?assertEqual([], pending_jobs(0)), |
| ?assertEqual([Job1], pending_jobs(1)), |
| ?assertEqual([Job1, Job2], pending_jobs(2)), |
| ?assertEqual([Job1, Job2], pending_jobs(3)) |
| end). |
| |
| |
| t_pending_jobs_skip_crashed() -> |
| ?_test(begin |
| Job = oneshot(1), |
| Ts = os:timestamp(), |
| History = [crashed(Ts), started(Ts) | Job#job.history], |
| Job1 = Job#job{history = History}, |
| Job2 = oneshot(2), |
| Job3 = oneshot(3), |
| setup_jobs([Job2, Job1, Job3]), |
| ?assertEqual([Job2], pending_jobs(1)), |
| ?assertEqual([Job2, Job3], pending_jobs(2)), |
| ?assertEqual([Job2, Job3], pending_jobs(3)) |
| end). |
| |
| |
| t_one_job_starts() -> |
| ?_test(begin |
| setup_jobs([oneshot(1)]), |
| ?assertEqual({0, 1}, run_stop_count()), |
| reschedule(mock_state(?DEFAULT_MAX_JOBS)), |
| ?assertEqual({1, 0}, run_stop_count()) |
| end). |
| |
| |
| t_no_jobs_start_if_max_is_0() -> |
| ?_test(begin |
| setup_jobs([oneshot(1)]), |
| reschedule(mock_state(0)), |
| ?assertEqual({0, 1}, run_stop_count()) |
| end). |
| |
| |
| t_one_job_starts_if_max_is_1() -> |
| ?_test(begin |
| setup_jobs([oneshot(1), oneshot(2)]), |
| reschedule(mock_state(1)), |
| ?assertEqual({1, 1}, run_stop_count()) |
| end). |
| |
| |
| t_max_churn_does_not_throttle_initial_start() -> |
| ?_test(begin |
| setup_jobs([oneshot(1), oneshot(2)]), |
| reschedule(mock_state(?DEFAULT_MAX_JOBS, 0)), |
| ?assertEqual({2, 0}, run_stop_count()) |
| end). |
| |
| |
| t_excess_oneshot_only_jobs() -> |
| ?_test(begin |
| setup_jobs([oneshot_running(1), oneshot_running(2)]), |
| ?assertEqual({2, 0}, run_stop_count()), |
| reschedule(mock_state(1)), |
| ?assertEqual({1, 1}, run_stop_count()), |
| reschedule(mock_state(0)), |
| ?assertEqual({0, 2}, run_stop_count()) |
| end). |
| |
| |
| t_excess_continuous_only_jobs() -> |
| ?_test(begin |
| setup_jobs([continuous_running(1), continuous_running(2)]), |
| ?assertEqual({2, 0}, run_stop_count()), |
| reschedule(mock_state(1)), |
| ?assertEqual({1, 1}, run_stop_count()), |
| reschedule(mock_state(0)), |
| ?assertEqual({0, 2}, run_stop_count()) |
| end). |
| |
| |
| t_excess_prefer_continuous_first() -> |
| ?_test(begin |
| Jobs = [ |
| continuous_running(1), |
| oneshot_running(2), |
| continuous_running(3) |
| ], |
| setup_jobs(Jobs), |
| ?assertEqual({3, 0}, run_stop_count()), |
| ?assertEqual({1, 0}, oneshot_run_stop_count()), |
| reschedule(mock_state(2)), |
| ?assertEqual({2, 1}, run_stop_count()), |
| ?assertEqual({1, 0}, oneshot_run_stop_count()), |
| reschedule(mock_state(1)), |
| ?assertEqual({1, 0}, oneshot_run_stop_count()), |
| reschedule(mock_state(0)), |
| ?assertEqual({0, 1}, oneshot_run_stop_count()) |
| end). |
| |
| |
| t_stop_oldest_first() -> |
| ?_test(begin |
| Jobs = [ |
| continuous_running(7), |
| continuous_running(4), |
| continuous_running(5) |
| ], |
| setup_jobs(Jobs), |
| reschedule(mock_state(2, 1)), |
| ?assertEqual({2, 1}, run_stop_count()), |
| ?assertEqual([4], jobs_stopped()), |
| reschedule(mock_state(1, 1)), |
| ?assertEqual([7], jobs_running()) |
| end). |
| |
| |
| t_start_oldest_first() -> |
| ?_test(begin |
| setup_jobs([continuous(7), continuous(2), continuous(5)]), |
| reschedule(mock_state(1)), |
| ?assertEqual({1, 2}, run_stop_count()), |
| ?assertEqual([2], jobs_running()), |
| reschedule(mock_state(2)), |
| ?assertEqual({2, 1}, run_stop_count()), |
| % After rescheduling with max_jobs = 2, 2 was stopped and 5, 7 should |
| % be running. |
| ?assertEqual([2], jobs_stopped()) |
| end). |
| |
| |
| t_jobs_churn_even_if_not_all_max_jobs_are_running() -> |
| ?_test(begin |
| setup_jobs([ |
| continuous_running(7), |
| continuous(2), |
| continuous(5) |
| ]), |
| reschedule(mock_state(2, 2)), |
| ?assertEqual({2, 1}, run_stop_count()), |
| ?assertEqual([7], jobs_stopped()) |
| end). |
| |
| |
| t_jobs_dont_churn_if_there_are_available_running_slots() -> |
| ?_test(begin |
| setup_jobs([ |
| continuous_running(1), |
| continuous_running(2) |
| ]), |
| reschedule(mock_state(2, 2)), |
| ?assertEqual({2, 0}, run_stop_count()), |
| ?assertEqual([], jobs_stopped()), |
| ?assertEqual(0, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1)) |
| end). |
| |
| |
| t_start_only_pending_jobs_do_not_churn_existing_ones() -> |
| ?_test(begin |
| setup_jobs([ |
| continuous(1), |
| continuous_running(2) |
| ]), |
| reschedule(mock_state(2, 2)), |
| ?assertEqual(1, meck:num_calls(couch_replicator_scheduler_sup, start_child, 1)), |
| ?assertEqual([], jobs_stopped()), |
| ?assertEqual({2, 0}, run_stop_count()) |
| end). |
| |
| |
| t_dont_stop_if_nothing_pending() -> |
| ?_test(begin |
| setup_jobs([continuous_running(1), continuous_running(2)]), |
| reschedule(mock_state(2)), |
| ?assertEqual({2, 0}, run_stop_count()) |
| end). |
| |
| |
| t_max_churn_limits_number_of_rotated_jobs() -> |
| ?_test(begin |
| Jobs = [ |
| continuous(1), |
| continuous_running(2), |
| continuous(3), |
| continuous_running(4) |
| ], |
| setup_jobs(Jobs), |
| reschedule(mock_state(2, 1)), |
| ?assertEqual([2, 3], jobs_stopped()) |
| end). |
| |
| |
| t_if_pending_less_than_running_start_all_pending() -> |
| ?_test(begin |
| Jobs = [ |
| continuous(1), |
| continuous_running(2), |
| continuous(3), |
| continuous_running(4), |
| continuous_running(5) |
| ], |
| setup_jobs(Jobs), |
| reschedule(mock_state(3)), |
| ?assertEqual([1, 2, 5], jobs_running()) |
| end). |
| |
| |
| t_running_less_than_pending_swap_all_running() -> |
| ?_test(begin |
| Jobs = [ |
| continuous(1), |
| continuous(2), |
| continuous(3), |
| continuous_running(4), |
| continuous_running(5) |
| ], |
| setup_jobs(Jobs), |
| reschedule(mock_state(2)), |
| ?assertEqual([3, 4, 5], jobs_stopped()) |
| end). |
| |
| |
| t_oneshot_dont_get_rotated() -> |
| ?_test(begin |
| setup_jobs([oneshot_running(1), continuous(2)]), |
| reschedule(mock_state(1)), |
| ?assertEqual([1], jobs_running()) |
| end). |
| |
| |
| t_rotate_continuous_only_if_mixed() -> |
| ?_test(begin |
| setup_jobs([continuous(1), oneshot_running(2), continuous_running(3)]), |
| reschedule(mock_state(2)), |
| ?assertEqual([1, 2], jobs_running()) |
| end). |
| |
| |
| t_oneshot_dont_get_starting_priority() -> |
| ?_test(begin |
| setup_jobs([continuous(1), oneshot(2), continuous_running(3)]), |
| reschedule(mock_state(1)), |
| ?assertEqual([1], jobs_running()) |
| end). |
| |
| |
| % This is tested in other test cases; it is here mainly to make explicit a |
| % property of one-shot replications -- they can starve other jobs if they |
| % "take control" of all the available scheduler slots. |
| t_oneshot_will_hog_the_scheduler() -> |
| ?_test(begin |
| Jobs = [ |
| oneshot_running(1), |
| oneshot_running(2), |
| oneshot(3), |
| continuous(4) |
| ], |
| setup_jobs(Jobs), |
| reschedule(mock_state(2)), |
| ?assertEqual([1, 2], jobs_running()) |
| end). |
| |
| |
| t_if_excess_is_trimmed_rotation_still_happens() -> |
| ?_test(begin |
| Jobs = [ |
| continuous(1), |
| continuous_running(2), |
| continuous_running(3) |
| ], |
| setup_jobs(Jobs), |
| reschedule(mock_state(1)), |
| ?assertEqual([1], jobs_running()) |
| end). |
| |
| |
| t_if_transient_job_crashes_it_gets_removed() -> |
| ?_test(begin |
| Pid = mock_pid(), |
| Job = #job{ |
| id = job1, |
| pid = Pid, |
| history = [added()], |
| rep = #rep{db_name = null, options = [{continuous, true}]} |
| }, |
| setup_jobs([Job]), |
| ?assertEqual(1, ets:info(?MODULE, size)), |
| State = #state{max_history = 3, stats_pid = self()}, |
| {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed}, |
| State), |
| ?assertEqual(0, ets:info(?MODULE, size)) |
| end). |
| |
| |
| t_if_permanent_job_crashes_it_stays_in_ets() -> |
| ?_test(begin |
| Pid = mock_pid(), |
| Job = #job{ |
| id = job1, |
| pid = Pid, |
| history = [added()], |
| rep = #rep{db_name = <<"db1">>, options = [{continuous, true}]} |
| }, |
| setup_jobs([Job]), |
| ?assertEqual(1, ets:info(?MODULE, size)), |
|         State = #state{max_jobs = 1, max_history = 3, stats_pid = self()}, |
| {noreply, State} = handle_info({'DOWN', r1, process, Pid, failed}, |
| State), |
| ?assertEqual(1, ets:info(?MODULE, size)), |
| [Job1] = ets:lookup(?MODULE, job1), |
| [Latest | _] = Job1#job.history, |
| ?assertMatch({{crashed, failed}, _}, Latest) |
| end). |
| |
| |
| t_existing_jobs() -> |
| ?_test(begin |
| Rep = #rep{ |
| id = job1, |
| db_name = <<"db">>, |
| source = <<"s">>, |
| target = <<"t">>, |
| options = [{continuous, true}] |
| }, |
| setup_jobs([#job{id = Rep#rep.id, rep = Rep}]), |
| NewRep = #rep{ |
| id = Rep#rep.id, |
| db_name = <<"db">>, |
| source = <<"s">>, |
| target = <<"t">>, |
| options = [{continuous, true}] |
| }, |
| ?assert(existing_replication(NewRep)), |
| ?assertNot(existing_replication(NewRep#rep{source = <<"s1">>})), |
| ?assertNot(existing_replication(NewRep#rep{target = <<"t1">>})), |
| ?assertNot(existing_replication(NewRep#rep{options = []})) |
| end). |
| |
| |
| t_job_summary_running() -> |
| ?_test(begin |
| Job = #job{ |
| id = job1, |
| pid = mock_pid(), |
| history = [added()], |
| rep = #rep{ |
| db_name = <<"db1">>, |
| source = <<"s">>, |
| target = <<"t">> |
| } |
| }, |
| setup_jobs([Job]), |
| Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), |
| ?assertEqual(running, proplists:get_value(state, Summary)), |
| ?assertEqual(null, proplists:get_value(info, Summary)), |
| ?assertEqual(0, proplists:get_value(error_count, Summary)), |
| |
| Stats = [{source_seq, <<"1-abc">>}], |
| handle_cast({update_job_stats, job1, Stats}, mock_state(1)), |
| Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), |
| ?assertEqual({Stats}, proplists:get_value(info, Summary1)) |
| end). |
| |
| |
| t_job_summary_pending() -> |
| ?_test(begin |
| Job = #job{ |
| id = job1, |
| pid = undefined, |
| history = [stopped(20), started(10), added()], |
| rep = #rep{source = <<"s">>, target = <<"t">>} |
| }, |
| setup_jobs([Job]), |
| Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), |
| ?assertEqual(pending, proplists:get_value(state, Summary)), |
| ?assertEqual(null, proplists:get_value(info, Summary)), |
| ?assertEqual(0, proplists:get_value(error_count, Summary)), |
| |
| Stats = [{doc_write_failures, 1}], |
| handle_cast({update_job_stats, job1, Stats}, mock_state(1)), |
| Summary1 = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), |
| ?assertEqual({Stats}, proplists:get_value(info, Summary1)) |
| end). |
| |
| |
| t_job_summary_crashing_once() -> |
| ?_test(begin |
| Job = #job{ |
| id = job1, |
| history = [crashed(?DEFAULT_HEALTH_THRESHOLD_SEC + 1), started(0)], |
| rep = #rep{source = <<"s">>, target = <<"t">>} |
| }, |
| setup_jobs([Job]), |
| Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), |
| ?assertEqual(crashing, proplists:get_value(state, Summary)), |
| Info = proplists:get_value(info, Summary), |
| ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info), |
| ?assertEqual(0, proplists:get_value(error_count, Summary)) |
| end). |
| |
| |
| t_job_summary_crashing_many_times() -> |
| ?_test(begin |
| Job = #job{ |
| id = job1, |
| history = [crashed(4), started(3), crashed(2), started(1)], |
| rep = #rep{source = <<"s">>, target = <<"t">>} |
| }, |
| setup_jobs([Job]), |
| Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), |
| ?assertEqual(crashing, proplists:get_value(state, Summary)), |
| Info = proplists:get_value(info, Summary), |
| ?assertEqual({[{<<"error">>, <<"some_reason">>}]}, Info), |
| ?assertEqual(2, proplists:get_value(error_count, Summary)) |
| end). |
| |
| |
| t_job_summary_proxy_fields() -> |
| ?_test(begin |
| Job = #job{ |
| id = job1, |
| history = [started(10), added()], |
| rep = #rep{ |
| source = #httpdb{ |
| url = "https://s", |
| proxy_url = "http://u:p@sproxy:12" |
| }, |
| target = #httpdb{ |
| url = "http://t", |
| proxy_url = "socks5://u:p@tproxy:34" |
| } |
| } |
| }, |
| setup_jobs([Job]), |
| Summary = job_summary(job1, ?DEFAULT_HEALTH_THRESHOLD_SEC), |
| ?assertEqual(<<"http://u:*****@sproxy:12">>, |
| proplists:get_value(source_proxy, Summary)), |
| ?assertEqual(<<"socks5://u:*****@tproxy:34">>, |
| proplists:get_value(target_proxy, Summary)) |
| end). |
| |
| |
| % Test helper functions |
| |
| setup_all() -> |
| catch ets:delete(?MODULE), |
| meck:expect(couch_log, notice, 2, ok), |
| meck:expect(couch_log, warning, 2, ok), |
| meck:expect(couch_log, error, 2, ok), |
| meck:expect(couch_replicator_scheduler_sup, terminate_child, 1, ok), |
| meck:expect(couch_stats, increment_counter, 1, ok), |
| meck:expect(couch_stats, update_gauge, 2, ok), |
| Pid = mock_pid(), |
| meck:expect(couch_replicator_scheduler_sup, start_child, 1, {ok, Pid}). |
| |
| |
| teardown_all(_) -> |
| catch ets:delete(?MODULE), |
| meck:unload(). |
| |
| |
| setup() -> |
| meck:reset([ |
| couch_log, |
| couch_replicator_scheduler_sup, |
| couch_stats |
| ]). |
| |
| |
| teardown(_) -> |
| ok. |
| |
| |
| setup_jobs(Jobs) when is_list(Jobs) -> |
| ?MODULE = ets:new(?MODULE, [named_table, {keypos, #job.id}]), |
| ets:insert(?MODULE, Jobs). |
| |
| |
| all_jobs() -> |
| lists:usort(ets:tab2list(?MODULE)). |
| |
| |
| jobs_stopped() -> |
| [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined]. |
| |
| |
| jobs_running() -> |
| [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined]. |
| |
| |
| run_stop_count() -> |
| {length(jobs_running()), length(jobs_stopped())}. |
| |
| |
| oneshot_run_stop_count() -> |
| Running = [Job#job.id || Job <- all_jobs(), Job#job.pid =/= undefined, |
| not is_continuous(Job)], |
| Stopped = [Job#job.id || Job <- all_jobs(), Job#job.pid =:= undefined, |
| not is_continuous(Job)], |
| {length(Running), length(Stopped)}. |
| |
| |
| mock_state(MaxJobs) -> |
| #state{ |
| max_jobs = MaxJobs, |
| max_churn = ?DEFAULT_MAX_CHURN, |
| max_history = ?DEFAULT_MAX_HISTORY, |
| stats_pid = self() |
| }. |
| |
| mock_state(MaxJobs, MaxChurn) -> |
| #state{ |
| max_jobs = MaxJobs, |
| max_churn = MaxChurn, |
| max_history = ?DEFAULT_MAX_HISTORY, |
| stats_pid = self() |
| }. |
| |
| |
| continuous(Id) when is_integer(Id) -> |
| Started = Id, |
| Hist = [stopped(Started+1), started(Started), added()], |
| #job{ |
| id = Id, |
| history = Hist, |
| rep = #rep{options = [{continuous, true}]} |
| }. |
| |
| |
| continuous_running(Id) when is_integer(Id) -> |
| Started = Id, |
| Pid = mock_pid(), |
| #job{ |
| id = Id, |
| history = [started(Started), added()], |
| rep = #rep{options = [{continuous, true}]}, |
| pid = Pid, |
| monitor = monitor(process, Pid) |
| }. |
| |
| |
| oneshot(Id) when is_integer(Id) -> |
| Started = Id, |
| Hist = [stopped(Started + 1), started(Started), added()], |
| #job{id = Id, history = Hist, rep = #rep{options = []}}. |
| |
| |
| oneshot_running(Id) when is_integer(Id) -> |
| Started = Id, |
| Pid = mock_pid(), |
| #job{ |
| id = Id, |
| history = [started(Started), added()], |
| rep = #rep{options = []}, |
| pid = Pid, |
| monitor = monitor(process, Pid) |
| }. |
| |
| |
| testjob(Hist) when is_list(Hist) -> |
| #job{history = Hist}. |
| |
| |
| mock_pid() -> |
| list_to_pid("<0.999.999>"). |
| |
| crashed() -> |
| crashed(0). |
| |
| |
| crashed(WhenSec) when is_integer(WhenSec)-> |
| {{crashed, some_reason}, {0, WhenSec, 0}}; |
| crashed({MSec, Sec, USec}) -> |
| {{crashed, some_reason}, {MSec, Sec, USec}}. |
| |
| |
| started() -> |
| started(0). |
| |
| |
| started(WhenSec) when is_integer(WhenSec)-> |
| {started, {0, WhenSec, 0}}; |
| |
| started({MSec, Sec, USec}) -> |
| {started, {MSec, Sec, USec}}. |
| |
| |
| stopped() -> |
| stopped(0). |
| |
| |
| stopped(WhenSec) -> |
| {stopped, {0, WhenSec, 0}}. |
| |
| |
| added() -> |
| {added, {0, 0, 0}}. |
| |
| -endif. |