% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_index).
-behaviour(gen_server).

-vsn(2).

%% API
-export([start_link/1, stop/1, get_state/2, get_info/1]).
-export([trigger_update/2]).
-export([compact/1, compact/2, get_compactor_pid/1]).
-export([config_change/3]).

%% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).


-include_lib("couch/include/couch_db.hrl").


% Interval, in milliseconds, between the `maybe_close` messages this process
% sends to itself to check whether the index file still exists on disk.
-define(CHECK_INTERVAL, 600000). % 10 minutes
% Delay, in milliseconds, before re-subscribing to config changes after a
% `gen_event_EXIT` message has been received (see handle_info/2).
-define(RELISTEN_DELAY, 5000).

% gen_server state of a single open index process.
-record(st, {
    % Callback module implementing the index (called as Mod:get/2, Mod:open/2,
    % Mod:commit/1, Mod:close/1, ...).
    mod,
    % Current index state term; only ever inspected through Mod.
    idx_state,
    % Pid returned by couch_index_updater:start_link/2.
    updater,
    % Pid returned by couch_index_compactor:start_link/2.
    compactor,
    % Pending get_state callers as {From, RequestedSeq} tuples.
    waiters=[],
    % Delay in milliseconds before a `commit` message is sent to self.
    commit_delay,
    % false while there are index updates that have not been committed yet.
    committed=true,
    % Set to true by the ddoc_updated cast when the index should stop but
    % clients are still waiting; checked after each `updated` cast.
    shutdown=false
}).


%% Starts a linked index process.
%%
%% The {Module, IdxState} pair is first passed through the before_open
%% plugin hook; the (possibly replaced) pair is then handed to init/1, which
%% runs in the new process spawned by proc_lib.
start_link({Mod0, State0}) ->
    [Mod, State] = couch_index_plugin:before_open(Mod0, State0),
    InitArgs = [{Mod, State}],
    proc_lib:start_link(?MODULE, init, InitArgs).


%% Asynchronously asks the index process to stop (handled by the `stop`
%% clause of handle_cast/2).
stop(IndexPid) ->
    gen_server:cast(IndexPid, stop).


%% Synchronously requests the index state for RequestSeq.
%%
%% The call uses an infinite timeout because the server may defer its reply
%% until the index has been updated to the requested sequence.
get_state(IndexPid, RequestSeq) ->
    Request = {get_state, RequestSeq},
    gen_server:call(IndexPid, Request, infinity).


%% Synchronously fetches the info property list of the index
%% (default gen_server call timeout applies).
get_info(IndexPid) ->
    gen_server:call(IndexPid, get_info).


%% Asynchronously asks the index to update itself if it is behind UpdateSeq.
trigger_update(IndexPid, UpdateSeq) ->
    Request = {trigger_update, UpdateSeq},
    gen_server:cast(IndexPid, Request).


%% Starts compaction with no options; see compact/2.
compact(IndexPid) ->
    NoOptions = [],
    compact(IndexPid, NoOptions).


%% Starts compaction of the index.
%%
%% Returns `ok`, or `{ok, MonitorRef}` when the atom `monitor` is present in
%% Options, in which case the calling process monitors the compactor pid
%% returned by the server.
compact(IndexPid, Options) ->
    {ok, CompactorPid} = gen_server:call(IndexPid, compact),
    WantsMonitor = lists:member(monitor, Options),
    if
        WantsMonitor -> {ok, erlang:monitor(process, CompactorPid)};
        true -> ok
    end.


%% Synchronously asks the index process for its compactor; the server replies
%% with `{ok, CompactorPid}`.
get_compactor_pid(IndexPid) ->
    gen_server:call(IndexPid, get_compactor_pid).

%% Forwards a change of the "commit_freq" setting in "query_server_config"
%% as a cast.
%%
%% The message tag must be `config_change`: that is the only tag
%% handle_cast/2 understands for this purpose. Any other tag would reach the
%% catch-all clause there and stop the receiving server with
%% `unhandled_cast`.
%%
%% NOTE(review): the cast is addressed to the registered name ?MODULE;
%% nothing in this file registers a process under that name - confirm the
%% intended destination.
config_change("query_server_config", "commit_freq", NewValue) ->
    ok = gen_server:cast(?MODULE, {config_change, NewValue}).


