HBASE-28521 Use standard ConnectionRegistry and Client API to get region server list in replication (#5825)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
Reviewed-by: Andor Molnár <andor@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 564f433..f0ea993 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -19,28 +19,25 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.ReservoirSample;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.AuthFailedException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,12 +53,11 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
 
-  private ZKWatcher zkw = null;
-  private final Object zkwLock = new Object();
-
   protected Configuration conf;
 
-  private AsyncClusterConnection conn;
+  private final Object connLock = new Object();
+
+  private volatile AsyncClusterConnection conn;
 
   /**
    * Default maximum number of times a replication sink can be reported as bad before it will no
@@ -106,36 +102,15 @@
     this.badReportCounts = Maps.newHashMap();
   }
 
-  protected void disconnect() {
-    synchronized (zkwLock) {
-      if (zkw != null) {
-        zkw.close();
-      }
-    }
-    if (this.conn != null) {
-      try {
-        this.conn.close();
-        this.conn = null;
-      } catch (IOException e) {
-        LOG.warn("{} Failed to close the connection", ctx.getPeerId());
-      }
-    }
-  }
-
-  /**
-   * A private method used to re-establish a zookeeper session with a peer cluster.
-   */
-  private void reconnect(KeeperException ke) {
-    if (
-      ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
-        || ke instanceof AuthFailedException
-    ) {
-      String clusterKey = ctx.getPeerConfig().getClusterKey();
-      LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
-      try {
-        reloadZkWatcher();
-      } catch (IOException io) {
-        LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
+  private void disconnect() {
+    synchronized (connLock) {
+      if (this.conn != null) {
+        try {
+          this.conn.close();
+          this.conn = null;
+        } catch (IOException e) {
+          LOG.warn("{} Failed to close the connection", ctx.getPeerId());
+        }
       }
     }
   }
@@ -152,13 +127,7 @@
 
   @Override
   protected void doStart() {
-    try {
-      reloadZkWatcher();
-      connectPeerCluster();
-      notifyStarted();
-    } catch (IOException e) {
-      notifyFailed(e);
-    }
+    notifyStarted();
   }
 
   @Override
@@ -168,44 +137,40 @@
   }
 
   @Override
-  // Synchronize peer cluster connection attempts to avoid races and rate
-  // limit connections when multiple replication sources try to connect to
-  // the peer cluster. If the peer cluster is down we can get out of control
-  // over time.
   public UUID getPeerUUID() {
-    UUID peerUUID = null;
     try {
-      synchronized (zkwLock) {
-        peerUUID = ZKClusterId.getUUIDForCluster(zkw);
-      }
-    } catch (KeeperException ke) {
-      reconnect(ke);
-    }
-    return peerUUID;
-  }
-
-  /**
-   * Closes the current ZKW (if not null) and creates a new one
-   * @throws IOException If anything goes wrong connecting
-   */
-  private void reloadZkWatcher() throws IOException {
-    synchronized (zkwLock) {
-      if (zkw != null) {
-        zkw.close();
-      }
-      zkw =
-        new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this);
-      zkw.registerListener(new PeerRegionServerListener(this));
+      AsyncClusterConnection conn = connect();
+      String clusterId = FutureUtils
+        .get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)))
+        .getClusterId();
+      return UUID.fromString(clusterId);
+    } catch (IOException e) {
+      LOG.warn("Failed to get cluster id for cluster", e);
+      return null;
     }
   }
 
-  private void connectPeerCluster() throws IOException {
-    try {
-      conn = createConnection(this.conf);
-    } catch (IOException ioe) {
-      LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
-      throw ioe;
+  // do not call this method in doStart method, only initialize the connection to remote cluster
+  // when you actually wants to make use of it. The problem here is that, starting the replication
+  // endpoint is part of the region server initialization work, so if the peer cluster is fully
+  // down and we can not connect to it, we will cause the initialization to fail and crash the
+  // region server, as we need the cluster id while setting up the AsyncClusterConnection, which
+  // needs to at least connect to zookeeper or some other servers in the peer cluster based on
+  // different connection registry implementation
+  private AsyncClusterConnection connect() throws IOException {
+    AsyncClusterConnection c = this.conn;
+    if (c != null) {
+      return c;
     }
+    synchronized (connLock) {
+      c = this.conn;
+      if (c != null) {
+        return c;
+      }
+      c = createConnection(this.conf);
+      conn = c;
+    }
+    return c;
   }
 
   @Override
@@ -224,36 +189,27 @@
    * Get the list of all the region servers from the specified peer
    * @return list of region server addresses or an empty list if the slave is unavailable
    */
-  protected List<ServerName> fetchSlavesAddresses() {
-    List<String> children = null;
+  // will be overridden in tests so protected
+  protected Collection<ServerName> fetchPeerAddresses() {
     try {
-      synchronized (zkwLock) {
-        children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
-      }
-    } catch (KeeperException ke) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Fetch slaves addresses failed", ke);
-      }
-      reconnect(ke);
-    }
-    if (children == null) {
+      return FutureUtils.get(connect().getAdmin().getRegionServers(true));
+    } catch (IOException e) {
+      LOG.debug("Fetch peer addresses failed", e);
       return Collections.emptyList();
     }
-    List<ServerName> addresses = new ArrayList<>(children.size());
-    for (String child : children) {
-      addresses.add(ServerName.parseServerName(child));
-    }
-    return addresses;
   }
 
   protected synchronized void chooseSinks() {
-    List<ServerName> slaveAddresses = fetchSlavesAddresses();
+    Collection<ServerName> slaveAddresses = fetchPeerAddresses();
     if (slaveAddresses.isEmpty()) {
       LOG.warn("No sinks available at peer. Will not be able to replicate");
+      this.sinkServers = Collections.emptyList();
+    } else {
+      int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+      ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks);
+      sample.add(slaveAddresses.iterator());
+      this.sinkServers = sample.getSamplingResult();
     }
-    Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
-    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
-    this.sinkServers = slaveAddresses.subList(0, numSinks);
     badReportCounts.clear();
   }
 
