HBASE-28521 Use standard ConnectionRegistry and Client API to get region server list in replication (#5825)
Signed-off-by: Guanghao Zhang <zghao@apache.org>
Reviewed-by: Andor Molnár <andor@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
index 564f433..f0ea993 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java
@@ -19,28 +19,25 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKListener;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.ReservoirSample;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.AuthFailedException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,12 +53,11 @@
private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
- private ZKWatcher zkw = null;
- private final Object zkwLock = new Object();
-
protected Configuration conf;
- private AsyncClusterConnection conn;
+ private final Object connLock = new Object();
+
+ private volatile AsyncClusterConnection conn;
/**
* Default maximum number of times a replication sink can be reported as bad before it will no
@@ -106,36 +102,15 @@
this.badReportCounts = Maps.newHashMap();
}
- protected void disconnect() {
- synchronized (zkwLock) {
- if (zkw != null) {
- zkw.close();
- }
- }
- if (this.conn != null) {
- try {
- this.conn.close();
- this.conn = null;
- } catch (IOException e) {
- LOG.warn("{} Failed to close the connection", ctx.getPeerId());
- }
- }
- }
-
- /**
- * A private method used to re-establish a zookeeper session with a peer cluster.
- */
- private void reconnect(KeeperException ke) {
- if (
- ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
- || ke instanceof AuthFailedException
- ) {
- String clusterKey = ctx.getPeerConfig().getClusterKey();
- LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
- try {
- reloadZkWatcher();
- } catch (IOException io) {
- LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
+ /**
+ * Closes the connection to the peer cluster, if one has been created. The field is cleared only
+ * after a successful close; a failure to close is logged together with its cause and is not
+ * propagated to the caller.
+ */
+ private void disconnect() {
+ synchronized (connLock) {
+ if (this.conn != null) {
+ try {
+ this.conn.close();
+ this.conn = null;
+ } catch (IOException e) {
+ // pass the exception as the last argument so the cause and stack trace are not lost
+ LOG.warn("{} Failed to close the connection", ctx.getPeerId(), e);
+ }
}
}
}
@@ -152,13 +127,7 @@
@Override
protected void doStart() {
- try {
- reloadZkWatcher();
- connectPeerCluster();
- notifyStarted();
- } catch (IOException e) {
- notifyFailed(e);
- }
+ notifyStarted();
}
@Override
@@ -168,44 +137,40 @@
}
@Override
- // Synchronize peer cluster connection attempts to avoid races and rate
- // limit connections when multiple replication sources try to connect to
- // the peer cluster. If the peer cluster is down we can get out of control
- // over time.
public UUID getPeerUUID() {
- UUID peerUUID = null;
try {
- synchronized (zkwLock) {
- peerUUID = ZKClusterId.getUUIDForCluster(zkw);
- }
- } catch (KeeperException ke) {
- reconnect(ke);
- }
- return peerUUID;
- }
-
- /**
- * Closes the current ZKW (if not null) and creates a new one
- * @throws IOException If anything goes wrong connecting
- */
- private void reloadZkWatcher() throws IOException {
- synchronized (zkwLock) {
- if (zkw != null) {
- zkw.close();
- }
- zkw =
- new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this);
- zkw.registerListener(new PeerRegionServerListener(this));
+ AsyncClusterConnection conn = connect();
+ String clusterId = FutureUtils
+ .get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)))
+ .getClusterId();
+ return UUID.fromString(clusterId);
+ } catch (IOException e) {
+ LOG.warn("Failed to get cluster id for cluster", e);
+ return null;
}
}
- private void connectPeerCluster() throws IOException {
- try {
- conn = createConnection(this.conf);
- } catch (IOException ioe) {
- LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
- throw ioe;
+ // Do not call this method in doStart; only initialize the connection to the remote cluster
+ // when you actually want to make use of it. The problem here is that starting the replication
+ // endpoint is part of the region server initialization work, so if the peer cluster is fully
+ // down and we cannot connect to it, we will cause the initialization to fail and crash the
+ // region server, as we need the cluster id while setting up the AsyncClusterConnection, which
+ // needs to at least connect to zookeeper or some other servers in the peer cluster, depending
+ // on the connection registry implementation.
+ private AsyncClusterConnection connect() throws IOException {
+ AsyncClusterConnection c = this.conn;
+ if (c != null) {
+ return c;
}
+ synchronized (connLock) {
+ c = this.conn;
+ if (c != null) {
+ return c;
+ }
+ c = createConnection(this.conf);
+ conn = c;
+ }
+ return c;
}
@Override
@@ -224,36 +189,27 @@
* Get the list of all the region servers from the specified peer
* @return list of region server addresses or an empty list if the slave is unavailable
*/
- protected List<ServerName> fetchSlavesAddresses() {
- List<String> children = null;
+ // will be overridden in tests, so protected
+ protected Collection<ServerName> fetchPeerAddresses() {
try {
- synchronized (zkwLock) {
- children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
- }
- } catch (KeeperException ke) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Fetch slaves addresses failed", ke);
- }
- reconnect(ke);
- }
- if (children == null) {
+ return FutureUtils.get(connect().getAdmin().getRegionServers(true));
+ } catch (IOException e) {
+ LOG.debug("Fetch peer addresses failed", e);
return Collections.emptyList();
}
- List<ServerName> addresses = new ArrayList<>(children.size());
- for (String child : children) {
- addresses.add(ServerName.parseServerName(child));
- }
- return addresses;
}
protected synchronized void chooseSinks() {
- List<ServerName> slaveAddresses = fetchSlavesAddresses();
+ Collection<ServerName> slaveAddresses = fetchPeerAddresses();
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
+ this.sinkServers = Collections.emptyList();
+ } else {
+ int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+ ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks);
+ sample.add(slaveAddresses.iterator());
+ this.sinkServers = sample.getSamplingResult();
}
- Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
- int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
- this.sinkServers = slaveAddresses.subList(0, numSinks);
badReportCounts.clear();
}
@@ -275,7 +231,7 @@
}
ServerName serverName =
sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
- return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
+ return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName));
}
/**
@@ -308,29 +264,6 @@
}
/**
- * Tracks changes to the list of region servers in a peer's cluster.
- */
- public static class PeerRegionServerListener extends ZKListener {
-
- private final HBaseReplicationEndpoint replicationEndpoint;
- private final String regionServerListNode;
-
- public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
- super(endpoint.zkw);
- this.replicationEndpoint = endpoint;
- this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
- }
-
- @Override
- public synchronized void nodeChildrenChanged(String path) {
- if (path.equals(regionServerListNode)) {
- LOG.info("Detected change to peer region servers, fetching updated list");
- replicationEndpoint.chooseSinks();
- }
- }
- }
-
- /**
* Wraps a replication region server sink to provide the ability to identify it.
*/
public static class SinkPeer {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index d895920..6bdc977 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -106,7 +106,6 @@
private boolean isSerial = false;
// Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;
- private volatile boolean stopping = false;
@Override
public void init(Context context) throws IOException {
@@ -449,7 +448,7 @@
}
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
- while (this.isRunning() && !this.stopping) {
+ while (this.isRunning()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
@@ -514,14 +513,6 @@
return ctx.getReplicationPeer().isPeerEnabled();
}
- @Override
- protected void doStop() {
- // Allow currently running replication tasks to finish
- this.stopping = true;
- disconnect(); // don't call super.doStop()
- notifyStopped();
- }
-
protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
int timeout) {
int entriesHashCode = System.identityHashCode(entries);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
index 7e1df9d..95adc8a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestHBaseReplicationEndpoint.java
@@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -166,6 +167,9 @@
ServerName serverNameA = endpoint.getSinkServers().get(0);
ServerName serverNameB = endpoint.getSinkServers().get(1);
+ serverNames.remove(serverNameA);
+ serverNames.remove(serverNameB);
+
SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
@@ -191,7 +195,7 @@
}
@Override
- public List<ServerName> fetchSlavesAddresses() {
+ protected Collection<ServerName> fetchPeerAddresses() {
return regionServers;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 2747752..1429c32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -25,12 +25,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -369,6 +371,14 @@
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}
+ protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
+ List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
+ .map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
+ for (ServerName rs : rses) {
+ util.getMiniHBaseCluster().stopRegionServer(rs);
+ }
+ }
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
if (htable2 != null) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
index 161e3c8..de19d0f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
@@ -44,7 +44,11 @@
@Test
public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
- UTIL2.shutdownMiniHBaseCluster();
+ // stop all region servers, we need to keep the master up as the below assertions need to get
+ // cluster id from remote cluster, if master is also down, we can not get any information from
+ // the remote cluster after source cluster restarts
+ stopAllRegionServers(UTIL2);
+
// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
index 92688cb..c9ef613 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
@@ -45,7 +45,10 @@
@Test
public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
- UTIL2.shutdownMiniHBaseCluster();
+ // stop all region servers, we need to keep the master up as the below assertions need to get
+ // cluster id from remote cluster, if master is also down, we can not get any information from
+ // the remote cluster after source cluster restarts
+ stopAllRegionServers(UTIL2);
restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin();
// add some values to source cluster
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
index 018bfb9..b3e52e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
@@ -42,7 +42,10 @@
@Test
public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
- UTIL2.shutdownMiniHBaseCluster();
+ // stop all region servers, we need to keep the master up as the below assertions need to get
+ // cluster id from remote cluster, if master is also down, we can not get any information from
+ // the remote cluster after source cluster restarts
+ stopAllRegionServers(UTIL2);
restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
index 3b097cf..269fa1b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
@@ -46,7 +46,10 @@
@Test
public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
- UTIL2.shutdownMiniHBaseCluster();
+ // stop all region servers, we need to keep the master up as the below assertions need to get
+ // cluster id from remote cluster, if master is also down, we can not get any information from
+ // the remote cluster after source cluster restarts
+ stopAllRegionServers(UTIL2);
// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));