Use mem3 to discover all _replicator shards in replicator manager

Previously this was done via a recursive traversal of the database
directory, looking for shard names ending in `_replicator`. However, if
there are orphaned shard files (not associated with a clustered db), the
replicator manager crashes. It eventually restarts, but as long as an
orphaned shard file without an entry in the dbs db is present on the
file system, the replicator manager keeps crashing and never reaches
replication documents in shards which would be traversed after the
problematic shard. The user-visible effect is that some replication
documents are never triggered.
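
For illustration, the old scan matched shard files purely by name. A
leftover file like the one below (path and timestamp are made up) still
matches the regex from the old code even though the dbs db has no entry
for it:

    %% Hypothetical orphaned shard file: the name alone satisfies the
    %% old scanner's pattern, so it would be picked up from disk.
    {match, _} = re:run(
        "shards/00000000-1fffffff/mydb/_replicator.1450400000.couch",
        "_replicator(\\.[0-9]{10,})?.couch$").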

To fix this, use mem3 to traverse and discover `_replicator` shards. This
approach has been used in Cloudant's production code for many years; it is
battle-tested and does not suffer from inconsistencies between the file
system and mem3.
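
Condensed, for each row in the dbs db changes feed the new scan does
roughly the following (the helper name handle_dbs_row/2 is made up; the
calls mirror the patch below):

    %% Sketch only: Server is the replicator manager pid and DbName is the
    %% clustered db name from a dbs changes row. Shard names come from
    %% mem3, never from the file system, so orphaned .couch files are
    %% simply ignored.
    handle_dbs_row(Server, DbName) ->
        case is_replicator_db(DbName) of
        true ->
            [gen_server:cast(Server, {resume_scan, ShardName})
                || #shard{name = ShardName} <- mem3:local_shards(DbName)];
        false ->
            ok
        end.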

The local `_replicator` db is a special case: since it is not clustered, it
will not appear in the clustered db list. However, it is already handled as
a special case in `init(_)`, so that behavior is not affected by this
change.

COUCHDB-3277
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 953b1bf..37d3b3a 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -931,24 +931,34 @@
     {lists:keydelete(<<"oauth">>, 1, Props)}.
 
 scan_all_dbs(Server) when is_pid(Server) ->
-    Root = config:get("couchdb", "database_dir", "."),
-    NormRoot = couch_util:normpath(Root),
-    filelib:fold_files(Root, "_replicator(\\.[0-9]{10,})?.couch$", true,
-        fun(Filename, Acc) ->
-	    % shamelessly stolen from couch_server.erl
-            NormFilename = couch_util:normpath(Filename),
-            case NormFilename -- NormRoot of
-                [$/ | RelativeFilename] -> ok;
-                RelativeFilename -> ok
-            end,
-            DbName = ?l2b(filename:rootname(RelativeFilename, ".couch")),
-            Jitter = jitter(Acc),
-            spawn_link(fun() ->
-                timer:sleep(Jitter),
-                gen_server:cast(Server, {resume_scan, DbName})
-            end),
-	    Acc + 1
-	end, 1).
+    {ok, Db} = mem3_util:ensure_exists(
+        config:get("mem3", "shard_db", "dbs")),
+    ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil),
+    ChangesFun(fun({change, {Change}, _}, _) ->
+        DbName = couch_util:get_value(<<"id">>, Change),
+        case DbName of <<"_design/", _/binary>> -> ok; _Else ->
+            case couch_replicator_utils:is_deleted(Change) of
+            true ->
+                ok;
+            false ->
+                [gen_server:cast(Server, {resume_scan, ShardName})
+                    || ShardName <- replicator_shards(DbName)],
+                ok
+            end
+        end;
+        (_, _) -> ok
+    end),
+    couch_db:close(Db).
+
+
+replicator_shards(DbName) ->
+    case is_replicator_db(DbName) of
+    false ->
+        [];
+    true ->
+        [ShardName || #shard{name = ShardName} <- mem3:local_shards(DbName)]
+    end.
+
 
 % calculate random delay proportional to the number of replications
 % on current node, in order to prevent a stampede:
@@ -980,3 +990,41 @@
         Else ->
             Else
     end.
+
+
+-ifdef(TEST).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+replicator_shards_test_() ->
+    {
+        foreach,
+        fun() -> test_util:start_couch([mem3, fabric]) end,
+        fun(Ctx) -> test_util:stop_couch(Ctx) end,
+        [
+            t_pass_replicator_shard(),
+            t_fail_non_replicator_shard()
+        ]
+    }.
+
+
+t_pass_replicator_shard() ->
+    ?_test(begin
+        DbName0 = ?tempdb(),
+        DbName = <<DbName0/binary, "/_replicator">>,
+        ok = fabric:create_db(DbName, [?CTX]),
+        ?assertEqual(8, length(replicator_shards(DbName))),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+t_fail_non_replicator_shard() ->
+    ?_test(begin
+        DbName = ?tempdb(),
+        ok = fabric:create_db(DbName, [?CTX]),
+        ?assertEqual([], replicator_shards(DbName)),
+        fabric:delete_db(DbName, [?CTX])
+    end).
+
+
+-endif.