Implement new streaming APIs

This adds new functions that are used by coordinators and workers to
negotiate an RPC stream. A stream is simply any response that requires
multiple messages from the worker.

BugzId: 22729
diff --git a/src/rexi.erl b/src/rexi.erl
index d37d360..75bc9bf 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -15,6 +15,8 @@
 -export([cast/2, cast/3, cast/4, kill/2]).
 -export([reply/1, sync_reply/1, sync_reply/2]).
 -export([async_server_call/2, async_server_call/3]).
+-export([stream_init/0, stream_init/1]).
+-export([stream_start/1, stream_cancel/1]).
 -export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
 
 -include_lib("rexi/include/rexi.hrl").
@@ -115,6 +117,50 @@
         timeout
     end.
 
+%% @equiv stream_init(300000)
+stream_init() ->
+    stream_init(300000).
+
+%% @doc Initialize an RPC stream that involves sending multiple
+%% messages back to the coordinator.
+%%
+%% This should be called by rexi workers. It blocks until the
+%% coordinator responds with whether this worker should proceed.
+%% This function will either return with `ok` or call
+%% `erlang:exit/1`.
+-spec stream_init(pos_integer()) -> ok.
+stream_init(Timeout) ->
+    case sync_reply(rexi_STREAM_INIT, Timeout) of
+        rexi_STREAM_START ->
+            ok;
+        rexi_STREAM_CANCEL ->
+            exit(normal);
+        timeout ->
+            exit(timeout);
+        Else ->
+            exit({invalid_stream_message, Else})
+    end.
+
+%% @doc Start a worker stream
+%%
+%% If a coordinator wants to continue using a streaming worker it
+%% should use this function to inform the worker to continue
+%% sending messages. The `From` should be the value provided by
+%% the worker in the rexi_STREAM_INIT message.
+-spec stream_start({pid(), any()}) -> ok.
+stream_start({Pid, _Tag}=From) when is_pid(Pid) ->
+    gen_server:reply(From, rexi_STREAM_START).
+
+%% @doc Cancel a worker stream
+%%
+%% If a coordinator decideds that a worker is not going to be part
+%% of the response it should use this function to cancel the worker.
+%% The `From` should be the value provided by the worker in the
+%% rexi_STREAM_INIT message.
+-spec stream_cancel({pid(), any()}) -> ok.
+stream_cancel({Pid, _Tag}=From) when is_pid(Pid) ->
+    gen_server:reply(From, rexi_STREAM_CANCEL).
+
 %% @equiv stream(Msg, 100, 300000)
 stream(Msg) ->
     stream(Msg, 10, 300000).