% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_mrview_updater).

-export([start_update/3, purge/4, process_doc/3, finish_update/1]).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").

-define(REM_VAL, removed).


%% Begin an index update.
%%
%% Creates two bounded work queues (documents to map, map results to write),
%% spawns and links a mapper process and a writer process that share them,
%% and returns the initial #mrst{} the caller feeds to process_doc/3 and
%% finish_update/1. Queue limits come from the [view_updater] config section.
start_update(Partial, State, NumChanges) ->
    QueueOpts = [
        {max_size, config:get_integer("view_updater", "queue_memory_cap", 100000)},
        {max_items, config:get_integer("view_updater", "queue_item_cap", 500)}
    ],
    {ok, DocQueue} = couch_work_queue:new(QueueOpts),
    {ok, WriteQueue} = couch_work_queue:new(QueueOpts),

    InitState = State#mrst{
        first_build = State#mrst.update_seq == 0,
        partial_resp_pid = Partial,
        doc_acc = [],
        doc_queue = DocQueue,
        write_queue = WriteQueue
    },

    Updater = self(),

    % Mapper: registers the indexer task, then consumes the doc queue.
    spawn_link(fun() ->
        couch_task_status:add_task([
            {indexer_pid, ?l2b(pid_to_list(Partial))},
            {type, indexer},
            {database, State#mrst.db_name},
            {design_document, State#mrst.idx_name},
            {progress, 0},
            {changes_done, 0},
            {total_changes, NumChanges}
        ]),
        couch_task_status:set_update_frequency(500),
        map_docs(Updater, InitState)
    end),

    % Writer: consumes the write queue and reports back to Updater.
    spawn_link(fun() -> write_results(Updater, InitState) end),

    {ok, InitState}.


%% Remove purged documents from the index.
%%
%% PurgedIdRevs is a list of {DocId, Revs}. Every listed id is looked up and
%% removed from the id btree and, when present, the log btree. The rows those
%% documents emitted are then removed from each view's btree and, for
%% sequence-indexed views, from the seq / key-by-seq btrees. A view's
%% purge_seq only advances to PurgeSeq when its main btree actually changed.
purge(_Db, PurgeSeq, PurgedIdRevs, State) ->
    #mrst{
        id_btree=IdBtree,
        log_btree=LogBtree,
        views=Views
    } = State,

    Ids = [Id || {Id, _Revs} <- PurgedIdRevs],
    % Look up and remove the purged ids in a single pass over each btree.
    {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),
    {ok, LLookups, LogBtree2} = case LogBtree of
        nil -> {ok, [], nil};
        _ -> couch_btree:query_modify(LogBtree, Ids, [], Ids)
    end,

    % Group looked-up entries by view number. Log btree entries carry
    % {Key, Seq, Op}; id btree entries carry just the emitted key.
    MakeDictFun = fun
        ({ok, {DocId, ViewNumRowKeys}}, DictAcc) ->
            FoldFun = fun
                ({ViewNum, {Key, Seq, _Op}}, DictAcc2) ->
                    dict:append(ViewNum, {Key, Seq, DocId}, DictAcc2);
                ({ViewNum, RowKey}, DictAcc2) ->
                    dict:append(ViewNum, {RowKey, DocId}, DictAcc2)
            end,
            lists:foldl(FoldFun, DictAcc, ViewNumRowKeys);
        ({not_found, _}, DictAcc) ->
            DictAcc
    end,
    KeysToRemove = lists:foldl(MakeDictFun, dict:new(), Lookups),
    SeqsToRemove = lists:foldl(MakeDictFun, dict:new(), LLookups),

    RemKeysFun = fun(#mrview{id_num=ViewId}=View) ->
        #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
        ToRem = couch_util:dict_find(ViewId, KeysToRemove, []),
        {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, [], ToRem),
        NewPurgeSeq = case VBtree2 =/= View#mrview.btree of
            true -> PurgeSeq;
            _ -> View#mrview.purge_seq
        end,
        {SeqBtree3, KeyBySeqBtree3} = if SIndexed orelse KSIndexed ->
            SToRem = couch_util:dict_find(ViewId, SeqsToRemove, []),
            {ok, SeqBtree2} = if SIndexed ->
                SKs = [{Seq, Key} || {Key, Seq, _} <- SToRem],
                couch_btree:add_remove(View#mrview.seq_btree,
                                       [], SKs);
            true ->
                {ok, nil}
            end,
            {ok, KeyBySeqBtree2} = if KSIndexed ->
                % Key-by-seq rows are keyed {[Key, Seq], DocId}, matching
                % write_kvs/6 and couch_mrview_util:to_key_seq/1. The
                % previous [Seq, Key] order never matched a stored row.
                KSs = [{[Key, Seq], DocId} || {Key, Seq, DocId} <- SToRem],
                couch_btree:add_remove(View#mrview.key_byseq_btree,
                                       [], KSs);
            true ->
                {ok, nil}
            end,
            {SeqBtree2, KeyBySeqBtree2};
        true ->
            {nil, nil}
        end,

        View#mrview{btree=VBtree2,
                    seq_btree=SeqBtree3,
                    key_byseq_btree=KeyBySeqBtree3,
                    purge_seq=NewPurgeSeq}
    end,

    Views2 = lists:map(RemKeysFun, Views),
    {ok, State#mrst{
        id_btree=IdBtree2,
        log_btree=LogBtree2,
        views=Views2,
        purge_seq=PurgeSeq
    }}.


