Merge pull request #7 from cloudant/15608-too-much-spawning2

Install a governor to limit the amount of spawning we'll do to send messages to an unresponsive node.

BugzID: 15608
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4f2f040
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+# building
+ebin
+.eunit
+logs
+*.spec
+deps
+*.beam
+
diff --git a/rebar.config b/rebar.config
index 4af6b85..11cd96d 100644
--- a/rebar.config
+++ b/rebar.config
@@ -13,5 +13,7 @@
 % the License.
 
 {deps, [
+    {config, ".*", {git, "https://github.com/cloudant/config.git",
+        {tag, "0.1.0"}}},
     {twig, ".*", {git, "https://github.com/cloudant/twig.git", {tag, "0.2.1"}}}
 ]}.
diff --git a/src/rexi.app.src b/src/rexi.app.src
index 75baa77..01fa503 100644
--- a/src/rexi.app.src
+++ b/src/rexi.app.src
@@ -2,6 +2,6 @@
     {description, "Lightweight RPC server"},
     {vsn, git},
     {registered, [rexi_sup, rexi_server]},
-    {applications, [kernel, stdlib]},
+    {applications, [kernel, stdlib, config]},
     {mod, {rexi_app,[]}}
 ]}.
diff --git a/src/rexi_gov_manager.erl b/src/rexi_gov_manager.erl
new file mode 100644
index 0000000..0886351
--- /dev/null
+++ b/src/rexi_gov_manager.erl
@@ -0,0 +1,153 @@
+% Copyright 2012 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(rexi_gov_manager).
+
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+% API
+-export([start_link/0, send/2]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+-export([handle_config_change/5]).
+
+% Manager state.
+%   node_timers     - ETS set of {Node, TimerRef} entries for nodes that have
+%                     gone down and whose nodeout timer is still pending
+%   nodeout_timeout - milliseconds to wait after a nodedown before the node's
+%                     governor is sent nodeout
+%   pid_spawn_max   - spawn limit passed to each governor when it is started
+% NOTE: the node_timers default calls ets:new/2, so every #state{} that is
+% built without an explicit node_timers creates a new table owned by the
+% process doing the construction.
+-record(state, {node_timers = ets:new(timers, [set]),
+                nodeout_timeout = 2000,
+                pid_spawn_max = 10000}).
+
+
+% API
+
+% Start the manager and register it locally under the module name.
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    Name = {local, ?MODULE},
+    gen_server:start_link(Name, ?MODULE, [], []).
+
+% Send Msg to Dest without blocking the caller. Dest is either a
+% {RegisteredName, Node} tuple or a pid. When the direct send cannot complete
+% immediately (erlang:send/3 answers noconnect or nosuspend) the message is
+% handed to the governor of the destination node, which performs the send
+% from a tracked process or drops it once its spawn limit is reached.
+send(Dest, Msg) ->
+    case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
+    ok -> ok;
+    _ ->
+        % treat nosuspend and noconnect the same
+        {ok, Governor} = get_governor(dest_node(Dest)),
+        gen_server:cast(Governor, {spawn_and_track, Dest, Msg})
+    end.
+
+% Node that owns a send destination. Pids are accepted as well as
+% {Name, Node} tuples because rexi_utils:send/2 forwards every destination
+% it is given to send/2.
+dest_node({_, Node}) when is_atom(Node) -> Node;
+dest_node(Pid) when is_pid(Pid) -> node(Pid).
+
+% Look up the governor for Node. The govs table is read directly from the
+% calling process; only on a miss is the manager asked, which then creates
+% the governor if it still does not exist.
+get_governor(Node) ->
+    case ets:lookup(govs, Node) of
+    [] ->
+        gen_server:call(?MODULE, {get_governor, Node});
+    [{Node, Pid}] ->
+        {ok, Pid}
+    end.
+
+% gen_server callbacks
+
+% gen_server init: create the named govs table (Node -> governor pid),
+% subscribe to nodeup/nodedown messages, read the rexi settings and register
+% this module as a config listener.
+% The config default for nodeout_timeout is "500", which takes precedence
+% over the record default of 2000.
+init([]) ->
+    ets:new(govs, [named_table, set, {read_concurrency, true}]),
+    net_kernel:monitor_nodes(true),
+    Timeout = list_to_integer(config:get("rexi", "nodeout_timeout", "500")),
+    SpawnMax = list_to_integer(config:get("rexi", "pid_spawn_max", "10000")),
+    State = #state{nodeout_timeout = Timeout, pid_spawn_max = SpawnMax},
+    config:listen_for_changes(?MODULE, State),
+    {ok, State}.
+
+% config_listener callback. Each rexi setting handled here is parsed as an
+% integer, pushed to the registered manager process with a synchronous call,
+% and mirrored in the listener's own copy of the state. Any other setting is
+% ignored.
+handle_config_change("rexi", "nodeout_timeout", Value, _, State) ->
+    Timeout = list_to_integer(Value),
+    % No comparison with the previous value; the call simply overwrites it.
+    gen_server:call(?MODULE, {set_timeout, Timeout}),
+    {ok, State#state{nodeout_timeout = Timeout}};
+handle_config_change("rexi", "pid_spawn_max", Value, _, State) ->
+    SpawnMax = list_to_integer(Value),
+    % The spawn limit is overwritten unconditionally as well.
+    gen_server:call(?MODULE, {set_spawn_max, SpawnMax}),
+    {ok, State#state{pid_spawn_max = SpawnMax}};
+handle_config_change(_, _, _, _, State) ->
+    {ok, State}.
+
+% Synchronous requests:
+%   {set_timeout, TO}    - replace nodeout_timeout, reply with the old value
+%   {set_spawn_max, Max} - replace pid_spawn_max, reply with the old value
+%   {get_governor, Node} - reply {ok, Pid} with the node's governor, starting
+%                          (and linking to) one when govs has no entry
+handle_call({set_timeout, TO}, _, #state{nodeout_timeout = Previous} = State) ->
+    {reply, Previous, State#state{nodeout_timeout = TO}};
+handle_call({set_spawn_max, Max}, _, #state{pid_spawn_max = Previous} = State) ->
+    {reply, Previous, State#state{pid_spawn_max = Max}};
+handle_call({get_governor, Node}, _From,
+            #state{pid_spawn_max = PidSpawnMax} = State) ->
+    Gov =
+    case ets:lookup(govs, Node) of
+    [{Node, Existing}] ->
+        Existing;
+    [] ->
+        {ok, Started} = gen_server:start_link(rexi_governor, [PidSpawnMax], []),
+        ets:insert(govs, {Node, Started}),
+        Started
+    end,
+    {reply, {ok, Gov}, State}.
+
+% The manager defines no casts; anything received is discarded.
+handle_cast(_Unexpected, State) -> {noreply, State}.
+
+% Node monitoring and timer messages:
+%   {nodeup, Node}   - cancel a pending nodeout timer for Node and make sure
+%                      the node has a governor
+%   {nodedown, Node} - start the nodeout timer unless one is already pending
+%   {nodeout, Node}  - the timer fired; if the node has not come back in the
+%                      meantime, tell its governor
+% Every other message is ignored.
+handle_info({nodeup, Node}, #state{node_timers = Timers,
+                                   pid_spawn_max = PidSpawnMax} = State) ->
+    % The node is back, so a pending nodeout must not take effect. Removing
+    % the entry also neutralises a timer message that is already queued.
+    case ets:lookup(Timers, Node) of
+    [{Node, TRef}] ->
+        erlang:cancel_timer(TRef),
+        ets:delete(Timers, Node);
+    _ ->
+        ok
+    end,
+    case ets:member(govs, Node) of
+    true ->
+        ok;
+    false ->
+        {ok, Gov} = gen_server:start_link(rexi_governor, [PidSpawnMax], []),
+        ets:insert(govs, {Node, Gov})
+    end,
+    {noreply, State};
+
+handle_info({nodedown, Node}, #state{node_timers = Timers,
+                                     nodeout_timeout = NodeTimeout} = State) ->
+    case ets:member(Timers, Node) of
+    false ->
+        TRef = erlang:send_after(NodeTimeout, self(), {nodeout, Node}),
+        ets:insert(Timers, {Node, TRef});
+    true ->
+        % a timer for this node is already running
+        ok
+    end,
+    {noreply, State};
+
+handle_info({nodeout, Node}, #state{node_timers = Timers} = State) ->
+    % A missing timer entry means nodeup won the race; do nothing then.
+    case ets:member(Timers, Node) of
+    false ->
+        ok;
+    true ->
+        ets:delete(Timers, Node),
+        % govs is a set keyed on Node, so this visits at most one governor
+        lists:foreach(fun({_, Governor}) ->
+                          gen_server:cast(Governor, nodeout)
+                      end, ets:lookup(govs, Node))
+    end,
+    {noreply, State};
+
+handle_info(_Other, State) ->
+    {noreply, State}.
+
+% Nothing is cleaned up explicitly on shutdown.
+terminate(_Why, _St) -> ok.
+
+% The state is carried over unchanged on a code upgrade.
+code_change(_FromVsn, St, _Extra) -> {ok, St}.
+
+% Internal functions
diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl
new file mode 100644
index 0000000..9bbe7c9
--- /dev/null
+++ b/src/rexi_governor.erl
@@ -0,0 +1,74 @@
+% Copyright 2012 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(rexi_governor).
+
+-behaviour(gen_server).
+
+%  gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+         terminate/2, code_change/3]).
+
+% Governor state.
+%   pids      - ETS set of {Pid, MonitorRef} entries for the sender processes
+%               that were spawned and are still tracked
+%   spawn_max - a new sender is spawned only while fewer than this many are
+%               tracked
+%   spawn_cnt - running count of senders spawned
+%   drop_cnt  - running count of messages dropped at the limit
+% NOTE: the pids default calls ets:new/2, so every #state{} that is built
+% without an explicit pids creates a new table owned by the process doing
+% the construction.
+-record(state, {pids = ets:new(pids, [set]),
+                spawn_max = 10000,
+                spawn_cnt = 0,
+                drop_cnt = 0}).
+
+% gen_server init: the single argument is this governor's spawn limit.
+init([SpawnLimit]) ->
+    State = #state{spawn_max = SpawnLimit},
+    {ok, State}.
+
+% There is no synchronous API; every call is answered with ok.
+handle_call(_Req, _Caller, State) ->
+    {reply, ok, State}.
+
+% Casts:
+%   {spawn_and_track, Dest, Msg} - while fewer than spawn_max senders are
+%       tracked, spawn a monitored process that runs erlang:send(Dest, Msg)
+%       and record it; otherwise drop the message. The matching counter is
+%       incremented in either case.
+%   nodeout - kill every tracked sender and empty the tracking table.
+handle_cast({spawn_and_track, Dest, Msg},
+            #state{pids = Pids, spawn_max = Limit} = State) ->
+    NewState =
+    case ets:info(Pids, size) < Limit of
+    true ->
+        % spawn_monitor/3 returns {Pid, Ref}, which is stored as is
+        Tracked = spawn_monitor(erlang, send, [Dest, Msg]),
+        ets:insert(Pids, Tracked),
+        margaret_counter:increment([erlang, rexi, spawned]),
+        State#state{spawn_cnt = State#state.spawn_cnt + 1};
+    false ->
+        % at the limit: the message is discarded
+        margaret_counter:increment([erlang, rexi, dropped]),
+        State#state{drop_cnt = State#state.drop_cnt + 1}
+    end,
+    {noreply, NewState};
+
+handle_cast(nodeout, #state{pids = Pids} = State) ->
+    % kill all the pids
+    lists:foreach(fun({P, _Ref}) -> exit(P, kill) end, ets:tab2list(Pids)),
+    ets:delete_all_objects(Pids),
+    {noreply, State}.
+
+% Monitor notifications for spawned senders.
+% The tracking entry is removed whatever the exit reason. A sender that
+% fails abnormally therefore neither crashes this server with
+% function_clause nor keeps occupying a slot that counts against spawn_max.
+% For senders killed by nodeout the entry is already gone and the delete is
+% a no-op.
+handle_info({'DOWN', _, process, Pid, _Reason}, #state{pids = Pids} = State) ->
+    ets:delete(Pids, Pid),
+    {noreply, State};
+
+% Any other message is ignored rather than crashing the governor, which is
+% started with gen_server:start_link/3 by the manager.
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+% Nothing is cleaned up explicitly on shutdown.
+terminate(_Why, _St) -> ok.
+
+% The state is carried over unchanged on a code upgrade.
+code_change(_FromVsn, St, _Extra) -> {ok, St}.
diff --git a/src/rexi_sup.erl b/src/rexi_sup.erl
index 828ee54..b0273fc 100644
--- a/src/rexi_sup.erl
+++ b/src/rexi_sup.erl
@@ -1,5 +1,5 @@
 % Copyright 2010 Cloudant
-% 
+%
 % Licensed under the Apache License, Version 2.0 (the "License"); you may not
 % use this file except in compliance with the License. You may obtain a copy of
 % the License at
@@ -20,10 +20,11 @@
 
 -include_lib("eunit/include/eunit.hrl").
 
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 100, Type, [I]}).
+
 start_link(Args) ->
     supervisor:start_link({local,?MODULE}, ?MODULE, Args).
 
 init([]) ->
-    Mod = rexi_server,
-    Spec = {Mod, {Mod,start_link,[]}, permanent, 100, worker, [Mod]},
-    {ok, {{one_for_one, 3, 10}, [Spec]}}.
+    {ok, {{one_for_one, 3, 10}, [?CHILD(rexi_server, worker), ?CHILD(rexi_gov_manager, worker)]}}.
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 721877e..9950823 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -4,14 +4,7 @@
 
 %% @doc send a message as quickly as possible
 send(Dest, Msg) ->
-    case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
-    noconnect ->
-        spawn(erlang, send, [Dest, Msg]);
-    nosuspend ->
-        spawn(erlang, send, [Dest, Msg]);
-    ok ->
-        ok
-    end.
+    rexi_gov_manager:send(Dest, Msg).
 
 %% @doc set up the receive loop with an overall timeout
 -spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->