mango: extend execution statistics with keys examined (#4569)

Add another field to the shard-level Mango execution statistics
to keep track of the number of keys that were examined for the
query.  Note that this requires changing the way the stats are
stored -- an approach similar to that of the view callback
arguments was chosen, that is, the stats are now kept in a map.

The current version supports both the old and the new formats.  The
coordinator may request the results in the new format by
adding `execution_stats_map` to the arguments of the view
callback.  Otherwise the old format is used (without the extra
field), which makes it possible to work with older coordinators.
Old workers will automatically ignore this argument and answer in
the old format.
diff --git a/src/docs/src/api/database/find.rst b/src/docs/src/api/database/find.rst
index 027ddf8..ede5598 100644
--- a/src/docs/src/api/database/find.rst
+++ b/src/docs/src/api/database/find.rst
@@ -145,7 +145,7 @@
                 }
             ],
             "execution_stats": {
-                "total_keys_examined": 0,
+                "total_keys_examined": 200,
                 "total_docs_examined": 200,
                 "total_quorum_docs_examined": 0,
                 "results_returned": 2,
@@ -925,7 +925,6 @@
 | Field                          | Description                                |
 +================================+============================================+
 | ``total_keys_examined``        | Number of index keys examined.             |
-|                                | Currently always 0.                        |
 +--------------------------------+--------------------------------------------+
 | ``total_docs_examined``        | Number of documents fetched from the       |
 |                                | database / index, equivalent to using      |
diff --git a/src/mango/src/mango.hrl b/src/mango/src/mango.hrl
index 2ff07aa..d8fa095 100644
--- a/src/mango/src/mango.hrl
+++ b/src/mango/src/mango.hrl
@@ -30,6 +30,14 @@
 -type selector() :: any().
 -type ejson() :: {[{atom(), any()}]}.
 
--type shard_stats() :: {docs_examined, non_neg_integer()}.
+-type shard_stats() :: shard_stats_v1() | shard_stats_v2().
+
+-type shard_stats_v1() :: {docs_examined, non_neg_integer()}.
+-type shard_stats_v2() ::
+    #{
+         docs_examined => non_neg_integer(),
+         keys_examined => non_neg_integer()
+    }.
+
 -type row_property_key() :: id | key | value | doc.
 -type row_properties() :: [{row_property_key(), any()}].
diff --git a/src/mango/src/mango_cursor_view.erl b/src/mango/src/mango_cursor_view.erl
index d5cffbc..e044c56 100644
--- a/src/mango/src/mango_cursor_view.erl
+++ b/src/mango/src/mango_cursor_view.erl
@@ -79,6 +79,15 @@
 viewcbargs_get(covering_index, Args) when is_map(Args) ->
     maps:get(covering_index, Args, undefined).
 
+% Fetch one counter from map-based (v2) shard statistics.
+% A counter that is absent from the map is reported as 0.
+-spec shard_stats_get(Key, Args) -> Stat when
+    Key :: docs_examined | keys_examined,
+    Args :: shard_stats_v2(),
+    Stat :: non_neg_integer().
+shard_stats_get(Key, Map) when is_map(Map), (Key =:= docs_examined orelse Key =:= keys_examined) ->
+    maps:get(Key, Map, 0).
+
 -spec create(Db, Indexes, Selector, Options) -> {ok, #cursor{}} when
     Db :: database(),
     Indexes :: [#idx{}],
@@ -187,7 +196,12 @@
             {selector, Selector},
             {callback_args, viewcbargs_new(Selector, Fields, undefined)},
 
-            {ignore_partition_query_limit, true}
+            {ignore_partition_query_limit, true},
+
+            % Request execution statistics in a map.  The purpose of this option is
+            % to maintain interoperability on version upgrades.
+            % TODO remove this option in a later version.
+            {execution_stats_map, true}
         ]
     }.
 
@@ -326,11 +340,13 @@
     (ok, ddoc_updated) -> any().
 view_cb({meta, Meta}, Acc) ->
     % Map function starting
