% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(mango_cursor_view).

-export([
    create/4,
    explain/1,
    execute/3
]).

-export([
    view_cb/2,
    handle_message/2,
    handle_all_docs_message/2,
    composite_indexes/2,
    choose_best_index/1
]).

-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-include_lib("fabric/include/fabric.hrl").

-include("mango.hrl").
-include("mango_cursor.hrl").
-include("mango_idx.hrl").
-include("mango_idx_view.hrl").

-define(HEARTBEAT_INTERVAL_IN_USEC, 4000000).

-type cursor_options() :: [{term(), term()}].
-type message() ::
    {meta, _}
    | {row, row_properties()}
    | {execution_stats, shard_stats()}
    | {stop, #cursor{}}
    | {complete, #cursor{}}
    | {error, any()}.

% viewcbargs wraps up the arguments that view_cb uses into a single
% entry in the mrargs.extra list. We use a Map to allow us to later
% add fields without having old messages causing errors/crashes.

-type viewcbargs() ::
    #{
        selector => selector(),
        fields => fields(),
        covering_index => maybe(#idx{})
    }.

-spec viewcbargs_new(Selector, Fields, CoveringIndex) -> ViewCBArgs when
    Selector :: selector(),
    Fields :: fields(),
    CoveringIndex :: maybe(#idx{}),
    ViewCBArgs :: viewcbargs().
viewcbargs_new(Selector, Fields, CoveringIndex) ->
    #{
        selector => Selector,
        fields => Fields,
        covering_index => CoveringIndex
    }.

-spec viewcbargs_get(Key, Args) -> maybe(term()) when
    Key :: selector | fields | covering_index,
    Args :: viewcbargs().
viewcbargs_get(selector, Args) when is_map(Args) ->
    maps:get(selector, Args, undefined);
viewcbargs_get(fields, Args) when is_map(Args) ->
    maps:get(fields, Args, undefined);
viewcbargs_get(covering_index, Args) when is_map(Args) ->
    maps:get(covering_index, Args, undefined).

-spec create(Db, Indexes, Selector, Options) -> {ok, #cursor{}} when
    Db :: database(),
    Indexes :: [#idx{}],
    Selector :: selector(),
    Options :: cursor_options().
create(Db, Indexes, Selector, Opts) ->
    FieldRanges = mango_idx_view:field_ranges(Selector),
    Composited = composite_indexes(Indexes, FieldRanges),
    {Index, IndexRanges} = choose_best_index(Composited),

    Limit = couch_util:get_value(limit, Opts, mango_opts:default_limit()),
    Skip = couch_util:get_value(skip, Opts, 0),
    Fields = couch_util:get_value(fields, Opts, all_fields),
    Bookmark = couch_util:get_value(bookmark, Opts),

    IndexRanges1 = mango_cursor:maybe_noop_range(Selector, IndexRanges),

    {ok, #cursor{
        db = Db,
        index = Index,
        ranges = IndexRanges1,
        selector = Selector,
        opts = Opts,
        limit = Limit,
        skip = Skip,
        fields = Fields,
        bookmark = Bookmark
    }}.

-spec required_fields(#cursor{}) -> fields().
required_fields(#cursor{fields = all_fields}) ->
    all_fields;
required_fields(#cursor{fields = Fields, selector = Selector}) ->
    lists:usort(Fields ++ mango_selector:fields(Selector)).

-spec apply_cursor_opts(#cursor{}) -> {#mrargs{}, boolean()}.
apply_cursor_opts(#cursor{} = Cursor) ->
    #cursor{index = Index, opts = Opts} = Cursor,
    BaseArgs = base_args(Cursor),
    Args0 = apply_opts(Opts, BaseArgs),
    Fields = required_fields(Cursor),
    Args = consider_index_coverage(Index, Fields, Args0),
    Covered = mango_idx_view:covers(Index, Fields),
    {Args, Covered}.

-spec explain(#cursor{}) -> nonempty_list(term()).
explain(Cursor) ->
    {Args, Covered} = apply_cursor_opts(Cursor),
    [
        {mrargs,
            {[
                {include_docs, Args#mrargs.include_docs},
                {view_type, Args#mrargs.view_type},
                {reduce, Args#mrargs.reduce},
                {partition, couch_mrview_util:get_extra(Args, partition, null)},
                {start_key, maybe_replace_max_json(Args#mrargs.start_key)},
                {end_key, maybe_replace_max_json(Args#mrargs.end_key)},
                {direction, Args#mrargs.direction},
                {stable, Args#mrargs.stable},
                {update, Args#mrargs.update},
                {conflicts, Args#mrargs.conflicts}
            ]}},
        {covered, Covered}
    ].

% replace internal values that cannot
% be represented as a valid UTF-8 string
% with a token for JSON serialization
maybe_replace_max_json([]) ->
    [];
maybe_replace_max_json(?MAX_STR) ->
    <<"<MAX>">>;
maybe_replace_max_json([H | T] = EndKey) when is_list(EndKey) ->
    H1 =
        if
            H == ?MAX_JSON_OBJ -> <<"<MAX>">>;
            true -> H
        end,
    [H1 | maybe_replace_max_json(T)];
maybe_replace_max_json(EndKey) ->
    EndKey.

-spec base_args(#cursor{}) -> #mrargs{}.
base_args(#cursor{index = Idx, selector = Selector, fields = Fields} = Cursor) ->
    {StartKey, EndKey} =
        case Cursor#cursor.ranges of
            [empty] ->
                {null, null};
            _ ->
                {
                    mango_idx:start_key(Idx, Cursor#cursor.ranges),
                    mango_idx:end_key(Idx, Cursor#cursor.ranges)
                }
        end,
    #mrargs{
        view_type = map,
        reduce = false,
        start_key = StartKey,
        end_key = EndKey,
        include_docs = true,
        extra = [
            % view_cb pushes down post hoc matching and field extraction to
            % the shard.
            {callback, {?MODULE, view_cb}},
            % TODO remove selector. It supports older nodes during version upgrades.
            {selector, Selector},
            {callback_args, viewcbargs_new(Selector, Fields, undefined)},

            {ignore_partition_query_limit, true}
        ]
    }.

-spec execute(#cursor{}, UserFunction, UserAccumulator) -> Result when
    UserFunction :: fun(),
    UserAccumulator :: any(),
    Result :: {ok, UserAccumulator} | {error, any()}.
execute(#cursor{db = Db, index = Idx, execution_stats = Stats} = Cursor0, UserFun, UserAcc) ->
    Cursor = Cursor0#cursor{
        user_fun = UserFun,
        user_acc = UserAcc,
        execution_stats = mango_execution_stats:log_start(Stats)
    },
    case Cursor#cursor.ranges of
        [empty] ->
            % empty indicates unsatisfiable ranges, so don't perform search
            {ok, UserAcc};
        _ ->
            {Args0, _Covered} = apply_cursor_opts(Cursor),
            #cursor{opts = Opts, bookmark = Bookmark} = Cursor,
            Args = mango_json_bookmark:update_args(Bookmark, Args0),
            UserCtx = couch_util:get_value(user_ctx, Opts, #user_ctx{}),
            DbOpts = [{user_ctx, UserCtx}],
            Result =
                case mango_idx:def(Idx) of
                    all_docs ->
                        CB = fun ?MODULE:handle_all_docs_message/2,
                        fabric:all_docs(Db, DbOpts, CB, Cursor, Args);
                    _ ->
                        CB = fun ?MODULE:handle_message/2,
                        % Normal view
                        DDoc = ddocid(Idx),
                        Name = mango_idx:name(Idx),
                        fabric:query_view(Db, DbOpts, DDoc, Name, CB, Cursor, Args)
                end,
            case Result of
                {ok, LastCursor} ->
                    NewBookmark = mango_json_bookmark:create(LastCursor),
                    Arg = {add_key, bookmark, NewBookmark},
                    {_Go, FinalUserAcc} = UserFun(Arg, LastCursor#cursor.user_acc),
                    Stats0 = LastCursor#cursor.execution_stats,
                    {FinalUserAcc0, Stats1} = mango_execution_stats:maybe_add_stats(
                        Opts, UserFun, Stats0, FinalUserAcc
                    ),
                    %% This needs Stats1 as log_end is called in maybe_add_stats
                    mango_execution_stats:log_stats(Stats1),
                    FinalUserAcc1 = mango_cursor:maybe_add_warning(
                        UserFun, Cursor, Stats1, FinalUserAcc0
                    ),
                    {ok, FinalUserAcc1};
                {error, Reason} ->
                    {error, Reason}
            end
    end.

-type comparator() :: '$lt' | '$lte' | '$eq' | '$gte' | '$gt'.
-type range() :: {comparator(), any(), comparator(), any()} | empty.

% Any of these indexes may be a composite index. For each
% index find the most specific set of fields for each
% index. Ie, if an index has columns a, b, c, d, then
% check FieldRanges for a, b, c, and d and return
% the longest prefix of columns found.
-spec composite_indexes([#idx{}], [{field(), range()}]) -> [{#idx{}, [range()], integer()}].
composite_indexes(Indexes, FieldRanges) ->
    lists:foldl(
        fun(Idx, Acc) ->
            Cols = mango_idx:columns(Idx),
            Prefix = composite_prefix(Cols, FieldRanges),
            % Calculate the difference between the FieldRanges/Selector
            % and the Prefix. We want to select the index with a prefix
            % that is as close to the FieldRanges as possible
            PrefixDifference = length(FieldRanges) - length(Prefix),
            [{Idx, Prefix, PrefixDifference} | Acc]
        end,
        [],
        Indexes
    ).

-spec composite_prefix([field()], [{field(), range()}]) -> [range()].
composite_prefix([], _) ->
    [];
composite_prefix([Col | Rest], Ranges) ->
    case lists:keyfind(Col, 1, Ranges) of
        {Col, Range} ->
            [Range | composite_prefix(Rest, Ranges)];
        false ->
            []
    end.

% The query planner
% First choose the index with the lowest difference between its
% Prefix and the FieldRanges. If that is equal, then
% choose the index with the least number of
% fields in the index. If we still cannot break the tie,
% then choose alphabetically based on (dbname, ddocid, view_name).
% Return the first element's Index and IndexRanges.
%
% In the future we can look into doing a cached parallel
% reduce view read on each index with the ranges to find
% the one that has the fewest number of rows or something.
-spec choose_best_index(IndexRanges) -> Selection when
    IndexRanges :: nonempty_list({#idx{}, [range()], integer()}),
    Selection :: {#idx{}, [range()]}.
choose_best_index(IndexRanges) ->
    Cmp = fun({IdxA, _PrefixA, PrefixDifferenceA}, {IdxB, _PrefixB, PrefixDifferenceB}) ->
        case PrefixDifferenceA - PrefixDifferenceB of
            N when N < 0 -> true;
            N when N == 0 ->
                ColsLenA = length(mango_idx:columns(IdxA)),
                ColsLenB = length(mango_idx:columns(IdxB)),
                case ColsLenA - ColsLenB of
                    M when M < 0 ->
                        true;
                    M when M == 0 ->
                        % Restrict the comparison to the (dbname, ddocid, view_name)
                        % triple -- in case of their equivalence, the original order
                        % will be maintained.
                        #idx{dbname = DbNameA, ddoc = DDocA, name = NameA} = IdxA,
                        #idx{dbname = DbNameB, ddoc = DDocB, name = NameB} = IdxB,
                        {DbNameA, DDocA, NameA} =< {DbNameB, DDocB, NameB};
                    _ ->
                        false
                end;
            _ ->
                false
        end
    end,
    {SelectedIndex, SelectedIndexRanges, _} = hd(lists:sort(Cmp, IndexRanges)),
    {SelectedIndex, SelectedIndexRanges}.

-spec view_cb
    (Message, #mrargs{}) -> Response when
        Message :: {meta, any()} | {row, row_properties()} | complete,
        Response :: {ok, #mrargs{}};
    (ok, ddoc_updated) -> any().
view_cb({meta, Meta}, Acc) ->
    % Map function starting
    put(mango_docs_examined, 0),
    set_mango_msg_timestamp(),
    ok = rexi:stream2({meta, Meta}),
    {ok, Acc};
view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
    ViewRow = #view_row{
        id = couch_util:get_value(id, Row),
        key = couch_util:get_value(key, Row),
        doc = couch_util:get_value(doc, Row)
    },
    % This supports receiving our "arguments" either as just the `selector`
    % or in the new record in `callback_args`. This is to support mid-upgrade
    % clusters where the non-upgraded coordinator nodes will send the older style.
    % TODO remove this in a couple of couchdb versions.
    {Selector, Fields, CoveringIndex} =
        case couch_util:get_value(callback_args, Options) of
            % old style
            undefined ->
                {couch_util:get_value(selector, Options), undefined, undefined};
            % new style - assume a viewcbargs
            Args = #{} ->
                {
                    viewcbargs_get(selector, Args),
                    viewcbargs_get(fields, Args),
                    viewcbargs_get(covering_index, Args)
                }
        end,
    Process =
        fun(Doc) ->
            % slightly abuse the doc field in the view response here,
            % because we may return something other than the full document:
            % we may have projected the requested `fields` from the query.
            % However, this oddness is confined to being visible in this module.
            case match_and_extract_doc(Doc, Selector, Fields) of
                {match, FinalDoc} ->
                    FinalViewRow = ViewRow#view_row{doc = FinalDoc},
                    ok = rexi:stream2(FinalViewRow),
                    set_mango_msg_timestamp();
                {no_match, undefined} ->
                    maybe_send_mango_ping()
            end
        end,
    case {ViewRow#view_row.doc, CoveringIndex} of
        {null, _} ->
            maybe_send_mango_ping();
        {undefined, Index = #idx{}} ->
            Doc = derive_doc_from_index(Index, ViewRow),
            Process(Doc);
        {undefined, _} ->
            % include_docs=false. Use quorum fetch at coordinator
            ok = rexi:stream2(ViewRow),
            set_mango_msg_timestamp();
        {Doc, _} ->
            put(mango_docs_examined, get(mango_docs_examined) + 1),
            couch_stats:increment_counter([mango, docs_examined]),
            Process(Doc)
    end,
    {ok, Acc};
view_cb(complete, Acc) ->
    % Send shard-level execution stats
    ok = rexi:stream2({execution_stats, {docs_examined, get(mango_docs_examined)}}),
    % Finish view output
    ok = rexi:stream_last(complete),
    {ok, Acc};
view_cb(ok, ddoc_updated) ->
    rexi:reply({ok, ddoc_updated}).

%% match_and_extract_doc checks whether Doc matches Selector. If it does,
%% extract Fields and return {match, FinalDoc}; otherwise return {no_match, undefined}.
-spec match_and_extract_doc(Doc, Selector, Fields) -> Result when
    Doc :: ejson(),
    Selector :: selector(),
    Fields :: maybe(fields()),
    Result :: {match, term()} | {no_match, undefined}.
match_and_extract_doc(Doc, Selector, Fields) ->
    case mango_selector:match(Selector, Doc) of
        true ->
            FinalDoc = mango_fields:extract(Doc, Fields),
            {match, FinalDoc};
        false ->
            {no_match, undefined}
    end.

-spec derive_doc_from_index(#idx{}, #view_row{}) -> term().
derive_doc_from_index(Index, #view_row{id = DocId, key = Keys}) ->
    Columns = mango_idx:columns(Index),
    lists:foldr(
        fun({Column, Key}, Doc) -> mango_doc:set_field(Doc, Column, Key) end,
        mango_doc:set_field({[]}, <<"_id">>, DocId),
        lists:zip(Columns, Keys)
    ).

-spec maybe_send_mango_ping() -> ok | term().
maybe_send_mango_ping() ->
    Current = os:timestamp(),
    LastPing = get(mango_last_msg_timestamp),
    % Fabric will timeout if it has not heard a response from a worker node
    % after 5 seconds. Send a ping every 4 seconds so the timeout doesn't happen.
    case timer:now_diff(Current, LastPing) > ?HEARTBEAT_INTERVAL_IN_USEC of
        false ->
            ok;
        true ->
            rexi:ping(),
            set_mango_msg_timestamp()
    end.

-spec set_mango_msg_timestamp() -> term().
set_mango_msg_timestamp() ->
    put(mango_last_msg_timestamp, os:timestamp()).

-spec handle_message(message(), #cursor{}) -> Response when
    Response ::
        {ok, #cursor{}}
        | {error, any()}.
handle_message({meta, _}, Cursor) ->
    {ok, Cursor};
handle_message({row, Props}, Cursor) ->
    case doc_member_and_extract(Cursor, Props) of
        {ok, Doc, {execution_stats, Stats}} ->
            Cursor1 = Cursor#cursor{
                execution_stats = Stats
            },
            Cursor2 = update_bookmark_keys(Cursor1, Props),
            handle_doc(Cursor2, Doc);
        {no_match, _, {execution_stats, Stats}} ->
            Cursor1 = Cursor#cursor{
                execution_stats = Stats
            },
            {ok, Cursor1};
        Error ->
            couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
            {ok, Cursor}
    end;
handle_message({execution_stats, ShardStats}, #cursor{execution_stats = Stats} = Cursor) ->
    {docs_examined, DocsExamined} = ShardStats,
    Cursor1 = Cursor#cursor{
        execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
    },
    {ok, Cursor1};
handle_message(complete, Cursor) ->
    {ok, Cursor};
handle_message({error, Reason}, _Cursor) ->
    {error, Reason}.

-spec handle_all_docs_message(message(), #cursor{}) -> Response when
    Response ::
        {ok, #cursor{}}
        | {error, any()}.
handle_all_docs_message({row, Props}, Cursor) ->
    case is_design_doc(Props) of
        true -> {ok, Cursor};
        false -> handle_message({row, Props}, Cursor)
    end;
handle_all_docs_message(Message, Cursor) ->
    handle_message(Message, Cursor).

-spec handle_doc(#cursor{}, doc()) -> Response when
    Response :: {ok, #cursor{}} | {stop, #cursor{}}.
handle_doc(#cursor{skip = S} = C, _) when S > 0 ->
    {ok, C#cursor{skip = S - 1}};
handle_doc(#cursor{limit = L, execution_stats = Stats} = C, Doc) when L > 0 ->
    UserFun = C#cursor.user_fun,
    UserAcc = C#cursor.user_acc,
    {Go, NewAcc} = UserFun({row, Doc}, UserAcc),
    {Go, C#cursor{
        user_acc = NewAcc,
        limit = L - 1,
        execution_stats = mango_execution_stats:incr_results_returned(Stats)
    }};
handle_doc(C, _Doc) ->
    {stop, C}.

-spec ddocid(#idx{}) -> binary().
ddocid(Idx) ->
    case mango_idx:ddoc(Idx) of
        <<"_design/", Rest/binary>> ->
            Rest;
        Else ->
            Else
    end.

-spec apply_opts(cursor_options(), #mrargs{}) -> #mrargs{}.
apply_opts([], Args) ->
    Args;
apply_opts([{r, RStr} | Rest], Args) ->
    IncludeDocs =
        case list_to_integer(RStr) of
            1 ->
                true;
            R when R > 1 ->
                % We don't load the doc in the view query because
                % we have to do a quorum read in the coordinator
                % so there's no point.
                false
        end,
    NewArgs = Args#mrargs{include_docs = IncludeDocs},
    apply_opts(Rest, NewArgs);
apply_opts([{conflicts, true} | Rest], Args) ->
    NewArgs = Args#mrargs{conflicts = true},
    apply_opts(Rest, NewArgs);
apply_opts([{conflicts, false} | Rest], Args) ->
    % Ignored cause default
    apply_opts(Rest, Args);
apply_opts([{sort, Sort} | Rest], Args) ->
    % We only support single direction sorts
    % so nothing fancy here.
    case mango_sort:directions(Sort) of
        [] ->
            apply_opts(Rest, Args);
        [<<"asc">> | _] ->
            apply_opts(Rest, Args);
        [<<"desc">> | _] ->
            SK = Args#mrargs.start_key,
            SKDI = Args#mrargs.start_key_docid,
            EK = Args#mrargs.end_key,
            EKDI = Args#mrargs.end_key_docid,
            NewArgs = Args#mrargs{
                direction = rev,
                start_key = EK,
                start_key_docid = EKDI,
                end_key = SK,
                end_key_docid = SKDI
            },
            apply_opts(Rest, NewArgs)
    end;
apply_opts([{stale, ok} | Rest], Args) ->
    NewArgs = Args#mrargs{
        stable = true,
        update = false
    },
    apply_opts(Rest, NewArgs);
apply_opts([{stable, true} | Rest], Args) ->
    NewArgs = Args#mrargs{
        stable = true
    },
    apply_opts(Rest, NewArgs);
apply_opts([{update, false} | Rest], Args) ->
    NewArgs = Args#mrargs{
        update = false
    },
    apply_opts(Rest, NewArgs);
apply_opts([{partition, <<>>} | Rest], Args) ->
    apply_opts(Rest, Args);
apply_opts([{partition, Partition} | Rest], Args) when is_binary(Partition) ->
    NewArgs = couch_mrview_util:set_extra(Args, partition, Partition),
    apply_opts(Rest, NewArgs);
apply_opts([{_, _} | Rest], Args) ->
    % Ignore unknown options
    apply_opts(Rest, Args).

-spec consider_index_coverage(#idx{}, fields(), #mrargs{}) -> #mrargs{}.
consider_index_coverage(Index, Fields, #mrargs{include_docs = IncludeDocs0} = Args0) ->
    Covering = mango_idx_view:covers(Index, Fields),
    Args = Args0#mrargs{include_docs = IncludeDocs0 andalso (not Covering)},
    #mrargs{include_docs = IncludeDocs, extra = Extra0} = Args,
    case
        {
            IncludeDocs,
            Covering,
            couch_util:get_value(callback_args, Extra0)
        }
    of
        {false, true, ViewCBArgs0} when ViewCBArgs0 =/= undefined ->
            VCBSelector = viewcbargs_get(selector, ViewCBArgs0),
            VCBFields = viewcbargs_get(fields, ViewCBArgs0),
            ViewCBArgs = viewcbargs_new(VCBSelector, VCBFields, Index),
            Extra = couch_util:set_value(callback_args, Extra0, ViewCBArgs),
            Args#mrargs{extra = Extra};
        _ ->
            Args
    end.

-spec doc_member_and_extract(#cursor{}, row_properties()) -> Result when
    Result ::
        {ok | no_match, term(), {execution_stats, shard_stats()}}
        | {no_match, null, {execution_stats, shard_stats()}}
        | any().
doc_member_and_extract(Cursor, RowProps) ->
    Db = Cursor#cursor.db,
    Opts = Cursor#cursor.opts,
    ExecutionStats = Cursor#cursor.execution_stats,
    Selector = Cursor#cursor.selector,
    case couch_util:get_value(doc, RowProps) of
        {DocProps} ->
            % If the query doesn't request quorum doc read via r>1,
            % match_and_extract_doc/3 is executed in view_cb, ie, locally
            % on the shard. We only receive back the final result for the query.
            % TODO during upgrade, some nodes will not be processing `fields`
            % on the shard because they're old, so re-execute here just in case.
            % Remove this later, same time as the duplicate extract at the coordinator.
            DocProps2 = mango_fields:extract({DocProps}, Cursor#cursor.fields),
            {ok, DocProps2, {execution_stats, ExecutionStats}};
        undefined ->
            % an undefined doc was returned, indicating we should
            % perform a quorum fetch
            ExecutionStats1 = mango_execution_stats:incr_quorum_docs_examined(ExecutionStats),
            couch_stats:increment_counter([mango, quorum_docs_examined]),
            Id = couch_util:get_value(id, RowProps),
            case mango_util:defer(fabric, open_doc, [Db, Id, Opts]) of
                {ok, #doc{} = DocProps} ->
                    Doc = couch_doc:to_json_obj(DocProps, []),
                    case match_and_extract_doc(Doc, Selector, Cursor#cursor.fields) of
                        {match, FinalDoc} ->
                            {ok, FinalDoc, {execution_stats, ExecutionStats1}};
                        {no_match, undefined} ->
                            {no_match, Doc, {execution_stats, ExecutionStats1}}
                    end;
                Else ->
                    Else
            end;
        _ ->
            % no doc, no match
            {no_match, null, {execution_stats, ExecutionStats}}
    end.

-spec is_design_doc(row_properties()) -> boolean().
is_design_doc(RowProps) ->
    case couch_util:get_value(id, RowProps) of
        <<"_design/", _/binary>> -> true;
        _ -> false
    end.

-spec update_bookmark_keys(#cursor{}, row_properties()) -> #cursor{}.
update_bookmark_keys(#cursor{limit = Limit} = Cursor, Props) when Limit > 0 ->
    Id = couch_util:get_value(id, Props),
    Key = couch_util:get_value(key, Props),
    Cursor#cursor{
        bookmark_docid = Id,
        bookmark_key = Key
    };
update_bookmark_keys(Cursor, _Props) ->
    Cursor.

%%%%%%%% module tests below %%%%%%%%

-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").

viewcbargs_test() ->
    ViewCBArgs = viewcbargs_new(selector, fields, index),
    ?assertEqual(selector, viewcbargs_get(selector, ViewCBArgs)),
    ?assertEqual(fields, viewcbargs_get(fields, ViewCBArgs)),
    ?assertEqual(index, viewcbargs_get(covering_index, ViewCBArgs)),
    ?assertError(function_clause, viewcbargs_get(something_else, ViewCBArgs)).

maybe_replace_max_json_test() ->
    ?assertEqual([], maybe_replace_max_json([])),
    ?assertEqual(<<"<MAX>">>, maybe_replace_max_json(?MAX_STR)),
    ?assertEqual(
        [val1, val2, <<"<MAX>">>, val3], maybe_replace_max_json([val1, val2, ?MAX_JSON_OBJ, val3])
    ),
    ?assertEqual(something, maybe_replace_max_json(something)).

base_opts_test() ->
    Index =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[{field1, undefined}, {field2, undefined}]}}]}
        },
    Fields = [field1, field2],
    Cursor =
        #cursor{
            index = Index,
            selector = selector,
            fields = Fields,
            ranges = [{'$gte', start_key, '$lte', end_key}]
        },
    Extra =
        [
            {callback, {mango_cursor_view, view_cb}},
            {selector, selector},
            {callback_args, #{
                selector => selector,
                fields => Fields,
                covering_index => undefined
            }},
            {ignore_partition_query_limit, true}
        ],
    MRArgs =
        #mrargs{
            view_type = map,
            reduce = false,
            start_key = [start_key],
            end_key = [end_key, ?MAX_JSON_OBJ],
            include_docs = true,
            extra = Extra
        },
    ?assertEqual(MRArgs, base_args(Cursor)).

apply_opts_empty_test() ->
    ?assertEqual(args, apply_opts([], args)).

apply_opts_r_test() ->
    Args = #mrargs{},
    ArgsWithDocs = Args#mrargs{include_docs = true},
    ?assertEqual(ArgsWithDocs, apply_opts([{r, "1"}], Args)),
    ArgsWithoutDocs = Args#mrargs{include_docs = false},
    ?assertEqual(ArgsWithoutDocs, apply_opts([{r, "3"}], Args)).

apply_opts_conflicts_test() ->
    Args = #mrargs{},
    ArgsWithConflicts = Args#mrargs{conflicts = true},
    ?assertEqual(ArgsWithConflicts, apply_opts([{conflicts, true}], Args)),
    ArgsWithoutConflicts = Args#mrargs{conflicts = undefined},
    ?assertEqual(ArgsWithoutConflicts, apply_opts([{conflicts, false}], Args)).

apply_opts_sort_test() ->
    Args =
        #mrargs{
            start_key = start_key,
            start_key_docid = start_key_docid,
            end_key = end_key,
            end_key_docid = end_key_docid
        },
    ?assertEqual(Args, apply_opts([{sort, {[]}}], Args)),
    ?assertEqual(Args, apply_opts([{sort, {[{field1, <<"asc">>}]}}], Args)),
    ?assertEqual(Args, apply_opts([{sort, {[{field1, <<"asc">>}, {field2, <<"desc">>}]}}], Args)),
    ArgsWithSort =
        Args#mrargs{
            direction = rev,
            start_key = end_key,
            start_key_docid = end_key_docid,
            end_key = start_key,
            end_key_docid = start_key_docid
        },
    ?assertEqual(ArgsWithSort, apply_opts([{sort, {[{field1, <<"desc">>}]}}], Args)).

apply_opts_stale_test() ->
    Args = #mrargs{},
    ArgsWithStale = Args#mrargs{stable = true, update = false},
    ?assertEqual(ArgsWithStale, apply_opts([{stale, ok}], Args)).

apply_opts_stable_test() ->
    Args = #mrargs{},
    ArgsWithStable = Args#mrargs{stable = true},
    ?assertEqual(ArgsWithStable, apply_opts([{stable, true}], Args)),
    ArgsWithoutStable = Args#mrargs{stable = false},
    ?assertEqual(ArgsWithoutStable, apply_opts([{stable, false}], Args)).

apply_opts_update_test() ->
    Args = #mrargs{},
    ArgsWithUpdate = Args#mrargs{update = true},
    ?assertEqual(ArgsWithUpdate, apply_opts([{update, true}], Args)),
    ArgsWithoutUpdate = Args#mrargs{update = false},
    ?assertEqual(ArgsWithoutUpdate, apply_opts([{update, false}], Args)).

apply_opts_partition_test() ->
    Args = #mrargs{},
    ArgsWithPartition = Args#mrargs{extra = [{partition, <<"partition">>}]},
    ?assertEqual(ArgsWithPartition, apply_opts([{partition, <<"partition">>}], Args)),
    ArgsWithoutPartition = Args#mrargs{extra = []},
    ?assertEqual(ArgsWithoutPartition, apply_opts([{partition, <<>>}], Args)).

consider_index_coverage_positive_test() ->
    Index =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[]}}]}
        },
    Fields = [<<"_id">>],
    MRArgs =
        #mrargs{
            include_docs = true,
            extra = [{callback_args, viewcbargs_new(selector, fields, undefined)}]
        },
    MRArgsRef =
        MRArgs#mrargs{
            include_docs = false,
            extra = [{callback_args, viewcbargs_new(selector, fields, Index)}]
        },
    ?assertEqual(MRArgsRef, consider_index_coverage(Index, Fields, MRArgs)),
    MRArgs1 = MRArgs#mrargs{include_docs = false},
    ?assertEqual(MRArgsRef, consider_index_coverage(Index, Fields, MRArgs1)).

consider_index_coverage_negative_test() ->
    Index = undefined,
    Fields = all_fields,
    MRArgs = #mrargs{include_docs = true},
    ?assertEqual(MRArgs, consider_index_coverage(Index, Fields, MRArgs)),
    MRArgs1 = #mrargs{include_docs = false},
    ?assertEqual(MRArgs1, consider_index_coverage(Index, Fields, MRArgs1)),
    % no extra attributes hence no effect
    Index1 =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[]}}]}
        },
    MRArgs2 = #mrargs{include_docs = false},
    ?assertEqual(MRArgs1, consider_index_coverage(Index1, [<<"_id">>], MRArgs2)).

derive_doc_from_index_test() ->
    Index =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]}
        },
    DocId = doc_id,
    Keys = [key1, key2],
    ViewRow = #view_row{id = DocId, key = Keys},
    Doc = {[{<<"_id">>, DocId}, {<<"field2">>, key2}, {<<"field1">>, key1}]},
    ?assertEqual(Doc, derive_doc_from_index(Index, ViewRow)).

composite_indexes_test() ->
    ?assertEqual([], composite_indexes([], [])),
    Index1 =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[{field1, undefined}, {field2, undefined}]}}]}
        },
    Index2 =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[{field1, undefined}, {field3, undefined}, {field4, range4}]}}]}
        },
    Index3 =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[{field3, undefined}, {field4, undefined}]}}]}
        },
    Indexes = [Index1, Index2, Index3],
    Ranges = [{field1, range1}, {field3, range3}, {field4, range4}],
    Result = [
        {Index3, [range3, range4], 1}, {Index2, [range1, range3, range4], 0}, {Index1, [range1], 2}
    ],
    ?assertEqual(Result, composite_indexes(Indexes, Ranges)).

create_test() ->
    Index = #idx{type = <<"json">>, def = {[{<<"fields">>, {[]}}]}},
    Indexes = [Index],
    Ranges = [],
    Selector = {[]},
    Options = [{limit, limit}, {skip, skip}, {fields, fields}, {bookmark, bookmark}],
    Cursor =
        #cursor{
            db = db,
            index = Index,
            ranges = Ranges,
            selector = Selector,
            opts = Options,
            limit = limit,
            skip = skip,
            fields = fields,
            bookmark = bookmark
        },
    ?assertEqual({ok, Cursor}, create(db, Indexes, Selector, Options)).

to_selector(Map) ->
    test_util:as_selector(Map).

required_fields_all_fields_test() ->
    Cursor = #cursor{fields = all_fields},
    ?assertEqual(all_fields, required_fields(Cursor)).

required_fields_disjoint_fields_test() ->
    Fields1 = [<<"field1">>, <<"field2">>, <<"field3">>],
    Selector1 = to_selector(#{}),
    Cursor1 = #cursor{fields = Fields1, selector = Selector1},
    ?assertEqual([<<"field1">>, <<"field2">>, <<"field3">>], required_fields(Cursor1)),
    Fields2 = [<<"field1">>, <<"field2">>],
    Selector2 = to_selector(#{<<"field3">> => undefined, <<"field4">> => undefined}),
    Cursor2 = #cursor{fields = Fields2, selector = to_selector(Selector2)},
    ?assertEqual(
        [<<"field1">>, <<"field2">>, <<"field3">>, <<"field4">>], required_fields(Cursor2)
    ).

required_fields_overlapping_fields_test() ->
    Fields1 = [<<"field1">>, <<"field2">>, <<"field3">>],
    Selector1 = to_selector(#{<<"field3">> => undefined, <<"field4">> => undefined}),
    Cursor1 = #cursor{fields = Fields1, selector = Selector1},
    ?assertEqual(
        [<<"field1">>, <<"field2">>, <<"field3">>, <<"field4">>], required_fields(Cursor1)
    ),
    Fields2 = [<<"field3">>, <<"field1">>, <<"field2">>],
    Selector2 = to_selector(#{<<"field4">> => undefined, <<"field1">> => undefined}),
    Cursor2 = #cursor{fields = Fields2, selector = Selector2},
    ?assertEqual(
        [<<"field1">>, <<"field2">>, <<"field3">>, <<"field4">>], required_fields(Cursor2)
    ).

explain_test() ->
    Cursor =
        #cursor{
            ranges = [empty],
            fields = all_fields,
            opts = []
        },
    Response =
        [
            {mrargs,
                {[
                    {include_docs, true},
                    {view_type, map},
                    {reduce, false},
                    {partition, null},
                    {start_key, null},
                    {end_key, null},
                    {direction, fwd},
                    {stable, false},
                    {update, true},
                    {conflicts, undefined}
                ]}},
            {covered, false}
        ],
    ?assertEqual(Response, explain(Cursor)).

execute_test_() ->
    {
        foreach,
        fun() ->
            meck:new(foo, [non_strict]),
            meck:new(fabric)
        end,
        fun(_) ->
            meck:unload(fabric),
            meck:unload(foo)
        end,
        [
            ?TDEF_FE(t_execute_empty),
            ?TDEF_FE(t_execute_ok_all_docs),
            ?TDEF_FE(t_execute_ok_query_view),
            ?TDEF_FE(t_execute_error)
        ]
    }.

t_execute_empty(_) ->
    Cursor = #cursor{ranges = [empty]},
    meck:expect(fabric, all_docs, ['_', '_', '_', '_', '_'], meck:val(error)),
    meck:expect(fabric, query_view, ['_', '_', '_', '_', '_', '_'], meck:val(error)),
    ?assertEqual({ok, accumulator}, execute(Cursor, undefined, accumulator)),
    ?assertNot(meck:called(fabric, all_docs, '_')),
    ?assertNot(meck:called(fabric, query_view, '_')).

t_execute_ok_all_docs(_) ->
    Bookmark = bookmark,
    UserFnParameters = [{add_key, bookmark, Bookmark}, accumulator],
    meck:expect(foo, bar, UserFnParameters, meck:val({undefined, updated_accumulator})),
    Index = #idx{type = <<"json">>, def = all_docs},
    Selector = {[]},
    Fields = all_fields,
    Cursor =
        #cursor{
            index = Index,
            db = db,
            selector = Selector,
            fields = Fields,
            ranges = [{'$gte', start_key, '$lte', end_key}],
            opts = [{user_ctx, user_ctx}],
            bookmark = nil
        },
    Cursor1 =
        Cursor#cursor{
            user_acc = accumulator,
            user_fun = fun foo:bar/2,
            execution_stats = '_'
        },
    Cursor2 =
        Cursor1#cursor{
            bookmark = Bookmark,
            bookmark_docid = undefined,
            bookmark_key = undefined,
            execution_stats = #execution_stats{executionStartTime = {0, 0, 0}}
        },
    Extra =
        [
            {callback, {mango_cursor_view, view_cb}},
            {selector, Selector},
            {callback_args, #{
                selector => Selector,
                fields => Fields,
                covering_index => undefined
            }},
            {ignore_partition_query_limit, true}
        ],
    Args =
        #mrargs{
            view_type = map,
            reduce = false,
            start_key = [start_key],
            end_key = [end_key, ?MAX_JSON_OBJ],
            include_docs = true,
            extra = Extra
        },
    Parameters = [
        db, [{user_ctx, user_ctx}], fun mango_cursor_view:handle_all_docs_message/2, Cursor1, Args
    ],
    meck:expect(fabric, all_docs, Parameters, meck:val({ok, Cursor2})),
    ?assertEqual({ok, updated_accumulator}, execute(Cursor, fun foo:bar/2, accumulator)),
    ?assert(meck:called(fabric, all_docs, '_')).

t_execute_ok_query_view(_) ->
    Bookmark = bookmark,
    UserFnParameters = [{add_key, bookmark, Bookmark}, accumulator],
    meck:expect(foo, bar, UserFnParameters, meck:val({undefined, updated_accumulator})),
    Index =
        #idx{
            type = <<"json">>,
            ddoc = <<"_design/ghibli">>,
            name = index_name,
            def = {[{<<"fields">>, {[{field1, undefined}]}}]}
        },
    Selector = {[]},
    Fields = all_fields,
    Cursor =
        #cursor{
            index = Index,
            db = db,
            selector = Selector,
            fields = Fields,
            ranges = [{'$gte', start_key, '$lte', end_key}],
            opts = [{user_ctx, user_ctx}],
            bookmark = nil
        },
    Cursor1 =
        Cursor#cursor{
            user_acc = accumulator,
            user_fun = fun foo:bar/2,
            execution_stats = '_'
        },
    Cursor2 =
        Cursor1#cursor{
            bookmark = Bookmark,
            bookmark_docid = undefined,
            bookmark_key = undefined,
            execution_stats = #execution_stats{executionStartTime = {0, 0, 0}}
        },
    Extra =
        [
            {callback, {mango_cursor_view, view_cb}},
            {selector, Selector},
            {callback_args, #{
                selector => Selector,
                fields => Fields,
                covering_index => undefined
            }},
            {ignore_partition_query_limit, true}
        ],
    Args =
        #mrargs{
            view_type = map,
            reduce = false,
            start_key = [start_key],
            end_key = [end_key, ?MAX_JSON_OBJ],
            include_docs = true,
            extra = Extra
        },
    Parameters = [
        db,
        [{user_ctx, user_ctx}],
        <<"ghibli">>,
        index_name,
        fun mango_cursor_view:handle_message/2,
        Cursor1,
        Args
    ],
    meck:expect(fabric, query_view, Parameters, meck:val({ok, Cursor2})),
    ?assertEqual({ok, updated_accumulator}, execute(Cursor, fun foo:bar/2, accumulator)),
    ?assert(meck:called(fabric, query_view, '_')).

