blob: 9af91e5a59f10bd18a384481d86cec7c25c0cb66 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
/**
* Reads and filters WAL entries, groups the filtered entries into batches, and puts the batches
* onto a queue
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
class ReplicationSourceWALReader extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceWALReader.class);
private final ReplicationSourceLogQueue logQueue;
private final FileSystem fs;
private final Configuration conf;
private final WALEntryFilter filter;
private final ReplicationSource source;
@InterfaceAudience.Private
final BlockingQueue<WALEntryBatch> entryBatchQueue;
// max (heap) size of each batch - multiply by number of batches in queue to get total
private final long replicationBatchSizeCapacity;
// max count of each batch - multiply by number of batches in queue to get total
private final int replicationBatchCountCapacity;
// position in the WAL to start reading at
private long currentPosition;
private final long sleepForRetries;
private final int maxRetriesMultiplier;
private final boolean eofAutoRecovery;
//Indicates whether this particular worker is running
private boolean isReaderRunning = true;
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
private final String walGroupId;
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
* @param fs the files system to use
* @param conf configuration to use
* @param logQueue The WAL queue to read off of
* @param startPosition position in the first WAL to start reading from
* @param filter The filter to use while reading
* @param source replication source
*/
public ReplicationSourceWALReader(FileSystem fs, Configuration conf,
ReplicationSourceLogQueue logQueue, long startPosition, WALEntryFilter filter,
ReplicationSource source, String walGroupId) {
this.logQueue = logQueue;
this.currentPosition = startPosition;
this.fs = fs;
this.conf = conf;
this.filter = filter;
this.source = source;
this.replicationBatchSizeCapacity =
this.conf.getLong("replication.source.size.capacity", 1024 * 1024 * 64);
this.replicationBatchCountCapacity = this.conf.getInt("replication.source.nb.capacity", 25000);
// memory used will be batchSizeCapacity * (nb.batches + 1)
// the +1 is for the current thread reading before placing onto the queue
int batchCount = conf.getInt("replication.source.nb.batches", 1);
this.totalBufferUsed = source.getSourceManager().getTotalBufferUsed();
this.totalBufferQuota = source.getSourceManager().getTotalBufferLimit();
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000); // 1 second
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.eofAutoRecovery = conf.getBoolean("replication.source.eof.autorecovery", false);
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
this.walGroupId = walGroupId;
LOG.info("peerClusterZnode=" + source.getQueueId()
+ ", ReplicationSourceWALReaderThread : " + source.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
+ ", replicationBatchCountCapacity=" + replicationBatchCountCapacity
+ ", replicationBatchQueueCapacity=" + batchCount);
}
@Override
public void run() {
int sleepMultiplier = 1;
while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
WALEntryBatch batch = null;
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, conf, currentPosition,
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics(), walGroupId)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
batch = null;
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}
batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
if (batch == null) {
// got no entries and didn't advance position in WAL
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
continue;
}
// if we have already switched a file, skip reading and put it directly to the ship queue
if (!batch.isEndOfFile()) {
readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition();
}
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
sleepMultiplier = 1;
}
} catch (WALEntryFilterRetryableException | IOException e) { // stream related
if (!handleEofException(e, batch)) {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue");
Thread.currentThread().interrupt();
}
}
}
// returns true if we reach the size limit for batch, i.e, we need to finish the batch and return.
protected final boolean addEntryToBatch(WALEntryBatch batch, Entry entry) {
WALEdit edit = entry.getEdit();
if (edit == null || edit.isEmpty()) {
LOG.debug("Edit null or empty for entry {} ", entry);
return false;
}
LOG.debug("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
// Stop if too many entries or too big
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity ||
batch.getNbEntries() >= replicationBatchCountCapacity;
}
protected static final boolean switched(WALEntryStream entryStream, Path path) {
Path newPath = entryStream.getCurrentPath();
return newPath == null || !path.getName().equals(newPath.getName());
}
// We need to get the WALEntryBatch from the caller so we can add entries in there
// This is required in case there is any exception in while reading entries
// we do not want to loss the existing entries in the batch
protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath();
for (;;) {
Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition());
entry = filterEntry(entry);
if (entry != null) {
if (addEntryToBatch(batch, entry)) {
break;
}
}
boolean hasNext = entryStream.hasNext();
// always return if we have switched to a new file
if (switched(entryStream, currentPath)) {
batch.setEndOfFile(true);
break;
}
if (!hasNext) {
break;
}
}
}
private void handleEmptyWALEntryBatch() throws InterruptedException {
LOG.trace("Didn't read any new entries from WAL");
if (logQueue.getQueue(walGroupId).isEmpty()) {
// we're done with current queue, either this is a recovered queue, or it is the special group
// for a sync replication peer and the peer has been transited to DA or S state.
LOG.debug("Stopping the replication source wal reader");
setReaderRunning(false);
// shuts down shipper thread immediately
entryBatchQueue.put(WALEntryBatch.NO_MORE_DATA);
} else {
Thread.sleep(sleepForRetries);
}
}
private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
throws IOException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
return null;
}
}
if (currentPath != null) {
if (switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
}
}
return createBatch(entryStream);
}
/**
* This is to handle the EOFException from the WAL entry stream. EOFException should
* be handled carefully because there are chances of data loss because of never replicating
* the data. Thus we should always try to ship existing batch of entries here.
* If there was only one log in the queue before EOF, we ship the empty batch here
* and since reader is still active, in the next iteration of reader we will
* stop the reader.
* <p/>
* If there was more than one log in the queue before EOF, we ship the existing batch
* and reset the wal patch and position to the log with EOF, so shipper can remove
* logs from replication queue
* @return true only the IOE can be handled
*/
private boolean handleEofException(Exception e, WALEntryBatch batch) {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
(source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
Path path = queue.peek();
try {
if (!fs.exists(path)) {
// There is a chance that wal has moved to oldWALs directory, so look there also.
path = AbstractFSWALProvider.findArchivedLog(path, conf);
// path is null if it couldn't find archive path.
}
if (path != null && fs.getFileStatus(path).getLen() == 0) {
LOG.warn("Forcing removal of 0 length log in queue: {}", path);
logQueue.remove(walGroupId);
currentPosition = 0;
if (batch != null) {
// After we removed the WAL from the queue, we should try shipping the existing batch of
// entries
addBatchToShippingQueue(batch);
}
return true;
}
} catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + path, ioe);
} catch (InterruptedException ie) {
LOG.trace("Interrupted while adding WAL batch to ship queue");
Thread.currentThread().interrupt();
}
}
return false;
}
/**
* Update the batch try to ship and return true if shipped
* @param batch Batch of entries to ship
* @throws InterruptedException throws interrupted exception
*/
private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException {
// need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
entryBatchQueue.put(batch);
}
public Path getCurrentPath() {
// if we've read some WAL entries, get the Path we read from
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
if (batchQueueHead != null) {
return batchQueueHead.getLastWalPath();
}
// otherwise, we must be currently reading from the head of the log queue
return logQueue.getQueue(walGroupId).peek();
}
//returns false if we've already exceeded the global quota
private boolean checkQuota() {
// try not to go over total quota
if (totalBufferUsed.get() > totalBufferQuota) {
LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B",
this.source.getPeerId(), totalBufferUsed.get(), totalBufferQuota);
Threads.sleep(sleepForRetries);
return false;
}
return true;
}
private WALEntryBatch createBatch(WALEntryStream entryStream) {
return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
}
protected final Entry filterEntry(Entry entry) {
Entry filtered = filter.filter(entry);
if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
LOG.debug("Filtered entry for replication: {}", entry);
source.getSourceMetrics().incrLogEditsFiltered();
}
return filtered;
}
/**
* Retrieves the next batch of WAL entries from the queue, waiting up to the specified time for a
* batch to become available
* @return A batch of entries, along with the position in the log after reading the batch
* @throws InterruptedException if interrupted while waiting
*/
public WALEntryBatch take() throws InterruptedException {
return entryBatchQueue.take();
}
public WALEntryBatch poll(long timeout) throws InterruptedException {
return entryBatchQueue.poll(timeout, TimeUnit.MILLISECONDS);
}
private long getEntrySizeIncludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
return getEntrySizeExcludeBulkLoad(entry) + sizeOfStoreFilesIncludeBulkLoad(edit);
}
public static long getEntrySizeExcludeBulkLoad(Entry entry) {
WALEdit edit = entry.getEdit();
WALKey key = entry.getKey();
return edit.heapSize() + key.estimatedSerializedSizeOf();
}
private void updateBatchStats(WALEntryBatch batch, Entry entry, long entrySize) {
WALEdit edit = entry.getEdit();
batch.incrementHeapSize(entrySize);
Pair<Integer, Integer> nbRowsAndHFiles = countDistinctRowKeysAndHFiles(edit);
batch.incrementNbRowKeys(nbRowsAndHFiles.getFirst());
batch.incrementNbHFiles(nbRowsAndHFiles.getSecond());
}
/**
* Count the number of different row keys in the given edit because of mini-batching. We assume
* that there's at least one Cell in the WALEdit.
* @param edit edit to count row keys from
* @return number of different row keys and HFiles
*/
private Pair<Integer, Integer> countDistinctRowKeysAndHFiles(WALEdit edit) {
List<Cell> cells = edit.getCells();
int distinctRowKeys = 1;
int totalHFileEntries = 0;
Cell lastCell = cells.get(0);
int totalCells = edit.size();
for (int i = 0; i < totalCells; i++) {
// Count HFiles to be replicated
if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
try {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
List<StoreDescriptor> stores = bld.getStoresList();
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
totalHFileEntries += stores.get(j).getStoreFileList().size();
}
} catch (IOException e) {
LOG.error("Failed to deserialize bulk load entry from wal edit. "
+ "Then its hfiles count will not be added into metric.", e);
}
}
if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
distinctRowKeys++;
}
lastCell = cells.get(i);
}
Pair<Integer, Integer> result = new Pair<>(distinctRowKeys, totalHFileEntries);
return result;
}
/**
* Calculate the total size of all the store files
* @param edit edit to count row keys from
* @return the total size of the store files
*/
private int sizeOfStoreFilesIncludeBulkLoad(WALEdit edit) {
List<Cell> cells = edit.getCells();
int totalStoreFilesSize = 0;
int totalCells = edit.size();
for (int i = 0; i < totalCells; i++) {
if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
try {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
List<StoreDescriptor> stores = bld.getStoresList();
int totalStores = stores.size();
for (int j = 0; j < totalStores; j++) {
totalStoreFilesSize =
(int) (totalStoreFilesSize + stores.get(j).getStoreFileSizeBytes());
}
} catch (IOException e) {
LOG.error("Failed to deserialize bulk load entry from wal edit. "
+ "Size of HFiles part of cell will not be considered in replication "
+ "request size calculation.", e);
}
}
}
return totalStoreFilesSize;
}
/**
* @param size delta size for grown buffer
* @return true if we should clear buffer and push all
*/
private boolean acquireBufferQuota(long size) {
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
return newBufferUsed >= totalBufferQuota;
}
/**
* @return whether the reader thread is running
*/
public boolean isReaderRunning() {
return isReaderRunning && !isInterrupted();
}
/**
* @param readerRunning the readerRunning to set
*/
public void setReaderRunning(boolean readerRunning) {
this.isReaderRunning = readerRunning;
}
}