% blob: c970f80511a37fbc15a8dd1ec6860111a88911b7
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(rexi).
-export([start/0, stop/0, restart/0]).
-export([cast/2, cast/3, cast/4, kill/2]).
-export([reply/1, sync_reply/1, sync_reply/2]).
-export([async_server_call/2, async_server_call/3]).
-export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
-include_lib("rexi/include/rexi.hrl").
%% @doc Starts the `rexi' application through application:start/1 and returns
%% whatever that call returns.
start() ->
application:start(rexi).
%% @doc Stops the `rexi' application through application:stop/1 and returns
%% whatever that call returns.
stop() ->
application:stop(rexi).
%% @doc Stops and then starts the `rexi' application. The result of stop/0 is
%% discarded; the value returned is the result of start/0.
restart() ->
stop(), start().
%% Convenience form of cast/3 in which the calling process acts as Caller.
%% Returns the reference produced by cast/3.
%% @equiv cast(Node, self(), MFA)
-spec cast(node(), {atom(), atom(), list()}) -> reference().
cast(Node, MFA) ->
cast(Node, self(), MFA).
%% @doc Asks the rexi server for Node to run apply(M, F, A).
%% Compared with rpc:cast/4 this offers two things. First, the Caller pid and
%% the returned reference are stored in the remote process' dictionary under
%% `rexi_from', which gives it a way to talk back to you. Second, the remote
%% process is monitored: should it exit with a Reason other than normal,
%% Caller receives `{Ref, {rexi_EXIT, Reason}}', Ref being the reference
%% returned here.
%% The `nonce' value from the calling process' dictionary travels with the
%% request.
-spec cast(node(), pid(), {atom(), atom(), list()}) -> reference().
cast(Node, Caller, MFA) ->
    Ref = make_ref(),
    Request = {doit, {Caller, Ref}, get(nonce), MFA},
    rexi_utils:send(rexi_utils:server_pid(Node), cast_msg(Request)),
    Ref.
%% @doc Asks the rexi server for Node to run apply(M, F, A), optionally
%% sending the request synchronously.
%% When `sync' is a member of Options the request is delivered with
%% erlang:send/2 directly from the calling process, rather than being
%% deferred to a spawned process if erlang:send/2 were to block. Without
%% `sync' the call behaves exactly like cast/3.
-spec cast(node(), pid(), {atom(), atom(), list()}, [atom()]) -> reference().
cast(Node, Caller, MFA, Options) ->
    cast_by_mode(lists:member(sync, Options), Node, Caller, MFA).

%% Dispatches on whether the `sync' option was present: `true' sends in
%% process with erlang:send/2, `false' falls back to cast/3.
cast_by_mode(true, Node, Caller, MFA) ->
    Ref = make_ref(),
    Request = cast_msg({doit, {Caller, Ref}, get(nonce), MFA}),
    erlang:send(rexi_utils:server_pid(Node), Request),
    Ref;
cast_by_mode(false, Node, Caller, MFA) ->
    cast(Node, Caller, MFA).
%% @doc Sends an asynchronous kill request for the remote process associated
%% with Ref. No rexi_EXIT message will be sent.
%% The same `{kill, Ref}' cast goes to two destinations: the `rexi_server'
%% registered name on Node and whatever rexi_utils:server_pid/1 returns for
%% Node. Always returns `ok'.
-spec kill(node(), reference()) -> ok.
kill(Node, Ref) ->
    KillMsg = cast_msg({kill, Ref}),
    % Sending to the registered name is to tide us over during the upgrade.
    % It is to be removed in the next commit, which will ship in a separate
    % release.
    rexi_utils:send({rexi_server, Node}, KillMsg),
    rexi_utils:send(rexi_utils:server_pid(Node), KillMsg),
    ok.
%% Convenience form of async_server_call/3 in which the calling process acts
%% as Caller. Returns the reference produced by async_server_call/3.
%% @equiv async_server_call(Server, self(), Request)
-spec async_server_call(pid() | {atom(),node()}, any()) -> reference().
async_server_call(Server, Request) ->
async_server_call(Server, self(), Request).
%% @doc Delivers Request to Server in the shape of a gen_server:call message
%% and hands back the reference the Server will include in its reply.
%% Despite the message format this behaves more like cast() than call(): the
%% server process is not monitored and the function does not wait for an
%% answer. Clients that need to know whether the server is alive should
%% monitor it themselves before calling this function.
-spec async_server_call(pid() | {atom(),node()}, pid(), any()) -> reference().
async_server_call(Server, Caller, Request) ->
    Tag = make_ref(),
    Call = {'$gen_call', {Caller, Tag}, Request},
    rexi_utils:send(Server, Call),
    Tag.
%% @doc Convenience function for answering the original rexi Caller.
%% Reads `{Caller, Ref}' from the `rexi_from' entry of the process dictionary
%% and sends `{Ref, Reply}' to Caller. The sent message is the return value.
%% Fails with badmatch when `rexi_from' does not hold a two-element tuple.
-spec reply(any()) -> any().
reply(Reply) ->
    {Caller, Ref} = get(rexi_from),
    Caller ! {Ref, Reply}.