-    put(mango_docs_examined, 0),
+    mango_execution_stats:shard_init(),
     set_mango_msg_timestamp(),
     ok = rexi:stream2({meta, Meta}),
     {ok, Acc};
 view_cb({row, Row}, #mrargs{extra = Options} = Acc) ->
+    mango_execution_stats:shard_incr_keys_examined(),
+    couch_stats:increment_counter([mango, keys_examined]),
     ViewRow = #view_row{
         id = couch_util:get_value(id, Row),
         key = couch_util:get_value(key, Row),
@@ -379,14 +395,23 @@
             ok = rexi:stream2(ViewRow),
             set_mango_msg_timestamp();
         {Doc, _} ->
-            put(mango_docs_examined, get(mango_docs_examined) + 1),
+            mango_execution_stats:shard_incr_docs_examined(),
             couch_stats:increment_counter([mango, docs_examined]),
             Process(Doc)
     end,
     {ok, Acc};
-view_cb(complete, Acc) ->
+view_cb(complete, #mrargs{extra = Options} = Acc) ->
+    ShardStats = mango_execution_stats:shard_get_stats(),
+    Stats =
+        case couch_util:get_value(execution_stats_map, Options, false) of
+            true ->
+                ShardStats;
+            false ->
+                DocsExamined = maps:get(docs_examined, ShardStats),
+                {docs_examined, DocsExamined}
+        end,
     % Send shard-level execution stats
-    ok = rexi:stream2({execution_stats, {docs_examined, get(mango_docs_examined)}}),
+    ok = rexi:stream2({execution_stats, Stats}),
     % Finish view output
     ok = rexi:stream_last(complete),
     {ok, Acc};
@@ -459,12 +484,20 @@
             couch_log:error("~s :: Error loading doc: ~p", [?MODULE, Error]),
             {ok, Cursor}
     end;
-handle_message({execution_stats, ShardStats}, #cursor{execution_stats = Stats} = Cursor) ->
-    {docs_examined, DocsExamined} = ShardStats,
-    Cursor1 = Cursor#cursor{
+handle_message({execution_stats, {docs_examined, DocsExamined}}, Cursor0) ->
+    #cursor{execution_stats = Stats} = Cursor0,
+    Cursor = Cursor0#cursor{
         execution_stats = mango_execution_stats:incr_docs_examined(Stats, DocsExamined)
     },
-    {ok, Cursor1};
+    {ok, Cursor};
+handle_message({execution_stats, #{} = ShardStats}, Cursor0) ->
+    DocsExamined = shard_stats_get(docs_examined, ShardStats),
+    KeysExamined = shard_stats_get(keys_examined, ShardStats),
+    #cursor{execution_stats = Stats0} = Cursor0,
+    Stats1 = mango_execution_stats:incr_docs_examined(Stats0, DocsExamined),
+    Stats = mango_execution_stats:incr_keys_examined(Stats1, KeysExamined),
+    Cursor = Cursor0#cursor{execution_stats = Stats},
+    {ok, Cursor};
 handle_message(complete, Cursor) ->
     {ok, Cursor};
 handle_message({error, Reason}, _Cursor) ->
@@ -702,7 +735,8 @@
                 fields => Fields,
                 covering_index => undefined
             }},
-            {ignore_partition_query_limit, true}
+            {ignore_partition_query_limit, true},
+            {execution_stats_map, true}
         ],
     MRArgs =
         #mrargs{
@@ -945,6 +979,7 @@
         [
             ?TDEF_FE(t_execute_empty),
             ?TDEF_FE(t_execute_ok_all_docs),
+            ?TDEF_FE(t_execute_ok_all_docs_with_execution_stats),
             ?TDEF_FE(t_execute_ok_query_view),
             ?TDEF_FE(t_execute_error)
         ]
@@ -997,7 +1032,8 @@
                 fields => Fields,
                 covering_index => undefined
             }},
-            {ignore_partition_query_limit, true}
+            {ignore_partition_query_limit, true},
+            {execution_stats_map, true}
         ],
     Args =
         #mrargs{
@@ -1060,7 +1096,8 @@
                 fields => Fields,
                 covering_index => undefined
             }},
-            {ignore_partition_query_limit, true}
+            {ignore_partition_query_limit, true},
+            {execution_stats_map, true}
         ],
     Args =
         #mrargs{
@@ -1084,6 +1121,79 @@
     ?assertEqual({ok, updated_accumulator}, execute(Cursor, fun foo:bar/2, accumulator)),
     ?assert(meck:called(fabric, query_view, '_')).
 