t_execute_error(_) ->
    Cursor =
        #cursor{
            index = #idx{type = <<"json">>, ddoc = <<"_design/ghibli">>, name = index_name},
            db = db,
            selector = {[]},
            fields = all_fields,
            ranges = [{'$gte', start_key, '$lte', end_key}],
            opts = [{user_ctx, user_ctx}],
            bookmark = nil
        },
    Parameters = [
        db, '_', <<"ghibli">>, index_name, fun mango_cursor_view:handle_message/2, '_', '_'
    ],
    meck:expect(fabric, query_view, Parameters, meck:val({error, reason})),
    ?assertEqual({error, reason}, execute(Cursor, undefined, accumulator)).

view_cb_test_() ->
    {
        foreach,
        fun() ->
            meck:new(rexi)
        end,
        fun(_) ->
            meck:unload(rexi)
        end,
        [
            ?TDEF_FE(t_view_cb_meta),
            ?TDEF_FE(t_view_cb_row_matching_regular_doc),
            ?TDEF_FE(t_view_cb_row_non_matching_regular_doc),
            ?TDEF_FE(t_view_cb_row_null_doc),
            ?TDEF_FE(t_view_cb_row_missing_doc_triggers_quorum_fetch),
            ?TDEF_FE(t_view_cb_row_matching_covered_doc),
            ?TDEF_FE(t_view_cb_row_non_matching_covered_doc),
            ?TDEF_FE(t_view_cb_row_backwards_compatible),
            ?TDEF_FE(t_view_cb_complete),
            ?TDEF_FE(t_view_cb_ok)
        ]
    }.

