Rejigger the governor implementation

Previously we'd spawn a number of processes to send messages to
'noconnect' / 'nosuspend' nodes.  Now we're buffering the messages that
need to be sent directly in each governor and sending them one at a
time.  This prevents the net_kernel from tipping over in the noconnect
case.  We also decide whether to drop messages based on memory
consumption in the node instead of process limits (since we're not
spawning anymore).

BugzID: 23717
BugzID: 23718
diff --git a/src/rexi_governor.erl b/src/rexi_governor.erl
index 876165d..fdf8c93 100644
--- a/src/rexi_governor.erl
+++ b/src/rexi_governor.erl
@@ -18,54 +18,67 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {pids = ets:new(pids, [set]),
-                spawn_max = 10000,
-                spawn_cnt = 0,
-                drop_cnt = 0}).
+-export ([
+    send/2,
+    start_link/1
+]).
 
-init([PidSpawnMax]) ->
-    {ok, #state{spawn_max = PidSpawnMax}}.
+-record(state, {
+    buffer = queue:new(),
+    count = 0
+}).
 
-handle_call(_Request, _From, State) ->
-    Reply = ok,
-    {reply, Reply, State}.
+%% TODO Leverage os_mon to discover available memory in the system
+-define (MAX_MEMORY, 17179869184).
 
-handle_cast({spawn_and_track, Dest, Msg},
-            #state{pids = Pids,
-                   spawn_max = SpawnMax,
-                   spawn_cnt = SC,
-                   drop_cnt = DC} = State) ->
-    {NewSC, NewDC} =
-    case ets:info(Pids, size) < SpawnMax of
+start_link(ServerId) ->
+    gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
+
+send(Dest, Msg) ->
+    Server = list_to_atom(lists:concat([rexi_governor, "_", get_node(Dest)])),
+    gen_server:cast(Server, {deliver, Dest, Msg}).
+
+
+init(_) ->
+    {ok, #state{}}.
+
+handle_call(get_buffered_count, _From, State) ->
+    {reply, State#state.count, State, 0}.
+
+handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
+    margaret_counter:increment([erlang, rexi, buffered]),
+    Q2 = queue:in({Dest, Msg}, Q),
+    case should_drop() of
     true ->
-        {Pid, Ref} = spawn_monitor(erlang, send, [Dest, Msg]),
-        ets:insert(Pids, {Pid, Ref}),
-        {SC + 1, DC};
+            {noreply, State#state{buffer = queue:drop(Q2)}, 0};
     false ->
-        % drop message on floor
-        {SC, DC + 1}
+            {noreply, State#state{buffer = Q2, count = C+1}, 0}
+    end.
+
+handle_info(timeout, State) ->
+    #state{buffer = Q, count = C} = State,
+    case queue:out_r(Q) of
+        {{value, {Dest, Msg}}, Q2} ->
+            erlang:send(Dest, Msg);
+        {empty, Q2} ->
+            ok
     end,
-    {noreply, State#state{spawn_cnt = NewSC, drop_cnt = NewDC}};
-
-handle_cast(nodeout, #state{pids = Pids} = State) ->
-    % kill all the pids
-    ets:foldl(fun({P, _Ref}, Acc) ->
-                  exit(P, kill),
-                  Acc
-              end, [], Pids),
-    ets:delete_all_objects(Pids),
-    {noreply, State}.
-
-handle_info({'DOWN', _, process, Pid, normal},
-            #state{pids = Pids} = State) ->
-    ets:delete(Pids, Pid),
-    {noreply, State};
-
-handle_info({'DOWN', _, process, _Pid, killed}, State) ->
-    {noreply, State}.
+    if C > 1 ->
+        {noreply, State#state{buffer = Q2, count = C-1}, 0};
+    true ->
+        {noreply, State#state{buffer = Q2, count = 0}}
+    end.
 
 terminate(_Reason, _State) ->
     ok.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
+
+should_drop() ->
+    erlang:memory(total) > ?MAX_MEMORY.
+
+get_node({_, Node}) when is_atom(Node) ->
+    Node;
+get_node(Pid) when is_pid(Pid) ->
+    node(Pid).
diff --git a/src/rexi_utils.erl b/src/rexi_utils.erl
index 6e03757..3c89ca9 100644
--- a/src/rexi_utils.erl
+++ b/src/rexi_utils.erl
@@ -29,7 +29,13 @@
 
 %% @doc send a message as quickly as possible
 send(Dest, Msg) ->
-    rexi_gov_manager:send(Dest, Msg).
+    case erlang:send(Dest, Msg, [noconnect, nosuspend]) of
+    ok ->
+        ok;
+    _ ->
+        % treat nosuspend and noconnect the same
+        rexi_governor:send(Dest, Msg)
+    end.
 
 %% @doc set up the receive loop with an overall timeout
 -spec recv([any()], integer(), function(), any(), timeout(), timeout()) ->