+t_execute_ok_all_docs_with_execution_stats(_) ->
+    % The user callback is expected to get the bookmark first, the statistics second.
+    ExpectedStats =
+        {[
+            {total_keys_examined, 0},
+            {total_docs_examined, 0},
+            {total_quorum_docs_examined, 0},
+            {results_returned, 0},
+            {execution_time_ms, '_'}
+        ]},
+    BookmarkCall = [{add_key, bookmark, bookmark}, accumulator],
+    StatsCall = [{add_key, execution_stats, ExpectedStats}, updated_accumulator1],
+    meck:expect(foo, bar, [
+        {BookmarkCall, {undefined, updated_accumulator1}},
+        {StatsCall, {undefined, updated_accumulator2}}
+    ]),
+    Selector = {[]},
+    Fields = all_fields,
+    Cursor =
+        #cursor{
+            index = #idx{type = <<"json">>, def = all_docs},
+            db = db,
+            selector = Selector,
+            fields = Fields,
+            ranges = [{'$gte', start_key, '$lte', end_key}],
+            opts = [{user_ctx, user_ctx}, {execution_stats, true}],
+            bookmark = nil
+        },
+    % Cursor expected as an argument of the mocked fabric:all_docs call.
+    CursorIn =
+        Cursor#cursor{
+            user_acc = accumulator,
+            user_fun = fun foo:bar/2,
+            execution_stats = '_'
+        },
+    % Cursor returned by the mocked fabric:all_docs call.
+    CursorOut =
+        CursorIn#cursor{
+            bookmark = bookmark,
+            bookmark_docid = undefined,
+            bookmark_key = undefined,
+            execution_stats = #execution_stats{executionStartTime = {0, 0, 0}}
+        },
+    ViewArgs =
+        #mrargs{
+            view_type = map,
+            reduce = false,
+            start_key = [start_key],
+            end_key = [end_key, ?MAX_JSON_OBJ],
+            include_docs = true,
+            extra = [
+                {callback, {mango_cursor_view, view_cb}},
+                {selector, Selector},
+                {callback_args, #{
+                    selector => Selector,
+                    fields => Fields,
+                    covering_index => undefined
+                }},
+                {ignore_partition_query_limit, true},
+                {execution_stats_map, true}
+            ]
+        },
+    FabricArgs = [
+        db,
+        [{user_ctx, user_ctx}],
+        fun mango_cursor_view:handle_all_docs_message/2,
+        CursorIn,
+        ViewArgs
+    ],
+    meck:expect(fabric, all_docs, FabricArgs, meck:val({ok, CursorOut})),
+    ?assertEqual({ok, updated_accumulator2}, execute(Cursor, fun foo:bar/2, accumulator)),
+    ?assert(meck:called(fabric, all_docs, '_')).
+
 t_execute_error(_) ->
     Cursor =
         #cursor{
@@ -1119,7 +1229,8 @@
             ?TDEF_FE(t_view_cb_row_matching_covered_doc),
             ?TDEF_FE(t_view_cb_row_non_matching_covered_doc),
             ?TDEF_FE(t_view_cb_row_backwards_compatible),
-            ?TDEF_FE(t_view_cb_complete),
+            ?TDEF_FE(t_view_cb_complete_shard_stats_v1),
+            ?TDEF_FE(t_view_cb_complete_shard_stats_v2),
             ?TDEF_FE(t_view_cb_ok)
         ]
     }.
@@ -1143,7 +1254,7 @@
                 }}
             ]
         },
-    put(mango_docs_examined, 0),
+    mango_execution_stats:shard_init(),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assert(meck:called(rexi, stream2, '_')).
 
@@ -1161,7 +1272,7 @@
                 }}
             ]
         },
-    put(mango_docs_examined, 0),
+    mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assertNot(meck:called(rexi, stream2, '_')).
@@ -1179,6 +1290,7 @@
                 }}
             ]
         },
+    mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assertNot(meck:called(rexi, stream2, '_')).
@@ -1197,6 +1309,7 @@
                 }}
             ]
         },
+    mango_execution_stats:shard_init(),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assert(meck:called(rexi, stream2, '_')).
 
@@ -1222,6 +1335,7 @@
                 }}
             ]
         },
+    mango_execution_stats:shard_init(),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assert(meck:called(rexi, stream2, '_')).
 
@@ -1244,6 +1358,7 @@
                 }}
             ]
         },
+    mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assertNot(meck:called(rexi, stream2, '_')).
@@ -1252,14 +1367,27 @@
     Row = [{id, id}, {key, key}, {doc, null}],
     meck:expect(rexi, stream2, ['_'], undefined),
     Accumulator = #mrargs{extra = [{selector, {[]}}]},
+    mango_execution_stats:shard_init(),
     put(mango_last_msg_timestamp, os:timestamp()),
     ?assertEqual({ok, Accumulator}, view_cb({row, Row}, Accumulator)),
     ?assertNot(meck:called(rexi, stream2, '_')).
 