%% proc_lib entry point for the index process (see start_link/1).
%%
%% Subscribes to commit_freq config changes, arms the periodic `maybe_close`
%% timer and opens the index through Mod:open/2 while the database is open.
%% On success the updater and compactor are started, the parent is
%% acknowledged with `{ok, self()}` and the process enters the gen_server
%% loop. Any other result of opening the index is passed to the parent via
%% proc_lib:init_ack/1.
init({Mod, IdxState}) ->
    ConfigKeys = [{"query_server_config", "commit_freq"}],
    ok = config:subscribe_for_changes(ConfigKeys),
    DbName = Mod:get(db_name, IdxState),
    erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
    OpenIndex = fun(Db) ->
        case Mod:open(Db, IdxState) of
            {ok, _} = Opened ->
                % Monitor the db only once the index opened successfully.
                couch_db:monitor(Db),
                Opened;
            Failure ->
                Failure
        end
    end,
    case couch_util:with_db(DbName, OpenIndex) of
        {ok, OpenedIdxState} ->
            {ok, Updater} = couch_index_updater:start_link(self(), Mod),
            {ok, Compactor} = couch_index_compactor:start_link(self(), Mod),
            % commit_freq is configured in seconds; the state keeps milliseconds.
            DelaySecs = config:get("query_server_config", "commit_freq", "5"),
            DelayMs = list_to_integer(DelaySecs) * 1000,
            State = #st{
                mod = Mod,
                idx_state = OpenedIdxState,
                updater = Updater,
                compactor = Compactor,
                commit_delay = DelayMs
            },
            LogDbName = Mod:get(db_name, IdxState),
            LogIdxName = Mod:get(idx_name, IdxState),
            HexSig = couch_index_util:hexsig(Mod:get(signature, IdxState)),
            couch_log:info(
                "Opening index for db: ~s idx: ~s sig: ~p",
                [LogDbName, LogIdxName, HexSig]
            ),
            proc_lib:init_ack({ok, self()}),
            gen_server:enter_loop(?MODULE, [], State);
        OpenError ->
            proc_lib:init_ack(OpenError)
    end.


%% gen_server terminate/2 callback.
%%
%% Closes the index, replies to every waiting client with the termination
%% reason, synchronously shuts down the updater and the compactor, and logs
%% the closure.
terminate(Reason, State) ->
    #st{
        mod = Mod,
        idx_state = IdxState,
        waiters = Waiters,
        updater = Updater,
        compactor = Compactor
    } = State,
    Mod:close(IdxState),
    send_all(Waiters, Reason),
    couch_util:shutdown_sync(Updater),
    couch_util:shutdown_sync(Compactor),
    DbName = Mod:get(db_name, IdxState),
    IdxName = Mod:get(idx_name, IdxState),
    HexSig = couch_index_util:hexsig(Mod:get(signature, IdxState)),
    couch_log:info(
        "Closing index for db: ~s idx: ~s sig: ~p because ~r",
        [DbName, IdxName, HexSig, Reason]
    ),
    ok.