%% Buffer one changed document for the mapper process.
%%
%% Entries are prepended to doc_acc. Once more than 100 are buffered, the
%% batch is pushed to the doc queue in arrival order before Doc is buffered.
process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) when length(Acc) > 100 ->
    Batch = lists:reverse(Acc),
    couch_work_queue:queue(State#mrst.doc_queue, Batch),
    process_doc(Doc, Seq, State#mrst{doc_acc=[]});
process_doc(Doc, Seq, #mrst{doc_acc=Acc}=State) ->
    {ok, State#mrst{doc_acc=[doc_acc_entry(Doc, Seq) | Acc]}}.

%% Build the {Id, Seq, Rev, Payload} tuple stored in doc_acc. The payload is
%% nil (no document), the atom `deleted`, or the #doc{} itself.
doc_acc_entry(nil, Seq) ->
    {nil, Seq, nil, nil};
doc_acc_entry(#doc{id=Id, deleted=true, revs=Revs}, Seq) ->
    {Id, Seq, extract_rev(Revs), deleted};
doc_acc_entry(#doc{id=Id, revs=Revs}=Doc, Seq) ->
    {Id, Seq, extract_rev(Revs), Doc}.

%% Reduce a {RevPos, RevIds} revision path to {RevPos, LatestRevId}.
%% The empty path {0, []} is returned unchanged.
extract_rev({Pos, [Latest | _Older]}) ->
    {Pos, Latest};
extract_rev({0, []} = NoRevs) ->
    NoRevs.

%% Flush remaining buffered docs, close the doc queue and wait for the writer.
%%
%% The leftover doc_acc is reversed before queueing, exactly as process_doc/3
%% does for full batches. doc_acc is built by prepending, so queueing it
%% as-is would hand the mapper the final batch in reverse arrival order.
%% Blocks until the writer sends {new_state, NewState}; the transient
%% updater fields are then cleared from the returned state.
finish_update(#mrst{doc_acc=Acc}=State) ->
    case Acc of
        [] -> ok;
        _ -> couch_work_queue:queue(State#mrst.doc_queue, lists:reverse(Acc))
    end,
    couch_work_queue:close(State#mrst.doc_queue),
    receive
        {new_state, NewState} ->
            {ok, NewState#mrst{
                first_build=undefined,
                partial_resp_pid=undefined,
                doc_acc=undefined,
                doc_queue=undefined,
                write_queue=undefined,
                qserver=nil
            }}
    end.


%% Mapper process loop.
%%
%% Dequeues batches of {Id, Seq, Rev, Payload} entries and runs live documents
%% through the query server, which is started lazily on first use. Each
%% dequeued chunk becomes one {MaxSeq, Results} item on the write queue. When
%% the doc queue closes, the query server is stopped and the write queue is
%% closed so the writer can finish.
map_docs(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State0) ->
    erlang:put(io_priority, {view_update, DbName, IdxName}),
    case couch_work_queue:dequeue(State0#mrst.doc_queue) of
        {ok, Batches} ->
            State1 = if
                State0#mrst.qserver =:= nil -> start_query_server(State0);
                true -> State0
            end,
            QServer = State1#mrst.qserver,
            % nil entries only advance the seq; deleted docs produce an empty
            % result so their old rows get removed by the writer.
            MapOne = fun
                ({nil, Seq, _, _}, {MaxSeq, Acc}) ->
                    {erlang:max(Seq, MaxSeq), Acc};
                ({Id, Seq, Rev, deleted}, {MaxSeq, Acc}) ->
                    {erlang:max(Seq, MaxSeq), [{Id, Seq, Rev, []} | Acc]};
                ({Id, Seq, Rev, Doc}, {MaxSeq, Acc}) ->
                    couch_stats:increment_counter([couchdb, mrview, map_doc]),
                    {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
                    {erlang:max(Seq, MaxSeq), [{Id, Seq, Rev, Res} | Acc]}
            end,
            MapBatch = fun(Batch, Acc) ->
                update_task(length(Batch)),
                lists:foldl(MapOne, Acc, Batch)
            end,
            Mapped = lists:foldl(MapBatch, {0, []}, Batches),
            couch_work_queue:queue(State1#mrst.write_queue, Mapped),
            map_docs(Parent, State1);
        closed ->
            couch_query_servers:stop_doc_map(State0#mrst.qserver),
            couch_work_queue:close(State0#mrst.write_queue)
    end.


%% Writer process loop.
%%
%% Repeatedly gathers a batch from the write queue, persists it with
%% write_kvs/6 and forwards the intermediate state via send_partial/2. Once
%% the queue is closed, the final state is sent to Parent as
%% {new_state, State}.
write_results(Parent, #mrst{db_name = DbName, idx_name = IdxName} = State) ->
    case accumulate_writes(State, State#mrst.write_queue, nil) of
        stop ->
            % Queue closed with nothing pending: the current state is final.
            Parent ! {new_state, State};
        {Status, {Seq, ViewKVs, DocIdKeys, Seqs, Log}} ->
            erlang:put(io_priority, {view_update, DbName, IdxName}),
            Written = write_kvs(State, Seq, ViewKVs, DocIdKeys, Seqs, Log),
            case Status of
                stop ->
                    Parent ! {new_state, Written};
                _ ->
                    send_partial(Written#mrst.partial_resp_pid, Written),
                    write_results(Parent, Written)
            end
    end.


%% Start a doc-map query server loaded with every view's map definition and
%% store its handle in the state's qserver field.
start_query_server(#mrst{language = Lang, lib = Lib, views = Views} = State) ->
    MapDefs = lists:map(fun(V) -> V#mrview.def end, Views),
    {ok, Server} = couch_query_servers:start_doc_map(Lang, MapDefs, Lib),
    State#mrst{qserver = Server}.


%% Pull mapped batches from write queue W and merge them into one pending
%% write, {MaxSeq, ViewKVs, DocIdKeys, Seqs, Log}.
%%
%% Returns `stop` if the queue closed with nothing pending, {stop, Acc} if it
%% closed with pending writes, or {ok, Acc} once accumulate_more/2 reports
%% the batch is big enough.
accumulate_writes(State, W, nil) ->
    EmptyViewKVs = [{View#mrview.id_num, {[], []}} || View <- State#mrst.views],
    accumulate_writes(State, W, {0, EmptyViewKVs, [], dict:new(), dict:new()});
accumulate_writes(State, W, {Seq, ViewKVs, DocIdKVs, Seqs, Log} = Pending) ->
    case couch_work_queue:dequeue(W) of
        closed when Seq == 0 ->
            stop;
        closed ->
            {stop, Pending};
        {ok, Info} ->
            Merged = merge_results(Info, Seq, ViewKVs, DocIdKVs, Seqs, Log),
            {_, _, MergedDocIds, _, _} = Merged,
            case accumulate_more(length(MergedDocIds), Merged) of
                true -> accumulate_writes(State, W, Merged);
                false -> {ok, Merged}
            end
    end.


%% Decide whether accumulate_writes/3 should keep pulling batches.
%%
%% Returns true while the pending write holds fewer than
%% [view_updater] min_writer_items doc-id entries (default 100) AND its term
%% size is below min_writer_size bytes (default 16777216). Settings are read
%% with config:get_integer/3, as start_update/3 does, instead of
%% list_to_integer/1 on the raw string.
accumulate_more(NumDocIds, Acc) ->
    MinItems = config:get_integer("view_updater", "min_writer_items", 100),
    MinSize = config:get_integer("view_updater", "min_writer_size", 16777216),
    % Sizing the accumulator can be expensive, so only do it when the item
    % threshold has not already been reached.
    NumDocIds < MinItems andalso (?term_size(Acc) < MinSize).


%% Merge a list of mapped chunks, each {MaxSeq, Results}, into the pending
%% write accumulators while tracking the highest seq seen.
merge_results(Batches, SeqAcc, ViewKVs, DocIdKeys, Seqs, Log) ->
    lists:foldl(
        fun({Seq, Results}, {MaxSeq, VKV0, DIK0, Seqs0, Log0}) ->
            {VKV1, DIK1, Seqs1, Log1} = lists:foldl(
                fun(Entry, {VKV, DIK, S, L}) ->
                    merge_results(Entry, VKV, DIK, S, L)
                end,
                {VKV0, DIK0, Seqs0, Log0},
                Results),
            {erlang:max(Seq, MaxSeq), VKV1, DIK1, Seqs1, Log1}
        end,
        {SeqAcc, ViewKVs, DocIdKeys, Seqs, Log},
        Batches).


%% Merge one document's map output into the pending write.
%%
%% A document with no emitted rows (deleted, or every view emitted nothing)
%% is recorded with an empty id-btree entry and an empty log entry, so its
%% old rows get removed. Otherwise its rows go through insert_results/8.
merge_results({DocId, Seq, Rev, []}, ViewKVs, DocIdKeys, Seqs, Log) ->
    {ViewKVs,
     [{DocId, []} | DocIdKeys],
     dict:store(DocId, Seq, Seqs),
     dict:store({DocId, Rev}, [], Log)};
merge_results({DocId, Seq, Rev, RawResults}, ViewKVs, DocIdKeys, Seqs, Log) ->
    Decoded = couch_query_servers:raw_to_ejson(RawResults),
    Results = [[list_to_tuple(Row) || Row <- FunRows] || FunRows <- Decoded],
    case lists:all(fun(Rows) -> Rows =:= [] end, Results) of
        true ->
            merge_results({DocId, Seq, Rev, []}, ViewKVs, DocIdKeys, Seqs, Log);
        false ->
            {ViewKVs1, ViewIdKeys, Log1} =
                insert_results(DocId, Seq, Rev, Results, ViewKVs, [], [], Log),
            {ViewKVs1, [ViewIdKeys | DocIdKeys], dict:store(DocId, Seq, Seqs), Log1}
    end.


%% Fold one document's per-view map rows into the per-view accumulators.
%%
%% The 4th argument holds one list of {Key, Value} rows per view. The 5th is
%% the incoming [{ViewId, {ViewKVs, SeqKVs}}] list in the same view order.
%% The 6th is the (reversed) output accumulator, and the 7th collects the
%% {ViewId, Key} pairs the document emitted.
%% Returns {NewViewKVs, {DocId, ViewIdKeys}, NewLog}. NewLog has one
%% {ViewId, {Key, Seq, add}} entry appended under {DocId, Rev} per distinct
%% key emitted.
insert_results(DocId, _Seq, _Rev, [], [], ViewKVs, ViewIdKeys, Log) ->
    {lists:reverse(ViewKVs), {DocId, ViewIdKeys}, Log};
insert_results(DocId, Seq, Rev, [KVs | RKVs], [{Id, {VKVs, SKVs}} | RVKVs], VKVAcc,
               VIdKeys, Log) ->
    % KVs is sorted before this fold, so identical keys are adjacent. Repeated
    % keys are merged into a single {Key, {dups, Vals}} row. Only the first
    % occurrence of a key (third clause) records an id-btree key and a log
    % entry.
    CombineDupesFun = fun
        ({Key, Val}, {[{Key, {dups, Vals}} | Rest], IdKeys, Log2}) ->
            {[{Key, {dups, [Val | Vals]}} | Rest], IdKeys, Log2};
        ({Key, Val1}, {[{Key, Val2} | Rest], IdKeys, Log2}) ->
            {[{Key, {dups, [Val1, Val2]}} | Rest], IdKeys, Log2};
        ({Key, Value}, {Rest, IdKeys, Log2}) ->
            {[{Key, Value} | Rest], [{Id, Key} | IdKeys],
             dict:append({DocId, Rev}, {Id, {Key, Seq, add}}, Log2)}
    end,
    InitAcc = {[], VIdKeys, Log},
    couch_stats:increment_counter([couchdb, mrview, emits], length(KVs)),
    {Duped, VIdKeys0, Log1} = lists:foldl(CombineDupesFun, InitAcc,
                                          lists:sort(KVs)),
    % View btree rows are keyed {Key, DocId}; seq btree rows are keyed
    % {Seq, Key} and carry {DocId, Val, Rev}.
    FinalKVs = [{{Key, DocId}, Val} || {Key, Val} <- Duped] ++ VKVs,
    FinalSKVs = [{{Seq, Key}, {DocId, Val, Rev}} || {Key, Val} <- Duped] ++ SKVs,
    insert_results(DocId, Seq, Rev, RKVs, RVKVs,
                  [{Id, {FinalKVs, FinalSKVs}} | VKVAcc], VIdKeys0, Log1).


%% Persist one accumulated batch and return the updated #mrst{}.
%%
%% UpdateSeq is the highest seq in the batch. ViewKVs is
%% [{ViewId, {KVs, SeqKVs}}] in the same order as State#mrst.views.
%% DocIdKeys is [{DocId, [{ViewId, Key}]}], where an empty list means the
%% doc emitted nothing. Seqs maps DocId -> Seq, and Log0 maps
%% {DocId, Rev} -> log entries. The id btree, the log btree (when present)
%% and each view's btree / seq btree / key-by-seq btree are updated.
write_kvs(State, UpdateSeq, ViewKVs, DocIdKeys, Seqs, Log0) ->
    #mrst{
        id_btree=IdBtree,
        log_btree=LogBtree,
        first_build=FirstBuild
    } = State,

    % DocId -> Rev, taken from the {DocId, Rev} keys of the batch log.
    Revs = dict:from_list(dict:fetch_keys(Log0)),

    % Re-key the batch log by DocId alone.
    Log = dict:fold(fun({Id, _Rev}, DIKeys, Acc) ->
        dict:store(Id, DIKeys, Acc)
    end, dict:new(), Log0),

    % Previous id-btree entries tell us which old view rows to delete.
    {ok, ToRemove, IdBtree2} = update_id_btree(IdBtree, DocIdKeys, FirstBuild),
    ToRemByView = collapse_rem_keys(ToRemove, dict:new()),

    {ok, SeqsToAdd, SeqsToRemove, LogBtree2} = case LogBtree of
        nil -> {ok, undefined, undefined, nil};
        _ -> update_log(LogBtree, Log, Revs, Seqs, FirstBuild)
    end,

    % ViewId appears in both argument patterns, so a view and its
    % accumulated rows must line up in the zipwith below.
    UpdateView = fun(#mrview{id_num=ViewId}=View, {ViewId, {KVs, SKVs}}) ->
        #mrview{seq_indexed=SIndexed, keyseq_indexed=KSIndexed} = View,
        ToRem = couch_util:dict_find(ViewId, ToRemByView, []),
        {ok, VBtree2} = couch_btree:add_remove(View#mrview.btree, KVs, ToRem),
        % Only bump the view's update_seq if its btree actually changed.
        NewUpdateSeq = case VBtree2 =/= View#mrview.btree of
            true -> UpdateSeq;
            _ -> View#mrview.update_seq
        end,

        %% Update the seq-indexed btrees: new rows from this batch plus
        %% removal markers from update_log/5; old seq rows are deleted.
        {SeqBtree3, KeyBySeqBtree3} = if SIndexed orelse KSIndexed ->
            SToRem = couch_util:dict_find(ViewId, SeqsToRemove, []),
            SToAdd = couch_util:dict_find(ViewId, SeqsToAdd, []),
            SKVs1 = SKVs ++ SToAdd,

            {ok, SeqBtree2} = if SIndexed ->
                RemSKs = lists:sort([{Seq, Key} || {Key, Seq, _} <- SToRem]),
                couch_btree:add_remove(View#mrview.seq_btree,
                                       SKVs1, RemSKs);
            true ->
                {ok, nil}
            end,

            {ok, KeyBySeqBtree2} = if KSIndexed ->
                % Key-by-seq rows are keyed {[Key, Seq], DocId}.
                RemKSs = [{[Key, Seq], DocId} || {Key, Seq, DocId} <- SToRem],
                couch_btree:add_remove(View#mrview.key_byseq_btree,
                                       couch_mrview_util:to_key_seq(SKVs1),
                                       RemKSs);
            true ->
                {ok, nil}
            end,
            {SeqBtree2, KeyBySeqBtree2};
        true ->
            {nil, nil}
        end,
        View2 = View#mrview{btree=VBtree2,
                    seq_btree=SeqBtree3,
                    key_byseq_btree=KeyBySeqBtree3,
                    update_seq=NewUpdateSeq},
        maybe_notify(State, View2, KVs, ToRem),
        View2
    end,

    State#mrst{
        views=lists:zipwith(UpdateView, State#mrst.views, ViewKVs),
        update_seq=UpdateSeq,
        id_btree=IdBtree2,
        log_btree=LogBtree2
    }.

%% Apply a batch of {DocId, [{ViewId, Key}]} entries to the id btree.
%%
%% Docs that emitted keys are (re)inserted. Otherwise, each doc's previous
%% entry is looked up (so the caller can delete its old view rows), and docs
%% that now emit nothing are removed. On the first build (third argument
%% `true`) the btree has no prior entries, so nothing is looked up or
%% removed. Returns the couch_btree:query_modify/4 result.
update_id_btree(Btree, DocIdKeys, FirstBuild) ->
    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- DocIdKeys, DIKeys /= []],
    case FirstBuild of
        true ->
            couch_btree:query_modify(Btree, [], ToAdd, []);
        _ ->
            ToFind = [Id || {Id, _} <- DocIdKeys],
            ToRem = [Id || {Id, []} <- DocIdKeys],
            couch_btree:query_modify(Btree, ToFind, ToAdd, ToRem)
    end.


%% Maintain the per-document log btree used by sequence-indexed views.
%%
%% Log maps DocId -> [{ViewId, {Key, Seq, Op}}] for this batch, Revs maps
%% DocId -> Rev, and Seqs maps DocId -> the doc's new seq.
%% Returns {ok, SeqsToAdd, SeqsToRemove, NewLogBtree}. SeqsToAdd and
%% SeqsToRemove are dicts keyed by ViewId: rows to insert
%% ({{Seq, Key}, {DocId, Val, Rev}}) and rows to delete ({Key, Seq, DocId}).
%%
%% On the first build (last argument `true`) there is no previous log to
%% reconcile, so the batch log is stored as-is and no seq rows are produced.
update_log(Btree, Log, _Revs, _Seqs, true) ->
    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log),
                             DIKeys /= []],
    {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []),
    {ok, dict:new(), dict:new(), LogBtree2};
update_log(Btree, Log, Revs, Seqs, _) ->
    %% Collect the ids to look up, plus the {DocId, ViewId, Key} triples this
    %% batch logs as `add` (Updated) and as `del` (Removed).
    {ToLook, Updated, Removed} = dict:fold(
        fun(Id, [], {IdsAcc, KeysAcc, RemAcc}) ->
            {[Id | IdsAcc], KeysAcc, RemAcc};
        (Id, DIKeys, {IdsAcc, KeysAcc, RemAcc}) ->
            {KeysAcc1, RemAcc1} = lists:foldl(fun({ViewId, {Key, _Seq, Op}}, {KeysAcc2, RemAcc2}) ->
                case Op of
                    add -> {[{Id, ViewId, Key}|KeysAcc2], RemAcc2};
                    del -> {KeysAcc2, [{Id, ViewId, Key}|RemAcc2]}
                end
            end, {KeysAcc, RemAcc}, DIKeys),
            {[Id | IdsAcc], KeysAcc1, RemAcc1}
        end, {[], [], []}, Log),

    % Fetch the previously stored log entries for these docs.
    MapFun = fun({ok, KV}) -> [KV]; (not_found) -> [] end,
    KVsToLook = lists:flatmap(MapFun, couch_btree:lookup(Btree, ToLook)),

    % Reconcile every previously logged key against this batch.
    {Log1, AddAcc, DelAcc} = lists:foldl(fun({DocId, VIdKeys}, Acc) ->
        lists:foldl(fun({ViewId, {Key, OldSeq, _Op}}, {Log4, AddAcc4, DelAcc4}) ->

            IsUpdated = lists:member({DocId, ViewId, Key}, Updated),
            IsRemoved = lists:member({DocId, ViewId, Key}, Removed),

            case IsUpdated of
                true ->
                    % The key is emitted again in this batch: delete the old
                    % seq row; the new row is written from the batch's SKVs.
                    DelAcc5 = dict:append(ViewId, {Key, OldSeq, DocId}, DelAcc4),
                    {Log4, AddAcc4, DelAcc5};
                false ->
                    % The key is no longer emitted. Log it as `del` at the
                    % doc's new seq (unless this batch already logs it as
                    % deleted), delete the old seq row, and add a ?REM_VAL
                    % marker row at the new seq.
                    NewSeq = dict:fetch(DocId, Seqs),
                    Log5 = case IsRemoved of
                        false ->
                            dict:append(DocId, {ViewId, {Key, NewSeq, del}}, Log4);
                        true ->
                            Log4
                    end,
                    Rev = dict:fetch(DocId, Revs),
                    DelAcc5 = dict:append(ViewId, {Key, OldSeq, DocId}, DelAcc4),
                    AddAcc5 = dict:append(ViewId, {{NewSeq, Key}, {DocId, ?REM_VAL, Rev}}, AddAcc4),
                    {Log5, AddAcc5, DelAcc5}
            end
        end, Acc, VIdKeys)
    end, {Log, dict:new(), dict:new()}, KVsToLook),

    ToAdd = [{Id, DIKeys} || {Id, DIKeys} <- dict:to_list(Log1), DIKeys /= []],
    % store the new logs
    {ok, LogBtree2} = couch_btree:add_remove(Btree, ToAdd, []),
    {ok, AddAcc, DelAcc, LogBtree2}.

%% Turn id-btree lookup results into a dict of ViewId -> [{Key, DocId}],
%% i.e. the view rows to remove. not_found lookups contribute nothing.
collapse_rem_keys(Lookups, Acc0) ->
    lists:foldl(
        fun({ok, {DocId, ViewIdKeys}}, Acc) ->
                lists:foldl(
                    fun({ViewId, Key}, Inner) ->
                        dict:append(ViewId, {Key, DocId}, Inner)
                    end,
                    Acc,
                    ViewIdKeys);
           ({not_found, _}, Acc) ->
                Acc
        end,
        Acc0,
        Lookups).


%% Cast an intermediate {new_state, State} to Pid when Pid is a process;
%% anything else (no partial-response listener) is a no-op. Returns ok.
send_partial(Pid, State) ->
    case is_pid(Pid) of
        true -> gen_server:cast(Pid, {new_state, State});
        false -> ok
    end.


%% Add NumChanges to the task's changes_done counter and recompute the
%% integer progress percentage relative to total_changes.
update_task(NumChanges) ->
    [Done, Total] = couch_task_status:get([changes_done, total_changes]),
    NewDone = Done + NumChanges,
    % Total is 0 when the updater restarts after compaction finishes;
    % report 0 rather than dividing by zero.
    Progress = if
        Total =:= 0 -> 0;
        true -> (NewDone * 100) div Total
    end,
    couch_task_status:update([{progress, Progress}, {changes_done, NewDone}]).


%% Notify index plugins about a view update. Updated and removed keys are
%% passed as thunks so the key lists are only built if a plugin asks for them.
maybe_notify(State, View, KVs, ToRem) ->
    UpdatedKeys = fun() -> [K || {{K, _DocId}, _Val} <- KVs] end,
    RemovedKeys = fun() -> [K || {K, _DocId} <- ToRem] end,
    couch_index_plugin:index_update(State, View, UpdatedKeys, RemovedKeys).