-t_view_cb_complete(_) ->
+t_view_cb_complete_shard_stats_v1(_) ->
     meck:expect(rexi, stream2, [{execution_stats, {docs_examined, '_'}}], meck:val(ok)),
     meck:expect(rexi, stream_last, [complete], meck:val(ok)),
-    ?assertEqual({ok, accumulator}, view_cb(complete, accumulator)),
+    Accumulator = #mrargs{},
+    mango_execution_stats:shard_init(),
+    ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)),
+    ?assert(meck:called(rexi, stream2, '_')),
+    ?assert(meck:called(rexi, stream_last, '_')).
+
+t_view_cb_complete_shard_stats_v2(_) ->
+    ShardStats = #{docs_examined => '_', keys_examined => '_'},
+    meck:expect(rexi, stream2, [{execution_stats, ShardStats}], meck:val(ok)),
+    meck:expect(rexi, stream_last, [complete], meck:val(ok)),
+    Accumulator = #mrargs{extra = [{execution_stats_map, true}]},
+    mango_execution_stats:shard_init(),
+    ?assertEqual({ok, Accumulator}, view_cb(complete, Accumulator)),
     ?assert(meck:called(rexi, stream2, '_')),
     ?assert(meck:called(rexi, stream_last, '_')).
 
@@ -1323,7 +1451,8 @@
             ?TDEF_FE(t_handle_message_row_ok_triggers_quorum_fetch_no_match),
             ?TDEF_FE(t_handle_message_row_no_match),
             ?TDEF_FE(t_handle_message_row_error),
-            ?TDEF_FE(t_handle_message_execution_stats),
+            ?TDEF_FE(t_handle_message_execution_stats_v1),
+            ?TDEF_FE(t_handle_message_execution_stats_v2),
             ?TDEF_FE(t_handle_message_complete),
             ?TDEF_FE(t_handle_message_error)
         ]
@@ -1455,7 +1584,7 @@
     meck:delete(mango_util, defer, 3),
     meck:delete(couch_log, error, 2).
 
-t_handle_message_execution_stats(_) ->
+t_handle_message_execution_stats_v1(_) ->
     ShardStats = {docs_examined, 42},
     ExecutionStats = #execution_stats{totalDocsExamined = 11},
     ExecutionStats1 = #execution_stats{totalDocsExamined = 53},
@@ -1463,6 +1592,14 @@
     Cursor1 = #cursor{execution_stats = ExecutionStats1},
     ?assertEqual({ok, Cursor1}, handle_message({execution_stats, ShardStats}, Cursor)).
 
