Merge pull request #4 from cloudant/13311-improve-view-back-pressure

Implement message stream interface

BugzID: 13311
BugzID: 14075
diff --git a/src/rexi.erl b/src/rexi.erl
index 93d33d7..1d47369 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -18,6 +18,7 @@
 -export([reply/1, sync_reply/1, sync_reply/2]).
 -export([async_server_call/2, async_server_call/3]).
 -export([get_errors/0, get_last_error/0, set_error_limit/1]).
+-export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
 
 -include("rexi.hrl").
 
@@ -127,6 +128,63 @@
         timeout
     end.
 
+%% Default window: at most 100 unacked messages.
+%% @equiv stream(Msg, 100, 300000)
+stream(Msg) -> stream(Msg, 100).
+
+%% Default ack timeout: 300000 ms (five minutes).
+%% @equiv stream(Msg, Limit, 300000)
+stream(Msg, Limit) -> stream(Msg, Limit, 300000).
+
+%% @doc Send Msg to the caller stored under rexi_from as {Ref, self(), Msg},
+%% blocking first while Limit or more sent messages are still unacked.
+%% Returns ok once sent, or timeout if no ack arrives within Timeout ms.
+-spec stream(any(), integer(), pos_integer() | infinity) -> ok | timeout.
+stream(Msg, Limit, Timeout) ->
+    try maybe_wait(Limit, Timeout) of
+        {ok, Count} ->
+            {Caller, Ref} = get(rexi_from),
+            erlang:send(Caller, {Ref, self(), Msg}),
+            %% bump the counter only after the message was really handed off
+            put(rexi_unacked, Count+1),
+            ok
+    catch throw:timeout -> timeout
+    end.
+
+%% @equiv stream_ack(Client, 1)
+stream_ack(Client) ->
+    stream_ack(Client, 1).
+
+%% @doc Acknowledge N streamed messages by sending {rexi_ack, N} to Client.
+stream_ack(Client, N) ->
+    Client ! {rexi_ack, N}.
+
 %% internal functions %%
 
 cast_msg(Msg) -> {'$gen_cast', Msg}.
+
+%% Pick the next step from the unacked count kept in the process dictionary:
+%% nothing sent yet, window full (block for an ack), or room left (drain).
+maybe_wait(Limit, Timeout) ->
+    Unacked = get(rexi_unacked),
+    if
+        Unacked =:= undefined -> {ok, 0};
+        Unacked >= Limit -> wait_for_ack(Unacked, Timeout);
+        true -> drain_acks(Unacked)
+    end.
+
+%% Block for one ack (throwing timeout if none comes), then drain the rest.
+wait_for_ack(Unacked, Timeout) ->
+    receive
+        {rexi_ack, Acked} -> drain_acks(Unacked - Acked)
+    after Timeout -> throw(timeout)
+    end.
+
+%% Consume queued acks without blocking; a negative balance is a protocol bug.
+drain_acks(Unacked) when Unacked >= 0 ->
+    receive
+        {rexi_ack, Acked} -> drain_acks(Unacked - Acked)
+    after 0 -> {ok, Unacked}
+    end;
+drain_acks(_Negative) ->
+    erlang:error(mismatched_rexi_ack).