%% Convenience form of sync_reply/2 with a fixed timeout of 300000, the value
%% used in the `after' clause of sync_reply/2.
%% @equiv sync_reply(Reply, 300000)
sync_reply(Reply) ->
sync_reply(Reply, 300000).
%% @doc Convenience function that replies to the caller and then waits for a
%% response. The message sent has the form
%% {OriginalRef, {self(),reference()}, Reply}, which lets the original caller
%% respond back by sending {Tag, Response} to this process.
%% Returns the Response, or the atom `timeout' if none arrives within
%% Timeout.
-spec sync_reply(any(), pos_integer() | infinity) -> any().
sync_reply(Reply, Timeout) ->
    {Caller, Ref} = get(rexi_from),
    AnswerTag = make_ref(),
    Caller ! {Ref, {self(), AnswerTag}, Reply},
    receive
        {AnswerTag, Answer} ->
            Answer
    after Timeout ->
        timeout
    end.
%% Convenience form of stream/3 with a limit of 10 outstanding messages and a
%% timeout of 300000.
%% @equiv stream(Msg, 10, 300000)
stream(Msg) ->
stream(Msg, 10, 300000).
%% Convenience form of stream/3 with a caller-supplied Limit and a fixed
%% timeout of 300000.
%% @equiv stream(Msg, Limit, 300000)
stream(Msg, Limit) ->
stream(Msg, Limit, 300000).
%% @doc Convenience function that streams a message to the caller, blocking
%% while Limit or more messages are still outstanding. The message has the
%% form {OriginalRef, self(), Reply}, which lets the original caller ack it.
%% The number of unacknowledged messages is kept under `rexi_unacked' in the
%% process dictionary and is incremented for every message sent.
%% Returns `ok' once the message has been sent, or `timeout' when waiting for
%% an ack exceeded Timeout (nothing is sent in that case).
-spec stream(any(), integer(), pos_integer() | infinity) -> any().
stream(Msg, Limit, Timeout) ->
    try maybe_wait(Limit, Timeout) of
        {ok, Outstanding} ->
            put(rexi_unacked, Outstanding + 1),
            {Caller, Ref} = get(rexi_from),
            Caller ! {Ref, self(), Msg},
            ok
    catch
        throw:timeout ->
            timeout
    end.
%% Acknowledges a single streamed message by sending `{rexi_ack, 1}' to
%% Client. The sent message is the return value.
%% @equiv stream_ack(Client, 1)
stream_ack(Client) ->
    Client ! {rexi_ack, 1}.
%% @doc Acknowledges N streamed messages by sending `{rexi_ack, N}' to
%% Client. The sent message is the return value.
stream_ack(Client, N) ->
    Client ! {rexi_ack, N}.
%% internal functions %%
%% Wraps Msg in a `{'$gen_cast', Msg}' tuple, the envelope used for
%% gen_server casts.
cast_msg(Msg) -> {'$gen_cast', Msg}.
%% Decides whether the streaming process has to wait before sending another
%% message, based on the `rexi_unacked' counter in the process dictionary.
%% Returns `{ok, Count}' with the current number of outstanding messages;
%% may throw `timeout' via wait_for_ack/2.
maybe_wait(Limit, Timeout) ->
    maybe_wait(get(rexi_unacked), Limit, Timeout).

%% The `undefined' clause must come first: an atom compares greater than any
%% number, so it would otherwise satisfy the `>=' guard.
maybe_wait(undefined, _Limit, _Timeout) ->
    % Nothing has been streamed yet.
    {ok, 0};
maybe_wait(Unacked, Limit, Timeout) when Unacked >= Limit ->
    % At or over the limit: block until at least one ack arrives.
    wait_for_ack(Unacked, Timeout);
maybe_wait(Unacked, _Limit, _Timeout) ->
    % Below the limit: just consume any acks already in the mailbox.
    drain_acks(Unacked).
%% Blocks until one `{rexi_ack, N}' message arrives, subtracts N from Count
%% and passes the remainder to drain_acks/1 so that further queued acks are
%% consumed too. Throws `timeout' when no ack arrives within Timeout.
wait_for_ack(Count, Timeout) ->
receive
{rexi_ack, N} -> drain_acks(Count-N)
after Timeout ->
throw(timeout)
end.
%% Consumes every `{rexi_ack, N}' message currently in the mailbox without
%% blocking, subtracting each N from Count. Returns `{ok, Remaining}' once no
%% more acks are queued. Raises error `mismatched_rexi_ack' if the running
%% count ever drops below zero, i.e. more messages were acked than were
%% outstanding.
drain_acks(Outstanding) when Outstanding >= 0 ->
    receive
        {rexi_ack, Acked} ->
            drain_acks(Outstanding - Acked)
    after 0 ->
        {ok, Outstanding}
    end;
drain_acks(_Negative) ->
    erlang:error(mismatched_rexi_ack).