% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
% gen_server callbacks. The list was left unterminated; code_change/3 is
% defined in this module and is part of the callback set, so it closes the list.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
    code_change/3]).
% Public API.
-export([start_link/0, call/3]).
% A named channel holding its own FIFO of pending {Request, From} pairs.
%
% `name` MUST stay the first field: find_channel/2 and enqueue_channel/3 look
% channels up with lists:keyfind/3 and lists:keyreplace/4 on tuple position 2,
% which is the first field of the record tuple.
-record(channel, {
    name,
    q = queue:new()
}).
% Server state.
%   reqs       - in-flight requests as {ChannelName, From | notify, MonitorRef}
%   min        - per-channel threshold used when a new request is enqueued
%   max        - per-channel threshold used when an in-flight request finishes
%   global_max - bound on the total number of in-flight requests
%   channels   - queue of #channel{} records that still have pending requests
% The three thresholds are defaults; init/1 may override them from config.
-record(state, {
    reqs = [],
    min = 2,
    max = 6,
    global_max = 15,
    channels = queue:new()
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% WARNING %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% This server relies on the internal structure of the channels queue as a %%
%% {list(), list()} to do in-place modifications of some elements. We are %%
%% "running on thin ice", as it were. %%
% Starts the queue server linked to the caller and registers it locally
% under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
% call(Pid, Msg, Priority)
% Asks the locally registered ioq_osq server for a slot (an `rlimit` request
% tagged with Priority and the current time), then performs the actual
% gen_server call to Pid itself.
% NOTE(review): as visible here there are no separators between the last two
% expressions and no terminating period. Presumably the call to Pid is wrapped
% in try ... after ... end so the slot is always handed back -- confirm.
call(Pid, Msg, Priority) ->
% Blocks (no timeout) until the queue server replies; submit_request/3 answers
% rlimit requests with {MonitorRef, nil}.
Reply = gen_server:call(ioq_osq, {rlimit, nil, Priority, now()}, infinity),
gen_server:call(Pid, Msg, infinity)
% Echo the server's reply back to it: handle_info/2 matches a {Ref, _} message
% against the in-flight list and removes the corresponding `notify` entry.
whereis(ioq_osq) ! Reply
% gen_server init callback.
% Creates the named ETS table used by update_counter/3 and builds the initial
% state, letting threshold/2 override each record default.
% Returns {ok, #state{}}.
init([]) ->
    ets:new(osq_counters, [named_table]),
    Defaults = #state{},
    {ok, Defaults#state{
        min = threshold("minimum", Defaults#state.min),
        max = threshold("maximum", Defaults#state.max),
        global_max = threshold("global_maximum", Defaults#state.global_max)
    }}.
% gen_server call callback.
%
% Administrative calls (answered immediately):
%   {set_minimum | set_maximum | set_global_maximum, PositiveInteger} -> ok
%   get_queue_depths -> [{ChannelName, PendingCount}]
%   get_requests     -> the in-flight request list
%
% Queueing calls are 4-tuples. They are not answered here: the request is
% stored together with From via enqueue_channel/3 and replied to later.
%   priority {interactive, Shard} or {view_update, Shard, _} -> channel derived
%       from Shard by channel_name/1
%   first element is a pid, or the atom rlimit -> the `other` channel
%
% Anything else is answered with `ignored`.
handle_call({set_minimum, N}, _From, St) when is_integer(N), N > 0 ->
    {reply, ok, St#state{min = N}};
handle_call({set_maximum, N}, _From, St) when is_integer(N), N > 0 ->
    {reply, ok, St#state{max = N}};
handle_call({set_global_maximum, N}, _From, St) when is_integer(N), N > 0 ->
    {reply, ok, St#state{global_max = N}};
handle_call(get_queue_depths, _From, St) ->
    ChannelList = queue:to_list(St#state.channels),
    Depths = [
        {Name, queue:len(Pending)}
     || #channel{name = Name, q = Pending} <- ChannelList
    ],
    {reply, Depths, St};
handle_call(get_requests, _From, St) ->
    InFlight = St#state.reqs,
    {reply, InFlight, St};
handle_call({_, _, {interactive, Shard}, _} = Request, Caller, St) ->
    ChannelName = channel_name(Shard),
    {noreply, enqueue_channel(ChannelName, {Request, Caller}, St)};
handle_call({_, _, {view_update, Shard, _}, _} = Request, Caller, St) ->
    ChannelName = channel_name(Shard),
    {noreply, enqueue_channel(ChannelName, {Request, Caller}, St)};
handle_call({Fd, _, _, _} = Request, Caller, St) when is_pid(Fd) ->
    {noreply, enqueue_channel(other, {Request, Caller}, St)};
handle_call({rlimit, _, _, _} = Request, Caller, St) ->
    {noreply, enqueue_channel(other, {Request, Caller}, St)};
handle_call(_Unknown, _From, St) ->
    {reply, ignored, St}.
% gen_server cast callback: every cast is dropped and the state is returned
% unchanged.
handle_cast(_Ignored, St) ->
    {noreply, St}.
% gen_server info callback.
%
% {Ref, Reply}: completion of an in-flight request, identified by its monitor
%   reference (element 3 of the entries in #state.reqs). The monitor is
%   released, the original caller is answered unless the entry is a `notify`
%   placeholder, the entry is removed and the next request is scheduled.
%
% {'DOWN', Ref, _, _, Reason}: the monitored process died before completing.
%   A real caller is answered with {'EXIT', Reason}; a `notify` entry is just
%   removed. Either way the next request is scheduled.
%
% Messages whose reference is unknown, and any other message, are ignored.
%
% Each `case` is closed with `end;` so the three clauses parse as one function.
handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
    case lists:keyfind(Ref, 3, Reqs) of
        {_, notify, Ref} ->
            erlang:demonitor(Ref, [flush]),
            Reqs2 = lists:keydelete(Ref, 3, Reqs),
            {noreply, make_next_request(State#state{reqs = Reqs2})};
        {_, From, Ref} ->
            erlang:demonitor(Ref, [flush]),
            gen_server:reply(From, Reply),
            Reqs2 = lists:keydelete(Ref, 3, Reqs),
            {noreply, make_next_request(State#state{reqs = Reqs2})};
        false ->
            {noreply, State}
    end;
handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
    case lists:keyfind(Ref, 3, Reqs) of
        {_, notify, Ref} ->
            Reqs2 = lists:keydelete(Ref, 3, Reqs),
            {noreply, make_next_request(State#state{reqs = Reqs2})};
        {_, From, Ref} ->
            gen_server:reply(From, {'EXIT', Reason}),
            Reqs2 = lists:keydelete(Ref, 3, Reqs),
            {noreply, make_next_request(State#state{reqs = Reqs2})};
        false ->
            {noreply, State}
    end;
handle_info(_Info, State) ->
    {noreply, State}.
% gen_server terminate callback. Nothing is cleaned up here; gen_server
% ignores the return value, so `ok` is returned by convention.
terminate(_Reason, _State) ->
    ok.
% gen_server code_change callback: the state needs no conversion between
% versions, so it is handed back untouched.
code_change(_FromVsn, St, _Extra) ->
    {ok, St}.
% channel_name(Shard)
% Derives a channel name from a shard path by splitting it on "/".
% A path of the form shards/<range>/heroku/<AppId>/... yields the binary
% <<AppId, ".heroku">>.
% NOTE(review): the result expressions of the three remaining branches (a
% generic shards/<range>/<Account>/... path, any other split result, and any
% exception raised by re:split/2) are not visible here, nor is the closing
% `end.` -- presumably Account for the second branch and a fallback channel
% for the other two; confirm against the original file.
channel_name(Shard) ->
try re:split(Shard, "/") of
[<<"shards">>, _, <<"heroku">>, AppId | _] ->
<<AppId/binary, ".heroku">>;
[<<"shards">>, _, Account | _] ->
_ ->
catch _:_ ->
% find_channel(Account, Queue)
% Looks for the channel named Account inside the two internal lists of a
% queue, relying on the queue being represented as a {list(), list()} tuple.
%
% Returns:
%   {1, Channel}   - found in the first list of the tuple
%   {2, Channel}   - found in the second list of the tuple
%   {new, Channel} - not found; Channel is a fresh #channel{} named Account
%
% The integer is the tuple position of the list holding the channel, so the
% caller can update that list in place with element/2 and setelement/3.
% Both nested `case` expressions are closed with `end`, which the previous
% text lacked.
find_channel(Account, {A, B}) ->
    case lists:keyfind(Account, 2, A) of
        false ->
            case lists:keyfind(Account, 2, B) of
                false ->
                    {new, #channel{name = Account}};
                #channel{} = Channel ->
                    {2, Channel}
            end;
        #channel{} = Channel ->
            {1, Channel}
    end.
% Appends Req to the tail of the channel's pending queue and returns the
% updated channel record.
update_channel(#channel{q = Pending} = Channel, Req) ->
    Appended = queue:in(Req, Pending),
    Channel#channel{q = Appended}.
% enqueue_channel(Account, Req, State)
% Adds Req to the channel named Account, creating the channel when it is not
% queued yet, and binds the resulting state to NewState.
% NOTE(review): the `end` closing the case and the final expression that
% consumes NewState are not visible here -- presumably the new state is handed
% to maybe_submit_request/1; confirm against the original file.
enqueue_channel(Account, Req, #state{channels = Q} = State) ->
NewState = case find_channel(Account, Q) of
{new, Channel0} ->
% unknown channel: append it, already holding Req, to the channel queue
State#state{channels = queue:in(update_channel(Channel0, Req), Q)};
{Elem, Channel0} ->
Channel = update_channel(Channel0, Req),
% the channel already exists in the queue - update it in place
% Elem (1 or 2) selects which internal list of the queue tuple holds it
L = element(Elem, Q),
NewQ = setelement(Elem, Q, lists:keyreplace(Account, 2, L, Channel)),
State#state{channels = NewQ}
% maybe_submit_request(State)
% Decides which per-channel threshold applies when trying to start a request.
% First clause: taken while the number of in-flight requests is below
% global_max.
% NOTE(review): the body of the first clause is not visible here -- presumably
% it calls make_next_request/1, which uses the `max` threshold; confirm.
maybe_submit_request(#state{global_max=C, reqs=R} = St) when length(R) < C ->
% Second clause: the global limit has been reached, so only the lower `min`
% threshold is used when picking a channel.
maybe_submit_request(#state{min = Min} = State) ->
% look for a channel which hasn't reached the minimum yet
make_next_request(State, Min).
% make_next_request(State)
% Schedules the next pending request using the regular per-channel limit
% (`max`) as the blocking threshold.
make_next_request(#state{max = PerChannelMax} = St) ->
    make_next_request(St, PerChannelMax).
% make_next_request(State, Threshold)
% Finds the first channel with fewer than Threshold in-flight requests, takes
% the oldest pending item from it and submits it.
%
% A channel whose pending queue becomes empty is dropped from the channel
% queue; otherwise it is re-queued at the back so channels take turns.
% When every channel is at the threshold, or there are no channels, only the
% rotated channel queue is stored. Returns the updated #state{}.
%
% The inner case is closed with `end,` and the outer one with `end.`, which
% the previous text lacked.
make_next_request(#state{channels = Channels, reqs = R} = State, Threshold) ->
    case next_unblocked_channel(Channels, R, Threshold, queue:new()) of
        {#channel{name = Name, q = Q} = Ch, OutChannels} ->
            {{value, Item}, NewQ} = queue:out(Q),
            NewCh =
                case queue:is_empty(NewQ) of
                    true ->
                        OutChannels;
                    false ->
                        queue:in(Ch#channel{q = NewQ}, OutChannels)
                end,
            submit_request(Name, Item, State#state{channels = NewCh});
        {nil, OutQ} ->
            % everybody is using their allotted slots, try again later
            State#state{channels = OutQ}
    end.
% next_unblocked_channel(InQ, Reqs, Max, OutQ)
% Walks InQ from the front looking for the first channel that has fewer than
% Max entries in the in-flight list Reqs. Channels that are skipped are
% accumulated in OutQ.
%
% Returns:
%   {Channel, Rest} - Channel is the selected channel; Rest is the not yet
%                     examined channels followed by the skipped ones
%   {nil, OutQ}     - every channel is blocked, or InQ is empty
%
% Both `case` expressions are closed with `end`, which the previous text
% lacked.
next_unblocked_channel(InQ, Reqs, Max, OutQ) ->
    case queue:out(InQ) of
        {empty, _} ->
            % all channels blocked
            {nil, OutQ};
        {{value, #channel{name = Name} = Channel}, NewQ} ->
            case length([1 || {N, _, _} <- Reqs, N =:= Name]) >= Max of
                true ->
                    % channel is blocked, keep searching
                    next_unblocked_channel(NewQ, Reqs, Max, queue:in(Channel, OutQ));
                false ->
                    {Channel, queue:join(NewQ, OutQ)}
            end
    end.
% submit_request(Channel, {Request, From}, State)
% Starts a queued request and records it as in-flight under a monitor
% reference. Returns the updated #state{}.
%
% rlimit request: the caller performs the real call itself. The caller is
%   monitored and answered right away with {MonitorRef, nil}; the in-flight
%   entry is tagged `notify`, which handle_info/2 treats as "nobody to reply
%   to" once the matching {MonitorRef, _} message or a 'DOWN' arrives.
%
% any other request: the target process Fd is monitored and sent a raw
%   '$gen_call' message using the monitor reference as the call tag, so the
%   answer comes back to this server as {MonitorRef, Reply}.
submit_request(Channel, {{rlimit, _, Priority, Enqueued}, From}, #state{reqs = InFlight} = State) ->
    CallerPid = element(1, From),
    MRef = erlang:monitor(process, CallerPid),
    gen_server:reply(From, {MRef, nil}),
    record_stats(Channel, Priority, Enqueued),
    Entry = {Channel, notify, MRef},
    State#state{reqs = [Entry | InFlight]};
submit_request(Channel, {{Fd, Call, Priority, Enqueued}, From}, #state{reqs = InFlight} = State) ->
    MRef = erlang:monitor(process, Fd),
    erlang:send(Fd, {'$gen_call', {self(), MRef}, Call}),
    record_stats(Channel, Priority, Enqueued),
    Entry = {Channel, From, MRef},
    State#state{reqs = [Entry | InFlight]}.
% record_stats(Channel, Pri, T0)
% Records metrics for a request at the moment it is submitted.
%   Pri - the request priority; when it is a tuple its first element is used
%         as the IO class, otherwise the value itself
%   T0  - timestamp taken when the request was created; the time spent
%         waiting is reported in milliseconds (timer:now_diff/2 yields
%         microseconds)
% The couch_stats calls are best effort: a failure in any of them is
% swallowed. The per-channel counter is then bumped via update_counter/3,
% whose result is returned.
record_stats(Channel, Pri, T0) ->
    IOClass =
        case is_tuple(Pri) of
            true -> element(1, Pri);
            false -> Pri
        end,
    WaitedMicros = timer:now_diff(now(), T0),
    Latency = WaitedMicros / 1000,
    catch couch_stats:increment_counter([couchdb, io_queue, IOClass]),
    catch couch_stats:increment_counter([couchdb, io_queue, osproc]),
    catch couch_stats:update_histogram([couchdb, io_queue, latency], Latency),
    update_counter(Channel, IOClass, osproc).
% update_counter(Channel, IOClass, RW)
% Increments the counter stored under {Channel, IOClass, RW} in the
% osq_counters ETS table.
% ets:update_counter/3 raises badarg when the key does not exist yet; in that
% case the counter is created with the value 1.
% Returns the new counter value, or `true` (from ets:insert/2) when the
% counter was just created.
% The try expression is closed with `end.`, which the previous text lacked.
update_counter(Channel, IOClass, RW) ->
    try
        ets:update_counter(osq_counters, {Channel, IOClass, RW}, 1)
    catch
        error:badarg ->
            ets:insert(osq_counters, {{Channel, IOClass, RW}, 1})
    end.
% threshold(Name, Default)
% Reads the setting Name from the "osq" config section and converts it to an
% integer. init/1 passes the corresponding #state{} default as Default.
% NOTE(review): the result expressions of all three branches (a positive
% integer, any other integer, and any exception from config:get/2 or
% list_to_integer/1) are not visible here, nor is the closing `end.` --
% presumably C for the first branch and Default for the other two; confirm
% against the original file.
threshold(Name, Default) ->
try list_to_integer(config:get("osq", Name)) of
C when C > 0->
_ ->
catch _:_ ->