t_view_cb_meta(_) ->
    meck:expect(rexi, stream2, [{meta, meta}], meck:val(ok)),
    ?assertEqual({ok, accumulator}, view_cb({meta, meta}, accumulator)),
    ?assert(meck:called(rexi, stream2, '_')).

t_view_cb_row_matching_regular_doc(_) ->
    Row = [{id, id}, {key, key}, {doc, doc}],
    Result = #view_row{id = id, key = key, doc = doc},
    meck:expect(rexi, stream2, [Result], meck:val(ok)),
    Accumulator =
        #mrargs{
            extra = [
                {callback_args, #{
                    selector => {[]},
                    fields => all_fields,
                    covering_index => undefined
                }}
            ]
        },
    put(mango_docs_examined, 0),
    ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
    ?assert(meck:called(rexi, stream2, '_')).

t_view_cb_row_non_matching_regular_doc(_) ->
    Doc = {[]},
    Row = [{id, id}, {key, key}, {doc, Doc}],
    meck:expect(rexi, stream2, ['_'], undefined),
    Accumulator =
        #mrargs{
            extra = [
                {callback_args, #{
                    selector => {[{<<"field">>, {[{<<"$exists">>, true}]}}]},
                    fields => all_fields,
                    covering_index => undefined
                }}
            ]
        },
    put(mango_docs_examined, 0),
    put(mango_last_msg_timestamp, os:timestamp()),
    ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
    ?assertNot(meck:called(rexi, stream2, '_')).

