% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% A small process behaviour for receiving couch_event notifications.
% A callback module supplies init/1, handle_event/3, handle_cast/2,
% handle_info/2 and terminate/2; this module owns the receive loop and
% the registration with couch_event.
-module(couch_event_listener).

% Public API: start a listener process (optionally under a locally
% registered name), turn the calling process into a listener
% (enter_loop/3), and send an asynchronous cast to a listener.
-export([
    start/3,
    start/4,
    start_link/3,
    start_link/4,
    enter_loop/3,
    cast/2
]).

% Exported for internal use only: do_init/3 is the entry point handed to
% spawn/spawn_link, and loop/2 is always invoked fully qualified as
% ?MODULE:loop(...).
-export([
    do_init/3,
    loop/2
]).

% Loop state carried through every iteration of loop/2.
-record(st, {
    % Callback module whose handle_* and terminate functions are invoked.
    module,
    % Opaque state owned by the callback module; replaced by the state
    % each callback returns.
    state
}).

% Called once in the new listener process. The loop is entered only for
% {ok, State} or {ok, State, Timeout} (Timeout a non-negative integer);
% any other result, including a caught exception, becomes the exit reason.
-callback init(Arg :: term()) ->
    term().

% Called with the Reason of a {stop, Reason, State} callback result just
% before the process exits. The return value is ignored and exceptions
% raised here are caught.
-callback terminate(Reason :: term(), State :: term()) ->
    term().

% Handles a message sent with cast/2. Accepted results are
% {ok, NewState}, {ok, NewState, Timeout} (Timeout a non-negative integer)
% and {stop, Reason, NewState}; anything else is raised as an error.
-callback handle_cast(Message :: term(), State :: term()) ->
    term().

% Handles a {'$couch_event', DbName, Event} message. Accepted results are
% the same as for handle_cast/2.
-callback handle_event(DbName :: term(), Event :: term(), State :: term()) ->
    term().

% Handles any other message, and the atom 'timeout' when the receive
% timeout elapses. Accepted results are the same as for handle_cast/2.
-callback handle_info(Message :: term(), State :: term()) ->
    term().

% Spawns an unlinked listener process that runs do_init/3 with the given
% callback module, init argument and options.
% Always returns {ok, Pid} immediately; a failure inside do_init/3 shows
% up only as the exit of the spawned process.
start(Mod, Arg, Options) ->
    {ok, erlang:spawn(?MODULE, do_init, [Mod, Arg, Options])}.

% Starts an unlinked listener that registers itself under Name, which must
% be of the form {local, Atom}.
% Returns {error, {already_started, Existing}} when the name is registered
% at the time of the check; otherwise prepends {name, Name} to Options and
% delegates to start/3. The check is not atomic with the registration,
% which the new process performs itself in do_init/3.
start(Name, Mod, Arg, Options) ->
    case where(Name) of
        Existing when Existing =/= undefined ->
            {error, {already_started, Existing}};
        undefined ->
            start(Mod, Arg, [{name, Name} | Options])
    end.

% Spawns a listener process linked to the caller; the new process runs
% do_init/3 with the given callback module, init argument and options.
% Always returns {ok, Pid} immediately; a failure inside do_init/3 reaches
% the caller only through the link.
start_link(Mod, Arg, Options) ->
    {ok, erlang:spawn_link(?MODULE, do_init, [Mod, Arg, Options])}.

% Starts a linked listener that registers itself under Name, which must be
% of the form {local, Atom}.
% Returns {error, {already_started, Existing}} when the name is registered
% at the time of the check; otherwise prepends {name, Name} to Options and
% delegates to start_link/3. The check is not atomic with the registration,
% which the new process performs itself in do_init/3.
start_link(Name, Mod, Arg, Options) ->
    case where(Name) of
        Existing when Existing =/= undefined ->
            {error, {already_started, Existing}};
        undefined ->
            start_link(Mod, Arg, [{name, Name} | Options])
    end.

