Add IOQ2 pid for search traffic
This adds a dedicated IOQ2 pid for search traffic.
It also reworks some of the dispatch logic.
diff --git a/include/ioq.hrl b/include/ioq.hrl
index ac41314..9e9bf67 100644
--- a/include/ioq.hrl
+++ b/include/ioq.hrl
@@ -11,13 +11,16 @@
% the License.
-define(DEFAULT_PRIORITY, 1.0).
+-define(DEFAULT_IOQ2_CONCURRENCY, 1).
-define(BAD_MAGIC_NUM, -12341234).
%% Dispatch Strategies
--define(DISPATCH_RANDOM, "random").
--define(DISPATCH_FD_HASH, "fd_hash").
--define(DISPATCH_SINGLE_SERVER, "single_server").
--define(DISPATCH_SERVER_PER_SCHEDULER, "server_per_scheduler").
+-define(DISPATCH_RANDOM, random).
+-define(DISPATCH_FD_HASH, fd_hash).
+-define(DISPATCH_SINGLE_SERVER, single_server).
+-define(DISPATCH_SERVER_PER_SCHEDULER, server_per_scheduler).
+
+-define(IOQ2_SEARCH_SERVER, ioq_server_search).
%% Config Categories
-define(SHARD_CLASS_SEPARATOR, "||").
diff --git a/src/ioq.erl b/src/ioq.erl
index 5daaebd..f6be488 100644
--- a/src/ioq.erl
+++ b/src/ioq.erl
@@ -11,9 +11,9 @@
% the License.
-module(ioq).
--export([start/0, stop/0, call/3, call/4, set_disk_concurrency/1,
- get_disk_queues/0, get_osproc_queues/0, get_osproc_requests/0,
- get_disk_counters/0, get_disk_concurrency/0]).
+-export([start/0, stop/0, call/3, call/4, call_search/3,
+ set_disk_concurrency/1, get_disk_queues/0, get_osproc_queues/0,
+ get_osproc_requests/0, get_disk_counters/0, get_disk_concurrency/0]).
-export([
ioq2_enabled/0
]).
@@ -52,6 +52,12 @@
true -> ioq_server2:call(Fd, Msg, Priority)
end.
+call_search(Fd, Msg, Priority) ->
+ case ioq2_enabled() of
+ false -> ioq_server:call(Fd, Msg, Priority);
+ true -> ioq_server2:call_search(Fd, Msg, Priority)
+ end.
+
set_disk_concurrency(C) when is_integer(C), C > 0 ->
case ioq2_enabled() of
false -> gen_server:call(ioq_server, {set_concurrency, C});
diff --git a/src/ioq_config.erl b/src/ioq_config.erl
index 6b324e3..3dfe5e4 100644
--- a/src/ioq_config.erl
+++ b/src/ioq_config.erl
@@ -50,6 +50,7 @@
set_scale_factor/2,
set_resize_limit/2,
set_concurrency/2,
+ get_dispatch_strategy/0,
set_dispatch_strategy/2
]).
@@ -87,6 +88,15 @@
set_config(?IOQ2_CONFIG, "concurrency", integer_to_list(Value), Reason).
+get_dispatch_strategy() ->
+ case config:get("ioq2", "dispatch_strategy", undefined) of
+ undefined -> ?DISPATCH_SERVER_PER_SCHEDULER;
+ DispatchStrategy0 -> list_to_atom(DispatchStrategy0)
+ end.
+
+
+set_dispatch_strategy(Value, Reason) when is_list(Value) ->
+ set_dispatch_strategy(list_to_existing_atom(Value), Reason);
set_dispatch_strategy(Value, Reason) ->
ErrorMsg = "Dispatch strategy must be one of "
"random, fd_hash, server_per_scheduler, or single_server.",
@@ -97,7 +107,7 @@
?DISPATCH_SERVER_PER_SCHEDULER -> ok;
_ -> throw({badarg, ErrorMsg})
end,
- config:set(?IOQ2_CONFIG, "dispatch_strategy", Value, Reason).
+ config:set(?IOQ2_CONFIG, "dispatch_strategy", atom_to_list(Value), Reason).
set_db_config(DbName, Class, Value, Reason) when is_binary(DbName) ->
diff --git a/src/ioq_config_listener.erl b/src/ioq_config_listener.erl
index fc10880..b6cc670 100644
--- a/src/ioq_config_listener.erl
+++ b/src/ioq_config_listener.erl
@@ -24,11 +24,16 @@
handle_config_terminate/3
]).
+-include_lib("ioq/include/ioq.hrl").
+
subscribe() ->
config:listen_for_changes(?MODULE, nil).
handle_config_change("ioq", _Key, _Val, _Persist, St) ->
- ok = notify_ioq_pids(),
+ ok = notify_ioq_pid(ioq_server),
+ {ok, St};
+handle_config_change("ioq2.search", _Key, _Val, _Persist, St) ->
+ ok = notify_ioq_pid(?IOQ2_SEARCH_SERVER),
{ok, St};
handle_config_change("ioq2", _Key, _Val, _Persist, St) ->
ok = notify_ioq_pids(),
@@ -49,6 +54,7 @@
end).
notify_ioq_pids() ->
- ok = lists:foreach(fun(Pid) ->
- gen_server:cast(Pid, update_config)
- end, ioq_sup:get_ioq2_servers()).
+ ok = lists:foreach(fun notify_ioq_pid/1, ioq_sup:get_ioq2_servers()).
+
+notify_ioq_pid(Pid) ->
+ gen_server:cast(Pid, update_config).
diff --git a/src/ioq_server2.erl b/src/ioq_server2.erl
index 8d62a92..ea50346 100644
--- a/src/ioq_server2.erl
+++ b/src/ioq_server2.erl
@@ -25,6 +25,7 @@
-export([
start_link/3,
call/3,
+ call_search/3,
pcall/1,
pcall/2
]).
@@ -44,7 +45,6 @@
-define(DEFAULT_RESIZE_LIMIT, 1000).
--define(DEFAULT_CONCURRENCY, 1).
-define(DEFAULT_SCALE_FACTOR, 2.0).
-define(DEFAULT_MAX_PRIORITY, 10000.0).
@@ -53,7 +53,7 @@
reqs :: khash:khash(),
waiters :: khash:khash(),
queue :: hqueue:hqueue(),
- concurrency = ?DEFAULT_CONCURRENCY :: pos_integer(),
+ concurrency = ?DEFAULT_IOQ2_CONCURRENCY :: pos_integer(),
iterations = 0 :: non_neg_integer(),
class_p :: khash:khash(), %% class priorities
user_p :: khash:khash(), %% user priorities
@@ -93,6 +93,15 @@
-spec call(pid(), term(), io_dimensions()) -> term().
call(Fd, Msg, Dimensions) ->
+ call_int(Fd, Msg, Dimensions, normal).
+
+
+-spec call_search(pid(), term(), io_dimensions()) -> term().
+call_search(Fd, Msg, Dimensions) ->
+ call_int(Fd, Msg, Dimensions, search).
+
+
+call_int(Fd, Msg, Dimensions, IOType) ->
Req0 = #ioq_request{
fd = Fd,
msg = Msg,
@@ -108,26 +117,30 @@
[couchdb, io_queue2, RW, bypassed_count]),
gen_server:call(Fd, Msg, infinity);
_ ->
- DispatchStrategy = config:get(
- "ioq2", "dispatch_strategy", ?DISPATCH_SERVER_PER_SCHEDULER),
- Server = case DispatchStrategy of
- ?DISPATCH_RANDOM ->
- SID = rand:uniform(erlang:system_info(schedulers)),
- ?SERVER_ID(SID);
- ?DISPATCH_FD_HASH ->
- NumSchedulers = erlang:system_info(schedulers),
- SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
- ?SERVER_ID(SID);
- ?DISPATCH_SINGLE_SERVER ->
- ?SERVER_ID(1);
- _ ->
- SID = erlang:system_info(scheduler_id),
- ?SERVER_ID(SID)
- end,
+ Server = ioq_server(Req, IOType),
gen_server:call(Server, Req, infinity)
end.
+ioq_server(#ioq_request{}, search) ->
+ ?IOQ2_SEARCH_SERVER;
+ioq_server(#ioq_request{fd=Fd}, _) ->
+ case ioq_config:get_dispatch_strategy() of
+ ?DISPATCH_RANDOM ->
+ SID = rand:uniform(erlang:system_info(schedulers)),
+ ?SERVER_ID(SID);
+ ?DISPATCH_FD_HASH ->
+ NumSchedulers = erlang:system_info(schedulers),
+ SID = 1 + (erlang:phash2(Fd) rem NumSchedulers),
+ ?SERVER_ID(SID);
+ ?DISPATCH_SINGLE_SERVER ->
+ ?SERVER_ID(1);
+ _ ->
+ SID = erlang:system_info(scheduler_id),
+ ?SERVER_ID(SID)
+ end.
+
+
-spec pcall(any()) -> any().
pcall(Msg) ->
pcall(Msg, 500).
@@ -368,17 +381,21 @@
-spec update_config_int(state()) -> state().
update_config_int(State) ->
- Concurrency = config:get_integer("ioq2", "concurrency", ?DEFAULT_CONCURRENCY),
- ResizeLimit = config:get_integer("ioq2", "resize_limit", ?DEFAULT_RESIZE_LIMIT),
- DeDupe = config:get_boolean("ioq2", "dedupe", true),
+ Category = case State#state.scheduler_id of
+ search -> "ioq2.search";
+ _ -> "ioq2"
+ end,
+ Concurrency = config:get_integer(Category, "concurrency", ?DEFAULT_IOQ2_CONCURRENCY),
+ ResizeLimit = config:get_integer(Category, "resize_limit", ?DEFAULT_RESIZE_LIMIT),
+ DeDupe = config:get_boolean(Category, "dedupe", true),
ScaleFactor = ioq_config:to_float(
- config:get("ioq2", "scale_factor"),
+ config:get(Category, "scale_factor"),
?DEFAULT_SCALE_FACTOR
),
MaxPriority = ioq_config:to_float(
- config:get("ioq2", "max_priority"),
+ config:get(Category, "max_priority"),
?DEFAULT_MAX_PRIORITY
),
@@ -999,16 +1016,16 @@
instantiate(S) ->
- Old = ?DEFAULT_CONCURRENCY * length(ioq_sup:get_ioq2_servers()),
+ Old = ?DEFAULT_IOQ2_CONCURRENCY * length(ioq_sup:get_ioq2_servers()),
[{inparallel, lists:map(fun(IOClass) ->
lists:map(fun(Shard) ->
check_call(S, make_ref(), priority(IOClass, Shard))
end, shards())
end, io_classes())},
- ?_assertEqual(Old, ioq:set_disk_concurrency(10)),
- ?_assertError(badarg, ioq:set_disk_concurrency(0)),
- ?_assertError(badarg, ioq:set_disk_concurrency(-1)),
- ?_assertError(badarg, ioq:set_disk_concurrency(foo))].
+ ?_assertEqual(Old, ?MODULE:set_concurrency(10)),
+ ?_assertError(badarg, ?MODULE:set_concurrency(0)),
+ ?_assertError(badarg, ?MODULE:set_concurrency(-1)),
+ ?_assertError(badarg, ?MODULE:set_concurrency(foo))].
check_call(Server, Call, Priority) ->
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 8bfda77..188abc5 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -15,10 +15,12 @@
-vsn(1).
-behaviour(config_listener).
-export([start_link/0, init/1]).
--export([get_ioq2_servers/0]).
+-export([get_ioq2_servers/0, get_all_ioq2_servers/0]).
-export([handle_config_change/5, handle_config_terminate/3]).
-export([processes/1]).
+-include_lib("ioq/include/ioq.hrl").
+
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(CHILD_WITH_ARGS(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
@@ -40,8 +42,10 @@
}}.
ioq_server2_children() ->
+ Name = ?IOQ2_SEARCH_SERVER,
+ Search = {Name, {ioq_server2, start_link, [Name, search, false]}, permanent, 5000, worker, [Name]},
Bind = config:get_boolean("ioq2", "bind_to_schedulers", false),
- ioq_server2_children(erlang:system_info(schedulers), Bind).
+ [Search | ioq_server2_children(erlang:system_info(schedulers), Bind)].
ioq_server2_children(Count, Bind) ->
lists:map(fun(I) ->
@@ -54,9 +58,15 @@
list_to_atom("ioq_server_" ++ integer_to_list(I))
end, lists:seq(1, erlang:system_info(schedulers))).
+get_all_ioq2_servers() ->
+ [?IOQ2_SEARCH_SERVER | get_ioq2_servers()].
+
handle_config_change("ioq", _Key, _Val, _Persist, St) ->
gen_server:cast(ioq_server, update_config),
{ok, St};
+handle_config_change("ioq2.search", _Key, _Val, _Persist, St) ->
+ gen_server:cast(?IOQ2_SEARCH_SERVER, update_config),
+ {ok, St};
handle_config_change("ioq2" ++ _, _Key, _Val, _Persist, St) ->
lists:foreach(fun({_Id, Pid}) ->
gen_server:cast(Pid, update_config)
diff --git a/test/ioq_config_tests.erl b/test/ioq_config_tests.erl
index e81f45f..97b423f 100644
--- a/test/ioq_config_tests.erl
+++ b/test/ioq_config_tests.erl
@@ -231,7 +231,8 @@
{set_scale_factor, 3.14, "3.14"},
{set_max_priority, 99999.99, "99999.99"},
{set_enabled, true, "true"},
- {set_dispatch_strategy, ?DISPATCH_FD_HASH, ?DISPATCH_FD_HASH}
+ {set_dispatch_strategy,
+ ?DISPATCH_FD_HASH, atom_to_list(?DISPATCH_FD_HASH)}
],
Reason = "ioq_config_tests",
diff --git a/test/ioq_tests.erl b/test/ioq_tests.erl
index b6b7bad..70eefc2 100644
--- a/test/ioq_tests.erl
+++ b/test/ioq_tests.erl
@@ -14,6 +14,7 @@
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
+-include_lib("ioq/include/ioq.hrl").
all_test_() ->
{setup, fun setup/0, fun cleanup/1, fun instantiate/1}.
@@ -35,12 +36,18 @@
exit(Server, kill).
instantiate({_, S}) ->
+ Old = case ioq:ioq2_enabled() of
+ true ->
+ ?DEFAULT_IOQ2_CONCURRENCY * length(ioq_sup:get_ioq2_servers());
+ false ->
+ 20
+ end,
[{inparallel, lists:map(fun(IOClass) ->
lists:map(fun(Shard) ->
check_call(S, make_ref(), priority(IOClass, Shard))
end, shards())
end, io_classes())},
- ?_assertEqual(20, ioq:set_disk_concurrency(10)),
+ ?_assertEqual(Old, ioq:set_disk_concurrency(10)),
?_assertError(badarg, ioq:set_disk_concurrency(0)),
?_assertError(badarg, ioq:set_disk_concurrency(-1)),
?_assertError(badarg, ioq:set_disk_concurrency(foo))].
@@ -49,7 +56,7 @@
?_assertEqual({reply, Call}, ioq:call(Server, Call, Priority)).
io_classes() -> [interactive, view_update, db_compact, view_compact,
- internal_repl, other].
+ internal_repl, other, search, system].
shards() ->
[