%% gen_server handle_call/3 callback.
%%
%% Requests:
%%   {get_state, ReqSeq}     - reply `{ok, IdxState}` immediately when the
%%                             index is at or past ReqSeq; otherwise run the
%%                             updater and queue the caller as a waiter.
%%   get_info                - reply `{ok, Info}` with Mod's info plus
%%                             runtime status entries.
%%   reset                   - reset the index through Mod:reset/1.
%%   compact                 - run the compactor and reply with its result.
%%   get_compactor_pid       - reply `{ok, CompactorPid}`.
%%   {compacted, NewIdxState} - swap in a compacted index, or reply
%%                             `recompact` when it is behind the live index
%%                             and recompaction is enabled.
handle_call({get_state, ReqSeq}, From, State) ->
    #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
    case Mod:get(update_seq, IdxState) >= ReqSeq of
        true ->
            {reply, {ok, IdxState}, State};
        false ->
            % View update required; the reply is sent later by send_replies/3.
            couch_index_updater:run(State#st.updater, IdxState),
            Queued = State#st{waiters = [{From, ReqSeq} | Waiters]},
            {noreply, Queued, infinity}
    end;
handle_call(get_info, _From, State) ->
    #st{
        mod = Mod,
        idx_state = IdxState,
        updater = Updater,
        compactor = Compactor,
        committed = Committed,
        waiters = Waiters
    } = State,
    {ok, BaseInfo} = Mod:get(info, IdxState),
    UpdaterRunning = couch_index_updater:is_running(Updater),
    CompactorRunning = couch_index_compactor:is_running(Compactor),
    IdxSeq = Mod:get(update_seq, IdxState),
    DbName = Mod:get(db_name, IdxState),
    CommittedSeq = couch_util:with_db(
        DbName, fun couch_db:get_committed_update_seq/1
    ),
    % Never report a negative number of pending updates.
    Pending = max(CommittedSeq - IdxSeq, 0),
    Status = [
        {updater_running, UpdaterRunning},
        {compact_running, CompactorRunning},
        {waiting_commit, Committed =:= false},
        {waiting_clients, length(Waiters)},
        {pending_updates, Pending}
    ],
    {reply, {ok, BaseInfo ++ Status}, State};
