Update fabric_rpc to use couch_mrview

This modifies fabric_rpc to use couch_mrview for map views, reduce
views, and all docs queries. This removes the majority of view logic
from within fabric as it's better handled now in couch_mrview, and
actually provides pretty drastic decrease in logic in these
functions.

The {view,reduce}_cb functions are also updated to use the new line
format of couch_mrview, switching total_and_offset to meta and
updating the row callbacks as well.
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index cee3c55..d2e6486 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -27,38 +27,6 @@
 %% rpc endpoints
 %%  call to with_db will supply your M:F with a #db{} and then remaining args
 
-all_docs(DbName, #mrargs{keys=undefined} = QueryArgs) ->
-    {ok, Db} = get_or_create_db(DbName, []),
-    #mrargs{
-        start_key = StartKey,
-        start_key_docid = StartDocId,
-        end_key = EndKey,
-        end_key_docid = EndDocId,
-        limit = Limit,
-        skip = Skip,
-        include_docs = IncludeDocs,
-        direction = Dir,
-        inclusive_end = Inclusive,
-        extra = Extra
-    } = QueryArgs,
-    set_io_priority(DbName, Extra),
-    {ok, Total} = couch_db:get_doc_count(Db),
-    Acc0 = #view_acc{
-        db = Db,
-        include_docs = IncludeDocs,
-        conflicts = proplists:get_value(conflicts, Extra, false),
-        limit = Limit+Skip,
-        total_rows = Total
-    },
-    EndKeyType = if Inclusive -> end_key; true -> end_key_gt end,
-    Options = [
-        {dir, Dir},
-        {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end},
-        {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
-    ],
-    {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options),
-    final_response(Total, Acc#view_acc.offset).
-
 changes(DbName, #changes_args{} = Args, StartSeq) ->
     changes(DbName, [Args], StartSeq);
 changes(DbName, Options, StartSeq) ->
@@ -80,104 +48,20 @@
         rexi:reply(Error)
     end.
 
-map_view(DbName, DDoc, ViewName, QueryArgs) ->
+all_docs(DbName, #mrargs{keys=undefined} = Args) ->
     {ok, Db} = get_or_create_db(DbName, []),
-    #mrargs{
-        limit = Limit,
-        skip = Skip,
-        keys = Keys,
-        include_docs = IncludeDocs,
-        stale = Stale,
-        view_type = ViewType,
-        extra = Extra
-    } = QueryArgs,
-    set_io_priority(DbName, Extra),
-    {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
-    Group0 = couch_view_group:design_doc_to_view_group(DDoc),
-    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
-    {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
-    maybe_update_view_group(Pid, LastSeq, Stale),
-    erlang:monitor(process, couch_view_group:get_fd(Group)),
-    Views = couch_view_group:get_views(Group),
-    View = fabric_view:extract_view(Pid, ViewName, Views, ViewType),
-    {ok, Total} = couch_view:get_row_count(View),
-    Acc0 = #view_acc{
-        db = Db,
-        include_docs = IncludeDocs,
-        conflicts = proplists:get_value(conflicts, Extra, false),
-        limit = Limit+Skip,
-        total_rows = Total,
-        reduce_fun = fun couch_view:reduce_to_count/1
-    },
-    case Keys of
-    undefined ->
-        Options = couch_httpd_view:make_key_options(QueryArgs),
-        {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options);
-    _ ->
-        Acc = lists:foldl(fun(Key, AccIn) ->
-            KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key},
-            Options = couch_httpd_view:make_key_options(KeyArgs),
-            {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn,
-                Options),
-            Out
-        end, Acc0, Keys)
-    end,
-    final_response(Total, Acc#view_acc.offset).
+    VAcc0 = #vacc{db=Db},
+    couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0).
 
-reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) ->
-    Group = couch_view_group:design_doc_to_view_group(DDoc),
-    reduce_view(DbName, Group, ViewName, QueryArgs);
-reduce_view(DbName, Group0, ViewName, QueryArgs) ->
-    erlang:put(io_priority, {interactive, DbName}),
+map_view(DbName, DDoc, ViewName, Args) ->
     {ok, Db} = get_or_create_db(DbName, []),
-    #mrargs{
-        group_level = GroupLevel,
-        limit = Limit,
-        skip = Skip,
-        keys = Keys,
-        stale = Stale,
-        extra = Extra
-    } = QueryArgs,
-    set_io_priority(DbName, Extra),
-    GroupFun = group_rows_fun(GroupLevel),
-    {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
-    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
-    {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
-    maybe_update_view_group(Pid, LastSeq, Stale),
-    Lang = couch_view_group:get_language(Group),
-    Views = couch_view_group:get_views(Group),
-    erlang:monitor(process, couch_view_group:get_fd(Group)),
-    {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
-    ReduceView = {reduce, NthRed, Lang, View},
-    Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
-    case Keys of
-    undefined ->
-        Options0 = couch_httpd_view:make_key_options(QueryArgs),
-        Options = [{key_group_fun, GroupFun} | Options0],
-        couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
-    _ ->
-        lists:map(fun(Key) ->
-            KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key},
-            Options0 = couch_httpd_view:make_key_options(KeyArgs),
-            Options = [{key_group_fun, GroupFun} | Options0],
-            couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
-        end, Keys)
-    end,
-    rexi:reply(complete).
+    VAcc0 = #vacc{db=Db},
+    couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
 
-calculate_seqs(Db, Stale) ->
-    LastSeq = couch_db:get_update_seq(Db),
-    if
-        Stale == ok orelse Stale == update_after ->
-            {LastSeq, 0};
-        true ->
-            {LastSeq, LastSeq}
-    end.
-
-maybe_update_view_group(GroupPid, LastSeq, update_after) ->
-    couch_view_group:trigger_group_update(GroupPid, LastSeq);
-maybe_update_view_group(_, _, _) ->
-    ok.
+reduce_view(DbName, DDoc, ViewName, Args) ->
+    {ok, Db} = get_or_create_db(DbName, []),
+    VAcc0 = #vacc{db=Db},
+    couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
 
 create_db(DbName) ->
     rexi:reply(case couch_server:create(DbName, []) of
@@ -252,9 +136,8 @@
     Docs = make_att_readers(Docs0),
     with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
 
-group_info(DbName, Group0) ->
-    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
-    rexi:reply(couch_view_group:request_group_info(Pid)).
+group_info(DbName, DDocId) ->
+    with_db(DbName, [], {couch_mrview, get_info, [DDocId]}).
 
 reset_validation_funs(DbName) ->
     case get_or_create_db(DbName, []) of
@@ -294,109 +177,64 @@
         Else
     end.
 
-view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
-    % matches for _all_docs and translates #full_doc_info{} -> KV pair
-    case couch_doc:to_doc_info(FullDocInfo) of
-    #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI ->
-        Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
-        view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI});
-    #doc_info{revs=[#rev_info{deleted=true}|_]} ->
-        {ok, Acc}
-    end;
-view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
-    % calculates the offset for this shard
-    #view_acc{reduce_fun=Reduce} = Acc,
-    Offset = Reduce(OffsetReds),
-    case rexi:sync_reply({total_and_offset, Total, Offset}) of
-    ok ->
-        view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
-    stop ->
-        exit(normal);
-    timeout ->
-        exit(timeout)
-    end;
-view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
-    % we scanned through limit+skip local rows
-    {stop, Acc};
-view_fold({{Key,Id}, Value}, _Offset, Acc) ->
-    % the normal case
-    #view_acc{
-        db = Db,
-        doc_info = DocInfo,
-        limit = Limit,
-        conflicts = Conflicts,
-        include_docs = IncludeDocs
-    } = Acc,
-    case Value of {Props} ->
-        LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined);
-    _ ->
-        LinkedDocs = false
-    end,
-    if LinkedDocs ->
-        % we'll embed this at a higher level b/c the doc may be non-local
-        Doc = undefined;
-    IncludeDocs ->
-        IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end,
-        Options = if Conflicts -> [conflicts]; true -> [] end,
-        case couch_db:open_doc(Db, IdOrInfo, Options) of
-        {not_found, deleted} ->
-            Doc = null;
-        {not_found, missing} ->
-            Doc = undefined;
-        {ok, Doc0} ->
-            Doc = couch_doc:to_json_obj(Doc0, [])
-        end;
-    true ->
-        Doc = undefined
-    end,
-    case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
+
+view_cb({meta, Meta}, Acc) ->
+    % Map function starting
+    case rexi:sync_reply({meta, Meta}) of
         ok ->
