HBASE-26084 Add owner of replication queue for ReplicationQueueInfo (#3477)
Signed-off-by: stack <stack@apple.com>
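Illustrative usage of the changed ReplicationQueueInfo API (a minimal sketch; the
host names, port, start code, and queue id below are made-up example values):

    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;

    // A ReplicationQueueInfo is now bound to the ServerName that owns the queue.
    ServerName owner = ServerName.valueOf("rs1.example.org", 16020, 1364333181125L);
    String queueId = "1-rs2.example.org,16020,1364333181125";
    ReplicationQueueInfo info = new ReplicationQueueInfo(owner, queueId);
    ServerName queueOwner = info.getOwner(); // rs1.example.org,16020,1364333181125
    String peerId = info.getPeerId();        // "1" (recovered queue, so the peer id is the first token)

    // The new static helpers answer peer-id / recovered-queue questions without
    // constructing a ReplicationQueueInfo (and therefore without needing an owner).
    String parsedPeerId = ReplicationQueueInfo.parsePeerId(queueId);    // "1"
    boolean recovered = ReplicationQueueInfo.isQueueRecovered(queueId); // true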
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
index d39a37e..49a2153 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
@@ -36,6 +36,7 @@
public class ReplicationQueueInfo {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueueInfo.class);
+ private final ServerName owner;
private final String peerId;
private final String queueId;
private boolean queueRecovered;
@@ -46,7 +47,8 @@
* The passed queueId will be either the id of the peer or, for a recovered queue, that id
* followed by the dead server names, in the form of id-servername-*
*/
- public ReplicationQueueInfo(String queueId) {
+ public ReplicationQueueInfo(ServerName owner, String queueId) {
+ this.owner = owner;
this.queueId = queueId;
String[] parts = queueId.split("-", 2);
this.queueRecovered = parts.length != 1;
@@ -58,6 +60,22 @@
}
/**
+ * A utility method to parse the peerId from a queueId.
+ */
+ public static String parsePeerId(String queueId) {
+ String[] parts = queueId.split("-", 2);
+ return parts.length != 1 ? parts[0] : queueId;
+ }
+
+ /**
+ * A utility method to check whether a queue is a recovered queue.
+ */
+ public static boolean isQueueRecovered(String queueId) {
+ String[] parts = queueId.split("-", 2);
+ return parts.length != 1;
+ }
+
+ /**
* Parse dead server names from queue id. servername can contain "-" such as
* "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
* cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
@@ -114,6 +132,10 @@
return Collections.unmodifiableList(this.deadRegionServers);
}
+ public ServerName getOwner() {
+ return this.owner;
+ }
+
public String getPeerId() {
return this.peerId;
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 7bafbc2..7dbfe41 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -86,8 +86,7 @@
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (queueInfo.getPeerId().equals(peerId)) {
+ if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) {
queueStorage.removeQueue(replicator, queueId);
}
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 08ac142..141e890 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -205,7 +205,7 @@
private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
- String peerId = new ReplicationQueueInfo(queueId).getPeerId();
+ String peerId = ReplicationQueueInfo.parsePeerId(queueId);
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index add5121..c2a21a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -103,8 +103,7 @@
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (queueInfo.getPeerId().equals(peerId)) {
+ if (ReplicationQueueInfo.parsePeerId(queueId).equals(peerId)) {
throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId +
", replicator: " + replicator + ", queueId: " + queueId);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 2654565..f31a98d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -701,25 +701,24 @@
public void finishRecoveredSource(RecoveredReplicationSource src) {
this.sources.remove(src.getQueueId());
this.sourceMetrics.remove(src.getQueueId());
- deleteQueue(src.getQueueId());
+ deleteQueue(src.getReplicationQueueInfo());
LOG.info("Finished recovering queue {} with the following stats: {}", src.getQueueId(),
src.getStats());
}
- public void startReplicationSource(ServerName producer, String queueId)
+ public void startReplicationSource(ServerName owner, String queueId)
throws IOException, ReplicationException {
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
+ ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(owner, queueId);
String peerId = replicationQueueInfo.getPeerId();
this.replicationPeers.addPeer(peerId);
- Path walDir =
- new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(producer.toString()));
+ Path walDir = new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(owner.toString()));
MetricsSource metrics = new MetricsSource(queueId);
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
// init replication source
src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this,
- producer, queueId, clusterId, createWALFileLengthProvider(producer, queueId), metrics);
- queueStorage.getWALsInQueue(producer, queueId)
+ replicationQueueInfo, clusterId, createWALFileLengthProvider(owner, queueId), metrics);
+ queueStorage.getWALsInQueue(owner, queueId)
.forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
src.startup();
sources.put(queueId, src);
@@ -728,10 +727,11 @@
/**
* Delete a complete queue of wals associated with a replication source
- * @param queueId the id of replication queue to delete
+ * @param queueInfo the replication queue to delete
*/
- private void deleteQueue(String queueId) {
- abortWhenFail(() -> this.queueStorage.removeQueue(getServerName(), queueId));
+ private void deleteQueue(ReplicationQueueInfo queueInfo) {
+ abortWhenFail(() ->
+ this.queueStorage.removeQueue(queueInfo.getOwner(), queueInfo.getQueueId()));
}
@FunctionalInterface
@@ -748,7 +748,7 @@
}
private WALFileLengthProvider createWALFileLengthProvider(ServerName producer, String queueId) {
- if (new ReplicationQueueInfo(queueId).isQueueRecovered()) {
+ if (ReplicationQueueInfo.isQueueRecovered(queueId)) {
return p -> OptionalLong.empty();
}
return new RemoteWALFileLengthProvider(asyncClusterConnection, producer);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 11b0c7c..ef4fde3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -316,7 +316,7 @@
deadRegionServers.add(regionserver.getServerName());
}
for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(regionserver, queueId);
List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
Collections.sort(wals);
if (!peerIds.contains(queueInfo.getPeerId())) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 147556f..c8200ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -29,6 +29,7 @@
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.util.CommonFSUtils;
@@ -51,11 +52,11 @@
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
- super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server, producer,
- queueId, clusterId, walFileLengthProvider, metrics);
+ super.init(conf, fs, walDir, overallController, queueStorage, replicationPeer, server,
+ queueInfo, clusterId, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 0d9ee4b..4e46a79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -112,8 +112,6 @@
private UUID clusterId;
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
- // The znode we currently play with
- protected String queueId;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
@@ -190,7 +188,7 @@
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
this.server = server;
@@ -212,8 +210,7 @@
this.metrics = metrics;
this.clusterId = clusterId;
- this.queueId = queueId;
- this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
+ this.replicationQueueInfo = queueInfo;
// A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -229,15 +226,15 @@
if (queueStorage instanceof ZKReplicationQueueStorage) {
ZKReplicationQueueStorage zkQueueStorage = (ZKReplicationQueueStorage) queueStorage;
zkQueueStorage.getZookeeper().registerListener(
- new ReplicationQueueListener(this, zkQueueStorage, producer, queueId, walDir));
+ new ReplicationQueueListener(this, zkQueueStorage, queueInfo, walDir));
LOG.info("Register a ZKListener to track the WALs from {}'s replication queue, queueId={}",
- producer, queueId);
+ queueInfo.getOwner(), queueInfo.getQueueId());
} else {
throw new UnsupportedOperationException(
"hbase.replication.offload.enabled=true only support ZKReplicationQueueStorage");
}
}
- LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", queueId,
+ LOG.info("queueId={}, ReplicationSource: {}, currentBandwidth={}", getQueueId(),
replicationPeer.getId(), this.currentBandwidth);
}
@@ -346,7 +343,7 @@
createNewWALReader(walGroupId, worker.getStartPosition());
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
- + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
+ + ".replicationSource.wal-reader." + walGroupId + "," + getQueueId(),
(t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
worker.setWALReader(walReader);
worker.startup((t,e) -> this.uncaughtException(t, e, null, this.getPeerId()));
@@ -594,7 +591,7 @@
setSourceStartupStatus(true);
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
- Thread.currentThread().getName() + ".replicationSource," + this.queueId,
+ Thread.currentThread().getName() + ".replicationSource," + getQueueId(),
(t,e) -> {
//if first initialization attempt failed, and abortOnError is false, we will
//keep looping in this thread until initialize eventually succeeds,
@@ -638,10 +635,10 @@
public void terminate(String reason, Exception cause, boolean clearMetrics,
boolean join) {
if (cause == null) {
- LOG.info("{} Closing source {} because: {}", logPeerId(), this.queueId, reason);
+ LOG.info("{} Closing source {} because: {}", logPeerId(), getQueueId(), reason);
} else {
LOG.error(String.format("%s Closing source %s because an error occurred: %s",
- logPeerId(), this.queueId, reason), cause);
+ logPeerId(), getQueueId(), reason), cause);
}
this.sourceRunning = false;
if (initThread != null && Thread.currentThread() != initThread) {
@@ -698,7 +695,7 @@
TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("{} Got exception while waiting for endpoint to shutdown "
- + "for replication source : {}", logPeerId(), this.queueId, te);
+ + "for replication source : {}", logPeerId(), getQueueId(), te);
}
}
}
@@ -711,11 +708,6 @@
}
@Override
- public String getQueueId() {
- return this.queueId;
- }
-
- @Override
public Path getCurrentPath() {
// only for testing
for (ReplicationSourceShipper worker : workerThreads.values()) {
@@ -957,10 +949,10 @@
private final Path walDir;
public ReplicationQueueListener(ReplicationSource source,
- ZKReplicationQueueStorage zkQueueStorage, ServerName producer, String queueId, Path walDir) {
+ ZKReplicationQueueStorage zkQueueStorage, ReplicationQueueInfo queueInfo, Path walDir) {
super(zkQueueStorage.getZookeeper());
this.source = source;
- this.queueNode = zkQueueStorage.getQueueNode(producer, queueId);
+ this.queueNode = zkQueueStorage.getQueueNode(queueInfo.getOwner(), queueInfo.getQueueId());
this.walDir = walDir;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 56c8ee4..6254af5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -36,8 +36,7 @@
private ReplicationSourceFactory() {}
public static ReplicationSourceInterface create(Configuration conf, String queueId) {
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
- boolean isQueueRecovered = replicationQueueInfo.isQueueRecovered();
+ boolean isQueueRecovered = ReplicationQueueInfo.isQueueRecovered(queueId);
ReplicationSourceInterface src;
try {
String defaultReplicationSourceImpl =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 461276e..1122e5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -50,15 +51,14 @@
* @param queueStorage the replication queue storage
* @param replicationPeer the replication peer
* @param server the server which start and run this replication source
- * @param producer the name of region server which produce WAL to the replication queue
- * @param queueId the id of our replication queue
+ * @param queueInfo the replication queue
* @param clusterId unique UUID for the cluster
* @param walFileLengthProvider used to get the WAL length
* @param metrics metrics for this replication source
*/
void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException;
@@ -105,7 +105,16 @@
*
* @return queue id
*/
- String getQueueId();
+ default String getQueueId() {
+ return getReplicationQueueInfo().getQueueId();
+ }
+
+ /**
+ * Get the replication queue info
+ *
+ * @return the replication queue info
+ */
+ ReplicationQueueInfo getReplicationQueueInfo();
/**
* Get the id that the source is replicating to.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 3dc2d12..c050e1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -298,14 +298,15 @@
private ReplicationSourceInterface createSource(String queueId, ReplicationPeer replicationPeer)
throws IOException {
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(server.getServerName(), queueId);
// Init the just created replication source. Pass the default walProvider's wal file length
// provider. Presumption is we replicate user-space Tables only. For hbase:meta region replica
// replication, see #createCatalogReplicationSource().
WALFileLengthProvider walFileLengthProvider =
this.walFactory.getWALProvider() != null?
this.walFactory.getWALProvider().getWALFileLengthProvider() : p -> OptionalLong.empty();
- src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, server.getServerName(),
- queueId, clusterId, walFileLengthProvider, new MetricsSource(queueId));
+ src.init(conf, fs, logDir, this, queueStorage, replicationPeer, server, queueInfo,
+ clusterId, walFileLengthProvider, new MetricsSource(queueId));
return src;
}
@@ -587,7 +588,7 @@
// a copy of the replication peer first to decide whether we should start the
// RecoveredReplicationSource. If the latest peer is not the old peer, we should also skip to
// start the RecoveredReplicationSource, Otherwise the rs will abort (See HBASE-20475).
- String peerId = new ReplicationQueueInfo(queue).getPeerId();
+ String peerId = ReplicationQueueInfo.parsePeerId(queue);
ReplicationPeerImpl oldPeer = replicationPeers.getPeer(peerId);
if (oldPeer == null) {
LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist",
@@ -926,8 +927,8 @@
this.clusterId.toString());
final ReplicationSourceInterface crs = new CatalogReplicationSource();
crs.init(conf, fs, logDir, this, new NoopReplicationQueueStorage(), peer, server,
- server.getServerName(), peer.getId(), clusterId, walProvider.getWALFileLengthProvider(),
- new MetricsSource(peer.getId()));
+ new ReplicationQueueInfo(server.getServerName(), peer.getId()), clusterId,
+ walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
// Add listener on the provider so we can pick up the WAL to replicate on roll.
WALActionsListener listener = new WALActionsListener() {
@Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 7203fd1..ba440fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -69,13 +69,11 @@
Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
for (ServerName replicator : queueStorage.getListOfReplicators()) {
for (String queueId : queueStorage.getAllQueues(replicator)) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (!peerIds.contains(queueInfo.getPeerId())) {
+ String peerId = ReplicationQueueInfo.parsePeerId(queueId);
+ if (!peerIds.contains(peerId)) {
undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId);
- LOG.debug(
- "Undeleted replication queue for removed peer found: " +
- "[removedPeerId={}, replicator={}, queueId={}]",
- queueInfo.getPeerId(), replicator, queueId);
+ LOG.debug("Undeleted replication queue for removed peer found: " +
+ "[removedPeerId={}, replicator={}, queueId={}]", peerId, replicator, queueId);
}
}
}
@@ -99,10 +97,9 @@
undeletedQueueIds = getUnDeletedQueues();
undeletedQueueIds.forEach((replicator, queueIds) -> {
queueIds.forEach(queueId -> {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
String msg = "Undeleted replication queue for removed peer found: " +
- String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
- replicator, queueId);
+ String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+ ReplicationQueueInfo.parsePeerId(queueId), replicator, queueId);
errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
});
});
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index 8f28dee..6b87047 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -39,7 +39,7 @@
public class ReplicationSourceDummy implements ReplicationSourceInterface {
private ReplicationPeer replicationPeer;
- private String queueId;
+ private ReplicationQueueInfo replicationQueueInfo;
private Path currentPath;
private MetricsSource metrics;
private WALFileLengthProvider walFileLengthProvider;
@@ -48,10 +48,10 @@
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
- this.queueId = queueId;
+ this.replicationQueueInfo = queueInfo;
this.metrics = metrics;
this.walFileLengthProvider = walFileLengthProvider;
this.replicationPeer = replicationPeer;
@@ -69,6 +69,11 @@
}
@Override
+ public ReplicationQueueInfo getReplicationQueueInfo() {
+ return this.replicationQueueInfo;
+ }
+
+ @Override
public ReplicationSourceInterface startup() {
startup.set(true);
return this;
@@ -96,15 +101,8 @@
}
@Override
- public String getQueueId() {
- return queueId;
- }
-
- @Override
public String getPeerId() {
- String[] parts = queueId.split("-", 2);
- return parts.length != 1 ?
- parts[0] : queueId;
+ return this.replicationQueueInfo.getPeerId();
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index bd673bc..dde2c3c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -139,7 +140,8 @@
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
+ new ReplicationQueueInfo(rss.getServerName(), queueId), null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
@@ -177,7 +179,8 @@
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, uuid,
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
+ new ReplicationQueueInfo(rss.getServerName(), queueId), uuid,
p -> OptionalLong.empty(), new MetricsSource(queueId));
try {
rs.startup();
@@ -265,8 +268,8 @@
testConf.setInt("replication.source.maxretriesmultiplier", 1);
ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
- source.init(testConf, null, null, manager, null, mockPeer, null, null, "testPeer", null,
- p -> OptionalLong.empty(), null);
+ source.init(testConf, null, null, manager, null, mockPeer, null,
+ new ReplicationQueueInfo(null, "testPeer"), null, p -> OptionalLong.empty(), null);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(
() -> source.terminate("testing source termination"));
@@ -289,8 +292,9 @@
ReplicationPeer mockPeer = mock(ReplicationPeer.class);
Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
Configuration testConf = HBaseConfiguration.create();
- source.init(testConf, null, null, mockManager, null, mockPeer, null, null,
- "testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
+ source.init(testConf, null, null, mockManager, null, mockPeer, null,
+ new ReplicationQueueInfo(null, "testPeer"), null, p -> OptionalLong.empty(),
+ mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
conf, null, 0, null, source, null);
ReplicationSourceShipper shipper =
@@ -536,7 +540,8 @@
String queueId = "qid";
RegionServerServices rss =
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
- rs.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), queueId, null,
+ rs.init(conf, null, null, manager, null, mockPeer, rss,
+ new ReplicationQueueInfo(rss.getServerName(), queueId), null,
p -> OptionalLong.empty(), new MetricsSource(queueId));
return rss;
}
@@ -655,7 +660,8 @@
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
ReplicationSource source = new ReplicationSource();
- source.init(conf, null, null, manager, null, mockPeer, rss, rss.getServerName(), id, null,
+ source.init(conf, null, null, manager, null, mockPeer, rss,
+ new ReplicationQueueInfo(rss.getServerName(), id), null,
p -> OptionalLong.empty(), metrics);
final Path log1 = new Path(logDir, "log-walgroup-a.8");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index e6b745e..2b6ab55 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -70,6 +70,7 @@
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceController;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
@@ -414,8 +415,8 @@
assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group));
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(), rp1.getPeer("1"),
- manager.getServer(), manager.getServer().getServerName(), id, null, p -> OptionalLong.empty(),
- null);
+ manager.getServer(), new ReplicationQueueInfo(manager.getServer().getServerName(), id), null,
+ p -> OptionalLong.empty(), null);
source.cleanOldWALs(file2, false);
// log1 should be deleted
assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group));
@@ -632,7 +633,8 @@
ReplicationSourceInterface source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
mockReplicationPeerForSyncReplication(peerId2), manager.getServer(),
- manager.getServer().getServerName(), peerId2, null, p -> OptionalLong.empty(), null);
+ new ReplicationQueueInfo(manager.getServer().getServerName(), peerId2), null,
+ p -> OptionalLong.empty(), null);
source.cleanOldWALs(walName, true);
// still there if peer id does not match
assertTrue(fs.exists(remoteWAL));
@@ -640,7 +642,8 @@
source = new ReplicationSource();
source.init(conf, fs, null, manager, manager.getQueueStorage(),
mockReplicationPeerForSyncReplication(slaveId), manager.getServer(),
- manager.getServer().getServerName(), slaveId, null, p -> OptionalLong.empty(), null);
+ new ReplicationQueueInfo(manager.getServer().getServerName(), slaveId), null,
+ p -> OptionalLong.empty(), null);
source.cleanOldWALs(walName, true);
assertFalse(fs.exists(remoteWAL));
} finally {
@@ -821,7 +824,7 @@
@Override
public void init(Configuration conf, FileSystem fs, Path walDir,
ReplicationSourceController overallController, ReplicationQueueStorage queueStorage,
- ReplicationPeer replicationPeer, Server server, ServerName producer, String queueId,
+ ReplicationPeer replicationPeer, Server server, ReplicationQueueInfo queueInfo,
UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics)
throws IOException {
throw new IOException("Failing deliberately");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index 8e0ab0f..86c141b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -95,7 +95,8 @@
queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst();
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
- ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
+ ReplicationQueueInfo replicationQueueInfo =
+ new ReplicationQueueInfo(s3.getServerName(), queue3);
List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
// verify
assertTrue(result.contains(server.getServerName()));