/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
* This thread reads entries from a queue and ships them. Entries are placed onto the queue by
* {@link ReplicationSourceWALReader}.
*/
@InterfaceAudience.Private
public class ReplicationSourceShipper extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);
// Hold the state of a replication worker thread
public enum WorkerState {
RUNNING, // The worker is actively shipping entries
STOPPED, // The worker was asked to stop before finishing its queue
FINISHED, // The worker is done processing a queue
}
private final Configuration conf;
protected final String walGroupId;
protected final PriorityBlockingQueue<Path> queue;
private final ReplicationSource source;
// Last position in the log that we sent to ZooKeeper
// It will be accessed by the stats thread so make it volatile
private volatile long currentPosition = -1;
// Path of the current log
private Path currentPath;
// Current state of the worker thread
private volatile WorkerState state;
protected ReplicationSourceWALReader entryReader;
// How long we should sleep between retry attempts
protected final long sleepForRetries;
// Maximum number of retries before taking bold actions
protected final int maxRetriesMultiplier;
private static final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
private final int shipEditsTimeout;
public ReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, ReplicationSource source) {
this.conf = conf;
this.walGroupId = walGroupId;
this.queue = queue;
this.source = source;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.getEntriesTimeout =
this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
}
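/**
* Main loop: polls entry batches from the WAL reader and ships them until the worker is
* stopped or the queue is finished.
*/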
@Override
public final void run() {
setWorkerState(WorkerState.RUNNING);
LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
// Loop until we close down
while (isActive()) {
// Sleep until replication is enabled again
if (!source.isPeerEnabled()) {
// The peer-enabled check is in-memory and cheap, so there is no need to increase the
// sleep interval, which could cause a long lag when the peer is re-enabled.
sleepForRetries("Replication is disabled", sleepForRetries, 1, maxRetriesMultiplier);
continue;
}
try {
WALEntryBatch entryBatch = entryReader.poll(getEntriesTimeout);
LOG.debug("Shipper from source {} got entry batch from reader: {}",
source.getQueueId(), entryBatch);
if (entryBatch == null) {
continue;
}
// the NO_MORE_DATA instance has no path so do not call shipEdits
if (entryBatch == WALEntryBatch.NO_MORE_DATA) {
noMoreData();
} else {
shipEdits(entryBatch);
}
} catch (InterruptedException | ReplicationRuntimeException e) {
// The thread was interrupted (or the reader failed fatally) and needs to quit; restoring
// the interrupt flag makes isActive() return false, so the run loop exits.
LOG.warn("Interrupted while waiting for next replication entry batch", e);
Thread.currentThread().interrupt();
}
}
// If the worker exits run loop without finishing its task, mark it as stopped.
if (!isFinished()) {
setWorkerState(WorkerState.STOPPED);
} else {
source.removeWorker(this);
postFinish();
}
}
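// Called when the reader signals WALEntryBatch.NO_MORE_DATA: updates recovered-queue metrics
// if applicable and marks this worker FINISHED.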
private void noMoreData() {
if (source.isRecovered()) {
LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
source.getQueueId());
source.getSourceMetrics().incrCompletedRecoveryQueue();
} else {
LOG.debug("Finished queue for group {} of peer {}", walGroupId, source.getQueueId());
}
setWorkerState(WorkerState.FINISHED);
}
// To be implemented by recovered shipper
protected void postFinish() {
}
/**
* Get the total size of the entries in a batch, excluding bulk load file sizes.
* Uses {@link ReplicationSourceWALReader}'s static method.
*/
private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch entryBatch) {
int totalSize = 0;
for (Entry entry : entryBatch.getWalEntries()) {
totalSize += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry);
}
return totalSize;
}
/**
* Do the shipping logic
*/
private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
updateLogPosition(entryBatch);
return;
}
int currentSize = (int) entryBatch.getHeapSize();
int sizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(entryBatch);
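// currentSize (full heap size, including bulk loaded hfiles) drives throttling and the
// shipped-batch metric; sizeExcludeBulkLoad is used below to release the buffer quota that
// the reader acquired for this batch.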
source.getSourceMetrics().setTimeStampNextToReplicate(entries.get(entries.size() - 1)
.getKey().getWriteTime());
while (isActive()) {
try {
try {
source.tryThrottle(currentSize);
} catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping for throttling control");
Thread.currentThread().interrupt();
// The current thread might have been interrupted to terminate;
// go back to the while() condition to confirm this.
continue;
}
// create replicateContext here, so the entries can be GC'd upon return from this call
// stack
ReplicationEndpoint.ReplicateContext replicateContext =
new ReplicationEndpoint.ReplicateContext();
replicateContext.setEntries(entries).setSize(currentSize);
replicateContext.setWalGroupId(walGroupId);
replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));
long startTimeNs = System.nanoTime();
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
boolean replicated = source.getReplicationEndpoint().replicate(replicateContext);
long endTimeNs = System.nanoTime();
if (!replicated) {
continue;
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}", entry);
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);
// Offset totalBufferUsed by deducting the shipped batch size (excluding bulk load size).
// sizeExcludeBulkLoad must use the same calculation as acquireBufferQuota() in
// ReplicationSourceWALReader, because both maintain the same variable: totalBufferUsed.
source.postShipEdits(entries, sizeExcludeBulkLoad);
// FIXME check relationship between wal group and overall
source.getSourceMetrics().shipBatch(entryBatch.getNbOperations(), currentSize,
entryBatch.getNbHFiles());
source.getSourceMetrics().setAgeOfLastShippedOp(
entries.get(entries.size() - 1).getKey().getWriteTime(), walGroupId);
source.getSourceMetrics().updateTableLevelMetrics(entryBatch.getWalEntriesWithSize());
if (LOG.isDebugEnabled()) {
LOG.debug("Replicated {} entries or {} operations in {} ms",
entries.size(), entryBatch.getNbOperations(), (endTimeNs - startTimeNs) / 1000000);
}
break;
} catch (Exception ex) {
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
maxRetriesMultiplier)) {
sleepMultiplier++;
}
}
}
}
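/**
* Remove the hfile references carried by any bulk load descriptors in this edit, now that the
* entry has been shipped, so the underlying files can eventually be cleaned up.
*/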
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
// peerClusterZnode will be in the form peerId + "-" + rsZNode.
// A peerId will not have "-" in its name, see HBASE-11394
peerId = peerId.split("-")[0];
}
List<Cell> cells = edit.getCells();
int totalCells = cells.size();
for (int i = 0; i < totalCells; i++) {
Cell cell = cells.get(i);
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
List<StoreDescriptor> stores = bld.getStoresList();
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
List<String> storeFileList = stores.get(j).getStoreFileList();
source.getSourceManager().cleanUpHFileRefs(peerId, storeFileList);
source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFileList.size());
}
}
}
}
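/**
* Record the new log position (and clean old logs) if it has changed.
* @return true if the position was persisted via logPositionAndCleanOldLogs
*/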
private boolean updateLogPosition(WALEntryBatch batch) {
boolean updated = false;
// If end of file is true, the logPositionAndCleanOldLogs method will remove the file's
// record on zk, so call it. The last wal position may be zero if end of file is true and
// there is no entry in the batch. That is OK because the queue storage will ignore the zero
// position and the file will be removed soon by cleanOldLogs.
if (batch.isEndOfFile() || !batch.getLastWalPath().equals(currentPath) ||
batch.getLastWalPosition() != currentPosition) {
source.logPositionAndCleanOldLogs(batch);
updated = true;
}
// If end of file is true, we can just skip to the next file in the queue.
// The only exception is a recovered queue: when we reach the end of the queue there will be
// no more files, so currentPath may be null here.
if (batch.isEndOfFile()) {
currentPath = entryReader.getCurrentPath();
currentPosition = 0L;
} else {
currentPath = batch.getLastWalPath();
currentPosition = batch.getLastWalPosition();
}
return updated;
}
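/**
* Start this shipper as a daemon thread, named after its wal group and the source's queue id.
*/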
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this,
name + ".replicationSource.shipper" + walGroupId + "," + source.getQueueId(),
handler::uncaughtException);
}
Path getCurrentPath() {
return entryReader.getCurrentPath();
}
long getCurrentPosition() {
return currentPosition;
}
void setWALReader(ReplicationSourceWALReader entryReader) {
this.entryReader = entryReader;
}
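// Position to start reading the current WAL from; a recovered shipper may override this to
// resume from the last persisted position.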
long getStartPosition() {
return 0;
}
protected boolean isActive() {
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
}
protected final void setWorkerState(WorkerState state) {
this.state = state;
}
void stopWorker() {
setWorkerState(WorkerState.STOPPED);
}
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
}