t_view_cb_row_null_doc(_) ->
    Row = [{id, id}, {key, key}, {doc, null}],
    meck:expect(rexi, stream2, ['_'], undefined),
    Accumulator =
        #mrargs{
            extra = [
                {callback_args, #{
                    selector => {[]},
                    fields => all_fields,
                    covering_index => undefined
                }}
            ]
        },
    put(mango_last_msg_timestamp, os:timestamp()),
    ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
    ?assertNot(meck:called(rexi, stream2, '_')).

t_view_cb_row_missing_doc_triggers_quorum_fetch(_) ->
    Row = [{id, id}, {key, key}, {doc, undefined}],
    ViewRow = #view_row{id = id, key = key, doc = undefined},
    meck:expect(rexi, stream2, [ViewRow], meck:val(ok)),
    Accumulator =
        #mrargs{
            extra = [
                {callback_args, #{
                    selector => {[]},
                    fields => all_fields,
                    covering_index => undefined
                }}
            ]
        },
    ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
    ?assert(meck:called(rexi, stream2, '_')).

t_view_cb_row_matching_covered_doc(_) ->
    Keys = [key1, key2],
    Row = [{id, id}, {key, Keys}, {doc, undefined}],
    Doc = {[{<<"field1">>, key1}, {<<"field2">>, key2}]},
    Result = #view_row{id = id, key = Keys, doc = Doc},
    Fields = [<<"field1">>, <<"field2">>],
    Index =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]}
        },
    meck:expect(rexi, stream2, [Result], meck:val(ok)),
    Accumulator =
        #mrargs{
            extra = [
                {callback_args, #{
                    selector => {[]},
                    fields => Fields,
                    covering_index => Index
                }}
            ]
        },
    ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
    ?assert(meck:called(rexi, stream2, '_')).