@@ -275,7 +231,7 @@
     }
     ServerName serverName =
       sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
-    return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+    return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName));
   }
 
   /**
@@ -308,29 +264,6 @@
   }
 
   /**
-   * Tracks changes to the list of region servers in a peer's cluster.
-   */
-  public static class PeerRegionServerListener extends ZKListener {
-
-    private final HBaseReplicationEndpoint replicationEndpoint;
-    private final String regionServerListNode;
-
-    public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
-      super(endpoint.zkw);
-      this.replicationEndpoint = endpoint;
-      this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
-    }
-
-    @Override
-    public synchronized void nodeChildrenChanged(String path) {
-      if (path.equals(regionServerListNode)) {
-        LOG.info("Detected change to peer region servers, fetching updated list");
-        replicationEndpoint.chooseSinks();
-      }
-    }
-  }
-
-  /**
    * Wraps a replication region server sink to provide the ability to identify it.
    */
   public static class SinkPeer {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index d895920..6bdc977 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -106,7 +106,6 @@
   private boolean isSerial = false;
   // Initialising as 0 to guarantee at least one logging message
   private long lastSinkFetchTime = 0;
-  private volatile boolean stopping = false;
 
   @Override
   public void init(Context context) throws IOException {
@@ -449,7 +448,7 @@
     }
 
     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
-    while (this.isRunning() && !this.stopping) {
+    while (this.isRunning()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -514,14 +513,6 @@
     return ctx.getReplicationPeer().isPeerEnabled();
   }
 
-  @Override
-  protected void doStop() {
-    // Allow currently running replication tasks to finish
-    this.stopping = true;
-    disconnect(); // don't call super.doStop()
-    notifyStopped();
-  }
-
   protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
     int timeout) {
     int entriesHashCode = System.identityHashCode(entries);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 7e1df9d..95adc8a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -21,6 +21,7 @@
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -166,6 +167,9 @@
     ServerName serverNameA = endpoint.getSinkServers().get(0);
     ServerName serverNameB = endpoint.getSinkServers().get(1);
 
+    serverNames.remove(serverNameA);
+    serverNames.remove(serverNameB);
+
     SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
     SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
 
@@ -191,7 +195,7 @@
     }
 
     @Override
-    public List<ServerName> fetchSlavesAddresses() {
+    protected Collection<ServerName> fetchPeerAddresses() {
       return regionServers;
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 2747752..1429c32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -25,12 +25,14 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -369,6 +371,14 @@
     waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
   }
 
+  protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
+    List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
+      .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
+    for (ServerName rs : rses) {
+      util.getMiniHBaseCluster().stopRegionServer(rs);
+    }
+  }
+
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
     if (htable2 != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
index 161e3c8..de19d0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
@@ -44,7 +44,11 @@
 
   @Test
   public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
-    UTIL2.shutdownMiniHBaseCluster();
+    // stop all region servers, we need to keep the master up as the below assertions need to get
+    // cluster id from remote cluster, if master is also down, we can not get any information from
+    // the remote cluster after source cluster restarts
+    stopAllRegionServers(UTIL2);
+
     // add some values to cluster 1
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
index 92688cb..c9ef613 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
@@ -45,7 +45,10 @@
 
   @Test
   public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
-    UTIL2.shutdownMiniHBaseCluster();
+    // stop all region servers, we need to keep the master up as the below assertions need to get
+    // cluster id from remote cluster, if master is also down, we can not get any information from
+    // the remote cluster after source cluster restarts
+    stopAllRegionServers(UTIL2);
     restartSourceCluster(1);
     Admin hbaseAdmin = UTIL1.getAdmin();
     // add some values to source cluster
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
index 018bfb9..b3e52e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
@@ -42,7 +42,10 @@
 
   @Test
   public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
-    UTIL2.shutdownMiniHBaseCluster();
+    // stop all region servers, we need to keep the master up as the below assertions need to get
+    // cluster id from remote cluster, if master is also down, we can not get any information from
+    // the remote cluster after source cluster restarts
+    stopAllRegionServers(UTIL2);
     restartSourceCluster(1);
     Admin hbaseAdmin = UTIL1.getAdmin();
     ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
index 3b097cf..269fa1b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
@@ -46,7 +46,10 @@
 
   @Test
   public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
-    UTIL2.shutdownMiniHBaseCluster();
+    // stop all region servers, we need to keep the master up as the below assertions need to get
+    // cluster id from remote cluster, if master is also down, we can not get any information from
+    // the remote cluster after source cluster restarts
+    stopAllRegionServers(UTIL2);
     // add some values to cluster 1
     for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
       Put p = new Put(Bytes.toBytes("row" + i));