% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_replicator_job_server).

-behaviour(gen_server).

-export([
    start_link/1
]).

-export([
    init/1,
    terminate/2,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    format_status/2,
    code_change/3
]).

-export([
    accepted/2,
    scheduling_interval_sec/0,
    reschedule/0
]).

-include("couch_replicator.hrl").
-include_lib("kernel/include/logger.hrl").

% Built-in defaults for the "replicator" config section; get_config/0 reads
% each one under the key named in parentheses.

% Acceptor processes spawn_acceptors/1 keeps around (max_acceptors)
-define(MAX_ACCEPTORS, 2).
% Running jobs above which trim_jobs/1 starts stopping jobs (max_jobs)
-define(MAX_JOBS, 500).
% Job starts/stops budget per scheduling interval (max_churn)
-define(MAX_CHURN, 100).
% Base scheduling interval, in seconds (interval_sec)
-define(INTERVAL_SEC, 15).
% Jobs running for less than this many seconds are not stopped (min_run_time_sec)
-define(MIN_RUN_TIME_SEC, 60).
% Age in seconds after which finished transient jobs are removed, one day
% (transient_job_max_age_sec)
-define(TRANSIENT_JOB_MAX_AGE_SEC, (24 * 60 * 60)).

% Start the locally registered job server. Timeout (an integer) is handed to
% init/1 and later bounds how long terminate/2 waits for jobs to exit.
start_link(Timeout) when is_integer(Timeout) ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, Timeout, []).

% gen_server init: build the initial state, start the first acceptors and arm
% the periodic reschedule timer.
init(Timeout) when is_integer(Timeout) ->
    % Linked acceptor/worker exits arrive as 'EXIT' messages, which
    % handle_info/2 dispatches on
    process_flag(trap_exit, true),
    couch_replicator_jobs:set_timeout(),
    Initial = #{
        acceptors => #{},
        workers => #{},
        churn => 0,
        config => get_config(),
        timer => undefined,
        timeout => Timeout
    },
    {ok, do_send_after(spawn_acceptors(Initial))}.

% Ask every running worker to shut down, then wait (bounded by the configured
% timeout) for their 'EXIT' messages so they can checkpoint and release locks.
terminate(_Reason, #{} = St) ->
    #{workers := Workers, timeout := Timeout} = St,
    lists:foreach(fun stop_job/1, maps:keys(Workers)),
    wait_jobs_exit(Workers, Timeout),
    ok.

% Synchronous requests:
%  * {accepted, Pid, Normal} - an acceptor picked up a job. It is promoted to a
%    worker and a replacement acceptor may be spawned. An unknown Pid stops the
%    server.
%  * reschedule - run a scheduling pass immediately.
%  * anything else stops the server with {bad_call, Msg}.
handle_call({accepted, Pid, Normal}, _From, #{} = St) ->
    #{
        acceptors := Acceptors,
        workers := Workers,
        churn := Churn
    } = St,
    case maps:take(Pid, Acceptors) of
        {_, OtherAcceptors} ->
            % Remember the Normal flag and the start time (used by
            % stop_candidates/2); each accepted job is one unit of churn
            StartedAt = erlang:system_time(second),
            Promoted = St#{
                acceptors := OtherAcceptors,
                workers := Workers#{Pid => {Normal, StartedAt}},
                churn := Churn + 1
            },
            {reply, ok, spawn_acceptors(Promoted)};
        error ->
            ?LOG_ERROR(#{
                what => unknown_acceptor,
                in => replicator,
                pid => Pid
            }),
            couch_log:error("~p : unknown acceptor process ~p", [?MODULE, Pid]),
            {stop, {unknown_acceptor_pid, Pid}, St}
    end;
handle_call(reschedule, _From, St) ->
    {reply, ok, reschedule(St)};
handle_call(Msg, _From, St) ->
    Reason = {bad_call, Msg},
    {stop, Reason, Reason, St}.

% No casts are part of the protocol; any cast stops the server.
handle_cast(Unexpected, State) ->
    Reason = {bad_cast, Unexpected},
    {stop, Reason, State}.