t_view_cb_row_non_matching_covered_doc(_) ->
    Row = [{id, id}, {key, [key1, key2]}, {doc, undefined}],
    Fields = [<<"field1">>, <<"field2">>],
    Index =
        #idx{
            type = <<"json">>,
            def = {[{<<"fields">>, {[{<<"field1">>, undefined}, {<<"field2">>, undefined}]}}]}
        },
    meck:expect(rexi, stream2, ['_'], undefined),
    Accumulator =
        #mrargs{
            extra = [
                {callback_args, #{
                    selector => {[{<<"field">>, {[{<<"$exists">>, true}]}}]},
                    fields => Fields,
                    covering_index => Index
                }}
            ]
        },
    put(mango_last_msg_timestamp, os:timestamp()),
    ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
    ?assertNot(meck:called(rexi, stream2, '_')).

t_view_cb_row_backwards_compatible(_) ->
    Row = [{id, id}, {key, key}, {doc, null}],
    meck:expect(rexi, stream2, ['_'], undefined),
    Accumulator = #mrargs{extra = [{selector, {[]}}]},
    put(mango_last_msg_timestamp, os:timestamp()),
    ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
    ?assertNot(meck:called(rexi, stream2, '_')).

t_view_cb_complete(_) ->
    meck:expect(rexi, stream2, [{execution_stats, {docs_examined, '_'}}], meck:val(ok)),
    meck:expect(rexi, stream_last, [complete], meck:val(ok)),
    ?assertEqual({ok, accumulator}, view_cb(complete, accumulator)),
    ?assert(meck:called(rexi, stream2, '_')),
    ?assert(meck:called(rexi, stream_last, '_')).

