HBASE-25126 Add load balance logic in hbase-client to distribute read load over meta replica regions

It adds load balancing support for meta lookups in AsyncTableRegionLocator.
The existing meta replica mode is renamed "HedgedRead": the client sends a scan request to the primary meta replica region first,
and if the response does not come back within a configured amount of time, it sends scan requests to all meta replica regions and
takes the first response. On top of the existing mode, a new mode "LoadBalance" is introduced. In this mode, the client first
chooses a meta replica region to send the scan request to. If the response is stale, it may send the request to another meta replica
region or to the primary region. In this mode, meta scan requests are load balanced across all replica regions, with the primary
region as the ultimate source of truth.

Two new config knobs are added:

1. hbase.locator.meta.replicas.mode
   Valid options are "None", "HedgedRead" and "LoadBalance"; they are case insensitive. The default mode is "None".

2. hbase.locator.meta.replicas.mode.loadbalance.selector
   The load balance algorithm used to select a meta replica region to send requests to.
   Only org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceSimpleSelector.class
   is supported for now, and it is also the default. The algorithm works as follows:
      a. Clients select a random meta replica region to send requests to if there is no entry for the location in the
         stale location cache.
      b. If the location from one meta replica region is stale, a stale entry is created in the stale location cache
         for the region.
      c. Clients select the primary meta region if the location is in the stale location cache.
      d. Stale location cache entries time out after 3 seconds.

If "hbase.locator.meta.replicas.mode" is not configured, the client checks the legacy config knob "hbase.meta.replicas.use".
If "hbase.meta.replicas.use" is set to true, the mode is set to "HedgedRead".

For branch-2, it introduces support for ReversedScan over a specific non-default replica region; this is mainly
to load balance meta scans across all replica regions.
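
As a hedged illustration of what this enables (this helper is not part of the patch; the replica id
1, the method name and the start row are illustrative, and the usual org.apache.hadoop.hbase client
imports are assumed), a caller could issue a reversed hbase:meta scan against a chosen replica:

  // Sketch: reversed scan of hbase:meta targeted at a specific non-default replica.
  static Result lookupFromMetaReplica(Connection connection, byte[] metaStartRow)
      throws IOException {
    Scan scan = new Scan()
      .withStartRow(metaStartRow, true)
      .addFamily(HConstants.CATALOG_FAMILY)
      .setReversed(true)
      .setCaching(1);
    scan.setConsistency(Consistency.TIMELINE); // required for reads from non-primary replicas
    scan.setReplicaId(1);                      // target a specific, non-default meta replica
    try (Table meta = connection.getTable(TableName.META_TABLE_NAME);
         ResultScanner scanner = meta.getScanner(scan)) {
      return scanner.next(); // closest hbase:meta row at or before metaStartRow
    }
  }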

Remove ConnectionImplementation#setUseMetaReplicas()
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index f76e084..ad8ef0a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -113,7 +113,7 @@
   private final Optional<ServerStatisticTracker> stats;
   private final ClientBackoffPolicy backoffPolicy;
 
-  private ChoreService authService;
+  private ChoreService choreService;
 
   private volatile boolean closed = false;
 
@@ -125,6 +125,7 @@
       User user) {
     this.conf = conf;
     this.user = user;
+
     if (user.isLoginFromKeytab()) {
       spawnRenewalChore(user.getUGI());
     }
@@ -176,8 +177,19 @@
   }
 
   private void spawnRenewalChore(final UserGroupInformation user) {
-    authService = new ChoreService("Relogin service");
-    authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
+    ChoreService service = getChoreService();
+    service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
+  }
+
+  /**
+   * If choreService has not been created yet, create the ChoreService.
+   * @return ChoreService
+   */
+  synchronized ChoreService getChoreService() {
+    if (choreService == null) {
+      choreService = new ChoreService("AsyncConn Chore Service");
+    }
+    return choreService;
   }
 
   @Override
@@ -199,8 +211,8 @@
     IOUtils.closeQuietly(clusterStatusListener);
     IOUtils.closeQuietly(rpcClient);
     IOUtils.closeQuietly(registry);
-    if (authService != null) {
-      authService.shutdown();
+    if (choreService != null) {
+      choreService.shutdown();
     }
     metrics.ifPresent(MetricsConnection::shutdown);
     closed = true;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 5cdacf0..2627336 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -30,6 +30,7 @@
 import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
 import static org.apache.hadoop.hbase.client.RegionInfo.createRegionName;
+import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
 import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 
@@ -46,6 +47,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
@@ -89,7 +91,10 @@
 
   private final int locatePrefetchLimit;
 
-  private final boolean useMetaReplicas;
+  // The mode tells whether HedgedRead or LoadBalance mode is enabled.
+  // The default mode is CatalogReplicaMode.NONE.
+  private CatalogReplicaMode metaReplicaMode;
+  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
 
   private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
 
@@ -196,8 +201,42 @@
       MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
     this.locatePrefetchLimit =
       conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
-    this.useMetaReplicas =
-      conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
+
+    // Get the region locator's meta replica mode.
+    this.metaReplicaMode = CatalogReplicaMode.fromString(conn.getConfiguration()
+      .get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));
+
+    switch (this.metaReplicaMode) {
+      case LOAD_BALANCE:
+        String replicaSelectorClass = conn.getConfiguration().
+          get(RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
+          CatalogReplicaLoadBalanceSimpleSelector.class.getName());
+
+        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
+          replicaSelectorClass, META_TABLE_NAME, conn.getChoreService(), () -> {
+            int numOfReplicas = 1;
+            try {
+              RegionLocations metaLocations = conn.registry.getMetaRegionLocations().get(
+                conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS);
+              numOfReplicas = metaLocations.size();
+            } catch (Exception e) {
+              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
+            }
+            return numOfReplicas;
+          });
+        break;
+      case NONE:
+        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
+
+        boolean useMetaReplicas = conn.getConfiguration().getBoolean(USE_META_REPLICAS,
+          DEFAULT_USE_META_REPLICAS);
+        if (useMetaReplicas) {
+          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
+        }
+        break;
+      default:
+        // Doing nothing
+    }
   }
 
   private TableCache getTableCache(TableName tableName) {
@@ -433,9 +472,24 @@
     Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
       .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
       .setReadType(ReadType.PREAD);
-    if (useMetaReplicas) {
-      scan.setConsistency(Consistency.TIMELINE);
+
+    switch (this.metaReplicaMode) {
+      case LOAD_BALANCE:
+        int metaReplicaId = this.metaReplicaSelector.select(tableName, req.row, req.locateType);
+        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
+          // If the selector gives a non-primary meta replica region, then go with it.
+          // Otherwise, just go to primary in non-hedgedRead mode.
+          scan.setConsistency(Consistency.TIMELINE);
+          scan.setReplicaId(metaReplicaId);
+        }
+        break;
+      case HEDGED_READ:
+        scan.setConsistency(Consistency.TIMELINE);
+        break;
+      default:
+        // do nothing
     }
+
     conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
 
       private boolean completeNormally = false;
@@ -577,6 +631,13 @@
       if (!canUpdateOnError(loc, oldLoc)) {
         return;
       }
+      // Tell metaReplicaSelector that the location is stale. It will create a stale entry
+      // with timestamp internally. Next time the client looks up the same location,
+      // it will pick a different meta replica region.
+      if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
+        metaReplicaSelector.onError(loc);
+      }
+
       RegionLocations newLocs = removeRegionLocation(oldLocs, loc.getRegion().getReplicaId());
       if (newLocs == null) {
         if (tableCache.cache.remove(startKey, oldLocs)) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java
new file mode 100644
index 0000000..f9572b3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelector.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A Catalog replica selector decides which catalog replica region to read from when the client
+ * is configured with CatalogReplicaMode.LOAD_BALANCE.
+ */
+@InterfaceAudience.Private
+interface CatalogReplicaLoadBalanceSelector {
+
+  /**
+   * This method is called when the input location is stale, i.e., when clients run into
+   * org.apache.hadoop.hbase.NotServingRegionException.
+   * @param loc stale location
+   */
+  void onError(HRegionLocation loc);
+
+  /**
+   * Select a catalog replica region where the client goes to look up the input row key.
+   *
+   * @param tablename table name
+   * @param row  key to look up
+   * @param locateType  locate type
+   * @return replica id
+   */
+  int select(TableName tablename, byte[] row, RegionLocateType locateType);
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java
new file mode 100644
index 0000000..4db0d31
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSelectorFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.function.IntSupplier;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Factory to create a {@link CatalogReplicaLoadBalanceSelector}
+ */
+@InterfaceAudience.Private
+final class CatalogReplicaLoadBalanceSelectorFactory {
+  /**
+   * Private Constructor
+   */
+  private CatalogReplicaLoadBalanceSelectorFactory() {
+  }
+
+  /**
+   * Create a CatalogReplicaLoadBalanceSelector based on the input config.
+   * @param replicaSelectorClass  Selector classname.
+   * @param tableName  System table name.
+   * @param choreService {@link ChoreService}
+   * @return  {@link CatalogReplicaLoadBalanceSelector}
+   */
+  public static CatalogReplicaLoadBalanceSelector createSelector(String replicaSelectorClass,
+    TableName tableName, ChoreService choreService, IntSupplier getReplicaCount) {
+    return ReflectionUtils.instantiateWithCustomCtor(replicaSelectorClass,
+      new Class[] { TableName.class, ChoreService.class, IntSupplier.class },
+      new Object[] { tableName, choreService, getReplicaCount });
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java
new file mode 100644
index 0000000..410a2c3
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaLoadBalanceSimpleSelector.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
+import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.IntSupplier;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * <p>CatalogReplicaLoadBalanceSimpleSelector implements a simple catalog replica load
+ * balancing algorithm. It maintains a stale location cache for each table. Whenever a client looks
+ * up a location, it first checks whether the row is in the stale location cache. If it is, the
+ * location from the catalog replica is stale and the client goes to the primary region to look up
+ * the up-to-date location; otherwise, it randomly picks a replica region for the lookup. When
+ * clients receive NotServingRegionException from region servers, they add these region locations
+ * to the stale location cache. The stale cache is cleaned up periodically by a chore.</p>
+ *
+ * It follows a simple algorithm to choose a replica region:
+ *
+ * <ol>
+ *  <li>If there is no stale location entry for the rows it looks up, it will randomly
+ *     pick a replica region for the lookup.</li>
+ *  <li>If the location from the replica region is stale, the client gets NotServingRegionException
+ *     from the region server; in this case, it creates a StaleLocationCacheEntry in
+ *     CatalogReplicaLoadBalanceSimpleSelector.</li>
+ *  <li>When the client does a location lookup, it checks the StaleLocationCache first for the rows
+ *     it looks up; if an entry exists, it goes to the primary meta region for the lookup;
+ *     otherwise, it follows step 1.</li>
+ *  <li>A chore runs periodically to clean up cache entries in the StaleLocationCache.</li>
+ * </ol>
+ */
+class CatalogReplicaLoadBalanceSimpleSelector implements
+  CatalogReplicaLoadBalanceSelector, Stoppable {
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);
+  private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds
+  private final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds
+  private final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute
+
+  /**
+   * StaleLocationCacheEntry is the entry created when a stale location is reported by a client.
+   */
+  private static final class StaleLocationCacheEntry {
+    // timestamp in milliseconds
+    private final long timestamp;
+
+    private final byte[] endKey;
+
+    StaleLocationCacheEntry(final byte[] endKey) {
+      this.endKey = endKey;
+      timestamp = EnvironmentEdgeManager.currentTime();
+    }
+
+    public byte[] getEndKey() {
+      return this.endKey;
+    }
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+        .append("endKey", endKey)
+        .append("timestamp", timestamp)
+        .toString();
+    }
+  }
+
+  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>>
+    staleCache = new ConcurrentHashMap<>();
+  private volatile int numOfReplicas;
+  private final ChoreService choreService;
+  private final TableName tableName;
+  private final IntSupplier getNumOfReplicas;
+  private volatile boolean isStopped = false;
+  private final static int UNINITIALIZED_NUM_OF_REPLICAS = -1;
+
+  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, ChoreService choreService,
+    IntSupplier getNumOfReplicas) {
+    this.choreService = choreService;
+    this.tableName = tableName;
+    this.getNumOfReplicas = getNumOfReplicas;
+
+    // This numOfReplicas is going to be lazy initialized.
+    this.numOfReplicas = UNINITIALIZED_NUM_OF_REPLICAS;
+    // Start chores
+    this.choreService.scheduleChore(getCacheCleanupChore(this));
+    this.choreService.scheduleChore(getRefreshReplicaCountChore(this));
+  }
+
+  /**
+   * When a client runs into NotServingRegionException, it will call this method to
+   * update the selector's internal state.
+   * @param loc the location which caused the exception.
+   */
+  public void onError(HRegionLocation loc) {
+    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache =
+      computeIfAbsent(staleCache, loc.getRegion().getTable(),
+        () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
+    byte[] startKey = loc.getRegion().getStartKey();
+    tableCache.putIfAbsent(startKey,
+      new StaleLocationCacheEntry(loc.getRegion().getEndKey()));
+    LOG.debug("Add entry to stale cache for table {} with startKey {}, {}",
+      loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey());
+  }
+
+  /**
+   * Select a random replica id. If no replica regions are configured, return
+   * the primary replica id.
+   * @return Replica id
+   */
+  private int getRandomReplicaId() {
+    int cachedNumOfReplicas = this.numOfReplicas;
+    if (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) {
+      cachedNumOfReplicas = refreshCatalogReplicaCount();
+      this.numOfReplicas = cachedNumOfReplicas;
+    }
+    // In case of no replica configured, return the primary region id.
+    if (cachedNumOfReplicas <= 1) {
+      return RegionInfo.DEFAULT_REPLICA_ID;
+    }
+    return 1 + ThreadLocalRandom.current().nextInt(cachedNumOfReplicas - 1);
+  }
+
+  /**
+   * When the client looks up a location, it calls this method to find a replica region to go to.
+   * In the normal case, > 99% of region locations from the catalog/meta replica will be up to date.
+   * In extreme cases such as region server crashes, it depends on how fast replication
+   * catches up.
+   *
+   * @param tablename table name it looks up
+   * @param row key it looks up.
+   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
+   * @return catalog replica id
+   */
+  public int select(final TableName tablename, final byte[] row,
+    final RegionLocateType locateType) {
+    Preconditions.checkArgument(locateType == RegionLocateType.BEFORE ||
+        locateType == RegionLocateType.CURRENT,
+      "Expected type BEFORE or CURRENT but got: %s", locateType);
+
+    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tablename);
+
+    // If there is no entry in StaleCache, select a random replica id.
+    if (tableCache == null) {
+      return getRandomReplicaId();
+    }
+
+    Map.Entry<byte[], StaleLocationCacheEntry> entry;
+    boolean isEmptyStopRow = isEmptyStopRow(row);
+    // Only BEFORE and CURRENT are passed in.
+    if (locateType == RegionLocateType.BEFORE) {
+      entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
+    } else {
+      entry = tableCache.floorEntry(row);
+    }
+
+    // It is not in the stale cache, return a random replica id.
+    if (entry == null) {
+      return getRandomReplicaId();
+    }
+
+    // The entry here is a possible match for the location. Check whether the entry has timed out
+    // first, as comparing longs is faster than comparing byte arrays (in most cases) and it lets
+    // stale entries be removed sooner. If the possible match has not timed out, check whether
+    // the entry matches the row passed in and select the replica id accordingly.
+    if ((EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp()) >=
+      STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
+      LOG.debug("Entry for table {} with startKey {}, {} times out", tablename, entry.getKey(),
+        entry);
+      tableCache.remove(entry.getKey());
+      return getRandomReplicaId();
+    }
+
+    byte[] endKey =  entry.getValue().getEndKey();
+
+    // The following logic is borrowed from AsyncNonMetaRegionLocator.
+    if (isEmptyStopRow(endKey)) {
+      LOG.debug("Lookup {} goes to primary region", row);
+      return RegionInfo.DEFAULT_REPLICA_ID;
+    }
+
+    if (locateType == RegionLocateType.BEFORE) {
+      if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) {
+        LOG.debug("Lookup {} goes to primary meta", row);
+        return RegionInfo.DEFAULT_REPLICA_ID;
+      }
+    } else {
+      if (Bytes.compareTo(row, endKey) < 0) {
+        LOG.debug("Lookup {} goes to primary meta", row);
+        return RegionInfo.DEFAULT_REPLICA_ID;
+      }
+    }
+
+    // Not in stale cache, return a random replica id.
+    return getRandomReplicaId();
+  }
+
+  // This class implements the Stoppable interface because chores need a Stoppable object; the
+  // stop operation is currently a no-op.
+  @Override
+  public void stop(String why) {
+    isStopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return isStopped;
+  }
+
+  private void cleanupReplicaReplicaStaleCache() {
+    long curTimeInMills = EnvironmentEdgeManager.currentTime();
+    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
+      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it =
+        tableCache.entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
+        if (curTimeInMills - entry.getValue().getTimestamp() >=
+          STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
+          LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue());
+          it.remove();
+        }
+      }
+    }
+  }
+
+  private int refreshCatalogReplicaCount() {
+    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
+    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
+    if (newNumOfReplicas == 1) {
+      LOG.warn("Table {}'s region replica count is 1, maybe a misconfiguration or failure to "
+        + "fetch the replica count", tableName);
+    }
+    int cachedNumOfReplicas = this.numOfReplicas;
+
+    // If the returned number of replicas is 1, it is mostly caused by failure to fetch the
+    // replica count. Do not update the numOfReplicas in this case.
+    if ((cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) ||
+      ((cachedNumOfReplicas != newNumOfReplicas) && (newNumOfReplicas != 1))) {
+      this.numOfReplicas = newNumOfReplicas;
+    }
+    return newNumOfReplicas;
+  }
+
+  private ScheduledChore getCacheCleanupChore(
+    final CatalogReplicaLoadBalanceSimpleSelector selector) {
+    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
+      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
+      @Override
+      protected void chore() {
+        selector.cleanupReplicaReplicaStaleCache();
+      }
+    };
+  }
+
+  private ScheduledChore getRefreshReplicaCountChore(
+    final CatalogReplicaLoadBalanceSimpleSelector selector) {
+    return new ScheduledChore("RefreshReplicaCountChore", this,
+      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
+      @Override
+      protected void chore() {
+        selector.refreshCatalogReplicaCount();
+      }
+    };
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java
new file mode 100644
index 0000000..0f126e1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CatalogReplicaMode.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * <p>There are two modes with catalog replica support. </p>
+ *
+ * <ol>
+ *   <li>HEDGED_READ  - Client sends requests to the primary region first, within a
+ *                 configured amount of time, if there is no response coming back,
+ *                 client sends requests to all replica regions and takes the first
+ *                 response. </li>
+ *
+ *   <li>LOAD_BALANCE - Client sends requests to replica regions in a round-robin mode,
+ *                 if results from replica regions are stale, next time, client sends requests for
+ *                 these stale locations to the primary region. In this mode, scan
+ *                 requests are load balanced across all replica regions.</li>
+ * </ol>
+ */
+@InterfaceAudience.Private
+enum CatalogReplicaMode {
+  NONE {
+    @Override
+    public String toString() {
+      return "None";
+    }
+  },
+  HEDGED_READ {
+    @Override
+    public String toString() {
+      return "HedgedRead";
+    }
+  },
+  LOAD_BALANCE {
+    @Override
+    public String toString() {
+      return "LoadBalance";
+    }
+  };
+
+  public static CatalogReplicaMode fromString(final String value) {
+    for(CatalogReplicaMode mode : values()) {
+      if (mode.toString().equalsIgnoreCase(value)) {
+        return mode;
+      }
+    }
+    throw new IllegalArgumentException();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 61c45dd..6fbc6ca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -18,10 +18,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
+import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
 import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
+import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsentEx;
 
@@ -161,7 +165,11 @@
   private final boolean hostnamesCanChange;
   private final long pause;
   private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
-  private boolean useMetaReplicas;
+  // The mode tells whether HedgedRead or LoadBalance mode is enabled.
+  // The default mode is CatalogReplicaMode.NONE.
+  private CatalogReplicaMode metaReplicaMode;
+  private CatalogReplicaLoadBalanceSelector metaReplicaSelector;
+
   private final int metaReplicaCallTimeoutScanInMicroSecond;
   private final int numTries;
   final int rpcTimeout;
@@ -232,7 +240,7 @@
   /** lock guards against multiple threads trying to query the meta region at the same time */
   private final ReentrantLock userRegionLock = new ReentrantLock();
 
-  private ChoreService authService;
+  private ChoreService choreService;
 
   /**
    * constructor
@@ -258,8 +266,6 @@
     } else {
       this.pauseForCQTBE = configuredPauseForCQTBE;
     }
-    this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
-      HConstants.DEFAULT_USE_META_REPLICAS);
     this.metaReplicaCallTimeoutScanInMicroSecond =
         connectionConfig.getMetaReplicaCallTimeoutMicroSecondScan();
 
@@ -332,19 +338,47 @@
       close();
       throw e;
     }
+
+    // Get the region locator's meta replica mode.
+    this.metaReplicaMode = CatalogReplicaMode.fromString(conf.get(LOCATOR_META_REPLICAS_MODE,
+      CatalogReplicaMode.NONE.toString()));
+
+    switch (this.metaReplicaMode) {
+      case LOAD_BALANCE:
+        String replicaSelectorClass = conf.get(
+          RegionLocator.LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR,
+          CatalogReplicaLoadBalanceSimpleSelector.class.getName());
+
+        this.metaReplicaSelector = CatalogReplicaLoadBalanceSelectorFactory.createSelector(
+          replicaSelectorClass, META_TABLE_NAME, getChoreService(), () -> {
+            int numOfReplicas = 1;
+            try {
+              RegionLocations metaLocations = registry.getMetaRegionLocations().get(
+                connectionConfig.getReadRpcTimeout(), TimeUnit.MILLISECONDS);
+              numOfReplicas = metaLocations.size();
+            } catch (Exception e) {
+              LOG.error("Failed to get table {}'s region replication, ", META_TABLE_NAME, e);
+            }
+            return numOfReplicas;
+          });
+        break;
+      case NONE:
+        // If user does not configure LOCATOR_META_REPLICAS_MODE, let's check the legacy config.
+
+        boolean useMetaReplicas = conf.getBoolean(USE_META_REPLICAS,
+          DEFAULT_USE_META_REPLICAS);
+        if (useMetaReplicas) {
+          this.metaReplicaMode = CatalogReplicaMode.HEDGED_READ;
+        }
+        break;
+      default:
+        // Doing nothing
+    }
   }
 
   private void spawnRenewalChore(final UserGroupInformation user) {
-    authService = new ChoreService("Relogin service");
-    authService.scheduleChore(AuthUtil.getAuthRenewalChore(user));
-  }
-
-  /**
-   * @param useMetaReplicas
-   */
-  @VisibleForTesting
-  void setUseMetaReplicas(final boolean useMetaReplicas) {
-    this.useMetaReplicas = useMetaReplicas;
+    ChoreService service = getChoreService();
+    service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
   }
 
   /**
@@ -580,6 +614,17 @@
     }
   }
 
+  /**
+   * If choreService has not been created yet, create the ChoreService.
+   * @return ChoreService
+   */
+  synchronized ChoreService getChoreService() {
+    if (choreService == null) {
+      choreService = new ChoreService("AsyncConn Chore Service");
+    }
+    return choreService;
+  }
+
   @Override
   public Configuration getConfiguration() {
     return this.conf;
@@ -841,8 +886,23 @@
     Scan s = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
       .addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(5)
       .setReadType(ReadType.PREAD);
-    if (this.useMetaReplicas) {
-      s.setConsistency(Consistency.TIMELINE);
+
+    switch (this.metaReplicaMode) {
+      case LOAD_BALANCE:
+        int metaReplicaId = this.metaReplicaSelector.select(tableName, row,
+          RegionLocateType.CURRENT);
+        if (metaReplicaId != RegionInfo.DEFAULT_REPLICA_ID) {
+          // If the selector gives a non-primary meta replica region, then go with it.
+          // Otherwise, just go to primary in non-hedgedRead mode.
+          s.setConsistency(Consistency.TIMELINE);
+          s.setReplicaId(metaReplicaId);
+        }
+        break;
+      case HEDGED_READ:
+        s.setConsistency(Consistency.TIMELINE);
+        break;
+      default:
+        // do nothing
     }
     int maxAttempts = (retry ? numTries : 1);
     boolean relocateMeta = false;
@@ -1980,6 +2040,13 @@
       metrics.incrCacheDroppingExceptions(exception);
     }
 
+    // Tell metaReplicaSelector that the location is stale. It will create a stale entry
+    // with timestamp internally. Next time the client looks up the same location,
+    // it will pick a different meta replica region.
+    if (this.metaReplicaMode == CatalogReplicaMode.LOAD_BALANCE) {
+      metaReplicaSelector.onError(oldLocation);
+    }
+
     // If we're here, it means that can cannot be sure about the location, so we remove it from
     // the cache. Do not send the source because source can be a new server in the same host:port
     metaCache.clearCache(regionInfo);
@@ -2050,8 +2117,8 @@
     if (rpcClient != null) {
       rpcClient.close();
     }
-    if (authService != null) {
-      authService.shutdown();
+    if (choreService != null) {
+      choreService.shutdown();
     }
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
index c7440c6..7ea6e4a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionLocator.java
@@ -37,6 +37,18 @@
  */
 @InterfaceAudience.Public
 public interface RegionLocator extends Closeable {
+
+  /** Configuration for the region locator's mode when meta replicas are configured.
+   * Valid values are: HedgedRead, LoadBalance, None.
+   */
+  String LOCATOR_META_REPLICAS_MODE = "hbase.locator.meta.replicas.mode";
+
+  /** Configuration for meta replica selector when Region Locator's LoadBalance mode is configured.
+   * The default value is org.apache.hadoop.hbase.client.CatalogReplicaLoadBalanceSimpleSelector.
+   */
+  String LOCATOR_META_REPLICAS_MODE_LOADBALANCE_SELECTOR =
+    "hbase.locator.meta.replicas.mode.loadbalance.selector";
+
   /**
    * Finds the region on which the given row is being served. Does not reload the cache.
    * @param row Row to find.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
index 70d32d5..965b13c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -237,7 +237,6 @@
       return f;
     }
 
-    // impossible to reach
     return null;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 2ab92d5..902e15e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -45,10 +45,11 @@
 
 /**
  * This class has the logic for handling scanners for regions with and without replicas.
- * 1. A scan is attempted on the default (primary) region
- * 2. The scanner sends all the RPCs to the default region until it is done, or, there
- * is a timeout on the default (a timeout of zero is disallowed).
- * 3. If there is a timeout in (2) above, scanner(s) is opened on the non-default replica(s)
+ * 1. A scan is attempted on the default (primary) region, or a specific region.
+ * 2. The scanner sends all the RPCs to the default/specific region until it is done, or, there
+ * is a timeout on the default/specific region (a timeout of zero is disallowed).
+ * 3. If there is a timeout in (2) above, scanner(s) are opened on the non-default replica(s), but
+ *    only for Consistency.TIMELINE scans with no specific replica id specified.
  * 4. The results from the first successful scanner are taken, and it is stored which server
  * returned the results.
  * 5. The next RPCs are done on the above stored server until it is done or there is a timeout,
@@ -160,7 +161,7 @@
       RegionLocations rl = null;
       try {
         rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true,
-            RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
+          RegionReplicaUtil.DEFAULT_REPLICA_ID, cConnection, tableName,
             currentScannerCallable.getRow());
       } catch (RetriesExhaustedException | DoNotRetryIOException e) {
         // We cannot get the primary replica region location, it is possible that the region server
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
index a2466a5..989fdbb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java
@@ -142,6 +142,21 @@
       locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, false)
         .getDefaultRegionLocation().getServerName());
     // should get the new location when reload = true
+    // When meta replica LoadBalance mode is enabled, the location update may be delayed a bit.
+    util.waitFor(3000, new ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ServerName sn = locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID,
+          true).getDefaultRegionLocation().getServerName();
+        return newServerName.equals(sn);
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "New location does not show up in meta (replica) region";
+      }
+    });
+
     assertEquals(newServerName,
       locator.getRegionLocations(tableName, RegionReplicaUtil.DEFAULT_REPLICA_ID, true)
         .getDefaultRegionLocation().getServerName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
index 70f867a..b822dc9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java
@@ -29,12 +29,15 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.IntStream;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -49,41 +52,61 @@
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Category({ MediumTests.class, ClientTests.class })
+@RunWith(Parameterized.class)
 public class TestAsyncNonMetaRegionLocator {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestAsyncNonMetaRegionLocator.class);
 
+  private static final Logger LOG = LoggerFactory.getLogger(TestAsyncNonMetaRegionLocator.class);
+
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static TableName TABLE_NAME = TableName.valueOf("async");
 
   private static byte[] FAMILY = Bytes.toBytes("cf");
+  private static final int META_STOREFILE_REFRESH_PERIOD = 100;
+  private static final int NB_SERVERS = 4;
+  private static int numOfMetaReplica = NB_SERVERS - 1;
 
   private static AsyncConnectionImpl CONN;
 
   private static AsyncNonMetaRegionLocator LOCATOR;
+  private static ConnectionRegistry registry;
 
   private static byte[][] SPLIT_KEYS;
+  private CatalogReplicaMode metaReplicaMode;
 
   @BeforeClass
   public static void setUp() throws Exception {
-    TEST_UTIL.startMiniCluster(3);
-    TEST_UTIL.getAdmin().balancerSwitch(false, true);
-    ConnectionRegistry registry =
-        ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
-    CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry,
-      registry.getClusterId().get(), User.getCurrent());
-    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+    Configuration conf = TEST_UTIL.getConfiguration();
+
+    // Enable hbase:meta replication.
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
+    conf.setLong("replication.source.sleepforretries", 10);    // 10 ms
+    TEST_UTIL.startMiniCluster(NB_SERVERS);
+    Admin admin = TEST_UTIL.getAdmin();
+    admin.balancerSwitch(false, true);
+    // Enable hbase:meta replication.
+    HBaseTestingUtility.setReplicas(admin, TableName.META_TABLE_NAME, numOfMetaReplica);
+    TEST_UTIL.waitFor(30000, () -> TEST_UTIL.getMiniHBaseCluster().getRegions(
+      TableName.META_TABLE_NAME).size() >= numOfMetaReplica);
+
+    registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
     SPLIT_KEYS = new byte[8][];
     for (int i = 111; i < 999; i += 111) {
       SPLIT_KEYS[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i));
@@ -108,6 +131,26 @@
     LOCATOR.clearCache(TABLE_NAME);
   }
 
+  @Parameterized.Parameters
+  public static Collection<Object[]> parameters() {
+    return Arrays.asList(new Object[][] {
+      { null },
+      { CatalogReplicaMode.LOAD_BALANCE.toString() }
+    });
+  }
+
+  public TestAsyncNonMetaRegionLocator(String clientMetaReplicaMode) throws Exception {
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    // Enable meta replica LoadBalance mode for this connection.
+    if (clientMetaReplicaMode != null) {
+      c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, clientMetaReplicaMode);
+      metaReplicaMode = CatalogReplicaMode.fromString(clientMetaReplicaMode);
+    }
+
+    CONN = new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), User.getCurrent());
+    LOCATOR = new AsyncNonMetaRegionLocator(CONN);
+  }
+
   private void createSingleRegionTable() throws IOException, InterruptedException {
     TEST_UTIL.createTable(TABLE_NAME, FAMILY);
     TEST_UTIL.waitTableAvailable(TABLE_NAME);
@@ -347,8 +390,21 @@
         getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, locateType, false).get());
     }
     // should get the new location when reload = true
-    assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
-      getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW, RegionLocateType.CURRENT, true).get());
+    // When meta replica LoadBalance mode is enabled, the location update may be delayed a bit.
+    TEST_UTIL.waitFor(3000, new ExplainingPredicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        HRegionLocation loc = getDefaultRegionLocation(TABLE_NAME, EMPTY_START_ROW,
+          RegionLocateType.CURRENT, true).get();
+        return newServerName.equals(loc.getServerName());
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "New location does not show up in meta (replica) region";
+      }
+    });
+
     // the cached location should be replaced
     for (RegionLocateType locateType : RegionLocateType.values()) {
       assertLocEquals(EMPTY_START_ROW, EMPTY_END_ROW, newServerName,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 87e6c00..722df93 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -59,8 +60,10 @@
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,6 +87,9 @@
   private final static int REFRESH_PERIOD = 1000;
   private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
 
+  @Rule
+  public TestName name = new TestName();
+
   /**
    * This copro is used to synchronize the tests.
    */
@@ -280,7 +286,7 @@
   @Test
   public void testCreateDeleteTable() throws IOException {
     // Create table then get the single region for our new table.
-    HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
+    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
     hdt.setRegionReplication(NB_SERVERS);
     hdt.addCoprocessor(SlowMeCopro.class.getName());
     Table table = HTU.createTable(hdt, new byte[][]{f}, null);
@@ -312,7 +318,7 @@
 
   @Test
   public void testChangeTable() throws Exception {
-    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
             .setRegionReplication(NB_SERVERS)
             .setCoprocessor(SlowMeCopro.class.getName())
             .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
@@ -372,7 +378,7 @@
   @SuppressWarnings("deprecation")
   @Test
   public void testReplicaAndReplication() throws Exception {
-    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaAndReplication");
+    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
     hdt.setRegionReplication(NB_SERVERS);
 
     HColumnDescriptor fam = new HColumnDescriptor(row);
@@ -458,14 +464,14 @@
   public void testBulkLoad() throws IOException {
     // Create table then get the single region for our new table.
     LOG.debug("Creating test table");
-    HTableDescriptor hdt = HTU.createTableDescriptor("testBulkLoad");
+    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
     hdt.setRegionReplication(NB_SERVERS);
     hdt.addCoprocessor(SlowMeCopro.class.getName());
     Table table = HTU.createTable(hdt, new byte[][]{f}, null);
 
     // create hfiles to load.
     LOG.debug("Creating test data");
-    Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
+    Path dir = HTU.getDataTestDirOnTestFS(name.getMethodName());
     final int numRows = 10;
     final byte[] qual = Bytes.toBytes("qual");
     final byte[] val  = Bytes.toBytes("val");
@@ -537,7 +543,7 @@
   @Test
   public void testReplicaGetWithPrimaryDown() throws IOException {
     // Create table then get the single region for our new table.
-    HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
+    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
     hdt.setRegionReplication(NB_SERVERS);
     hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
     try {
@@ -571,7 +577,7 @@
   @Test
   public void testReplicaScanWithPrimaryDown() throws IOException {
     // Create table then get the single region for our new table.
-    HTableDescriptor hdt = HTU.createTableDescriptor("testCreateDeleteTable");
+    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
     hdt.setRegionReplication(NB_SERVERS);
     hdt.addCoprocessor(RegionServerStoppedCopro.class.getName());
 
@@ -618,7 +624,7 @@
     HTU.getConfiguration().set(
         "hbase.rpc.client.impl", "org.apache.hadoop.hbase.ipc.AsyncRpcClient");
     // Create table then get the single region for our new table.
-    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithAsyncRpcClientImpl");
+    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
     hdt.setRegionReplication(NB_SERVERS);
     hdt.addCoprocessor(SlowMeCopro.class.getName());
 
@@ -669,11 +675,12 @@
   @Test
   public void testGetRegionLocationFromPrimaryMetaRegion() throws IOException, InterruptedException {
     HTU.getAdmin().setBalancerRunning(false, true);
-
-    ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
+    Configuration conf = new Configuration(HTU.getConfiguration());
+    conf.setBoolean(USE_META_REPLICAS, true);
+    Connection conn = ConnectionFactory.createConnection(conf);
 
     // Create table then get the single region for our new table.
-    HTableDescriptor hdt = HTU.createTableDescriptor("testGetRegionLocationFromPrimaryMetaRegion");
+    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
     hdt.setRegionReplication(2);
     try {
 
@@ -682,12 +689,11 @@
       RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = true;
 
       // Get user table location, always get it from the primary meta replica
-      RegionLocations url = ((ClusterConnection) HTU.getConnection())
+      RegionLocations url = ((ClusterConnection)conn)
           .locateRegion(hdt.getTableName(), row, false, false);
 
     } finally {
       RegionServerHostingPrimayMetaRegionSlowOrStopCopro.slowDownPrimaryMetaScan = false;
-      ((ConnectionImplementation) HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
       HTU.getAdmin().setBalancerRunning(true, true);
       HTU.getAdmin().disableTable(hdt.getTableName());
       HTU.deleteTable(hdt.getTableName());
@@ -703,22 +709,25 @@
   public void testReplicaGetWithPrimaryAndMetaDown() throws IOException, InterruptedException {
     HTU.getAdmin().setBalancerRunning(false, true);
 
-    ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(true);
+    Configuration conf = new Configuration(HTU.getConfiguration());
+    conf.setBoolean(USE_META_REPLICAS, true);
+    Connection conn = ConnectionFactory.createConnection(conf);
 
     // Create table then get the single region for our new table.
-    HTableDescriptor hdt = HTU.createTableDescriptor("testReplicaGetWithPrimaryAndMetaDown");
+    HTableDescriptor hdt = HTU.createTableDescriptor(name.getMethodName());
     hdt.setRegionReplication(2);
     try {
 
-      Table table = HTU.createTable(hdt, new byte[][] { f }, null);
+      HTU.createTable(hdt, new byte[][] { f }, null);
+      Table table = conn.getTable(TableName.valueOf(name.getMethodName()));
 
       // Get Meta location
-      RegionLocations mrl = ((ClusterConnection) HTU.getConnection())
+      RegionLocations mrl = ((ClusterConnection)conn)
           .locateRegion(TableName.META_TABLE_NAME,
               HConstants.EMPTY_START_ROW, false, false);
 
       // Get user table location
-      RegionLocations url = ((ClusterConnection) HTU.getConnection())
+      RegionLocations url = ((ClusterConnection)conn)
           .locateRegion(hdt.getTableName(), row, false, false);
 
       // Make sure that user primary region is co-hosted with the meta region
@@ -738,11 +747,11 @@
 
       // Wait until the meta table is updated with new location info
       while (true) {
-        mrl = ((ClusterConnection) HTU.getConnection())
+        mrl = ((ClusterConnection)conn)
             .locateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, false, false);
 
         // Get user table location
-        url = ((ClusterConnection) HTU.getConnection())
+        url = ((ClusterConnection)conn)
             .locateRegion(hdt.getTableName(), row, false, true);
 
         LOG.info("meta locations " + mrl);
@@ -783,9 +792,7 @@
       // The second Get will succeed as well
       r = table.get(g);
       Assert.assertTrue(r.isStale());
-
     } finally {
-      ((ConnectionImplementation)HTU.getAdmin().getConnection()).setUseMetaReplicas(false);
       RegionServerHostingPrimayMetaRegionSlowOrStopCopro.throwException = false;
       HTU.getAdmin().setBalancerRunning(true, true);
       HTU.getAdmin().disableTable(hdt.getTableName());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
index 3cefce5..dd17eeb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -38,6 +40,10 @@
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
@@ -62,17 +68,20 @@
 /**
  * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
  * verifying async wal replication replays the edits to the secondary region in various scenarios.
+ *
  * @see TestRegionReplicaReplicationEndpoint
  */
 @Category({LargeTests.class})
 public class TestMetaRegionReplicaReplicationEndpoint {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
+    HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
   private static final Logger LOG =
-      LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
-  private static final int NB_SERVERS = 3;
-  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+    LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
+  private static final int NB_SERVERS = 4;
+  private final HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private int numOfMetaReplica = NB_SERVERS - 1;
+  private static byte[] VALUE = Bytes.toBytes("value");
 
   @Rule
   public TestName name = new TestName();
@@ -92,12 +101,17 @@
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
     // Enable hbase:meta replication.
-    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
+    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
+      true);
     // Set hbase:meta replicas to be 3.
-    conf.setInt(HConstants.META_REPLICAS_NUM, NB_SERVERS);
+    // conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
     HTU.startMiniCluster(NB_SERVERS);
+    // Set the number of hbase:meta replicas.
+    HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
+
     HTU.waitFor(30000,
-      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= NB_SERVERS);
+      () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
+      >= numOfMetaReplica);
   }
 
   @After
@@ -128,7 +142,7 @@
     assertNotNull(hrsOther);
     assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
     Region meta = null;
-    for (Region region: hrs.getOnlineRegionsLocalContext()) {
+    for (Region region : hrs.getOnlineRegionsLocalContext()) {
       if (region.getRegionInfo().isMetaRegion()) {
         meta = region;
         break;
@@ -156,8 +170,8 @@
   private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
     waitForMetaReplicasToOnline();
     try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
-        HConstants.CATALOG_FAMILY)) {
-      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+      HConstants.CATALOG_FAMILY)) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
     }
   }
 
@@ -175,18 +189,136 @@
    */
   @Test
   public void testHBaseMetaReplicates() throws Exception {
-    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
-      HConstants.CATALOG_FAMILY,
-        Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length)))  {
-      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+    try (Table table = HTU
+      .createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY,
+        Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
     }
-    try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
-      HConstants.CATALOG_FAMILY,
-      Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length)))  {
-      verifyReplication(TableName.META_TABLE_NAME, NB_SERVERS, getMetaCells(table.getName()));
+    try (Table table = HTU
+      .createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY,
+        Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
       // Try delete.
       HTU.deleteTableIfAny(table.getName());
-      verifyDeletedReplication(TableName.META_TABLE_NAME, NB_SERVERS, table.getName());
+      verifyDeletedReplication(TableName.META_TABLE_NAME, numOfMetaReplica, table.getName());
+    }
+  }
+
+  @Test
+  public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    TableName tableName = TableName.META_TABLE_NAME;
+    Table table = connection.getTable(tableName);
+    try {
+      // Load the data into the table.
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+        LOG.info("flushing table");
+        HTU.flush(tableName);
+        LOG.info("compacting table");
+        if (i < 4) {
+          HTU.compact(tableName, false);
+        }
+      }
+
+      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  @Test
+  public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
+    MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
+    HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
+
+    HRegionServer hrsMetaReplica = null;
+    HRegionServer hrsNoMetaReplica = null;
+    HRegionServer server = null;
+    Region metaReplica = null;
+    boolean hostingMeta;
+
+    for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
+      server = cluster.getRegionServer(i);
+      hostingMeta = false;
+      if (server == hrs) {
+        continue;
+      }
+      for (Region region : server.getOnlineRegionsLocalContext()) {
+        if (region.getRegionInfo().isMetaRegion()) {
+          if (metaReplica == null) {
+            metaReplica = region;
+          }
+          hostingMeta = true;
+          break;
+        }
+      }
+      if (!hostingMeta) {
+        hrsNoMetaReplica = server;
+      }
+    }
+
+    Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+    TableName tableName = TableName.META_TABLE_NAME;
+    Table table = connection.getTable(tableName);
+    try {
+      // Load the data into the table.
+      for (int i = 0; i < 5; i++) {
+        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
+        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
+        if (i == 0) {
+          HTU.moveRegionAndWait(metaReplica.getRegionInfo(), hrsNoMetaReplica.getServerName());
+        }
+      }
+
+      verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
+    } finally {
+      table.close();
+      connection.close();
+    }
+  }
+
+  protected void verifyReplication(TableName tableName, int regionReplication, final int startRow,
+    final int endRow, final byte[] family) throws Exception {
+    verifyReplication(tableName, regionReplication, startRow, endRow, family, true);
+  }
+
+  private void verifyReplication(TableName tableName, int regionReplication, final int startRow,
+    final int endRow, final byte[] family, final boolean present) throws Exception {
+    // find the regions
+    final Region[] regions = new Region[regionReplication];
+
+    for (int i = 0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      for (HRegion region : onlineRegions) {
+        regions[region.getRegionInfo().getReplicaId()] = region;
+      }
+    }
+
+    for (Region region : regions) {
+      assertNotNull(region);
+    }
+
+    for (int i = 1; i < regionReplication; i++) {
+      final Region region = regions[i];
+      // wait until all the data is replicated to all secondary regions
+      Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          LOG.info("verifying replication for region replica:" + region.getRegionInfo());
+          try {
+            HTU.verifyNumericRows(region, family, startRow, endRow, present);
+          } catch (Throwable ex) {
+            LOG.warn("Verification from secondary region is not complete yet", ex);
+            // still wait
+            return false;
+          }
+          return true;
+        }
+      });
     }
   }
 
@@ -200,10 +332,10 @@
       // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
       // Pass reload to force us to skip cache else it just keeps returning default.
       () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
-        filter(Objects::nonNull).count() >= NB_SERVERS);
+        filter(Objects::nonNull).count() >= numOfMetaReplica);
     List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
     LOG.info("Found locations {}", locations);
-    assertEquals(NB_SERVERS, locations.size());
+    assertEquals(numOfMetaReplica, locations.size());
   }
 
   /**
@@ -224,7 +356,7 @@
   /**
    * @return All Regions for tableName including Replicas.
    */
-  private Region [] getAllRegions(TableName tableName, int replication) {
+  private Region[] getAllRegions(TableName tableName, int replication) {
     final Region[] regions = new Region[replication];
     for (int i = 0; i < NB_SERVERS; i++) {
       HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
@@ -239,12 +371,23 @@
     return regions;
   }
 
+  private Region getOneRegion(TableName tableName) {
+    for (int i = 0; i < NB_SERVERS; i++) {
+      HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+      List<HRegion> onlineRegions = rs.getRegions(tableName);
+      if (onlineRegions.size() > 1) {
+        return onlineRegions.get(0);
+      }
+    }
+    return null;
+  }
+
   /**
    * Verify when a Table is deleted from primary, then there are no references in replicas
    * (because they get the delete of the table rows too).
    */
   private void verifyDeletedReplication(TableName tableName, int regionReplication,
-      final TableName deletedTableName) {
+    final TableName deletedTableName) {
     final Region[] regions = getAllRegions(tableName, regionReplication);
 
     // Start count at '1' so we skip default, primary replica and only look at secondaries.
@@ -261,7 +404,7 @@
               continue;
             }
             return doesNotContain(cells, deletedTableName);
-          } catch(Throwable ex) {
+          } catch (Throwable ex) {
             LOG.warn("Verification from secondary region is not complete yet", ex);
             // still wait
             return false;
@@ -277,7 +420,7 @@
    * <code>cells</code>.
    */
   private boolean doesNotContain(List<Cell> cells, TableName tableName) {
-    for (Cell cell: cells) {
+    for (Cell cell : cells) {
       String row = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
       if (row.startsWith(tableName.toString() + HConstants.DELIMITER)) {
         return false;
@@ -290,7 +433,7 @@
    * Verify Replicas have results (exactly).
    */
   private void verifyReplication(TableName tableName, int regionReplication,
-      List<Result> contains) {
+    List<Result> contains) {
     final Region[] regions = getAllRegions(tableName, regionReplication);
 
     // Start count at '1' so we skip default, primary replica and only look at secondaries.
@@ -307,7 +450,7 @@
               continue;
             }
             return contains(contains, cells);
-          } catch(Throwable ex) {
+          } catch (Throwable ex) {
             LOG.warn("Verification from secondary region is not complete yet", ex);
             // still wait
             return false;
@@ -337,4 +480,144 @@
     }
     return !containsScanner.advance() && matches >= 1 && count >= matches && count == cells.size();
   }
+
+  private void doNGets(final Table table, final byte[][] keys) throws Exception {
+    for (byte[] key : keys) {
+      Result r = table.get(new Get(key));
+      assertArrayEquals(VALUE, r.getValue(HConstants.CATALOG_FAMILY, HConstants.CATALOG_FAMILY));
+    }
+  }
+
+  private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
+    assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
+      after[RegionInfo.DEFAULT_REPLICA_ID]);
+
+    for (int i = 1; i < after.length; i++) {
+      assertTrue(after[i] > before[i]);
+    }
+  }
+
+  private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
+    // The read request count increases for the primary meta replica.
+    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
+      before[RegionInfo.DEFAULT_REPLICA_ID]);
+
+    // No change for the replica regions.
+    for (int i = 1; i < after.length; i++) {
+      assertEquals(before[i], after[i]);
+    }
+  }
+
+  private void primaryMayIncreaseReplicaNoChange(final long[] before, final long[] after) {
+    // For the primary meta replica, the scan request count may increase; replica regions are unchanged.
+    assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >=
+      before[RegionInfo.DEFAULT_REPLICA_ID]);
+
+    // No change for the replica regions.
+    for (int i = 1; i < after.length; i++) {
+      assertEquals(before[i], after[i]);
+    }
+  }
+
+  private void getMetaReplicaReadRequests(final Region[] metaRegions, final long[] counters) {
+    int i = 0;
+    for (Region r : metaRegions) {
+      LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
+      counters[i] = r.getReadRequestsCount();
+      i++;
+    }
+  }
+
+  @Test
+  public void testHBaseMetaReplicaGets() throws Exception {
+
+    TableName tn = TableName.valueOf(this.name.getMethodName());
+    final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
+    long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterGet = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterMove = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterSecondMove = new long[numOfMetaReplica];
+    long[] readReqsForMetaReplicasAfterThirdGet = new long[numOfMetaReplica];
+    Region userRegion = null;
+    HRegionServer srcRs = null;
+    HRegionServer destRs = null;
+
+    try (Table table = HTU.createTable(tn, HConstants.CATALOG_FAMILY,
+      Arrays.copyOfRange(HBaseTestingUtility.KEYS, 1, HBaseTestingUtility.KEYS.length))) {
+      verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
+      // Load the table with a known value so the gets below can be verified.
+      HTU.loadTable(table, new byte[][] { HConstants.CATALOG_FAMILY }, VALUE);
+      for (int i = 0; i < NB_SERVERS; i++) {
+        HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+        List<HRegion> onlineRegions = rs.getRegions(tn);
+        if (onlineRegions.size() > 0) {
+          userRegion = onlineRegions.get(0);
+          srcRs = rs;
+          if (i > 0) {
+            destRs = HTU.getMiniHBaseCluster().getRegionServer(0);
+          } else {
+            destRs = HTU.getMiniHBaseCluster().getRegionServer(1);
+          }
+        }
+      }
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicas);
+
+      Configuration c = new Configuration(HTU.getConfiguration());
+      c.set(LOCATOR_META_REPLICAS_MODE, "LoadBalance");
+      Connection connection = ConnectionFactory.createConnection(c);
+      Table tableForGet = connection.getTable(tn);
+      byte[][] getRows = new byte[HBaseTestingUtility.KEYS.length][];
+
+      int i = 0;
+      for (byte[] key : HBaseTestingUtility.KEYS) {
+        getRows[i] = key;
+        i++;
+      }
+      getRows[0] = Bytes.toBytes("aaa");
+      doNGets(tableForGet, getRows);
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterGet);
+
+      // There is no read request increase for the primary meta replica.
+      // The rest of the meta replicas see more reads against them.
+      primaryNoChangeReplicaIncrease(readReqsForMetaReplicas, readReqsForMetaReplicasAfterGet);
+
+      // Move one of the regions so the cached meta location may become invalid.
+      HTU.moveRegionAndWait(userRegion.getRegionInfo(), destRs.getServerName());
+
+      doNGets(tableForGet, getRows);
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterMove);
+
+      // The read request count increases for the primary meta replica.
+      // No change for the rest of the meta replicas, as the region move reports the new location.
+      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterGet,
+        readReqsForMetaReplicasAfterMove);
+      // Move region again.
+      HTU.moveRegionAndWait(userRegion.getRegionInfo(), srcRs.getServerName());
+
+      // Wait until moveRegion cache timeout.
+      while (destRs.getMovedRegion(userRegion.getRegionInfo().getEncodedName()) != null) {
+        Thread.sleep(1000);
+      }
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterSecondMove);
+
+      // The read request count may increase for the primary meta replica.
+      // No change for the rest of the meta replicas.
+      primaryMayIncreaseReplicaNoChange(readReqsForMetaReplicasAfterMove,
+        readReqsForMetaReplicasAfterSecondMove);
+
+      doNGets(tableForGet, getRows);
+
+      getMetaReplicaReadRequests(metaRegions, readReqsForMetaReplicasAfterThirdGet);
+
+      // Since the client gets a NotServingRegionException, it goes to the primary for the next
+      // lookup. The read request count increases for the primary meta replica.
+      // No change for the rest of the meta replicas.
+      primaryIncreaseReplicaNoChange(readReqsForMetaReplicasAfterSecondMove,
+        readReqsForMetaReplicasAfterThirdGet);
+    }
+  }
 }
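
Illustrative sketch, not part of the diff: testHBaseMetaReplicaGets above enables the new mode by setting RegionLocator.LOCATOR_META_REPLICAS_MODE to "LoadBalance" on a client configuration. A minimal client doing the same, with a hypothetical table name and row key:

    Configuration conf = HBaseConfiguration.create();
    conf.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, "LoadBalance");
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("someTable"))) {
      // Meta lookups behind this get are spread across the hbase:meta replica regions;
      // a stale location sends the next lookup to the primary (see the test above).
      Result result = table.get(new Get(Bytes.toBytes("someRow")));
    }
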
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index c050b86..607c837 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -536,7 +536,6 @@
     try {
       rs.startup();
       assertTrue(rs.isSourceActive());
-      Waiter.waitFor(conf, 1000, () -> FaultyReplicationEndpoint.count > 0);
       Waiter.waitFor(conf, 1000, () -> rss.isAborted());
       assertTrue(rss.isAborted());
       Waiter.waitFor(conf, 1000, () -> !rs.isSourceActive());