Merge pull request #8 from cloudant/update_handle_config_terminate

Reconfigure IOQ on config update
diff --git a/src/ioq_sup.erl b/src/ioq_sup.erl
index 7ea6284..3e9a494 100644
--- a/src/ioq_sup.erl
+++ b/src/ioq_sup.erl
@@ -12,11 +12,16 @@
 
 -module(ioq_sup).
 -behaviour(supervisor).
+-vsn(1).
+-behaviour(config_listener).
 -export([start_link/0, init/1]).
 -export([get_ioq2_servers/0]).
+-export([handle_config_change/5, handle_config_terminate/3]).
+-export([processes/1]).
 
 %% Helper macro for declaring children of supervisor
 -define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+-define(CHILD_WITH_ARGS(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
 
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@@ -27,6 +32,7 @@
     {ok, {
         {one_for_one, 5, 10},
         [
+            ?CHILD_WITH_ARGS(config_listener_mon, worker, [?MODULE, nil]),
             ?CHILD(ioq_server, worker),
             ?CHILD(ioq_osq, worker)
             | IOQ2Children
@@ -47,3 +53,42 @@
     lists:map(fun(I) ->
         list_to_atom("ioq_server_" ++ integer_to_list(I))
     end, lists:seq(1, erlang:system_info(schedulers))).
+
+%% Config listener callback: ask the affected IOQ servers to reload settings.
+handle_config_change("ioq", _K, _V, _Persist, State) ->
+    gen_server:cast(ioq_server, update_config),
+    {ok, State};
+handle_config_change("ioq2" ++ _Rest, _K, _V, _Persist, State) ->
+    %% Each IOQ2 server is updated with a blocking call, one after another.
+    _ = [gen_server:call(Pid, update_config) || {_Name, Pid} <- processes(ioq2)],
+    {ok, State};
+handle_config_change(_Section, _K, _V, _Persist, State) ->
+    {ok, State}.
+
+%% Listener-termination callback: asks every IOQ server to reload its config.
+handle_config_terminate(_Server, _Reason, _State) ->
+    gen_server:cast(ioq_server, update_config),
+    %% IOQ2 servers are called from a throwaway process, so this returns at once.
+    spawn(fun() ->
+        [gen_server:call(P, update_config) || {_Name, P} <- processes(ioq2)]
+    end),
+    ok.
+
+%% Lists the {Id, Child} pairs filter_children/1 yields for a named group.
+processes(ioq2) -> filter_children("^ioq_server_.*$");
+processes(ioq) -> filter_children("^ioq_server$");
+processes(config_listener_mon) -> filter_children("^config_listener_mon$");
+processes(Unknown) ->
+    %% Unrecognised group: report the accepted names alongside what was given.
+    Allowed = [ioq, ioq2, config_listener_mon],
+    {error, [
+        {expected_one_of, Allowed},
+        {got, Unknown}]}.
+
+%% Returns {Id, Pid} for each child of this supervisor whose id matches RegExp
+%% and that currently has a pid. Children reported as `undefined` or
+%% `restarting` are skipped: callers pass the pid straight to gen_server:call/2.
+filter_children(RegExp) ->
+    [{Id, Pid} || {Id, Pid, _Type, _Mods} <- supervisor:which_children(?MODULE),
+        is_pid(Pid),
+        re:run(atom_to_list(Id), RegExp, [{capture, none}]) =:= match].
diff --git a/test/ioq_config_tests.erl b/test/ioq_config_tests.erl
index d3fee81..2a5ccc3 100644
--- a/test/ioq_config_tests.erl
+++ b/test/ioq_config_tests.erl
@@ -30,6 +30,111 @@
     }
 ]).
 
+%% EUnit generator for the config-update cases. The `foreach` fixture runs
+%% Setup before and Teardown after every case in Cases.
+config_update_test_() ->
+    Setup = fun() -> test_util:start_applications([config, ioq]) end,
+    Teardown = fun test_util:stop_applications/1,
+    Cases = [
+        fun t_restart_config_listener/1,
+        fun t_update_ioq_config/1,
+        fun t_update_ioq2_config/1,
+        fun t_update_ioq_config_on_listener_restart/1,
+        fun t_update_ioq2_config_on_listener_restart/1
+    ],
+    {
+        "Test config updates",
+        {foreach, Setup, Teardown, Cases}
+    }.
+
+%% Checks that ioq_sup lists a live config listener monitor again after a stop.
+t_restart_config_listener(_) ->
+    ?_test(begin
+        [{_, OldMon}] = ioq_sup:processes(config_listener_mon),
+        ?assert(is_process_alive(OldMon)),
+        test_util:stop_sync(OldMon),
+        ?assertNot(is_process_alive(OldMon)),
+        ?assert(is_process_alive(test_util:wait(fun() ->
+            case ioq_sup:processes(config_listener_mon) of
+                [{_, NewMon}] -> NewMon;
+                [] -> wait
+            end
+        end)))
+    end).
+
+%% Checks that setting "ioq"/"concurrency" in config reaches the ioq server.
+t_update_ioq_config(_) ->
+    ?_test(begin
+        [{_, Srv}] = ioq_sup:processes(ioq),
+        %% Start from a known value that differs from the one set via config.
+        gen_server:call(Srv, {set_concurrency, 10}),
+        ?assertEqual(10, gen_server:call(Srv, get_concurrency)),
+        ?assert(is_process_alive(Srv)),
+        config:set("ioq", "concurrency", "200", false),
+        ?assertNotEqual(timeout, test_util:wait(fun() ->
+            Current = gen_server:call(Srv, get_concurrency),
+            if Current =:= 200 -> 200; true -> wait end
+        end)),
+        ?assert(is_process_alive(Srv))
+    end).
+
+%% Checks that stopping the config listener monitor makes the ioq server drop a
+%% manually set concurrency and report the value it had at the start again.
+t_update_ioq_config_on_listener_restart(_) ->
+    ?_test(begin
+        [{_, Srv}] = ioq_sup:processes(ioq),
+        Initial = gen_server:call(Srv, get_concurrency),
+        gen_server:call(Srv, {set_concurrency, 10}),
+        ?assertEqual(10, gen_server:call(Srv, get_concurrency)),
+        ?assert(is_process_alive(Srv)),
+        %% Stop the listener monitor; no config value is changed in this case.
+        [{_, Monitor}] = ioq_sup:processes(config_listener_mon),
+        ?assert(is_process_alive(Monitor)),
+        test_util:stop_sync(Monitor),
+        %% Poll until the server reports the initial concurrency again.
+        ?assertNotEqual(timeout, test_util:wait(fun() ->
+            Current = gen_server:call(Srv, get_concurrency),
+            if Current =:= Initial -> ok; true -> wait end
+        end)),
+        ?assert(is_process_alive(Srv))
+    end).
+
+%% Checks that setting "ioq2"/"concurrency" in config reaches an IOQ2 server.
+t_update_ioq2_config(_) ->
+    ?_test(begin
+        %% Only the first IOQ2 server listed by the supervisor is inspected.
+        [{_, Srv} | _] = ioq_sup:processes(ioq2),
+        gen_server:call(Srv, {set_concurrency, 10}),
+        ?assertEqual(10, gen_server:call(Srv, get_concurrency)),
+        ?assert(is_process_alive(Srv)),
+        config:set("ioq2", "concurrency", "200", false),
+        ?assertNotEqual(timeout, test_util:wait(fun() ->
+            Current = gen_server:call(Srv, get_concurrency),
+            if Current =:= 200 -> 200; true -> wait end
+        end)),
+        ?assert(is_process_alive(Srv))
+    end).
+
+%% Checks that stopping the config listener monitor makes an IOQ2 server drop a
+%% manually set concurrency and report the value it had at the start again.
+t_update_ioq2_config_on_listener_restart(_) ->
+    ?_test(begin
+        [{_, Srv} | _] = ioq_sup:processes(ioq2),
+        Initial = gen_server:call(Srv, get_concurrency),
+        gen_server:call(Srv, {set_concurrency, 10}),
+        ?assertEqual(10, gen_server:call(Srv, get_concurrency)),
+        ?assert(is_process_alive(Srv)),
+        %% Stop the listener monitor; no config value is changed in this case.
+        [{_, Monitor}] = ioq_sup:processes(config_listener_mon),
+        ?assert(is_process_alive(Monitor)),
+        test_util:stop_sync(Monitor),
+        %% Poll until the server reports the initial concurrency again.
+        ?assertNotEqual(timeout, test_util:wait(fun() ->
+            Current = gen_server:call(Srv, get_concurrency),
+            if Current =:= Initial -> ok; true -> wait end
+        end)),
+        ?assert(is_process_alive(Srv))
+    end).
 
 priorities_test_() ->
     {ok, ShardP} = ioq_config:build_shard_priorities(?SHARDS_CONFIG),