% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(fabric_rpc).

-export([get_db_info/1, get_doc_count/1, get_update_seq/1]).
-export([open_doc/3, open_revs/4, get_missing_revs/2, get_missing_revs/3,
    update_docs/3]).
-export([all_docs/2, changes/3, map_view/4, reduce_view/4, group_info/2]).
-export([create_db/1, delete_db/1, reset_validation_funs/1, set_security/3,
    set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
-export([get_all_security/2]).

-include_lib("fabric/include/fabric.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").

%% Accumulator threaded through the fold callbacks in this module
%% (view_fold/3 for _all_docs and map views, reduce_fold/3 for reduces).
-record (view_acc, {
    % database handle; view_fold/3 uses it to open docs for include_docs
    db,
    % rows still allowed to be emitted; callers initialise it to Limit+Skip
    % and the fold stops once it reaches 0
    limit,
    % when true, view_fold/3 opens each row's document and attaches it
    include_docs,
    % when true, documents are opened with the [conflicts] option
    conflicts,
    % #doc_info{} of the current row, set by view_fold/3 for _all_docs rows
    doc_info = nil,
    % nil until the first row is folded, then the offset computed for it
    offset = nil,
    % total row count, sent together with the offset
    total_rows,
    % applied to the fold's OffsetReds to obtain the offset
    reduce_fun = fun couch_db:enum_docs_reduce_to_count/1,
    % 0, a positive integer or the atom exact; consulted by reduce_fold/3
    group_level = 0
}).

%% rpc endpoints
%%  call to with_db will supply your M:F with a #db{} and then remaining args

%% Folds over the documents of one shard for an _all_docs request that has
%% no explicit key list, streaming rows through view_fold/3 and finishing
%% with final_response/2.
all_docs(DbName, #mrargs{keys=undefined} = QueryArgs) ->
    {ok, Db} = get_or_create_db(DbName, []),
    #mrargs{extra = Extra, limit = Limit, skip = Skip} = QueryArgs,
    set_io_priority(DbName, Extra),
    {ok, DocCount} = couch_db:get_doc_count(Db),
    InitAcc = #view_acc{
        db = Db,
        include_docs = QueryArgs#mrargs.include_docs,
        conflicts = proplists:get_value(conflicts, Extra, false),
        % rows to skip are still scanned locally, so count them in the limit
        limit = Limit + Skip,
        total_rows = DocCount
    },
    % a binary key takes precedence over the corresponding docid bound
    StartBound = case is_binary(QueryArgs#mrargs.start_key) of
        true -> QueryArgs#mrargs.start_key;
        false -> QueryArgs#mrargs.start_key_docid
    end,
    EndBound = case is_binary(QueryArgs#mrargs.end_key) of
        true -> QueryArgs#mrargs.end_key;
        false -> QueryArgs#mrargs.end_key_docid
    end,
    EndOption = case QueryArgs#mrargs.inclusive_end of
        true -> end_key;
        _ -> end_key_gt
    end,
    FoldOpts = [
        {dir, QueryArgs#mrargs.direction},
        {start_key, StartBound},
        {EndOption, EndBound}
    ],
    {ok, _, FinalAcc} = couch_db:enum_docs(Db, fun view_fold/3, InitAcc,
        FoldOpts),
    final_response(DocCount, FinalAcc#view_acc.offset).

%% Streams the changes of one shard starting at StartSeq.
%% The second argument is either a bare #changes_args{} record or an option
%% list that contains one. Replies {complete, LastSeq} when the fold is done,
%% or the open error if the database could not be opened.
changes(DbName, #changes_args{} = Args, StartSeq) ->
    changes(DbName, [Args], StartSeq);
changes(DbName, Options, StartSeq) ->
    erlang:put(io_priority, {interactive, DbName}),
    % the record is a tuple tagged changes_args, so keyfind locates it
    Args = lists:keyfind(changes_args, 1, Options),
    #changes_args{dir = Direction} = Args,
    case get_or_create_db(DbName, []) of
        {ok, Db} ->
            InitAcc = {Db, StartSeq, Args, Options},
            try
                {ok, {_, LastSeq, _, _}} = couch_db:changes_since(
                    Db, StartSeq, fun changes_enumerator/2,
                    [{dir, Direction}], InitAcc),
                rexi:reply({complete, LastSeq})
            after
                % always release the handle, even if the fold crashes
                couch_db:close(Db)
            end;
        OpenError ->
            rexi:reply(OpenError)
    end.

%% Streams the rows of a map view for one shard. With keys = undefined a
%% single range fold is performed; otherwise one fold per key is run with
%% the accumulator carried from key to key.
map_view(DbName, DDoc, ViewName, QueryArgs) ->
    {ok, Db} = get_or_create_db(DbName, []),
    #mrargs{
        extra = Extra,
        stale = Stale,
        keys = Keys,
        view_type = ViewType,
        include_docs = IncludeDocs,
        limit = Limit,
        skip = Skip
    } = QueryArgs,
    set_io_priority(DbName, Extra),
    {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
    Group0 = couch_view_group:design_doc_to_view_group(DDoc),
    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
    {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
    maybe_update_view_group(Pid, LastSeq, Stale),
    erlang:monitor(process, couch_view_group:get_fd(Group)),
    Views = couch_view_group:get_views(Group),
    View = fabric_view:extract_view(Pid, ViewName, Views, ViewType),
    {ok, Total} = couch_view:get_row_count(View),
    InitAcc = #view_acc{
        db = Db,
        include_docs = IncludeDocs,
        conflicts = proplists:get_value(conflicts, Extra, false),
        limit = Limit+Skip,
        total_rows = Total,
        reduce_fun = fun couch_view:reduce_to_count/1
    },
    % folds the view over the key range described by RangeArgs
    FoldRange = fun(RangeArgs, FoldAcc) ->
        couch_view:fold(View, fun view_fold/3, FoldAcc,
            couch_httpd_view:make_key_options(RangeArgs))
    end,
    FinalAcc = case Keys of
        undefined ->
            {ok, _, Folded} = FoldRange(QueryArgs, InitAcc),
            Folded;
        _ ->
            lists:foldl(fun(Key, KeyAcc) ->
                SingleKey = QueryArgs#mrargs{start_key=Key, end_key=Key},
                {_Go, _, NextAcc} = FoldRange(SingleKey, KeyAcc),
                NextAcc
            end, InitAcc, Keys)
    end,
    final_response(Total, FinalAcc#view_acc.offset).

%% Streams the reduced rows of a view for one shard and replies `complete`
%% when done. Accepts either a design document (converted to a view group
%% first) or a view group. With an explicit key list every key is folded
%% separately, each starting from the same initial accumulator.
reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) ->
    reduce_view(DbName, couch_view_group:design_doc_to_view_group(DDoc),
        ViewName, QueryArgs);
reduce_view(DbName, Group0, ViewName, QueryArgs) ->
    erlang:put(io_priority, {interactive, DbName}),
    {ok, Db} = get_or_create_db(DbName, []),
    #mrargs{
        extra = Extra,
        stale = Stale,
        keys = Keys,
        group_level = GroupLevel,
        limit = Limit,
        skip = Skip
    } = QueryArgs,
    set_io_priority(DbName, Extra),
    SameGroup = group_rows_fun(GroupLevel),
    {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
    {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
    maybe_update_view_group(Pid, LastSeq, Stale),
    Lang = couch_view_group:get_language(Group),
    Views = couch_view_group:get_views(Group),
    erlang:monitor(process, couch_view_group:get_fd(Group)),
    {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
    ReduceView = {reduce, NthRed, Lang, View},
    InitAcc = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
    % runs one grouped reduce fold over the range described by RangeArgs
    FoldRange = fun(RangeArgs) ->
        KeyOpts = couch_httpd_view:make_key_options(RangeArgs),
        couch_view:fold_reduce(ReduceView, fun reduce_fold/3, InitAcc,
            [{key_group_fun, SameGroup} | KeyOpts])
    end,
    case Keys of
        undefined ->
            FoldRange(QueryArgs);
        _ ->
            % rows are emitted as a side effect; the fold results are unused
            lists:foreach(fun(Key) ->
                FoldRange(QueryArgs#mrargs{start_key=Key, end_key=Key})
            end, Keys)
    end,
    rexi:reply(complete).

%% Returns {LastSeq, MinSeq}: the database's current update sequence and
%% the minimum sequence to request from the view group. For stale = ok or
%% update_after the minimum is 0; otherwise it is the current sequence.
calculate_seqs(Db, Stale) ->
    LastSeq = couch_db:get_update_seq(Db),
    MinSeq = case Stale of
        ok -> 0;
        update_after -> 0;
        _ -> LastSeq
    end,
    {LastSeq, MinSeq}.

%% Triggers a view group update up to LastSeq when the stale mode is
%% update_after; for any other mode it does nothing and returns ok.
maybe_update_view_group(GroupPid, LastSeq, Stale) ->
    case Stale of
        update_after ->
            couch_view_group:trigger_group_update(GroupPid, LastSeq);
        _ ->
            ok
    end.

%% Creates the named database and replies ok on success, or whatever
%% couch_server:create/2 returned otherwise.
create_db(DbName) ->
    Reply = case couch_server:create(DbName, []) of
        {ok, _Db} -> ok;
        Failure -> Failure
    end,
    rexi:reply(Reply).

%% Writes Doc through mem3_util:write_db_doc/1 and replies with the result.
%% The first argument is ignored.
create_shard_db_doc(_Ignored, Doc) ->
    Outcome = mem3_util:write_db_doc(Doc),
    rexi:reply(Outcome).

%% Deletes the named database through couch_server:delete/2 and returns its
%% result directly; no rexi reply is sent from this function.
delete_db(DbName) ->
    NoOptions = [],
    couch_server:delete(DbName, NoOptions).

%% Deletes the document DocId through mem3_util:delete_db_doc/1 and replies
%% with the result. The first argument is ignored.
delete_shard_db_doc(_Ignored, DocId) ->
    Outcome = mem3_util:delete_db_doc(DocId),
    rexi:reply(Outcome).

%% Replies with the result of couch_db:get_db_info/1 for the named shard.
get_db_info(DbName) ->
    Call = {couch_db, get_db_info, []},
    with_db(DbName, [], Call).

%% Replies with the result of couch_db:get_doc_count/1 for the named shard.
get_doc_count(DbName) ->
    Call = {couch_db, get_doc_count, []},
    with_db(DbName, [], Call).

%% Replies with the result of couch_db:get_update_seq/1 for the named shard.
get_update_seq(DbName) ->
    Call = {couch_db, get_update_seq, []},
    with_db(DbName, [], Call).

%% Applies couch_db:set_security/2 with SecObj to the named shard, opened
%% with Options, and replies with the result.
set_security(DbName, SecObj, Options) ->
    Call = {couch_db, set_security, [SecObj]},
    with_db(DbName, Options, Call).

%% Replies with the result of couch_db:get_security/1 for the named shard,
%% opened with Options.
get_all_security(DbName, Options) ->
    Call = {couch_db, get_security, []},
    with_db(DbName, Options, Call).

%% Applies couch_db:set_revs_limit/2 with Limit to the named shard, opened
%% with Options, and replies with the result.
set_revs_limit(DbName, Limit, Options) ->
    Call = {couch_db, set_revs_limit, [Limit]},
    with_db(DbName, Options, Call).

%% Opens DocId in the named shard via couch_db:open_doc/3 and replies with
%% the result. Options is used both to open the database and the document.
open_doc(DbName, DocId, Options) ->
    Call = {couch_db, open_doc, [DocId, Options]},
    with_db(DbName, Options, Call).

%% Opens the requested revisions of Id in the named shard via
%% couch_db:open_doc_revs/4 and replies with the result. Options is used
%% both to open the database and the revisions.
open_revs(DbName, Id, Revs, Options) ->
    Call = {couch_db, open_doc_revs, [Id, Revs, Options]},
    with_db(DbName, Options, Call).

%% Same as get_missing_revs/3 with an empty option list.
get_missing_revs(DbName, IdRevsList) ->
    DefaultOptions = [],
    get_missing_revs(DbName, IdRevsList, DefaultOptions).

%% For every {Id, Revs} pair replies with {Id, MissingRevs, PossibleAncestors}.
%% The whole result is wrapped as {ok, List}; if the database cannot be
%% opened the open error is replied instead.
get_missing_revs(DbName, IdRevsList, Options) ->
    % reimplemented here so the response contains [] for Ids that have no
    % missing revs
    set_io_priority(DbName, Options),
    Reply = case get_or_create_db(DbName, Options) of
        {ok, Db} ->
            DocIds = [DocId || {DocId, _} <- IdRevsList],
            Lookups = couch_btree:lookup(Db#db.id_tree, DocIds),
            {ok, lists:zipwith(fun missing_revs_entry/2, IdRevsList, Lookups)};
        OpenError ->
            OpenError
    end,
    rexi:reply(Reply).

%% Builds the response entry for one document from its id-tree lookup result.
missing_revs_entry({Id, Revs}, LookupResult) ->
    case LookupResult of
        {ok, #full_doc_info{rev_tree=Tree} = FullInfo} ->
            Missing = couch_key_tree:find_missing(Tree, Revs),
            {Id, Missing, possible_ancestors(FullInfo, Missing)};
        not_found ->
            % unknown document: every requested revision is missing
            {Id, Revs, []}
    end.

%% Writes Docs0 to the named shard via couch_db:update_docs/4 and replies
%% with the result. The update type is replicated_changes when Options
%% carries replicated_changes = true, interactive_edit otherwise.
update_docs(DbName, Docs0, Options) ->
    UpdateType = case proplists:get_value(replicated_changes, Options) of
        true -> replicated_changes;
        _ -> interactive_edit
    end,
    Docs = make_att_readers(Docs0),
    with_db(DbName, Options,
        {couch_db, update_docs, [Docs, Options, UpdateType]}).

%% Looks up the view group server for Group0 and replies with the result of
%% couch_view_group:request_group_info/1.
group_info(DbName, Group0) ->
    Request = {get_group_server, DbName, Group0},
    {ok, GroupPid} = gen_server:call(couch_view, Request),
    Info = couch_view_group:request_group_info(GroupPid),
    rexi:reply(Info).

%% Casts {load_validation_funs, undefined} to the database's main process.
%% Returns ok without doing anything if the database cannot be opened.
reset_validation_funs(DbName) ->
    case get_or_create_db(DbName, []) of
        {ok, #db{main_pid = MainPid}} ->
            gen_server:cast(MainPid, {load_validation_funs, undefined});
        _NotOpened ->
            ok
    end.

%%
%% internal
%%

%% Opens (or creates) DbName, applies M:F with the database handle prepended
%% to A, and sends the outcome through rexi:reply/1.
%% A thrown term is replied as-is; a runtime error is logged and replied as
%% {error, Reason}. If the database cannot be opened the open result is
%% replied unchanged.
with_db(DbName, Options, {M,F,A}) ->
    set_io_priority(DbName, Options),
    case get_or_create_db(DbName, Options) of
        {ok, Db} ->
            Reply =
                try
                    apply(M, F, [Db | A])
                catch
                    throw:Thrown ->
                        Thrown;
                    error:Reason ->
                        % arity is length(A)+1 because Db is prepended
                        couch_log:error("rpc ~p:~p/~p ~p ~p",
                            [M, F, length(A)+1, Reason, clean_stack()]),
                        {error, Reason}
                end,
            rexi:reply(Reply);
        OpenError ->
            rexi:reply(OpenError)
    end.

%% Opens DbName with couch_db:open_int/2. When the database file does not
%% exist a warning is logged and the database is created instead; any other
%% open result is returned unchanged.
get_or_create_db(DbName, Options) ->
    Opened = couch_db:open_int(DbName, Options),
    case Opened of
        {not_found, no_db_file} ->
            couch_log:warning("~p creating ~s", [?MODULE, DbName]),
            couch_server:create(DbName, Options);
        _ ->
            Opened
    end.

%% Fold callback shared by all_docs/2 and map_view/4.
%% Clause order matters: #full_doc_info{} entries are first translated into
%% key/value rows, then the offset is reported once (while offset is nil),
%% then the limit is checked, and only then is a row streamed.
%% Returns {ok, Acc} to continue or {stop, Acc} to end the fold; exits the
%% process when the coordinator answers stop or the reply times out.
view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
    % matches for _all_docs and translates #full_doc_info{} -> KV pair
    case couch_doc:to_doc_info(FullDocInfo) of
    #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI ->
        % row value is {"rev": RevString}; the doc id doubles as the key
        Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
        view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI});
    #doc_info{revs=[#rev_info{deleted=true}|_]} ->
        % documents whose first listed revision is deleted are skipped
        {ok, Acc}
    end;
view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
    % calculates the offset for this shard
    #view_acc{reduce_fun=Reduce} = Acc,
    Offset = Reduce(OffsetReds),
    case rexi:sync_reply({total_and_offset, Total, Offset}) of
    ok ->
        % offset is now set, so the re-entrant call falls through to the
        % clauses below
        view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
    stop ->
        exit(normal);
    timeout ->
        exit(timeout)
    end;
view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
    % we scanned through limit+skip local rows
    {stop, Acc};
view_fold({{Key,Id}, Value}, _Offset, Acc) ->
    % the normal case
    #view_acc{
        db = Db,
        doc_info = DocInfo,
        limit = Limit,
        conflicts = Conflicts,
        include_docs = IncludeDocs
    } = Acc,
    % a value object carrying an "_id" member refers to a linked document
    case Value of {Props} ->
        LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined);
    _ ->
        LinkedDocs = false
    end,
    if LinkedDocs ->
        % we'll embed this at a higher level b/c the doc may be non-local
        Doc = undefined;
    IncludeDocs ->
        % prefer the #doc_info{} captured for _all_docs rows over the bare id
        IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end,
        Options = if Conflicts -> [conflicts]; true -> [] end,
        case couch_db:open_doc(Db, IdOrInfo, Options) of
        {not_found, deleted} ->
            Doc = null;
        {not_found, missing} ->
            Doc = undefined;
        {ok, Doc0} ->
            Doc = couch_doc:to_json_obj(Doc0, [])
        end;
    true ->
        Doc = undefined
    end,
    case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
        ok ->
            % one row emitted, one fewer allowed
            {ok, Acc#view_acc{limit=Limit-1}};
        timeout ->
            exit(timeout)
    end.

%% Finishes a view response. When no offset was ever computed (nil), a
%% {total_and_offset, Total, Total} message is sent first; `complete` is
%% then replied unless the peer answered stop.
final_response(Total, nil) ->
    Ack = rexi:sync_reply({total_and_offset, Total, Total}),
    case Ack of
        ok ->
            rexi:reply(complete);
        stop ->
            ok;
        timeout ->
            exit(timeout)
    end;
final_response(_Total, _Offset) ->
    rexi:reply(complete).

%% Returns a two-argument predicate that decides whether two {Key, _} rows
%% belong to the same reduce group for the given group level.
%% TODO: handle case of bogus group level
group_rows_fun(exact) ->
    fun({KeyA, _}, {KeyB, _}) -> KeyA == KeyB end;
group_rows_fun(0) ->
    % group level 0: every row falls into one single group
    fun(_, _) -> true end;
group_rows_fun(Level) when is_integer(Level) ->
    fun
        ({KeyA, _}, {KeyB, _})
                when is_list(KeyA), KeyA =/= [], is_list(KeyB), KeyB =/= [] ->
            % two non-empty list keys: compare only their first Level elements
            lists:sublist(KeyA, Level) == lists:sublist(KeyB, Level);
        ({KeyA, _}, {KeyB, _}) ->
            KeyA == KeyB
    end.

%% Fold callback for reduce views. Stops once the limit is exhausted;
%% otherwise emits the reduction under a key derived from the group level:
%% null for level 0, the full key for exact, and for a positive level the
%% first Level elements of a list key (non-list keys are sent unchanged).
reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
    {stop, Acc};
reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
    send(null, Red, Acc);
reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
    send(Key, Red, Acc);
reduce_fold(Key, Red, #view_acc{group_level=Level} = Acc) when Level > 0 ->
    GroupKey = case is_list(Key) of
        true -> lists:sublist(Key, Level);
        false -> Key
    end,
    send(GroupKey, Red, Acc).


%% Emits one reduce row and returns {ok, Acc} with the limit decremented.
%% The first row sent by this process goes through rexi:sync_reply/1; later
%% rows use rexi:stream/1. The process dictionary key fabric_sent_first_row
%% records whether the first row has already been sent.
send(Key, Value, #view_acc{} = Acc) ->
    Row = #view_row{key=Key, value=Value},
    case put(fabric_sent_first_row, true) of
        undefined ->
            send_first_row(Row, Acc);
        true ->
            stream_row(Row, Acc)
    end.

%% Sends the first row synchronously; exits if the peer answers stop or the
%% reply times out.
send_first_row(Row, #view_acc{limit=Limit} = Acc) ->
    case rexi:sync_reply(Row) of
        ok ->
            {ok, Acc#view_acc{limit=Limit-1}};
        stop ->
            exit(normal);
        timeout ->
            exit(timeout)
    end.

%% Streams a subsequent row; exits on timeout.
stream_row(Row, #view_acc{limit=Limit} = Acc) ->
    case rexi:stream(Row) of
        ok ->
            {ok, Acc#view_acc{limit=Limit-1}};
        timeout ->
            exit(timeout)
    end.

%% Fold callback for changes/3. Runs the configured filter over DocInfo,
%% drops null results, and sends a change row if anything remains. The
%% accumulator always advances to the document's high_seq.
changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
    #changes_args{include_docs = IncludeDocs, filter = Filter} = Args,
    Conflicts = proplists:get_value(conflicts, Options, false),
    #doc_info{high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
    NextAcc = {Db, Seq, Args, Options},
    Kept = [R || R <- couch_changes:filter(Db, DocInfo, Filter), R /= null],
    case Kept of
        [] ->
            % filtered out entirely: nothing to send for this document
            {ok, NextAcc};
        _ ->
            OpenOpts = case Conflicts of
                true -> [conflicts];
                _ -> []
            end,
            Row = changes_row(Db, DocInfo, Kept, Del, IncludeDocs, OpenOpts),
            % the reply from the peer decides whether the fold continues
            {rexi:sync_reply(Row), NextAcc}
    end.

%% Builds the #change{} row for one document. With include_docs = true the
%% document body is attached and the deleted flag copied from Del; otherwise
%% deleted is only set (to true) for deleted documents.
changes_row(Db, #doc_info{id=Id, high_seq=Seq} = DocInfo, Results, Del,
        IncludeDocs, Opts) ->
    Row = #change{key=Seq, id=Id, value=Results},
    case {IncludeDocs, Del} of
        {true, _} ->
            Row#change{doc=doc_member(Db, DocInfo, Opts), deleted=Del};
        {_, true} ->
            Row#change{deleted=true};
        _ ->
            Row
    end.

%% Opens the document described by DocInfo (adding the `deleted` option to
%% Opts) and returns it as a JSON object; any non-ok result of
%% couch_db:open_doc/3 is returned unchanged.
doc_member(Shard, DocInfo, Opts) ->
    OpenOpts = [deleted | Opts],
    case couch_db:open_doc(Shard, DocInfo, OpenOpts) of
        {ok, Doc} ->
            couch_doc:to_json_obj(Doc, []);
        NotOpened ->
            NotOpened
    end.

%% Returns the leaf revisions of FullInfo that could be ancestors of any of
%% MissingRevs. With no missing revisions the result is [].
possible_ancestors(_FullInfo, []) ->
    [];
possible_ancestors(FullInfo, MissingRevs) ->
    #doc_info{revs=RevInfos} = couch_doc:to_doc_info(FullInfo),
    Leaves = [Rev || #rev_info{rev=Rev} <- RevInfos],
    % a leaf is a "possible ancestor" of the missing revs if its position
    % is less than the position of at least one missing rev
    IsCandidate = fun({LeafPos, _LeafRevId}) ->
        lists:any(fun({MissingPos, _}) -> LeafPos < MissingPos end,
            MissingRevs)
    end,
    % reversed so that candidates come out in reverse leaf order
    lists:reverse(lists:filter(IsCandidate, Leaves)).

%% Rewrites the attachments of every document so that data given as
%% {follows, Parser, Ref} becomes a fun that reads the bytes from the MIME
%% parser (see make_att_reader/1); other attachment data is left unchanged.
make_att_readers(Docs) ->
    lists:map(fun(#doc{atts=Atts0} = Doc) ->
        Atts = [Att#att{data=make_att_reader(Data)}
            || #att{data=Data} = Att <- Atts0],
        Doc#doc{atts = Atts}
    end, Docs).

%% For {follows, Parser, Ref} returns a zero-arity fun that asks Parser for
%% the next bytes and returns them, throwing {mp_parser_died, Reason} if
%% the parser process goes down. Any other value is returned unchanged.
make_att_reader({follows, Parser, Ref}) ->
    fun() ->
        MonitorRef = parser_monitor(Parser),
        Parser ! {get_bytes, Ref, self()},
        receive
            {bytes, Ref, Data} ->
                Data;
            {'DOWN', MonitorRef, _, _, Why} ->
                throw({mp_parser_died, Why})
        end
    end;
make_att_reader(Other) ->
    Other.

%% Returns the monitor reference stored under mp_parser_ref in the process
%% dictionary, creating and storing a monitor on Parser if none is stored.
parser_monitor(Parser) ->
    case get(mp_parser_ref) of
        undefined ->
            NewRef = erlang:monitor(process, Parser),
            put(mp_parser_ref, NewRef),
            NewRef;
        Existing ->
            Existing
    end.

%% Returns the current stacktrace with every argument list replaced by its
%% arity, so that potentially large or sensitive call arguments are not
%% written to the log.
clean_stack() ->
    clean_stack(erlang:get_stacktrace()).

%% Collapses argument lists to arities in the given stack frames.
%% Frames are {M, F, ArityOrArgs, Location} 4-tuples on current runtimes;
%% the legacy {M, F, ArityOrArgs} 3-tuple shape is still accepted. Frames
%% that already carry an arity, and any other term, pass through unchanged.
clean_stack(Stack) ->
    lists:map(fun
        ({M, F, A, Location}) when is_list(A) ->
            {M, F, length(A), Location};
        ({M, F, A}) when is_list(A) ->
            {M, F, length(A)};
        (Frame) ->
            Frame
    end, Stack).

%% Stores the IO priority for this process in the process dictionary: the
%% value of the io_priority option if present, {interactive, DbName}
%% otherwise. Returns the previously stored value, as erlang:put/2 does.
set_io_priority(DbName, Options) ->
    Priority = case lists:keyfind(io_priority, 1, Options) of
        {io_priority, Requested} -> Requested;
        false -> {interactive, DbName}
    end,
    erlang:put(io_priority, Priority).
