% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
% This module implements the "Fair Share" algorithm by Judy Kay and Piers
% Lauder [1] and applies it to the scheduling of replication jobs.
%
% The main idea is that _replicator dbs can have a configurable number of
% "shares" assigned to them. Shares are an abstract quantity from 1 to 1000,
% with a default of 100. Jobs from _replicator databases with more shares get a
% proportionally higher chance to run than those from databases with fewer
% shares.
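%
% As a minimal illustration (the db names below are hypothetical), shares are
% assigned per _replicator db in the [replicator.shares] config section, which
% this module reads via get_config_shares/0:
%
%   [replicator.shares]
%   _replicator = 100
%   high_priority/_replicator = 900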
%
% Every scheduler cycle, running jobs are "charged" based on how much time they
% spent running during that cycle. At the end of the cycle the accumulated
% charges for each job, the number of shares configured, and the total number
% of jobs in the pending queue from the same _replicator db, are used to
% calculate new priority values for all the jobs. To match the algorithm from
% the paper, jobs with lower priority values are the ones at the front of the
% run queue and have a higher chance of running.
%
% Here is how charges, shares, and the number of sibling jobs affect the
% priority value (a worked example follows the list):
%
% 1) Jobs from dbs with higher configured shares get assigned lower
% priority values and so stay closer to the front of the queue.
%
% 2) Jobs from dbs with many other jobs (many siblings) get assigned a
% higher priority value, so they get pushed further down the queue
% and have a lower chance of running.
%
% 3) Jobs which run longer accumulate more charges and get assigned a
% higher priority value and get to wait longer to run.
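%
% As an illustrative example (the numbers are made up), consider two dbs, each
% with one running job that accumulated 60000000 microseconds of usage during
% the last cycle, where db_a has 200 shares and db_b the default 100:
%
%   db_a: (60000000 * 1) / (200 * 200) = 1500
%   db_b: (60000000 * 1) / (100 * 100) = 6000
%
% db_a gets the lower priority value and therefore sits closer to the front of
% the run queue than db_b.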
%
% In order to prevent job starvation, all job priorities are periodically
% decayed (decreased). This effectively moves all the jobs towards the front of
% the run queue. So, in effect, there are two competing processes: one
% uniformly moves all jobs to the front, and the other throws them back in
% proportion to those factors mentioned above. The speed of this uniform
% priority decay is controlled by the priority_coeff parameter.
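%
% For example (illustrative), with the default priority_coeff of 0.98 a job at
% priority 6000 decays to trunc(6000 * 0.98) = 5880 after one cycle, while a
% freshly charged running job has new priority added on top, so stopped jobs
% gradually drift ahead of running ones.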
%
% In order to prevent jobs from low-share dbs from "cheating" by getting
% deleted and immediately re-added, charges are accumulated into a historically
% decayed usage value. The speed of the usage decay is controlled by the
% usage_coeff parameter (0.5 by default).
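%
% As an illustration, if a db accumulated a usage of 60000000 and all of its
% jobs are then deleted, the usage entry for that db is kept and decays as
% 30000000, 15000000, ... on subsequent cycles, so jobs re-added right away
% still start with a non-zero priority.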
%
% [1] : https://proteusmaster.urcf.drexel.edu/urcfwiki/images/KayLauderFairShare.pdf
-module(couch_replicator_share).
-export([
init/0,
clear/0,
update_shares/2,
reset_shares/1,
job_added/1,
job_removed/1,
update/3,
priority/1,
charge/3
]).
-include_lib("couch/include/couch_db.hrl").
-include("couch_replicator.hrl").
% Usage coefficient decays historic usage every scheduling cycle. For example,
% the usage value for a job which ran for 1 minute is 60000000 (microseconds
% per minute); if the job then stops running, it will take about 26 cycles
% (minutes) for that value to decay to 0 and for the system to "forget" about
% it completely:
%
% trunc(60000000 * math:pow(0.5, 26)) = 0
%
-define(DEFAULT_USAGE_COEFF, 0.5).
% Priority coefficient decays all the job priorities such that they slowly
% drift towards the front of the run queue. This coefficient defines a maximum
% time window over which this algorithm operates. For example, if this value is
% too small (e.g. 0.1), after a few cycles quite a few jobs would end up at
% priority 0, which would render the algorithm useless. The default value of
% 0.98 is picked such that if a job ran for one scheduler cycle, then didn't
% get to run for 7 hours, it would still have a priority > 0. 7 hours was
% picked as it is close enough to 8 hours, which is the default maximum error
% backoff interval.
%
% Example calculation:
% shares = 100
% usage after 1 minute cycle run = 60000000
% initial priority = 60000000 / (100 * 100) = 6000
% trunc(6000 * math:pow(0.98, 431)) = 0
% 431 / 60 ~= 7 hrs
%
-define(DEFAULT_PRIORITY_COEFF, 0.98).
-define(MIN_SHARES, 1).
-define(MAX_SHARES, 1000).
-define(DEFAULT_SHARES, 100).
-define(SHARES, couch_replicator_shares).
-define(PRIORITIES, couch_replicator_priorities).
-define(USAGE, couch_replicator_usage).
-define(CHARGES, couch_replicator_stopped_usage).
-define(NUM_JOBS, couch_replicator_num_jobs).
init() ->
EtsOpts = [named_table, public],
% {Key, Shares}
?SHARES = ets:new(?SHARES, EtsOpts),
% {JobId, Priority}
?PRIORITIES = ets:new(?PRIORITIES, EtsOpts),
% {Key, Usage}
?USAGE = ets:new(?USAGE, EtsOpts),
% {Key, Charges}
?CHARGES = ets:new(?CHARGES, EtsOpts),
% {Key, NumJobs}
?NUM_JOBS = ets:new(?NUM_JOBS, EtsOpts),
lists:foreach(fun({K, V}) -> update_shares(K, V) end, get_config_shares()).
clear() ->
Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
lists:foreach(fun(T) -> catch ets:delete(T) end, Tables).
% This should be called when the user updates the replicator.shares config section
%
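% For example (the db name is hypothetical):
%
%   update_shares(<<"foo/_replicator">>, 900)
%
% Values outside the [1, 1000] range are clamped to the nearest bound.
%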
update_shares(Key, Shares) when is_integer(Shares) ->
ets:insert(?SHARES, {Key, bounded(Shares, ?MIN_SHARES, ?MAX_SHARES)}).
% Called when the config value is deleted and shares are reset to the default
% value.
reset_shares(Key) ->
ets:delete(?SHARES, Key).
job_added(#job{} = Job) ->
Key = key(Job),
% If the entry is not present {Key, 0} is used as the default
ets:update_counter(?NUM_JOBS, Key, 1, {Key, 0}),
% Update job's priority as if it ran during one scheduler cycle. This is so
% new jobs don't get to be at priority 0 (highest).
update_priority(Job).
job_removed(#job{} = Job) ->
Key = key(Job),
ets:delete(?PRIORITIES, Job#job.id),
case ets:update_counter(?NUM_JOBS, Key, -1, {Key, 0}) of
N when is_integer(N), N =< 0 ->
ets:delete(?NUM_JOBS, Key);
N when is_integer(N), N > 0 ->
ok
end,
ok.
% This is the main algorithm update function. It should be called during each
% rescheduling cycle with a list of running jobs, the interval from the
% scheduler (in milliseconds), and the current timestamp.
%
% This function does all three main steps as described in [1].
%
% 1. Update usage from all the charges in the last scheduling cycle
%
% 2. Uniformly decay all job priorities
%
% 3. Update priorities for all the running jobs based on usage and number of
% sibling jobs.
%
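% An illustrative call, as it could be issued once per rescheduling cycle (the
% 60000 millisecond interval is a made-up value):
%
%   couch_replicator_share:update(RunningJobs, 60000, os:timestamp())
%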
update(RunningJobs, Interval, {_, _, _} = Now) ->
lists:foreach(fun(Job) -> charge(Job, Interval, Now) end, RunningJobs),
update_usage(),
decay_priorities(),
lists:foreach(fun(Job) -> update_priority(Job) end, RunningJobs).
priority(JobId) ->
% Not found means it was removed because its value was 0
case ets:lookup(?PRIORITIES, JobId) of
[{_, Priority}] -> Priority;
[] -> 0
end.
charge(#job{pid = undefined}, _, _) ->
0;
charge(#job{} = Job, Interval, {_, _, _} = Now) when is_integer(Interval) ->
Key = key(Job),
Charges = job_charges(Job, Interval, Now),
% If the entry is not present {Key, 0} is used as the default
ets:update_counter(?CHARGES, Key, Charges, {Key, 0}).
usage(Key) ->
case ets:lookup(?USAGE, Key) of
[{_, Usage}] -> Usage;
[] -> 0
end.
num_jobs(Key) ->
case ets:lookup(?NUM_JOBS, Key) of
[{_, NumJobs}] -> NumJobs;
[] -> 0
end.
shares(Key) ->
case ets:lookup(?SHARES, Key) of
[{_, Shares}] -> Shares;
[] -> ?DEFAULT_SHARES
end.
% In [1] this is described in the "Decay of Process Priorities" section
%
decay_priorities() ->
decay(?PRIORITIES, priority_coeff()),
% If priority becomes 0, it's removed. When looking it up, if it
% is missing we assume it is 0
clear_zero(?PRIORITIES).
% This is the main part of the algorithm. In [1] it is described in the
% "Priority Adjustment" section.
%
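% As an illustrative calculation, a db with a usage of 60000000, 2 running jobs
% and the default 100 shares yields trunc((60000000 * 2) / (100 * 100)) = 12000,
% which is added on top of the job's current priority value.
%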
update_priority(#job{} = Job) ->
Id = Job#job.id,
Key = key(Job),
Shares = shares(Key),
Priority = (usage(Key) * num_jobs(Key)) / (Shares * Shares),
% If the entry is not present {Id, 0} is used as the default
ets:update_counter(?PRIORITIES, Id, trunc(Priority), {Id, 0}).
% This is the "User-Level Scheduling" part from [1]
%
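% For example (illustrative numbers), with usage_coeff = 0.5 an existing usage
% of 60000000 plus 30000000 of new charges from the last cycle becomes:
%
%   trunc(60000000 * 0.5) + 30000000 = 60000000
%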
update_usage() ->
decay(?USAGE, usage_coeff()),
clear_zero(?USAGE),
ets:foldl(
fun({Key, Charges}, _) ->
% If the entry is not present {Key, 0} is used as the default
ets:update_counter(?USAGE, Key, Charges, {Key, 0})
end,
0,
?CHARGES
),
% Start each interval with a fresh charges table
ets:delete_all_objects(?CHARGES).
% Private helper functions
decay(Ets, Coeff) when is_atom(Ets) ->
% Use trunc to ensure the result stays an integer in order for
% ets:update_counter to work properly. It throws a badarg otherwise.
Head = {'$1', '$2'},
Result = {{'$1', {trunc, {'*', '$2', {const, Coeff}}}}},
ets:select_replace(Ets, [{Head, [], [Result]}]).
clear_zero(Ets) when is_atom(Ets) ->
ets:select_delete(Ets, [{{'_', '$1'}, [{'=<', '$1', 0}], [true]}]).
key(#job{} = Job) ->
Rep = Job#job.rep,
case is_binary(Rep#rep.db_name) of
true -> mem3:dbname(Rep#rep.db_name);
false -> (Rep#rep.user_ctx)#user_ctx.name
end.
% Jobs are charged based on the amount of time the job was running during the
% last scheduling interval. The time units used are microseconds in order to
% have large enough usage values so that when the priority is calculated the
% result won't easily be rounded off to 0. The formula for the priority
% calculation is:
%
% Priority = (Usage * NumJobs) / Shares^2
%
% Then, in the worst case of a single job in the db, running for only one
% second, with 1000 (max) shares, the priority would be:
%
% 1000000 * 1 / (1000^2) = 1
%
job_charges(#job{} = Job, IntervalMSec, {_, _, _} = Now) ->
TimeRunning = timer:now_diff(Now, last_started(Job)),
IntervalUSec = IntervalMSec * 1000,
bounded(TimeRunning, 0, IntervalUSec).
last_started(#job{} = Job) ->
case lists:keyfind(started, 1, Job#job.history) of
% In case the user set the max history too low
false -> {0, 0, 0};
{started, When} -> When
end.
bounded(Val, Min, Max) ->
max(Min, min(Max, Val)).
% Config helper functions
get_config_shares() ->
lists:map(
fun({K, V}) ->
{list_to_binary(K), int_val(V, ?DEFAULT_SHARES)}
end,
config:get("replicator.shares")
).
priority_coeff() ->
% This is the K2 coefficient from [1]
Default = ?DEFAULT_PRIORITY_COEFF,
Val = float_val(config:get("replicator", "priority_coeff"), Default),
bounded(Val, 0.0, 1.0).
usage_coeff() ->
% This is the K1 coefficient from [1]
Default = ?DEFAULT_USAGE_COEFF,
Val = float_val(config:get("replicator", "usage_coeff"), Default),
bounded(Val, 0.0, 1.0).
int_val(Str, Default) when is_list(Str) ->
try list_to_integer(Str) of
Val -> Val
catch
error:badarg ->
Default
end.
float_val(undefined, Default) ->
Default;
float_val(Str, Default) when is_list(Str) ->
try list_to_float(Str) of
Val -> Val
catch
error:badarg ->
Default
end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-include_lib("couch/include/couch_eunit.hrl").
-define(DB1, <<"db1">>).
-define(DB2, <<"db2">>).
-define(DB3, <<"db3">>).
-define(J1, <<"j1">>).
-define(J2, <<"j2">>).
-define(J3, <<"j3">>).
fair_share_test_() ->
{
setup,
fun setup_all/0,
fun teardown_all/1,
{
foreach,
fun setup/0,
fun teardown/1,
[
?TDEF_FE(init_works),
?TDEF_FE(shares_are_updated_and_reset),
?TDEF_FE(jobs_are_added_and_removed),
?TDEF_FE(can_fetch_job_priority),
?TDEF_FE(jobs_are_charged),
?TDEF_FE(usage_is_updated),
?TDEF_FE(priority_coefficient_works),
?TDEF_FE(priority_decays_when_jobs_stop_running),
?TDEF_FE(priority_increases_when_jobs_run),
?TDEF_FE(two_dbs_equal_shares_equal_number_of_jobs),
?TDEF_FE(two_dbs_unequal_shares_equal_number_of_jobs),
?TDEF_FE(two_dbs_equal_shares_unequal_number_of_jobs),
?TDEF_FE(two_dbs_unequal_shares_unequal_number_of_jobs),
?TDEF_FE(three_dbs_equal_shares_equal_number_of_jobs),
?TDEF_FE(three_dbs_unequal_shares_equal_number_of_jobs),
?TDEF_FE(three_dbs_equal_shares_unequal_number_of_jobs),
?TDEF_FE(three_dbs_unequal_shares_unequal_number_of_jobs)
]
}
}.
setup_all() ->
test_util:start_couch().
teardown_all(Ctx) ->
config_delete("priority_coeff"),
config_delete("usage_coeff"),
config_shares_delete(),
test_util:stop_couch(Ctx).
setup() ->
init(),
ok.
teardown(_) ->
clear(),
config_delete("priority_coeff"),
config_delete("usage_coeff"),
config_shares_delete().
init_works(_) ->
Tables = [?SHARES, ?PRIORITIES, ?USAGE, ?CHARGES, ?NUM_JOBS],
[?assert(is_list(ets:info(T))) || T <- Tables],
?assertEqual(#{}, tab2map(?SHARES)),
clear(),
[?assertEqual(undefined, ets:info(T)) || T <- Tables],
config_share_set("db1", "200"),
init(),
?assertEqual(200, shares(?DB1)),
?assertEqual(#{?DB1 => 200}, tab2map(?SHARES)).
shares_are_updated_and_reset(_) ->
?assertEqual(#{}, tab2map(?SHARES)),
update_shares(?DB1, 42),
?assertEqual(42, shares(?DB1)),
reset_shares(?DB1),
?assertEqual(100, shares(?DB1)),
?assertEqual(#{}, tab2map(?SHARES)),
% min shares
update_shares(?DB1, 0),
?assertEqual(1, shares(?DB1)),
% max shares
update_shares(?DB1, 1001),
?assertEqual(1000, shares(?DB1)).
jobs_are_added_and_removed(_) ->
job_added(job(?J1, ?DB1)),
?assertEqual(1, num_jobs(?DB1)),
?assertEqual(#{?J1 => 0}, tab2map(?PRIORITIES)),
job_added(job(?J2, ?DB1)),
?assertEqual(2, num_jobs(?DB1)),
?assertEqual(#{?J1 => 0, ?J2 => 0}, tab2map(?PRIORITIES)),
job_added(job(?J3, ?DB2)),
?assertEqual(1, num_jobs(?DB2)),
?assertEqual(#{?J1 => 0, ?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)),
job_removed(job(?J1, ?DB1)),
?assertEqual(1, num_jobs(?DB1)),
?assertEqual(#{?J2 => 0, ?J3 => 0}, tab2map(?PRIORITIES)),
job_removed(job(?J3, ?DB2)),
?assertEqual(0, num_jobs(?DB2)),
?assertEqual(0, priority(?J3)),
job_removed(job(?J2, ?DB1)),
?assertEqual(0, num_jobs(?DB1)),
?assertEqual(#{}, tab2map(?NUM_JOBS)),
?assertEqual(0, priority(?J2)),
?assertEqual(#{}, tab2map(?PRIORITIES)).
can_fetch_job_priority(_) ->
job_added(job(?J1, ?DB1)),
?assertEqual(0, priority(?J1)),
ets:insert(?PRIORITIES, {?J1, 42}),
?assertEqual(42, priority(?J1)),
ets:delete(?PRIORITIES, ?J1),
?assertEqual(0, priority(?J1)).
jobs_are_charged(_) ->
Job1 = running_job(?J1, ?DB1),
job_added(Job1),
?assertEqual(#{}, tab2map(?CHARGES)),
charge(Job1, 1000, {0, 1, 0}),
?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)),
% Stopped jobs are not charged
charge(stop(Job1), 1000, {0, 1, 0}),
?assertEqual(#{?DB1 => 1000000}, tab2map(?CHARGES)),
% Only charge up to one interval's worth even if job ran longer
charge(Job1, 1000, {0, 5, 0}),
?assertEqual(#{?DB1 => 2000000}, tab2map(?CHARGES)),
% Charges are accumulated from jobs in same db
Job2 = running_job(?J2, ?DB1),
job_added(Job2),
charge(Job2, 1000, {0, 0, 1}),
?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)),
% Charges are not cleared if jobs are removed
job_removed(Job1),
job_removed(Job2),
?assertEqual(#{?DB1 => 2000001}, tab2map(?CHARGES)).
usage_is_updated(_) ->
Job = running_job(?J1, ?DB1),
job_added(Job),
charge(Job, 60000, {0, 60, 0}),
update_usage(),
?assertEqual(60000000, usage(?DB1)),
% Charges table is cleared after usage is updated
?assertEqual(#{}, tab2map(?CHARGES)),
% Check that usage decay works
config_set("usage_coeff", "0.2"),
update_usage(),
?assertEqual(12000000, usage(?DB1)),
config_set("usage_coeff", "0.5"),
update_usage(),
?assertEqual(6000000, usage(?DB1)),
% Check that function both decays and updates from charges
charge(Job, 60000, {0, 60, 0}),
update_usage(),
?assertEqual(63000000, usage(?DB1)),
% Usage eventually decays to 0 and is removed from the table
[update_usage() || _ <- lists:seq(1, 100)],
?assertEqual(0, usage(?DB1)),
?assertEqual(#{}, tab2map(?USAGE)).
priority_coefficient_works(_) ->
job_added(job(?J1, ?DB1)),
ets:insert(?PRIORITIES, {?J1, 1000}),
config_set("priority_coeff", "0.8"),
decay_priorities(),
?assertEqual(800, priority(?J1)),
config_set("priority_coeff", "0.5"),
decay_priorities(),
?assertEqual(400, priority(?J1)),
% If non-float junk value is set then the default is used
config_set("priority_coeff", "junk"),
decay_priorities(),
?assertEqual(392, priority(?J1)),
% Clipped to 1.0 max
config_set("priority_coeff", "1.1"),
decay_priorities(),
?assertEqual(392, priority(?J1)),
% Clipped to 0.0 min and removed when =< 0
config_set("priority_coeff", "-0.1"),
decay_priorities(),
?assertEqual(0, priority(?J1)),
?assertEqual(#{}, tab2map(?PRIORITIES)).
priority_decays_when_jobs_stop_running(_) ->
Job = running_job(?J1, ?DB1),
job_added(Job),
% Ran for one cycle then stop
{[], Pending} = reschedule(1, {[Job], []}),
% Priority is non-0 initially
?assert(priority(?J1) > 0),
% Priority decays to 0 after some cycles
[reschedule(0, {[], Pending}) || _ <- lists:seq(1, 500)],
?assertEqual(0, priority(?J1)).
priority_increases_when_jobs_run(_) ->
Job = running_job(?J1, ?DB1),
job_added(Job),
Running = [Job],
reschedule(0, {Running, []}),
P1 = priority(?J1),
?assert(P1 > 0),
% Priority increases
reschedule(0, {Running, []}),
P2 = priority(?J1),
?assert(P2 > P1),
% Additive priority increase is balanced out by priority decay
[reschedule(0, {Running, []}) || _ <- lists:seq(1, 500)],
Pn = priority(?J1),
?assert(Pn > P2),
reschedule(0, {Running, []}),
Pm = priority(?J1),
?assertEqual(Pn, Pm).
two_dbs_equal_shares_equal_number_of_jobs(_) ->
update_shares(?DB1, 100),
update_shares(?DB2, 100),
Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}),
#{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
?assert(49 =< Db1 andalso Db1 =< 51),
?assert(49 =< Db2 andalso Db2 =< 51).
two_dbs_unequal_shares_equal_number_of_jobs(_) ->
update_shares(?DB1, 900),
update_shares(?DB2, 100),
Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}}),
#{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
?assert(89 =< Db1 andalso Db1 =< 91),
?assert(9 =< Db2 andalso Db2 =< 11).
two_dbs_equal_shares_unequal_number_of_jobs(_) ->
update_shares(?DB1, 100),
update_shares(?DB2, 100),
Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}),
#{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
?assert(49 =< Db1 andalso Db1 =< 51),
?assert(49 =< Db2 andalso Db2 =< 51).
two_dbs_unequal_shares_unequal_number_of_jobs(_) ->
update_shares(?DB1, 1),
update_shares(?DB2, 100),
Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 125}}),
#{?DB1 := Db1, ?DB2 := Db2} = run_scheduler(1000, 10, Jobs),
?assert(0 =< Db1 andalso Db1 =< 2),
?assert(98 =< Db2 andalso Db2 =< 100).
three_dbs_equal_shares_equal_number_of_jobs(_) ->
update_shares(?DB1, 100),
update_shares(?DB2, 100),
update_shares(?DB3, 100),
Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}),
#{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
?assert(32 =< Db1 andalso Db1 =< 34),
?assert(32 =< Db2 andalso Db2 =< 34),
?assert(32 =< Db3 andalso Db3 =< 34).
three_dbs_unequal_shares_equal_number_of_jobs(_) ->
update_shares(?DB1, 100),
update_shares(?DB2, 700),
update_shares(?DB3, 200),
Jobs = jobs(#{?DB1 => {25, 75}, ?DB2 => {25, 75}, ?DB3 => {25, 75}}),
#{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
?assert(9 =< Db1 andalso Db1 =< 11),
?assert(69 =< Db2 andalso Db2 =< 71),
?assert(19 =< Db3 andalso Db3 =< 21).
three_dbs_equal_shares_unequal_number_of_jobs(_) ->
update_shares(?DB1, 100),
update_shares(?DB2, 100),
update_shares(?DB3, 100),
Jobs = jobs(#{?DB1 => {25, 25}, ?DB2 => {25, 100}, ?DB3 => {25, 75}}),
#{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
?assert(32 =< Db1 andalso Db1 =< 34),
?assert(32 =< Db2 andalso Db2 =< 34),
?assert(32 =< Db3 andalso Db3 =< 34).
three_dbs_unequal_shares_unequal_number_of_jobs(_) ->
update_shares(?DB1, 1000),
update_shares(?DB2, 100),
update_shares(?DB3, 1),
Jobs = jobs(#{?DB1 => {25, 100}, ?DB2 => {25, 125}, ?DB3 => {25, 875}}),
#{?DB1 := Db1, ?DB2 := Db2, ?DB3 := Db3} = run_scheduler(1000, 10, Jobs),
?assert(87 =< Db1 andalso Db1 =< 89),
?assert(9 =< Db2 andalso Db2 =< 11),
?assert(2 =< Db3 andalso Db3 =< 4).
config_set(K, V) ->
config:set("replicator", K, V, _Persist = false).
config_delete(K) ->
config:delete("replicator", K, _Persist = false).
config_share_set(K, V) ->
config:set("replicator.shares", K, V, _Persist = false).
config_shares_delete() ->
[
config:delete("replicator.shares", K, _Persist = false)
|| {K, _} <- config:get("replicator.shares")
].
tab2map(T) when is_atom(T) ->
maps:from_list(ets:tab2list(T)).
job(rand, Db) ->
job(rand:uniform(1 bsl 59), Db);
job(Id, Db) ->
Job = #job{
id = Id,
rep = #rep{
db_name = Db,
user_ctx = #user_ctx{}
}
},
stop(Job).
running_job(Id, Db) ->
run(job(Id, Db)).
run(#job{} = Job) ->
Job#job{
pid = list_to_pid("<0.9999.999>"),
history = [{started, {0, 0, 0}}, {added, {0, 0, 0}}]
}.
stop(#job{} = Job) ->
Job#job{
pid = undefined,
history = [{added, {0, 0, 0}}]
}.
% Simple scheduler simulator. Start and stop N jobs and do the accounting
% steps. Return the new lists of running and pending jobs. If N is 0 then jobs
% which were running stay running and jobs which were pending stay pending.
%
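% An illustrative single cycle which stops one running job and starts one
% pending job (the variable names are placeholders):
%
%   {Running1, Pending1} = reschedule(1, {Running0, Pending0})
%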
reschedule(N, {Running, Pending}) ->
update(Running, 60000, {0, 60, 0}),
RunPr = [{priority(Job#job.id), Job} || Job <- Running],
StopPr = [{priority(Job#job.id), Job} || Job <- Pending],
{_, Running1} = lists:unzip(lists:reverse(lists:sort(RunPr))),
{_, Pending1} = lists:unzip(lists:sort(StopPr)),
ToStop = lists:sublist(Running1, N),
ToStart = lists:sublist(Pending1, N),
Running2 = [run(Job) || Job <- ToStart] ++ Running1 -- ToStop,
Pending2 = [stop(Job) || Job <- ToStop] ++ Pending1 -- ToStart,
{Running2, Pending2}.
% Run a few scheduling cycles and calculate usage percentage for each db
%
run_scheduler(Cycles, Churn, Jobs0) ->
Acc0 = {#{}, Jobs0},
{Sum, _} = lists:foldl(
fun(_CycleCnt, {UsageAcc, {Running, _} = Jobs}) ->
UsageAcc1 = lists:foldl(
fun(#job{} = Job, Acc) ->
Db = Job#job.rep#rep.db_name,
maps:update_with(Db, fun(V) -> V + 1 end, 0, Acc)
end,
UsageAcc,
Running
),
{UsageAcc1, reschedule(Churn, Jobs)}
end,
Acc0,
lists:seq(1, Cycles)
),
Total = maps:fold(fun(_, V, Acc) -> Acc + V end, 0, Sum),
maps:map(fun(_Db, V) -> round(V / Total * 100) end, Sum).
% Dbs = #{Db => {RunningCount, PendingCount}}
%
jobs(#{} = Dbs) ->
maps:fold(
fun(Db, {RCnt, PCnt}, {Running, Pending}) ->
RJobs = [running_job(rand, Db) || _ <- lists:seq(1, RCnt)],
PJobs = [job(rand, Db) || _ <- lists:seq(1, PCnt)],
[job_added(Job) || Job <- RJobs ++ PJobs],
{Running ++ RJobs, Pending ++ PJobs}
end,
{[], []},
Dbs
).
-endif.