+t_handle_message_execution_stats_v2(_) ->
+    % Map-based shard statistics: docs 11 + 42 = 53 and keys 22 + 53 = 75.
+    Before = #execution_stats{totalDocsExamined = 11, totalKeysExamined = 22},
+    After = #execution_stats{totalDocsExamined = 53, totalKeysExamined = 75},
+    Message = {execution_stats, #{docs_examined => 42, keys_examined => 53}},
+    Expected = {ok, #cursor{execution_stats = After}},
+    ?assertEqual(Expected, handle_message(Message, #cursor{execution_stats = Before})).
+
 t_handle_message_complete(_) ->
     ?assertEqual({ok, cursor}, handle_message(complete, cursor)).
 
diff --git a/src/mango/src/mango_execution_stats.erl b/src/mango/src/mango_execution_stats.erl
index 66104e8..350b58b 100644
--- a/src/mango/src/mango_execution_stats.erl
+++ b/src/mango/src/mango_execution_stats.erl
@@ -15,7 +15,7 @@
 -export([
     to_json/1,
     to_map/1,
-    incr_keys_examined/1,
+    incr_keys_examined/2,
     incr_docs_examined/1,
     incr_docs_examined/2,
     incr_quorum_docs_examined/1,
@@ -23,11 +23,18 @@
     log_start/1,
     log_end/1,
     log_stats/1,
-    maybe_add_stats/4
+    maybe_add_stats/4,
+    shard_init/0,
+    shard_incr_keys_examined/0,
+    shard_incr_docs_examined/0,
+    shard_get_stats/0
 ]).
 
+-include("mango.hrl").
 -include("mango_cursor.hrl").
 
+-define(SHARD_STATS_KEY, mango_shard_execution_stats).
+
 to_json(Stats) ->
     {[
         {total_keys_examined, Stats#execution_stats.totalKeysExamined},
@@ -46,9 +53,9 @@
         execution_time_ms => Stats#execution_stats.executionTimeMs
     }.
 
-incr_keys_examined(Stats) ->
+incr_keys_examined(Stats, N) ->
     Stats#execution_stats{
-        totalKeysExamined = Stats#execution_stats.totalKeysExamined + 1
+        totalKeysExamined = Stats#execution_stats.totalKeysExamined + N
     }.
 
 incr_docs_examined(Stats) ->
@@ -106,3 +113,47 @@
     Nonce = list_to_binary(couch_log_util:get_msg_id()),
     MStats1 = MStats0#{nonce => Nonce},
     couch_log:report("mango-stats", MStats1).
+
+% Shard-level execution statistics are kept in the process dictionary of the
+% calling process, under ?SHARD_STATS_KEY.
+
+% Reset both counters to zero.
+-spec shard_init() -> any().
+shard_init() ->
+    put(?SHARD_STATS_KEY, shard_stats_zero()).
+
+% Count one more examined index key.
+-spec shard_incr_keys_examined() -> any().
+shard_incr_keys_examined() ->
+    incr(keys_examined).
+
+% Count one more examined document.
+-spec shard_incr_docs_examined() -> any().
+shard_incr_docs_examined() ->
+    incr(docs_examined).
+
+% Add one to the given counter; does nothing when no statistics map is stored.
+-spec incr(docs_examined | keys_examined) -> any().
+incr(Key) ->
+    case get(?SHARD_STATS_KEY) of
+        #{} = Stats0 ->
+            Stats = maps:update_with(Key, fun(X) -> X + 1 end, Stats0),
+            put(?SHARD_STATS_KEY, Stats);
+        _ ->
+            ok
+    end.
+
+% Return the current counters.  When shard_init/0 has not been called, fall
+% back to zeroed statistics so that the result always matches the spec and
+% callers may safely use it as a map.
+-spec shard_get_stats() -> shard_stats_v2().
+shard_get_stats() ->
+    case get(?SHARD_STATS_KEY) of
+        #{} = Stats -> Stats;
+        _ -> shard_stats_zero()
+    end.
+
+% Statistics map with every counter at zero.
+-spec shard_stats_zero() -> shard_stats_v2().
+shard_stats_zero() ->
+    #{docs_examined => 0, keys_examined => 0}.
diff --git a/src/mango/test/15-execution-stats-test.py b/src/mango/test/15-execution-stats-test.py
index 537a19a..a8f9961 100644
--- a/src/mango/test/15-execution-stats-test.py
+++ b/src/mango/test/15-execution-stats-test.py
@@ -20,7 +20,7 @@
     def test_simple_json_index(self):
         resp = self.db.find({"age": {"$lt": 35}}, return_raw=True, executionStats=True)
         self.assertEqual(len(resp["docs"]), 3)
-        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 0)
+        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 3)
         self.assertEqual(resp["execution_stats"]["total_docs_examined"], 3)
         self.assertEqual(resp["execution_stats"]["total_quorum_docs_examined"], 0)
         self.assertEqual(resp["execution_stats"]["results_returned"], 3)
@@ -38,7 +38,7 @@
             {"age": {"$lt": 35}}, return_raw=True, r=3, executionStats=True
         )
         self.assertEqual(len(resp["docs"]), 3)
-        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 0)
+        self.assertEqual(resp["execution_stats"]["total_keys_examined"], 3)
         self.assertEqual(resp["execution_stats"]["total_docs_examined"], 0)
         self.assertEqual(resp["execution_stats"]["total_quorum_docs_examined"], 3)
         self.assertEqual(resp["execution_stats"]["results_returned"], 3)
@@ -60,6 +60,19 @@
         self.assertEqual(resp["execution_stats"]["total_docs_examined"], 3)
         self.assertEqual(resp["execution_stats"]["results_returned"], 0)
 
+    def test_covering_json_index(self):
+        # Covering case: only "_id" and "age" are requested; no docs should be examined.
+        selector = {"age": {"$lt": 35}}
+        resp = self.db.find(
+            selector, fields=["_id", "age"], return_raw=True, executionStats=True
+        )
+        self.assertEqual(len(resp["docs"]), 3)
+        stats = resp["execution_stats"]
+        self.assertEqual(stats["total_keys_examined"], 3)
+        self.assertEqual(stats["total_docs_examined"], 0)
+        self.assertEqual(stats["total_quorum_docs_examined"], 0)
+        self.assertEqual(stats["results_returned"], 3)
+
 
 @unittest.skipUnless(mango.has_text_service(), "requires text service")
 class ExecutionStatsTests_Text(mango.UserDocsTextTests):