% blob: a9ed331990446aaac85e6f2de6c44e7298a19978 [file] [log] [blame]
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
% Behaviour for processes that listen for database events. A listener
% registers itself with couch_event (for all databases or for a given
% set of database names) and then dispatches incoming messages to the
% callback module: '$couch_event' messages go to handle_event/3,
% messages sent through cast/2 go to handle_cast/2, and everything else
% (including loop timeouts) goes to handle_info/2.
-module(couch_event_listener).
% Public API: start a listener (optionally named, optionally linked),
% turn the calling process into a listener, or send a cast to one.
-export([
start/3,
start/4,
start_link/3,
start_link/4,
enter_loop/3,
cast/2
]).
% do_init/3 is the spawn entry point used by start/3 and start_link/3;
% loop/2 is always invoked as ?MODULE:loop/2. Both are exported because
% they are called in fully qualified form.
% NOTE(review): presumably not meant to be called by users directly - confirm.
-export([
do_init/3,
loop/2
]).
% Loop state threaded through loop/2 and the do_* dispatch functions.
-record(st, {
% Callback module whose handle_* and terminate functions are invoked.
module,
% Callback state: the value returned by the callback module and passed
% back to it on the next callback invocation.
state
}).
% init/1 is called once in the listener process, after the optional name
% registration and the couch_event registration have been done.
% Accepted results: {ok, State} or {ok, State, Timeout} where Timeout is
% a non-negative integer. Any other result (including a caught
% exception) becomes the exit reason of the process.
-callback init(Arg :: term()) ->
term().
% terminate/2 is called with the stop reason and the latest state after
% a handle_* callback returns {stop, Reason, NewState}. Its return value
% and any exception it raises are discarded.
-callback terminate(Reason :: term(), State :: term()) ->
term().
% handle_cast/2 is called for every message sent with cast/2.
% Accepted results: {ok, NewState}, {ok, NewState, Timeout} (Timeout a
% non-negative integer) or {stop, Reason, NewState}. Anything else
% raises an error in the listener process.
-callback handle_cast(Message :: term(), State :: term()) ->
term().
% handle_event/3 is called for every {'$couch_event', DbName, Event}
% message received by the listener. Accepted results are the same as
% for handle_cast/2.
-callback handle_event(DbName :: term(), Event :: term(), State :: term()) ->
term().
% handle_info/2 is called for every other message, and with the atom
% 'timeout' when the current loop timeout expires without a message.
% Accepted results are the same as for handle_cast/2.
-callback handle_info(Message :: term(), State :: term()) ->
term().
% Spawn an unlinked listener process that runs do_init/3 with the given
% callback module, init argument and options.
% Returns {ok, Pid} immediately; it does not wait for Mod:init/1.
start(Mod, Arg, Options) ->
    InitArgs = [Mod, Arg, Options],
    {ok, erlang:spawn(?MODULE, do_init, InitArgs)}.
% Start an unlinked listener that registers itself under Name
% ({local, Atom} or {global, Term}).
% Returns {error, {already_started, Owner}} if the name is already
% taken at the time of the call; otherwise delegates to start/3 with
% {name, Name} prepended to the options.
start(Name, Mod, Arg, Options) ->
    Existing = where(Name),
    if
        Existing =:= undefined ->
            start(Mod, Arg, [{name, Name} | Options]);
        true ->
            {error, {already_started, Existing}}
    end.
% Spawn a listener process linked to the caller. The new process runs
% do_init/3 with the given callback module, init argument and options.
% Returns {ok, Pid} immediately; it does not wait for Mod:init/1.
start_link(Mod, Arg, Options) ->
    InitArgs = [Mod, Arg, Options],
    {ok, erlang:spawn_link(?MODULE, do_init, InitArgs)}.
% Start a linked listener that registers itself under Name
% ({local, Atom} or {global, Term}).
% Returns {error, {already_started, Owner}} if the name is already
% taken at the time of the call; otherwise delegates to start_link/3
% with {name, Name} prepended to the options.
start_link(Name, Mod, Arg, Options) ->
    case where(Name) of
        undefined ->
            NamedOptions = [{name, Name} | Options],
            start_link(Mod, Arg, NamedOptions);
        Owner ->
            {error, {already_started, Owner}}
    end.
% Turn the calling process into a listener: register it for the events
% selected by Options, then enter the receive loop with the supplied
% callback module and state and no timeout. Module:init/1 is not called.
enter_loop(Module, State, Options) ->
    ok = register_listeners(Options),
    InitialSt = #st{module = Module, state = State},
    ?MODULE:loop(InitialSt, infinity).
