[1/3] Start a rexi_server per remote node

This is part of a multi-release upgrade to switch to using a rexi_server
instance per remote node. The first commit introduces the new
rexi_server instances, the second switches to using the new instances
and the third will remove the singleton rexi_server. Each of these
commits should be in a separate release.

The new pattern introduced by this commit will start a 'rexi_server_%b'
process where the '%b' is replaced by the `erlang:phash2(Node)` for
which it will service requests. After the second commit is release each
rexi generated message will be sent to the rexi_server instance on the
remote node handling requests for the local node. The
`rexi_utils:server_pid/1` function is used to generate the id of the
remote server.
diff --git a/src/rexi.erl b/src/rexi.erl
index 8e53dba..a4c786d 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -15,7 +15,6 @@
 -export([cast/2, cast/3, cast/4, kill/2]).
 -export([reply/1, sync_reply/1, sync_reply/2]).
 -export([async_server_call/2, async_server_call/3]).
--export([get_errors/0, get_last_error/0, set_error_limit/1]).
 -export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
 
 -include_lib("rexi/include/rexi.hrl").
@@ -31,17 +30,6 @@
 restart() ->
     stop(), start().
 
--spec get_errors() -> {ok, [#error{}]}.
-get_errors() ->
-    gen_server:call(?SERVER, get_errors).
-
--spec get_last_error() -> {ok, #error{}} | {error, empty}.
-get_last_error() ->
-    gen_server:call(?SERVER, get_last_error).
-
--spec set_error_limit(pos_integer()) -> ok.
-set_error_limit(N) when is_integer(N), N > 0 ->
-    gen_server:call(?SERVER, {set_error_limit, N}).
 
 %% @equiv cast(Node, self(), MFA)
 -spec cast(node(), {atom(), atom(), list()}) -> reference().
diff --git a/src/rexi_server.erl b/src/rexi_server.erl
index b2f6de0..49661cc 100644
--- a/src/rexi_server.erl
+++ b/src/rexi_server.erl
@@ -15,7 +15,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
     code_change/3]).
 
--export([start_link/0, init_p/2, init_p/3]).
+-export([start_link/1, init_p/2, init_p/3]).
 
 -include_lib("rexi/include/rexi.hrl").
 -include_lib("eunit/include/eunit.hrl").
@@ -35,8 +35,8 @@
     error_count = 0
 }).
 
