% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(mango_cursor_view).

-export([
    create/4,
    explain/1,
    execute/3
]).

-export([
    view_cb/2,
    handle_message/2,
    handle_all_docs_message/2,
    composite_indexes/2,
    choose_best_index/2
]).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("fabric/include/fabric.hrl").

-include("mango_cursor.hrl").
-include("mango_idx_view.hrl").

-define(HEARTBEAT_INTERVAL_IN_USEC, 4000000).

create(Db, Indexes, Selector, Opts) ->
    FieldRanges = mango_idx_view:field_ranges(Selector),
    Composited = composite_indexes(Indexes, FieldRanges),
    {Index, IndexRanges} = choose_best_index(Db, Composited),

    MRLimit = mango_opts:mr_limit(couch_db:is_partitioned(Db)),
    Limit = erlang:min(MRLimit, couch_util:get_value(limit, Opts, mango_opts:default_limit())),
    Skip = couch_util:get_value(skip, Opts, 0),
    Fields = couch_util:get_value(fields, Opts, all_fields),
    Bookmark = couch_util:get_value(bookmark, Opts),

    IndexRanges1 = mango_cursor:maybe_noop_range(Selector, IndexRanges),

    {ok, #cursor{
        db = Db,
        index = Index,
        ranges = IndexRanges1,
        selector = Selector,
        opts = Opts,
        limit = Limit,
        skip = Skip,
        fields = Fields,
        bookmark = Bookmark
    }}.