% Send Message to the listener Pid, tagged so that loop/2 routes it to
% the callback module's handle_cast/2. Always returns ok.
cast(Pid, Message) ->
    Tagged = {'$couch_event_cast', Message},
    Pid ! Tagged,
    ok.
% Entry point of a freshly spawned listener process.
% Registers the process name (if a name option is present), registers
% for events, then calls Module:init/1 and enters the loop.
% Any init result other than {ok, State} or {ok, State, Timeout} with a
% non-negative integer Timeout (including a caught exception) is used
% as the exit reason of the process.
do_init(Module, Arg, Options) ->
    ok = maybe_name_process(Options),
    ok = register_listeners(Options),
    InitResult = (catch Module:init(Arg)),
    case InitResult of
        {ok, State, Timeout} when is_integer(Timeout), Timeout >= 0 ->
            ?MODULE:loop(#st{module = Module, state = State}, Timeout);
        {ok, State} ->
            ?MODULE:loop(#st{module = Module, state = State}, infinity);
        Failure ->
            erlang:exit(Failure)
    end.
% Main receive loop. Takes the next message from the mailbox and
% dispatches it:
%   {'$couch_event_cast', Message}   -> do_cast/2
%   {'$couch_event', DbName, Event}  -> do_event/3
%   any other message                -> do_info/2
% If no message arrives within Timeout, do_info/2 is called with the
% atom 'timeout'.
loop(St, Timeout) ->
    receive
        {'$couch_event_cast', CastMsg} ->
            do_cast(St, CastMsg);
        {'$couch_event', DbName, Event} ->
            do_event(St, DbName, Event);
        Info ->
            do_info(St, Info)
    after Timeout ->
        do_info(St, timeout)
    end.
% Register the calling process under the name given by the name option,
% if there is one. Returns ok when no name was requested or when the
% registration succeeded; raises {already_started, Owner} when the name
% could not be registered.
maybe_name_process(Options) ->
    case proplists:lookup(name, Options) of
        none ->
            ok;
        {name, Name} ->
            case name_register(Name) of
                {false, Owner} ->
                    erlang:error({already_started, Owner});
                true ->
                    ok
            end
    end.
% Register the calling process with couch_event, either for every
% database (all_dbs option) or for the database names collected from
% the dbname / dbnames options. The result of the couch_event call is
% ignored; always returns ok.
register_listeners(Options) ->
    Selection = get_all_dbnames(Options),
    Listener = self(),
    case Selection of
        all_dbs ->
            couch_event:register_all(Listener);
        DbNames ->
            couch_event:register_many(Listener, DbNames)
    end,
    ok.
% Deliver a database event to Module:handle_event/3 and act on the
% result:
%   {ok, NewState}            -> keep looping without a timeout
%   {ok, NewState, Timeout}   -> keep looping with the given timeout
%   {stop, Reason, NewState}  -> run the terminate sequence
% Any other result (including a caught exception) is raised as an error.
do_event(#st{module = CbMod, state = CbState} = St, DbName, Event) ->
    Result = (catch CbMod:handle_event(DbName, Event, CbState)),
    case Result of
        {stop, Reason, NextState} ->
            do_terminate(Reason, St#st{state = NextState});
        {ok, NextState, Timeout} when is_integer(Timeout), Timeout >= 0 ->
            ?MODULE:loop(St#st{state = NextState}, Timeout);
        {ok, NextState} ->
            ?MODULE:loop(St#st{state = NextState}, infinity);
        Unexpected ->
            erlang:error(Unexpected)
    end.
% Deliver a cast message to Module:handle_cast/2 and act on the result:
%   {ok, NewState}            -> keep looping without a timeout
%   {ok, NewState, Timeout}   -> keep looping with the given timeout
%   {stop, Reason, NewState}  -> run the terminate sequence
% Any other result (including a caught exception) is raised as an error.
do_cast(#st{module = CbMod, state = CbState} = St, Message) ->
    Result = (catch CbMod:handle_cast(Message, CbState)),
    case Result of
        {stop, Reason, NextState} ->
            do_terminate(Reason, St#st{state = NextState});
        {ok, NextState, Timeout} when is_integer(Timeout), Timeout >= 0 ->
            ?MODULE:loop(St#st{state = NextState}, Timeout);
        {ok, NextState} ->
            ?MODULE:loop(St#st{state = NextState}, infinity);
        Unexpected ->
            erlang:error(Unexpected)
    end.
% Deliver a plain message (or the atom 'timeout') to
% Module:handle_info/2 and act on the result:
%   {ok, NewState}            -> keep looping without a timeout
%   {ok, NewState, Timeout}   -> keep looping with the given timeout
%   {stop, Reason, NewState}  -> run the terminate sequence
% Any other result (including a caught exception) is raised as an error.
do_info(#st{module = CbMod, state = CbState} = St, Message) ->
    Result = (catch CbMod:handle_info(Message, CbState)),
    case Result of
        {stop, Reason, NextState} ->
            do_terminate(Reason, St#st{state = NextState});
        {ok, NextState, Timeout} when is_integer(Timeout), Timeout >= 0 ->
            ?MODULE:loop(St#st{state = NextState}, Timeout);
        {ok, NextState} ->
            ?MODULE:loop(St#st{state = NextState}, infinity);
        Unexpected ->
            erlang:error(Unexpected)
    end.
% Shut the listener down: call Module:terminate/2, unregister from
% couch_event, then exit. The reasons normal, shutdown and ignore all
% exit with 'normal'; any other reason is used as the exit reason as-is.
do_terminate(Reason, #st{module = CbMod, state = CbState}) ->
    % Order matters. Module:terminate/2 runs first so that it is called
    % even if couch_event:unregister/1 hangs indefinitely. Failures in
    % either call are deliberately ignored.
    _ = (catch CbMod:terminate(Reason, CbState)),
    _ = (catch couch_event:unregister(self())),
    ExitReason =
        case lists:member(Reason, [normal, shutdown, ignore]) of
            true -> normal;
            false -> Reason
        end,
    erlang:exit(ExitReason).
% Look up the current owner of a registered name.
% {local, Name} is resolved with whereis/1 and {global, Name} with
% global:whereis_name/1; both return undefined when the name is free.
where({local, LocalName}) ->
    whereis(LocalName);
where({global, GlobalName}) ->
    global:whereis_name(GlobalName).
% Try to register the calling process under the given name.
% Returns true on success, or {false, Owner} where Owner is whatever
% where/1 reports for the name after the registration attempt failed.
name_register({local, Name} = Spec) ->
    % register/2 either returns true or raises an error.
    try
        register(Name, self())
    catch
        error:_ ->
            {false, where(Spec)}
    end;
name_register({global, Name} = Spec) ->
    case global:register_name(Name, self()) of
        no -> {false, where(Spec)};
        yes -> true
    end.
% Work out which databases the listener should be registered for.
% Returns the atom all_dbs when the all_dbs option is true; otherwise
% returns the sorted, de-duplicated list of database names collected
% from the options by get_all_dbnames/2.
get_all_dbnames(Options) ->
    AllDbs = proplists:get_value(all_dbs, Options),
    if
        AllDbs =:= true -> all_dbs;
        true -> get_all_dbnames(Options, [])
    end.
% Walk the option list and accumulate database names from
% {dbname, Name} and {dbnames, [Name]} entries; all other entries are
% skipped. At the end of the list the collected names are converted to
% binaries, sorted and de-duplicated. Raises no_dbnames_provided when
% nothing was collected.
get_all_dbnames([], Collected) ->
    case Collected of
        [] ->
            erlang:error(no_dbnames_provided);
        _ ->
            lists:usort(convert_dbname_list(Collected))
    end;
get_all_dbnames([{dbname, Single} | Remaining], Collected) ->
    get_all_dbnames(Remaining, [Single | Collected]);
get_all_dbnames([{dbnames, Several} | Remaining], Collected) when is_list(Several) ->
    get_all_dbnames(Remaining, Several ++ Collected);
get_all_dbnames([_Other | Remaining], Collected) ->
    get_all_dbnames(Remaining, Collected).
% Normalise a list of database names to binaries, preserving order.
% Binaries are kept as they are, lists are converted with
% list_to_binary/1, and any other term raises {invalid_dbname, Term}.
convert_dbname_list(DbNames) ->
    lists:map(fun convert_dbname/1, DbNames).

% Normalise a single database name (see convert_dbname_list/1).
convert_dbname(DbName) when is_binary(DbName) ->
    DbName;
convert_dbname(DbName) when is_list(DbName) ->
    list_to_binary(DbName);
convert_dbname(DbName) ->
    erlang:error({invalid_dbname, DbName}).