% blob: 19d8eb4e1e865ad5e82c46c9b06b9551a03dd7fc [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_epi_server).
-behaviour(gen_server).
-define(SERVER, ?MODULE).
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([start_link/0]).
-export([subscribe/3, subscribe/4, unsubscribe/1, unsubscribe/2]).
-export([notify/4, notify/5]).
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% gen_server state.
%%   subscriptions - a dict (created with dict:new() in init/1) keyed by
%%                   {App, Key}; add/5 appends {MonitorRef, MFA} entries to
%%                   the list stored under each key.
-record(epi_server_state, {subscriptions}).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
%% @doc Starts the server linked to the calling process and registers it
%% locally under the name held by the ?SERVER macro. Returns whatever
%% gen_server:start_link/4 returns.
start_link() ->
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).
%% @doc Shorthand for subscribe/4 addressed to the locally registered
%% server (?SERVER).
subscribe(App, Key, MFA) ->
    Server = ?SERVER,
    subscribe(Server, App, Key, MFA).
%% @doc Synchronously asks Server to register MFA as a subscriber for the
%% {App, Key} pair. MFA must be a 3-tuple; any other term fails with
%% function_clause before the server is contacted. When handled by this
%% module's handle_call/3 the reply is {ok, Subscription}, where
%% Subscription is the monitor reference created for the calling process.
subscribe(Server, App, Key, MFA) when is_tuple(MFA), tuple_size(MFA) =:= 3 ->
    Request = {subscribe, App, Key, MFA},
    gen_server:call(Server, Request).
%% @doc Shorthand for unsubscribe/2 addressed to the locally registered
%% server (?SERVER).
unsubscribe(Subscription) ->
    Server = ?SERVER,
    unsubscribe(Server, Subscription).
%% @doc Synchronously asks Server to drop Subscription (the value returned
%% inside {ok, Subscription} by subscribe). When handled by this module's
%% handle_call/3 the reply is ok, whether or not the subscription was known.
unsubscribe(Server, Subscription) ->
    Request = {unsubscribe, Subscription},
    gen_server:call(Server, Request).
%% @doc Shorthand for notify/5 addressed to the locally registered
%% server (?SERVER).
notify(App, Key, OldData, Data) ->
    Server = ?SERVER,
    notify(Server, App, Key, OldData, Data).
