HBASE-25126 Add load balance logic in hbase-client to distribute read load over meta replica regions

It adds load-balancing support for meta lookups in AsyncTableRegionLocator.
The existing meta replica mode is renamed "HedgedRead": the client sends the scan request to the
primary meta region first, and if the response does not come back within a configured amount of
time, it sends scan requests to all meta replica regions and takes the first response. On top of
the existing mode, a new mode "LoadBalance" is introduced. In this mode, the client first chooses
a meta replica region to send the scan request to. If the response is stale, it may send the next
request for that location to another meta replica region or to the primary region. In this mode,
meta scan requests are load balanced across all replica regions, with the primary region as the
ultimate source of truth.
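
For reference, the per-mode behavior boils down to how the locator builds the meta scan. The
condensed sketch below mirrors the AsyncNonMetaRegionLocator change in this patch; the helper
name buildMetaScan and its parameter list are illustrative only and not part of the patch:

  private Scan buildMetaScan(CatalogReplicaMode mode, CatalogReplicaLoadBalanceSelector selector,
      TableName tableName, byte[] row, RegionLocateType locateType) {
    Scan scan = new Scan().addFamily(HConstants.CATALOG_FAMILY).setReadType(Scan.ReadType.PREAD);
    switch (mode) {
      case LOAD_BALANCE:
        int replicaId = selector.select(tableName, row, locateType);
        if (replicaId != RegionInfo.DEFAULT_REPLICA_ID) {
          // Go to the selected replica; replica reads may return stale locations.
          scan.setConsistency(Consistency.TIMELINE);
          scan.setReplicaId(replicaId);
        }
        break;
      case HEDGED_READ:
        // Primary first, then hedge the read to all replicas after a timeout.
        scan.setConsistency(Consistency.TIMELINE);
        break;
      default:
        // "None": plain scan against the primary meta region only.
        break;
    }
    return scan;
  }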

Two new config knobs are added:

1. hbase.locator.meta.replicas.mode
   Valid options are "None", "HedgedRead" and "LoadBalance"; they are case-insensitive. The default mode is "None".

2. hbase.locator.meta.replicas.mode.loadbalance.selector
   The load balance algorithm used to select a meta replica to send requests to.
   Only org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceSimpleSelector.class
   is supported for now, and it is also the default. The algorithm works as follows
   (see the configuration sketch after this list):
      a. Clients select a random meta replica region to send the request to if there is no entry
         for the location in the stale location cache.
      b. If the location from a meta replica region is stale, a stale entry is created in the
         stale location cache for that region.
      c. Clients select the primary meta region if the location is in the stale location cache.
      d. Stale location cache entries time out after 3 seconds.
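
A minimal client-side configuration sketch for opting into the LoadBalance mode; the selector
property is shown only for completeness, since the simple selector is the sole implementation and
already the default:

  Configuration conf = HBaseConfiguration.create();
  conf.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, "LoadBalance");
  // Optional: only the simple selector exists today and it is already the default.
  conf.set(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
    "org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceSimpleSelector");

Any connection created from this Configuration will have its region locator spread meta lookups
across the replica regions.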

If there is no "hbase.locator.meta.replicas.mode" configured, it will check the config knob "hbase.meta.replicas.use".
If "hbase.meta.replicas.use" is configured, the mode will be set to "HedgedRead".
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 406af0d..fda262c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -116,7 +116,7 @@
   private final Optional<ServerStatisticTracker> stats;
   private final ClientBackoffPolicy backoffPolicy;
 
-  private ChoreService authService;
+  private ChoreService choreService;
 
   private final AtomicBoolean closed = new AtomicBoolean(false);
 
@@ -130,6 +130,7 @@
       SocketAddress localAddress, User user) {
     this.conf = conf;
     this.user = user;
+
     if (user.isLoginFromKeytab()) {
       spawnRenewalChore(user.getUGI());
     }
@@ -182,8 +183,19 @@
   }
 
   private void spawnRenewalChore(final UserGroupInformation user) {
-    authService = new ChoreService("Relogin service");
-    authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
+    ChoreService service = getChoreService();
+    service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
+  }
+
+  /**
+   * If choreService has not been created yet, create the ChoreService.
+   * @return ChoreService
+   */
+  synchronized ChoreService getChoreService() {
+    if (choreService == null) {
+      choreService = new ChoreService("AsyncConn Chore Service");
+    }
+    return choreService;
   }
 
   @Override
@@ -208,8 +220,8 @@
     IOUtils.closeQuietly(clusterStatusListener);
     IOUtils.closeQuietly(rpcClient);
     IOUtils.closeQuietly(registry);
-    if (authService != null) {
-      authService.shutdown();
+    if (choreService != null) {
+      choreService.shutdown();
     }
     metrics.ifPresent(MetricsConnection::shutdown);
     ConnectionOverAsyncConnection c = this.conn;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index b202168..a9ee6a9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -30,6 +30,7 @@
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
+import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
 import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 
@@ -46,6 +47,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.hbase.CatalogFamilyFormat;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -89,7 +91,10 @@
 
   private final int locatePrefetchLimit;
 
-  private final boolean useMetaReplicas;
+  // The mode tells whether HedgedRead or LoadBalance mode is enabled for meta lookups.
+  // The default mode is CatalogReplicaMode.NONE.
+  private CatalogReplicaMode metaReplicaMode;
+  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
 
   private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
 
@@ -196,8 +201,41 @@
       MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
     this.locatePrefetchLimit =
       conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
-    this.useMetaReplicas =
-      conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
+
+    // Get the region locator's meta replica mode.
+    this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration()
+      .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
+
+    switch (this.metaReplicaMode) {
+      case LOAD_BALANCE:
+        String replicaSelectorClass = conn.getConfiguration().
+          get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
+          CatalogReplicaLoadBalanceSimpleSelector.class.getName());
+
+        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
+          replicaSelectorClass, META_TABLE_NAME, conn, () -> {
+            int numOfReplicas = 1;
+            try {
+              RegionLocations metaLocations = conn.registry.getMetaRegionLocations().get(
+                conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
+              numOfReplicas = metaLocations.size();
+            } catch (Exception e) {
+              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
+            }
+            return numOfReplicas;
+          });
+        break;
+      case NONE:
+        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
+        boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS,
+          DEFAULT_USE_META_REPLICAS);
+        if (useMetaReplicas) {
+          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
+        }
+        break;
+      default:
+        // Doing nothing
+    }
   }
 
   private TableCache getTableCache(TableName tableName) {
@@ -433,9 +471,24 @@
     Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
       .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
       .setReadType(ReadType.PREAD);
-    if (useMetaReplicas) {
-      scan.setConsistency(Consistency.TIMELINE);
+
+    switch (this.metaReplicaMode) {
+      case LOAD_BALANCE:
+        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
+        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
+          // If the selector gives a non-primary meta replica region, then go with it.
+          // Otherwise, just go to primary in non-hedgedRead mode.
+          scan.setConsistency(Consistency.TIMELINE);
+          scan.setReplicaId(metaReplicaId);
+        }
+        break;
+      case HEDGED_READ:
+        scan.setConsistency(Consistency.TIMELINE);
+        break;
+      default:
+        // do nothing
     }
+
     conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
 
       private boolean completeNormally = false;
@@ -577,6 +630,13 @@
       if (!canUpdateOnError(loc, oldLoc)) {
         return;
       }
+      // Tell metaReplicaSelector that the location is stale. It will create a stale entry
+      // with a timestamp internally. The next time the client looks up the same location,
+      // it will go to the primary meta region instead.
+      if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
+        metaReplicaSelector.onError(loc);
+      }
+
       RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
       if (newLocs == null) {
         if (tableCache.cache.remove(startKey, oldLocs)) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java
new file mode 100644
index 0000000..f9572b3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A catalog replica selector decides which catalog replica to read from when the locator is
+ * configured with CatalogReplicaMode.LOAD_BALANCE.
+ */
+@InterfaceAudience.Private
+interface CatalogReplicaLoadBalanceSelector {
+
+  /**
+   * This method is called when the input location is stale, i.e., when clients run into
+   * org.apache.hadoop.hbase.NotServingRegionException.
+   * @param loc stale location
+   */
+  void onError(HRegionLocation loc);
+
+  /**
+   * Select a catalog replica region for the client to look up the input row key.
+   *
+   * @param tablename table name
+   * @param row  key to look up
+   * @param locateType  locate type
+   * @return replica id
+   */
+  int select(TableName tablename, byte[] row, RegionLocateType locateType);
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java
new file mode 100644
index 0000000..1570e7f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.function.IntSupplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory to create a {@link CatalogReplicaLoadBalanceSelector}
+ */
+@InterfaceAudience.Private
+final class CatalogReplicaLoadBalanceSelectorFactory {
+  /**
+   * Private Constructor
+   */
+  private CatalogReplicaLoadBalanceSelectorFactory() {
+  }
+
+  /**
+   * Create a {@link CatalogReplicaLoadBalanceSelector} based on the input config.
+   * @param replicaSelectorClass  Selector classname.
+   * @param tableName  System table name.
+   * @param conn {@link AsyncConnectionImpl}
+   * @return  {@link CatalogReplicaLoadBalanceSelector}
+   */
+  public static CatalogReplicaLoadBalanceSelector createSelector(String replicaSelectorClass,
+    TableName tableName, AsyncConnectionImpl conn, IntSupplier getReplicaCount) {
+    return ReflectionUtils.instantiateWithCustomCtor(replicaSelectorClass,
+      new Class[] { TableName.class, AsyncConnectionImpl.class, IntSupplier.class },
+      new Object[] { tableName, conn, getReplicaCount });
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java
new file mode 100644
index 0000000..ccd7412
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.IntSupplier;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+/**
+ * <p>CatalogReplicaLoadBalanceSimpleSelector implements a simple catalog replica load
+ * balancing algorithm. It maintains a stale location cache for each table. Whenever a client
+ * looks up a location, it first checks if the row is in the stale location cache. If it is, the
+ * location from the catalog replica is stale and the client goes to the primary region to look
+ * up the up-to-date location; otherwise, it randomly picks a replica region for the lookup.
+ * When clients receive NotServingRegionException from region servers, they add these region
+ * locations to the stale location cache. The stale cache is cleaned up periodically by a chore.</p>
+ *
+ * It follows a simple algorithm to choose a replica to go:
+ *
+ * <ol>
+ *  <li>If there is no stale location entry for the rows it looks up, it randomly
+ *     picks a replica region for the lookup.</li>
+ *  <li>If the location from the replica region is stale, the client gets NotServingRegionException
+ *     from the region server; in this case, it creates a StaleLocationCacheEntry in
+ *     CatalogReplicaLoadBalanceSimpleSelector.</li>
+ *  <li>When the client does a location lookup, it checks the StaleLocationCache first for the rows
+ *     it looks up; if an entry exists, it goes to the primary meta region for the lookup,
+ *     otherwise it follows step 1.</li>
+ *  <li>A chore periodically cleans up cache entries in the StaleLocationCache.</li>
+ * </ol>
+ */
+class CatalogReplicaLoadBalanceSimpleSelector implements
+  CatalogReplicaLoadBalanceSelector, Stoppable {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);
+  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
+  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
+  private final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute
+
+  /**
+   * StaleLocationCacheEntry is the entry created when a stale location is reported by a client.
+   */
+  private static final class StaleLocationCacheEntry {
+    // timestamp in milliseconds
+    private final long timestamp;
+
+    private final byte[] endKey;
+
+    StaleLocationCacheEntry(final byte[] endKey) {
+      this.endKey = endKey;
+      timestamp = EnvironmentEdgeManager.currentTime();
+    }
+
+    public byte[] getEndKey() {
+      return this.endKey;
+    }
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+        .append("endKey", endKey)
+        .append("timestamp", timestamp)
+        .toString();
+    }
+  }
+
+  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>>
+    staleCache = new ConcurrentHashMap<>();
+  private volatile int numOfReplicas;
+  private final AsyncConnectionImpl conn;
+  private final TableName tableName;
+  private final IntSupplier getNumOfReplicas;
+  private volatile boolean isStopped = false;
+  private final static int UNINITIALIZED_NUM_OF_REPLICAS = -1;
+
+  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, AsyncConnectionImpl conn,
+    IntSupplier getNumOfReplicas) {
+    this.conn = conn;
+    this.tableName = tableName;
+    this.getNumOfReplicas = getNumOfReplicas;
+
+    // This numOfReplicas is going to be lazily initialized.
+    this.numOfReplicas = UNINITIALIZED_NUM_OF_REPLICAS;
+    // Start chores
+    this.conn.getChoreService().scheduleChore(getCacheCleanupChore(this));
+    this.conn.getChoreService().scheduleChore(getRefreshReplicaCountChore(this));
+  }
+
+  /**
+   * When a client runs into NotServingRegionException, it calls this method to
+   * update the selector's internal state.
+   * @param loc the location which caused the exception.
+   */
+  public void onError(HRegionLocation loc) {
+    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache =
+      computeIfAbsent(staleCache, loc.getRegion().getTable(),
+        () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
+    byte[] startKey = loc.getRegion().getStartKey();
+    tableCache.putIfAbsent(startKey,
+      new StaleLocationCacheEntry(loc.getRegion().getEndKey()));
+    LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
+      loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey());
+  }
+
+  /**
+   * Select a random replica id. In case no replica region is configured, return
+   * the primary replica id.
+   * @return Replica id
+   */
+  private int getRandomReplicaId() {
+    int cachedNumOfReplicas = this.numOfReplicas;
+    if (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) {
+      cachedNumOfReplicas = refreshCatalogReplicaCount();
+      this.numOfReplicas = cachedNumOfReplicas;
+    }
+    // In case of no replica configured, return the primary region id.
+    if (cachedNumOfReplicas <= 1) {
+      return RegionInfo.DEFAULT_REPLICA_ID;
+    }
+    return 1 + ThreadLocalRandom.current().nextInt(cachedNumOfReplicas - 1);
+  }
+
+  /**
+   * When a client looks up a location, it calls this method to find a replica region to go to.
+   * In the normal case, over 99% of the region locations from the catalog/meta replicas will be
+   * up to date. In extreme cases such as a region server crash, it depends on how fast
+   * replication catches up.
+   *
+   * @param tablename table name it looks up
+   * @param row key it looks up.
+   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
+   * @return catalog replica id
+   */
+  public int select(final TableName tablename, final byte[] row,
+    final RegionLocateType locateType) {
+    Preconditions.checkArgument(locateType == RegionLocateType.BEFORE ||
+        locateType == RegionLocateType.CURRENT,
+      "Expected type BEFORE or CURRENT but got: %s", locateType);
+
+    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tablename);
+
+    // If there is no entry in StaleCache, select a random replica id.
+    if (tableCache == null) {
+      return getRandomReplicaId();
+    }
+
+    Map.Entry<byte[], StaleLocationCacheEntry> entry;
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    // Only BEFORE and CURRENT are passed in.
+    if (locateType == RegionLocateType.BEFORE) {
+      entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
+    } else {
+      entry = tableCache.floorEntry(row);
+    }
+
+    // It is not in the stale cache, return a random replica id.
+    if (entry == null) {
+      return getRandomReplicaId();
+    }
+
+    // The entry here is a possible match for the location. Check whether the entry has timed out
+    // first, as comparing longs is faster than comparing byte arrays (in most cases), and it also
+    // removes stale entries faster. If the possible match entry has not timed out, check whether
+    // the entry matches the row passed in and select the replica id accordingly.
+    if ((EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp()) >=
+      STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
+      LOG.debug("Entry for table {} with startKey {}, {} times out", tablename, entry.getKey(),
+        entry);
+      tableCache.remove(entry.getKey());
+      return getRandomReplicaId();
+    }
+
+    byte[] endKey =  entry.getValue().getEndKey();
+
+    // The following logic is borrowed from AsyncNonMetaRegionLocator.
+    if (isEmptyStopRow(endKey)) {
+      LOG.debug("Lookup {} goes to primary region", row);
+      return RegionInfo.DEFAULT_REPLICA_ID;
+    }
+
+    if (locateType == RegionLocateType.BEFORE) {
+      if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
+        LOG.debug("Lookup {} goes to primary meta", row);
+        return RegionInfo.DEFAULT_REPLICA_ID;
+      }
+    } else {
+      if (Bytes.compareTo(row, endKey) < 0) {
+        LOG.debug("Lookup {} goes to primary meta", row);
+        return RegionInfo.DEFAULT_REPLICA_ID;
+      }
+    }
+
+    // Not in stale cache, return a random replica id.
+    return getRandomReplicaId();
+  }
+
+  // This class implements the Stoppable interface because chores need a Stoppable object; the
+  // stop() method is currently a no-op beyond setting the stopped flag.
+  @Override
+  public void stop(String why) {
+    isStopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return isStopped;
+  }
+
+  private void cleanupReplicaReplicaStaleCache() {
+    long curTimeInMills = EnvironmentEdgeManager.currentTime();
+    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
+      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it =
+        tableCache.entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
+        if (curTimeInMills - entry.getValue().getTimestamp() >=
+          STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
+          LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue());
+          it.remove();
+        }
+      }
+    }
+  }
+
+  private int refreshCatalogReplicaCount() {
+    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
+    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
+    if (newNumOfReplicas == 1) {
+      LOG.warn("Table {}'s region replica count is 1, maybe a misconfiguration or failure to "
+        + "fetch the replica count", tableName);
+    }
+    int cachedNumOfReplicas = this.numOfReplicas;
+
+    // If the returned number of replicas is 1, it is mostly caused by failure to fetch the
+    // replica count. Do not update the numOfReplicas in this case.
+    if ((cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) ||
+      ((cachedNumOfReplicas != newNumOfReplicas) && (newNumOfReplicas != 1))) {
+      this.numOfReplicas = newNumOfReplicas;
+    }
+    return newNumOfReplicas;
+  }
+
+  private ScheduledChore getCacheCleanupChore(
+    final CatalogReplicaLoadBalanceSimpleSelector selector) {
+    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
+      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
+      @Override
+      protected void chore() {
+        selector.cleanupReplicaReplicaStaleCache();
+      }
+    };
+  }
+
+  private ScheduledChore getRefreshReplicaCountChore(
+    final CatalogReplicaLoadBalanceSimpleSelector selector) {
+    return new ScheduledChore("RefreshReplicaCountChore", this,
+      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
+      @Override
+      protected void chore() {
+        selector.refreshCatalogReplicaCount();
+      }
+    };
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java
new file mode 100644
index 0000000..0f126e1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * <p>There are two modes with catalog replica support. </p>
+ *
+ * <ol>
+ *   <li>HEDGED_READ  - Client sends requests to the primary region first; if no response
+ *                 comes back within a configured amount of time, the client sends
+ *                 requests to all replica regions and takes the first
+ *                 response. </li>
+ *
+ *   <li>LOAD_BALANCE - Client spreads requests across the replica regions; if the result
+ *                 from a replica region is stale, the next request for that stale location
+ *                 goes to the primary region. In this mode, scan
+ *                 requests are load balanced across all replica regions.</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+enum CatalogReplicaMode {
+  NONE {
+    @Override
+    public String toString() {
+      return "None";
+    }
+  },
+  HEDGED_READ {
+    @Override
+    public String toString() {
+      return "HedgedRead";
+    }
+  },
+  LOAD_BALANCE {
+    @Override
+    public String toString() {
+      return "LoadBalance";
+    }
+  };
+
+  public static CatalogReplicaMode fromString(final String value) {
+    for(CatalogReplicaMode mode : values()) {
+      if (mode.toString().equalsIgnoreCase(value)) {
+        return mode;
+      }
+    }
+    throw new IllegalArgumentException();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
index c7440c6..7ea6e4a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
@@ -37,6 +37,18 @@
  */
 @InterfaceAudience.Public
 public interface RegionLocator extends Closeable {
+
+  /** Configuration for Region Locator's mode when meta replica is configured.
+   * Valid values are: HedgedRead, LoadBalance, None
+   */
+  String LOCATOR_META_REPLICAS_MODE = "hbase.locator.meta.replicas.mode";
+
+  /** Configuration for meta replica selector when Region Locator's LoadBalance mode is configured.
+   * The default value is org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceSimpleSelector.
+   */
+  String LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR =
+    "hbase.locator.meta.replicas.mode.loadbalance.selector";
+
   /**
    * Finds the region on which the given row is being served. Does not reload the cache.
    * @param row Row to find.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 70a9280..f8eca6c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -182,7 +182,16 @@
   private void replicate(CompletableFuture<Long> future, RegionLocations locs,
       TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
     if (locs.size() == 1) {
-      // Could this happen?
+      LOG.info("Only one location for {}.{}, refresh the location cache only for meta now",
+        tableDesc.getTableName(), Bytes.toString(encodedRegionName));
+
+      // This could happen for the meta table: it may start with no replicas and later be
+      // changed to multiple replicas. In that case the cached location for meta may only have
+      // the primary region, so the cached meta locations need to be cleaned up and
+      // refreshed.
+      if (tableDesc.isMergeEnabled()) {
+        connection.getRegionLocator(tableDesc.getTableName()).clearRegionLocationCache();
+      }
       future.complete(Long.valueOf(entries.size()));
       return;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
index a2466a5..989fdbb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -142,6 +142,21 @@
       locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
         .getDefaultRegionLocation().getServerName());
     // should get the new location when reload = true
+    // when meta replica LoadBalance mode is enabled, the location update may lag a bit.
+    util.waitFor(3000, new ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ServerName sn = locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+          true).getDefaultRegionLocation().getServerName();
+        return newServerName.equals(sn);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "New location does not show up in meta (replica) region";
+      }
+    });
+
     assertEquals(newServerName,
       locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true)
         .getDefaultRegionLocation().getServerName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index d8388de..b147d91 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -29,12 +29,14 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -49,41 +51,63 @@
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Category({ MediumTests.class, ClientTests.class })
+@RunWith(Parameterized.class)
 public class TestAsyncNonMetaRegionLocator {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
 
+  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncNonMetaRegionLocator.class);
+
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static TableName TABLE_NAME = TableName.valueOf("async");
 
   private static byte[] FAMILY = Bytes.toBytes("cf");
+  private static final int META_STOREFILE_REFRESH_PERIOD = 100;
+  private static final int NB_SERVERS = 4;
+  private static int numOfMetaReplica = NB_SERVERS - 1;
 
   private static AsyncConnectionImpl CONN;
 
   private static AsyncNonMetaRegionLocator LOCATOR;
+  private static ConnectionRegistry registry;
 
   private static byte[][] SPLIT_KEYS;
+  private CatalogReplicaMode metaReplicaMode;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(3);
-    TEST_UTIL.getAdmin().balancerSwitch(false, true);
-    ConnectionRegistry registry =
-        ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-      registry.getClusterId().get(), null, User.getCurrent());
-    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // Enable hbase:meta replication.
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
+    conf.setLong("replication.source.sleepforretries", 10);    // 10 ms
+
+    TEST_UTIL.startMiniCluster(NB_SERVERS);
+    Admin admin = TEST_UTIL.getAdmin();
+    admin.balancerSwitch(false, true);
+
+    // Enable hbase:meta replication.
+    HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, numOfMetaReplica);
+    TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster().getRegions(
+      TableName.META_TABLE_NAME).size() >= numOfMetaReplica);
+
+    registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
+
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
       SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
@@ -108,6 +132,27 @@
     LOCATOR.clearCache(TABLE_NAME);
   }
 
+  @Parameterized.Parameters
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][] {
+      { null },
+      { CatalogReplicaMode.LOAD_BALANCE.toString() }
+    });
+  }
+
+  public TestAsyncNonMetaRegionLocator(String clientMetaReplicaMode) throws Exception {
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    // Enable meta replica LoadBalance mode for this connection.
+    if (clientMetaReplicaMode != null) {
+      c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, clientMetaReplicaMode);
+      metaReplicaMode = CatalogReplicaMode.fromString(clientMetaReplicaMode);
+    }
+
+    CONN = new AsyncConnectionImpl(c, registry,
+      registry.getClusterId().get(), null, User.getCurrent());
+    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+  }
+
   private void createSingleRegionTable() throws IOException, InterruptedException {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
@@ -347,8 +392,21 @@
         getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
     // should get the new location when reload = true
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
-      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
+    // when meta replica LoadBalance mode is enabled, the location update may lag a bit.
+    TEST_UTIL.waitFor(3000, new ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        HRegionLocation loc = getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW,
+          RegionLocateType.CURRENT, true).get();
+        return newServerName.equals(loc.getServerName());
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "New location does not show up in meta (replica) region";
+      }
+    });
+
     // the cached location should be replaced
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 694b7a1..c68a7f1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -39,6 +41,10 @@
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -63,17 +69,20 @@
 /**
  * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
  * verifying async wal replication replays the edits to the secondary region in various scenarios.
+ *
  * @see TestRegionReplicaReplicationEndpoint
  */
 @Category({LargeTests.class})
 public class TestMetaRegionReplicaReplicationEndpoint {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
+    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
   private static final Logger LOG =
-      LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
-  private static final int NB_SERVERS = 3;
-  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+    LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
+  private static final int NB_SERVERS = 4;
+  private final HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private int numOfMetaReplica = NB_SERVERS - 1;
+  private static byte[] VALUE = Bytes.toBytes("value");
 
   @Rule
   public TestName name = new TestName();
@@ -93,12 +102,17 @@
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
     // Enable hbase:meta replication.
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
+      true);
     // Set hbase:meta replicas to be 3.
-    conf.setInt(HConstants.META_REPLICAS_NUM, NB_SERVERS);
+    // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
     HTU.startMiniCluster(NB_SERVERS);
+    // Enable hbase:meta replication.
+    HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
+
     HTU.waitFor(30000,
-      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= NB_SERVERS);
+      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
+      >= numOfMetaReplica);
   }
 
   @After
@@ -129,7 +143,7 @@
     assertNotNull(hrsOther);
     assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
     Region meta = null;
-    for (Region region: hrs.getOnlineRegionsLocalContext()) {
+    for (Region region : hrs.getOnlineRegionsLocalContext()) {
       if (region.getRegionInfo().isMetaRegion()) {
         meta = region;
         break;
@@ -157,8 +171,8 @@
   private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
     waitForMetaReplicasToOnline();
     try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
-        HConstants.CATALOG_FAMILY)) {
-      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+      HConstants.CATALOG_FAMILY)) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
     }
   }
 
@@ -176,18 +190,136 @@
    */
   @Test
   public void testHBaseMetaReplicates() throws Exception {
-    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
-      HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length)))  {
-      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+    try (Table table = HTU
+      .createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY,
+        Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
     }
-    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
-      HConstants.CATALOG_FAMILY,
-      Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length)))  {
-      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+    try (Table table = HTU
+      .createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY,
+        Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
       // Try delete.
       HTU.deleteTableIfAny(table.getName());
-      verifyDeletedReplication(TableName.META_TABLE_NAME, NB_SERVERS, table.getName());
+      verifyDeletedReplication(TableName.META_TABLE_NAME, numOfMetaReplica, table.getName());
+    }
+  }
+
+  @Test
+  public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    TableName tableName = TableName.valueOf("hbase:meta");
+    Table table = connection.getTable(tableName);
+    try {
+      // load the data to the table
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        if (i < 4) {
+          HTU.compact(tableName, false);
+        }
+      }
+
+      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  @Test
+  public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
+    MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
+    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+
+    HRegionServer hrsMetaReplica = null;
+    HRegionServer hrsNoMetaReplica = null;
+    HRegionServer server = null;
+    Region metaReplica = null;
+    boolean hostingMeta;
+
+    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
+      server = cluster.getRegionServer(i);
+      hostingMeta = false;
+      if (server == hrs) {
+        continue;
+      }
+      for (Region region : server.getOnlineRegionsLocalContext()) {
+        if (region.getRegionInfo().isMetaRegion()) {
+          if (metaReplica == null) {
+            metaReplica = region;
+          }
+          hostingMeta = true;
+          break;
+        }
+      }
+      if (!hostingMeta) {
+        hrsNoMetaReplica = server;
+      }
+    }
+
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    TableName tableName = TableName.valueOf("hbase:meta");
+    Table table = connection.getTable(tableName);
+    try {
+      // load the data to the table
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+        if (i == 0) {
+          HTU.moveRegionAndWait(metaReplica.getRegionInfo(), hrsNoMetaReplica.getServerName());
+        }
+      }
+
+      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  protected void verifyReplication(TableName tableName, int regionReplication, final int startRow,
+    final int endRow, final byte[] family) throws Exception {
+    verifyReplication(tableName, regionReplication, startRow, endRow, family, true);
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
+    final int endRow, final byte[] family, final boolean present) throws Exception {
+    // find the regions
+    final Region[] regions = new Region[regionReplication];
+
+    for (int i = 0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
+          try {
+            HTU.verifyNumericRows(region, family, startRow, endRow, present);
+          } catch (Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+          return true;
+        }
+      });
     }
   }
 
@@ -201,10 +333,10 @@
       // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
       // Pass reload to force us to skip cache else it just keeps returning default.
       () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
-        filter(Objects::nonNull).count() >= NB_SERVERS);
+        filter(Objects::nonNull).count() >= numOfMetaReplica);
     List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
     LOG.info("Found locations {}", locations);
-    assertEquals(NB_SERVERS, locations.size());
+    assertEquals(numOfMetaReplica, locations.size());
   }
 
   /**
@@ -213,7 +345,8 @@
   private List<Result> getMetaCells(TableName tableName) throws IOException {
     final List<Result> results = new ArrayList<>();
     ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
-      @Override public boolean visit(Result r) throws IOException {
+      @Override
+      public boolean visit(Result r) throws IOException {
         results.add(r);
         return true;
       }
@@ -225,7 +358,7 @@
   /**
    * @return All Regions for tableName including Replicas.
    */
-  private Region [] getAllRegions(TableName tableName, int replication) {
+  private Region[] getAllRegions(TableName tableName, int replication) {
     final Region[] regions = new Region[replication];
     for (int i = 0; i < NB_SERVERS; i++) {
       HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
@@ -240,12 +373,23 @@
     return regions;
   }
 
+  private Region getOneRegion(TableName tableName) {
+    for (int i = 0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      if (onlineRegions.size() > 1) {
+        return onlineRegions.get(0);
+      }
+    }
+    return null;
+  }
+
   /**
    * Verify when a Table is deleted from primary, then there are no references in replicas
    * (because they get the delete of the table rows too).
    */
   private void verifyDeletedReplication(TableName tableName, int regionReplication,
-      final TableName deletedTableName) {
+    final TableName deletedTableName) {
     final Region[] regions = getAllRegions(tableName, regionReplication);
 
     // Start count at '1' so we skip default, primary replica and only look at secondaries.
@@ -262,7 +406,7 @@
               continue;
             }
             return doesNotContain(cells, deletedTableName);
-          } catch(Throwable ex) {
+          } catch (Throwable ex) {
             LOG.warn("Verification from secondary region is not complete yet", ex);
             // still wait
             return false;
@@ -278,7 +422,7 @@
    * <code>cells</code>.
    */
   private boolean doesNotContain(List<Cell> cells, TableName tableName) {
-    for (Cell cell: cells) {
+    for (Cell cell : cells) {
       String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
       if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
         return false;
@@ -291,7 +435,7 @@
    * Verify Replicas have results (exactly).
    */
   private void verifyReplication(TableName tableName, int regionReplication,
-      List<Result> contains) {
+    List<Result> contains) {
     final Region[] regions = getAllRegions(tableName, regionReplication);
 
     // Start count at '1' so we skip default, primary replica and only look at secondaries.
@@ -308,7 +452,7 @@
               continue;
             }
             return contains(contains, cells);
-          } catch(Throwable ex) {
+          } catch (Throwable ex) {
             LOG.warn("Verification from secondary region is not complete yet", ex);
             // still wait
             return false;
@@ -338,4 +482,134 @@
     }
     return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
   }
+
+  private void doNGets(final Table table, final byte[][] keys) throws Exception {
+    for (byte[] key : keys) {
+      Result r = table.get(new Get(key));
+      assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY));
+    }
+  }
+
+  private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
+    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
+      after[RegionInfo.DEFAULT_REPLICA_ID]);
+
+    for (int i = 1; i < after.length; i ++) {
+      assertTrue(after[i] > before[i]);
+    }
+  }
+
+  private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
+    // There are read requests increase for primary meta replica.
+    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
+      before[RegionInfo.DEFAULT_REPLICA_ID]);
+
+    // No change for replica regions
+    for (int i = 1; i < after.length; i ++) {
+      assertEquals(before[i], after[i]);
+    }
+  }
+
+  private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) {
+    int i = 0;
+    for (Region r : metaRegions) {
+      LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
+      counters[i] = r.getReadRequestsCount();
+      i ++;
+    }
+  }
+
+  @Test
+  public void testHBaseMetaReplicaGets() throws Exception {
+
+    TableName tn = TableName.valueOf(this.name.getMethodName());
+    final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
+    long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
+    Region userRegion = null;
+    HRegionServer srcRs = null;
+    HRegionServer destRs = null;
+
+    try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
+      // load different values
+      HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE);
+      for (int i = 0; i < NB_SERVERS; i++) {
+        HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+        List<HRegion> onlineRegions = rs.getRegions(tn);
+        if (onlineRegions.size() > 0) {
+          userRegion = onlineRegions.get(0);
+          srcRs = rs;
+          if (i > 0) {
+            destRs = HTU.getMiniHBaseCluster().getRegionServer(0);
+          } else {
+            destRs = HTU.getMiniHBaseCluster().getRegionServer(1);
+          }
+        }
+      }
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas);
+
+      Configuration c = new Configuration(HTU.getConfiguration());
+      c.setBoolean(HConstants.USE_META_REPLICAS, true);
+      c.set(LOCATOR_META_REPLICAS_MODE, "LoadBalance");
+      Connection connection = ConnectionFactory.createConnection(c);
+      Table tableForGet = connection.getTable(tn);
+      byte[][] getRows = new byte[HBaseTestingUtility.KEYS.length][];
+
+      int i = 0;
+      for (byte[] key : HBaseTestingUtility.KEYS) {
+        getRows[i] = key;
+        i++;
+      }
+      getRows[0] = Bytes.toBytes("aaa");
+      doNGets(tableForGet, getRows);
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet);
+
+      // There is no read requests increase for primary meta replica.
+      // For rest of meta replicas, there are more reads against them.
+      primaryNoChangeReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
+
+      // move one of the regions so the cached meta location may become invalid.
+      HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
+
+      doNGets(tableForGet, getRows);
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterMove);
+
+      // There are read requests increase for primary meta replica.
+      // For rest of meta replicas, there is no change as regionMove will tell the new location
+      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet,
+        readReqsForMetaReplicasAfterMove);
+      // Move region again.
+      HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
+
+      // Wait until moveRegion cache timeout.
+      while (destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) {
+        Thread.sleep(1000);
+      }
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterSecondMove);
+
+      // There are read requests increase for primary meta replica.
+      // For rest of meta replicas, there is no change.
+      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterMove,
+        readReqsForMetaReplicasAfterSecondMove);
+
+      doNGets(tableForGet, getRows);
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterThirdGet);
+
+      // Since it gets RegionNotServedException, it will go to primary for the next lookup.
+      // There are read requests increase for primary meta replica.
+      // For rest of meta replicas, there is no change.
+      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterSecondMove,
+        readReqsForMetaReplicasAfterThirdGet);
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 99f0ac6..796c0e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -533,7 +533,6 @@
     try {
       rs.startup();
       assertTrue(rs.isSourceActive());
-      Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0);
       Waiter.waitFor(conf, 1000, () -> rss.isAborted());
       assertTrue(rss.isAborted());
       Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());