blob: 275c14b28e734fb781ef5a3ca899d42d95b0d2f5 [file] [log] [blame]
-module(couch_replicator_rate_limiter).
-behaviour(gen_server).
-include_lib("couch/include/couch_db.hrl").
-include_lib("ibrowse/include/ibrowse.hrl").
-include_lib("couch_replicator_api_wrap.hrl").
-export([
start_link/0
]).
-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).
-export ([
maybe_start_rate_limiter/3,
maybe_decrement_rep_count/1,
maybe_delay_request/1
]).
-record(rep_limiter, {
requestCounter = 0,
lastUpdateTimestamp = 0,
pid,
replications = 0,
limit, % max requests
period % interval is in seconds
}).
%% For each unique host, an entry is a rep_limiter record
-define(HOSTS, couch_replicator_limit_hosts).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
update_host(Host, Limit, Period) ->
gen_server:call(?MODULE, {update_host, Host, Limit, Period}, infinity).
decrement_replications(Host) ->
gen_server:call(?MODULE, {decrement_replications, Host}, infinity).
% gen_server functions.
init([]) ->
process_flag(trap_exit, true),
ets:new(?HOSTS, [set, public, named_table]),
{ok, nil}.
handle_call({update_host, Host, Limit, Period}, _From, State) ->
Reps = case ets:insert_new(?HOSTS, {Host, #rep_limiter{}}) of
true ->
% Brand new host for replication, need to spawn
% a new resetter process and initialize
LoopPid = spawn_link(fun() ->
reset_requests_loop(Host, Period)
end),
modify_host(Host, [{4, LoopPid}, {5, 1}, {6, Limit},
{7, Period}]),
1;
false ->
% this allows the rate limit to be changed during replication
modify_host(Host, [{6, Limit}, {7, Period}]),
update_rep_count(Host, 1)
end,
{ok, Reps, State};
handle_call({decrement_replications, Host}, _From, State) ->
Replications = case update_rep_count(Host, -1) of
Rep0 when Rep0 =< 0 ->
ResetPid = ets:lookup_element(?HOSTS, Host, 4),
% no more replications at this moment, stop resetter
% process and remove the host from the ets table
exit(ResetPid, stop_resetter),
remove_host(Host);
_ ->
ok % this shoulld be some error
end,
{ok, Replications, State}.
handle_cast(_, State) ->
{noreply, State}.
handle_info({'EXIT', _FromPid, stop_resetter}, State) ->
{noreply, State};
handle_info({'EXIT', _FromPid, Reason}, State) ->
couch_log:error("couch_replicator_rate_limiter reset process
died abnormally due to: ~p", [Reason]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
modify_host(Host, Elements) ->
ets:update_element(?HOSTS, Host, Elements).
remove_host(Host) ->
ets:delete(?HOSTS, Host).
update_rep_count(Host, Value) ->
ets:update_counter(?HOSTS, Host, {5, Value}).
increment_req_count(Host) ->
ets:update_counter(?HOSTS, Host, {2, 1}).
code_change(_OldVsn, nil, _Extra) ->
{ok, nil}.
reset_requests_loop(Host, Period) ->
modify_host(Host, [{2, 0}, {3, os:timestamp()}]),
timer:sleep(Period),
reset_requests_loop(Host, Period).
% Returns sleep time in milliseconds
calculate_sleep_time(Host, RequestCounter) ->
{Limit, Period} = get_host_rate(Host),
Interval = (Period / Limit) * 1000,
LastUpdateTimestamp = ets:lookup_element(?HOSTS, Host, 3),
ReqTimestamp = os:timestamp(),
TimeDiff0 = timer:now_diff(ReqTimestamp, LastUpdateTimestamp),
TimeDiff1 = TimeDiff0 div 1000 rem 1000,
CountDiff = RequestCounter - Limit,
NextReset = ts_to_millisec(LastUpdateTimestamp) + (Period * 1000),
case {CountDiff, TimeDiff1} of
{C0, _} when C0 =< 0 ->
0;
{C0, T0} when T0 > 0 ->
(NextReset - T0) + (C0 * Interval);
{C0, T0} when T0 =< 0 ->
% This can occur if the request comes in really close to
% to the reset time and we read the reset value too late.
abs(T0) + (C0 * Interval)
end.
ts_to_millisec({Mega, Sec, Micro}) ->
(Mega * 1000000) + (Sec * 1000) + (Micro div 1000 rem 1000).
get_host_rate(Host) ->
Limit = try ets:lookup_element(?HOSTS, Host, 6) of
L ->
L
catch error:badarg ->
-1
end,
Period = try ets:lookup_element(?HOSTS, Host, 7) of
P ->
P
catch error:badarg ->
-1
end,
{Limit, Period}.
maybe_start_rate_limiter(#httpdb{url = Url}, Limit, Period) ->
Host = ibrowse_lib:parse_url(Url),
case {Limit, Period} of
{-1, _} ->
false;
{_, -1} ->
false;
{L0, P0} ->
update_host(Host, L0, P0)
end;
% disabled for local
maybe_start_rate_limiter(_, _ , _) ->
false.
maybe_decrement_rep_count(Url) ->
Host = ibrowse_lib:parse_url(Url),
{Limit, Period} = get_host_rate(Host),
case {Limit, Period} of
{-1, _} ->
false;
{_, -1} ->
false;
{_, _} ->
decrement_replications(Host)
end.
maybe_delay_request(Url) ->
Host = ibrowse_lib:parse_url(Url),
{Limit, Period} = get_host_rate(Host),
case {Limit, Period} of
{-1, _} ->
false;
{_, -1} ->
false;
{_, _} ->
Counter = increment_req_count(Host),
Sleep = calculate_sleep_time(Host, Counter),
timer:sleep(Sleep)
end.