After a rescan prevent checkpoints from a previous epoch

Fix race condition which happens on rescan: rescan
function resets all checkpoints for replicator databases.
However before new change feeds start processing all
documents from sequence 0, a checkpoint could
happen from an existing change feed, which would
effectively result in a range of documents being
skipped over.

Add an `epoch` ref to State. On rescan update
the epoch. Thread epoch through the change feed process
and callbacks, then only allow checkpoints from current
epoch.

JIRA: COUCHDB-2965
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 0d52d01..1848153 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -69,7 +69,8 @@
     scan_pid = nil,
     rep_start_pids = [],
     max_retries,
-    live = []
+    live = [],
+    epoch = nil
 }).
 
 start_link() ->
@@ -159,17 +160,19 @@
     ?DB_TO_SEQ = ets:new(?DB_TO_SEQ, [named_table, set, public]),
     Server = self(),
     ok = config:listen_for_changes(?MODULE, Server),
+    Epoch = make_ref(),
     ScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
     % Automatically start node local changes feed loop
     ensure_rep_db_exists(<<"_replicator">>),
-    Pid = start_changes_reader(<<"_replicator">>, 0),
+    Pid = start_changes_reader(<<"_replicator">>, 0, Epoch),
     {ok, #state{
         event_listener = start_event_listener(),
         scan_pid = ScanPid,
         max_retries = retries_value(
             config:get("replicator", "max_replication_retry_count", "10")),
         rep_start_pids = [Pid],
-        live = Live
+        live = Live,
+        epoch = Epoch
     }}.
 
 handle_call({owner, RepId}, _From, State) ->
@@ -215,7 +218,13 @@
 handle_call({rep_error, RepId, Error}, _From, State) ->
     {reply, ok, replication_error(State, RepId, Error)};
 
-handle_call({rep_db_checkpoint, DbName, EndSeq}, _From, State) ->
+% Match changes epoch with the current epoch in the state.
+% New epoch ref is created on a full rescan. Change feeds have to
+% be replayed from the start to determine ownership in the new
+% cluster configuration and epoch is used to match & checkpoint
+% only changes from the current cluster configuration.
+handle_call({rep_db_checkpoint, DbName, EndSeq, Epoch}, _From,
+            #state{epoch = Epoch} = State) ->
     Entry = case ets:lookup(?DB_TO_SEQ, DbName) of
         [] ->
             {DbName, EndSeq, false};
@@ -225,6 +234,10 @@
     true = ets:insert(?DB_TO_SEQ, Entry),
     {reply, ok, State};
 
+% Ignore checkpoints from previous epoch.
+handle_call({rep_db_checkpoint, _DbName, _EndSeq, _Epoch}, _From, State) ->
+    {reply, ok, State};
+
 handle_call(Msg, From, State) ->
     couch_log:error("Replication manager received unexpected call ~p from ~p",
         [Msg, From]),
@@ -249,7 +262,7 @@
             end,
             true = ets:insert(?DB_TO_SEQ, {DbName, Since, false}),
             ensure_rep_ddoc_exists(DbName),
-            Pid = start_changes_reader(DbName, Since),
+            Pid = start_changes_reader(DbName, Since, State#state.epoch),
             couch_log:debug("Scanning ~s from update_seq ~p", [DbName, Since]),
             [{DbName, Pid} | Pids]
     end,
@@ -288,7 +301,7 @@
     case lists:keytake(From, 2, Pids) of
         {value, {DbName, From}, NewPids} ->
             if Reason == normal -> ok; true ->
-                Fmt = "~s : Known replication pid ~w died :: ~w",
+                Fmt = "~s : Known replication or change feed pid ~w died :: ~w",
                 couch_log:error(Fmt, [?MODULE, From, Reason])
             end,
             NewState = State#state{rep_start_pids = NewPids},
@@ -343,10 +356,10 @@
     {ok, State}.
 
 
-start_changes_reader(DbName, Since) ->
-    spawn_link(?MODULE, changes_reader, [self(), DbName, Since]).
+start_changes_reader(DbName, Since, Epoch) ->
+    spawn_link(?MODULE, changes_reader, [{self(), Epoch}, DbName, Since]).
 
-changes_reader(Server, DbName, Since) ->
+changes_reader({Server, Epoch}, DbName, Since) ->
     UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
     DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
     {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
@@ -360,9 +373,9 @@
         {json_req, null},
         Db
     ),
-    ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName}}).
+    ChangesFeedFun({fun ?MODULE:changes_reader_cb/3, {Server, DbName, Epoch}}).
 
-changes_reader_cb({change, Change, _}, _, {Server, DbName}) ->
+changes_reader_cb({change, Change, _}, _, {Server, DbName, Epoch}) ->
     case has_valid_rep_id(Change) of
         true ->
             Msg = {rep_db_update, DbName, Change},
@@ -370,11 +383,11 @@
         false ->
             ok
     end,
-    {Server, DbName};
-changes_reader_cb({stop, EndSeq, _Pending}, _, {Server, DbName}) ->
+    {Server, DbName, Epoch};
+changes_reader_cb({stop, EndSeq, _Pending}, _, {Server, DbName, Epoch}) ->
     Msg = {rep_db_checkpoint, DbName, EndSeq},
     ok = gen_server:call(Server, Msg, infinity),
-    {Server, DbName};
+    {Server, DbName, Epoch};
 changes_reader_cb(_, _, Acc) ->
     Acc.
 
@@ -424,8 +437,9 @@
 rescan(#state{scan_pid = nil} = State) ->
     true = ets:delete_all_objects(?DB_TO_SEQ),
     Server = self(),
+    Epoch = make_ref(),
     NewScanPid = spawn_link(fun() -> scan_all_dbs(Server) end),
-    State#state{scan_pid = NewScanPid};
+    State#state{scan_pid = NewScanPid, epoch = Epoch};
 rescan(#state{scan_pid = ScanPid} = State) ->
     unlink(ScanPid),
     exit(ScanPid, exit),