Restore adding some jitter-ed sleep to shard scanning code.

Otherwise a large cluster will flood replicator manager with potentially
hundreds of thousands of `{resume, Shard}` messages. For each one, it
would try to open a changes feed which can add significant load and has
been seen in production to hit varios system limits.

This brings back the change from before the switch to using mem3 shards
for replicator db scans.

Also adds a few tests.

COUCHDB-3311
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index bdc3b8f..4e5073e 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -934,22 +934,30 @@
     {ok, Db} = mem3_util:ensure_exists(
         config:get("mem3", "shards_db", "_dbs")),
     ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
-    ChangesFun(fun({change, {Change}, _}, _) ->
-        DbName = couch_util:get_value(<<"id">>, Change),
-        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
-            case couch_replicator_utils:is_deleted(Change) of
-            true ->
-                ok;
-            false ->
-                [gen_server:cast(Server, {resume_scan, ShardName})
-                    || ShardName <- replicator_shards(DbName)],
-                ok
-            end
-        end;
-        (_, _) -> ok
-    end),
+    ChangesFun({fun scan_changes_cb/3, {Server, 1}}),
     couch_db:close(Db).
 
+scan_changes_cb({change, {Change}, _}, _, {Server, AccCount}) ->
+    DbName = couch_util:get_value(<<"id">>, Change),
+    case DbName of <<"_design/", _/binary>> -> {Server, AccCount}; _Else ->
+        case couch_replicator_utils:is_deleted(Change) of
+        true ->
+            {Server, AccCount};
+        false ->
+            UpdatedCount = lists:foldl(fun(ShardName, Count) ->
+                spawn_link(fun() ->
+                    timer:sleep(jitter(Count)),
+                    gen_server:cast(Server, {resume_scan, ShardName})
+                end),
+                Count + 1
+           end, AccCount, replicator_shards(DbName)),
+           {Server, UpdatedCount}
+        end
+    end;
+
+scan_changes_cb(_, _, {Server, AccCount}) ->
+    {Server, AccCount}.
+
 
 replicator_shards(DbName) ->
     case is_replicator_db(DbName) of
@@ -1027,4 +1035,42 @@
     end).
 
 
+scan_dbs_test_() ->
+{
+      foreach,
+      fun() -> test_util:start_couch([mem3, fabric]) end,
+      fun(Ctx) -> test_util:stop_couch(Ctx) end,
+      [
+          t_resume_db_shard(),
+          t_sleep_based_on_count()
+     ]
+}.
+
+
+t_resume_db_shard() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbName = <<DbName0/binary, "/_replicator">>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        Change = {[{<<"id">>, DbName}]},
+        scan_changes_cb({change, Change, req}, type, {self(), 1}),
+        ResumeMsg = receive Msg -> Msg after 1000 -> timeout end,
+        ?assertMatch({'$gen_cast', {resume_scan, <<"shards/", _/binary>>}}, ResumeMsg),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+t_sleep_based_on_count() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbName = <<DbName0/binary, "/_replicator">>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        Change = {[{<<"id">>, DbName}]},
+        scan_changes_cb({change, Change, req}, type, {self(), 1000}),
+        Timeout = receive Msg -> Msg after 100 -> timeout end,
+        ?assertEqual(timeout, Timeout),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
 -endif.