t_view_cb_ok(_) ->
    meck:expect(rexi, reply, [{ok, ddoc_updated}], meck:val(ok)),
    view_cb(ok, ddoc_updated),
    ?assert(meck:called(rexi, reply, '_')).

maybe_send_mango_ping_test_() ->
    {
        foreach,
        fun() ->
            meck:new(rexi)
        end,
        fun(_) ->
            meck:unload(rexi)
        end,
        [
            ?TDEF_FE(t_maybe_send_mango_ping_nop),
            ?TDEF_FE(t_maybe_send_mango_ping_happens)
        ]
    }.

t_maybe_send_mango_ping_nop(_) ->
    put(mango_last_msg_timestamp, os:timestamp()),
    meck:expect(rexi, ping, [], meck:val(error)),
    ?assertEqual(ok, maybe_send_mango_ping()),
    ?assertNot(meck:called(rexi, ping, '_')).

t_maybe_send_mango_ping_happens(_) ->
    put(mango_last_msg_timestamp, {0, 0, 0}),
    meck:expect(rexi, ping, [], meck:val(ok)),
    maybe_send_mango_ping(),
    ?assert(meck:called(rexi, ping, '_')),
    Timestamp = get(mango_last_msg_timestamp),
    ?assertNotEqual(Timestamp, {0, 0, 0}).

