Manage Excessive amount of spawned pids

rexi_utils:send uses the noconnect and nosuspend options in order to immediately
return to the controller and spawns new processes to actually send the messages, assuming
the remote node is only temporarily down. In the nosuspend case these pids
can hang around indefinitely and build up.

This patch sends the messages that need to be sent from spawned processes to a new
gen_server, rexi_manager, which manages a group of rexi_governors, one for each node.
The governor does the spawning and monitoring of the pids and keeps track of how many are sent.
If a node down message is received the manager sets a timer which when expired tells the appropriate
governor to kill all the pids waiting on that node. A cap prevents spawning of processes above
a certain amount, after which messages are dropped on the floor.

BugzID: 15608
diff --git a/src/rexi_gov_manager.erl b/src/rexi_gov_manager.erl
new file mode 100644
index 0000000..14f73ed
--- /dev/null
+++ b/src/rexi_gov_manager.erl
@@ -0,0 +1,133 @@
+% Copyright 2012 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(rexi_gov_manager).
+
+-behaviour(gen_server).
+
+% API
+-export([start_link/0, send/2]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {node_timers = ets:new(timers, [set]),
+                nodeout_timeout = 2000,
+                pid_spawn_max = 10000}).
+
+
+% API
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+send({_, Node} = Dest, Msg) ->
+    case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
+    ok -> ok;
+    _ ->
+        % treat nosuspend and noconnect the same
+        {ok, Governor} = get_governor(Node),
+        gen_server:cast(Governor, {spawn_and_track, Dest, Msg})
+    end.
+
+get_governor(Node) ->
+    case ets:lookup(govs, Node) of
+    [{Node, Gov}] ->
+        {ok, Gov};
+    [] ->
+        gen_server:call(?MODULE, {get_governor, Node})
+    end.
+
+% gen_server callbacks
+
+init([]) ->
+    ets:new(govs, [named_table, set, {read_concurrency, true}]),
+    net_kernel:monitor_nodes(true),
+    %% if we install the new config app, the use of couch_config will go away
+    %%
+    %% NodeOutTimeout = list_to_integer(config:get("rexi","nodeout_timeout","500")),
+    %% PidSpawnMax = list_to_integer(config:get("rexi","pid_spawn_max", "10000")),
+    %% {ok, #state{nodeout_timeout = NodeOutTimeout,
+    %%             pid_spawn_max = PidSpawnMax}}
+    {ok, #state{}}.
+
+handle_call({get_governor, Node}, _From,
+            #state{pid_spawn_max = PidSpawnMax} = State) ->
+    case ets:lookup(govs, Node) of
+    [] ->
+        {ok, Gov} = gen_server:start_link(rexi_governor, [PidSpawnMax], []),
+        ets:insert(govs, {Node, Gov});
+    [{Node, Gov}] ->
+        Gov
+    end,
+    {reply, {ok, Gov}, State}.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+handle_info({nodeup, Node}, #state{node_timers = Timers,
+                                   pid_spawn_max = PidSpawnMax} = State) ->
+    case ets:lookup(Timers, Node) of
+    [{Node, TRef}] ->
+        erlang:cancel_timer(TRef),
+        ets:delete(Timers, Node);
+    _ ->
+        ok
+    end,
+    case ets:lookup(govs, Node) of
+    [{Node, _}] ->
+        ok;
+    [] ->
+        {ok, Gov} = gen_server:start_link(rexi_governor, [PidSpawnMax], []),
+        ets:insert(govs, {Node, Gov})
+    end,
+    {noreply, State};
+
+handle_info({nodedown, Node}, #state{node_timers = Timers,
+                                     nodeout_timeout = NodeTimeout} = State) ->
+    case ets:lookup(Timers, Node) of
+    [] ->
+        TRef = erlang:send_after(NodeTimeout, self(), {nodeout, Node}),
+        ets:insert(Timers, {Node, TRef}),
+        {noreply, State};
+    _ ->
+        {noreply, State}
+    end;
+
+handle_info({nodeout, Node}, #state{node_timers = Timers} = State) ->
+    % check for race with node up
+    case ets:member(Timers, Node) of
+    true ->
+        ets:delete(Timers, Node),
+        case ets:lookup(govs, Node) of
+        [] ->
+            ok;
+        [{Node, Governor}] ->
+            gen_server:cast(Governor, nodeout)
+        end;
+    false ->
+        ok
+    end,
+    {noreply, State};
+
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% Internal functions
diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl
new file mode 100644
index 0000000..9bbe7c9
--- /dev/null
+++ b/src/rexi_governor.erl
@@ -0,0 +1,74 @@
+% Copyright 2012 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(rexi_governor).
+
+-behaviour(gen_server).
+
+%  gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+-record(state, {pids = ets:new(pids, [set]),
+                spawn_max = 10000,
+                spawn_cnt = 0,
+                drop_cnt = 0}).
+
+init([PidSpawnMax]) ->
+    {ok, #state{spawn_max = PidSpawnMax}}.
+
+handle_call(_Request, _From, State) ->
+    Reply = ok,
+    {reply, Reply, State}.
+
+handle_cast({spawn_and_track, Dest, Msg},
+            #state{pids = Pids,
+                   spawn_max = SpawnMax,
+                   spawn_cnt = SC,
+                   drop_cnt = DC} = State) ->
+    {NewSC, NewDC} =
+    case ets:info(Pids, size) < SpawnMax of
+    true ->
+        {Pid, Ref} = spawn_monitor(erlang, send, [Dest, Msg]),
+        ets:insert(Pids, {Pid, Ref}),
+        margaret_counter:increment([erlang, rexi, spawned]),
+        {SC + 1, DC};
+    false ->
+        % drop message on floor
+        margaret_counter:increment([erlang, rexi, dropped]),
+        {SC, DC + 1}
+    end,
+    {noreply, State#state{spawn_cnt = NewSC, drop_cnt = NewDC}};
+
+handle_cast(nodeout, #state{pids = Pids} = State) ->
+    % kill all the pids
+    ets:foldl(fun({P, _Ref}, Acc) ->
+                  exit(P, kill),
+                  Acc
+              end, [], Pids),
+    ets:delete_all_objects(Pids),
+    {noreply, State}.
+
+handle_info({'DOWN', _, process, Pid, normal},
+            #state{pids = Pids} = State) ->
+    ets:delete(Pids, Pid),
+    {noreply, State};
+
+handle_info({'DOWN', _, process, _Pid, killed}, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
diff --git a/src/rexi_sup.erl b/src/rexi_sup.erl
index 9cb6676..b0273fc 100644
--- a/src/rexi_sup.erl
+++ b/src/rexi_sup.erl
@@ -20,10 +20,11 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 100, Type, [I]}).
+
 start_link(Args) ->
     supervisor:start_link({local,?MODULE}, ?MODULE, Args).
 
 init([]) ->
-    Mod = rexi_server,
-    Spec = {Mod, {Mod,start_link,[]}, permanent, 100, worker, [Mod]},
-    {ok, {{one_for_one, 3, 10}, [Spec]}}.
+    {ok, {{one_for_one, 3, 10}, [?CHILD(rexi_server, worker), ?CHILD(rexi_gov_manager, worker)]}}.
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 721877e..9950823 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -4,14 +4,7 @@
 
 %% @doc send a message as quickly as possible
 send(Dest, Msg) ->
-    case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
-    noconnect ->
-        spawn(erlang, send, [Dest, Msg]);
-    nosuspend ->
-        spawn(erlang, send, [Dest, Msg]);
-    ok ->
-        ok
-    end.
+    rexi_gov_manager:send(Dest, Msg).
 
 %% @doc set up the receive loop with an overall timeout
 -spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->