After a rescan prevent checkpoints from a previous epoch
Fix a race condition which happens on rescan: the rescan
function resets all checkpoints for replicator databases.
However, before the new change feeds start processing all
documents from sequence 0, a checkpoint could
arrive from an existing change feed, which would
effectively result in a range of documents being
skipped over.
Add an `epoch` ref to State. On rescan, update
the epoch. Thread the epoch through the change feed process
and callbacks, then only allow checkpoints from the current
epoch.
JIRA: COUCHDB-2965
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 0d52d01..1848153 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -69,7 +69,8 @@
scan_pid = nil,
rep_start_pids = [],
max_retries,
- live = []
+ live = [],
+ epoch = nil
}).
start_link() ->
@@ -159,17 +160,19 @@
?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
Server = self(),
ok = config:listen_for_changes(?MODULE, Server),
+ Epoch = make_ref(),
ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
% Automatically start node local changes feed loop
ensure_rep_db_exists(<<"_replicator">>),
- Pid = start_changes_reader(<<"_replicator">>, 0),
+ Pid = start_changes_reader(<<"_replicator">>, 0, Epoch),
{ok, #state{
event_listener = start_event_listener(),
scan_pid = ScanPid,
max_retries = retries_value(
config:get("replicator", "max_replication_retry_count", "10")),
rep_start_pids = [Pid],
- live = Live
+ live = Live,
+ epoch = Epoch
}}.
handle_call({owner, RepId}, _From, State) ->
@@ -215,7 +218,13 @@
handle_call({rep_error, RepId, Error}, _From, State) ->
{reply, ok, replication_error(State, RepId, Error)};
-handle_call({rep_db_checkpoint, DbName, EndSeq}, _From, State) ->
+% Match changes epoch with the current epoch in the state.
+% New epoch ref is created on a full rescan. Change feeds have to
+% be replayed from the start to determine ownership in the new
+% cluster configuration and epoch is used to match & checkpoint
+% only changes from the current cluster configuration.
+handle_call({rep_db_checkpoint, DbName, EndSeq, Epoch}, _From,
+ #state{epoch = Epoch} = State) ->
Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
[] ->
{DbName, EndSeq, false};
@@ -225,6 +234,10 @@
true = ets:insert(?DB_TO_SEQ, Entry),
{reply, ok, State};
+% Ignore checkpoints from previous epoch.
+handle_call({rep_db_checkpoint, _DbName, _EndSeq, _Epoch}, _From, State) ->
+ {reply, ok, State};
+
handle_call(Msg, From, State) ->
couch_log:error("Replication manager received unexpected call ~p from ~p",
[Msg, From]),
@@ -249,7 +262,7 @@
end,
true = ets:insert(?DB_TO_SEQ, {DbName, Since, false}),
ensure_rep_ddoc_exists(DbName),
- Pid = start_changes_reader(DbName, Since),
+ Pid = start_changes_reader(DbName, Since, State#state.epoch),
couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
[{DbName, Pid} | Pids]
end,
@@ -288,7 +301,7 @@
case lists:keytake(From, 2, Pids) of
{value, {DbName, From}, NewPids} ->
if Reason == normal -> ok; true ->
- Fmt = "~s : Known replication pid ~w died :: ~w",
+ Fmt = "~s : Known replication or change feed pid ~w died :: ~w",
couch_log:error(Fmt, [?MODULE, From, Reason])
end,
NewState = State#state{rep_start_pids = NewPids},
@@ -343,10 +356,10 @@
{ok, State}.
-start_changes_reader(DbName, Since) ->
- spawn_link(?MODULE, changes_reader, [self(), DbName, Since]).
+start_changes_reader(DbName, Since, Epoch) ->
+ spawn_link(?MODULE, changes_reader, [{self(), Epoch}, DbName, Since]).
-changes_reader(Server, DbName, Since) ->
+changes_reader({Server, Epoch}, DbName, Since) ->
UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
{ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
@@ -360,9 +373,9 @@
{json_req, null},
Db
),
- ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName}}).
+ ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName, Epoch}}).
-changes_reader_cb({change, Change, _}, _, {Server, DbName}) ->
+changes_reader_cb({change, Change, _}, _, {Server, DbName, Epoch}) ->
case has_valid_rep_id(Change) of
true ->
Msg = {rep_db_update, DbName, Change},
@@ -370,11 +383,11 @@
         false ->
             ok
     end,
-    {Server, DbName};
-changes_reader_cb({stop, EndSeq, _Pending}, _, {Server, DbName}) ->
-    Msg = {rep_db_checkpoint, DbName, EndSeq},
+    {Server, DbName, Epoch};
+changes_reader_cb({stop, EndSeq, _Pending}, _, {Server, DbName, Epoch}) ->
+    Msg = {rep_db_checkpoint, DbName, EndSeq, Epoch},
     ok = gen_server:call(Server, Msg, infinity),
-    {Server, DbName};
+    {Server, DbName, Epoch};
 changes_reader_cb(_, _, Acc) ->
     Acc.
@@ -424,8 +437,9 @@
rescan(#state{scan_pid = nil} = State) ->
true = ets:delete_all_objects(?DB_TO_SEQ),
Server = self(),
+ Epoch = make_ref(),
NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
- State#state{scan_pid = NewScanPid};
+ State#state{scan_pid = NewScanPid, epoch = Epoch};
rescan(#state{scan_pid = ScanPid} = State) ->
unlink(ScanPid),
exit(ScanPid, exit),