ddocid_test() ->
    ?assertEqual(<<"name">>, ddocid(#idx{ddoc = <<"_design/name">>})),
    ?assertEqual(something_else, ddocid(#idx{ddoc = something_else})).

is_design_doc_test() ->
    ?assert(is_design_doc([{id, <<"_design/name">>}])),
    ?assertNot(is_design_doc([{id, something_else}])).

handle_message_test_() ->
    {
        foreach,
        fun() ->
            meck:new(foo, [non_strict])
        end,
        fun(_) ->
            meck:unload(foo)
        end,
        [
            ?TDEF_FE(t_handle_message_meta),
            ?TDEF_FE(t_handle_message_row_ok_above_limit),
            ?TDEF_FE(t_handle_message_row_ok_at_limit),
            ?TDEF_FE(t_handle_message_row_ok_skip),
            ?TDEF_FE(t_handle_message_row_ok_triggers_quorum_fetch_match),
            ?TDEF_FE(t_handle_message_row_ok_triggers_quorum_fetch_no_match),
            ?TDEF_FE(t_handle_message_row_no_match),
            ?TDEF_FE(t_handle_message_row_error),
            ?TDEF_FE(t_handle_message_execution_stats),
            ?TDEF_FE(t_handle_message_complete),
            ?TDEF_FE(t_handle_message_error)
        ]
    }.

t_handle_message_meta(_) ->
    ?assertEqual({ok, cursor}, handle_message({meta, undefined}, cursor)).

t_handle_message_row_ok_above_limit(_) ->
    Doc = {[{<<"field1">>, value1}, {<<"field2">>, value2}]},
    meck:expect(foo, bar, [{row, Doc}, accumulator], meck:val({go, updated_accumulator})),
    Cursor =
        #cursor{
            execution_stats = #execution_stats{resultsReturned = 0},
            fields = all_fields,
            limit = 9,
            user_acc = accumulator,
            user_fun = fun foo:bar/2
        },
    Row = [{id, id}, {key, key}, {doc, Doc}],
    Cursor1 =
        Cursor#cursor{
            execution_stats = #execution_stats{resultsReturned = 1},
            limit = 8,
            user_acc = updated_accumulator,
            bookmark_docid = id,
            bookmark_key = key
        },
    ?assertEqual({go, Cursor1}, handle_message({row, Row}, Cursor)).

t_handle_message_row_ok_at_limit(_) ->
    Cursor =
        #cursor{
            execution_stats = #execution_stats{resultsReturned = n},
            fields = all_fields,
            limit = 0
        },
    Row = [{doc, {[]}}],
    ?assertEqual({stop, Cursor}, handle_message({row, Row}, Cursor)).

t_handle_message_row_ok_skip(_) ->
    Cursor =
        #cursor{
            execution_stats = #execution_stats{resultsReturned = n},
            fields = all_fields,
            skip = 8
        },
    Row = [{doc, {[]}}],
    Cursor1 = Cursor#cursor{skip = 7},
    ?assertEqual({ok, Cursor1}, handle_message({row, Row}, Cursor)).

t_handle_message_row_ok_triggers_quorum_fetch_match(_) ->
    Doc = #doc{id = id, body = {[{<<"field">>, something}]}},
    Object = {[{<<"_id">>, id}, {<<"field">>, something}]},
    meck:expect(foo, bar, [{row, Object}, accumulator], meck:val({go, updated_accumulator})),
    Cursor =
        #cursor{
            db = db,
            opts = opts,
            execution_stats =
                #execution_stats{
                    totalQuorumDocsExamined = 0,
                    resultsReturned = 0
                },
            fields = all_fields,
            selector = {[{<<"field">>, {[{<<"$exists">>, true}]}}]},
            user_fun = fun foo:bar/2,
            user_acc = accumulator,
            limit = 1
        },
    Row = [{id, id}, {doc, undefined}],
    Cursor1 =
        Cursor#cursor{
            execution_stats =
                #execution_stats{
                    totalQuorumDocsExamined = 1,
                    resultsReturned = 1
                },
            user_acc = updated_accumulator,
            limit = 0,
            bookmark_docid = id
        },
    meck:expect(mango_util, defer, [fabric, open_doc, [db, id, opts]], meck:val({ok, Doc})),
    ?assertEqual({go, Cursor1}, handle_message({row, Row}, Cursor)),
    ?assert(meck:called(mango_util, defer, '_')),
    meck:delete(mango_util, defer, 3).

t_handle_message_row_ok_triggers_quorum_fetch_no_match(_) ->
    Cursor =
        #cursor{
            db = db,
            opts = opts,
            execution_stats = #execution_stats{totalQuorumDocsExamined = 0},
            fields = all_fields,
            selector = {[{<<"field">>, {[{<<"$exists">>, true}]}}]}
        },
    Row = [{id, id}, {doc, undefined}],
    Cursor1 =
        Cursor#cursor{
            execution_stats = #execution_stats{totalQuorumDocsExamined = 1}
        },
    Doc = #doc{id = id, body = {[]}},
    meck:expect(mango_util, defer, [fabric, open_doc, [db, id, opts]], meck:val({ok, Doc})),
    ?assertEqual({ok, Cursor1}, handle_message({row, Row}, Cursor)),
    ?assert(meck:called(mango_util, defer, '_')),
    meck:delete(mango_util, defer, 3).

t_handle_message_row_no_match(_) ->
    Cursor =
        #cursor{
            execution_stats = #execution_stats{resultsReturned = n}
        },
    Row = [{doc, null}],
    ?assertEqual({ok, Cursor}, handle_message({row, Row}, Cursor)).

t_handle_message_row_error(_) ->
    Cursor =
        #cursor{
            db = db,
            opts = opts,
            execution_stats = #execution_stats{totalQuorumDocsExamined = 0}
        },
    Row = [{id, id}, {doc, undefined}],
    meck:expect(mango_util, defer, [fabric, open_doc, [db, id, opts]], meck:val(error)),
    meck:expect(couch_log, error, ['_', [mango_cursor_view, error]], meck:val(ok)),
    ?assertEqual({ok, Cursor}, handle_message({row, Row}, Cursor)),
    ?assert(meck:called(mango_util, defer, '_')),
    ?assert(meck:called(couch_log, error, '_')),
    meck:delete(mango_util, defer, 3),
    meck:delete(couch_log, error, 2).

t_handle_message_execution_stats(_) ->
    ShardStats = {docs_examined, 42},
    ExecutionStats = #execution_stats{totalDocsExamined = 11},
    ExecutionStats1 = #execution_stats{totalDocsExamined = 53},
    Cursor = #cursor{execution_stats = ExecutionStats},
    Cursor1 = #cursor{execution_stats = ExecutionStats1},
    ?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)).

t_handle_message_complete(_) ->
    ?assertEqual({ok, cursor}, handle_message(complete, cursor)).

t_handle_message_error(_) ->
    ?assertEqual({error, reason}, handle_message({error, reason}, undefined)).

handle_all_docs_message_ddoc_test() ->
    Row = [{id, <<"_design/foobar">>}],
    ?assertEqual({ok, cursor}, handle_all_docs_message({row, Row}, cursor)).

handle_all_docs_message_row_test() ->
    Cursor =
        #cursor{
            execution_stats = #execution_stats{resultsReturned = n}
        },
    Row = [{doc, null}],
    ?assertEqual({ok, Cursor}, handle_all_docs_message({row, Row}, Cursor)).

handle_all_docs_message_regular_test() ->
    ?assertEqual(handle_message(complete, cursor), handle_all_docs_message(complete, cursor)).

