/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
* Class that handles the source of a replication stream.
* Currently does not handle more than one slave cluster.
* For each slave cluster it selects a random number of peers
* using a replication ratio. For example, if the replication ratio is 0.1
* and the slave cluster has 100 region servers, 10 will be selected.
* <p>
* A stream is considered down when we cannot contact a region server on the
* peer cluster for more than 55 seconds by default.
* </p>
*
*/
@InterfaceAudience.Private
public class ReplicationSource extends Thread implements ReplicationSourceInterface {
private static final Log LOG = LogFactory.getLog(ReplicationSource.class);
protected ReplicationSourceLogQueue logQueue;
// per group queue size, keep no more than this number of logs in each wal group
private int queueSizePerGroup;
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
private Configuration conf;
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
// The manager of all sources to which we ping back our progress
private ReplicationSourceManager manager;
// Should we stop everything?
private Stoppable stopper;
// How long should we sleep for each retry
private long sleepForRetries;
private FileSystem fs;
// id of this cluster
private UUID clusterId;
// id of the other cluster
private UUID peerClusterId;
// total number of edits we replicated
private AtomicLong totalReplicatedEdits = new AtomicLong(0);
// total number of operations we replicated
private AtomicLong totalReplicatedOperations = new AtomicLong(0);
// The znode we currently play with
private String peerClusterZnode;
// Maximum number of retries before taking bold actions
private int maxRetriesMultiplier;
// Indicates if this particular source is running
private volatile boolean sourceRunning = false;
// Indicates if the source initialization is in progress
private volatile boolean startupOngoing = false;
// Metrics for this source
private MetricsSource metrics;
// ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
private WALEntryFilter walEntryFilter;
// throttler
private ReplicationThrottler throttler;
private long defaultBandwidth;
private long currentBandwidth;
private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceShipperThread>();
// Hold the state of a replication worker thread
public enum WorkerState {
RUNNING,
STOPPED,
FINISHED // The worker is done processing a recovered queue
}
private AtomicLong totalBufferUsed;
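// The retry/backoff and throttling knobs below are read from the supplied Configuration in
// init(); the defaults shown are the ones hard-coded there. A minimal, illustrative sketch of
// overriding them programmatically (values are examples only, not recommendations):
//   Configuration conf = HBaseConfiguration.create();
//   conf.setLong("replication.source.sleepforretries", 1000);      // base retry sleep, in ms
//   conf.setInt("replication.source.maxretriesmultiplier", 300);   // cap on the backoff multiplier
//   conf.setLong("replication.source.per.peer.node.bandwidth", 0); // 0 = no default bandwidth cap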
/**
* Instantiation method used by region servers
*
* @param conf configuration to use
* @param fs file system to use
* @param manager replication manager to ping to
* @param replicationQueues interface to the replication queues
* @param replicationPeers interface to the replication peers
* @param stopper the Stoppable to check whether the regionserver is stopping
* @param peerClusterZnode the name of our znode
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source
* @throws IOException IO Exception
*/
@Override
public void init(final Configuration conf, final FileSystem fs,
final ReplicationSourceManager manager, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final Stoppable stopper,
final String peerClusterZnode, final UUID clusterId, ReplicationEndpoint replicationEndpoint,
final MetricsSource metrics)
throws IOException {
this.stopper = stopper;
this.conf = HBaseConfiguration.create(conf);
decorateConf();
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.logQueue = new ReplicationSourceLogQueue(conf, metrics);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
this.manager = manager;
this.fs = fs;
this.metrics = metrics;
this.clusterId = clusterId;
this.peerClusterZnode = peerClusterZnode;
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
this.replicationEndpoint = replicationEndpoint;
defaultBandwidth = this.conf.getLong("replication.source.per.peer.node.bandwidth", 0);
currentBandwidth = getCurrentBandwidth();
this.throttler = new ReplicationThrottler((double) currentBandwidth / 10.0);
this.totalBufferUsed = manager.getTotalBufferUsed();
LOG.info("peerClusterZnode=" + peerClusterZnode + ", ReplicationSource : " + peerId
+ ", currentBandwidth=" + this.currentBandwidth);
}
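// A hedged usage sketch for decorateConf() below: if the cluster sets a replication-specific
// codec, e.g.
//   conf.set(HConstants.REPLICATION_CODEC_CONF_KEY,
//       "org.apache.hadoop.hbase.codec.KeyValueCodecWithTags");   // codec class is illustrative
// then decorateConf() copies that value into HConstants.RPC_CODEC_CONF_KEY so the replication
// RPCs made by this source use it.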
private void decorateConf() {
String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
if (StringUtils.isNotEmpty(replicationCodec)) {
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
}
}
@Override
public void enqueueLog(Path log) {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(log.getName());
boolean queueExists = logQueue.enqueueLog(log, logPrefix);
if (!queueExists) {
if (this.sourceRunning) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that a log is enqueued after this.sourceRunning is set but before the
// worker thread is launched, so it's necessary to check workerThreads before starting the worker
final ReplicationSourceShipperThread worker =
new ReplicationSourceShipperThread(logPrefix, logQueue, replicationQueueInfo, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(logPrefix, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + logPrefix);
} else {
LOG.debug("Starting up worker for wal group " + logPrefix);
worker.startup();
}
}
}
}
@InterfaceAudience.Private
public Map<String, PriorityBlockingQueue<Path>> getQueues() {
return logQueue.getQueues();
}
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = peerClusterZnode;
if (peerId.contains("-")) {
// peerClusterZnode will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
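// e.g. (illustrative) peerId "2" and rsZNode "host1.example.com,16020,1537482342523" give a
// peerClusterZnode of "2-host1.example.com,16020,1537482342523", and split("-")[0] recovers "2"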
peerId = peerClusterZnode.split("-")[0];
}
Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
if (tableCFMap != null) {
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
this.replicationQueues.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
+ Bytes.toString(family) + " to peer id " + peerId);
}
} else {
// the user has not defined any table CFs for replication, which means replicate all
// the data
this.replicationQueues.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
}
}
private void uninitialize() {
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
if (replicationEndpoint.state() == Service.State.STARTING
|| replicationEndpoint.state() == Service.State.RUNNING) {
replicationEndpoint.stopAndWait();
}
}
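// Startup sequence performed in run(): start the ReplicationEndpoint, build the WAL entry filter
// chain (system-table filter plus any filter supplied by the endpoint), wait until the peer
// cluster UUID can be fetched, refuse to replicate a cluster to itself unless the endpoint allows
// it, then start one shipper thread per wal group already queued.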
@Override
public void run() {
// mark we are running now
this.sourceRunning = true;
this.setSourceStartupStatus(true);
try {
// start the endpoint, connect to the cluster
Service.State state = replicationEndpoint.start().get();
if (state != Service.State.RUNNING) {
LOG.warn("ReplicationEndpoint was not started. Exiting");
uninitialize();
this.setSourceStartupStatus(false);
return;
}
} catch (Exception ex) {
LOG.warn("Error starting ReplicationEndpoint, exiting", ex);
this.setSourceStartupStatus(false);
throw new RuntimeException(ex);
}
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList(
(WALEntryFilter)new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
if (filterFromEndpoint != null) {
filters.add(filterFromEndpoint);
}
this.walEntryFilter = new ChainWALEntryFilter(filters);
int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) {
this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isSourceActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
if (!this.isSourceActive()) {
this.setSourceStartupStatus(false);
return;
}
// In a rare case, the zookeeper setting may be messed up. That leads to an incorrect
// peerClusterId value, one that is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.closeQueue(this);
this.setSourceStartupStatus(false);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : logQueue.getQueues().entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
final ReplicationSourceShipperThread worker =
new ReplicationSourceShipperThread(walGroupId, logQueue, replicationQueueInfo, this);
ReplicationSourceShipperThread extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup();
}
}
this.setSourceStartupStatus(false);
}
private synchronized void setSourceStartupStatus(boolean initializing) {
startupOngoing = initializing;
if (initializing) {
metrics.incrSourceInitializing();
} else {
metrics.decrSourceInitializing();
}
}
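// Callers typically use the pattern "if (sleepForRetries(msg, sleepMultiplier)) sleepMultiplier++;"
// which yields a linear backoff of sleepForRetries, 2 * sleepForRetries, ... until the multiplier
// reaches maxRetriesMultiplier (with the defaults read in init(): 1s steps up to about 5 minutes).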
/**
* Do the sleeping logic
* @param msg Why we sleep
* @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/
protected boolean sleepForRetries(String msg, int sleepMultiplier) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries");
Thread.currentThread().interrupt();
}
return sleepMultiplier < maxRetriesMultiplier;
}
/**
* check whether the peer is enabled or not
*
* @return true if the peer is enabled, otherwise false
*/
protected boolean isPeerEnabled() {
return this.replicationPeers.getStatusOfPeer(this.peerId);
}
@Override
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Unexpected exception in ReplicationSource", e);
}
};
Threads
.setDaemonThreadRunning(this, n + ".replicationSource," + this.peerClusterZnode, handler);
}
@Override
public void terminate(String reason) {
terminate(reason, null);
}
@Override
public void terminate(String reason, Exception cause) {
terminate(reason, cause, true);
}
public void terminate(String reason, Exception cause, boolean join) {
if (cause == null) {
LOG.info("Closing source "
+ this.peerClusterZnode + " because: " + reason);
} else {
LOG.error("Closing source " + this.peerClusterZnode
+ " because an error occurred: " + reason, cause);
}
this.sourceRunning = false;
Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
for (ReplicationSourceShipperThread worker : workers) {
worker.setWorkerState(WorkerState.STOPPED);
worker.entryReader.interrupt();
worker.interrupt();
}
ListenableFuture<Service.State> future = null;
if (this.replicationEndpoint != null) {
future = this.replicationEndpoint.stop();
}
if (join) {
for (ReplicationSourceShipperThread worker : workers) {
Threads.shutdown(worker, this.sleepForRetries);
LOG.info("ReplicationSourceWorker " + worker.getName() + " terminated");
}
if (future != null) {
try {
future.get(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
+ this.peerClusterZnode,
e);
}
}
}
}
@Override
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
@Override
public String getPeerClusterId() {
return this.peerId;
}
@Override
public Path getCurrentPath() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (worker.getCurrentPath() != null) {
return worker.getCurrentPath();
}
}
return null;
}
public Path getLastLoggedPath() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
return worker.getLastLoggedPath();
}
return null;
}
public long getLastLoggedPosition() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
return worker.getLastLoggedPosition();
}
return 0;
}
public boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
/**
* Comparator used to order WAL files based on their start time
*/
public static class LogsComparator implements Comparator<Path> {
@Override
public int compare(Path o1, Path o2) {
return Long.compare(getTS(o1), getTS(o2));
}
/**
* Split a path to get the start time.
* For example: 10.20.20.171%3A60020.1277499063250
* @param p path to split
* @return start time
*/
public static long getTS(Path p) {
int tsIndex = p.getName().lastIndexOf('.') + 1;
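// e.g. for "10.20.20.171%3A60020.1277499063250" (the javadoc example above), tsIndex points just
// past the last '.' and the substring "1277499063250" is parsed as the start timestamp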
return Long.parseLong(p.getName().substring(tsIndex));
}
}
@Override
public String getStats() {
StringBuilder sb = new StringBuilder();
sb.append("Total replicated edits: ").append(totalReplicatedEdits)
.append(", current progress: \n");
for (Map.Entry<String, ReplicationSourceShipperThread> entry : workerThreads.entrySet()) {
String walGroupId = entry.getKey();
ReplicationSourceShipperThread worker = entry.getValue();
long position = worker.getLastLoggedPosition();
Path currentPath = worker.getLastLoggedPath();
sb.append("walGroup [").append(walGroupId).append("]: ");
if (currentPath != null) {
sb.append("currently replicating from: ").append(currentPath).append(" at position: ")
.append(position).append("\n");
} else {
sb.append("no replication ongoing, waiting for new log");
}
}
return sb.toString();
}
@Override
public MetricsSource getSourceMetrics() {
return this.metrics;
}
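// Bandwidth selection below: a non-zero bandwidth configured on the ReplicationPeer takes
// precedence over replication.source.per.peer.node.bandwidth; the chosen value, divided by 10,
// is what the ReplicationThrottler is constructed with (see init() and
// checkBandwidthChangeAndResetThrottler()).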
private long getCurrentBandwidth() {
ReplicationPeer replicationPeer = this.replicationPeers.getPeer(peerId);
long peerBandwidth = replicationPeer != null ? replicationPeer.getPeerBandwidth() : 0;
// user can set peer bandwidth to 0 to use default bandwidth
return peerBandwidth != 0 ? peerBandwidth : defaultBandwidth;
}
@Override
public Map<String, ReplicationStatus> getWalGroupStatus() {
Map<String, ReplicationStatus> sourceReplicationStatus = new TreeMap<>();
long lastTimeStamp, ageOfLastShippedOp, replicationDelay, fileSize;
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
String walGroupId = worker.getWalGroupId();
lastTimeStamp = metrics.getLastTimeStampOfWalGroup(walGroupId);
ageOfLastShippedOp = metrics.getAgeOfLastShippedOp(walGroupId);
int queueSize = logQueue.getQueueSize(walGroupId);
replicationDelay =
ReplicationLoad.calculateReplicationDelay(ageOfLastShippedOp, lastTimeStamp, queueSize);
Path currentPath = worker.getLastLoggedPath();
fileSize = -1;
if (currentPath != null) {
try {
fileSize = fs.getContentSummary(currentPath).getLength();
} catch (IOException e) {
// Ignore, only affects the UI, which will show a size of -1 as expected
// when there is a problem getting the file size
}
} else {
currentPath = new Path("NO_LOGS_IN_QUEUE");
LOG.warn("No replication ongoing, waiting for new log");
}
ReplicationStatus.ReplicationStatusBuilder statusBuilder = ReplicationStatus.newBuilder();
statusBuilder.withPeerId(this.getPeerClusterId())
.withQueueSize(queueSize)
.withWalGroup(walGroupId)
.withCurrentPath(currentPath)
.withCurrentPosition(worker.getLastLoggedPosition())
.withFileSize(fileSize)
.withAgeOfLastShippedOp(ageOfLastShippedOp)
.withReplicationDelay(replicationDelay);
sourceReplicationStatus.put(this.getPeerClusterId() + "=>" + walGroupId,
statusBuilder.build());
}
return sourceReplicationStatus;
}
// This thread reads entries from a queue and ships them.
// Entries are placed onto the queue by ReplicationSourceWALReaderThread
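// Worker lifecycle, as implemented in run() below: a worker starts in RUNNING, moves to FINISHED
// once a recovered queue has been fully drained, and otherwise ends up STOPPED when it leaves the
// loop or is terminated.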
public class ReplicationSourceShipperThread extends Thread {
ReplicationSourceInterface source;
String walGroupId;
ReplicationSourceLogQueue logQueue;
ReplicationQueueInfo replicationQueueInfo;
// Last position in the log that we sent to ZooKeeper
private long lastLoggedPosition = -1;
// Path of the current log
private volatile Path lastLoggedPath;
// Current state of the worker thread
private WorkerState state;
ReplicationSourceWALReaderThread entryReader;
public ReplicationSourceShipperThread(String walGroupId,
ReplicationSourceLogQueue logQueue, ReplicationQueueInfo replicationQueueInfo,
ReplicationSourceInterface source) {
this.walGroupId = walGroupId;
this.logQueue = logQueue;
this.replicationQueueInfo = replicationQueueInfo;
this.source = source;
}
public String getWalGroupId() {
return walGroupId;
}
@Override
public void run() {
setWorkerState(WorkerState.RUNNING);
// Loop until we close down
while (isWorkerActive()) {
int sleepMultiplier = 1;
// Sleep until replication is enabled again
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
while (entryReader == null) {
if (sleepForRetries("Replication WAL entry reader thread not initialized",
sleepMultiplier)) {
sleepMultiplier++;
}
if (sleepMultiplier == maxRetriesMultiplier) {
LOG.warn("Replication WAL entry reader thread not initialized");
}
}
try {
WALEntryBatch entryBatch = entryReader.take();
shipEdits(entryBatch);
if (!entryBatch.hasMoreEntries()) {
LOG.debug("Finished recovering queue for group "
+ walGroupId + " of peer " + peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerState(WorkerState.FINISHED);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}
if (getWorkerState() == WorkerState.FINISHED) {
// synchronize so that only the last remaining worker thread cleans up the queue
synchronized (this) {
Threads.sleep(100);// wait a short while for other worker threads to fully exit
boolean allOtherTaskDone = true;
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
allOtherTaskDone = false;
break;
}
}
if (allOtherTaskDone) {
manager.closeRecoveredQueue(this.source);
// stop replication endpoint
if (source instanceof ReplicationSource) {
((ReplicationSource) source).replicationEndpoint.stop();
}
LOG.info("Finished recovering queue " + peerClusterZnode
+ " with the following stats: " + getStats());
}
}
}
// If the worker exits the run loop without finishing its task, mark it as stopped.
if (state != WorkerState.FINISHED) {
setWorkerState(WorkerState.STOPPED);
}
}
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = peerClusterZnode;
if (peerId.contains("-")) {
// peerClusterZnode will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
peerId = peerClusterZnode.split("-")[0];
}
List<Cell> cells = edit.getCells();
int totalCells = cells.size();
for (int i = 0; i < totalCells; i++) {
Cell cell = cells.get(i);
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
List<StoreDescriptor> stores = bld.getStoresList();
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
List<String> storeFileList = stores.get(j).getStoreFileList();
manager.cleanUpHFileRefs(peerId, storeFileList);
metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
}
}
}
}
private void checkBandwidthChangeAndResetThrottler() {
long peerBandwidth = getCurrentBandwidth();
if (peerBandwidth != currentBandwidth) {
currentBandwidth = peerBandwidth;
throttler.setBandwidth((double) currentBandwidth / 10.0);
LOG.info("ReplicationSource : " + peerId
+ " bandwidth throttling changed, currentBandWidth=" + currentBandwidth);
}
}
/**
* Get the batch entry size, excluding bulk load file sizes.
* Uses the WAL reader thread's per-entry size calculation.
*/
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
int totalSize = 0;
for(Entry entry : entryBatch.getWalEntries()) {
totalSize += entryReader.getEntrySizeExcludeBulkLoad(entry);
}
return totalSize;
}
/**
* Do the shipping logic: throttle if configured, hand the batch to the ReplicationEndpoint,
* clean up any bulk-load HFile references, record the new log position, and update metrics.
*/
protected void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
long lastReadPosition = entryBatch.getLastWalPosition();
lastLoggedPath = entryBatch.getLastWalPath();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
updateLogPosition(lastReadPosition);
// if there was nothing to ship and it's not an error
// set "ageOfLastShippedOp" to <now> to indicate that we're current
metrics.setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
while (isWorkerActive()) {
try {
checkBandwidthChangeAndResetThrottler();
if (throttler.isEnabled()) {
long sleepTicks = throttler.getNextSleepInterval(sizeExcludeBulkLoad);
if (sleepTicks > 0) {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("To sleep " + sleepTicks + "ms for throttling control");
}
Thread.sleep(sleepTicks);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping for throttling control");
Thread.currentThread().interrupt();
// the current thread might have been interrupted to terminate;
// go straight back to while() to confirm this
continue;
}
// reset throttler's cycle start tick when sleep for throttling occurs
throttler.resetStartTick();
}
}
// create replicateContext here, so the entries can be GC'd upon return from this call
// stack
ReplicationEndpoint.ReplicateContext replicateContext =
new ReplicationEndpoint.ReplicateContext();
replicateContext.setEntries(entries).setSize(currentSize);
replicateContext.setWalGroupId(walGroupId);
long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = replicationEndpoint.replicate(replicateContext);
long endTimeNs = System.nanoTime();
if (!replicated) {
continue;
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
if (this.lastLoggedPosition != lastReadPosition) {
//Clean up hfile references
int size = entries.size();
for (int i = 0; i < size; i++) {
cleanUpHFileRefs(entries.get(i).getEdit());
}
//Log and clean up WAL logs
updateLogPosition(lastReadPosition);
}
if (throttler.isEnabled()) {
throttler.addPushSize(sizeExcludeBulkLoad);
}
releaseBufferQuota(sizeExcludeBulkLoad);
totalReplicatedEdits.addAndGet(entries.size());
totalReplicatedOperations.addAndGet(entryBatch.getNbOperations());
// FIXME check relationship between wal group and overall
metrics.shipBatch(entryBatch.getNbOperations(), currentSize, entryBatch.getNbHFiles());
metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
walGroupId);
source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
if (LOG.isTraceEnabled()) {
LOG.trace("Replicated " + totalReplicatedEdits + " entries in total, or "
+ totalReplicatedOperations + " operations in "
+ ((endTimeNs - startTimeNs) / 1000000) + " ms");
}
break;
} catch (Exception ex) {
LOG.warn(replicationEndpoint.getClass().getName() + " threw unknown exception:"
+ org.apache.hadoop.util.StringUtils.stringifyException(ex));
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
}
private void updateLogPosition(long lastReadPosition) {
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
}
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in ReplicationSourceWorkerThread," + " currentPath="
+ getLastLoggedPath(), e);
stopper.stop("Unexpected exception in ReplicationSourceWorkerThread");
}
};
Threads.setDaemonThreadRunning(this, n + ".replicationSource." + walGroupId + ","
+ peerClusterZnode, handler);
workerThreads.put(walGroupId, this);
long startPosition = 0;
if (this.replicationQueueInfo.isQueueRecovered()) {
startPosition = getRecoveredQueueStartPos(startPosition);
int numRetries = 0;
while (numRetries <= maxRetriesMultiplier) {
try {
locateRecoveredPaths();
break;
} catch (IOException e) {
LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
numRetries++;
}
}
}
startWALReaderThread(n, handler, startPosition);
}
// If this is a recovered queue, the queue is already full and the first log
// normally has a position (unless the RS failed between 2 logs)
private long getRecoveredQueueStartPos(long startPosition) {
try {
startPosition = (replicationQueues.getLogPosition(peerClusterZnode,
this.logQueue.getQueue(walGroupId).peek().getName()));
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " +
this.logQueue.getQueue(walGroupId).peek() + " at position " + startPosition);
}
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
}
return startPosition;
}
// start a background thread to read and batch entries
private void startWALReaderThread(String threadName, Thread.UncaughtExceptionHandler handler,
long startPosition) {
ArrayList<WALEntryFilter> filters = Lists.newArrayList(walEntryFilter,
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, logQueue,
startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this,
this.walGroupId);
Threads.setDaemonThreadRunning(entryReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
handler);
}
// Loops through the recovered queue and tries to find the location of each log.
// This is necessary because the logs may have moved before recovery was initiated
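// For each dead server the loop below probes two candidate locations (sketch; the server name is
// illustrative): for "host1.example.com,16020,1537482342523" it checks the WAL file name under
//   new Path(walDir, DefaultWALProvider.getWALDirectoryName(deadServer))
// and under the same directory with the DefaultWALProvider.SPLITTING_EXT suffix appended.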
private void locateRecoveredPaths() throws IOException {
boolean hasPathChanged = false;
PriorityBlockingQueue<Path> newPaths =
new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
pathsLoop: for (Path path : queue) {
if (fs.exists(path)) { // still in same location, don't need to do anything
newPaths.add(path);
continue;
}
// Path changed - try to find the right path.
hasPathChanged = true;
if (stopper instanceof ReplicationSyncUp.DummyServer) {
// In the case of disaster/recovery, the HMaster may have been shut down or crashed before
// the data was moved from .logs to .oldlogs. Loop through the .logs folders and check
// whether a match exists
Path newPath = getReplSyncUpPath(path);
newPaths.add(newPath);
continue;
} else {
// See if Path exists in the dead RS folder (there could be a chain of failures
// to look at)
List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
LOG.info("NB dead servers : " + deadRegionServers.size());
final Path walDir = FSUtils.getWALRootDir(conf);
for (String curDeadServerName : deadRegionServers) {
final Path deadRsDirectory =
new Path(walDir, DefaultWALProvider.getWALDirectoryName(curDeadServerName));
Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
deadRsDirectory.suffix(DefaultWALProvider.SPLITTING_EXT), path.getName()) };
for (Path possibleLogLocation : locs) {
LOG.info("Possible location " + possibleLogLocation.toUri().toString());
if (manager.getFs().exists(possibleLogLocation)) {
// We found the right new location
LOG.info("Log " + path + " still exists at " + possibleLogLocation);
newPaths.add(possibleLogLocation);
continue pathsLoop;
}
}
}
// didn't find a new location
LOG.error(
String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
newPaths.add(path);
}
}
if (hasPathChanged) {
if (newPaths.size() != queue.size()) { // this shouldn't happen
LOG.error("Recovery queue size is incorrect");
throw new IOException("Recovery queue size error");
}
// put the correct locations in the queue
// since this is a recovered queue with no new incoming logs,
// there shouldn't be any concurrency issues
logQueue.clear(walGroupId);
for (Path path : newPaths) {
logQueue.enqueueLog(path, walGroupId);
}
}
}
// N.B. the ReplicationSyncUp tool sets the manager's log dir (manager.getLogDir()) to the
// root of the wal area rather than to the wal area for a particular region server.
private Path getReplSyncUpPath(Path path) throws IOException {
FileStatus[] rss = fs.listStatus(manager.getLogDir());
for (FileStatus rs : rss) {
Path p = rs.getPath();
FileStatus[] logs = fs.listStatus(p);
for (FileStatus log : logs) {
String logName = log.getPath().getName();
if (logName.equals(path.getName())) {
p = new Path(p, log.getPath().getName());
LOG.info("Log " + p.getName() + " found at " + p);
return p;
}
}
}
LOG.error("Didn't find path for: " + path.getName());
return path;
}
public Path getCurrentPath() {
return this.entryReader.getCurrentPath();
}
public Path getLastLoggedPath() {
return lastLoggedPath;
}
public long getLastLoggedPosition() {
return lastLoggedPosition;
}
private boolean isWorkerActive() {
return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
}
private void terminate(String reason, Exception cause) {
if (cause == null) {
LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
} else {
LOG.error("Closing worker for wal group " + this.walGroupId
+ " because an error occurred: " + reason, cause);
}
entryReader.interrupt();
Threads.shutdown(entryReader, sleepForRetries);
setWorkerState(WorkerState.STOPPED);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
}
/**
* Set the worker state and propagate it to the WAL entry reader
* @param state the new state of the worker
*/
public void setWorkerState(WorkerState state) {
this.state = state;
if (entryReader != null) {
entryReader.setReaderRunning(state == WorkerState.RUNNING);
}
}
/**
* Get the current state of this worker.
* @return WorkerState
*/
public WorkerState getWorkerState() {
return state;
}
private void releaseBufferQuota(int size) {
totalBufferUsed.addAndGet(-size);
}
}
}