explain(Cursor) ->
    #cursor{
        opts = Opts
    } = Cursor,

    BaseArgs = base_args(Cursor),
    Args = apply_opts(Opts, BaseArgs),

    [
        {mrargs,
            {[
                {include_docs, Args#mrargs.include_docs},
                {view_type, Args#mrargs.view_type},
                {reduce, Args#mrargs.reduce},
                {partition, couch_mrview_util:get_extra(Args, partition, null)},
                {start_key, maybe_replace_max_json(Args#mrargs.start_key)},
                {end_key, maybe_replace_max_json(Args#mrargs.end_key)},
                {direction, Args#mrargs.direction},
                {stable, Args#mrargs.stable},
                {update, Args#mrargs.update},
                {conflicts, Args#mrargs.conflicts}
            ]}}
    ].

% replace internal values that cannot
% be represented as a valid UTF-8 string
% with a token for JSON serialization
maybe_replace_max_json([]) ->
    [];
maybe_replace_max_json(?MAX_STR) ->
    <<"<MAX>">>;
maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) ->
    H1 =
        if
            H == ?MAX_JSON_OBJ -> <<"<MAX>">>;
            true -> H
        end,
    [H1 | maybe_replace_max_json(T)];
maybe_replace_max_json(EndKey) ->
    EndKey.

base_args(#cursor{index = Idx, selector = Selector} = Cursor) ->
    {StartKey, EndKey} =
        case Cursor#cursor.ranges of
            [empty] ->
                {null, null};
            _ ->
                {
                    mango_idx:start_key(Idx, Cursor#cursor.ranges),
                    mango_idx:end_key(Idx, Cursor#cursor.ranges)
                }
        end,
    #mrargs{
        view_type = map,
        reduce = false,
        start_key = StartKey,
        end_key = EndKey,
        include_docs = true,
        extra = [
            {callback, {?MODULE, view_cb}},
            {selector, Selector},
            {is_mango_query, true}
        ]
    }.

execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFun, UserAcc) ->
    Cursor = Cursor0#cursor{
        user_fun = UserFun,
        user_acc = UserAcc,
        execution_stats = mango_execution_stats:log_start(Stats)
    },
    case Cursor#cursor.ranges of
        [empty] ->
            % empty indicates unsatisfiable ranges, so don't perform search
            {ok, UserAcc};
        _ ->
            BaseArgs = base_args(Cursor),
            #cursor{opts = Opts, bookmark = Bookmark} = Cursor,
            Args0 = apply_opts(Opts, BaseArgs),
            Args = mango_json_bookmark:update_args(Bookmark, Args0),
            UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}),
            DbOpts = [{user_ctx, UserCtx}],
            Result =
                case mango_idx:def(Idx) of
                    all_docs ->
                        CB = fun ?MODULE:handle_all_docs_message/2,
                        fabric:all_docs(Db, DbOpts, CB, Cursor, Args);
                    _ ->
                        CB = fun ?MODULE:handle_message/2,
                        % Normal view
                        DDoc = ddocid(Idx),
                        Name = mango_idx:name(Idx),
                        fabric:query_view(Db, DbOpts, DDoc, Name, CB, Cursor, Args)
                end,
            case Result of
                {ok, LastCursor} ->
                    NewBookmark = mango_json_bookmark:create(LastCursor),
                    Arg = {add_key, bookmark, NewBookmark},
                    {_Go, FinalUserAcc} = UserFun(Arg, LastCursor#cursor.user_acc),
                    Stats0 = LastCursor#cursor.execution_stats,
                    FinalUserAcc0 = mango_execution_stats:maybe_add_stats(
                        Opts, UserFun, Stats0, FinalUserAcc
                    ),
                    FinalUserAcc1 = mango_cursor:maybe_add_warning(
                        UserFun, Cursor, Stats0, FinalUserAcc0
                    ),
                    {ok, FinalUserAcc1};
                {error, Reason} ->
                    {error, Reason}
            end
    end.

% Any of these indexes may be a composite index. For each
% index find the most specific set of fields for each
% index. Ie, if an index has columns a, b, c, d, then
% check FieldRanges for a, b, c, and d and return
% the longest prefix of columns found.
composite_indexes(Indexes, FieldRanges) ->
    lists:foldl(
        fun(Idx, Acc) ->
            Cols = mango_idx:columns(Idx),
            Prefix = composite_prefix(Cols, FieldRanges),
            % Calcuate the difference between the FieldRanges/Selector
            % and the Prefix. We want to select the index with a prefix
            % that is as close to the FieldRanges as possible
            PrefixDifference = length(FieldRanges) - length(Prefix),
            [{Idx, Prefix, PrefixDifference} | Acc]
        end,
        [],
        Indexes
    ).

composite_prefix([], _) ->
    [];
composite_prefix([Col | Rest], Ranges) ->
    case lists:keyfind(Col, 1, Ranges) of
        {Col, Range} ->
            [Range | composite_prefix(Rest, Ranges)];
        false ->
            []
    end.

% The query planner
% First choose the index with the lowest difference between its
% Prefix and the FieldRanges. If that is equal, then
% choose the index with the least number of
% fields in the index. If we still cannot break the tie,
% then choose alphabetically based on ddocId.
% Return the first element's Index and IndexRanges.
%
% In the future we can look into doing a cached parallel
% reduce view read on each index with the ranges to find
% the one that has the fewest number of rows or something.
choose_best_index(_DbName, IndexRanges) ->
    Cmp = fun({IdxA, _PrefixA, PrefixDifferenceA}, {IdxB, _PrefixB, PrefixDifferenceB}) ->
        case PrefixDifferenceA - PrefixDifferenceB of
            N when N < 0 -> true;
            N when N == 0 ->
                ColsLenA = length(mango_idx:columns(IdxA)),
                ColsLenB = length(mango_idx:columns(IdxB)),
                case ColsLenA - ColsLenB of
                    M when M < 0 ->
                        true;
                    M when M == 0 ->
                        % We have no other way to choose, so at this point
                        % select the index based on (dbname, ddocid, view_name) triple
                        IdxA =< IdxB;
                    _ ->
                        false
                end;
            _ ->
                false
        end
    end,
    {SelectedIndex, SelectedIndexRanges, _} = hd(lists:sort(Cmp, IndexRanges)),
    {SelectedIndex, SelectedIndexRanges}.

view_cb({meta, Meta}, Acc) ->
    % Map function starting
    put(mango_docs_examined, 0),
    set_mango_msg_timestamp(),
    ok = rexi:stream2({meta, Meta}),
    {ok, Acc};
view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
    ViewRow = #view_row{
        id = couch_util:get_value(id, Row),
        key = couch_util:get_value(key, Row),
        doc = couch_util:get_value(doc, Row)
    },
    case ViewRow#view_row.doc of
        null ->
            maybe_send_mango_ping();
        undefined ->
            % include_docs=false. Use quorum fetch at coordinator
            ok = rexi:stream2(ViewRow),
            set_mango_msg_timestamp();
        Doc ->
            put(mango_docs_examined, get(mango_docs_examined) + 1),
            Selector = couch_util:get_value(selector, Options),
            couch_stats:increment_counter([mango, docs_examined]),
            case mango_selector:match(Selector, Doc) of
                true ->
                    ok = rexi:stream2(ViewRow),
                    set_mango_msg_timestamp();
                false ->
                    maybe_send_mango_ping()
            end
    end,
    {ok, Acc};
view_cb(complete, Acc) ->
    % Send shard-level execution stats
    ok = rexi:stream2({execution_stats, {docs_examined, get(mango_docs_examined)}}),
    % Finish view output
    ok = rexi:stream_last(complete),
    {ok, Acc};
view_cb(ok, ddoc_updated) ->
    rexi:reply({ok, ddoc_updated}).

maybe_send_mango_ping() ->
    Current = os:timestamp(),
    LastPing = get(mango_last_msg_timestamp),
    % Fabric will timeout if it has not heard a response from a worker node
    % after 5 seconds. Send a ping every 4 seconds so the timeout doesn't happen.
    case timer:now_diff(Current, LastPing) > ?HEARTBEAT_INTERVAL_IN_USEC of
        false ->
            ok;
        true ->
            rexi:ping(),
            set_mango_msg_timestamp()
    end.

set_mango_msg_timestamp() ->
    put(mango_last_msg_timestamp, os:timestamp()).

handle_message({meta, _}, Cursor) ->
    {ok, Cursor};
handle_message({row, Props}, Cursor) ->
    case doc_member(Cursor, Props) of
        {ok, Doc, {execution_stats, Stats}} ->
            Cursor1 = Cursor#cursor{
                execution_stats = Stats
            },
            Cursor2 = update_bookmark_keys(Cursor1, Props),
            FinalDoc = mango_fields:extract(Doc, Cursor2#cursor.fields),
            handle_doc(Cursor2, FinalDoc);
        {no_match, _, {execution_stats, Stats}} ->
            Cursor1 = Cursor#cursor{
                execution_stats = Stats
            },
            {ok, Cursor1};
        Error ->
            couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
            {ok, Cursor}
    end;
handle_message({execution_stats, ShardStats}, #cursor{execution_stats = Stats} = Cursor) ->
    {docs_examined, DocsExamined} = ShardStats,
    Cursor1 = Cursor#cursor{
        execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
    },
    {ok, Cursor1};
handle_message(complete, Cursor) ->
    {ok, Cursor};
handle_message({error, Reason}, _Cursor) ->
    {error, Reason}.

handle_all_docs_message({row, Props}, Cursor) ->
    case is_design_doc(Props) of
        true -> {ok, Cursor};
        false -> handle_message({row, Props}, Cursor)
    end;
handle_all_docs_message(Message, Cursor) ->
    handle_message(Message, Cursor).

handle_doc(#cursor{skip = S} = C, _) when S > 0 ->
    {ok, C#cursor{skip = S - 1}};
handle_doc(#cursor{limit = L, execution_stats = Stats} = C, Doc) when L > 0 ->
    UserFun = C#cursor.user_fun,
    UserAcc = C#cursor.user_acc,
    {Go, NewAcc} = UserFun({row, Doc}, UserAcc),
    {Go, C#cursor{
        user_acc = NewAcc,
        limit = L - 1,
        execution_stats = mango_execution_stats:incr_results_returned(Stats)
    }};
handle_doc(C, _Doc) ->
    {stop, C}.

ddocid(Idx) ->
    case mango_idx:ddoc(Idx) of
        <<"_design/", Rest/binary>> ->
            Rest;
        Else ->
            Else
    end.

apply_opts([], Args) ->
    Args;
apply_opts([{r, RStr} | Rest], Args) ->
    IncludeDocs =
        case list_to_integer(RStr) of
            1 ->
                true;
            R when R > 1 ->
                % We don't load the doc in the view query because
                % we have to do a quorum read in the coordinator
                % so there's no point.
                false
        end,
    NewArgs = Args#mrargs{include_docs = IncludeDocs},
    apply_opts(Rest, NewArgs);
apply_opts([{conflicts, true} | Rest], Args) ->
    NewArgs = Args#mrargs{conflicts = true},
    apply_opts(Rest, NewArgs);
apply_opts([{conflicts, false} | Rest], Args) ->
    % Ignored cause default
    apply_opts(Rest, Args);
apply_opts([{sort, Sort} | Rest], Args) ->
    % We only support single direction sorts
    % so nothing fancy here.
    case mango_sort:directions(Sort) of
        [] ->
            apply_opts(Rest, Args);
        [<<"asc">> | _] ->
            apply_opts(Rest, Args);
        [<<"desc">> | _] ->
            SK = Args#mrargs.start_key,
            SKDI = Args#mrargs.start_key_docid,
            EK = Args#mrargs.end_key,
            EKDI = Args#mrargs.end_key_docid,
            NewArgs = Args#mrargs{
                direction = rev,
                start_key = EK,
                start_key_docid = EKDI,
                end_key = SK,
                end_key_docid = SKDI
            },
            apply_opts(Rest, NewArgs)
    end;
apply_opts([{stale, ok} | Rest], Args) ->
    NewArgs = Args#mrargs{
        stable = true,
        update = false
    },
    apply_opts(Rest, NewArgs);
apply_opts([{stable, true} | Rest], Args) ->
    NewArgs = Args#mrargs{
        stable = true
    },
    apply_opts(Rest, NewArgs);
apply_opts([{update, false} | Rest], Args) ->
    NewArgs = Args#mrargs{
        update = false
    },
    apply_opts(Rest, NewArgs);
apply_opts([{partition, <<>>} | Rest], Args) ->
    apply_opts(Rest, Args);
apply_opts([{partition, Partition} | Rest], Args) when is_binary(Partition) ->
    NewArgs = couch_mrview_util:set_extra(Args, partition, Partition),
    apply_opts(Rest, NewArgs);
apply_opts([{_, _} | Rest], Args) ->
    % Ignore unknown options
    apply_opts(Rest, Args).

doc_member(Cursor, RowProps) ->
    Db = Cursor#cursor.db,
    Opts = Cursor#cursor.opts,
    ExecutionStats = Cursor#cursor.execution_stats,
    Selector = Cursor#cursor.selector,
    case couch_util:get_value(doc, RowProps) of
        {DocProps} ->
            % only matching documents are returned; the selector
            % is evaluated at the shard level in view_cb({row, Row},
            {ok, {DocProps}, {execution_stats, ExecutionStats}};
        undefined ->
            % an undefined doc was returned, indicating we should
            % perform a quorum fetch
            ExecutionStats1 = mango_execution_stats:incr_quorum_docs_examined(ExecutionStats),
            couch_stats:increment_counter([mango, quorum_docs_examined]),
            Id = couch_util:get_value(id, RowProps),
            case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of
                {ok, #doc{} = DocProps} ->
                    Doc = couch_doc:to_json_obj(DocProps, []),
                    match_doc(Selector, Doc, ExecutionStats1);
                Else ->
                    Else
            end;
        _ ->
            % no doc, no match
            {no_match, null, {execution_stats, ExecutionStats}}
    end.

match_doc(Selector, Doc, ExecutionStats) ->
    case mango_selector:match(Selector, Doc) of
        true ->
            {ok, Doc, {execution_stats, ExecutionStats}};
        false ->
            {no_match, Doc, {execution_stats, ExecutionStats}}
    end.

is_design_doc(RowProps) ->
    case couch_util:get_value(id, RowProps) of
        <<"_design/", _/binary>> -> true;
        _ -> false
    end.

update_bookmark_keys(#cursor{limit = Limit} = Cursor, Props) when Limit > 0 ->
    Id = couch_util:get_value(id, Props),
    Key = couch_util:get_value(key, Props),
    Cursor#cursor{
        bookmark_docid = Id,
        bookmark_key = Key
    };
update_bookmark_keys(Cursor, _Props) ->
    Cursor.

%%%%%%%% module tests below %%%%%%%%

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

does_not_refetch_doc_with_value_test() ->
    Cursor = #cursor{
        db = <<"db">>,
        opts = [],
        execution_stats = #execution_stats{},
        selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]})
    },
    RowProps = [
        {id, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
        {key, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
        {doc,
            {
                [
                    {<<"_id">>, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
                    {<<"_rev">>, <<"1-a954fe2308f14307756067b0e18c2968">>},
                    {<<"user_id">>, 11}
                ]
            }}
    ],
    {Match, _, _} = doc_member(Cursor, RowProps),
    ?assertEqual(Match, ok).

-endif.
