Merge pull request #7 from cloudant/15608-too-much-spawning2
Install a governor to limit the amount of spawning we'll do to send messages to an unresponsive node.
BugzID: 15608
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..4f2f040
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,8 @@
+# building
+ebin
+.eunit
+logs
+*.spec
+deps
+*.beam
+
diff --git a/rebar.config b/rebar.config
index 4af6b85..11cd96d 100644
--- a/rebar.config
+++ b/rebar.config
@@ -13,5 +13,7 @@
% the License.
{deps, [
+ {config, ".*", {git, "https://github.com/cloudant/config.git",
+ {tag, "0.1.0"}}},
{twig, ".*", {git, "https://github.com/cloudant/twig.git", {tag, "0.2.1"}}}
]}.
diff --git a/src/rexi.app.src b/src/rexi.app.src
index 75baa77..01fa503 100644
--- a/src/rexi.app.src
+++ b/src/rexi.app.src
@@ -2,6 +2,6 @@
{description, "Lightweight RPC server"},
{vsn, git},
{registered, [rexi_sup, rexi_server]},
- {applications, [kernel, stdlib]},
+ {applications, [kernel, stdlib, config]},
{mod, {rexi_app,[]}}
]}.
diff --git a/src/rexi_gov_manager.erl b/src/rexi_gov_manager.erl
new file mode 100644
index 0000000..0886351
--- /dev/null
+++ b/src/rexi_gov_manager.erl
@@ -0,0 +1,153 @@
+% Copyright 2012 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(rexi_gov_manager).
+
+-behaviour(gen_server).
+-behaviour(config_listener).
+
+% API
+-export([start_link/0, send/2]).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([handle_config_change/5]).
+
+%% Manager state.
+%% NOTE: record field defaults are evaluated every time a #state{} is built
+%% without that field, in the process building it; the node_timers default
+%% therefore creates a new unnamed ETS table owned by that process.
+-record(state, {
+    %% Node -> timer reference of the pending nodeout timer
+    node_timers = ets:new(timers, [set]),
+    %% delay in milliseconds between nodedown and the nodeout action
+    nodeout_timeout = 2000,
+    %% limit handed to each newly started rexi_governor
+    pid_spawn_max = 10000
+}).
+
+
+% API
+
+%% @doc Start the manager, registered locally under the module name and
+%% linked to the calling process.
+start_link() ->
+    Name = {local, ?MODULE},
+    gen_server:start_link(Name, ?MODULE, [], []).
+
+%% @doc Send Msg to Dest without blocking the caller.
+%%
+%% The message is first sent with [noconnect, nosuspend]. If that does not
+%% return ok (no connection to the node, or the distribution buffer is busy),
+%% the message is cast to the governor for Dest's node, which either sends it
+%% from a spawned process or drops it when too many such sends are in flight.
+%%
+%% Dest may be a {Name, Node} tuple or a pid: rexi_utils:send/2 forwards its
+%% Dest here unchanged and has never restricted its shape.
+send(Dest, Msg) ->
+    case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
+        ok ->
+            ok;
+        _NoConnectOrNoSuspend ->
+            %% treat nosuspend and noconnect the same
+            {ok, Governor} = get_governor(dest_node(Dest)),
+            gen_server:cast(Governor, {spawn_and_track, Dest, Msg})
+    end.
+
+%% Node that a send destination lives on.
+dest_node({_Name, Node}) when is_atom(Node) ->
+    Node;
+dest_node(Pid) when is_pid(Pid) ->
+    node(Pid).
+
+%% @doc Return {ok, Pid} of the governor for Node.
+%% The named `govs' table is created with default (protected) access, so any
+%% process may read it here. On a miss, the request goes to the manager, the
+%% table's owner and only writer, which starts a governor if needed.
+get_governor(Node) ->
+    governor_for(Node, ets:lookup(govs, Node)).
+
+%% Turn the result of the `govs' lookup into {ok, Pid}, asking the manager
+%% when nothing is recorded yet.
+governor_for(_Node, [{_, Governor}]) ->
+    {ok, Governor};
+governor_for(Node, []) ->
+    gen_server:call(?MODULE, {get_governor, Node}).
+
+% gen_server callbacks
+
+%% @doc Create the named `govs' table (Node -> governor pid) and subscribe
+%% to nodeup/nodedown messages. Read rexi.nodeout_timeout (default "500", in
+%% ms) and rexi.pid_spawn_max (default "10000") from config, then register
+%% for config change callbacks.
+init([]) ->
+    ets:new(govs, [named_table, set, {read_concurrency, true}]),
+    net_kernel:monitor_nodes(true),
+    Timeout = list_to_integer(config:get("rexi", "nodeout_timeout", "500")),
+    SpawnMax = list_to_integer(config:get("rexi", "pid_spawn_max", "10000")),
+    %% node_timers is left to its record default, which creates the table
+    %% here, in the server process.
+    InitialState = #state{nodeout_timeout = Timeout, pid_spawn_max = SpawnMax},
+    config:listen_for_changes(?MODULE, InitialState),
+    {ok, InitialState}.
+
+%% config_listener callback: keep the server's nodeout_timeout and
+%% pid_spawn_max in step with the "rexi" config section.
+%% Values arrive as strings and are converted with list_to_integer/1, so a
+%% non-integer value raises badarg. The State argument and return value are
+%% the listener's own copy (registered by config:listen_for_changes/2 in
+%% init/1); the server's live state is updated through the gen_server:call/2
+%% requests below.
+%%
+%% NOTE(review): this assumes config runs the callback in its own listener
+%% process. If it ever ran inside rexi_gov_manager itself, the
+%% gen_server:call/2 to ?MODULE would deadlock. Confirm.
+handle_config_change("rexi", "nodeout_timeout", Value, _, State) ->
+    IntValue = list_to_integer(Value),
+    %% Setting the timeout is cheap, no need to check if it actually changed.
+    %% The new value is used by timers armed on later nodedown events.
+    gen_server:call(?MODULE, {set_timeout, IntValue}),
+    {ok, State#state{nodeout_timeout = IntValue}};
+handle_config_change("rexi", "pid_spawn_max", Value, _, State) ->
+    IntValue = list_to_integer(Value),
+    %% Updating the limit is cheap, no need to check if it actually changed.
+    %% Only governors started after this point get the new limit: a governor
+    %% receives its spawn_max once, in rexi_governor:init/1.
+    gen_server:call(?MODULE, {set_spawn_max, IntValue}),
+    {ok, State#state{pid_spawn_max = IntValue}};
+handle_config_change(_, _, _, _, State) ->
+    {ok, State}.
+
+%% Synchronous requests:
+%%   {set_timeout, Ms}    - replace nodeout_timeout, reply with the old value
+%%   {set_spawn_max, Max} - replace pid_spawn_max, reply with the old value
+%%   {get_governor, Node} - reply {ok, Pid}, starting a linked rexi_governor
+%%                          for Node and recording it in `govs' if none exists
+handle_call({set_timeout, NewTimeout}, _From, State) ->
+    Previous = State#state.nodeout_timeout,
+    {reply, Previous, State#state{nodeout_timeout = NewTimeout}};
+handle_call({set_spawn_max, NewMax}, _From, State) ->
+    Previous = State#state.pid_spawn_max,
+    {reply, Previous, State#state{pid_spawn_max = NewMax}};
+handle_call({get_governor, Node}, _From, State) ->
+    %% Look again: several callers may have missed in get_governor/1 for the
+    %% same node, and only the first request should start a governor.
+    Governor =
+        case ets:lookup(govs, Node) of
+            [{Node, Existing}] ->
+                Existing;
+            [] ->
+                {ok, Started} =
+                    gen_server:start_link(rexi_governor, [State#state.pid_spawn_max], []),
+                ets:insert(govs, {Node, Started}),
+                Started
+        end,
+    {reply, {ok, Governor}, State}.
+
+%% No asynchronous requests are defined; every cast is ignored.
+handle_cast(_Request, State) -> {noreply, State}.
+
+%% nodeup: cancel any pending nodeout timer for the node and make sure the
+%% node has a governor.
+handle_info({nodeup, Node}, #state{node_timers = Timers,
+                                   pid_spawn_max = PidSpawnMax} = State) ->
+    case ets:lookup(Timers, Node) of
+        [{Node, TRef}] ->
+            %% If the timer has already fired, its message is still queued;
+            %% the TRef comparison in the timeout clause below discards it.
+            erlang:cancel_timer(TRef),
+            ets:delete(Timers, Node);
+        [] ->
+            ok
+    end,
+    case ets:lookup(govs, Node) of
+        [{Node, _}] ->
+            ok;
+        [] ->
+            {ok, Gov} = gen_server:start_link(rexi_governor, [PidSpawnMax], []),
+            ets:insert(govs, {Node, Gov})
+    end,
+    {noreply, State};
+%% nodedown: arm a nodeout timer for the node unless one is already pending.
+handle_info({nodedown, Node}, #state{node_timers = Timers,
+                                     nodeout_timeout = NodeTimeout} = State) ->
+    case ets:member(Timers, Node) of
+        false ->
+            %% start_timer/3 tags the message with the timer's own reference,
+            %% so a late message from a cancelled timer can be told apart from
+            %% the timer currently recorded for Node.
+            TRef = erlang:start_timer(NodeTimeout, self(), {nodeout, Node}),
+            ets:insert(Timers, {Node, TRef});
+        true ->
+            ok
+    end,
+    {noreply, State};
+%% nodeout: the node stayed down for nodeout_timeout ms, so tell its governor
+%% to kill its outstanding sender processes. Act only when TRef is still the
+%% timer recorded for Node. Otherwise the message comes from a timer that a
+%% nodeup cancelled after it fired, and acting on it would discard the entry
+%% of a newer timer and kill the senders early.
+handle_info({timeout, TRef, {nodeout, Node}}, #state{node_timers = Timers} = State) ->
+    case ets:lookup(Timers, Node) of
+        [{Node, TRef}] ->
+            ets:delete(Timers, Node),
+            case ets:lookup(govs, Node) of
+                [{Node, Governor}] -> gen_server:cast(Governor, nodeout);
+                [] -> ok
+            end;
+        _StaleOrMissing ->
+            ok
+    end,
+    {noreply, State};
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%% Nothing to release: the ETS tables created in init/1 belong to this
+%% process and are deleted when it exits.
+terminate(_Reason, _State) -> ok.
+
+%% State needs no conversion between code versions.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+% Internal functions
diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl
new file mode 100644
index 0000000..9bbe7c9
--- /dev/null
+++ b/src/rexi_governor.erl
@@ -0,0 +1,74 @@
+% Copyright 2012 Cloudant
+%
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+-module(rexi_governor).
+
+-behaviour(gen_server).
+
+% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% Governor state.
+%% NOTE: the pids default runs ets:new/2 whenever #state{} is built without
+%% that field; init/1 builds it once, so the table is owned by the governor.
+-record(state, {
+    %% {Pid, MonitorRef} of every tracked sender process
+    pids = ets:new(pids, [set]),
+    %% tracked senders allowed before messages are dropped
+    spawn_max = 10000,
+    %% messages handed to a spawned sender so far
+    spawn_cnt = 0,
+    %% messages dropped because the limit was reached
+    drop_cnt = 0
+}).
+
+%% @doc PidSpawnMax is the number of tracked sender processes this governor
+%% allows before it starts dropping messages.
+init([PidSpawnMax]) ->
+    State = #state{spawn_max = PidSpawnMax},
+    {ok, State}.
+
+%% No synchronous API; every call is answered with ok.
+handle_call(_Request, _From, State) -> {reply, ok, State}.
+
+%% {spawn_and_track, Dest, Msg}: while fewer than spawn_max senders are
+%% tracked, deliver Msg from a new monitored process. There, erlang:send/2
+%% runs without the noconnect/nosuspend options and may block, but only that
+%% process waits. Once the limit is reached the message is dropped. Both
+%% outcomes are counted in the state and via margaret_counter.
+handle_cast({spawn_and_track, Dest, Msg},
+            #state{pids = Pids, spawn_max = SpawnMax} = State) ->
+    case ets:info(Pids, size) >= SpawnMax of
+        false ->
+            {Pid, MonRef} = spawn_monitor(erlang, send, [Dest, Msg]),
+            ets:insert(Pids, {Pid, MonRef}),
+            margaret_counter:increment([erlang, rexi, spawned]),
+            {noreply, State#state{spawn_cnt = State#state.spawn_cnt + 1}};
+        true ->
+            %% drop message on floor
+            margaret_counter:increment([erlang, rexi, dropped]),
+            {noreply, State#state{drop_cnt = State#state.drop_cnt + 1}}
+    end;
+%% nodeout: kill every tracked sender and forget all of them.
+handle_cast(nodeout, #state{pids = Pids} = State) ->
+    lists:foreach(fun({Pid, _MonRef}) -> exit(Pid, kill) end, ets:tab2list(Pids)),
+    ets:delete_all_objects(Pids),
+    {noreply, State}.
+
+%% A tracked sender process has exited. Remove its entry whatever the exit
+%% reason: an entry for a dead process would permanently use up one of the
+%% spawn_max slots. After a nodeout cast the entry is already gone, and
+%% ets:delete/2 on a missing key is a no-op.
+handle_info({'DOWN', _MonRef, process, Pid, _Reason}, #state{pids = Pids} = State) ->
+    ets:delete(Pids, Pid),
+    {noreply, State};
+%% Ignore any other message instead of crashing. This process is started
+%% with gen_server:start_link/3 by rexi_gov_manager, which does not trap
+%% exits, so a crash here would also take the manager down.
+handle_info(_Info, State) ->
+    {noreply, State}.
+
+%% Nothing to release: the pids table belongs to this process and is deleted
+%% when it exits.
+terminate(_Reason, _State) -> ok.
+
+%% State needs no conversion between code versions.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
diff --git a/src/rexi_sup.erl b/src/rexi_sup.erl
index 828ee54..b0273fc 100644
--- a/src/rexi_sup.erl
+++ b/src/rexi_sup.erl
@@ -1,5 +1,5 @@
% Copyright 2010 Cloudant
-%
+%
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
@@ -20,10 +20,11 @@
-include_lib("eunit/include/eunit.hrl").
+%% Child spec for a permanent child I started with I:start_link(), allowed
+%% 100 ms to shut down, of the given Type (worker or supervisor).
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 100, Type, [I]}).
+
start_link(Args) ->
supervisor:start_link({local,?MODULE}, ?MODULE, Args).
init([]) ->
- Mod = rexi_server,
- Spec = {Mod, {Mod,start_link,[]}, permanent, 100, worker, [Mod]},
- {ok, {{one_for_one, 3, 10}, [Spec]}}.
+ %% one_for_one: restart only the child that failed; more than 3 restarts
+ %% within 10 seconds makes the supervisor itself exit. rexi_gov_manager
+ %% creates and owns the `govs' table, so restarting it loses all governors.
+ {ok, {{one_for_one, 3, 10}, [?CHILD(rexi_server, worker), ?CHILD(rexi_gov_manager, worker)]}}.
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 721877e..9950823 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -4,14 +4,7 @@
%% @doc send a message as quickly as possible
+%% Delegates to rexi_gov_manager:send/2. That function first tries a send
+%% with [noconnect, nosuspend]. If the send is refused, the message goes to
+%% the destination node's governor, which sends it from a spawned process or
+%% drops it when too many such sends are already in flight.
+%% NOTE(review): rexi_gov_manager:send/2 must accept every Dest form callers
+%% pass here (pids as well as {Name, Node}); confirm.
send(Dest, Msg) ->
- case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
- noconnect ->
- spawn(erlang, send, [Dest, Msg]);
- nosuspend ->
- spawn(erlang, send, [Dest, Msg]);
- ok ->
- ok
- end.
+ rexi_gov_manager:send(Dest, Msg).
%% @doc set up the receive loop with an overall timeout
-spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->