handle_call(reset, _From, State) ->
    #st{mod = Mod, idx_state = IdxState} = State,
    {ok, FreshIdxState} = Mod:reset(IdxState),
    {reply, {ok, FreshIdxState}, State#st{idx_state = FreshIdxState}};
handle_call(compact, _From, State) ->
    #st{compactor = Compactor, idx_state = IdxState} = State,
    {reply, couch_index_compactor:run(Compactor, IdxState), State};
handle_call(get_compactor_pid, _From, State) ->
    #st{compactor = Compactor} = State,
    {reply, {ok, Compactor}, State};
handle_call({compacted, NewIdxState}, _From, State) ->
    #st{mod = Mod, idx_state = OldIdxState} = State,
    assert_signature_match(Mod, OldIdxState, NewIdxState),
    NewSeq = Mod:get(update_seq, NewIdxState),
    OldSeq = Mod:get(update_seq, OldIdxState),
    % For indices that require swapping files, we have to make sure we're
    % up to date with the current index. Otherwise indexes could roll back
    % (perhaps considerably) to previous points in history.
    IsBehind = is_recompaction_enabled(NewIdxState, State)
        andalso NewSeq < OldSeq,
    case IsBehind of
        true -> {reply, recompact, State};
        false -> {reply, ok, commit_compacted(NewIdxState, State)}
    end.

%% gen_server handle_cast/2 callback.
%%
%% Messages:
%%   {config_change, NewDelay} - new commit delay, given as a string of seconds.
%%   {trigger_update, Seq}     - run the updater if the index is behind Seq.
%%   {updated, NewIdxState}    - like new_state, then either stop (shutdown
%%                               requested and no waiters left) or possibly
%%                               restart the updater.
%%   {new_state, NewIdxState}  - store the new state, answer satisfied
%%                               waiters and schedule a commit.
%%   {update_error, Error}     - forward Error to every waiter.
%%   stop / delete             - stop normally (delete also calls Mod:delete/1).
%%   ddoc_updated              - stop, or flag shutdown, when the design doc
%%                               was deleted or its signature changed.
%% Any other message stops the server with reason `unhandled_cast`.
handle_cast({config_change, NewDelay}, State) ->
    DelayMs = list_to_integer(NewDelay) * 1000,
    {noreply, State#st{commit_delay = DelayMs}};
handle_cast({trigger_update, UpdateSeq}, State) ->
    #st{mod = Mod, idx_state = IdxState} = State,
    IdxSeq = Mod:get(update_seq, IdxState),
    if
        UpdateSeq > IdxSeq ->
            couch_index_updater:run(State#st.updater, IdxState);
        true ->
            ok
    end,
    {noreply, State};
handle_cast({updated, NewIdxState}, State) ->
    {noreply, NewState} = handle_cast({new_state, NewIdxState}, State),
    #st{shutdown = Shutdown, waiters = Waiters} = NewState,
    case {Shutdown, Waiters} of
        {true, []} ->
            % A deferred shutdown can go ahead once nobody is waiting.
            {stop, normal, NewState};
        _ ->
            maybe_restart_updater(NewState),
            {noreply, NewState}
    end;
handle_cast({new_state, NewIdxState}, State) ->
    #st{
        mod = Mod,
        idx_state = OldIdxState,
        commit_delay = Delay,
        waiters = Waiters,
        committed = Committed
    } = State,
    assert_signature_match(Mod, OldIdxState, NewIdxState),
    CurrSeq = Mod:get(update_seq, NewIdxState),
    DbName = Mod:get(db_name, NewIdxState),
    IdxName = Mod:get(idx_name, NewIdxState),
    couch_log:debug(
        "Updated index for db: ~s idx: ~s seq: ~B",
        [DbName, IdxName, CurrSeq]
    ),
    StillWaiting = send_replies(Waiters, CurrSeq, NewIdxState),
    % Only arm the commit timer when none is pending already.
    Committed andalso erlang:send_after(Delay, self(), commit),
    {noreply, State#st{
        idx_state = NewIdxState,
        waiters = StillWaiting,
        committed = false
    }};
handle_cast({update_error, Error}, State) ->
    #st{waiters = Waiters} = State,
    send_all(Waiters, Error),
    {noreply, State#st{waiters = []}};
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(delete, State) ->
    #st{mod = Mod, idx_state = IdxState} = State,
    ok = Mod:delete(IdxState),
    {stop, normal, State};
handle_cast(ddoc_updated, State) ->
    #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
    DbName = Mod:get(db_name, IdxState),
    DDocId = Mod:get(idx_name, IdxState),
    % true when the design doc is gone or now yields a different signature.
    NeedsShutdown = fun(Db) ->
        case couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) of
            {not_found, deleted} ->
                true;
            {ok, DDoc} ->
                {ok, Candidate} = Mod:init(Db, DDoc),
                Mod:get(signature, Candidate) =/= Mod:get(signature, IdxState)
        end
    end,
    case {couch_util:with_db(DbName, NeedsShutdown), Waiters} of
        {true, []} ->
            {stop, normal, State};
        {true, _} ->
            % Clients are still waiting; stop after the next `updated` cast.
            {noreply, State#st{shutdown = true}};
        {false, _} ->
            {noreply, State#st{shutdown = false}}
    end;
handle_cast(_Mesg, State) ->
    {stop, unhandled_cast, State}.

%% gen_server handle_info/2 callback.
%%
%% Messages:
%%   commit       - commit the index if the database has committed at least
%%                  up to the index's update_seq, otherwise retry after
%%                  commit_delay milliseconds.
%%   maybe_close  - periodic check (every ?CHECK_INTERVAL ms) that stops the
%%                  process when it is idle and its index file is gone.
%%   {'DOWN', ...} - a monitored process went down: notify the waiters with
%%                  `shutdown` and stop.
%%   {config_change, "query_server_config", "commit_freq", NewDelay, _}
%%                - delegate to the config_change clause of handle_cast/2.
%%   {gen_event_EXIT, _, _} / restart_config_listener
%%                - re-subscribe to config changes after ?RELISTEN_DELAY ms.
handle_info(commit, #st{committed=true}=State) ->
    % Nothing has changed since the last commit.
    {noreply, State};
handle_info(commit, State) ->
    #st{mod=Mod, idx_state=IdxState, commit_delay=Delay} = State,
    DbName = Mod:get(db_name, IdxState),
    IdxName = Mod:get(idx_name, IdxState),
    GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
    CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
    case CommittedSeq >= Mod:get(update_seq, IdxState) of
        true ->
            % Commit the updates
            ok = Mod:commit(IdxState),
            couch_event:notify(DbName, {index_commit, IdxName}),
            {noreply, State#st{committed=true}};
        _ ->
            % We can't commit the header because the database seq that's
            % fully committed to disk is still behind us. If we committed
            % now and the database lost those changes our view could be
            % forever out of sync with the database. But a crash before we
            % commit these changes, no big deal, we only lose incremental
            % changes since last committal.
            erlang:send_after(Delay, self(), commit),
            {noreply, State}
    end;
handle_info(maybe_close, State) ->
    % We need to periodically check if our index file still
    % exists on disk because index cleanups don't notify
    % the couch_index process when a file has been deleted. If
    % we don't check for this condition then the index can
    % remain open indefinitely wasting disk space.
    %
    % We make sure that we're idle before closing by looking
    % to see if we have any clients waiting for an update.
    Mod = State#st.mod,
    case State#st.waiters of
        [] ->
            case Mod:index_file_exists(State#st.idx_state) of
                true ->
                    erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
                    {noreply, State};
                false ->
                    {stop, normal, State}
            end;
        _ ->
            % Clients are waiting, so just re-arm the timer. The destination
            % has to be the pid returned by self(): a bare atom is treated
            % by erlang:send_after/3 as a registered process name, and the
            % periodic check would silently stop being delivered.
            erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
            {noreply, State}
    end;
handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
    Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
    couch_log:info("Index shutdown by monitor notice for db: ~s idx: ~s", Args),
    % Best effort: a failure while notifying waiters must not prevent the stop.
    catch send_all(State#st.waiters, shutdown),
    {stop, normal, State#st{waiters=[]}};
handle_info({config_change, "query_server_config", "commit_freq", NewDelay, _}, State) ->
    handle_cast({config_change, NewDelay}, State);
handle_info({gen_event_EXIT, _Handler, _Reason}, State) ->
    erlang:send_after(?RELISTEN_DELAY, self(), restart_config_listener),
    {noreply, State};
handle_info(restart_config_listener, State) ->
    ok = config:subscribe_for_changes([{"query_server_config", "commit_freq"}]),
    {noreply, State}.

%% gen_server code_change/3 callback: the state is kept as is across
%% code upgrades.
code_change(_FromVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.

%% Runs the updater again when clients are still waiting for a newer index.
%%
%% With no waiters this is a no-op. Otherwise, if the database has updates
%% that are not yet committed and the index has the `committed_only` update
%% option, a full commit of the database is requested before the updater is
%% run.
maybe_restart_updater(#st{waiters = []}) ->
    ok;
maybe_restart_updater(#st{mod = Mod, idx_state = IdxState} = State) ->
    DbName = Mod:get(db_name, IdxState),
    EnsureCommitted = fun(Db) ->
        UpdateSeq = couch_db:get_update_seq(Db),
        CommittedSeq = couch_db:get_committed_update_seq(Db),
        HasUncommitted = UpdateSeq > CommittedSeq,
        UpdateOpts = Mod:get(update_options, IdxState),
        CommittedOnly = lists:member(committed_only, UpdateOpts),
        case HasUncommitted and CommittedOnly of
            true -> couch_db:ensure_full_commit(Db);
            false -> ok
        end
    end,
    couch_util:with_db(DbName, EnsureCommitted),
    couch_index_updater:run(State#st.updater, IdxState).


%% Sends Reply to every waiter in the list.
%%
%% Waiters are `{From, Seq}` tuples; list elements of any other shape are
%% skipped. Returns the list of gen_server:reply/2 results.
send_all(Waiters, Reply) ->
    ReplyTo = fun
        ({Client, _Seq}) -> [gen_server:reply(Client, Reply)];
        (_Other) -> []
    end,
    lists:flatmap(ReplyTo, Waiters).


%% Answers every waiter whose requested sequence is at or below UpdateSeq
%% with `{ok, IdxState}` and returns the waiters that must keep waiting,
%% in their original relative order.
send_replies(Waiters, UpdateSeq, IdxState) ->
    Classify = fun
        ({_Client, Seq} = Waiter, {Hit, Miss}) when Seq =< UpdateSeq ->
            {[Waiter | Hit], Miss};
        ({_Client, _Seq} = Waiter, {Hit, Miss}) ->
            {Hit, [Waiter | Miss]}
    end,
    % foldr keeps both result lists in the order of the input list.
    {Satisfied, StillWaiting} = lists:foldr(Classify, {[], []}, Waiters),
    lists:foreach(
        fun({Client, _Seq}) -> gen_server:reply(Client, {ok, IdxState}) end,
        Satisfied
    ),
    StillWaiting.

%% Returns `ok` when both index states carry exactly the same signature and
%% raises `error:signature_mismatch` otherwise.
assert_signature_match(Mod, OldIdxState, NewIdxState) ->
    OldSig = Mod:get(signature, OldIdxState),
    NewSig = Mod:get(signature, NewIdxState),
    if
        OldSig =:= NewSig -> ok;
        true -> erlang:error(signature_mismatch)
    end.

%% Swaps the compacted index in for the current one and returns the updated
%% server state.
%%
%% A running updater is restarted on the swapped state, and a commit is
%% scheduled unless one is already pending (committed =:= false).
commit_compacted(NewIdxState, State) ->
    #st{
        mod = Mod,
        idx_state = OldIdxState,
        updater = Updater,
        commit_delay = Delay,
        committed = Committed
    } = State,
    {ok, SwappedIdxState} = Mod:swap_compacted(OldIdxState, NewIdxState),
    % Restart the indexer if it's running.
    case couch_index_updater:is_running(Updater) of
        false -> ok;
        true -> ok = couch_index_updater:restart(Updater, SwappedIdxState)
    end,
    Committed andalso erlang:send_after(Delay, self(), commit),
    State#st{idx_state = SwappedIdxState, committed = false}.

%% Decides whether recompaction is enabled for the given index.
%%
%% Settings are read from the global "view_compaction" section and from
%% "view_compaction.recompaction" keyed by db name, "Db:Idx" and hex
%% signature. The last defined one in the order global, per-db, per-index,
%% per-signature wins; the default is `true`.
is_recompaction_enabled(IdxState, #st{mod = Mod}) ->
    DbName = binary_to_list(Mod:get(db_name, IdxState)),
    IdxName = binary_to_list(Mod:get(idx_name, IdxState)),
    HexSig = couch_index_util:hexsig(Mod:get(signature, IdxState)),
    Section = "view_compaction.recompaction",

    Global = get_value("view_compaction", "enabled_recompaction"),
    BySignature = get_value(Section, HexSig),
    ByIndex = get_value(Section, DbName ++ ":" ++ IdxName),
    ByDb = get_value(Section, DbName),

    find_most_specific([Global, ByDb, ByIndex, BySignature], true).

%% Returns the last element of Settings that is not `undefined`, or Default
%% when every setting is undefined. Settings are expected to be ordered from
%% least to most specific.
find_most_specific(Settings, Default) ->
    Defined = [S || S <- [Default | Settings], S =/= undefined],
    [MostSpecific | _] = lists:reverse(Defined),
    MostSpecific.

%% Reads a config flag and maps it to `true`, `false` or `undefined`.
%%
%% "enabled"/"true" map to true and "disabled"/"false" to false; any other
%% configured value raises a case_clause error.
get_value(Section, Key) ->
    Raw = config:get(Section, Key),
    case Raw of
        undefined -> undefined;
        "true" -> true;
        "enabled" -> true;
        "false" -> false;
        "disabled" -> false
    end.

-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").

%% Mocked implementation of test_index:get, installed through meck in
%% setup/1; returns fixed values for the properties read by
%% is_recompaction_enabled/2.
get(db_name, _IdxState, _Mod) ->
    <<"db_name">>;
get(idx_name, _IdxState, _Mod) ->
    <<"idx_name">>;
get(signature, _IdxState, _Mod) ->
    <<
        61, 237, 157, 230, 136, 93, 96, 201,
        204, 17, 137, 186, 50, 249, 44, 135
    >>.

%% foreachx fixture setup: mocks `config` (passthrough) and `test_index`
%% so that config:get/2 is answered by configure/3 for the given Settings,
%% and returns the {IdxState, State} pair passed to each test.
setup(Settings) ->
    ok = meck:new([config], [passthrough]),
    ok = meck:new([test_index], [non_strict]),
    ConfigGet = fun(Section, Key) -> configure(Section, Key, Settings) end,
    ok = meck:expect(config, get, ConfigGet),
    ok = meck:expect(test_index, get, fun get/3),
    State = #st{mod = {test_index}},
    {undefined, State}.

%% foreachx fixture cleanup: unloads both mocks, ignoring any failure.
teardown(_Settings, _Context) ->
    Unload = fun(Mocked) -> catch meck:unload(Mocked) end,
    lists:foreach(Unload, [config, test_index]),
    ok.

%% Mocked config:get/2. Settings is `[Global, Db, Index]`: the values served
%% for the global flag, the per-db key and the per-index ("db_name:...") key.
%% Every other lookup is passed through to the real config module.
configure("view_compaction", "enabled_recompaction", [GlobalValue, _, _]) ->
    GlobalValue;
configure("view_compaction.recompaction", "db_name", [_, DbValue, _]) ->
    DbValue;
configure("view_compaction.recompaction", "db_name:" ++ _Idx, [_, _, IdxValue]) ->
    IdxValue;
configure(Section, Key, _Settings) ->
    meck:passthrough([Section, Key]).

%% EUnit generator: runs every recompaction configuration case inside a
%% started couch instance, with setup/1 and teardown/2 applied per case.
recompaction_configuration_test_() ->
    PerCase = {
        foreachx,
        fun setup/1, fun teardown/2,
        recompaction_configuration_tests()
    },
    WithCouch = {
        setup,
        fun test_util:start_couch/0, fun test_util:stop_couch/1,
        PerCase
    },
    {"Compaction tests", WithCouch}.

% Builds the foreachx test pairs `{Settings, TestFun}` for the recompaction
% configuration tests. Each Settings list is `[Global, Db, Index]`, the shape
% matched by configure/3.
recompaction_configuration_tests() ->
    % Every combination of the three settings being unset or set either way.
    AllCases = couch_tests_combinatorics:product([
        [undefined, "true", "false"],
        [undefined, "enabled", "disabled"],
        [undefined, "enabled", "disabled"]
    ]),

    % Combinations for which is_recompaction_enabled/2 must return true.
    EnabledCases = [
        [undefined, undefined, undefined],

        [undefined, undefined,"enabled"],
        [undefined, "enabled", undefined],
        [undefined, "disabled", "enabled"],
        [undefined, "enabled", "enabled"],

        ["true", undefined, undefined],
        ["true", undefined, "enabled"],
        ["true", "disabled", "enabled"],
        ["true", "enabled", undefined],
        ["true", "enabled", "enabled"],

        ["false", undefined, "enabled"],
        ["false", "enabled", undefined],
        ["false", "disabled", "enabled"],
        ["false", "enabled", "enabled"]
    ],

    % Combinations for which is_recompaction_enabled/2 must return false.
    DisabledCases = [
        [undefined, undefined, "disabled"],
        [undefined, "disabled", undefined],
        [undefined, "disabled", "disabled"],
        [undefined, "enabled", "disabled"],

        ["true", undefined, "disabled"],
        ["true", "disabled", undefined],
        ["true", "disabled", "disabled"],
        ["true", "enabled", "disabled"],

        ["false", undefined, undefined],
        ["false", undefined, "disabled"],
        ["false", "disabled", undefined],
        ["false", "disabled", "disabled"],
        ["false", "enabled", "disabled"]
    ],

    % Guard against gaps: every generated combination must appear in one of
    % the two tables above.
    ?assertEqual([], AllCases -- (EnabledCases ++ DisabledCases)),

    [{Settings, fun should_not_call_recompact/2} || Settings <- DisabledCases]
    ++
    [{Settings, fun should_call_recompact/2} || Settings <- EnabledCases].

%% Titled test asserting that recompaction is enabled for Settings.
should_call_recompact(Settings, {IdxState, State}) ->
    Title = test_id(Settings),
    Check = ?_assert(is_recompaction_enabled(IdxState, State)),
    {Title, Check}.

%% Titled test asserting that recompaction is disabled for Settings.
should_not_call_recompact(Settings, {IdxState, State}) ->
    Title = test_id(Settings),
    Check = ?_assertNot(is_recompaction_enabled(IdxState, State)),
    {Title, Check}.

%% Renders a setting for a test title: the atom `undefined` becomes the
%% string "undefined", anything else is returned unchanged.
to_string(Value) ->
    case Value of
        undefined -> "undefined";
        _ -> Value
    end.

%% Builds a readable test title such as "[ true , undefined , enabled ]"
%% from a settings list.
test_id(Settings0) ->
    Labels = lists:map(fun to_string/1, Settings0),
    Joined = lists:flatten(string:join(Labels, " , ")),
    lists:append(["[ ", Joined, " ]"]).

-endif.