%% @doc Asynchronously (gen_server:cast) tells Server that the data for
%% {App, Key} changed from OldData to Data. The subscribers are invoked
%% inside the server process by handle_cast/2, not in the caller.
notify(Server, App, Key, OldData, Data) ->
    Message = {notify, App, Key, OldData, Data},
    gen_server:cast(Server, Message).
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
%% @doc gen_server callback. Ignores its argument and starts with an empty
%% subscription dict.
init(_Args) ->
    {ok, #epi_server_state{subscriptions = dict:new()}}.
%% @doc gen_server callback for synchronous requests.
%%
%%   {subscribe, App, Key, MFA} - monitors the calling process, stores MFA
%%       under {App, Key} and replies {ok, MonitorRef}.
%%   {unsubscribe, Subscription} - removes the subscription if it is known
%%       and replies ok either way.
%%
%% NOTE(review): any other request stops the server with reason normal and
%% no reply is sent from here - confirm this is the intended behaviour.
handle_call({subscribe, App, Key, MFA}, {CallerPid, _Tag},
            #epi_server_state{subscriptions = Subs} = State) ->
    {Ref, UpdatedSubs} = add(CallerPid, Subs, App, Key, MFA),
    {reply, {ok, Ref}, State#epi_server_state{subscriptions = UpdatedSubs}};
handle_call({unsubscribe, Ref}, _From,
            #epi_server_state{subscriptions = Subs} = State) ->
    UpdatedSubs = remove(Subs, Ref),
    {reply, ok, State#epi_server_state{subscriptions = UpdatedSubs}};
handle_call(_Request, _From, State) ->
    {stop, normal, State}.
%% @doc gen_server callback for asynchronous messages.
%%
%% {notify, App, Key, OldData, Data} invokes every subscriber stored under
%% {App, Key}; the callbacks run in this server process and their results
%% are discarded. Any other message is ignored. The state never changes.
handle_cast({notify, App, Key, OldData, Data},
            #epi_server_state{subscriptions = Subs} = State) ->
    notify_subscribers(subscribers(Subs, App, Key), App, Key, OldData, Data),
    {noreply, State};
handle_cast(_Msg, State) ->
    {noreply, State}.
%% @doc gen_server callback for raw messages.
%%
%% A 'DOWN' message means a monitored subscriber process went away; the
%% subscription identified by the monitor reference is removed. Every
%% other message is dropped without changing the state.
handle_info({'DOWN', Ref, _Type, _Object, _Info},
            #epi_server_state{subscriptions = Subs} = State) ->
    Remaining = remove(Subs, Ref),
    {noreply, State#epi_server_state{subscriptions = Remaining}};
handle_info(_Info, State) ->
    {noreply, State}.
%% @doc gen_server callback. Nothing is cleaned up here; always returns ok.
terminate(_Reason, _State) ->
    ok.
%% @doc gen_server callback. No state migration is performed: the state is
%% handed back unchanged as {ok, State}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
%% Returns the list stored under {App, Key} in the Subscriptions dict, or
%% [] when nothing is stored for that pair.
subscribers(Subscriptions, App, Key) ->
    Topic = {App, Key},
    case dict:is_key(Topic, Subscriptions) of
        true ->
            dict:fetch(Topic, Subscriptions);
        false ->
            []
    end.
%% Monitors Pid and appends {MonitorRef, MFA} to the list stored under
%% {App, Key}. The monitor reference doubles as the subscription id.
%% Returns {MonitorRef, UpdatedSubscriptions}.
add(Pid, Subscriptions, App, Key, MFA) ->
    Ref = erlang:monitor(process, Pid),
    Updated = dict:append_list({App, Key}, [{Ref, MFA}], Subscriptions),
    {Ref, Updated}.
%% Removes the subscription identified by SubscriptionId, if any, and
%% returns the resulting dict. Unknown ids leave the dict untouched.
remove(Subscriptions, SubscriptionId) ->
    Location = find(Subscriptions, SubscriptionId),
    remove_at(Location, Subscriptions, SubscriptionId).

%% Dispatches on the result of find/2: a {App, Key} location means the
%% subscription exists, anything else (not_found) means there is nothing
%% to do.
remove_at({App, Key}, Subscriptions, SubscriptionId) ->
    %% flush discards a 'DOWN' message that may already be in the mailbox
    demonitor(SubscriptionId, [flush]),
    delete_subscriber(Subscriptions, App, Key, SubscriptionId);
remove_at(_NotFound, Subscriptions, _SubscriptionId) ->
    Subscriptions.
%% Scans every entry of the Subscriptions dict for a subscriber whose id is
%% exactly SubscriptionId. Returns the dict key holding it, or not_found.
%% The whole dict is always traversed. An id occurring more than once under
%% the same key matches neither case clause and raises case_clause.
find(Subscriptions, SubscriptionId) ->
    Mark = fun({Id, _MFA}) when Id =:= SubscriptionId -> {true, ok};
              (_Other) -> false
           end,
    Scan = fun(Topic, Subscribers, Found) ->
               case lists:filtermap(Mark, Subscribers) of
                   [] ->
                       Found;
                   [_] ->
                       Topic
               end
           end,
    dict:fold(Scan, not_found, Subscriptions).
%% Removes every entry whose id is SubscriptionId from the list stored
%% under {App, Key} and returns the updated dict.
%%
%% When the last subscriber of a pair is removed the key itself is erased,
%% so the dict does not accumulate empty lists for topics nobody listens to
%% (subscribers/3 already returns [] for missing keys, and find/2 no longer
%% has to walk dead entries).
%%
%% Fails with badarg if nothing is stored under {App, Key}; remove/2 only
%% calls this after find/2 located the key.
delete_subscriber(Subscriptions, App, Key, SubscriptionId) ->
    Topic = {App, Key},
    Current = dict:fetch(Topic, Subscriptions),
    case [{Id, MFA} || {Id, MFA} <- Current, Id =/= SubscriptionId] of
        [] ->
            dict:erase(Topic, Subscriptions);
        Remaining ->
            dict:store(Topic, Remaining, Subscriptions)
    end.
%% Calls M:F(App, Key, OldData, Data, A) for each {_Id, {M, F, A}} entry in
%% Subscribers, in list order, and returns the list of results. Entries of
%% any other shape are skipped. Exceptions raised by a callback are not
%% caught here.
notify_subscribers([{_Id, {M, F, A}} | Rest], App, Key, OldData, Data) ->
    Result = M:F(App, Key, OldData, Data, A),
    [Result | notify_subscribers(Rest, App, Key, OldData, Data)];
notify_subscribers([_Other | Rest], App, Key, OldData, Data) ->
    notify_subscribers(Rest, App, Key, OldData, Data);
notify_subscribers([], _App, _Key, _OldData, _Data) ->
    [].