Add ability to gracefully terminate existing changes feeds
Calling fabric:end_changes() will cause all current changes
feeds to gracefully exit without preventing new changes requests from
starting (unlike maintenance_mode).
BugzID: 45762
This is a cherry-pick of:
https://github.com/cloudant/fabric/commit/1b45cede8f11e209f28e3d06b9fda4cbdcd719cc
Conflicts:
src/fabric.erl
src/fabric_view_changes.erl
diff --git a/src/fabric.erl b/src/fabric.erl
index 25205f8..26b9f62 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -29,7 +29,7 @@
% Views
-export([all_docs/4, all_docs/5, changes/4, query_view/3, query_view/4,
- query_view/6, get_view_group_info/2]).
+ query_view/6, get_view_group_info/2, end_changes/0]).
% miscellany
-export([design_docs/1, reset_validation_funs/1, cleanup_index_files/0,
@@ -362,6 +362,10 @@
get_view_group_info(DbName, DesignId) ->
fabric_group_info:go(dbname(DbName), design_doc(DesignId)).
+-spec end_changes() -> ok.
+end_changes() ->
+ fabric_view_changes:increment_changes_epoch().
+
%% @doc retrieve all the design docs from a database
-spec design_docs(dbname()) -> {ok, [json_obj()]}.
design_docs(DbName) ->
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 92d08e7..7e6666c 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -13,6 +13,7 @@
-module(fabric_view_changes).
-export([go/5, pack_seqs/1, unpack_seqs/2]).
+-export([increment_changes_epoch/0]).
%% exported for upgrade purposes.
-export([keep_sending_changes/8]).
@@ -37,6 +38,7 @@
UpdateListener = {spawn_link(fabric_db_update_listener, go,
[Parent, Ref, DbName, Timeout]),
Ref},
+ put(changes_epoch, get_changes_epoch()),
try
keep_sending_changes(
DbName,
@@ -86,8 +88,9 @@
} = Collector,
LastSeq = pack_seqs(NewSeqs),
MaintenanceMode = config:get("couchdb", "maintenance_mode"),
+ NewEpoch = get_changes_epoch() > erlang:get(changes_epoch),
if Limit > Limit2, Feed == "longpoll";
- MaintenanceMode == "true"; MaintenanceMode == "nolb" ->
+ MaintenanceMode == "true"; MaintenanceMode == "nolb"; NewEpoch ->
Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
true ->
WaitForUpdate = wait_db_updated(UpListen),
@@ -459,6 +462,18 @@
{error, {bad_request, Reason}}
end.
+get_changes_epoch() ->
+ case application:get_env(fabric, changes_epoch) of
+ undefined ->
+ increment_changes_epoch(),
+ get_changes_epoch();
+ {ok, Epoch} ->
+ Epoch
+ end.
+
+increment_changes_epoch() ->
+ application:set_env(fabric, changes_epoch, os:timestamp()).
+
unpack_seqs_test() ->
meck:new(mem3),
meck:new(fabric_view),