Support update_seq=true in reduce views
COUCHDB-2991
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 7b16a0d..a638ff7 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -58,7 +58,7 @@
end.
go2(DbName, Workers, {red, {_, Lang, View}, _}=VInfo, Args, Callback, Acc0) ->
- #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
+ #mrargs{limit = Limit, skip = Skip, keys = Keys, update_seq = UpdateSeq} = Args,
RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
OsProc = case os_proc_needed(RedSrc) of
true -> couch_query_servers:get_os_process(Lang);
@@ -77,7 +77,8 @@
reducer = RedSrc,
collation = couch_util:get_value(<<"collation">>, View#mrview.options),
rows = dict:new(),
- user_acc = Acc0
+ user_acc = Acc0,
+ update_seq = case UpdateSeq of true -> []; false -> nil end
},
try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
State, infinity, 1000 * 60 * 60) of
@@ -99,30 +100,42 @@
handle_message({rexi_EXIT, Reason}, Worker, State) ->
fabric_view:handle_worker_exit(State, Worker, Reason);
-%% HACK: this just sends meta once. Instead we should move the counter logic
-%% from the #view_row handle_message below into this function and and pass the
-%% meta call through maybe_send_row. This will also be more efficient doing it
-%% here as it's one less worker round trip reply.
-%% Prior to switching to couch_mrview, the fabric_view_reduce implementation
-%% did not get a total_and_offset call, whereas now we do. We now use this
-%% message as a clean way to indicate to couch_mrview_http:view_cb that the
-%% reduce response is starting.
-handle_message({meta, Meta}, {_Worker, From}, State) ->
- rexi:stream_ack(From),
-
+handle_message({meta, Meta0}, {Worker, From}, State) ->
+ Seq = couch_util:get_value(update_seq, Meta0, 0),
#collector{
callback = Callback,
- user_acc = AccIn
+ counters = Counters0,
+ user_acc = AccIn,
+ update_seq = UpdateSeq0
} = State,
-
- {Go, Acc} = case get(meta_sent) of
- undefined ->
- put(meta_sent, true),
- Callback({meta, Meta}, AccIn);
- _ ->
- {ok, AccIn}
+ % Assert that we don't have other messages from this
+ % worker when the total_and_offset message arrives.
+ 0 = fabric_dict:lookup_element(Worker, Counters0),
+ rexi:stream_ack(From),
+ Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ UpdateSeq = case UpdateSeq0 of
+ nil -> nil;
+ _ -> [{Worker, Seq} | UpdateSeq0]
end,
- {Go, State#collector{user_acc = Acc}};
+ case fabric_dict:any(0, Counters1) of
+ true ->
+ {ok, State#collector{
+ counters = Counters1,
+ update_seq = UpdateSeq
+ }};
+ false ->
+ Meta = case UpdateSeq of
+ nil ->
+ [];
+ _ ->
+ [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}]
+ end,
+ {Go, Acc} = Callback({meta, Meta}, AccIn),
+ {Go, State#collector{
+ counters = fabric_dict:decrement_all(Counters1),
+ user_acc = Acc
+ }}
+ end;
handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
#collector{counters = Counters0, rows = Rows0} = State,