% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.


%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-

-module(dreyfus_index_updater).
-include_lib("couch/include/couch_db.hrl").
-include("dreyfus.hrl").

-export([update/2, load_docs/2]).

-import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]).

update(IndexPid, Index) ->
    #index{
        current_seq = CurSeq,
        dbname = DbName,
        ddoc_id = DDocId,
        name = IndexName
    } = Index,
    erlang:put(io_priority, {search, DbName, IndexName}),
    {ok, Db} = couch_db:open_int(DbName, []),
    try
        TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq),
        TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid),
        TotalChanges = TotalUpdateChanges + TotalPurgeChanges,

        couch_task_status:add_task([
            {type, search_indexer},
            {database, DbName},
            {design_document, DDocId},
            {index, IndexName},
            {progress, 0},
            {changes_done, 0},
            {total_changes, TotalChanges}
        ]),

        %% update status every half second
        couch_task_status:set_update_frequency(500),

        %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...]
        %The Rev is the final Rev, not purged Rev.
        {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index),
        %% compute on all docs modified since we last computed.

        NewCurSeq = couch_db:get_update_seq(Db),
        Proc = get_os_process(Index#index.def_lang),
        try
            true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
            EnumFun = fun ?MODULE:load_docs/2,
            [Changes] = couch_task_status:get([changes_done]),
            Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, now(), ExcludeIdRevs},
            {ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []),
            ok = clouseau_rpc:commit(IndexPid, NewCurSeq)
        after
            ret_os_process(Proc)
        end,
        exit({updated, NewCurSeq})
    after
        couch_db:close(Db)
    end.

load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs}=Acc) ->
    couch_task_status:update([{changes_done, I}, {progress, (I * 100) div Total}]),
    DI = couch_doc:to_doc_info(FDI),
    #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{rev=Rev}|_]} = DI,
    %check if it is processed in purge_index to avoid update the index again.
    case lists:member({Id, Rev}, ExcludeIdRevs) of
        true -> ok;
        false -> update_or_delete_index(IndexPid, Db, DI, Proc)
    end,
    %% Force a commit every minute
    case timer:now_diff(Now = now(), LastCommitTime) >= 60000000 of
        true ->
            ok = clouseau_rpc:commit(IndexPid, Seq),
            {ok, {I+1, IndexPid, Db, Proc, Total, Now, ExcludeIdRevs}};
        false ->
            {ok, setelement(1, Acc, I+1)}
    end.

purge_index(Db, IndexPid, Index) ->
    {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
    Proc = get_os_process(Index#index.def_lang),
    try
        true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]),
        FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, {Acc, _}) ->
            Acc0 = case couch_db:get_full_doc_info(Db, Id) of
                not_found ->
                    ok = clouseau_rpc:delete(IndexPid, Id),
                    Acc;
                FDI ->
                    DI = couch_doc:to_doc_info(FDI),
                    #doc_info{id=Id, revs=[#rev_info{rev=Rev}|_]} = DI,
                    case lists:member({Id, Rev}, Acc) of
                        true -> Acc;
                        false ->
                            update_or_delete_index(IndexPid, Db, DI, Proc),
                            [{Id, Rev} | Acc]
                    end
            end,
            update_task(1),
            {ok, {Acc0, PurgeSeq}}
        end,

        {ok, {ExcludeList, NewPurgeSeq}} = couch_db:fold_purge_infos(
            Db, IdxPurgeSeq, FoldFun, {[], 0}, []),
        clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq),
        update_local_doc(Db, Index, NewPurgeSeq),
        {ok, ExcludeList}
    after
        ret_os_process(Proc)
    end.

count_pending_purged_docs_since(Db, IndexPid) ->
    DbPurgeSeq = couch_db:get_purge_seq(Db),
    {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid),
    DbPurgeSeq - IdxPurgeSeq.

update_or_delete_index(IndexPid, Db, DI, Proc) ->
    #doc_info{id=Id, revs=[#rev_info{deleted=Del}|_]} = DI,
    case Del of
        true ->
            ok = clouseau_rpc:delete(IndexPid, Id);
        false ->
            case maybe_skip_doc(Db, Id) of
                true ->
                    ok;
                false ->
                    {ok, Doc} = couch_db:open_doc(Db, DI, []),
                    Json = couch_doc:to_json_obj(Doc, []),
                    [Fields|_] = proc_prompt(Proc, [<<"index_doc">>, Json]),
                    Fields1 = [list_to_tuple(Field) || Field <- Fields],
                    Fields2 = maybe_add_partition(Db, Id, Fields1),
                    case Fields2 of
                        [] -> ok = clouseau_rpc:delete(IndexPid, Id);
                        _  -> ok = clouseau_rpc:update(IndexPid, Id, Fields2)
                    end
            end
    end.

update_local_doc(Db, Index, PurgeSeq) ->
    DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig),
    DocContent = dreyfus_util:get_local_purge_doc_body(Db, DocId, PurgeSeq, Index),
    couch_db:update_doc(Db, DocContent, []).

update_task(NumChanges) ->
    [Changes, Total] = couch_task_status:get([changes_done, total_changes]),
    Changes2 = Changes + NumChanges,
    Progress = case Total of
        0 ->
            0;
        _ ->
            (Changes2 * 100) div Total
    end,
    couch_task_status:update([{progress, Progress}, {changes_done, Changes2}]).

maybe_skip_doc(Db, <<"_design/", _/binary>>) ->
    couch_db:is_partitioned(Db);
maybe_skip_doc(_Db, _Id) ->
    false.

maybe_add_partition(_Db, _Id, []) ->
    [];
maybe_add_partition(Db, Id, Fields) ->
    case couch_db:is_partitioned(Db) of
        true ->
            Partition = couch_partition:from_docid(Id),
            [{<<"_partition">>, Partition, {[]}} | Fields];
        false ->
            Fields
    end.