%% Test the doc_member_and_extract bypasses the selector check if it receives
%% a document in RowProps.doc.
does_not_refetch_doc_with_value_test() ->
    Cursor = #cursor{
        db = <<"db">>,
        opts = [],
        execution_stats = #execution_stats{},
        selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]})
    },
    RowProps = [
        {id, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
        {key, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
        {doc,
            {
                [
                    {<<"_id">>, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
                    {<<"_rev">>, <<"1-a954fe2308f14307756067b0e18c2968">>},
                    {<<"user_id">>, 11}
                ]
            }}
    ],
    {Match, _, _} = doc_member_and_extract(Cursor, RowProps),
    ?assertEqual(Match, ok).

%% Test that field filtering is duplicated in doc_member_and_extract even when
%% returning a value via RowProps.doc (ie, should have been done on the shard).
%% This is needed temporarily for mixed version upgrades, as some shards may
%% not have performed the field extraction. This can be later removed.
doc_member_and_extract_fields_test() ->
    Cursor = #cursor{
        db = <<"db">>,
        opts = [],
        execution_stats = #execution_stats{},
        %% no selector here as we should be bypassing this in the case of
        %% shard level selector application.
        fields = [<<"user_id">>, <<"a_non_existent_field">>]
    },
    RowProps = [
        {id, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
        {key, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
        {doc,
            {
                [
                    {<<"_id">>, <<"b06aadcf-cd0f-4ca6-9f7e-2c993e48d4c4">>},
                    {<<"_rev">>, <<"1-a954fe2308f14307756067b0e18c2968">>},
                    {<<"user_id">>, 11}
                ]
            }}
    ],
    {Match, Doc, _} = doc_member_and_extract(Cursor, RowProps),
    ?assertEqual(ok, Match),
    ?assertEqual({[{<<"user_id">>, 11}]}, Doc).

%% match_and_extract_doc should return full Doc when Doc matches Selector and
%% Fields is undefined.
match_and_extract_doc_match_test() ->
    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
    Selector = mango_selector:normalize({[{<<"user_id">>, 11}]}),
    Fields = undefined,
    {Match, FinalDoc} = match_and_extract_doc(Doc, Selector, Fields),
    ?assertEqual(match, Match),
    ?assertEqual(Doc, FinalDoc).

%% match_and_extract_doc should return projected Doc when Doc matches Selector
%% and Fields is a list of fields.
match_and_extract_doc_matchextract_test() ->
    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
    Selector = mango_selector:normalize({[{<<"user_id">>, 11}]}),
    Fields = [<<"_id">>, <<"user_id">>],
    {Match, FinalDoc} = match_and_extract_doc(Doc, Selector, Fields),
    ?assertEqual(match, Match),
    ?assertEqual({[{<<"_id">>, <<"myid">>}, {<<"user_id">>, 11}]}, FinalDoc).

%% match_and_extract_doc should return no document when Doc does not match
%% Selector.
match_and_extract_doc_nomatch_test() ->
    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
    Selector = mango_selector:normalize({[{<<"user_id">>, <<"1234">>}]}),
    Fields = undefined,
    {Match, FinalDoc} = match_and_extract_doc(Doc, Selector, Fields),
    ?assertEqual(no_match, Match),
    ?assertEqual(undefined, FinalDoc).

%% match_and_extract_doc should return no document when Doc does not match
%% Selector even if Fields is defined.
match_and_extract_doc_nomatch_fields_test() ->
    Doc = {[{<<"_id">>, <<"myid">>}, {<<"_rev">>, <<"myrev">>}, {<<"user_id">>, 11}]},
    Selector = mango_selector:normalize({[{<<"user_id">>, 1234}]}),
    Fields = [<<"_id">>, <<"user_id">>],
    {Match, FinalDoc} = match_and_extract_doc(Doc, Selector, Fields),
    ?assertEqual(no_match, Match),
    ?assertEqual(undefined, FinalDoc).

%% Query planner tests:
%% - there should be no comparison for a singleton list, with a trivial result
choose_best_index_with_singleton_test() ->
    ?assertEqual({index, ranges}, choose_best_index([{index, ranges, undefined}])).

%% - choose the index with the lowest difference between its prefix and field ranges
choose_best_index_lowest_difference_test() ->
    IndexRanges1 =
        [
            {index1, ranges1, 3},
            {index2, ranges2, 2},
            {index3, ranges3, 1}
        ],
    ?assertEqual({index3, ranges3}, choose_best_index(IndexRanges1)),
    IndexRanges2 =
        [
            {index1, ranges1, 3},
            {index2, ranges2, 1},
            {index3, ranges3, 2}
        ],
    ?assertEqual({index2, ranges2}, choose_best_index(IndexRanges2)).

%% - if that is equal, choose the index with the least number of fields in the index
choose_best_index_least_number_of_fields_test() ->
    Index = json_index(dbname, design_document, index_name),
    [Index1, Index2, Index3] = [with_dummy_columns(Index, N) || N <- [6, 3, 9]],
    IndexRanges =
        [
            {Index1, ranges1, 1},
            {Index2, ranges2, 1},
            {Index3, ranges3, 1}
        ],
    ?assertEqual({Index2, ranges2}, choose_best_index(IndexRanges)).

%% - otherwise, choose alphabetically based on the index properties:
choose_best_index_lowest_index_triple_test() ->
    WithSomeColumns = fun(Idx) -> with_dummy_columns(Idx, 3) end,

    % - database name
    Index1 = WithSomeColumns(json_index(<<"db_a">>, <<"_design/c">>, <<"B">>)),
    Index2 = WithSomeColumns(json_index(<<"db_b">>, <<"_design/b">>, <<"C">>)),
    Index3 = WithSomeColumns(json_index(<<"db_c">>, <<"_design/a">>, <<"A">>)),
    IndexRanges1 =
        [
            {Index1, ranges1, 1},
            {Index2, ranges2, 1},
            {Index3, ranges3, 1}
        ],
    ?assertEqual({Index1, ranges1}, choose_best_index(IndexRanges1)),

    % - if that is equal, design document name
    Index4 = WithSomeColumns(json_index(<<"db_a">>, <<"_design/c">>, <<"B">>)),
    Index5 = WithSomeColumns(json_index(<<"db_a">>, <<"_design/b">>, <<"C">>)),
    Index6 = WithSomeColumns(json_index(<<"db_a">>, <<"_design/a">>, <<"A">>)),
    IndexRanges2 =
        [
            {Index4, ranges4, 1},
            {Index5, ranges5, 1},
            {Index6, ranges6, 1}
        ],
    ?assertEqual({Index6, ranges6}, choose_best_index(IndexRanges2)),

    % - otherwise, index name
    Index7 = WithSomeColumns(json_index(<<"db_a">>, <<"_design/a">>, <<"B">>)),
    Index8 = WithSomeColumns(json_index(<<"db_a">>, <<"_design/a">>, <<"C">>)),
    Index9 = WithSomeColumns(json_index(<<"db_a">>, <<"_design/a">>, <<"A">>)),
    IndexRanges3 =
        [
            {Index7, ranges7, 1},
            {Index8, ranges8, 1},
            {Index9, ranges9, 1}
        ],
    ?assertEqual({Index9, ranges9}, choose_best_index(IndexRanges3)).

json_index(DbName, DesignDoc, Name) ->
    #idx{
        dbname = DbName,
        ddoc = DesignDoc,
        name = Name,
        type = <<"json">>
    }.

with_dummy_columns(Index, Count) ->
    Columns =
        {[{<<"field", (integer_to_binary(I))/binary>>, undefined} || I <- lists:seq(1, Count)]},
    Index#idx{def = {[{<<"fields">>, Columns}]}}.

update_bookmark_keys_test() ->
    Cursor0 = #cursor{limit = 0},
    ?assertEqual(Cursor0, update_bookmark_keys(Cursor0, undefined)),
    Cursor1 = #cursor{limit = 1},
    Row = [{id, id}, {key, key}],
    UpdatedCursor1 = Cursor1#cursor{bookmark_docid = id, bookmark_key = key},
    ?assertEqual(UpdatedCursor1, update_bookmark_keys(Cursor1, Row)).
-endif.
