Fix purge infos replicating to the wrong shards during shard splitting.

Previously, the internal replicator (mem3_rep) replicated purge infos to/from
all the target shards. Instead, it should push or pull purge infos only to the
shards whose ranges they hash into, according to the database's hash function.
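
In practice the check is a per-record predicate: hash the purge info's doc id
with the database's hash function and keep the record only if it lands in the
shard's range. A minimal sketch of such a filter, using the same pickfun the
resharder already uses for documents (the name belongs_filter/3 is
illustrative, not part of this change):

    %% Keep only purge infos whose doc id hashes into Range.
    %% Range is a [Begin, End] pair; HashFun is the database's hash function.
    belongs_filter(Infos, Range, HashFun) ->
        lists:filter(
            fun({_UUID, Id, _Revs}) ->
                mem3_reshard_job:pickfun(Id, [Range], HashFun) =:= Range
            end,
            Infos
        ).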

Users experienced this as a failure when a database containing purges was
split twice in a row. For example, if a Q=8 database is split to Q=16 and then
split again from Q=16 to Q=32, the second split operation might fail with a
`split_state:initial_copy ...{{badkey,not_in_range}` error. The misplaced
purge infos were noticed only during the second split, when the initial copy
phase crashed because some purge infos did not hash into either of the two
target ranges. Moreover, the crash led to repeated retries, which generated a
huge job history log.

The fix consists of three improvements:

  1) The internal replicator is updated to filter purge infos based on the
     db's hash function.

  2) Account for the fact that some users' dbs might already contain misplaced
     purge infos. Since this is a known bug, we anticipate that error when
     such a shard is split again, ignore the misplaced purge info, and emit a
     warning in the logs.

  3) Make other range errors, such as a document that does not hash into any
     target range, fatal, and emit a clear error in the logs and job history
     so any future range errors are immediately obvious (both behaviors are
     sketched below).
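
How (2) and (3) differ in practice: pick_target/4 throws
{range_error, Context, DocId, Targets} whenever an id does not hash into any
target range. The document copy path lets that throw propagate and fail the
job, while the purge info copy path catches it, warns, and skips the record.
A condensed sketch (copy_doc/3 and copy_purge_info/3 are made-up names; the
real code is in couch_db_split.erl in the diff below):

    %% Documents: a range miss propagates and aborts the split job.
    copy_doc(Id, State, Targets) ->
        pick_target(Id, State, Targets, changes).

    %% Purge infos: a range miss may be left over from the pre-3.4 bug, so
    %% it is logged as a warning and the record is skipped.
    copy_purge_info(Id, State, Targets) ->
        try
            pick_target(Id, State, Targets, purge_info_copy)
        catch
            throw:{range_error, purge_info_copy, _, _} ->
                couch_log:warning("ignoring misplaced purge info ~p", [Id]),
                skip
        end.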

Fixes #4624
diff --git a/src/couch/src/couch_db_split.erl b/src/couch/src/couch_db_split.erl
index d219e37..dce9187 100644
--- a/src/couch/src/couch_db_split.erl
+++ b/src/couch/src/couch_db_split.erl
@@ -62,7 +62,10 @@
             catch
                 throw:{target_create_error, DbName, Error, TargetDbs} ->
                     cleanup_targets(TargetDbs, Engine),
-                    {error, {target_create_error, DbName, Error}}
+                    {error, {target_create_error, DbName, Error}};
+                throw:{range_error, Context, DocId, TargetDbs} ->
+                    cleanup_targets(TargetDbs, Engine),
+                    {error, {range_error, Context, DocId, maps:keys(TargetDbs)}}
             after
                 couch_db:close(SourceDb)
             end;
@@ -208,10 +211,20 @@
     DelOpt = [{context, compaction}, sync],
     couch_db_engine:delete(Engine, RootDir, Filepath, DelOpt).
 
-pick_target(DocId, #state{} = State, #{} = Targets) ->
+pick_target(DocId, #state{} = State, #{} = Targets, Context) when
+    is_binary(DocId), is_atom(Context)
+->
     #state{pickfun = PickFun, hashfun = HashFun} = State,
-    Key = PickFun(DocId, maps:keys(Targets), HashFun),
-    {Key, maps:get(Key, Targets)}.
+    TargetRanges = maps:keys(Targets),
+    Key = PickFun(DocId, TargetRanges, HashFun),
+    case Key of
+        not_in_range ->
+            % We found a document or purge info which doesn't hash into
+            % any of the target ranges. Stop and raise a fatal exception.
+            throw({range_error, Context, DocId, Targets});
+        [B, E] when is_integer(B), is_integer(E), B =< E, is_map_key(Key, Targets) ->
+            {Key, map_get(Key, Targets)}
+    end.
 
 set_targets_update_seq(#state{targets = Targets} = State) ->
     Seq = couch_db:get_update_seq(State#state.source_db),
@@ -337,11 +350,23 @@
         false -> Target1
     end.
 
-purge_cb({_PSeq, _UUID, Id, _Revs} = PI, #state{targets = Targets} = State) ->
-    {Key, Target} = pick_target(Id, State, Targets),
-    MaxBuffer = State#state.max_buffer_size,
-    Target1 = acc_and_flush(PI, Target, MaxBuffer, fun commit_purge_infos/1),
-    {ok, State#state{targets = Targets#{Key => Target1}}}.
+purge_cb({PSeq, UUID, Id, _Revs} = PI, #state{} = State) ->
+    #state{source_db = Db, targets = Targets} = State,
+    try
+        {Key, Target} = pick_target(Id, State, Targets, purge_info_copy),
+        MaxBuffer = State#state.max_buffer_size,
+        Target1 = acc_and_flush(PI, Target, MaxBuffer, fun commit_purge_infos/1),
+        {ok, State#state{targets = Targets#{Key => Target1}}}
+    catch
+        throw:{range_error, purge_info_copy, _, _} ->
+            % Before 3.4, due to a bug in the internal replicator, it was
+            % possible for purge infos which don't belong to the source shard
+            % to end up there during or after shard splitting. We choose to
+            % emit a warning and ignore the misplaced purge info record.
+            LogMsg = "~p : ignore misplaced purge info pseq:~p uuid:~p doc_id:~p shard:~p",
+            couch_log:warning(LogMsg, [?MODULE, PSeq, UUID, Id, couch_db:name(Db)]),
+            {ok, State}
+    end.
 
 commit_purge_infos(#target{buffer = [], db = Db} = Target) ->
     Target#target{db = Db};
@@ -367,7 +392,7 @@
     changes_cb(FDI, State);
 changes_cb(#full_doc_info{id = Id} = FDI, #state{} = State) ->
     #state{source_db = SourceDb, targets = Targets} = State,
-    {Key, Target} = pick_target(Id, State, Targets),
+    {Key, Target} = pick_target(Id, State, Targets, changes),
     FDI1 = process_fdi(FDI, SourceDb, Target#target.db),
     MaxBuffer = State#state.max_buffer_size,
     Target1 = acc_and_flush(FDI1, Target, MaxBuffer, fun commit_docs/1),
@@ -500,7 +525,7 @@
                 <<?LOCAL_DOC_PREFIX, _/binary>> ->
                     % Users' and replicator app's checkpoints go to their
                     % respective shards based on the general hashing algorithm
-                    {Key, Target} = pick_target(Id, State, Acc),
+                    {Key, Target} = pick_target(Id, State, Acc, local_docs),
                     #target{buffer = Docs} = Target,
                     Acc#{Key => Target#target{buffer = [Doc | Docs]}}
             end,
diff --git a/src/mem3/src/mem3_rep.erl b/src/mem3/src/mem3_rep.erl
index d186f9e..3cc3814 100644
--- a/src/mem3/src/mem3_rep.erl
+++ b/src/mem3/src/mem3_rep.erl
@@ -314,12 +314,18 @@
             {ok, Count}
     end.
 
-pull_purges_multi(#acc{source = Source} = Acc0) ->
-    #acc{batch_size = Count, seq = UpdateSeq, targets = Targets0} = Acc0,
+pull_purges_multi(#acc{} = Acc0) ->
+    #acc{
+        source = Source,
+        targets = Targets0,
+        batch_size = Count,
+        seq = UpdateSeq,
+        hashfun = HashFun
+    } = Acc0,
     with_src_db(Acc0, fun(Db) ->
         Targets = maps:map(
             fun(_, #tgt{} = T) ->
-                pull_purges(Db, Count, Source, T)
+                pull_purges(Db, Count, Source, T, HashFun)
             end,
             reset_remaining(Targets0)
         ),
@@ -343,7 +349,7 @@
         end
     end).
 
-pull_purges(Db, Count, SrcShard, #tgt{} = Tgt0) ->
+pull_purges(Db, Count, #shard{} = SrcShard, #tgt{} = Tgt0, HashFun) ->
     #tgt{shard = TgtShard} = Tgt0,
     SrcUUID = couch_db:get_uuid(Db),
     #shard{node = TgtNode, name = TgtDbName} = TgtShard,
@@ -354,18 +360,33 @@
         Infos == [] ->
             ok;
         true ->
-            {ok, _} = couch_db:purge_docs(Db, Infos, [?REPLICATED_CHANGES]),
+            % When shard ranges are split it's possible to pull purges from a
+            % larger target range to a smaller source range. We don't want to
+            % pull purges which don't belong on the source, so we filter them
+            % out using the same pickfun which we use when picking documents.
+            #shard{range = SrcRange} = SrcShard,
+            BelongsFun = fun({_UUID, Id, _Revs}) when is_binary(Id) ->
+                mem3_reshard_job:pickfun(Id, [SrcRange], HashFun) =:= SrcRange
+            end,
+            Infos1 = lists:filter(BelongsFun, Infos),
+            {ok, _} = couch_db:purge_docs(Db, Infos1, [?REPLICATED_CHANGES]),
             Body = purge_cp_body(SrcShard, TgtShard, ThroughSeq),
             mem3_rpc:save_purge_checkpoint(TgtNode, TgtDbName, LocalPurgeId, Body)
     end,
     Tgt#tgt{remaining = max(0, Remaining)}.
 
-push_purges_multi(#acc{source = SrcShard} = Acc) ->
-    #acc{batch_size = BatchSize, seq = UpdateSeq, targets = Targets0} = Acc,
+push_purges_multi(#acc{} = Acc) ->
+    #acc{
+        source = SrcShard,
+        targets = Targets0,
+        batch_size = BatchSize,
+        seq = UpdateSeq,
+        hashfun = HashFun
+    } = Acc,
     with_src_db(Acc, fun(Db) ->
         Targets = maps:map(
             fun(_, #tgt{} = T) ->
-                push_purges(Db, BatchSize, SrcShard, T)
+                push_purges(Db, BatchSize, SrcShard, T, HashFun)
             end,
             reset_remaining(Targets0)
         ),
@@ -385,9 +406,9 @@
         end
     end).
 
-push_purges(Db, BatchSize, SrcShard, Tgt) ->
+push_purges(Db, BatchSize, SrcShard, Tgt, HashFun) ->
     #tgt{shard = TgtShard, purgeid = LocalPurgeId} = Tgt,
-    #shard{node = TgtNode, name = TgtDbName} = TgtShard,
+    #shard{node = TgtNode, range = TgtRange, name = TgtDbName} = TgtShard,
     StartSeq =
         case couch_db:open_doc(Db, LocalPurgeId, []) of
             {ok, #doc{body = {Props}}} ->
@@ -396,15 +417,25 @@
                 Oldest = couch_db:get_oldest_purge_seq(Db),
                 erlang:max(0, Oldest - 1)
         end,
+    BelongsFun = fun(Id) when is_binary(Id) ->
+        mem3_reshard_job:pickfun(Id, [TgtRange], HashFun) =:= TgtRange
+    end,
     FoldFun = fun({PSeq, UUID, Id, Revs}, {Count, Infos, _}) ->
-        NewCount = Count + length(Revs),
-        NewInfos = [{UUID, Id, Revs} | Infos],
-        Status =
-            if
-                NewCount < BatchSize -> ok;
-                true -> stop
-            end,
-        {Status, {NewCount, NewInfos, PSeq}}
+        case BelongsFun(Id) of
+            true ->
+                NewCount = Count + length(Revs),
+                NewInfos = [{UUID, Id, Revs} | Infos],
+                Status =
+                    if
+                        NewCount < BatchSize -> ok;
+                        true -> stop
+                    end,
+                {Status, {NewCount, NewInfos, PSeq}};
+            false ->
+                % In case of split shard ranges, purges, like documents, will
+                % belong only to one target
+                {ok, {Count, Infos, PSeq}}
+        end
     end,
     InitAcc = {0, [], StartSeq},
     {ok, {_, Infos, ThroughSeq}} =
diff --git a/src/mem3/src/mem3_reshard_job.erl b/src/mem3/src/mem3_reshard_job.erl
index 25ff1cf..4c93bf9 100644
--- a/src/mem3/src/mem3_reshard_job.erl
+++ b/src/mem3/src/mem3_reshard_job.erl
@@ -265,6 +265,19 @@
     couch_log:error(Msg, [?MODULE, jobfmt(Job)]),
     kill_workers(Job),
     exit({error, missing_target});
+handle_worker_exit(#job{} = Job, _Pid, {error, {range_error, _, _, _}} = Reason) ->
+    Msg1 = "~p fatal range error job:~p error:~p",
+    couch_log:error(Msg1, [?MODULE, jobfmt(Job), Reason]),
+    kill_workers(Job),
+    case lists:member(Job#job.split_state, ?CLEAN_TARGET_STATES) of
+        true ->
+            Msg2 = "~p cleaning target after fatal range error ~p",
+            couch_log:error(Msg2, [?MODULE, jobfmt(Job)]),
+            reset_target(Job),
+            exit(Reason);
+        false ->
+            exit(Reason)
+    end;
 handle_worker_exit(#job{} = Job0, _Pid, Reason) ->
     couch_log:error("~p worker error ~p ~p", [?MODULE, jobfmt(Job0), Reason]),
     kill_workers(Job0),
diff --git a/src/mem3/test/eunit/mem3_reshard_test.erl b/src/mem3/test/eunit/mem3_reshard_test.erl
index be539b4..4376ee4 100644
--- a/src/mem3/test/eunit/mem3_reshard_test.erl
+++ b/src/mem3/test/eunit/mem3_reshard_test.erl
@@ -77,7 +77,9 @@
                     fun retries_work/1,
                     fun target_reset_in_initial_copy/1,
                     fun split_an_incomplete_shard_map/1,
-                    fun target_shards_are_locked/1
+                    fun target_shards_are_locked/1,
+                    fun doc_in_bad_range_on_source/1,
+                    fun purge_info_in_bad_range_on_source/1
                 ]
             }
         }
@@ -88,7 +90,7 @@
 split_one_shard(#{db1 := Db}) ->
     {timeout, ?TIMEOUT,
         ?_test(begin
-            DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 1},
+            DocSpec = #{docs => 10, delete => [5, 9], mrview => 1, local => 10},
             add_test_docs(Db, DocSpec),
 
             % Save documents before the split
@@ -107,6 +109,7 @@
 
             % Split the one shard
             [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+            SrcDocIds = get_shard_doc_ids(Shard),
             {ok, JobId} = mem3_reshard:start_split_job(Shard),
             wait_state(JobId, completed),
 
@@ -117,6 +120,16 @@
             ?assertEqual([16#00000000, 16#7fffffff], R1),
             ?assertEqual([16#80000000, 16#ffffffff], R2),
 
+            % Check that docs are on the shards where they belong
+            [#shard{name = SN1}, #shard{name = SN2}] = Shards1,
+            DocIds1 = get_shard_doc_ids(SN1),
+            DocIds2 = get_shard_doc_ids(SN2),
+            [?assert(mem3:belongs(SN1, Id)) || Id <- DocIds1],
+            [?assert(mem3:belongs(SN2, Id)) || Id <- DocIds2],
+
+            % None of the docs were dropped
+            ?assertEqual(lists:sort(SrcDocIds), lists:sort(DocIds1 ++ DocIds2)),
+
             % Check metadata bits after the split
             ?assertEqual(942, get_revs_limit(Db)),
             ?assertEqual(943, get_purge_infos_limit(Db)),
@@ -173,6 +186,9 @@
 
             % Split the one shard
             [#shard{name = Shard}] = lists:sort(mem3:local_shards(Db)),
+            % Get purge infos and doc ids before the split
+            SrcPurges = get_purge_infos(Shard),
+            SrcDocIds = get_shard_doc_ids(Shard),
             {ok, JobId} = mem3_reshard:start_split_job(Shard),
             wait_state(JobId, completed),
 
@@ -183,6 +199,21 @@
             ?assertEqual([16#00000000, 16#7fffffff], R1),
             ?assertEqual([16#80000000, 16#ffffffff], R2),
 
+            % Check that purges and docs are on the shards where they belong
+            [#shard{name = SN1}, #shard{name = SN2}] = Shards1,
+            TgtPurges1 = get_purge_infos(SN1),
+            TgtPurges2 = get_purge_infos(SN2),
+            DocIds1 = get_shard_doc_ids(SN1),
+            DocIds2 = get_shard_doc_ids(SN2),
+            [?assert(mem3:belongs(SN1, Id)) || Id <- TgtPurges1],
+            [?assert(mem3:belongs(SN2, Id)) || Id <- TgtPurges2],
+            [?assert(mem3:belongs(SN1, Id)) || Id <- DocIds1],
+            [?assert(mem3:belongs(SN2, Id)) || Id <- DocIds2],
+
+            % None of the docs or purges were dropped
+            ?assertEqual(lists:sort(SrcDocIds), lists:sort(DocIds1 ++ DocIds2)),
+            ?assertEqual(lists:sort(SrcPurges), lists:sort(TgtPurges1 ++ TgtPurges2)),
+
             % Check metadata bits after the split
             ?assertEqual(10, get_purge_infos_limit(Db)),
 
@@ -619,6 +650,53 @@
             wait_state(JobId, completed)
         end)}.
 
+% Source somehow got a bad doc which doesn't belong there
+doc_in_bad_range_on_source(#{db1 := Db}) ->
+    {timeout, ?TIMEOUT,
+        ?_test(begin
+            DocSpec = #{docs => 10, mrview => 0, local => 1},
+            add_test_docs(Db, DocSpec),
+
+            % Split first shard
+            [#shard{name = Shard1}] = lists:sort(mem3:local_shards(Db)),
+            {ok, JobId1} = mem3_reshard:start_split_job(Shard1),
+            wait_state(JobId1, completed),
+
+            % Split the first range again but before doing that insert
+            % a doc in the shard with a doc id that wouldn't belong to
+            % any target ranges
+            [#shard{name = Shard2}, _] = lists:sort(mem3:local_shards(Db)),
+            add_shard_doc(Shard2, <<"4">>, [{<<"in_the_wrong">>, <<"shard_range">>}]),
+            {ok, JobId2} = mem3_reshard:start_split_job(Shard2),
+            wait_state(JobId2, failed),
+            {ok, {JobProps}} = mem3_reshard:job(JobId2),
+            StateInfo = proplists:get_value(state_info, JobProps),
+            ?assertMatch({[{reason, <<"{error,{range_error", _/binary>>}]}, StateInfo)
+        end)}.
+
+% Source has a bad purge info but we expect that due to a bug in <3.4
+% so we skip over it and move on
+purge_info_in_bad_range_on_source(#{db1 := Db}) ->
+    {timeout, ?TIMEOUT,
+        ?_test(begin
+            DocSpec = #{docs => 10, mrview => 0, local => 1},
+            add_test_docs(Db, DocSpec),
+
+            % Split first shard
+            [#shard{name = Shard1}] = lists:sort(mem3:local_shards(Db)),
+            {ok, JobId1} = mem3_reshard:start_split_job(Shard1),
+            wait_state(JobId1, completed),
+
+            % Split the first range again but before doing that insert
+            % a purge info in the shard with an id which wouldn't belong to
+            % any target ranges
+            [#shard{name = Shard2}, _] = lists:sort(mem3:local_shards(Db)),
+            PurgeInfo = {couch_uuids:new(), <<"4">>, [{1, <<"a">>}]},
+            add_shard_purge_info(Shard2, PurgeInfo),
+            {ok, JobId2} = mem3_reshard:start_split_job(Shard2),
+            wait_state(JobId2, completed)
+        end)}.
+
 intercept_state(State) ->
     TestPid = self(),
     meck:new(mem3_reshard_job, [passthrough]),
@@ -988,3 +1066,29 @@
             {data, Data}
         ])
     ].
+
+get_purge_infos(ShardName) when is_binary(ShardName) ->
+    FoldFun = fun({_Seq, _UUID, Id, _Revs}, Acc) -> {ok, [Id | Acc]} end,
+    couch_util:with_db(ShardName, fun(Db) ->
+        PSeq = max(0, couch_db:get_oldest_purge_seq(Db) - 1),
+        {ok, Res} = couch_db:fold_purge_infos(Db, PSeq, FoldFun, []),
+        Res
+    end).
+
+get_shard_doc_ids(ShardName) when is_binary(ShardName) ->
+    FoldFun = fun(#full_doc_info{id = Id}, Acc) -> {ok, [Id | Acc]} end,
+    couch_util:with_db(ShardName, fun(Db) ->
+        {ok, Res} = couch_db:fold_docs(Db, FoldFun, [], []),
+        Res
+    end).
+
+add_shard_doc(ShardName, DocId, Props) ->
+    couch_util:with_db(ShardName, fun(Db) ->
+        Doc = couch_doc:from_json_obj({[{<<"_id">>, DocId}] ++ Props}),
+        couch_db:update_doc(Db, Doc, [])
+    end).
+
+add_shard_purge_info(ShardName, {UUID, Id, Revs}) ->
+    couch_util:with_db(ShardName, fun(Db) ->
+        couch_db:purge_docs(Db, [{UUID, Id, Revs}])
+    end).