return packed update_seq (WIP)
diff --git a/include/fabric.hrl b/include/fabric.hrl
index abbc4ad..eb7858e 100644
--- a/include/fabric.hrl
+++ b/include/fabric.hrl
@@ -29,7 +29,8 @@
reducer,
lang,
sorted,
- user_acc
+ user_acc,
+ update_seq
}).
-record(stream_acc, {
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index 1977888..e9d8db7 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -64,6 +64,7 @@
query_args = Args,
callback = Callback,
counters = fabric_dict:init(Workers, 0),
+ update_seq = fabric_dict:init(Workers, nil),
skip = Skip,
limit = Limit,
keys = fabric_view:keydict(Keys),
@@ -89,30 +90,35 @@
handle_message({meta, Meta0}, {Worker, From}, State) ->
Tot = couch_util:get_value(total, Meta0, 0),
Off = couch_util:get_value(offset, Meta0, 0),
+ UpdateSeq = couch_util:get_value(update_seq, Meta0, 0),
#collector{
callback = Callback,
counters = Counters0,
total_rows = Total0,
offset = Offset0,
- user_acc = AccIn
+ user_acc = AccIn,
+ update_seq = UpdateSeq0
} = State,
% Assert that we don't have other messages from this
% worker when the total_and_offset message arrives.
0 = fabric_dict:lookup_element(Worker, Counters0),
rexi:stream_ack(From),
Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+ UpdateSeq1 = fabric_dict:store(Worker, UpdateSeq, UpdateSeq0),
Total = Total0 + Tot,
Offset = Offset0 + Off,
case fabric_dict:any(0, Counters1) of
true ->
{ok, State#collector{
counters = Counters1,
+ update_seq = UpdateSeq1,
total_rows = Total,
offset = Offset
}};
false ->
FinalOffset = erlang:min(Total, Offset+State#collector.skip),
- Meta = [{total, Total}, {offset, FinalOffset}],
+ Meta = [{total, Total}, {offset, FinalOffset},
+ {update_seq, fabric_view_changes:pack_seqs(UpdateSeq1)}],
{Go, Acc} = Callback({meta, Meta}, AccIn),
{Go, State#collector{
counters = fabric_dict:decrement_all(Counters1),