HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable w… (#2451)

* HBASE-25055 Add ReplicationSource for meta WALs; add enable/disable when hbase:meta assigned to RS

Fill in gap left by HBASE-11183 'Timeline Consistent region replicas - Phase 2 design'.
HBASE-11183 left off implementing 'async WAL Replication' on the hbase:meta
Table; hbase:meta Table could only do Phase 1 Region Replicas reading
the primary Regions' hfiles. Here we add 'async WAL Replication' to
hbase:meta so Replicas can be more current with the primary's changes.

Adds a 'special' ReplicationSource that reads hbase:meta WAL files and replicates
all edits to the configured in-cluster endpoint (defaults to
RegionReplicaReplicationEndpoint.class; set hbase.region.replica.catalog.replication
to target a different endpoint implementation).

Set hbase.region.replica.replication.catalog.enabled to enable async WAL
Replication for hbase:meta region replicas. It is off by default.
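
As a rough sketch of how this flag gates the feature: the class below is a
hypothetical, self-contained stand-in for the check that
ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled performs. It uses a
plain Map in place of Hadoop's Configuration, and the class name, the Map-based
lookup, and the exact "table is hbase:meta AND flag is true" logic are assumptions
for illustration. Only the property key and the off-by-default behavior come from
this change.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-in only; not the HBase implementation. */
final class CatalogReplicationFlagSketch {
  // Property key named in this commit message.
  static final String CATALOG_ENABLED_KEY =
    "hbase.region.replica.replication.catalog.enabled";
  static final String META_TABLE_NAME = "hbase:meta";

  private CatalogReplicationFlagSketch() {}

  /** True only when the table is hbase:meta and the flag is explicitly set to true. */
  static boolean isMetaReplicaReplicationEnabled(Map<String, String> conf, String tableName) {
    boolean flag = Boolean.parseBoolean(conf.getOrDefault(CATALOG_ENABLED_KEY, "false"));
    return META_TABLE_NAME.equals(tableName) && flag;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    // Off by default: nothing has been set yet.
    System.out.println(isMetaReplicaReplicationEnabled(conf, META_TABLE_NAME));
    conf.put(CATALOG_ENABLED_KEY, "true");
    // Enabled, but only for hbase:meta.
    System.out.println(isMetaReplicaReplicationEnabled(conf, META_TABLE_NAME));
    System.out.println(isMetaReplicaReplicationEnabled(conf, "default:t1"));
  }
}
```

In a real deployment the property would be set in the cluster configuration
rather than in a Map.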

The CatalogReplicationSource for async WAL Replication of hbase:meta does
NOT need to keep up WAL offset or a queue of WALs-to-replicate in the
replication queue store as is done in other ReplicationSource implementations;
the CatalogReplicationSource is for Region Replicas only. General
Replication does not replicate hbase:meta. hbase:meta Region Replicas reset
on crash of the primary replica so there is no need to 'recover'
replication that was running on the crashed server.
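
The bypass described above can be sketched with a default interface method that
persists position and a subclass that overrides it with a no-op. This is a
minimal, self-contained illustration, not the HBase code: the type names
(SourceSketch, GeneralSourceSketch, CatalogSourceSketch) and the List standing in
for the replication queue store are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the replication source interface; not the HBase type. */
interface SourceSketch {
  /** Stand-in for the replication queue store: records each persisted position. */
  List<String> persistedPositions();

  /** Default keeps the old behavior: persist the position of the shipped batch. */
  default void logPositionAndCleanOldLogs(String shippedBatch) {
    persistedPositions().add(shippedBatch);
  }
}

/** General source: inherits the persisting default. */
class GeneralSourceSketch implements SourceSketch {
  private final List<String> store = new ArrayList<>();

  @Override
  public List<String> persistedPositions() {
    return store;
  }
}

/** Catalog source: overrides the hook with a no-op so nothing is persisted. */
class CatalogSourceSketch extends GeneralSourceSketch {
  @Override
  public void logPositionAndCleanOldLogs(String shippedBatch) {
    // Noop: hbase:meta replicas reset on primary crash, so there is no position to recover.
  }
}

final class LogPositionBypassSketch {
  public static void main(String[] args) {
    SourceSketch general = new GeneralSourceSketch();
    SourceSketch catalog = new CatalogSourceSketch();
    general.logPositionAndCleanOldLogs("wal-1@100");
    catalog.logPositionAndCleanOldLogs("meta-wal-1@100");
    System.out.println(general.persistedPositions().size()); // 1
    System.out.println(catalog.persistedPositions().size()); // 0
  }
}
```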

Because it is so different in operation, the CatalogReplicationSource is bolted
on to the side of the ReplicationSourceManager. It is lazily
instantiated to match the lazy instantiation of the hbase:meta
WALProvider, created and started on the open of the first Region of an
hbase:meta table. Thereafter it stays up till the process dies, even if
all hbase:meta Regions have moved off the server, in case a hbase:meta
Region is moved back (doing the latter simplifies the implementation).
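
A minimal sketch of the lifecycle described above, under stated assumptions: the
holder class, its field names, and the start counter are hypothetical and exist
only to show the create-once, never-torn-down behavior. The method names mirror
the addCatalogReplicationSource and removeCatalogReplicationSource calls made by
the assign and unassign handlers, but the bodies are not the HBase implementation.

```java
/** Hypothetical holder showing lazy, create-once semantics; not the HBase class. */
final class CatalogSourceHolderSketch {
  private Object catalogSource; // stand-in for the real source object
  private int startCount;

  /** Creates and "starts" the source on first call; later calls return the same instance. */
  synchronized Object addCatalogReplicationSource() {
    if (catalogSource == null) {
      catalogSource = new Object();
      startCount++;
    }
    return catalogSource;
  }

  /** Deliberately leaves the source in place in case an hbase:meta Region moves back. */
  synchronized void removeCatalogReplicationSource() {
    // Nothing to do.
  }

  synchronized int startCount() {
    return startCount;
  }

  public static void main(String[] args) {
    CatalogSourceHolderSketch holder = new CatalogSourceHolderSketch();
    Object first = holder.addCatalogReplicationSource();   // hbase:meta opens
    holder.removeCatalogReplicationSource();               // hbase:meta closes
    Object second = holder.addCatalogReplicationSource();  // hbase:meta moves back
    System.out.println(first == second);      // true: same instance reused
    System.out.println(holder.startCount());  // 1: started only once
  }
}
```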

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
  Read configuration to see if we need to wait on setting a Region read-enabled
  (if so, replicas will only flip to enable read after confirming a
  flush of the primary, so they are guaranteed to be a replica of a known point)

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
 If configured, on open of hbase:meta, ask the ReplicationSourceManager
 to add a ReplicationSource (if it hasn't already).

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
 Edit log message.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
 If configured, on close of hbase:meta, update ReplicationSourceManager
 that a source Region has closed.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
 Javadoc; make constructor private.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
 Add logPositionAndCleanOldLogs w/ default of the old behavior so
 CatalogReplicationSource can bypass updating store with WAL position,
 etc.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
 Add creation and start of a CatalogReplicationSource.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
 Go via ReplicationSource when calling logPositionAndCleanOldLogs so new RS can intercept.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
 Javadoc.

hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
 Add utility for reading configurations for hbase:meta region replicas.

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
 Javadoc.

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
 Use define.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
 Special version of ReplicationSource for Region Replicas on hbase:meta.

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
 Needs a special peer too (peers are baked into replication though we don't use 'peers' here)

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALProvider.java
 Tests.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Huaxiang Sun <huaxiangsun@apache.com>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 911deea..9757568 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2486,9 +2486,9 @@
     if (ServerRegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
       return;
     }
-    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf) ||
-        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(
-          region.conf)) {
+    TableName tn = region.getTableDescriptor().getTableName();
+    if (!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(region.conf, tn) ||
+        !ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(region.conf)) {
       region.setReadsEnabled(true);
       return;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 98d09b2..4ee6efc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -20,8 +20,11 @@
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
@@ -31,6 +34,7 @@
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,8 +133,15 @@
       }
       // pass null for the last parameter, which used to be a CancelableProgressable, as now the
       // opening can not be interrupted by a close request any more.
-      region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), rs.getConfiguration(),
-        rs, null);
+      Configuration conf = rs.getConfiguration();
+      TableName tn = htd.getTableName();
+      if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
+        if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
+          // Add the hbase:meta replication source on replica zero/default.
+          rs.getReplicationSourceService().getReplicationManager().addCatalogReplicationSource();
+        }
+      }
+      region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
     } catch (IOException e) {
       cleanUpAndReportFailure(e);
       return;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index dddf553..829d0bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -100,9 +100,9 @@
     RetryCounter counter = new RetryCounterFactory(maxAttempts, (int)pause).create();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("RPC'ing to primary region replica " +
-        ServerRegionReplicaUtil.getRegionInfoForDefaultReplica(region.getRegionInfo()) + " from " +
-        region.getRegionInfo() + " to trigger FLUSH");
+      LOG.debug("RPC'ing to primary " + ServerRegionReplicaUtil.
+          getRegionInfoForDefaultReplica(region.getRegionInfo()).getRegionNameAsString() +
+        " from " + region.getRegionInfo().getRegionNameAsString() + " to trigger FLUSH");
     }
     while (!region.isClosing() && !region.isClosed()
         && !server.isAborted() && !server.isStopped()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 0bf2543..1ed74bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -30,6 +31,7 @@
 import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,12 +115,20 @@
     if (region.close(abort) == null) {
       // XXX: Is this still possible? The old comment says about split, but now split is done at
       // master side, so...
-      LOG.warn("Can't close {} already closed during close()", regionName);
+      LOG.warn("Can't close {}, already closed during close()", regionName);
       rs.getRegionsInTransitionInRS().remove(encodedNameBytes, Boolean.FALSE);
       return;
     }
 
     rs.removeRegion(region, destination);
+    if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
+        region.getTableDescriptor().getTableName())) {
+      if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
+        // If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
+        // See assign region handler where we add the replication source on open.
+        rs.getReplicationSourceService().getReplicationManager().removeCatalogReplicationSource();
+      }
+    }
     if (!rs.reportRegionStateTransition(
       new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
         -1, region.getRegionInfo()))) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
new file mode 100644
index 0000000..f36514d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Collections;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
+ * all WALEdits from these WALs. This ReplicationSource is NOT created via
+ * {@link ReplicationSourceFactory}.
+ */
+@InterfaceAudience.Private
+class CatalogReplicationSource extends ReplicationSource {
+  CatalogReplicationSource() {
+    // Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
+    // filtered out in the 'super' class default implementation).
+    super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
+  }
+
+  @Override
+  public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
+    // Noop. This implementation does not persist state to backing storage nor does it keep its
+    // WALs in a general map up in ReplicationSourceManager so just skip calling through to the
+    // default implementation.
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
new file mode 100644
index 0000000..cb00ac2
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
+import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The 'peer' used internally by Catalog Region Replicas Replication Source.
+ * The Replication system has 'peer' baked into its core so though we do not need 'peering', we
+ * need a 'peer' and its configuration else the replication system breaks in a few places.
+ * Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
+ */
+@InterfaceAudience.Private
+class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
+  /**
+   * @param clusterKey Usually the UUID from zk passed in by caller as a String.
+   */
+  CatalogReplicationSourcePeer(Configuration configuration, String clusterKey, String peerId) {
+    super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
+      ReplicationPeerConfig.newBuilder().
+        setClusterKey(clusterKey).
+        setReplicationEndpointImpl(
+          configuration.get("hbase.region.replica.catalog.replication",
+            RegionReplicaReplicationEndpoint.class.getName())).
+        setBandwidth(0). // '0' means use the default bandwidth.
+        setSerial(false).
+        build(),
+      true, SyncReplicationState.NONE, SyncReplicationState.NONE);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java
new file mode 100644
index 0000000..4ad41fc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/NoopReplicationQueueStorage.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Noop queue storage -- does nothing.
+ */
+@InterfaceAudience.Private
+class NoopReplicationQueueStorage implements ReplicationQueueStorage {
+  NoopReplicationQueueStorage() {}
+
+  @Override
+  public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {}
+
+  @Override
+  public void addWAL(ServerName serverName, String queueId, String fileName)
+    throws ReplicationException {}
+
+  @Override
+  public void removeWAL(ServerName serverName, String queueId, String fileName)
+    throws ReplicationException { }
+
+  @Override
+  public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
+    Map<String, Long> lastSeqIds) throws ReplicationException {}
+
+  @Override
+  public long getLastSequenceId(String encodedRegionName, String peerId)
+    throws ReplicationException {
+    return 0;
+  }
+
+  @Override
+  public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
+    throws ReplicationException {}
+
+  @Override
+  public void removeLastSequenceIds(String peerId) throws ReplicationException {}
+
+  @Override
+  public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
+    throws ReplicationException {}
+
+  @Override
+  public long getWALPosition(ServerName serverName, String queueId, String fileName)
+    throws ReplicationException {
+    return 0;
+  }
+
+  @Override
+  public List<String> getWALsInQueue(ServerName serverName, String queueId)
+      throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
+    ServerName destServerName) throws ReplicationException {
+    return null;
+  }
+
+  @Override
+  public void removeReplicatorIfQueueIsEmpty(ServerName serverName)
+    throws ReplicationException {}
+
+  @Override
+  public List<ServerName> getListOfReplicators() throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public Set<String> getAllWALs() throws ReplicationException {
+    return Collections.EMPTY_SET;
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {}
+
+  @Override
+  public void removePeerFromHFileRefs(String peerId) throws ReplicationException {}
+
+  @Override
+  public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
+    throws ReplicationException {}
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {}
+
+  @Override
+  public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
+    return Collections.EMPTY_LIST;
+  }
+
+  @Override
+  public Set<String> getAllHFileRefs() throws ReplicationException {
+    return Collections.EMPTY_SET;
+  }
+
+  @Override
+  public String getRsNode(ServerName serverName) {
+    return null;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 46cf851..b66b7f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 6e09077..70a9280 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -368,7 +368,7 @@
         ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
         return true;
       } else {
-        LOG.warn("Failed to replicate all entris, retry={}", retryCounter.getAttemptTimes());
+        LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
         if (!retryCounter.shouldRetry()) {
           return false;
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 58206e0..c6b05b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.OptionalLong;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -116,14 +115,14 @@
     SyncReplicationPeerMappingManager mapping = new SyncReplicationPeerMappingManager();
     this.globalMetricsSource = CompatibilitySingletonFactory
         .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
-    WALProvider walProvider = walFactory.getWALProvider();
     this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers,
-        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
-        walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty(),
+        replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, walFactory,
         mapping, globalMetricsSource);
     this.syncReplicationPeerInfoProvider =
         new SyncReplicationPeerInfoProviderImpl(replicationPeers, mapping);
     PeerActionListener peerActionListener = PeerActionListener.DUMMY;
+    // Get the user-space WAL provider
+    WALProvider walProvider = walFactory != null? walFactory.getWALProvider(): null;
     if (walProvider != null) {
       walProvider
         .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8091d0c..34f9c86 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -64,10 +64,10 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
@@ -222,6 +222,7 @@
     this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
     this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
 
+    // A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
     defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
     currentBandwidth = getCurrentBandwidth();
     this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
@@ -373,10 +374,10 @@
   private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
     workerThreads.compute(walGroupId, (key, value) -> {
       if (value != null) {
-        LOG.debug("{} preempted start of worker walGroupId={}", logPeerId(), walGroupId);
+        LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
         return value;
       } else {
-        LOG.debug("{} starting worker for walGroupId={}", logPeerId(), walGroupId);
+        LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
         ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
         ReplicationSourceWALReader walReader =
             createNewWALReader(walGroupId, queue, worker.getStartPosition());
@@ -521,7 +522,7 @@
 
   private long getCurrentBandwidth() {
     long peerBandwidth = replicationPeer.getPeerBandwidth();
-    // user can set peer bandwidth to 0 to use default bandwidth
+    // User can set peer bandwidth to 0 to use default bandwidth.
     return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
   }
 
@@ -607,11 +608,11 @@
       this.startupOngoing.set(false);
       throw new IllegalStateException("Source should be active.");
     }
-    LOG.info("{} queueId={} is replicating from cluster={} to cluster={}",
-      logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
-
+    LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
+      logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
+      peerClusterId);
     initializeWALEntryFilter(peerClusterId);
-    // start workers
+    // Start workers
     for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
       String walGroupId = entry.getKey();
       PriorityBlockingQueue<Path> queue = entry.getValue();
@@ -621,8 +622,10 @@
   }
 
   @Override
-  public void startup() {
-    // mark we are running now
+  public ReplicationSourceInterface startup() {
+    if (this.sourceRunning) {
+      return this;
+    }
     this.sourceRunning = true;
     startupOngoing.set(true);
     initThread = new Thread(this::initialize);
@@ -649,7 +652,7 @@
             }
           }
         } while ((this.startupOngoing.get() || this.retryStartup.get()) && !this.abortOnError);
-      });
+    return this;
   }
 
   @Override
@@ -828,7 +831,8 @@
     return server;
   }
 
-  ReplicationQueueStorage getQueueStorage() {
+  @Override
+  public ReplicationQueueStorage getReplicationQueueStorage() {
     return queueStorage;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index d613049..8863f14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -19,19 +19,22 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 
 /**
  * Constructs a {@link ReplicationSourceInterface}
+ * Note, not used to create specialized ReplicationSources
+ * @see CatalogReplicationSource
  */
 @InterfaceAudience.Private
-public class ReplicationSourceFactory {
-
+public final class ReplicationSourceFactory {
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceFactory.class);
 
+  private ReplicationSourceFactory() {}
+
   static ReplicationSourceInterface create(Configuration conf, String queueId) {
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
     boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 0bd90cf..27e4b79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -23,7 +23,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,7 +42,6 @@
  */
 @InterfaceAudience.Private
 public interface ReplicationSourceInterface {
-
   /**
    * Initializer for the source
    * @param conf the configuration to use
@@ -76,7 +74,7 @@
   /**
    * Start the replication
    */
-  void startup();
+  ReplicationSourceInterface startup();
 
   /**
    * End the replication
@@ -174,7 +172,6 @@
   /**
    * Try to throttle when the peer config with a bandwidth
    * @param batchSize entries size will be pushed
-   * @throws InterruptedException
    */
   void tryThrottle(int batchSize) throws InterruptedException;
 
@@ -206,4 +203,21 @@
   default boolean isRecovered() {
     return false;
   }
+
+  /**
+   * @return The instance of queueStorage used by this ReplicationSource.
+   */
+  ReplicationQueueStorage getReplicationQueueStorage();
+
+  /**
+   * Log the current position to storage. Also clean old logs from the replication queue.
+   * Override to bypass the default call to
+   * {@link ReplicationSourceManager#logPositionAndCleanOldLogs(ReplicationSourceInterface,
+   * WALEntryBatch)} when the implementation does not need to persist state to backing
+   * storage.
+   * @param entryBatch the wal entry batch we just shipped
+   */
+  default void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
+    getSourceManager().logPositionAndCleanOldLogs(this, entryBatch);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 0940b5a..8527f96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -40,6 +41,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,6 +51,7 @@
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -62,13 +65,15 @@
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -126,7 +131,15 @@
   private final ConcurrentMap<String, ReplicationSourceInterface> sources;
   // List of all the sources we got from died RSs
   private final List<ReplicationSourceInterface> oldsources;
+
+  /**
+   * Storage for queues that need persistence; e.g. Replication state so it can be recovered
+   * after a crash. queueStorage upkeep is spread about this class and passed
+   * to ReplicationSource instances for these to do updates themselves. Not all ReplicationSource
+   * instances keep state.
+   */
   private final ReplicationQueueStorage queueStorage;
+
   private final ReplicationTracker replicationTracker;
   private final ReplicationPeers replicationPeers;
   // UUID for this cluster
@@ -153,7 +166,7 @@
   private final Path logDir;
   // Path to the wal archive
   private final Path oldLogDir;
-  private final WALFileLengthProvider walFileLengthProvider;
+  private final WALFactory walFactory;
   // The number of ms that we wait before moving znodes, HBASE-3596
   private final long sleepBeforeFailover;
   // Homemade executer service for replication
@@ -174,21 +187,29 @@
   private final MetricsReplicationGlobalSourceSource globalMetrics;
 
   /**
+   * A special ReplicationSource for hbase:meta Region Read Replicas.
+   * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
+   * will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
+   * the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
+   * this server (in case it later gets moved back). We synchronize on this reference when testing
+   * for presence and, if absent, while creating, so the source is created and started only once.
+   */
+  @VisibleForTesting
+  AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
+
+  /**
    * Creates a replication manager and sets the watch on all the other registered region servers
    * @param queueStorage the interface for manipulating replication queues
-   * @param replicationPeers
-   * @param replicationTracker
    * @param conf the configuration to use
    * @param server the server for this region server
    * @param fs the file system to use
    * @param logDir the directory that contains all wal directories of live RSs
    * @param oldLogDir the directory where old logs are archived
-   * @param clusterId
    */
   public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
       ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
       Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
-      WALFileLengthProvider walFileLengthProvider,
+      WALFactory walFactory,
       SyncReplicationPeerMappingManager syncReplicationPeerMappingManager,
       MetricsReplicationGlobalSourceSource globalMetrics) throws IOException {
     this.sources = new ConcurrentHashMap<>();
@@ -206,7 +227,7 @@
     // 30 seconds
     this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000);
     this.clusterId = clusterId;
-    this.walFileLengthProvider = walFileLengthProvider;
+    this.walFactory = walFactory;
     this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
     this.replicationTracker.registerListener(this);
     // It's preferable to failover 1 RS at a time, but with good zk servers
@@ -346,18 +367,21 @@
   }
 
   /**
-   * Factory method to create a replication source
-   * @param queueId the id of the replication queue
-   * @return the created source
+   * @return a new 'classic' user-space replication source.
+   * @param queueId the id of the replication queue to associate the ReplicationSource with.
+   * @see #createCatalogReplicationSource() for creating a ReplicationSource for hbase:meta.
    */
   private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
       throws IOException {
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
-
-    MetricsSource metrics = new MetricsSource(queueId);
-    // init replication source
+    // Init the just created replication source. Pass the default walProvider's wal file length
+    // provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
+    // replication, see #createCatalogReplicationSource().
+    WALFileLengthProvider walFileLengthProvider =
+      this.walFactory.getWALProvider() != null?
+        this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
     src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
-      walFileLengthProvider, metrics);
+      walFileLengthProvider, new MetricsSource(queueId));
     return src;
   }
 
@@ -1154,4 +1178,60 @@
   MetricsReplicationGlobalSourceSource getGlobalMetrics() {
     return this.globalMetrics;
   }
+
+  /**
+   * Add the hbase:meta Catalog replication source, once only. Called on hbase:meta Region open.
+   * @see #removeCatalogReplicationSource()
+   */
+  public ReplicationSourceInterface addCatalogReplicationSource() throws IOException {
+    synchronized (this.catalogReplicationSource) {
+      if (this.catalogReplicationSource.get() == null) {
+        this.catalogReplicationSource.set(createCatalogReplicationSource());
+      }
+      return this.catalogReplicationSource.get();
+    }
+  }
+
+  /**
+   * Remove the hbase:meta Catalog replication source.
+   * Called when we close hbase:meta.
+   * @see #addCatalogReplicationSource()
+   */
+  public void removeCatalogReplicationSource() {
+    // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
+    // comes back to this server.
+  }
+
+  /**
+   * Create, initialize, and start the Catalog ReplicationSource.
+   */
+  private ReplicationSourceInterface createCatalogReplicationSource() throws IOException {
+    // Has the hbase:meta WALProvider been instantiated?
+    WALProvider walProvider = this.walFactory.getMetaWALProvider();
+    boolean addListener = false;
+    if (walProvider == null) {
+      // The meta walProvider has not been instantiated. Create it.
+      walProvider = this.walFactory.getMetaProvider();
+      addListener = true;
+    }
+    CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
+      this.clusterId.toString(), "meta_" + ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
+    final ReplicationSourceInterface crs = new CatalogReplicationSource();
+    crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
+      clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
+    if (addListener) {
+      walProvider.addWALActionsListener(new WALActionsListener() {
+        @Override
+        public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+          crs.enqueueLog(newPath);
+        }
+      });
+    } else {
+      // This is a problem. We'll have a ReplicationSource but no listener on hbase:meta WALs
+      // so nothing will be replicated.
+      LOG.error("Did not install WALActionsListener creating CatalogReplicationSource!");
+    }
+    // Start this ReplicationSource.
+    return crs.startup();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 45eb91c..d3af995 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -19,7 +19,6 @@
 
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -35,7 +34,6 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
@@ -271,7 +269,7 @@
     // position and the file will be removed soon in cleanOldLogs.
     if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
       batch.getLastWalPosition() != currentPosition) {
-      source.getSourceManager().logPositionAndCleanOldLogs(source, batch);
+      source.logPositionAndCleanOldLogs(batch);
       updated = true;
     }
     // if end of file is true, then we can just skip to the next file in queue.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
index 6b3c34a..fdc1e54 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -37,7 +37,6 @@
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
@@ -140,7 +139,7 @@
 
   public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
     this.peerId = source.getPeerId();
-    this.storage = source.getQueueStorage();
+    this.storage = source.getReplicationQueueStorage();
     this.conn = source.getServer().getConnection();
     this.waitTimeMs =
       conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
index 010fa69..c60faa9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -25,6 +25,9 @@
 /**
  * Used by replication to prevent replicating unacked log entries. See
  * https://issues.apache.org/jira/browse/HBASE-14004 for more details.
+ * WALFileLengthProvider exists because we do not want to reference WALFactory and WALProvider
+ * directly in the replication code so in the future it will be easier to decouple them.
+ * Each walProvider will have its own implementation.
  */
 @InterfaceAudience.Private
 @FunctionalInterface
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index fbd8d30..5583a47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,6 +22,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -57,7 +59,15 @@
   public static final String REGION_REPLICA_REPLICATION_CONF_KEY
     = "hbase.region.replica.replication.enabled";
   private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
-  private static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
+  public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
+
+  /**
+   * Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
+   */
+  public static final String REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY
+    = "hbase.region.replica.replication.catalog.enabled";
+  private static final boolean DEFAULT_REGION_REPLICA_REPLICATION_CATALOG = false;
+
 
   /**
    * Enables or disables refreshing store files of secondary region replicas when the memory is
@@ -116,7 +126,6 @@
    * files of the primary region, so an HFileLink is used to construct the StoreFileInfo. This
    * way ensures that the secondary will be able to continue reading the store files even if
    * they are moved to archive after compaction
-   * @throws IOException
    */
   public static StoreFileInfo getStoreFileInfo(Configuration conf, FileSystem fs,
       RegionInfo regionInfo, RegionInfo regionInfoForFs, String familyName, Path path)
@@ -153,8 +162,7 @@
   }
 
   /**
-   * Create replication peer for replicating to region replicas if needed.
-   * <p/>
+   * Create replication peer for replicating user-space Region Read Replicas.
    * This methods should only be called at master side.
    */
   public static void setupRegionReplicaReplication(MasterServices services)
@@ -174,16 +182,42 @@
     services.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
   }
 
-  public static boolean isRegionReplicaReplicationEnabled(Configuration conf) {
-    return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY,
-      DEFAULT_REGION_REPLICA_REPLICATION);
+  /**
+   * @return True if Region Read Replica is enabled for <code>tn</code> (whether hbase:meta or
+   *   user-space tables).
+   */
+  public static boolean isRegionReplicaReplicationEnabled(Configuration conf, TableName tn) {
+    return isMetaRegionReplicaReplicationEnabled(conf, tn) ||
+      isRegionReplicaReplicationEnabled(conf);
   }
 
+  /**
+   * @return True if Region Read Replica is enabled for user-space tables.
+   */
+  private static boolean isRegionReplicaReplicationEnabled(Configuration conf) {
+    return conf.getBoolean(REGION_REPLICA_REPLICATION_CONF_KEY, DEFAULT_REGION_REPLICA_REPLICATION);
+  }
+
+  /**
+   * @return True if hbase:meta Region Read Replica is enabled.
+   */
+  public static boolean isMetaRegionReplicaReplicationEnabled(Configuration conf, TableName tn) {
+    return TableName.isMetaTableName(tn) &&
+      conf.getBoolean(REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
+        DEFAULT_REGION_REPLICA_REPLICATION_CATALOG);
+  }
+
+  /**
+   * @return True if wait for primary to flush is enabled for user-space tables.
+   */
   public static boolean isRegionReplicaWaitForPrimaryFlushEnabled(Configuration conf) {
     return conf.getBoolean(REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY,
       DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH);
   }
 
+  /**
+   * @return True if we are to refresh user-space hfiles in Region Read Replicas.
+   */
   public static boolean isRegionReplicaStoreFileRefreshEnabled(Configuration conf) {
     return conf.getBoolean(REGION_REPLICA_STORE_FILE_REFRESH,
       DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH);
@@ -194,11 +228,4 @@
       DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER);
   }
 
-  /**
-   * Return the peer id used for replicating to secondary region replicas
-   */
-  public static String getReplicationPeerId() {
-    return REGION_REPLICA_REPLICATION_PEER;
-  }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index 6a5feb0..b3663f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -67,7 +67,7 @@
   /**
    * Maps between configuration names for providers and implementation classes.
    */
-  static enum Providers {
+  enum Providers {
     defaultProvider(AsyncFSWALProvider.class),
     filesystem(FSHLogProvider.class),
     multiwal(RegionGroupingProvider.class),
@@ -260,8 +260,12 @@
     return provider.getWALs();
   }
 
-  @VisibleForTesting
-  WALProvider getMetaProvider() throws IOException {
+  /**
+   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
+   * creating the first hbase:meta WAL so we can register a listener.
+   * @see #getMetaWALProvider()
+   */
+  public WALProvider getMetaProvider() throws IOException {
     for (;;) {
       WALProvider provider = this.metaProvider.get();
       if (provider != null) {
@@ -312,7 +316,6 @@
    * to reopen it multiple times, use {@link WAL.Reader#reset()} instead of this method
    * then just seek back to the last known good position.
    * @return A WAL reader.  Close when done with it.
-   * @throws IOException
    */
   public Reader createReader(final FileSystem fs, final Path path,
       CancelableProgressable reporter) throws IOException {
@@ -491,6 +494,10 @@
     return this.provider;
   }
 
+  /**
+   * @return Current metaProvider... may be null if not yet initialized.
+   * @see #getMetaProvider()
+   */
   public final WALProvider getMetaWALProvider() {
     return this.metaProvider.get();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a361c44..cab01d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -71,8 +71,9 @@
   }
 
   @Override
-  public void startup() {
+  public ReplicationSourceInterface startup() {
     startup.set(true);
+    return this;
   }
 
   public boolean isStartup() {
@@ -163,6 +164,11 @@
   }
 
   @Override
+  public ReplicationQueueStorage getReplicationQueueStorage() {
+    return null;
+  }
+
+  @Override
   public ReplicationPeer getPeer() {
     return replicationPeer;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
new file mode 100644
index 0000000..3bf0b9a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ClientMetaTableAccessor;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
+ * verifying async wal replication replays the edits to the secondary region in various scenarios.
+ * @see TestRegionReplicaReplicationEndpoint
+ */
+@Category({LargeTests.class})
+public class TestMetaRegionReplicaReplicationEndpoint {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
+  private static final int NB_SERVERS = 3;
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+
+  @Rule
+  public TestName name = new TestName();
+
+  @Before
+  public void before() throws Exception {
+    Configuration conf = HTU.getConfiguration();
+    conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+    conf.setInt("replication.source.size.capacity", 10240);
+    conf.setLong("replication.source.sleepforretries", 100);
+    conf.setInt("hbase.regionserver.maxlogs", 10);
+    conf.setLong("hbase.master.logcleaner.ttl", 10);
+    conf.setInt("zookeeper.recovery.retry", 1);
+    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+    conf.setInt("replication.stats.thread.period.seconds", 5);
+    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
+    // Enable hbase:meta replication.
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
+    // Set hbase:meta replicas to be 3.
+    conf.setInt(HConstants.META_REPLICAS_NUM, NB_SERVERS);
+    HTU.startMiniCluster(NB_SERVERS);
+    HTU.waitFor(30000,
+      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= NB_SERVERS);
+  }
+
+  @After
+  public void after() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  /**
+   * Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
+   */
+  @Test
+  public void testHBaseMetaReplicationSourceCreatedOnOpen()
+    throws IOException, InterruptedException {
+    MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
+    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
+    // Now move the hbase:meta and make sure the ReplicationSource is in both places.
+    HRegionServer hrsOther = null;
+    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
+      hrsOther = cluster.getRegionServer(i);
+      if (hrsOther.getServerName().equals(hrs.getServerName())) {
+        hrsOther = null;
+        continue;
+      }
+      break;
+    }
+    assertNotNull(hrsOther);
+    assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
+    Region meta = null;
+    for (Region region: hrs.getOnlineRegionsLocalContext()) {
+      if (region.getRegionInfo().isMetaRegion()) {
+        meta = region;
+        break;
+      }
+    }
+    assertNotNull(meta);
+    HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName());
+    // Assert that there is a ReplicationSource in both places now.
+    assertTrue(isMetaRegionReplicaReplicationSource(hrs));
+    assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
+  }
+
+  /**
+   * @return Whether the special meta region replica peer is enabled on <code>hrs</code>
+   */
+  private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
+    return hrs.getReplicationSourceService().getReplicationManager().
+      catalogReplicationSource.get() != null;
+  }
+
+  /**
+   * Test meta region replica replication. Create some tables and see if replicas pick up the
+   * additions.
+   */
+  @Test
+  public void testHBaseMetaReplicates() throws Exception {
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+    }
+    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
+      HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+      // Try delete.
+      HTU.deleteTableIfAny(table.getName());
+      verifyDeletedReplication(TableName.META_TABLE_NAME, NB_SERVERS, table.getName());
+    }
+  }
+
+  /**
+   * Replicas come online after primary.
+   */
+  private void waitForMetaReplicasToOnline() throws IOException {
+    final RegionLocator regionLocator =
+      HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
+    HTU.waitFor(10000,
+      // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
+      // Pass reload to force us to skip cache else it just keeps returning default.
+      () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
+        filter(Objects::nonNull).count() >= NB_SERVERS);
+    List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
+    LOG.info("Found locations {}", locations);
+    assertEquals(NB_SERVERS, locations.size());
+  }
+
+  /**
+   * Scan hbase:meta for <code>tableName</code> content.
+   */
+  private List<Result> getMetaCells(TableName tableName) throws IOException {
+    final List<Result> results = new ArrayList<>();
+    ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
+      @Override public boolean visit(Result r) throws IOException {
+        results.add(r);
+        return true;
+      }
+    };
+    MetaTableAccessor.scanMetaForTableRegions(HTU.getConnection(), visitor, tableName);
+    return results;
+  }
+
+  /**
+   * @return All Regions for tableName including Replicas.
+   */
+  private Region[] getAllRegions(TableName tableName, int replication) {
+    final Region[] regions = new Region[replication];
+    for (int i = 0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+    return regions;
+  }
+
+  /**
+   * Verify that when a Table is deleted from the primary, no references to it remain in the
+   * replicas (because they get the delete of the table rows too).
+   */
+  private void verifyDeletedReplication(TableName tableName, int regionReplication,
+      final TableName deletedTableName) {
+    final Region[] regions = getAllRegions(tableName, regionReplication);
+
+    // Start count at '1' so we skip default, primary replica and only look at secondaries.
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until the table delete has been replicated to this secondary region
+      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
+          try (RegionScanner rs = region.getScanner(new Scan())) {
+            List<Cell> cells = new ArrayList<>();
+            while (rs.next(cells)) {
+              continue;
+            }
+            return doesNotContain(cells, deletedTableName);
+          } catch (Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Cells are from an hbase:meta replica, so rows start with 'tableName,'; i.e. the table name
+   * followed by HConstants.DELIMITER. Make sure the deleted table is no longer present in the
+   * passed <code>cells</code>.
+   */
+  private boolean doesNotContain(List<Cell> cells, TableName tableName) {
+    for (Cell cell : cells) {
+      String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+      if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Verify Replicas have results (exactly).
+   */
+  private void verifyReplication(TableName tableName, int regionReplication,
+      List<Result> contains) {
+    final Region[] regions = getAllRegions(tableName, regionReplication);
+
+    // Start count at '1' so we skip default, primary replica and only look at secondaries.
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 30000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("Verifying replication for region replica {}", region.getRegionInfo());
+          try (RegionScanner rs = region.getScanner(new Scan())) {
+            List<Cell> cells = new ArrayList<>();
+            while (rs.next(cells)) {
+              continue;
+            }
+            return contains(contains, cells);
+          } catch (Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Presumes sorted Cells. Verify that <code>cells</code> includes at least <code>contains</code>.
+   */
+  static boolean contains(List<Result> contains, List<Cell> cells) throws IOException {
+    CellScanner containsScanner = CellUtil.createCellScanner(contains);
+    CellScanner cellsScanner = CellUtil.createCellScanner(cells);
+    int matches = 0;
+    int count = 0;
+    while (containsScanner.advance()) {
+      while (cellsScanner.advance()) {
+        count++;
+        LOG.info("{} {}", containsScanner.current(), cellsScanner.current());
+        if (containsScanner.current().equals(cellsScanner.current())) {
+          matches++;
+          break;
+        }
+      }
+    }
+    return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 56afcda..5456058 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -127,7 +127,7 @@
     // and replication started.
     try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
       Admin admin = connection.getAdmin()) {
-      String peerId = "region_replica_replication";
+      String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
 
       ReplicationPeerConfig peerConfig = null;
       try {
@@ -416,7 +416,7 @@
     HTU.getAdmin().createTable(htd);
 
     // both tables are created, now pause replication
-    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
+    HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
 
     // now that the replication is disabled, write to the table to be dropped, then drop the table.
 
@@ -450,9 +450,9 @@
     MetricsSource metrics = mock(MetricsSource.class);
     ReplicationEndpoint.Context ctx =
       new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
-        HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
+        HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
         UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
-          .getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
+          .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
         metrics, rs.getTableDescriptors(), rs);
     RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
     rrpe.init(ctx);
@@ -476,7 +476,7 @@
       HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
 
       // now enable the replication
-      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
+      HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
 
       verifyReplication(tableName, regionReplication, 0, 1000);
     } finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
index 6be09c0..eb445f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationChecker.java
@@ -109,7 +109,7 @@
   public void setUp() throws IOException {
     ReplicationSource source = mock(ReplicationSource.class);
     when(source.getPeerId()).thenReturn(PEER_ID);
-    when(source.getQueueStorage()).thenReturn(QUEUE_STORAGE);
+    when(source.getReplicationQueueStorage()).thenReturn(QUEUE_STORAGE);
     conn = mock(Connection.class);
     when(conn.isClosed()).thenReturn(false);
     doAnswer(new Answer<Table>() {