% Turns the calling process into a listener: registers it for the
% databases selected by Options and then enters the receive loop with no
% timeout. Module:init/1 is not called and no name is registered here.
% Does not return normally; the process leaves the loop only by exiting.
enter_loop(Module, State, Options) ->
    St = #st{module = Module, state = State},
    ok = register_listeners(Options),
    ?MODULE:loop(St, infinity).

% Sends Message asynchronously to the listener Pid, wrapped so that loop/2
% routes it to Module:handle_cast/2. Always returns ok.
cast(Pid, Message) ->
    Envelope = {'$couch_event_cast', Message},
    Pid ! Envelope,
    ok.

% Entry point of a spawned listener process.
% Registers the optional name, subscribes to the selected databases, then
% calls Module:init/1 and enters the receive loop.
% init/1 must return {ok, State} or {ok, State, Timeout} with Timeout a
% non-negative integer; any other result (including a caught exception)
% is used as the exit reason of the process.
do_init(Module, Arg, Options) ->
    ok = maybe_name_process(Options),
    ok = register_listeners(Options),
    {InitState, InitTimeout} =
        case (catch Module:init(Arg)) of
            {ok, State} ->
                {State, infinity};
            {ok, State, Timeout} when is_integer(Timeout), Timeout >= 0 ->
                {State, Timeout};
            Other ->
                erlang:exit(Other)
        end,
    ?MODULE:loop(#st{module = Module, state = InitState}, InitTimeout).

% Main receive loop. Takes the next message from the mailbox and routes it
% to the matching handler; if nothing arrives within Timeout, the callback
% module's handle_info/2 is invoked with the atom 'timeout'.
loop(St, Timeout) ->
    receive
        Incoming ->
            dispatch_message(St, Incoming)
    after Timeout ->
        do_info(St, timeout)
    end.

% Routes one received message: couch_event notifications go to do_event/3,
% casts sent via cast/2 go to do_cast/2, everything else to do_info/2.
dispatch_message(St, {'$couch_event', DbName, Event}) ->
    do_event(St, DbName, Event);
dispatch_message(St, {'$couch_event_cast', Message}) ->
    do_cast(St, Message);
dispatch_message(St, Info) ->
    do_info(St, Info).

% Registers the calling process under the {name, Name} option, if present.
% Returns ok when there is no name option or registration succeeds;
% raises {already_started, Pid} when the registration attempt fails.
maybe_name_process(Options) ->
    case proplists:lookup(name, Options) of
        none ->
            ok;
        {name, Name} ->
            check_name_registered(name_register(Name))
    end.

% Turns the result of name_register/1 into ok or an already_started error.
check_name_registered(true) ->
    ok;
check_name_registered({false, Pid}) ->
    erlang:error({already_started, Pid}).

% Subscribes the calling process with couch_event: for every database when
% the options select all_dbs, otherwise for the explicit list of database
% names taken from the options. The result of the couch_event call is
% ignored; this function always returns ok.
register_listeners(Options) ->
    Self = self(),
    case get_all_dbnames(Options) of
        all_dbs -> couch_event:register_all(Self);
        DbNames -> couch_event:register_many(Self, DbNames)
    end,
    ok.

% Passes a couch_event notification to Module:handle_event/3 and acts on
% the result (see handle_callback_result/2).
do_event(#st{module = Module, state = State} = St, DbName, Event) ->
    handle_callback_result(St, (catch Module:handle_event(DbName, Event, State))).

% Passes a message sent with cast/2 to Module:handle_cast/2 and acts on
% the result (see handle_callback_result/2).
do_cast(#st{module = Module, state = State} = St, Message) ->
    handle_callback_result(St, (catch Module:handle_cast(Message, State))).

% Passes any other message (or the atom 'timeout') to Module:handle_info/2
% and acts on the result (see handle_callback_result/2).
do_info(#st{module = Module, state = State} = St, Message) ->
    handle_callback_result(St, (catch Module:handle_info(Message, State))).

% Single place that interprets what a handle_* callback returned, so the
% three dispatch paths above cannot drift apart:
%   {ok, NewState}              - continue looping without a timeout
%   {ok, NewState, Timeout}     - continue with a non-negative integer timeout
%   {stop, Reason, NewState}    - run the terminate sequence with Reason
%   anything else               - raised as an error; because the callback is
%                                 invoked under `catch`, this includes the
%                                 {'EXIT', ...} term of a crashed callback
handle_callback_result(St, {ok, NewState}) ->
    ?MODULE:loop(St#st{state = NewState}, infinity);
handle_callback_result(St, {ok, NewState, Timeout}) when
    is_integer(Timeout), Timeout >= 0
->
    ?MODULE:loop(St#st{state = NewState}, Timeout);
handle_callback_result(St, {stop, Reason, NewState}) ->
    do_terminate(Reason, St#st{state = NewState});
handle_callback_result(_St, Else) ->
    erlang:error(Else).

% Shuts the listener down: notifies the callback module, unregisters from
% couch_event and exits the process. Exceptions from either call are
% caught and ignored.
do_terminate(Reason, #st{module = Module, state = State}) ->
    % Order matters. Module:terminate/2 runs first so that it is called
    % even if couch_event:unregister/1 hangs indefinitely.
    catch Module:terminate(Reason, State),
    catch couch_event:unregister(self()),
    erlang:exit(exit_status(Reason)).

% Maps a stop reason to the process exit reason: normal, shutdown and
% ignore all exit as normal; any other reason is passed through unchanged.
exit_status(normal) -> normal;
exit_status(shutdown) -> normal;
exit_status(ignore) -> normal;
exit_status(Other) -> Other.

% Looks up a locally registered name. Only {local, Name} is supported;
% returns whatever whereis/1 returns for Name ('undefined' if unregistered).
where({local, RegName}) ->
    erlang:whereis(RegName).

% Tries to register the calling process under the local name Name.
% Returns true on success. If register/2 raises an error, returns
% {false, Current} where Current is what where/1 reports for the name
% at that moment.
name_register({local, Name} = LocalName) ->
    try
        register(Name, self())
    catch
        error:_ ->
            {false, where(LocalName)}
    end.

% Determines which databases the listener subscribes to.
% Returns the atom all_dbs when Options contains all_dbs / {all_dbs, true};
% otherwise returns the sorted, de-duplicated list of binary database
% names collected from the dbname / dbnames options.
get_all_dbnames(Options) ->
    case proplists:get_bool(all_dbs, Options) of
        true -> all_dbs;
        false -> get_all_dbnames(Options, [])
    end.

% Collects database names from the option list on top of the accumulator.
% {dbname, Name} contributes one name, {dbnames, Names} (Names a list)
% contributes all of them, and every other option is ignored.
% Raises no_dbnames_provided if nothing was collected; otherwise returns
% the names converted to binaries, sorted and de-duplicated.
get_all_dbnames(Options, Acc0) ->
    Collect = fun
        ({dbname, DbName}, Acc) -> [DbName | Acc];
        ({dbnames, DbNames}, Acc) when is_list(DbNames) -> DbNames ++ Acc;
        (_Ignored, Acc) -> Acc
    end,
    case lists:foldl(Collect, Acc0, Options) of
        [] ->
            erlang:error(no_dbnames_provided);
        Collected ->
            lists:usort(convert_dbname_list(Collected))
    end.

% Normalises a list of database names to binaries, preserving order.
% Binaries are kept as they are, lists are converted with
% list_to_binary/1, and any other element raises {invalid_dbname, DbName}.
convert_dbname_list(DbNames) ->
    lists:map(
        fun
            (DbName) when is_binary(DbName) -> DbName;
            (DbName) when is_list(DbName) -> list_to_binary(DbName);
            (DbName) -> erlang:error({invalid_dbname, DbName})
        end,
        DbNames
    ).
