Allow specifying an end_key for fold_changes
This is useful so that read conflicts on the changes feed will
eventually be resolved. Without an end key specified a reader could end
up in an infinite conflict retry loop if there are clients updating
documents in the database.
diff --git a/src/fabric/src/fabric2_db.erl b/src/fabric/src/fabric2_db.erl
index b3e510b..a310470 100644
--- a/src/fabric/src/fabric2_db.erl
+++ b/src/fabric/src/fabric2_db.erl
@@ -1093,14 +1093,18 @@
end,
StartKey = get_since_seq(TxDb, Dir, SinceSeq),
- EndKey = case Dir of
- rev -> fabric2_util:seq_zero_vs();
- _ -> fabric2_util:seq_max_vs()
+ EndKey = case fabric2_util:get_value(end_key, Options) of
+ undefined when Dir == rev ->
+ fabric2_util:seq_zero_vs();
+ undefined ->
+ fabric2_util:seq_max_vs();
+ EK when is_binary(EK) ->
+ fabric2_fdb:seq_to_vs(EK);
+ EK when is_tuple(EK), element(1, EK) == versionstamp ->
+ EK
end,
- FoldOpts = [
- {start_key, StartKey},
- {end_key, EndKey}
- ] ++ RestartTx ++ Options,
+ BaseOpts = [{start_key, StartKey}] ++ RestartTx ++ Options,
+ FoldOpts = lists:keystore(end_key, 1, BaseOpts, {end_key, EndKey}),
{ok, fabric2_fdb:fold_range(TxDb, Prefix, fun({K, V}, Acc) ->
{SeqVS} = erlfdb_tuple:unpack(K, Prefix),
diff --git a/src/fabric/test/fabric2_changes_fold_tests.erl b/src/fabric/test/fabric2_changes_fold_tests.erl
index 8541d97..fa79f25 100644
--- a/src/fabric/test/fabric2_changes_fold_tests.erl
+++ b/src/fabric/test/fabric2_changes_fold_tests.erl
@@ -40,6 +40,7 @@
?TDEF_FE(fold_changes_basic_rev),
?TDEF_FE(fold_changes_since_now_rev),
?TDEF_FE(fold_changes_since_seq_rev),
+ ?TDEF_FE(fold_changes_with_end_key),
?TDEF_FE(fold_changes_basic_tx_too_old),
?TDEF_FE(fold_changes_reverse_tx_too_old),
?TDEF_FE(fold_changes_tx_too_old_with_single_row_emits),
@@ -124,6 +125,16 @@
fold_changes_since_seq_rev({Db, RestRows}).
+fold_changes_with_end_key({Db, DocRows}) ->
+ lists:foldl(fun(DocRow, Acc) ->
+ EndSeq = maps:get(sequence, DocRow),
+ Changes = changes(Db, 0, [{end_key, EndSeq}]),
+ NewAcc = [DocRow | Acc],
+ ?assertEqual(Changes, NewAcc),
+ NewAcc
+ end, [], DocRows).
+
+
fold_changes_basic_tx_too_old({Db, DocRows0}) ->
DocRows = lists:reverse(DocRows0),