Merge pull request #4 from cloudant/13311-improve-view-back-pressure
Implement message stream interface
BugzID: 13311
BugzID: 14075
diff --git a/src/rexi.erl b/src/rexi.erl
index 93d33d7..1d47369 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -18,6 +18,7 @@
-export([reply/1, sync_reply/1, sync_reply/2]).
-export([async_server_call/2, async_server_call/3]).
-export([get_errors/0, get_last_error/0, set_error_limit/1]).
+-export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
-include("rexi.hrl").
@@ -127,6 +128,63 @@
timeout
end.
+%% Streams Msg using the defaults: at most 100 unacknowledged messages and
+%% a 300000 ms wait for an ack once that limit is reached.
+%% @equiv stream(Msg, 100, 300000)
+stream(Msg) ->
+ stream(Msg, 100, 300000).
+
+%% Streams Msg allowing at most Limit unacknowledged messages, with the
+%% default 300000 ms wait for an ack once that limit is reached.
+%% @equiv stream(Msg, Limit, 300000)
+stream(Msg, Limit) ->
+ stream(Msg, Limit, 300000).
+
+%% @doc Sends Msg to the process stored under the rexi_from key of the
+%% process dictionary, applying back-pressure: when Limit or more streamed
+%% messages are still unacknowledged, the call blocks for up to Timeout
+%% milliseconds waiting for an ack. The delivered message has the form
+%% {Ref, self(), Msg}, so the receiver knows which pid to acknowledge.
+%% Returns ok once the message has been sent, or timeout if no ack arrived
+%% in time.
+-spec stream(any(), integer(), pos_integer() | infinity) -> any().
+stream(Msg, Limit, Timeout) ->
+    try maybe_wait(Limit, Timeout) of
+        {ok, Unacked} ->
+            %% Count this message as outstanding before it is sent.
+            put(rexi_unacked, Unacked + 1),
+            {Dest, StreamRef} = get(rexi_from),
+            Dest ! {StreamRef, self(), Msg},
+            ok
+    catch
+        throw:timeout ->
+            timeout
+    end.
+
+%% Acknowledges a single streamed message.
+%% @equiv stream_ack(Client, 1)
+stream_ack(Client) ->
+    stream_ack(Client, 1).
+
+%% @doc Ack streamed messages. Sends {rexi_ack, N} to Client to report that
+%% N streamed messages have been handled; Client is expected to be the pid
+%% carried in the streamed {Ref, Pid, Msg} message. Returns the message
+%% that was sent.
+stream_ack(Client, N) ->
+ erlang:send(Client, {rexi_ack, N}).
+
%% internal functions %%
%% Wraps Msg in a {'$gen_cast', Msg} tuple.
%% NOTE(review): presumably the shape a gen_server expects for a cast;
%% callers are outside this hunk -- confirm.
cast_msg(Msg) -> {'$gen_cast', Msg}.
+
+%% Decides whether the streaming process has to block before sending again.
+%% Reads the unacked-message count from the process dictionary
+%% (rexi_unacked) and returns {ok, Count} with the count after any received
+%% acks have been subtracted. Throws timeout (from wait_for_ack/2) when the
+%% limit has been reached and no ack arrives within Timeout.
+maybe_wait(Limit, Timeout) ->
+    maybe_wait(get(rexi_unacked), Limit, Timeout).
+
+%% No count has been recorded in this process yet.
+maybe_wait(undefined, _Limit, _Timeout) ->
+    {ok, 0};
+%% At or above the limit: block until at least one ack arrives.
+maybe_wait(Unacked, Limit, Timeout) when Unacked >= Limit ->
+    wait_for_ack(Unacked, Timeout);
+%% Below the limit: only collect acks that are already in the mailbox.
+maybe_wait(Unacked, _Limit, _Timeout) ->
+    drain_acks(Unacked).
+
+%% Blocks until one {rexi_ack, N} message arrives, subtracts N from Count
+%% and then hands over to drain_acks/1 to consume any further acks that are
+%% already in the mailbox. Throws timeout if no ack arrives within Timeout;
+%% stream/3 catches that throw and returns timeout to its caller.
+wait_for_ack(Count, Timeout) ->
+ receive
+ {rexi_ack, N} -> drain_acks(Count-N)
+ after Timeout ->
+ throw(timeout)
+ end.
+
+%% Consumes every {rexi_ack, N} message currently in the mailbox without
+%% blocking, subtracting each N from the outstanding count. Returns
+%% {ok, Outstanding} once no ack is waiting. Raises the error
+%% mismatched_rexi_ack as soon as the count drops below zero, i.e. more
+%% messages were acknowledged than were recorded as sent.
+drain_acks(Outstanding) when Outstanding >= 0 ->
+    receive
+        {rexi_ack, Acked} ->
+            drain_acks(Outstanding - Acked)
+    after 0 ->
+        %% Mailbox holds no further acks; report what is still unacked.
+        {ok, Outstanding}
+    end;
+drain_acks(_Negative) ->
+    erlang:error(mismatched_rexi_ack).