% Raw messages:
%  * reschedule - the periodic timer armed by do_send_after/1 fired.
%  * {'EXIT', Pid, Reason} - routed to the acceptor, worker or unknown-pid
%    handler depending on which map (if any) holds Pid.
%  * anything else stops the server with {bad_info, Msg}.
handle_info(reschedule, #{} = St) ->
    {noreply, reschedule(St)};
handle_info({'EXIT', Pid, Reason}, #{} = St) ->
    #{acceptors := Acceptors, workers := Workers} = St,
    IsAcceptor = maps:is_key(Pid, Acceptors),
    IsWorker = maps:is_key(Pid, Workers),
    case {IsAcceptor, IsWorker} of
        {false, false} -> handle_unknown_exit(St, Pid, Reason);
        {false, true} -> handle_worker_exit(St, Pid, Reason);
        {true, false} -> handle_acceptor_exit(St, Pid, Reason)
    end;
handle_info(Msg, St) ->
    {stop, {bad_info, Msg}, St}.

% Status summary for sys:get_status/1: the acceptor and worker pid maps are
% reported only as counts; churn and config are shown as-is.
format_status(_Opt, [_PDict, #{} = St]) ->
    #{
        acceptors := AcceptorMap,
        workers := WorkerMap,
        churn := ChurnCount,
        config := Cfg
    } = St,
    AcceptorCount = map_size(AcceptorMap),
    WorkerCount = map_size(WorkerMap),
    [
        {acceptors, AcceptorCount},
        {workers, WorkerCount},
        {churn, ChurnCount},
        {config, Cfg}
    ].

% Hot code upgrade: the state layout is unchanged, so keep it as-is.
code_change(_FromVsn, State, _UpgradeExtra) ->
    {ok, State}.

% Called by an acceptor process once it has picked up a job. Normal is
% stored alongside the worker; stop_candidates/2 never stops jobs accepted
% with Normal = true. Blocks without a timeout until the server replies.
accepted(Worker, Normal) when is_pid(Worker), is_boolean(Normal) ->
    Request = {accepted, Worker, Normal},
    gen_server:call(?MODULE, Request, infinity).

% Configured scheduling interval in seconds ([replicator] interval_sec),
% falling back to ?INTERVAL_SEC.
scheduling_interval_sec() ->
    Section = "replicator",
    config:get_integer(Section, "interval_sec", ?INTERVAL_SEC).

% Force an immediate scheduling pass and wait (no timeout) until it is done.
reschedule() ->
    Request = reschedule,
    gen_server:call(?MODULE, Request, infinity).

% Scheduling logic

% Arm the next `reschedule` timer: one full interval_sec plus a random extra
% delay of 1..max(1, interval/3) milliseconds (presumably so that schedulers
% do not fire in lock-step). The timer reference is stored under `timer`.
do_send_after(#{} = St) ->
    #{config := #{interval_sec := IntervalSec}} = St,
    BaseMSec = 1000 * IntervalSec,
    MaxJitterMSec = max(1, BaseMSec div 3),
    DelayMSec = BaseMSec + rand:uniform(MaxJitterMSec),
    TimerRef = erlang:send_after(DelayMSec, self(), reschedule),
    St#{timer := TimerRef}.

% Cancel the pending `reschedule` timer, if any, and clear `timer`.
%
% If the timer already fired, its `reschedule` message may still be queued.
% This happens, for example, when reschedule/0 races the timer. Handling that
% message afterwards would trigger a second, back-to-back reschedule/1, which
% resets churn twice within one interval. So when cancellation reports that
% the timer is gone, drop an already-delivered `reschedule` message. This is
% best-effort: erlang:cancel_timer/1 does not guarantee that the message of
% an expired timer has been delivered yet.
cancel_timer(#{timer := undefined} = St) ->
    St;
cancel_timer(#{timer := TRef} = St) when is_reference(TRef) ->
    case erlang:cancel_timer(TRef) of
        false ->
            receive
                reschedule -> ok
            after 0 -> ok
            end;
        _MSecLeft ->
            ok
    end,
    St#{timer := undefined}.

% One scheduling pass. The steps run in this order:
% stop the old timer, reload config, trim excess jobs, start extra acceptors,
% remove old transient jobs, publish stats, arm the next timer.
% The churn counter is reset once the pass is complete.
reschedule(#{} = St) ->
    Steps = [
        fun cancel_timer/1,
        fun(S) -> S#{config := get_config()} end,
        fun trim_jobs/1,
        fun start_excess_acceptors/1,
        fun transient_job_cleanup/1,
        fun update_stats/1,
        fun do_send_after/1
    ],
    Rescheduled = lists:foldl(fun(Step, Acc) -> Step(Acc) end, St, Steps),
    Rescheduled#{churn := 0}.

% Start additional acceptors (not limited by max_acceptors) when there are
% pending jobs. The number started is capped by:
%  * the remaining churn budget (max_churn - churn), and
%  * the free slots in max_jobs + max_churn (acceptors and workers combined).
% Also publishes the [couch_replicator, jobs, pending] gauge.
start_excess_acceptors(#{} = St) ->
    #{
        churn := Churn,
        acceptors := Acceptors,
        workers := Workers,
        config := #{max_jobs := MaxJobs, max_churn := MaxChurn}
    } = St,

    ACnt = maps:size(Acceptors),
    WCnt = maps:size(Workers),

    ChurnLeft = MaxChurn - Churn,
    Slots = (MaxJobs + MaxChurn) - (ACnt + WCnt),
    MinSlotsChurn = min(Slots, ChurnLeft),

    Pending =
        if
            MinSlotsChurn =< 0 ->
                % Don't fetch pending if we don't have enough slots or churn
                % budget: nothing could be started anyway.
                % NOTE: the pending gauge below then reads 0 even if jobs
                % are actually pending.
                0;
            true ->
                % Presumably the second argument caps the count, since we
                % could never start more than MinSlotsChurn jobs — confirm
                % against couch_replicator_jobs:pending_count/2
                couch_replicator_jobs:pending_count(undefined, MinSlotsChurn)
        end,

    couch_stats:update_gauge([couch_replicator, jobs, pending], Pending),

    % Start new acceptors only if we have churn budget, there are pending jobs
    % and we won't start more than max jobs + churn total acceptors
    ToStart = max(0, lists:min([ChurnLeft, Pending, Slots])),

    lists:foldl(
        fun(_, #{} = StAcc) ->
            #{acceptors := AccAcceptors} = StAcc,
            {ok, Pid} = couch_replicator_job:start_link(),
            StAcc#{acceptors := AccAcceptors#{Pid => true}}
        end,
        St,
        lists:seq(1, ToStart)
    ).

% Remove finished transient jobs that were last updated at least
% transient_job_max_age_sec seconds ago. "Transient" means the job data has
% ?DB_NAME set to null (presumably jobs not backed by a _replicator db doc).
% Every removal is logged.
transient_job_cleanup(#{} = St) ->
    #{config := #{transient_job_max_age_sec := MaxAge}} = St,
    Now = erlang:system_time(second),
    Remove = fun(JobId) ->
        ok = couch_replicator_jobs:remove_job(undefined, JobId),
        ?LOG_INFO(#{
            what => removing_old_job,
            in => replicator,
            jobid => JobId
        }),
        couch_log:info("~p : Removed old job ~p", [?MODULE, JobId]),
        ok
    end,
    FoldFun = fun(_JTx, JobId, JobState, #{} = Data, ok) ->
        DbName = maps:get(?DB_NAME, Data),
        Age = Now - maps:get(?LAST_UPDATED, Data),
        case {JobState, DbName, Age >= MaxAge} of
            {finished, null, true} -> Remove(JobId);
            _ -> ok
        end
    end,
    ok = couch_replicator_jobs:fold_jobs(undefined, FoldFun, ok),
    St.

% Publish the acceptor/worker counts as gauges and count this reschedule.
update_stats(#{} = St) ->
    Gauges = [
        {accepting, maps:size(maps:get(acceptors, St))},
        {running, maps:size(maps:get(workers, St))}
    ],
    lists:foreach(
        fun({Name, Count}) ->
            couch_stats:update_gauge([couch_replicator, jobs, Name], Count)
        end,
        Gauges
    ),
    couch_stats:increment_counter([couch_replicator, jobs, reschedules]),
    St.

% Stop running jobs above max_jobs and charge the jobs actually stopped
% against this interval's churn budget.
%
% stop_candidates/2 can return fewer pids than the excess, because it skips
% jobs accepted with Normal = true and jobs younger than min_run_time_sec.
% Only the jobs we actually signal count as churn. Otherwise the budget left
% for start_excess_acceptors/1 in the same pass would be understated.
% Stopped jobs stay in `workers` until their 'EXIT' message is handled.
trim_jobs(#{} = St) ->
    #{
        workers := Workers,
        churn := Churn,
        config := #{max_jobs := MaxJobs}
    } = St,
    Excess = max(0, maps:size(Workers) - MaxJobs),
    ToStop = stop_candidates(St, Excess),
    lists:foreach(fun stop_job/1, ToStop),
    St#{churn := Churn + length(ToStop)}.

% Pick up to Top worker pids to stop, oldest first. Workers accepted with
% Normal = true are never chosen, and neither are workers that have run for
% fewer than min_run_time_sec seconds.
stop_candidates(#{}, Top) when is_integer(Top), Top =< 0 ->
    [];
stop_candidates(#{} = St, Top) when is_integer(Top), Top > 0 ->
    #{
        workers := Workers,
        config := #{min_run_time_sec := MinRunTime}
    } = St,
    Cutoff = erlang:system_time(second) - MinRunTime,
    % Worker values are {Normal, StartTime}
    Eligible = [
        {Pid, StartedAt}
     || {Pid, {Normal, StartedAt}} <- maps:to_list(Workers),
        Normal =:= false,
        StartedAt =< Cutoff
    ],
    % Oldest start time first; keysort is stable, so ties keep map order
    ByAge = lists:keysort(2, Eligible),
    lists:sublist([Pid || {Pid, _} <- ByAge], Top).

% Send a `shutdown` exit signal to a job process. Replication jobs handle
% this signal and checkpoint in their terminate handler; the actual exit is
% observed later as an 'EXIT' message.
stop_job(Pid) when is_pid(Pid) ->
    erlang:exit(Pid, shutdown).

% Wait for the 'EXIT' messages of the job pids (the keys of Jobs) for at most
% Timeout milliseconds in total. Jobs still running after that are counted
% and logged. The function always returns ok.
%
% Timeout is one overall budget, tracked as a monotonic-clock deadline. It is
% not restarted after each received 'EXIT', so the wait in terminate/2 stays
% bounded by Timeout no matter how many jobs there are. `infinity` waits for
% all jobs.
wait_jobs_exit(#{} = Jobs, infinity) ->
    wait_jobs_exit_until(Jobs, infinity);
wait_jobs_exit(#{} = Jobs, Timeout) ->
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    wait_jobs_exit_until(Jobs, Deadline).

% Receive loop for wait_jobs_exit/2. Deadline is a monotonic time in
% milliseconds, or `infinity`.
wait_jobs_exit_until(#{} = Jobs, _Deadline) when map_size(Jobs) =:= 0 ->
    ok;
wait_jobs_exit_until(#{} = Jobs, Deadline) ->
    Remaining = wait_jobs_remaining_ms(Deadline),
    receive
        % Any 'EXIT' is consumed; pids not in Jobs leave the map unchanged
        {'EXIT', Pid, _} ->
            wait_jobs_exit_until(maps:remove(Pid, Jobs), Deadline)
    after Remaining ->
        ?LOG_ERROR(#{
            what => unclean_job_termination,
            in => replicator,
            job_count => map_size(Jobs)
        }),
        LogMsg = "~p : ~p jobs didn't terminate cleanly",
        couch_log:error(LogMsg, [?MODULE, map_size(Jobs)]),
        ok
    end.

% Milliseconds left until Deadline, never negative (`infinity` passes through).
wait_jobs_remaining_ms(infinity) ->
    infinity;
wait_jobs_remaining_ms(Deadline) ->
    max(0, Deadline - erlang:monotonic_time(millisecond)).

% Top up the acceptor pool. Start new acceptor processes until there are
% max_acceptors of them, or until acceptors plus workers reach max_jobs,
% whichever comes first.
spawn_acceptors(St) ->
    #{
        workers := Workers,
        acceptors := Acceptors,
        config := #{max_jobs := MaxJobs, max_acceptors := MaxAcceptors}
    } = St,
    ACnt = maps:size(Acceptors),
    WCnt = maps:size(Workers),
    % Each new acceptor uses one acceptor slot and one job slot, so start
    % the smaller of the two gaps (never a negative number)
    ToStart = max(0, min(MaxAcceptors - ACnt, MaxJobs - (ACnt + WCnt))),
    lists:foldl(
        fun(_, #{acceptors := Current} = StAcc) ->
            {ok, Pid} = couch_replicator_job:start_link(),
            StAcc#{acceptors := Current#{Pid => true}}
        end,
        St,
        lists:seq(1, ToStart)
    ).

% Worker process exit handlers

% An acceptor exited before accepting a job. Log it (whatever the reason),
% drop it from the acceptor map and spawn replacements as needed.
handle_acceptor_exit(#{acceptors := Acceptors} = St, Pid, Reason) ->
    ?LOG_ERROR(#{
        what => acceptor_crash,
        in => replicator,
        pid => Pid,
        details => Reason
    }),
    couch_log:error("~p : acceptor process ~p exited with ~p", [?MODULE, Pid, Reason]),
    Remaining = maps:remove(Pid, Acceptors),
    {noreply, spawn_acceptors(St#{acceptors := Remaining})}.

% A worker (a job that was accepted) exited. Exits with normal, shutdown or
% {shutdown, _} are expected; any other reason is logged as a crash. The
% worker is removed in every case and acceptors are topped up.
handle_worker_exit(#{workers := Workers} = St, Pid, Reason) ->
    IsCleanExit =
        case Reason of
            normal -> true;
            shutdown -> true;
            {shutdown, _} -> true;
            _ -> false
        end,
    case IsCleanExit of
        true ->
            ok;
        false ->
            ?LOG_ERROR(#{
                what => worker_crash,
                in => replicator,
                pid => Pid,
                details => Reason
            }),
            couch_log:error(
                "~p : replicator job process ~p exited with ~p",
                [?MODULE, Pid, Reason]
            )
    end,
    {noreply, spawn_acceptors(St#{workers := maps:remove(Pid, Workers)})}.

% An 'EXIT' from a pid that is neither an acceptor nor a worker: log it and
% stop the server with {unknown_pid_exit, Pid}.
handle_unknown_exit(St, Pid, Reason) ->
    ?LOG_ERROR(#{
        what => unknown_process_crash,
        in => replicator,
        pid => Pid
    }),
    couch_log:error("~p : unknown process ~p exited with ~p", [?MODULE, Pid, Reason]),
    StopReason = {unknown_pid_exit, Pid},
    {stop, StopReason, St}.

% Read the scheduler settings from the "replicator" config section. Each key is
% the atom's name as a string, and each value falls back to its built-in
% default macro.
get_config() ->
    Defaults = [
        {max_acceptors, ?MAX_ACCEPTORS},
        {interval_sec, ?INTERVAL_SEC},
        {max_jobs, ?MAX_JOBS},
        {max_churn, ?MAX_CHURN},
        {min_run_time_sec, ?MIN_RUN_TIME_SEC},
        {transient_job_max_age_sec, ?TRANSIENT_JOB_MAX_AGE_SEC}
    ],
    maps:from_list([
        {Key, config:get_integer("replicator", atom_to_list(Key), Default)}
     || {Key, Default} <- Defaults
    ]).
