HBASE-25539: Add age of oldest wal metric (#2945)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 1c04109..d4cc1e3 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -186,6 +186,18 @@
public void incrFailedRecoveryQueue() {
failedRecoveryQueue.incr(1L);
}
+
+ @Override
+ public void setOldestWalAge(long age) {
+ // Not implemented
+ }
+
+ @Override
+ public long getOldestWalAge() {
+ // Not implemented
+ return 0;
+ }
+
@Override
public void init() {
rms.init();
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index edb7864..ac396af 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -49,6 +49,8 @@
public static final String SOURCE_COMPLETED_LOGS = "source.completedLogs";
public static final String SOURCE_COMPLETED_RECOVERY_QUEUES = "source.completedRecoverQueues";
public static final String SOURCE_FAILED_RECOVERY_QUEUES = "source.failedRecoverQueues";
+ /* Used to track the age of oldest wal in ms since its creation time */
+ String OLDEST_WAL_AGE = "source.oldestWalAge";
void setLastShippedAge(long age);
void incrSizeOfLogQueue(int size);
@@ -76,4 +78,6 @@
long getWALEditsRead();
long getShippedOps();
long getEditsFiltered();
+ void setOldestWalAge(long age);
+ long getOldestWalAge();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 8ce2993..2ceb77b 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -39,6 +39,7 @@
private final String logReadInBytesKey;
private final String shippedHFilesKey;
private final String sizeOfHFileRefsQueueKey;
+ private final String oldestWalAgeKey;
private final MutableHistogram ageOfLastShippedOpHist;
private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -65,6 +66,7 @@
private final MutableFastCounter repeatedFileBytes;
private final MutableFastCounter completedWAL;
private final MutableFastCounter completedRecoveryQueue;
+ private final MutableGaugeLong oldestWalAge;
public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
this.rms = rms;
@@ -121,6 +123,9 @@
completedRecoveryKey = this.keyPrefix + "completedRecoverQueues";
completedRecoveryQueue = rms.getMetricsRegistry().getCounter(completedRecoveryKey, 0L);
+
+ oldestWalAgeKey = this.keyPrefix + "oldestWalAge";
+ oldestWalAge = rms.getMetricsRegistry().getGauge(oldestWalAgeKey, 0L);
}
@Override public void setLastShippedAge(long age) {
@@ -183,6 +188,7 @@
rms.removeMetric(repeatedBytesKey);
rms.removeMetric(completedLogsKey);
rms.removeMetric(completedRecoveryKey);
+ rms.removeMetric(oldestWalAgeKey);
}
@Override
@@ -248,6 +254,14 @@
@Override
public void incrFailedRecoveryQueue() {/*no op*/}
+ @Override public void setOldestWalAge(long age) {
+ oldestWalAge.set(age);
+ }
+
+ @Override public long getOldestWalAge() {
+ return oldestWalAge.value();
+ }
+
@Override
public void init() {
rms.init();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 098418d..4ef98d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -386,6 +386,17 @@
globalSourceSource.incrFailedRecoveryQueue();
}
+ /*
+ Sets the age of the oldest wal for this source only; the global source is not updated.
+ */
+ public void setOldestWalAge(long age) {
+ singleSourceSource.setOldestWalAge(age);
+ }
+
+ public long getOldestWalAge() {
+ return singleSourceSource.getOldestWalAge();
+ }
+
@Override
public void init() {
singleSourceSource.init();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index b66b7f1..526c3e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -57,13 +57,13 @@
}
@Override
- protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
- PriorityBlockingQueue<Path> queue) {
- return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
+ protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
+ return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage);
}
- public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
+ public void locateRecoveredPaths(String walGroupId) throws IOException {
boolean hasPathChanged = false;
+ PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup,
new AbstractFSWALProvider.WALStartTimeComparator());
pathsLoop: for (Path path : queue) {
@@ -116,9 +116,9 @@
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
- queue.clear();
+ logQueue.clear(walGroupId);
for (Path path : newPaths) {
- queue.add(path);
+ logQueue.enqueueLog(path, walGroupId);
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
index b0d4db0..2c0f9f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSourceShipper.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Threads;
@@ -40,9 +38,9 @@
private final ReplicationQueueStorage replicationQueues;
public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
- PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
+ ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
ReplicationQueueStorage queueStorage) {
- super(conf, walGroupId, queue, source);
+ super(conf, walGroupId, logQueue, source);
this.source = source;
this.replicationQueues = queueStorage;
}
@@ -58,7 +56,7 @@
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
try {
- source.locateRecoveredPaths(queue);
+ source.locateRecoveredPaths(walGroupId);
break;
} catch (IOException e) {
LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
@@ -75,9 +73,9 @@
String peerClusterZNode = source.getQueueId();
try {
startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
- peerClusterZNode, this.queue.peek().getName());
- LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
- startPosition);
+ peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName());
+ LOG.trace("Recovered queue started with log {} at position {}",
+ this.logQueue.getQueue(walGroupId).peek(), startPosition);
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6fb725d..3272cf1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -24,14 +24,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -85,11 +83,9 @@
public class ReplicationSource implements ReplicationSourceInterface {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
- // Queues of logs to process, entry in format of walGroupId->queue,
- // each presents a queue for one wal group
- private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
// per group queue size, keep no more than this number of logs in each wal group
protected int queueSizePerGroup;
+ protected ReplicationSourceLogQueue logQueue;
protected ReplicationQueueStorage queueStorage;
protected ReplicationPeer replicationPeer;
@@ -115,8 +111,6 @@
volatile boolean sourceRunning = false;
// Metrics for this source
private MetricsSource metrics;
- // WARN threshold for the number of queued logs, defaults to 2
- private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;
@@ -210,6 +204,7 @@
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
+ this.logQueue = new ReplicationSourceLogQueue(conf, metrics, this);
this.queueStorage = queueStorage;
this.replicationPeer = replicationPeer;
this.manager = manager;
@@ -219,7 +214,6 @@
this.queueId = queueId;
this.replicationQueueInfo = new ReplicationQueueInfo(queueId);
- this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
// A defaultBandwidth of '0' means no bandwidth; i.e. no throttling.
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
@@ -250,35 +244,20 @@
}
// Use WAL prefix as the WALGroupId for this peer.
String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal.getName());
- PriorityBlockingQueue<Path> queue = queues.get(walPrefix);
- if (queue == null) {
- queue = new PriorityBlockingQueue<>(queueSizePerGroup,
- new AbstractFSWALProvider.WALStartTimeComparator());
- // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
- // the shipper may quit immediately
- queue.put(wal);
- queues.put(walPrefix, queue);
+ boolean queueExists = logQueue.enqueueLog(wal, walPrefix);
+
+ if (!queueExists) {
if (this.isSourceActive() && this.walEntryFilter != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that wal enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
- tryStartNewShipper(walPrefix, queue);
+ tryStartNewShipper(walPrefix);
}
- } else {
- queue.put(wal);
}
if (LOG.isTraceEnabled()) {
LOG.trace("{} Added wal {} to queue of source {}.", logPeerId(), walPrefix,
this.replicationQueueInfo.getQueueId());
}
- this.metrics.incrSizeOfLogQueue();
- // This will wal a warning for each new wal that gets created above the warn threshold
- int queueSize = queue.size();
- if (queueSize > this.logQueueWarnThreshold) {
- LOG.warn("{} WAL group {} queue size: {} exceeds value of " +
- "replication.source.log.queue.warn {}", logPeerId(), walPrefix, queueSize,
- logQueueWarnThreshold);
- }
}
@Override
@@ -370,16 +349,16 @@
this.walEntryFilter = new ChainWALEntryFilter(filters);
}
- private void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
+ private void tryStartNewShipper(String walGroupId) {
workerThreads.compute(walGroupId, (key, value) -> {
if (value != null) {
LOG.debug("{} preempted start of shipping worker walGroupId={}", logPeerId(), walGroupId);
return value;
} else {
LOG.debug("{} starting shipping worker for walGroupId={}", logPeerId(), walGroupId);
- ReplicationSourceShipper worker = createNewShipper(walGroupId, queue);
+ ReplicationSourceShipper worker = createNewShipper(walGroupId);
ReplicationSourceWALReader walReader =
- createNewWALReader(walGroupId, queue, worker.getStartPosition());
+ createNewWALReader(walGroupId, worker.getStartPosition());
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," + queueId,
@@ -399,7 +378,7 @@
String walGroupId = walGroupShipper.getKey();
ReplicationSourceShipper shipper = walGroupShipper.getValue();
ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
- int queueSize = queues.get(walGroupId).size();
+ int queueSize = logQueue.getQueueSize(walGroupId);
replicationDelay = metrics.getReplicationDelay();
Path currentPath = shipper.getCurrentPath();
fileSize = -1;
@@ -438,16 +417,16 @@
return fileSize;
}
- protected ReplicationSourceShipper createNewShipper(String walGroupId,
- PriorityBlockingQueue<Path> queue) {
- return new ReplicationSourceShipper(conf, walGroupId, queue, this);
+ protected ReplicationSourceShipper createNewShipper(String walGroupId) {
+ return new ReplicationSourceShipper(conf, walGroupId, logQueue, this);
}
- private ReplicationSourceWALReader createNewWALReader(String walGroupId,
- PriorityBlockingQueue<Path> queue, long startPosition) {
+ private ReplicationSourceWALReader createNewWALReader(String walGroupId, long startPosition) {
return replicationPeer.getPeerConfig().isSerial()
- ? new SerialReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this)
- : new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+ ? new SerialReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
+ this, walGroupId)
+ : new ReplicationSourceWALReader(fs, conf, logQueue, startPosition, walEntryFilter,
+ this, walGroupId);
}
/**
@@ -607,14 +586,12 @@
throw new IllegalStateException("Source should be active.");
}
LOG.info("{} queueId={} (queues={}) is replicating from cluster={} to cluster={}",
- logPeerId(), this.replicationQueueInfo.getQueueId(), this.queues.size(), clusterId,
+ logPeerId(), this.replicationQueueInfo.getQueueId(), logQueue.getNumQueues(), clusterId,
peerClusterId);
initializeWALEntryFilter(peerClusterId);
// Start workers
- for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
- String walGroupId = entry.getKey();
- PriorityBlockingQueue<Path> queue = entry.getValue();
- tryStartNewShipper(walGroupId, queue);
+ for (String walGroupId: logQueue.getQueues().keySet()) {
+ tryStartNewShipper(walGroupId);
}
this.startupOngoing.set(false);
}
@@ -844,7 +821,7 @@
workerThreads.remove(worker.walGroupId, worker);
}
- private String logPeerId(){
+ public String logPeerId(){
return "peerId=" + this.getPeerId() + ",";
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java
new file mode 100644
index 0000000..8a774fb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceLogQueue.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ Class that does enqueueing/dequeuing of wals in one place so that the metrics are updated
+ in just one place.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ReplicationSourceLogQueue {
+ private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
+ // Queues of logs to process, entry in format of walGroupId->queue,
+ // each presents a queue for one wal group
+ private Map<String, PriorityBlockingQueue<Path>> queues = new ConcurrentHashMap<>();
+ private MetricsSource metrics;
+ private Configuration conf;
+ // per group queue size, keep no more than this number of logs in each wal group
+ private int queueSizePerGroup;
+ // WARN threshold for the number of queued logs, defaults to 2
+ private int logQueueWarnThreshold;
+ private ReplicationSource source;
+
+ public ReplicationSourceLogQueue(Configuration conf, MetricsSource metrics,
+ ReplicationSource source) {
+ this.conf = conf;
+ this.metrics = metrics;
+ this.source = source;
+ this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
+ this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
+ }
+
+ /**
+ * Enqueue the wal
+ * @param wal wal to be enqueued
+ * @param walGroupId Key for the wal in @queues map
+ * @return true if a queue already existed for this walGroupId, false if it was just created.
+ */
+ public boolean enqueueLog(Path wal, String walGroupId) {
+ boolean exists = false;
+ PriorityBlockingQueue<Path> queue = queues.get(walGroupId);
+ if (queue == null) {
+ queue = new PriorityBlockingQueue<>(queueSizePerGroup,
+ new AbstractFSWALProvider.WALStartTimeComparator());
+ // make sure that we do not use an empty queue when setting up a ReplicationSource, otherwise
+ // the shipper may quit immediately
+ queue.put(wal);
+ queues.put(walGroupId, queue);
+ } else {
+ exists = true;
+ queue.put(wal);
+ }
+ // Increment size of logQueue
+ this.metrics.incrSizeOfLogQueue();
+ // Compute oldest wal age
+ this.metrics.setOldestWalAge(getOldestWalAge());
+ // This will log a warning for each new wal that gets created above the warn threshold
+ int queueSize = queue.size();
+ if (queueSize > this.logQueueWarnThreshold) {
+ LOG.warn("{} WAL group {} queue size: {} exceeds value of " +
+ "replication.source.log.queue.warn {}", source.logPeerId(), walGroupId, queueSize,
+ logQueueWarnThreshold);
+ }
+ return exists;
+ }
+
+ /**
+ * Get the queue size for the given walGroupId.
+ * @param walGroupId walGroupId
+ */
+ public int getQueueSize(String walGroupId) {
+ Queue queue = queues.get(walGroupId);
+ if (queue == null) {
+ return 0;
+ }
+ return queue.size();
+ }
+
+ /**
+ * Returns number of queues.
+ */
+ public int getNumQueues() {
+ return queues.size();
+ }
+
+ public Map<String, PriorityBlockingQueue<Path>> getQueues() {
+ return queues;
+ }
+
+ /**
+ * Return queue for the given walGroupId
+ * Please don't add or remove elements from the returned queue.
+ * Use @enqueueLog and @remove methods respectively.
+ * @param walGroupId walGroupId
+ */
+ public PriorityBlockingQueue<Path> getQueue(String walGroupId) {
+ return queues.get(walGroupId);
+ }
+
+ /**
+ * Remove head from the queue corresponding to given walGroupId.
+ * @param walGroupId walGroupId
+ */
+ public void remove(String walGroupId) {
+ PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
+ if (queue == null || queue.isEmpty()) {
+ return;
+ }
+ queue.remove();
+ // Decrease size logQueue.
+ this.metrics.decrSizeOfLogQueue();
+ // Re-compute age of oldest wal metric.
+ this.metrics.setOldestWalAge(getOldestWalAge());
+ }
+
+ /**
+ * Remove all the elements from the queue corresponding to walGroupId
+ * @param walGroupId walGroupId
+ */
+ public void clear(String walGroupId) {
+ PriorityBlockingQueue<Path> queue = getQueue(walGroupId);
+ while (!queue.isEmpty()) {
+ // Need to iterate since metrics#decrSizeOfLogQueue decrements just by 1.
+ queue.remove();
+ metrics.decrSizeOfLogQueue();
+ }
+ this.metrics.setOldestWalAge(getOldestWalAge());
+ }
+
+ /*
+ Returns the age of oldest wal.
+ */
+ long getOldestWalAge() {
+ long now = EnvironmentEdgeManager.currentTime();
+ long timestamp = getOldestWalTimestamp();
+ if (timestamp == Long.MAX_VALUE) {
+ // If there are no wals in the queue then set the oldest wal timestamp to current time
+ // so that the oldest wal age will be 0.
+ timestamp = now;
+ }
+ long age = now - timestamp;
+ return age;
+ }
+
+ /*
+ Get the oldest wal timestamp from all the queues.
+ */
+ private long getOldestWalTimestamp() {
+ long oldestWalTimestamp = Long.MAX_VALUE;
+ for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
+ PriorityBlockingQueue<Path> queue = entry.getValue();
+ Path path = queue.peek();
+ // peek() returns null when this group's queue is empty; skip such queues.
+ if (path != null) {
+ oldestWalTimestamp = Math.min(oldestWalTimestamp,
+ AbstractFSWALProvider.WALStartTimeComparator.getTS(path));
+ }
+ }
+ return oldestWalTimestamp;
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 78bf42f..098ba02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,7 +21,6 @@
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
@@ -56,7 +55,7 @@
private final Configuration conf;
protected final String walGroupId;
- protected final PriorityBlockingQueue<Path> queue;
+ protected final ReplicationSourceLogQueue logQueue;
private final ReplicationSource source;
// Last position in the log that we sent to ZooKeeper
@@ -77,10 +76,10 @@
private final int shipEditsTimeout;
public ReplicationSourceShipper(Configuration conf, String walGroupId,
- PriorityBlockingQueue<Path> queue, ReplicationSource source) {
+ ReplicationSourceLogQueue logQueue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
- this.queue = queue;
+ this.logQueue = logQueue;
this.source = source;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index be262a6..ad06df2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -54,7 +54,7 @@
class ReplicationSourceWALReader extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
- private final PriorityBlockingQueue<Path> logQueue;
+ private final ReplicationSourceLogQueue logQueue;
private final FileSystem fs;
private final Configuration conf;
private final WALEntryFilter filter;
@@ -77,6 +77,7 @@
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
+ private final String walGroupId;
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
@@ -89,8 +90,8 @@
* @param source replication source
*/
public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
- PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
- ReplicationSource source) {
+ ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
+ ReplicationSource source, String walGroupId) {
this.logQueue = logQueue;
this.currentPosition = startPosition;
this.fs = fs;
@@ -111,6 +112,7 @@
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
+ this.walGroupId = walGroupId;
LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@@ -125,7 +127,7 @@
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
- source.getSourceMetrics())) {
+ source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
@@ -232,7 +234,7 @@
private void handleEmptyWALEntryBatch() throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
- if (logQueue.isEmpty()) {
+ if (logQueue.getQueue(walGroupId).isEmpty()) {
// we're done with current queue, either this is a recovered queue, or it is the special group
// for a sync replication peer and the peer has been transited to DA or S state.
setReaderRunning(false);
@@ -247,18 +249,19 @@
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
// enabled, then dump the log
private void handleEofException(IOException e) {
+ PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
- (source.isRecovered() || logQueue.size() > 1) && this.eofAutoRecovery) {
+ (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
try {
- if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
- LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
- logQueue.remove();
+ if (fs.getFileStatus(queue.peek()).getLen() == 0) {
+ LOG.warn("Forcing removal of 0 length log in queue: " + queue.peek());
+ logQueue.remove(walGroupId);
currentPosition = 0;
}
} catch (IOException ioe) {
- LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+ LOG.warn("Couldn't get file length information about log " + queue.peek());
}
}
}
@@ -270,7 +273,7 @@
return batchQueueHead.getLastWalPath();
}
// otherwise, we must be currently reading from the head of the log queue
- return logQueue.peek();
+ return logQueue.getQueue(walGroupId).peek();
}
//returns false if we've already exceeded the global quota
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
index 9edcc8a..d0e76fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,9 +43,9 @@
private final SerialReplicationChecker checker;
public SerialReplicationSourceWALReader(FileSystem fs, Configuration conf,
- PriorityBlockingQueue<Path> logQueue, long startPosition, WALEntryFilter filter,
- ReplicationSource source) {
- super(fs, conf, logQueue, startPosition, filter, source);
+ ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
+ ReplicationSource source, String walGroupId) {
+ super(fs, conf, logQueue, startPosition, filter, source, walGroupId);
checker = new SerialReplicationChecker(conf, source);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
index a2da1ec..ab70501 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStream.java
@@ -63,7 +63,8 @@
private long currentPositionOfEntry = 0;
// position after reading current entry
private long currentPositionOfReader = 0;
- private final PriorityBlockingQueue<Path> logQueue;
+ private final ReplicationSourceLogQueue logQueue;
+ private final String walGroupId;
private final FileSystem fs;
private final Configuration conf;
private final WALFileLengthProvider walFileLengthProvider;
@@ -81,9 +82,9 @@
* @param metrics the replication metrics
* @throws IOException
*/
- public WALEntryStream(PriorityBlockingQueue<Path> logQueue, Configuration conf,
+ public WALEntryStream(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
- MetricsSource metrics) throws IOException {
+ MetricsSource metrics, String walGroupId) throws IOException {
this.logQueue = logQueue;
this.fs = CommonFSUtils.getWALFileSystem(conf);
this.conf = conf;
@@ -91,6 +92,7 @@
this.walFileLengthProvider = walFileLengthProvider;
this.serverName = serverName;
this.metrics = metrics;
+ this.walGroupId = walGroupId;
}
/**
@@ -251,10 +253,9 @@
private void dequeueCurrentLog() throws IOException {
LOG.debug("EOF, closing {}", currentPath);
closeReader();
- logQueue.remove();
+ logQueue.remove(walGroupId);
setCurrentPath(null);
setPosition(0);
- metrics.decrSizeOfLogQueue();
}
/**
@@ -301,7 +302,8 @@
// open a reader on the next log in queue
private boolean openNextLog() throws IOException {
- Path nextPath = logQueue.peek();
+ PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
+ Path nextPath = queue.peek();
if (nextPath != null) {
openReader(nextPath);
if (reader != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
index 720e2c2..4b74e10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java
@@ -458,7 +458,7 @@
* @param p path to split
* @return start time
*/
- private static long getTS(Path p) {
+ public static long getTS(Path p) {
return getTimestamp(p.getName());
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
index 50537b5..86a71c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
@@ -22,6 +22,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -31,7 +32,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -60,6 +61,8 @@
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -289,7 +292,7 @@
source.init(testConf, null, mockManager, null, mockPeer, null,
"testPeer", null, p -> OptionalLong.empty(), mock(MetricsSource.class));
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(null,
- conf, null, 0, null, source);
+ conf, null, 0, null, source, null);
ReplicationSourceShipper shipper =
new ReplicationSourceShipper(conf, null, null, source);
shipper.entryReader = reader;
@@ -482,8 +485,6 @@
String walGroupId = "fake-wal-group-id";
ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
- PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
- queue.put(new Path("/www/html/test"));
RecoveredReplicationSource source = mock(RecoveredReplicationSource.class);
Server server = mock(Server.class);
Mockito.when(server.getServerName()).thenReturn(serverName);
@@ -496,8 +497,12 @@
.thenReturn(-1L);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt("replication.source.maxretriesmultiplier", -1);
+ MetricsSource metricsSource = mock(MetricsSource.class);
+ doNothing().when(metricsSource).incrSizeOfLogQueue();
+ ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metricsSource, source);
+ logQueue.enqueueLog(new Path("/www/html/test"), walGroupId);
RecoveredReplicationSourceShipper shipper =
- new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
+ new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, source, storage);
assertEquals(1001L, shipper.getStartPosition());
}
@@ -590,5 +595,60 @@
rss.stop("Done");
}
}
-}
+ /**
+ * Verifies the oldest-WAL-age metric after enqueuing two WALs with different timestamps.
+ */
+ @Test
+ public void testAgeOfOldestWal() throws Exception {
+ try {
+ ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(manualEdge);
+
+ String id = "1";
+ MetricsSource metrics = new MetricsSource(id);
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setInt("replication.source.maxretriesmultiplier", 1);
+ ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+ Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+ Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+ ReplicationPeerConfig peerConfig = Mockito.mock(ReplicationPeerConfig.class);
+ Mockito.when(peerConfig.getReplicationEndpointImpl()).
+ thenReturn(DoNothingReplicationEndpoint.class.getName());
+ Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+ ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
+ Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+ Mockito.when(manager.getGlobalMetrics()).
+ thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
+ RegionServerServices rss =
+ TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+
+ ReplicationSource source = new ReplicationSource();
+ source.init(conf, null, manager, null, mockPeer, rss, id, null,
+ p -> OptionalLong.empty(), metrics);
+
+ final Path log1 = new Path(logDir, "log-walgroup-a.8");
+ manualEdge.setValue(10);
+ // Clock is at 10 and log-walgroup-a.8 has timestamp 8, so the age is 2.
+ source.enqueueLog(log1);
+ MetricsReplicationSourceSource metricsSource1 = getSourceMetrics(id);
+ assertEquals(2, metricsSource1.getOldestWalAge());
+
+ final Path log2 = new Path(logDir, "log-walgroup-b.4");
+ // log-walgroup-b.4 has timestamp 4: 10 - 4 = 6, so oldestWalAge should now be 6.
+ source.enqueueLog(log2);
+ assertEquals(6, metricsSource1.getOldestWalAge());
+ // Clear all metrics.
+ metrics.clear();
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+
+ private MetricsReplicationSourceSource getSourceMetrics(String sourceId) {
+ MetricsReplicationSourceFactoryImpl factory =
+ (MetricsReplicationSourceFactoryImpl) CompatibilitySingletonFactory.getInstance(
+ MetricsReplicationSourceFactory.class);
+ return factory.getSource(sourceId);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java
new file mode 100644
index 0000000..c28b180
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceLogQueue.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({SmallTests.class,ReplicationTests.class})
+public class TestReplicationSourceLogQueue {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationSourceLogQueue.class);
+
+ /**
+ * Enqueues and removes WALs in two wal groups, checking the oldest WAL age after each step.
+ */
+ @Test
+ public void testEnqueueDequeue() {
+ try {
+ String walGroupId1 = "fake-walgroup-id-1";
+ String walGroupId2 = "fake-walgroup-id-2";
+
+ ManualEnvironmentEdge manualEdge = new ManualEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(manualEdge);
+
+ MetricsSource metrics = new MetricsSource("1");
+ Configuration conf = HBaseConfiguration.create();
+ ReplicationSource source = mock(ReplicationSource.class);
+ Mockito.doReturn("peer").when(source).logPeerId();
+ ReplicationSourceLogQueue logQueue = new ReplicationSourceLogQueue(conf, metrics, source);
+ final Path log1 = new Path("log-walgroup-a.8");
+ manualEdge.setValue(10);
+ // Clock is at 10 and log-walgroup-a.8 has timestamp 8, so the age is 2.
+ logQueue.enqueueLog(log1, walGroupId1);
+ assertEquals(2, logQueue.getOldestWalAge());
+
+ final Path log2 = new Path("log-walgroup-b.4");
+ // log-walgroup-b.4 has timestamp 4: 10 - 4 = 6, so oldestWalAge should now be 6.
+ logQueue.enqueueLog(log2, walGroupId2);
+ assertEquals(6, logQueue.getOldestWalAge());
+
+ // Remove the only element of walGroupId2.
+ // Only log-walgroup-a.8 is left afterwards, so the age drops back to 2.
+ logQueue.remove(walGroupId2);
+ assertEquals(2, logQueue.getOldestWalAge());
+
+ // Remove the last remaining element.
+ logQueue.remove(walGroupId1);
+ // With no WALs queued at all, the oldest WAL age is expected to be 0.
+ assertEquals(0, logQueue.getOldestWalAge());
+ } finally {
+ EnvironmentEdgeManager.reset();
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
index 1db9c17..9c6fafc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java
@@ -24,6 +24,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
@@ -99,6 +101,7 @@
private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
private static final NavigableMap<byte[], Integer> scopes = getScopes();
+ private final String fakeWalGroupId = "fake-wal-group-id";
private static NavigableMap<byte[], Integer> getScopes() {
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@@ -107,7 +110,7 @@
}
private WAL log;
- PriorityBlockingQueue<Path> walQueue;
+ ReplicationSourceLogQueue logQueue;
private PathWatcher pathWatcher;
@Rule
@@ -131,7 +134,8 @@
@Before
public void setUp() throws Exception {
- walQueue = new PriorityBlockingQueue<>();
+ ReplicationSource source = mock(ReplicationSource.class);
+ logQueue = new ReplicationSourceLogQueue(CONF, new MetricsSource("2"), source);
pathWatcher = new PathWatcher();
final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
wals.getWALProvider().addWALActionsListener(pathWatcher);
@@ -165,7 +169,8 @@
log.rollWriter();
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, 0, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
@@ -192,7 +197,7 @@
appendToLogAndSync();
long oldPos;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
// There's one edit in the log, read it. Reading past it needs to throw exception
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.peek();
@@ -206,8 +211,8 @@
appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
+ log, null, new MetricsSource("1"), fakeWalGroupId)) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
@@ -220,8 +225,8 @@
log.rollWriter();
appendToLogAndSync();
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, oldPos,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, oldPos,
+ log, null, new MetricsSource("1"), fakeWalGroupId)) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
@@ -246,7 +251,8 @@
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, 0, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
@@ -254,12 +260,12 @@
appendToLog("4"); // 4 - this append is in the rolled log
assertEquals("2", getRow(entryStream.next()));
- assertEquals(2, walQueue.size()); // we should not have dequeued yet since there's still an
+ assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
// entry in first log
assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
// and 3 would be skipped
assertEquals("4", getRow(entryStream.next())); // 4
- assertEquals(1, walQueue.size()); // now we've dequeued and moved on to next log properly
+ assertEquals(1, getQueue().size()); // now we've dequeued and moved on to next log properly
assertFalse(entryStream.hasNext());
}
}
@@ -267,11 +273,13 @@
/**
* Tests that if writes come in while we have a stream open, we shouldn't miss them
*/
+
@Test
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, 0, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
@@ -294,7 +302,8 @@
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, 0, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
@@ -302,11 +311,12 @@
}
// next stream should picks up where we left off
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
- assertEquals(1, walQueue.size());
+ assertEquals(1, getQueue().size());
}
}
@@ -314,19 +324,21 @@
* Tests that if we stop before hitting the end of a stream, we can continue where we left off
* using the last position
*/
+
@Test
public void testPosition() throws Exception {
long lastPosition = 0;
appendEntriesToLogAndSync(3);
// read only one element
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, lastPosition,
- log, null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition,
+ log, null, new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, lastPosition, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, lastPosition, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
@@ -337,7 +349,8 @@
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, 0, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
assertFalse(entryStream.hasNext());
}
}
@@ -391,7 +404,8 @@
ReplicationSource source = mockReplicationSource(recovered, conf);
when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader =
- new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
+ new ReplicationSourceWALReader(fs, conf, logQueue, 0, getDummyFilter(), source,
+ fakeWalGroupId);
reader.start();
return reader;
}
@@ -402,7 +416,8 @@
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, 0, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -410,7 +425,7 @@
}
// start up a reader
- Path walPath = walQueue.peek();
+ Path walPath = getQueue().peek();
ReplicationSourceWALReader reader = createReader(false, CONF);
WALEntryBatch entryBatch = reader.take();
@@ -430,7 +445,7 @@
@Test
public void testReplicationSourceWALReaderRecovered() throws Exception {
appendEntriesToLogAndSync(10);
- Path walPath = walQueue.peek();
+ Path walPath = getQueue().peek();
log.rollWriter();
appendEntriesToLogAndSync(5);
log.shutdown();
@@ -450,7 +465,7 @@
assertEquals(0, batch.getNbEntries());
assertTrue(batch.isEndOfFile());
- walPath = walQueue.peek();
+ walPath = getQueue().peek();
batch = reader.take();
assertEquals(walPath, batch.getLastWalPath());
assertEquals(5, batch.getNbEntries());
@@ -463,7 +478,7 @@
@Test
public void testReplicationSourceWALReaderWrongPosition() throws Exception {
appendEntriesToLogAndSync(1);
- Path walPath = walQueue.peek();
+ Path walPath = getQueue().peek();
log.rollWriter();
appendEntriesToLogAndSync(20);
TEST_UTIL.waitFor(5000, new ExplainingPredicate<Exception>() {
@@ -490,7 +505,7 @@
assertEquals(1, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());
- Path walPath2 = walQueue.peek();
+ Path walPath2 = getQueue().peek();
entryBatch = reader.take();
assertEquals(walPath2, entryBatch.getLastWalPath());
assertEquals(20, entryBatch.getNbEntries());
@@ -503,7 +518,7 @@
assertEquals(0, entryBatch.getNbEntries());
assertTrue(entryBatch.isEndOfFile());
- Path walPath3 = walQueue.peek();
+ Path walPath3 = getQueue().peek();
entryBatch = reader.take();
assertEquals(walPath3, entryBatch.getLastWalPath());
assertEquals(10, entryBatch.getNbEntries());
@@ -517,7 +532,8 @@
// get ending position
long position;
try (WALEntryStream entryStream =
- new WALEntryStream(walQueue, CONF, 0, log, null, new MetricsSource("1"))) {
+ new WALEntryStream(logQueue, CONF, 0, log, null,
+ new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next();
entryStream.next();
entryStream.next();
@@ -525,7 +541,7 @@
}
// start up a reader
- Path walPath = walQueue.peek();
+ Path walPath = getQueue().peek();
ReplicationSource source = mockReplicationSource(false, CONF);
AtomicInteger invokeCount = new AtomicInteger(0);
AtomicBoolean enabled = new AtomicBoolean(false);
@@ -535,7 +551,8 @@
});
ReplicationSourceWALReader reader =
- new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+ new ReplicationSourceWALReader(fs, CONF, logQueue, 0, getDummyFilter(),
+ source, fakeWalGroupId);
reader.start();
Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
return reader.take();
@@ -621,8 +638,8 @@
Path currentPath;
@Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- walQueue.add(newPath);
+ public void preLogRoll(Path oldPath, Path newPath) {
+ logQueue.enqueueLog(newPath, fakeWalGroupId);
currentPath = newPath;
}
}
@@ -631,10 +648,10 @@
public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
appendToLog("1");
appendToLog("2");
- long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
+ long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
- try (WALEntryStream entryStream = new WALEntryStream(walQueue, CONF, 0,
- p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"))) {
+ try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0,
+ p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
// can not get log 2
@@ -660,13 +677,11 @@
*/
@Test
public void testEOFExceptionForRecoveredQueue() throws Exception {
- PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
// Create a 0 length log.
Path emptyLog = new Path("emptyLog");
FSDataOutputStream fsdos = fs.create(emptyLog);
fsdos.close();
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
- queue.add(emptyLog);
Configuration conf = new Configuration(CONF);
// Override the max retries multiplier to fail fast.
@@ -675,11 +690,22 @@
// Create a reader thread with source as recovered source.
ReplicationSource source = mockReplicationSource(true, conf);
when(source.isPeerEnabled()).thenReturn(true);
+
+ MetricsSource metrics = mock(MetricsSource.class);
+ doNothing().when(metrics).incrSizeOfLogQueue();
+ doNothing().when(metrics).decrSizeOfLogQueue();
+ ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
+ localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
ReplicationSourceWALReader reader =
- new ReplicationSourceWALReader(fs, conf, queue, 0, getDummyFilter(), source);
+ new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
+ getDummyFilter(), source, fakeWalGroupId);
reader.run();
// ReplicationSourceWALReaderThread#handleEofException method will
// remove empty log from logQueue.
- assertEquals(0, queue.size());
+ assertEquals(0, localLogQueue.getQueueSize(fakeWalGroupId));
+ }
+
+ private PriorityBlockingQueue<Path> getQueue() {
+ return logQueue.getQueue(fakeWalGroupId);
}
}