Configure buffer limit by message count
This allows an operator to decide how large the buffers should be. It
also provides an escape valve to clear the buffer entirely.
diff --git a/src/rexi_buffer.erl b/src/rexi_buffer.erl
index 874ec3c..f75399c 100644
--- a/src/rexi_buffer.erl
+++ b/src/rexi_buffer.erl
@@ -29,9 +29,6 @@
count = 0
}).
-%% TODO Leverage os_mon to discover available memory in the system
--define (MAX_MEMORY, 17179869184).
-
start_link(ServerId) ->
gen_server:start_link({local, ServerId}, ?MODULE, nil, []).
@@ -41,7 +38,12 @@
init(_) ->
- {ok, #state{}}.
+ %% TODO Leverage os_mon to discover available memory in the system
+ Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
+ {ok, #state{max_count = Max}}.
+
+handle_call(erase_buffer, _From, State) ->
+ {reply, ok, State#state{buffer = queue:new(), count = 0}, 0};
handle_call(get_buffered_count, _From, State) ->
{reply, State#state.count, State, 0}.
@@ -49,7 +51,7 @@
handle_cast({deliver, Dest, Msg}, #state{buffer = Q, count = C} = State) ->
margaret_counter:increment([erlang, rexi, buffered]),
Q2 = queue:in({Dest, Msg}, Q),
- case should_drop() of
+ case should_drop(State) of
true ->
{noreply, State#state{buffer = queue:drop(Q2)}, 0};
false ->
@@ -94,11 +96,14 @@
terminate(_Reason, _State) ->
ok.
+code_change(_OldVsn, {state, Buffer, Sender, Count}, _Extra) ->
+ Max = list_to_integer(config:get("rexi", "buffer_count", "2000")),
+ {ok, #state{buffer=Buffer, sender=Sender, count=Count, max_count=Max}};
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-should_drop() ->
- erlang:memory(total) > ?MAX_MEMORY.
+should_drop(#state{count = Count, max_count = Max}) ->
+ Count >= Max.
get_node({_, Node}) when is_atom(Node) ->
Node;