blob: 5e2e01f9d4ad2eac8bdd2efe22477154ee1f8e15 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(ioq_server2).
-behavior(gen_server).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-export([
start_link/3,
call/3,
pcall/1,
pcall/2
]).
-export([
get_state/0,
get_state/1,
update_config/0,
get_queue_depths/0,
get_queue_depths/1,
get_concurrency/0,
set_concurrency/1,
get_counters/0
]).
-include_lib("ioq/include/ioq.hrl").
-define(DEFAULT_RESIZE_LIMIT, 1000).
-define(DEFAULT_CONCURRENCY, 1).
-define(DEFAULT_SCALE_FACTOR, 2.0).
-define(DEFAULT_MAX_PRIORITY, 10000.0).
-record(state, {
reqs :: khash:khash(),
waiters :: khash:khash(),
queue :: hqueue:hqueue(),
concurrency = ?DEFAULT_CONCURRENCY :: pos_integer(),
iterations = 0 :: non_neg_integer(),
class_p :: khash:khash(), %% class priorities
user_p :: khash:khash(), %% user priorities
shard_p :: khash:khash(), %% shard priorities
scale_factor = ?DEFAULT_SCALE_FACTOR :: float(),
dedupe = true :: boolean(),
resize_limit = ?DEFAULT_RESIZE_LIMIT :: pos_integer(),
next_key = 1 :: pos_integer(),
server_name :: atom(),
scheduler_id = 0 :: non_neg_integer(),
max_priority = ?DEFAULT_MAX_PRIORITY :: float()
}).
-type state() :: #state{}.
-type waiter_key() :: {pid(), integer()} | pos_integer().
-type priority() :: float(). %% should be non_negative_float().
%% Hacky queue_depth type due to existing fixed element lists for JSON in API
%% Actual type is:
%% [
%% {compaction, non_neg_integer()},
%% {replication, non_neg_integer()},
%% {low, non_neg_integer()},
%% {channels, {[{username(), [Interactive, DbUpdate, ViewUpdate]}]}}
%% ]
%% when Interactive = DbUpdate = ViewUpdate = non_neg_integer().
-type depths() :: compaction | replication | low.
-type user_depth() :: {binary(), [non_neg_integer()]}.
-type depth_ele() :: {depths(), non_neg_integer()} | user_depth().
-type queue_depths() :: [depth_ele()].
-type read_write() :: reads | writes | unknown.
-define(SERVER_ID(SID), list_to_atom("ioq_server_" ++ integer_to_list(SID))).
-spec call(pid(), term(), io_dimensions()) -> term().
call(Fd, Msg, Dimensions) ->
Req0 = #ioq_request{
fd = Fd,
msg = Msg,
t0 = os:timestamp()
},
Req = add_request_dimensions(Req0, Dimensions),
Class = atom_to_list(Req#ioq_request.class),
case config:get_boolean("ioq2.bypass", Class, false) of
true ->
RW = rw(Msg),
couch_stats:increment_counter([couchdb, io_queue2, bypassed_count]),
couch_stats:increment_counter(
[couchdb, io_queue2, RW, bypassed_count]),
gen_server:call(Fd, Msg, infinity);
_ ->
DispatchStrategy = config:get(
"ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
Server = case DispatchStrategy of
?DISPATCH_RANDOM ->
maybe_seed(),
SID = random:uniform(erlang:system_info(schedulers)),
?SERVER_ID(SID);
?DISPATCH_FD_HASH ->
NumSchedulers = erlang:system_info(schedulers),
SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
?SERVER_ID(SID);
?DISPATCH_SINGLE_SERVER ->
?SERVER_ID(1);
_ ->
SID = erlang:system_info(scheduler_id),
?SERVER_ID(SID)
end,
gen_server:call(Server, Req, infinity)
end.
-spec pcall(any()) -> any().
pcall(Msg) ->
pcall(Msg, 500).
-spec pcall(any(), non_neg_integer()) -> any().
pcall(Msg, Timeout) ->
{MainPid, MainRef} = spawn_monitor(fun() ->
PidRefs = lists:map(fun(Name) ->
spawn_monitor(fun() ->
Resp = gen_server:call(Name, Msg, Timeout),
exit({resp_ok, Resp})
end)
end, ioq_sup:get_ioq2_servers()),
Resps = lists:map(fun({Pid, _}) ->
receive
{'DOWN', _, _, Pid, {resp_ok, Resp}} ->
Resp;
{'DOWN', _, _, Pid, Error} ->
exit(Error)
end
end, PidRefs),
exit({resp_ok, Resps})
end),
receive
{'DOWN', _, _, MainPid, {resp_ok, Resps}} ->
{ok, Resps};
{'DOWN', _, _, MainPid, Error} ->
{error, Error}
after Timeout ->
erlang:demonitor(MainRef, [flush]),
exit(MainPid, kill),
{error, timeout}
end.
-spec get_queue_depths() -> queue_depths().
get_queue_depths() ->
case pcall(get_pending_reqs, 500) of
{ok, PReqs} ->
get_queue_depths([Req || {_Priority, Req} <- lists:flatten(PReqs)]);
{error, _} ->
[
{compaction, ?BAD_MAGIC_NUM},
{replication, ?BAD_MAGIC_NUM},
{low, ?BAD_MAGIC_NUM},
{channels, {[]}}
]
end.
-spec get_queue_depths([ioq_request()]) -> queue_depths().
get_queue_depths(Reqs) ->
{ok, Users0} = khash:new(),
{Compaction, Replication, Low, Users} = lists:foldl(
fun
(#ioq_request{class=db_compact}, {C, R, L, U}) ->
{C+1, R, L, U};
(#ioq_request{class=view_compact}, {C, R, L, U}) ->
{C+1, R, L, U};
(#ioq_request{class=internal_repl}, {C, R, L, U}) ->
{C, R+1, L, U};
(#ioq_request{class=low}, {C, R, L, U}) ->
{C, R, L+1, U};
(#ioq_request{class=Class, user=User}, {C, R, L, U}) ->
[UI0, UDB0, UV0] = case khash:get(U, User) of
undefined ->
[0,0,0];
UC0 ->
UC0
end,
UC = case Class of
db_update ->
[UI0, UDB0+1, UV0];
view_update ->
[UI0, UDB0, UV0+1];
_Interactive ->
[UI0+1, UDB0, UV0]
end,
ok = khash:put(U, User, UC),
{C, R, L, U}
end,
{0, 0, 0, Users0},
Reqs
),
[
{compaction, Compaction},
{replication, Replication},
{low, Low},
{channels, {khash:to_list(Users)}}
].
-spec get_concurrency() -> non_neg_integer().
get_concurrency() ->
case pcall(get_concurrency, 500) of
{ok, Concurrencies} ->
lists:sum(Concurrencies);
{error, _} ->
?BAD_MAGIC_NUM
end.
-spec set_concurrency(non_neg_integer()) -> non_neg_integer().
set_concurrency(C) when is_integer(C), C > 0 ->
lists:foldl(
fun(Pid, Total) ->
{ok, Old} = gen_server:call(Pid, {set_concurrency, C}, 1000),
Total + Old
end,
0,
ioq_sup:get_ioq2_servers()
);
set_concurrency(_) ->
erlang:error(badarg).
get_counters() ->
undefined.
%% @equiv get_state(?SERVER_ID(1))
-spec get_state() -> any().
get_state() ->
get_state(?SERVER_ID(1)).
%% Returns a mutated #state{} with list representations of khash/hqueue objects
-spec get_state(atom()) -> any().
get_state(Server) ->
gen_server:call(Server, get_state, infinity).
-spec update_config() -> ok.
update_config() ->
gen_server:call(?SERVER_ID(1), update_config, infinity).
start_link(Name, SID, Bind) ->
Options = case Bind of
true -> [{scheduler, SID}];
false -> []
end,
gen_server:start_link({local, Name}, ?MODULE, [Name, SID], Options).
init([Name, SID]) ->
{ok, HQ} = hqueue:new(),
{ok, Reqs} = khash:new(),
{ok, Waiters} = khash:new(),
State = #state{
queue = HQ,
reqs = Reqs,
waiters = Waiters,
server_name = Name,
scheduler_id = SID
},
{ok, update_config_int(State)}.
handle_call(get_state, _From, State) ->
Resp = State#state{
user_p = khash:to_list(State#state.user_p),
class_p = khash:to_list(State#state.class_p),
shard_p = khash:to_list(State#state.shard_p),
reqs = khash:to_list(State#state.reqs),
waiters = khash:to_list(State#state.waiters),
queue = hqueue:to_list(State#state.queue)
},
{reply, Resp, State, 0};
handle_call(#ioq_request{} = Req, From, State) ->
{noreply, enqueue_request(Req#ioq_request{from=From}, State), 0};
handle_call({hqueue, Method, Args}, _From, #state{queue=HQ}=State) ->
Resp = erlang:apply(hqueue, Method, [HQ | Args]),
{reply, Resp, State, 0};
handle_call(update_config, _From, State) ->
{reply, ok, update_config_int(State), 0};
handle_call(get_concurrency, _From, State) ->
{reply, State#state.concurrency, State, 0};
handle_call({set_concurrency, C}, _From, State) when is_integer(C), C > 0 ->
{reply, {ok, State#state.concurrency}, State#state{concurrency = C}, 0};
handle_call(get_reqs, _From, #state{reqs=Reqs}=State) ->
{reply, khash:to_list(Reqs), State, 0};
handle_call(get_pending_reqs, _From, #state{queue=HQ}=State) ->
{reply, hqueue:to_list(HQ), State, 0};
handle_call(get_counters, _From, State) ->
{reply, undefined, State, 0};
handle_call(_, _From, State) ->
{reply, ok, State, 0}.
handle_cast(update_config, State) ->
{noreply, update_config_int(State), 0};
handle_cast(_Msg, State) ->
{noreply, State, 0}.
handle_info({Ref, Reply}, #state{reqs = Reqs} = State) ->
case khash:get(Reqs, Ref) of
undefined ->
ok;
#ioq_request{ref=Ref}=Req ->
ok = khash:del(Reqs, Ref),
TResponse = os:timestamp(),
ServiceTime = time_delta(TResponse, Req#ioq_request.tsub),
IOWait = time_delta(TResponse, Req#ioq_request.t0),
couch_stats:update_histogram(
[couchdb, io_queue2, svctm], ServiceTime),
couch_stats:update_histogram([couchdb, io_queue2, iowait], IOWait),
erlang:demonitor(Ref, [flush]),
send_response(State#state.waiters, Req, Reply)
end,
{noreply, State, 0};
handle_info({'DOWN', Ref, _, _, Reason}, #state{reqs = Reqs} = State) ->
case khash:get(Reqs, Ref) of
undefined ->
ok;
#ioq_request{ref=Ref}=Req ->
couch_stats:increment_counter([couchdb, io_queue2, io_errors]),
ok = khash:del(Reqs, Ref),
send_response(State#state.waiters, Req, {'EXIT', Reason})
end,
{noreply, State, 0};
handle_info(timeout, State) ->
{noreply, maybe_submit_request(State)};
handle_info(_Info, State) ->
{noreply, State, 0}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, #state{}=State, _Extra) ->
{ok, State}.
-spec update_config_int(state()) -> state().
update_config_int(State) ->
Concurrency = config:get_integer("ioq2", "concurrency", ?DEFAULT_CONCURRENCY),
ResizeLimit = config:get_integer("ioq2", "resize_limit", ?DEFAULT_RESIZE_LIMIT),
DeDupe = config:get_boolean("ioq2", "dedupe", true),
ScaleFactor = ioq_config:to_float(
config:get("ioq2", "scale_factor"),
?DEFAULT_SCALE_FACTOR
),
MaxPriority = ioq_config:to_float(
config:get("ioq2", "max_priority"),
?DEFAULT_MAX_PRIORITY
),
{ok, ClassP} = ioq_config:build_class_priorities(),
{ok, UserP} = ioq_config:build_user_priorities(),
{ok, ShardP} = ioq_config:build_shard_priorities(),
State#state{
user_p = UserP,
class_p = ClassP,
shard_p = ShardP,
scale_factor = ScaleFactor,
concurrency = Concurrency,
dedupe = DeDupe,
resize_limit = ResizeLimit,
max_priority = MaxPriority
}.
-spec maybe_submit_request(state()) -> state().
maybe_submit_request(#state{reqs=Reqs, concurrency=C}=State) ->
NumReqs = khash:size(Reqs),
case NumReqs < C of
true ->
case make_next_request(State) of
State ->
State;
NewState when NumReqs >= C-1 ->
NewState;
NewState ->
maybe_submit_request(NewState)
end;
false ->
State
end.
-spec make_next_request(state()) -> state().
make_next_request(#state{queue=HQ}=State) ->
case hqueue:extract_max(HQ) of
{error, empty} ->
State;
{Priority, #ioq_request{} = Req} ->
submit_request(Req#ioq_request{fin_priority=Priority}, State)
end.
-spec submit_request(ioq_request(), state()) -> state().
submit_request(Req, #state{iterations=I, resize_limit=RL}=State) when I >= RL ->
ok = hqueue:scale_by(State#state.queue, State#state.scale_factor),
submit_request(Req, State#state{iterations=0});
submit_request(Req, #state{iterations=Iterations}=State) ->
#ioq_request{
fd = Fd,
msg = Call,
class = Class,
t0 = T0
} = Req,
#state{reqs = Reqs} = State,
% make the request
Ref = erlang:monitor(process, Fd),
Fd ! {'$gen_call', {self(), Ref}, Call},
% record some stats
RW = rw(Call),
SubmitTime = os:timestamp(),
Latency = time_delta(SubmitTime, T0),
couch_stats:increment_counter([couchdb, io_queue2, Class, count]),
couch_stats:increment_counter([couchdb, io_queue2, RW, count]),
couch_stats:update_histogram([couchdb, io_queue2, submit_delay], Latency),
khash:put(Reqs, Ref, Req#ioq_request{tsub=SubmitTime, ref=Ref}),
State#state{iterations=Iterations+1}.
-spec send_response(khash:khash(), ioq_request(), term()) -> [ok].
send_response(Waiters, #ioq_request{key=Key}, Reply) ->
Waiting = khash:get(Waiters, Key),
khash:del(Waiters, Key),
[gen_server:reply(W, Reply) || W <- Waiting].
-spec waiter_key(ioq_request(), state()) -> {waiter_key(), state()}.
waiter_key(Req, State) ->
case {State#state.dedupe, Req#ioq_request.msg} of
{true, {pread_iolist, Pos}} ->
{{Req#ioq_request.fd, Pos}, State};
_ ->
Next = State#state.next_key,
{Next, State#state{next_key = Next + 1}}
end.
-spec enqueue_request(ioq_request(), state()) -> state().
enqueue_request(Req, #state{queue=HQ, waiters=Waiters}=State0) ->
#ioq_request{
from = From,
msg = Msg
} = Req,
{ReqKey, State} = waiter_key(Req, State0),
RW = rw(Msg),
couch_stats:increment_counter([couchdb, io_queue2, queued]),
couch_stats:increment_counter([couchdb, io_queue2, RW, queued]),
case khash:get(State#state.waiters, ReqKey, not_found) of
not_found ->
Priority = prioritize_request(Req, State),
Req1 = Req#ioq_request{
key = ReqKey,
init_priority = Priority
},
hqueue:insert(HQ, Priority, Req1),
khash:put(State#state.waiters, ReqKey, [From]);
Pids ->
couch_stats:increment_counter([couchdb, io_queue2, merged]),
khash:put(Waiters, ReqKey, [From | Pids])
end,
State.
-spec add_request_dimensions(ioq_request(), io_dimensions()) -> ioq_request().
add_request_dimensions(Request, {Class, Shard}) ->
add_request_dimensions(Request, {Class, Shard, undefined});
add_request_dimensions(Request, {Class, Shard0, GroupId}) ->
{Shard, User, DbName} = case {Class, Shard0} of
{interactive, "dbcopy"} ->
{undefined, undefined, undefined};
{db_meta, security} ->
{undefined, undefined, undefined};
_ ->
Shard1 = filename:rootname(Shard0),
{User0, DbName0} = shard_info(Shard1),
{Shard1, User0, DbName0}
end,
Request#ioq_request{
shard = Shard,
user = User,
db = DbName,
ddoc = GroupId,
class = Class
};
add_request_dimensions(Request, undefined) ->
Request#ioq_request{class=other}.
-spec shard_info(dbname()) -> {any(), any()}.
shard_info(Shard) ->
try split(Shard) of
[<<"shards">>, _, <<"heroku">>, AppId, DbName] ->
{<<AppId/binary, ".heroku">>, DbName};
[<<"shards">>, _, DbName] ->
{system, DbName};
[<<"shards">>, _, Account, DbName] ->
{Account, DbName};
[<<"shards">>, _, Account | DbParts] ->
{Account, filename:join(DbParts)};
_ ->
{undefined, undefined}
catch _:_ ->
{undefined, undefined}
end.
-spec split(binary()) -> [binary()]
; ([binary()]) -> [binary()].
split(B) when is_binary(B) ->
split(B, 0, 0, []);
split(B) ->
B.
-spec split(binary(), non_neg_integer(), non_neg_integer(), [binary()]) -> [binary()].
split(B, O, S, Acc) ->
case B of
<<_:O/binary>> ->
Len = O - S,
<<_:S/binary, Part:Len/binary>> = B,
lists:reverse(Acc, [Part]);
<<_:O/binary, $/, _/binary>> ->
Len = O - S,
<<_:S/binary, Part:Len/binary, _/binary>> = B,
split(B, O+1, O+1, [Part | Acc]);
_ ->
split(B, O+1, S, Acc)
end.
-spec time_delta(T1, T0) -> Tdiff when
T1 :: erlang:timestamp(),
T0 :: erlang:timestamp(),
Tdiff :: integer().
time_delta(T1, T0) ->
trunc(timer:now_diff(T1, T0) / 1000).
-spec rw(io_dimensions()) -> read_write().
rw({pread_iolist, _}) ->
reads;
rw({append_bin, _}) ->
writes;
rw({append_bin, _, _}) ->
writes;
rw(_) ->
unknown.
-spec prioritize_request(ioq_request(), state()) -> priority().
prioritize_request(Req, State) ->
#state{
class_p = ClassP,
user_p = UserP,
shard_p = ShardP,
max_priority = Max
} = State,
case ioq_config:prioritize(Req, ClassP, UserP, ShardP) of
Priority when Priority < 0.0 -> 0.0;
Priority when Priority > Max -> Max;
Priority -> Priority
end.
-spec maybe_seed() -> {integer(), integer(), integer()}.
maybe_seed() ->
case get(random_seed) of
undefined ->
<<A:32, B:32, C:32>> = crypto:strong_rand_bytes(12),
Seed = {A, B, C},
random:seed(Seed),
Seed;
Seed ->
Seed
end.
%% ioq_server2 Tests
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
mock_server() ->
mock_server([]).
mock_server(Config) ->
meck:new(config),
meck:expect(config, get, fun(Group) ->
couch_util:get_value(Group, Config, [])
end),
meck:expect(config, get, fun(_,_) ->
undefined
end),
meck:expect(config, get, fun("ioq2", _, Default) ->
Default
end),
meck:expect(config, get_integer, fun("ioq2", _, Default) ->
Default
end),
meck:expect(config, get_boolean, fun("ioq2", _, Default) ->
Default
end),
{ok, State} = ioq_server2:init([?SERVER_ID(1), 1]),
State.
unmock_server(_) ->
true = meck:validate(config),
ok = meck:unload(config).
empty_config_test_() ->
{
"Empty config tests",
{
foreach,
fun mock_server/0,
fun unmock_server/1,
[
fun test_basic_server_config/1,
fun test_simple_request_priority/1,
fun test_simple_dedupe/1,
fun test_io_error/1
]
}
}.
simple_config_test_() ->
{
"Simple config tests",
{
foreach,
fun() ->
Config = [
{"ioq2.classes", [{"db_compact", "0.9"}]},
{"ioq2", [{"resize_limit", "10"}]}
],
mock_server(Config)
end,
fun unmock_server/1,
[
fun test_simple_config/1,
fun test_auto_scale/1
]
}
}.
priority_extremes_test_() ->
{
"Test min/max priorities",
{
foreach,
fun() ->
Config = [
{"ioq2.classes", [
{"db_compact", "9999999.0"},
{"interactive", "-0.00000000000000001"}
]}
],
mock_server(Config)
end,
fun unmock_server/1,
[
fun test_min_max_priorities/1
]
}
}.
queue_depths_test_() ->
Foo = <<"foo">>,
Bar = <<"bar">>,
Reqs = [
#ioq_request{user=Foo, class=db_compact},
#ioq_request{user=Bar, class=db_compact},
#ioq_request{user=Bar, class=view_compact},
#ioq_request{user=Foo, class=internal_repl},
#ioq_request{user=Bar, class=internal_repl},
#ioq_request{user=Bar, class=internal_repl},
#ioq_request{class=low},
#ioq_request{user=Foo, class=interactive},
#ioq_request{user=Foo, class=interactive},
#ioq_request{user=Foo, class=db_update},
#ioq_request{user=Foo, class=view_update},
#ioq_request{user=Foo, class=view_update},
#ioq_request{user=Foo, class=view_update},
#ioq_request{user=Foo, class=view_update},
#ioq_request{user=Bar, class=interactive},
#ioq_request{user=Bar, class=db_update},
#ioq_request{user=Bar, class=db_update},
#ioq_request{user=Bar, class=db_update},
#ioq_request{user=Bar, class=view_update}
],
Expected = [
{compaction, 3},
{replication, 3},
{low, 1},
{channels, {[
{<<"foo">>, [2,1,4]},
{<<"bar">>, [1,3,1]}
]}}
],
{
"Test queue depth stats",
?_assertEqual(
Expected,
get_queue_depths(Reqs)
)
}.
test_basic_server_config(St0) ->
{reply, RespState, _St1, 0} = handle_call(get_state, pid, St0),
[
?_assertEqual([], RespState#state.user_p),
?_assertEqual(
lists:sort(?DEFAULT_CLASS_PRIORITIES),
lists:sort(RespState#state.class_p)
),
?_assertEqual([], RespState#state.shard_p)
].
test_simple_request_priority(St0) ->
From = pid1,
Request0 = #ioq_request{class=db_compact},
Priority = prioritize_request(Request0, St0),
Request1 = Request0#ioq_request{
init_priority = Priority,
from = From,
key = St0#state.next_key
},
{noreply, St1, 0} = handle_call(Request0, From, St0),
{reply, RespState, _St2, 0} = handle_call(get_state, From, St1),
[
?_assertEqual(
[{Priority, Request1}],
RespState#state.queue
)
].
test_simple_dedupe(St0) ->
FromA = pid1,
FromB = pid2,
Fd = fd,
Pos = 1234,
Msg = {pread_iolist, Pos},
Request0 = #ioq_request{
class=db_compact,
fd = Fd,
msg = Msg
},
{ReqKey, St1} = waiter_key(Request0, St0),
Priority = prioritize_request(Request0, St1),
Request1A = Request0#ioq_request{
init_priority = Priority,
from = FromA,
key = {Fd, Pos}
},
_Request1B = Request0#ioq_request{
init_priority = Priority,
from = FromA,
key = {Fd, Pos}
},
{noreply, St2, 0} = handle_call(Request0, FromA, St1),
{noreply, St3, 0} = handle_call(Request0, FromB, St2),
{reply, RespState, _St4, 0} = handle_call(get_state, FromA, St3),
[
?_assertEqual(
[{Priority, Request1A}],
RespState#state.queue
),
?_assertEqual(
[{ReqKey, [FromB, FromA]}],
RespState#state.waiters
)
].
test_simple_config(St) ->
RequestA = #ioq_request{},
RequestB = #ioq_request{class=db_compact},
PriorityA = prioritize_request(RequestA, St),
PriorityB = prioritize_request(RequestB, St),
[
?_assertEqual(
1.0,
PriorityA
),
?_assertEqual(
0.9,
PriorityB
)
].
test_min_max_priorities(St) ->
RequestA = #ioq_request{class=interactive},
RequestB = #ioq_request{class=db_compact},
PriorityA = prioritize_request(RequestA, St),
PriorityB = prioritize_request(RequestB, St),
[
?_assertEqual(
0.0,
PriorityA
),
?_assertEqual(
?DEFAULT_MAX_PRIORITY,
PriorityB
)
].
test_auto_scale(#state{queue=HQ}=St0) ->
%% start with iterations=2 so we can tell when we auto-scaled
St1 = St0#state{resize_limit=10, iterations=2},
Pid = spawn(fun() -> receive baz -> ok end end),
T0 = os:timestamp(),
BaseReq = #ioq_request{t0=T0, fd=Pid},
RequestA = BaseReq#ioq_request{ref=make_ref()},
RequestB = BaseReq#ioq_request{class=db_compact, ref=make_ref()},
PriorityA = prioritize_request(RequestA, St1),
PriorityB = prioritize_request(RequestB, St1),
{noreply, St2, 0} = handle_call(RequestB, Pid, St1),
{noreply, St3, 0} = handle_call(RequestA, Pid, St2),
{_, #ioq_request{init_priority=PriorityA2}} = hqueue:extract_max(HQ),
Tests0 = [?_assertEqual(PriorityA, PriorityA2)],
{_St, Tests} = lists:foldl(
fun(_N, {#state{iterations=I, resize_limit=RL}=StN0, TestsN}) ->
ReqN = BaseReq#ioq_request{ref=make_ref()},
ExpectedPriority = case I == 1 of
false -> PriorityA;
true -> PriorityB
end,
{noreply, StN1, 0} = handle_call(ReqN, Pid, StN0),
StN2 = submit_request(ReqN, StN1),
{_, #ioq_request{init_priority=PriorityN}} = hqueue:extract_max(HQ),
{
StN2,
[?_assertEqual(ExpectedPriority, PriorityN) | TestsN]
}
end,
{St3, Tests0},
lists:seq(1, St3#state.resize_limit + 7)
),
lists:reverse(Tests).
all_test_() ->
{setup, fun setup/0, fun cleanup/1, fun instantiate/1}.
many_clients_test_() ->
FDCount = 50,
ClientCount = 10,
MaxDelay = 20,
{
setup,
fun() -> setup_many(FDCount, MaxDelay) end,
fun cleanup/1,
fun(Servers) -> test_many_clients(Servers, ClientCount) end
}.
setup() ->
meck:new(config, [passthrough]),
meck:expect(config, get_boolean,
fun
("ioq2", "enabled", _) ->
true;
("ioq2", "server_per_scheduler", _) ->
false;
(_, _, Default) ->
Default
end
),
{ok, _} = application:ensure_all_started(ioq),
FakeServer = fun(F) ->
receive {'$gen_call', {Pid, Ref}, Call} ->
Pid ! {Ref, {reply, Call}}
end,
F(F)
end,
spawn(fun() -> FakeServer(FakeServer) end).
setup_many(Count, RespDelay) ->
{ok, _} = application:ensure_all_started(ioq),
meck:new(config, [passthrough]),
meck:expect(config, get_boolean,
fun
("ioq2", "enabled", _) ->
true;
("ioq2", "server_per_scheduler", _) ->
false;
(_, _, Default) ->
Default
end
),
FakeServer = fun(F) ->
receive {'$gen_call', {Pid, Ref}, Call} ->
timer:sleep(random:uniform(RespDelay)),
Pid ! {Ref, {reply, Call}}
end,
F(F)
end,
[spawn(fun() -> FakeServer(FakeServer) end) || _ <- lists:seq(1, Count)].
cleanup(Server) when not is_list(Server) ->
cleanup([Server]);
cleanup(Servers) ->
ok = application:stop(ioq),
true = meck:validate(config),
ok = meck:unload(config),
[exit(Server, kill) || Server <- Servers].
instantiate(S) ->
Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()),
[{inparallel, lists:map(fun(IOClass) ->
lists:map(fun(Shard) ->
check_call(S, make_ref(), priority(IOClass, Shard))
end, shards())
end, io_classes())},
?_assertEqual(Old, ioq:set_disk_concurrency(10)),
?_assertError(badarg, ioq:set_disk_concurrency(0)),
?_assertError(badarg, ioq:set_disk_concurrency(-1)),
?_assertError(badarg, ioq:set_disk_concurrency(foo))].
check_call(Server, Call, Priority) ->
?_assertEqual({reply, Call}, ioq_server2:call(Server, Call, Priority)).
io_classes() -> [interactive, view_update, db_compact, view_compact,
internal_repl, other, db_meta].
shards() ->
[
<<"shards/0-1/heroku/app928427/couchrest.1317609656.couch">>,
<<"shards/0-1/foo">>,
<<"shards/0-3/foo">>,
<<"shards/0-1/bar">>,
<<"shards/0-1/kocolosk/stats.1299297461.couch">>,
<<"shards/0-1/kocolosk/my/db.1299297457.couch">>,
other
].
priority(view_update, Shard) ->
{view_update, Shard, <<"_design/foo">>};
priority(Any, Shard) ->
{Any, Shard}.
test_many_clients(Servers, ClientCount) ->
ClientFun = fun() ->
ok = lists:foreach(fun(IOClass) ->
ok = lists:foreach(fun(Shard) ->
Server = random_server(Servers),
Ref = make_ref(),
Priority = priority(IOClass, Shard),
{reply, Ref} = ioq_server2:call(Server, Ref, Priority),
ok
end, shards())
end, io_classes()),
ok
end,
ok = lists:foreach(fun(_) -> spawn_monitor(ClientFun) end, lists:seq(1, ClientCount)),
Status = wait_for_success(ClientCount),
?_assert(Status).
wait_for_success(0) ->
true;
wait_for_success(Count) when Count > 0 ->
receive
{'DOWN', _Ref, process, _Pid, normal} ->
wait_for_success(Count - 1);
Msg ->
?debugFmt("UNEXPECTED CLIENT EXIT: ~p~n", [Msg]),
false
end.
random_server(Servers) ->
lists:nth(random:uniform(length(Servers)), Servers).
test_io_error(#state{waiters=Waiters, reqs=Reqs}=State) ->
Key = asdf,
Ref = make_ref(),
RefTag = make_ref(),
Req = #ioq_request{ref=Ref, key=Key},
khash:put(Waiters, Key, [{self(), RefTag}]),
khash:put(Reqs, Ref, Req),
Error = {exit, foo},
{noreply, _State1, 0} = handle_info({'DOWN', Ref, baz, zab, Error}, State),
Resp = receive
{RefTag, {'EXIT', Error}} ->
{ok, Error};
Else ->
{error, Else}
after 5000 ->
{error, timeout}
end,
?_assertEqual({ok, Error}, Resp).
-endif.