Support update_seq=true in reduce views

COUCHDB-2991
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 7b16a0d..a638ff7 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -58,7 +58,7 @@
     end.
 
 go2(DbName, Workers, {red, {_, Lang, View}, _}=VInfo, Args, Callback, Acc0) ->
-    #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
+    #mrargs{limit = Limit, skip = Skip, keys = Keys, update_seq = UpdateSeq} = Args,
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
     OsProc = case os_proc_needed(RedSrc) of
         true -> couch_query_servers:get_os_process(Lang);
@@ -77,7 +77,8 @@
         reducer = RedSrc,
         collation = couch_util:get_value(<<"collation">>, View#mrview.options),
         rows = dict:new(),
-        user_acc = Acc0
+        user_acc = Acc0,
+        update_seq = case UpdateSeq of true -> []; false -> nil end
     },
     try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
         State, infinity, 1000 * 60 * 60) of
@@ -99,30 +100,42 @@
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
     fabric_view:handle_worker_exit(State, Worker, Reason);
 
-%% HACK: this just sends meta once. Instead we should move the counter logic
-%% from the #view_row handle_message below into this function and and pass the
-%% meta call through maybe_send_row. This will also be more efficient doing it
-%% here as it's one less worker round trip reply.
-%% Prior to switching to couch_mrview, the fabric_view_reduce implementation
-%% did not get a total_and_offset call, whereas now we do. We now use this
-%% message as a clean way to indicate to couch_mrview_http:view_cb that the
-%% reduce response is starting.
-handle_message({meta, Meta}, {_Worker, From}, State) ->
-    rexi:stream_ack(From),
-
+handle_message({meta, Meta0}, {Worker, From}, State) ->
+    Seq = couch_util:get_value(update_seq, Meta0, 0),
     #collector{
         callback = Callback,
-        user_acc = AccIn
+        counters = Counters0,
+        user_acc = AccIn,
+        update_seq = UpdateSeq0
     } = State,
-
-    {Go, Acc} = case get(meta_sent) of
-        undefined ->
-            put(meta_sent, true),
-            Callback({meta, Meta}, AccIn);
-        _ ->
-            {ok, AccIn}
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    UpdateSeq = case UpdateSeq0 of
+        nil -> nil;
+        _   -> [{Worker, Seq} | UpdateSeq0]
     end,
-    {Go, State#collector{user_acc = Acc}};
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        {ok, State#collector{
+            counters = Counters1,
+            update_seq = UpdateSeq
+        }};
+    false ->
+        Meta = case UpdateSeq of
+            nil ->
+                [];
+            _ ->
+                [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}]
+        end,
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            user_acc = Acc
+        }}
+    end;
 
 handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
     #collector{counters = Counters0, rows = Rows0} = State,