-            {ok, Acc#view_acc{limit=Limit-1}};
+            {ok, Acc};
+        stop ->
+            exit(normal);
         timeout ->
             exit(timeout)
-    end.
-
-final_response(Total, nil) ->
-    case rexi:sync_reply({total_and_offset, Total, Total}) of ok ->
-        rexi:reply(complete);
-    stop ->
-        ok;
-    timeout ->
-        exit(timeout)
     end;
-final_response(_Total, _Offset) ->
-    rexi:reply(complete).
-
-%% TODO: handle case of bogus group level
-group_rows_fun(exact) ->
-    fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end;
-group_rows_fun(0) ->
-    fun(_A, _B) -> true end;
-group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
-    fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) ->
-        lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel);
-    ({Key1,_}, {Key2,_}) ->
-        Key1 == Key2
-    end.
-
-reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
-    {stop, Acc};
-reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
-    send(null, Red, Acc);
-reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
-    send(Key, Red, Acc);
-reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
-    send(lists:sublist(K, I), Red, Acc);
-reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 ->
-    send(K, Red, Acc).
+view_cb({row, Row}, Acc) ->
+    % Adding another row
+    ViewRow = #view_row{
+        id = couch_util:get_value(id, Row),
+        key = couch_util:get_value(key, Row),
+        value = couch_util:get_value(value, Row),
+        doc = couch_util:get_value(doc, Row)
+    },
+    case rexi:stream(ViewRow) of
+        ok ->
+            {ok, Acc};
+        timeout ->
+            exit(timeout)
+    end;
+view_cb(complete, Acc) ->
+    % Finish view output
+    rexi:reply(complete),
+    {ok, Acc}.
 
 
-send(Key, Value, #view_acc{limit=Limit} = Acc) ->
+reduce_cb({meta, Meta}, Acc) ->
+    % Map function starting
+    case rexi:sync_reply({meta, Meta}) of
+        ok ->
+            {ok, Acc};
+        stop ->
+            exit(normal);
+        timeout ->
+            exit(timeout)
+    end;
+reduce_cb({row, Row}, Acc) ->
+    % Adding another row
+    Key = couch_util:get_value(key, Row),
+    Value = couch_util:get_value(value, Row),
+    send(Key, Value, Acc);
+reduce_cb(complete, Acc) ->
+    % Finish view output
+    rexi:reply(complete),
+    {ok, Acc}.
+
+
+send(Key, Value, Acc) ->
     case put(fabric_sent_first_row, true) of
     undefined ->
         case rexi:sync_reply(#view_row{key=Key, value=Value}) of
         ok ->
-            {ok, Acc#view_acc{limit=Limit-1}};
+            {ok, Acc};
         stop ->
             exit(normal);
         timeout ->
@@ -405,7 +243,7 @@
     true ->
         case rexi:stream(#view_row{key=Key, value=Value}) of
         ok ->
-            {ok, Acc#view_acc{limit=Limit-1}};
+            {ok, Acc};
         timeout ->
             exit(timeout)
         end
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index c922a7f..d2ea464 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -12,29 +12,19 @@
 
 -module(fabric_view_reduce).
 
--export([go/6]).
+-export([go/7]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
-    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
-    go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
-    Group = couch_view_group:design_doc_to_view_group(DDoc),
-    Lang = couch_view_group:get_language(Group),
-    Views = couch_view_group:get_views(Group),
-    {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
-    {VName, RedSrc} = lists:nth(NthRed, View#mrview.reduce_funs),
-    Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
-        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
-        Shard#shard{ref = Ref}
-    end, fabric_view:get_shards(DbName, Args)),
+go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
+    Shards = fabric_view:get_shards(DbName, Args),
+    Workers = fabric_util:submit_jobs(Shards, reduce_view, [DDoc, VName, Args]),
+    RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
     RexiMon = fabric_util:create_monitors(Workers),
-    #mrargs{limit = Limit, skip = Skip} = Args,
+    #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
     OsProc = case os_proc_needed(RedSrc) of
         true -> couch_query_servers:get_os_process(Lang);
         _ -> nil
@@ -44,7 +34,7 @@
         query_args = Args,
         callback = Callback,
         counters = fabric_dict:init(Workers, 0),
-        keys = Args#mrargs.keys,
+        keys = Keys,
         skip = Skip,
         limit = Limit,
         lang = Lang,