-start_link() ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(ServerId) ->
+    gen_server:start_link({local, ServerId}, ?MODULE, [], []).
 
 init([]) ->
     {ok, #st{}}.
diff --git a/src/rexi_server_mon.erl b/src/rexi_server_mon.erl
new file mode 100644
index 0000000..3545dba
--- /dev/null
+++ b/src/rexi_server_mon.erl
@@ -0,0 +1,123 @@
+% Copyright 2010 Cloudant
+% 
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(rexi_server_mon).
+-behaviour(gen_server).
+
+
+-export([
+    start_link/0,
+    status/0
+]).
+
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+
+-define(SUP_MODULE, rexi_server_sup).
+-define(CHILD_MODULE, rexi_server).
+-define(INTERVAL, 60000).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+status() ->
+    gen_server:call(?MODULE, status).
+
+
+init([]) ->
+    net_kernel:monitor_nodes(true),
+    erlang:send_after(?INTERVAL, self(), check_nodes),
+    {ok, nil}.
+
+
+terminate(_Reason, _St) ->
+    ok.
+
+
+handle_call(status, _From, St) ->
+    case missing_servers() of
+        [] ->
+            {reply, ok, St};
+        Missing ->
+            {reply, {waiting, length(Missing)}, St}
+    end;
+
+handle_call(Msg, _From, St) ->
+    twig:log(notice, "~s ignored_call ~w", [?MODULE, Msg]),
+    {reply, ignored, St}.
+
+
+handle_cast(Msg, St) ->
+    twig:log(notice, "~s ignored_cast ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+handle_info({nodeup, _}, St) ->
+    start_rexi_servers(),
+    {noreply, St};
+
+handle_info({nodedown, _}, St) ->
+    {noreply, St};
+
+handle_info(check_nodes, St) ->
+    start_rexi_servers(),
+    erlang:send_after(?INTERVAL, self(), check_nodes),
+    {noreply, St};
+
+handle_info(Msg, St) ->
+    twig:log(notice, "~s ignored_info ~w", [?MODULE, Msg]),
+    {noreply, St}.
+
+
+code_change(_OldVsn, St, _Extra) ->
+    {ok, St}.
+
+
+start_rexi_servers() ->
+    lists:foreach(fun(Id) ->
+        {ok, _} = start_rexi_server(Id)
+    end, missing_servers()).
+
+
+missing_servers() ->
+    ServerIds = [rexi_utils:server_id(Node) || Node <- [node() | nodes()]],
+    ChildIds = [Id || {Id, _, _, _} <- supervisor:which_children(?SUP_MODULE)],
+    ServerIds -- ChildIds.
+
+
+start_rexi_server(ChildId) ->
+    ChildSpec = {
+        ChildId,
+        {rexi_server, start_link, [ChildId]},
+        permanent,
+        brutal_kill,
+        worker,
+        [?CHILD_MODULE]
+    },
+    case supervisor:start_child(?SUP_MODULE, ChildSpec) of
+        {ok, Pid} ->
+            {ok, Pid};
+        Else ->
+            erlang:error(Else)
+    end.
diff --git a/src/rexi_server_sup.erl b/src/rexi_server_sup.erl
new file mode 100644
index 0000000..97b5b9e
--- /dev/null
+++ b/src/rexi_server_sup.erl
@@ -0,0 +1,29 @@
+% Copyright 2010 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(rexi_server_sup).
+-behaviour(supervisor).
+
+
+-export([init/1]).
+
+-export([start_link/0]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+    {ok, {{one_for_one, 1, 1}, []}}.
diff --git a/src/rexi_sup.erl b/src/rexi_sup.erl
index a8aa800..1c13eea 100644
--- a/src/rexi_sup.erl
+++ b/src/rexi_sup.erl
@@ -12,17 +12,45 @@
 
 -module(rexi_sup).
 -behaviour(supervisor).
--export([init/1]).
 
 -export([start_link/1]).
-
--include_lib("eunit/include/eunit.hrl").
-
-%% Helper macro for declaring children of supervisor
--define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 100, Type, [I]}).
+-export([init/1]).
 
 start_link(Args) ->
     supervisor:start_link({local,?MODULE}, ?MODULE, Args).
 
 init([]) ->
-    {ok, {{one_for_one, 3, 10}, [?CHILD(rexi_gov_manager, worker), ?CHILD(rexi_server, worker)]}}.
+    {ok, {{one_for_one, 3, 10}, [
+        {
+            rexi_gov_manager,
+            {rexi_gov_manager, start_link, []},
+            permanent,
+            100,
+            worker,
+            [rexi_gov_manager]
+        },
+        {
+            rexi_server,
+            {rexi_server, start_link, [rexi_server]},
+            permanent,
+            100,
+            worker,
+            [rexi_server]
+        },
+        {
+            rexi_server_sup,
+            {rexi_server_sup, start_link, []},
+            permanent,
+            100,
+            supervisor,
+            [rexi_server_sup]
+        },
+        {
+            rexi_server_mon,
+            {rexi_server_mon, start_link, []},
+            permanent,
+            100,
+            worker,
+            [rexi_server_mon]
+        }
+    ]}}.
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 1b11576..8bc1190 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -12,7 +12,15 @@
 
 -module(rexi_utils).
 
--export([send/2, recv/6]).
+-export([server_id/1, server_pid/1, send/2, recv/6]).
+
+%% @doc Return a rexi_server id for the given node.
+server_id(Node) ->
+    list_to_atom("rexi_server_" ++ integer_to_list(erlang:phash2(Node))).
+
+%% @doc Return a {server_id(node()), Node} Pid name for the given Node.
+server_pid(Node) ->
+    {server_id(node()), Node}.
 
 %% @doc send a message as quickly as possible
 send(Dest, Msg) ->