blob: b13f838a2e16d37783ef17b7a23683b0d62d5949 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than 1 slave
* For each slave cluster it selects a random number of peers
* using a replication ratio. For example, if replication ration = 0.1
* and slave cluster has 100 region servers, 10 will be selected.
* <p>
* A stream is considered down when we cannot contact a region server on the
* peer cluster for more than 55 seconds by default.
* </p>
*
*/
@InterfaceAudience.Private
public class ReplicationSource extends Thread
implements ReplicationSourceInterface {
private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
// each presents a queue for one wal group
private Map<String, PriorityBlockingQueue<Path>> queues =
new HashMap<String, PriorityBlockingQueue<Path>>();
// per group queue size, keep no more than this number of logs in each wal group
private int queueSizePerGroup;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
private Configuration conf;
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
private Stoppable stopper;
// How long should we sleep for each retry
private long sleepForRetries;
// Max size in bytes of entriesArray
private long replicationQueueSizeCapacity;
// Max number of entries in entriesArray
private int replicationQueueNbCapacity;
private FileSystem fs;
// id of this cluster
private UUID clusterId;
// id of the other cluster
private UUID peerClusterId;
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
// total number of edits we replicated
private AtomicLong totalReplicatedOperations = new AtomicLong(0);
// The znode we currently play with
private String peerClusterZnode;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
private volatile boolean sourceRunning = false;
// Metrics for this source
private MetricsSource metrics;
//WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
private WALEntryFilter walEntryFilter;
// throttler
private ReplicationThrottler throttler;
private AtomicInteger logQueueSize = new AtomicInteger(0);
private ConcurrentHashMap<String, ReplicationSourceWorkerThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceWorkerThread>();
// Hold the state of a replication worker thread
public enum WorkerState {
RUNNING,
STOPPED,
FINISHED // The worker is done processing a recovered queue
}
/**
* Instantiation method used by region servers
*
* @param conf configuration to use
* @param fs file system to use
* @param manager replication manager to ping to
* @param stopper the atomic boolean to use to stop the regionserver
* @param peerClusterZnode the name of our znode
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source
* @throws IOException
*/
@Override
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
final MetricsSource metrics)
throws IOException {
this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf);
decorateConf();
this.replicationQueueSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024*1024*64);
this.replicationQueueNbCapacity =
this.conf.getInt("replication.source.nb.capacity", 25000);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
long bandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
this.throttler = new ReplicationThrottler((double)bandwidth/10.0);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;
this.fs = fs;
this.metrics = metrics;
this.clusterId = clusterId;
this.peerClusterZnode = peerClusterZnode;
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.logQueueWarnThreshold = this.conf.getInt("replication.source.log.queue.warn", 2);
this.replicationEndpoint = replicationEndpoint;
}
private void decorateConf() {
String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
if (StringUtils.isNotEmpty(replicationCodec)) {
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
}
}
@Override
public void enqueueLog(Path log) {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
PriorityBlockingQueue<Path> queue = queues.get(logPrefix);
if (queue == null) {
queue = new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue);
if (this.sourceRunning) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
final ReplicationSourceWorkerThread worker =
new ReplicationSourceWorkerThread(logPrefix, queue, replicationQueueInfo, this);
ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(logPrefix, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
} else {
LOG.debug("Starting up worker for wal group " + logPrefix);
worker.startup();
}
}
}
queue.put(log);
int queueSize = logQueueSize.incrementAndGet();
this.metrics.setSizeOfLogQueue(queueSize);
// This will log a warning for each new log that gets created above the warn threshold
if (queue.size() > this.logQueueWarnThreshold) {
LOG.warn("WAL group " + logPrefix + " queue size: " + queueSize
+ " exceeds value of replication.source.log.queue.warn: " + logQueueWarnThreshold);
}
}
private void uninitialize() {
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
if (replicationEndpoint.state() == Service.State.STARTING
|| replicationEndpoint.state() == Service.State.RUNNING) {
replicationEndpoint.stopAndWait();
}
}
@Override
public void run() {
// mark we are running now
this.sourceRunning = true;
try {
// start the endpoint, connect to the cluster
Service.State state = replicationEndpoint.start().get();
if (state != Service.State.RUNNING) {
LOG.warn("ReplicationEndpoint was not started. Exiting");
uninitialize();
return;
}
} catch (Exception ex) {
LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
throw new RuntimeException(ex);
}
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList(
(WALEntryFilter)new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint);
}
this.walEntryFilter = new ChainWALEntryFilter(filters);
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) {
this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isSourceActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.closeQueue(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
final ReplicationSourceWorkerThread worker =
new ReplicationSourceWorkerThread(walGroupId, queue, replicationQueueInfo, this);
ReplicationSourceWorkerThread extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup();
}
}
}
/**
* Do the sleeping logic
* @param msg Why we sleep
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
Thread.currentThread().interrupt();
}
return sleepMultiplier < maxRetriesMultiplier;
}
/**
* check whether the peer is enabled or not
*
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
return this.replicationPeers.getStatusOfPeer(this.peerId);
}
@Override
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Unexpected exception in ReplicationSource", e);
}
};
Threads
.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
}
@Override
public void terminate(String reason) {
terminate(reason, null);
}
@Override
public void terminate(String reason, Exception cause) {
terminate(reason, cause, true);
}
public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
LOG.info("Closing source "
+ this.peerClusterZnode + " because: " + reason);
} else {
LOG.error("Closing source " + this.peerClusterZnode
+ " because an error occurred: " + reason, cause);
}
this.sourceRunning = false;
Collection<ReplicationSourceWorkerThread> workers = workerThreads.values();
for (ReplicationSourceWorkerThread worker : workers) {
worker.setWorkerState(WorkerState.STOPPED);
worker.interrupt();
}
ListenableFuture<Service.State> future = null;
if (this.replicationEndpoint != null) {
future = this.replicationEndpoint.stop();
}
if (join) {
for (ReplicationSourceWorkerThread worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
}
if (future != null) {
try {
future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
+ this.peerClusterZnode,
e);
}
}
}
}
@Override
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
@Override
public String getPeerClusterId() {
return this.peerId;
}
@Override
public Path getCurrentPath() {
// only for testing
for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) return worker.getCurrentPath();
}
return null;
}
private boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
/**
* Comparator used to compare logs together based on their start time
*/
public static class LogsComparator implements Comparator<Path> {
@Override
public int compare(Path o1, Path o2) {
return Long.compare(getTS(o1), getTS(o2));
}
/**
* Split a path to get the start time
* For example: 10.20.20.171%3A60020.1277499063250
* @param p path to split
* @return start time
*/
private static long getTS(Path p) {
int tsIndex = p.getName().lastIndexOf('.') + 1;
return Long.parseLong(p.getName().substring(tsIndex));
}
}
@Override
public String getStats() {
StringBuilder sb = new StringBuilder();
sb.append("Total replicated edits: ").append(totalReplicatedEdits)
.append(", current progress: \n");
for (Map.Entry<String, ReplicationSourceWorkerThread> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey();
ReplicationSourceWorkerThread worker = entry.getValue();
long position = worker.getCurrentPosition();
Path currentPath = worker.getCurrentPath();
sb.append("walGroup [").append(walGroupId).append("]: ");
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
.append(position).append("\n");
} else {
sb.append("no replication ongoing, waiting for new log");
}
}
return sb.toString();
}
/**
* Get Replication Source Metrics
* @return sourceMetrics
*/
public MetricsSource getSourceMetrics() {
return this.metrics;
}
public class ReplicationSourceWorkerThread extends Thread {
private ReplicationSource source;
private String walGroupId;
private PriorityBlockingQueue<Path> queue;
private ReplicationQueueInfo replicationQueueInfo;
// Our reader for the current log. open/close handled by repLogReader
private WAL.Reader reader;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
private volatile Path currentPath;
// Handle on the log reader helper
private ReplicationWALReaderManager repLogReader;
// Current number of operations (Put/Delete) that we need to replicate
private int currentNbOperations = 0;
// Current size of data we need to replicate
private int currentSize = 0;
// Current state of the worker thread
private WorkerState state;
public ReplicationSourceWorkerThread(String walGroupId, PriorityBlockingQueue<Path> queue,
ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
this.walGroupId = walGroupId;
this.queue = queue;
this.replicationQueueInfo = replicationQueueInfo;
this.repLogReader = new ReplicationWALReaderManager(fs, conf);
this.source = source;
}
@Override
public void run() {
setWorkerState(WorkerState.RUNNING);
// If this is recovered, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
if (this.replicationQueueInfo.isQueueRecovered()) {
try {
this.repLogReader.setPosition(replicationQueues.getLogPosition(peerClusterZnode,
this.queue.peek().getName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+ this.repLogReader.getPosition());
}
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
}
}
int sleepMultiplier = 1;
// Loop until we close down
while (isWorkerActive()) {
// Sleep until replication is enabled again
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
Path oldPath = getCurrentPath(); //note that in the current scenario,
//oldPath will be null when a log roll
//happens.
// Get a new path
boolean hasCurrentPath = getNextPath();
if (getCurrentPath() != null && oldPath == null) {
sleepMultiplier = 1; //reset the sleepMultiplier on a path change
}
if (!hasCurrentPath) {
if (sleepForRetries("No log to process", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
boolean currentWALisBeingWrittenTo = false;
//For WAL files we own (rather than recovered), take a snapshot of whether the
//current WAL file (this.currentPath) is in use (for writing) NOW!
//Since the new WAL paths are enqueued only after the prev WAL file
//is 'closed', presence of an element in the queue means that
//the previous WAL file was closed, else the file is in use (currentPath)
//We take the snapshot now so that we are protected against races
//where a new file gets enqueued while the current file is being processed
//(and where we just finished reading the current file).
if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
currentWALisBeingWrittenTo = true;
}
// Open a reader on it
if (!openReader(sleepMultiplier)) {
// Reset the sleep multiplier, else it'd be reused for the next file
sleepMultiplier = 1;
continue;
}
// If we got a null reader but didn't continue, then sleep and continue
if (this.reader == null) {
if (sleepForRetries("Unable to open a reader", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
boolean gotIOE = false;
currentNbOperations = 0;
List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
currentSize = 0;
try {
if (readAllEntriesToReplicateOrNextFile(currentWALisBeingWrittenTo, entries)) {
continue;
}
} catch (IOException ioe) {
LOG.warn(peerClusterZnode + " Got: ", ioe);
gotIOE = true;
if (ioe.getCause() instanceof EOFException) {
boolean considerDumping = false;
if (this.replicationQueueInfo.isQueueRecovered()) {
try {
FileStatus stat = fs.getFileStatus(this.currentPath);
if (stat.getLen() == 0) {
LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
}
considerDumping = true;
} catch (IOException e) {
LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
}
}
if (considerDumping &&
sleepMultiplier == maxRetriesMultiplier &&
processEndOfFile(false)) {
continue;
}
}
} finally {
try {
this.reader = null;
this.repLogReader.closeReader();
} catch (IOException e) {
gotIOE = true;
LOG.warn("Unable to finalize the tailing of a file", e);
}
}
// If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry.
// But if we need to stop, don't bother sleeping
if (isWorkerActive() && (gotIOE || entries.isEmpty())) {
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
manager.logPositionAndCleanOldLogs(this.currentPath,
peerClusterZnode, this.repLogReader.getPosition(),
this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
}
// Reset the sleep multiplier if nothing has actually gone wrong
if (!gotIOE) {
sleepMultiplier = 1;
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
}
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
sleepMultiplier = 1;
shipEdits(currentWALisBeingWrittenTo, entries);
}
if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
// use synchronize to make sure one last thread will clean the queue
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allOtherTaskDone = true;
for (ReplicationSourceWorkerThread worker : workerThreads.values()) {
if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
allOtherTaskDone = false;
break;
}
}
if (allOtherTaskDone) {
manager.closeRecoveredQueue(this.source);
LOG.info("Finished recovering queue " + peerClusterZnode
+ " with the following stats: " + getStats());
}
}
}
// If the worker exits run loop without finishing it's task, mark it as stopped.
if (state != WorkerState.FINISHED) {
setWorkerState(WorkerState.STOPPED);
}
}
/**
* Read all the entries from the current log files and retain those that need to be replicated.
* Else, process the end of the current file.
* @param currentWALisBeingWrittenTo is the current WAL being written to
* @param entries resulting entries to be replicated
* @return true if we got nothing and went to the next file, false if we got entries
* @throws IOException
*/
protected boolean readAllEntriesToReplicateOrNextFile(boolean currentWALisBeingWrittenTo,
List<WAL.Entry> entries) throws IOException {
long seenEntries = 0;
if (LOG.isTraceEnabled()) {
LOG.trace("Seeking in " + this.currentPath + " at position "
+ this.repLogReader.getPosition());
}
this.repLogReader.seek();
long positionBeforeRead = this.repLogReader.getPosition();
WAL.Entry entry = this.repLogReader.readNextAndSetPosition();
while (entry != null) {
metrics.incrLogEditsRead();
seenEntries++;
// don't replicate if the log entries have already been consumed by the cluster
if (replicationEndpoint.canReplicateToSameCluster()
|| !entry.getKey().getClusterIds().contains(peerClusterId)) {
// Remove all KVs that should not be replicated
entry = walEntryFilter.filter(entry);
WALEdit edit = null;
WALKey logKey = null;
if (entry != null) {
edit = entry.getEdit();
logKey = entry.getKey();
}
if (edit != null && edit.size() != 0) {
// Mark that the current cluster has the change
logKey.addClusterId(clusterId);
currentNbOperations += countDistinctRowKeys(edit);
entries.add(entry);
currentSize += entry.getEdit().heapSize();
} else {
metrics.incrLogEditsFiltered();
}
}
// Stop if too many entries or too big
// FIXME check the relationship between single wal group and overall
if (currentSize >= replicationQueueSizeCapacity
|| entries.size() >= replicationQueueNbCapacity) {
break;
}
try {
entry = this.repLogReader.readNextAndSetPosition();
} catch (IOException ie) {
LOG.debug("Break on IOE: " + ie.getMessage());
break;
}
}
metrics.incrLogReadInBytes(this.repLogReader.getPosition() - positionBeforeRead);
if (currentWALisBeingWrittenTo) {
return false;
}
// If we didn't get anything and the queue has an object, it means we
// hit the end of the file for sure
return seenEntries == 0 && processEndOfFile(false);
}
/**
* Poll for the next path
* @return true if a path was obtained, false if not
*/
protected boolean getNextPath() {
try {
if (this.currentPath == null) {
this.currentPath = queue.poll(sleepForRetries, TimeUnit.MILLISECONDS);
int queueSize = logQueueSize.decrementAndGet();
metrics.setSizeOfLogQueue(queueSize);
if (this.currentPath != null) {
// For recovered queue: must use peerClusterZnode since peerId is a parsed value
manager.cleanOldLogs(this.currentPath.getName(), peerClusterZnode,
this.replicationQueueInfo.isQueueRecovered());
if (LOG.isTraceEnabled()) {
LOG.trace("New log: " + this.currentPath);
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted while reading edits", e);
}
return this.currentPath != null;
}
/**
* Open a reader on the current path
*
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return true if we should continue with that file, false if we are over with it
*/
protected boolean openReader(int sleepMultiplier) {
try {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Opening log " + this.currentPath);
}
this.reader = repLogReader.openReader(this.currentPath);
} catch (FileNotFoundException fnfe) {
if (this.replicationQueueInfo.isQueueRecovered()) {
// We didn't find the log in the archive directory, look if it still
// exists in the dead RS folder (there could be a chain of failures
// to look at)
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
final Path rootDir = FSUtils.getRootDir(conf);
for (String curDeadServerName : deadRegionServers) {
final Path deadRsDirectory = new Path(rootDir,
DefaultWALProvider.getWALDirectoryName(curDeadServerName));
Path[] locs = new Path[] {
new Path(deadRsDirectory, currentPath.getName()),
new Path(deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT),
currentPath.getName()),
};
for (Path possibleLogLocation : locs) {
LOG.info("Possible location " + possibleLogLocation.toUri().toString());
if (manager.getFs().exists(possibleLogLocation)) {
// We found the right new location
LOG.info("Log " + this.currentPath + " still exists at " +
possibleLogLocation);
// When running ReplicationSyncUp tool, we should replicate the data from WAL
// which is moved to WAL splitting directory also.
if (stopper instanceof ReplicationSyncUp.DummyServer) {
// Open the log at the this location
this.currentPath = possibleLogLocation;
this.openReader(sleepMultiplier);
}
return true;
}
}
}
// In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
// from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
if (stopper instanceof ReplicationSyncUp.DummyServer) {
// N.B. the ReplicationSyncUp tool sets the manager.getLogDir to the root of the wal
// area rather than to the wal area for a particular region server.
FileStatus[] rss = fs.listStatus(manager.getLogDir());
for (FileStatus rs : rss) {
Path p = rs.getPath();
FileStatus[] logs = fs.listStatus(p);
for (FileStatus log : logs) {
String logName = log.getPath().getName();
if (logName.equals(currentPath.getName())) {
currentPath = new Path(p, logName);
LOG.info("Log " + currentPath.getName() + " found at " + currentPath);
// Open the log at the new location
this.openReader(sleepMultiplier);
return true;
}
}
}
}
// TODO What happens if the log was missing from every single location?
// Although we need to check a couple of times as the log could have
// been moved by the master between the checks
// It can also happen if a recovered queue wasn't properly cleaned,
// such that the znode pointing to a log exists but the log was
// deleted a long time ago.
// For the moment, we'll throw the IO and processEndOfFile
throw new IOException("File from recovered queue is " +
"nowhere to be found", fnfe);
} else {
// If the log was archived, continue reading from there
Path archivedLogLocation =
new Path(manager.getOldLogDir(), currentPath.getName());
if (manager.getFs().exists(archivedLogLocation)) {
currentPath = archivedLogLocation;
LOG.info("Log " + this.currentPath + " was moved to " +
archivedLogLocation);
// Open the log at the new location
this.openReader(sleepMultiplier);
}
// TODO What happens the log is missing in both places?
}
}
} catch (LeaseNotRecoveredException lnre) {
// HBASE-15019 the WAL was not closed due to some hiccup.
LOG.warn(peerClusterZnode + " Try to recover the WAL lease " + currentPath, lnre);
recoverLease(conf, currentPath);
this.reader = null;
} catch (IOException ioe) {
if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
LOG.warn(peerClusterZnode + " Got: ", ioe);
this.reader = null;
if (ioe.getCause() instanceof NullPointerException) {
// Workaround for race condition in HDFS-4380
// which throws a NPE if we open a file before any data node has the most recent block
// Just sleep and retry. Will require re-reading compressed WALs for compressionContext.
LOG.warn("Got NPE opening reader, will retry.");
} else if (sleepMultiplier >= maxRetriesMultiplier
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
// TODO Need a better way to determine if a file is really gone but
// TODO without scanning all logs dir
LOG.warn("Waited too long for this file, considering dumping");
return !processEndOfFile(true);
}
}
return true;
}
private void recoverLease(final Configuration conf, final Path path) {
try {
final FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {
@Override
public boolean progress() {
LOG.debug("recover WAL lease: " + path);
return isWorkerActive();
}
});
} catch (IOException e) {
LOG.warn("unable to recover lease for WAL: " + path, e);
}
}
/*
* Checks whether the current log file is empty, and it is not a recovered queue. This is to
* handle scenario when in an idle cluster, there is no entry in the current log and we keep on
* trying to read the log file and get EOFException. In case of a recovered queue the last log
* file may be empty, and we don't want to retry that.
*/
private boolean isCurrentLogEmpty() {
return (this.repLogReader.getPosition() == 0 &&
!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
}
/**
* Count the number of different row keys in the given edit because of mini-batching. We assume
* that there's at least one Cell in the WALEdit.
* @param edit edit to count row keys from
* @return number of different row keys
*/
private int countDistinctRowKeys(WALEdit edit) {
List<Cell> cells = edit.getCells();
int distinctRowKeys = 1;
Cell lastCell = cells.get(0);
for (int i = 0; i < edit.size(); i++) {
if (!CellUtil.matchingRow(cells.get(i), lastCell)) {
distinctRowKeys++;
}
}
return distinctRowKeys;
}
/**
* Do the shipping logic
* @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called
*/
protected void shipEdits(boolean currentWALisBeingWrittenTo, List<WAL.Entry> entries) {
int sleepMultiplier = 0;
if (entries.isEmpty()) {
LOG.warn("Was given 0 edits to ship");
return;
}
while (isWorkerActive()) {
try {
if (throttler.isEnabled()) {
long sleepTicks = throttler.getNextSleepInterval(currentSize);
if (sleepTicks > 0) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
}
Thread.sleep(sleepTicks);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping for throttling control");
Thread.currentThread().interrupt();
// current thread might be interrupted to terminate
// directly go back to while() for confirm this
continue;
}
// reset throttler's cycle start tick when sleep for throttling occurs
throttler.resetStartTick();
}
}
// create replicateContext here, so the entries can be GC'd upon return from this call
// stack
ReplicationEndpoint.ReplicateContext replicateContext =
new ReplicationEndpoint.ReplicateContext();
replicateContext.setEntries(entries).setSize(currentSize);
replicateContext.setWalGroupId(walGroupId);
long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = replicationEndpoint.replicate(replicateContext);
long endTimeNs = System.nanoTime();
if (!replicated) {
continue;
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
currentWALisBeingWrittenTo);
this.lastLoggedPosition = this.repLogReader.getPosition();
}
if (throttler.isEnabled()) {
throttler.addPushSize(currentSize);
}
totalReplicatedEdits.addAndGet(entries.size());
totalReplicatedOperations.addAndGet(currentNbOperations);
// FIXME check relationship between wal group and overall
metrics.shipBatch(currentNbOperations, currentSize / 1024);
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId);
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
+ totalReplicatedOperations + " operations in "
+ ((endTimeNs - startTimeNs) / 1000000) + " ms");
}
break;
} catch (Exception ex) {
LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
+ org.apache.hadoop.util.StringUtils.stringifyException(ex));
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
}
/**
* If the queue isn't empty, switch to the next one Else if this is a recovered queue, it means
* we're done! Else we'll just continue to try reading the log file
* @return true if we're done with the current file, false if we should continue trying to read
* from it
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE",
justification = "Yeah, this is how it works")
protected boolean processEndOfFile(boolean dumpOnlyIfZeroLength) {
// We presume this means the file we're reading is closed.
if (this.queue.size() != 0) {
// -1 means the wal wasn't closed cleanly.
final long trailerSize = this.repLogReader.currentTrailerSize();
final long currentPosition = this.repLogReader.getPosition();
FileStatus stat = null;
try {
stat = fs.getFileStatus(this.currentPath);
} catch (IOException exception) {
LOG.warn("Couldn't get file length information about log " + this.currentPath + ", it " + (trailerSize < 0 ? "was not" : "was") + " closed cleanly"
+ ", stats: " + getStats());
metrics.incrUnknownFileLengthForClosedWAL();
}
if (stat != null) {
if (trailerSize < 0) {
if (currentPosition < stat.getLen()) {
final long skippedBytes = stat.getLen() - currentPosition;
LOG.info("Reached the end of WAL file '" + currentPath + "'. It was not closed cleanly, so we did not parse " + skippedBytes + " bytes of data.");
metrics.incrUncleanlyClosedWALs();
metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
}
} else if (currentPosition + trailerSize < stat.getLen()){
LOG.warn("Processing end of WAL file '" + currentPath + "'. At position " + currentPosition + ", which is too far away from reported file length " + stat.getLen() +
". Restarting WAL reading (see HBASE-15983 for details). stats: " + getStats());
repLogReader.setPosition(0);
metrics.incrRestartedWALReading();
metrics.incrRepeatedFileBytes(currentPosition);
return false;
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("Reached the end of log " + this.currentPath + ", stats: " + getStats()
+ ", and the length of the file is " + (stat == null ? "N/A" : stat.getLen()));
}
if (dumpOnlyIfZeroLength && stat.getLen() != 0) {
return false;
}
this.currentPath = null;
this.repLogReader.finishCurrentFile();
this.reader = null;
metrics.incrCompletedWAL();
return true;
} else if (this.replicationQueueInfo.isQueueRecovered()) {
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
return true;
}
return false;
}
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
+ getCurrentPath(), e);
stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
}
};
Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
+ peerClusterZnode, handler);
workerThreads.put(walGroupId, this);
}
public Path getCurrentPath() {
return this.currentPath;
}
public long getCurrentPosition() {
return this.repLogReader.getPosition();
}
private boolean isWorkerActive() {
return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
}
private void terminate(String reason, Exception cause) {
if (cause == null) {
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
} else {
LOG.error("Closing worker for wal group " + this.walGroupId
+ " because an error occurred: " + reason, cause);
}
setWorkerState(WorkerState.STOPPED);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
}
/**
* Set the worker state
* @param state
*/
public void setWorkerState(WorkerState state) {
this.state = state;
}
/**
* Get the current state of this worker.
* @return WorkerState
*/
public WorkerState getWorkerState() {
return state;
}
}
}