/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.PrivateConstants;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileReader;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
* A compactor is a compaction algorithm associated with a given policy. Base class also contains
* reusable parts for implementing compactors (what is common and what isn't is evolving).
* <p>
* Compactions might be concurrent against a given store and the Compactor is shared among them. Do
* not put mutable state into class fields. All Compactor class fields should be final or
* effectively final. 'keepSeqIdPeriod' is an exception to this rule because unit tests may set it.
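* <p>
* A minimal illustrative subclass sketch (simplified and hypothetical; a real implementation,
* such as DefaultCompactor, also handles multi-file output and deletes aborted files):
*
* <pre>
* class MyCompactor extends Compactor&lt;StoreFileWriter&gt; {
*   MyCompactor(Configuration conf, HStore store) {
*     super(conf, store);
*   }
*
*   &#64;Override
*   protected List&lt;Path&gt; commitWriter(StoreFileWriter writer, FileDetails fd,
*     CompactionRequestImpl request) throws IOException {
*     writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
*     writer.close();
*     return Collections.singletonList(writer.getPath());
*   }
*
*   &#64;Override
*   protected void abortWriter(StoreFileWriter writer) throws IOException {
*     writer.close(); // a real implementation would also delete the partial file
*   }
* }
* </pre>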
*/
@InterfaceAudience.Private
public abstract class Compactor<T extends CellSink> {
private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000;
protected final Configuration conf;
protected final HStore store;
protected final int compactionKVMax;
protected final Compression.Algorithm majorCompactionCompression;
protected final Compression.Algorithm minorCompactionCompression;
/** Specify how many days to keep MVCC values during major compaction. */
protected int keepSeqIdPeriod;
// Configs that drive whether we drop page cache behind compactions
protected static final String MAJOR_COMPACTION_DROP_CACHE =
"hbase.regionserver.majorcompaction.pagecache.drop";
protected static final String MINOR_COMPACTION_DROP_CACHE =
"hbase.regionserver.minorcompaction.pagecache.drop";
protected final boolean dropCacheMajor;
protected final boolean dropCacheMinor;
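// Both default to true. For example, to keep the OS page cache populated after minor
// compactions while still dropping it behind major compactions, one could set
// (illustrative hbase-site.xml snippet):
//   <property>
//     <name>hbase.regionserver.minorcompaction.pagecache.drop</name>
//     <value>false</value>
//   </property>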
// We track progress per request using the CompactionRequestImpl identity as key.
// completeCompaction() cleans up this state.
private final Set<CompactionProgress> progressSet =
Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
// TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) {
this.conf = conf;
this.store = store;
this.compactionKVMax =
this.conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
this.majorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
? Compression.Algorithm.NONE
: store.getColumnFamilyDescriptor().getMajorCompactionCompressionType();
this.minorCompactionCompression = (store.getColumnFamilyDescriptor() == null)
? Compression.Algorithm.NONE
: store.getColumnFamilyDescriptor().getMinorCompactionCompressionType();
this.keepSeqIdPeriod =
Math.max(this.conf.getInt(HConstants.KEEP_SEQID_PERIOD, HConstants.MIN_KEEP_SEQID_PERIOD),
HConstants.MIN_KEEP_SEQID_PERIOD);
this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true);
this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true);
}
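/**
 * Factory for the cell sink a compaction writes into. Since it has a single abstract method, a
 * minimal sketch for the common single-writer case is a lambda delegating to
 * {@code createWriter} (illustrative only, not taken from a concrete compactor):
 *
 * <pre>
 * CellSinkFactory&lt;StoreFileWriter&gt; sinkFactory =
 *   (scanner, fd, dropBehind, major, tracker) -&gt; createWriter(fd, dropBehind, major, tracker);
 * </pre>
 */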
protected interface CellSinkFactory<S> {
S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind, boolean major,
Consumer<Path> writerCreationTracker) throws IOException;
}
/** The sole reason this class exists is that java has no ref/out/pointer parameters. */
protected static class FileDetails {
/** Maximum key count after compaction (for blooms) */
public long maxKeyCount = 0;
/** Earliest put timestamp if major compaction */
public long earliestPutTs = HConstants.LATEST_TIMESTAMP;
/** Latest put timestamp */
public long latestPutTs = HConstants.LATEST_TIMESTAMP;
/** Maximum sequence id across the files we're compacting. */
public long maxSeqId = 0;
/** Latest memstore read point found in any of the involved files */
public long maxMVCCReadpoint = 0;
/** Max tags length */
public int maxTagsLength = 0;
/** Min SeqId to keep during a major compaction */
public long minSeqIdToKeep = 0;
/** Total size of the compacted files */
private long totalCompactedFilesSize = 0;
}
/**
* Extracts some details about the files to compact that are commonly needed by compactors.
* @param filesToCompact Files.
* @param allFiles Whether all files are included for compaction
* @param major If major compaction
* @return The result.
*/
private FileDetails getFileDetails(Collection<HStoreFile> filesToCompact, boolean allFiles,
boolean major) throws IOException {
FileDetails fd = new FileDetails();
long oldestHFileTimestampToKeepMVCC =
EnvironmentEdgeManager.currentTime() - (1000L * 60 * 60 * 24 * this.keepSeqIdPeriod);
for (HStoreFile file : filesToCompact) {
if (allFiles && (file.getModificationTimestamp() < oldestHFileTimestampToKeepMVCC)) {
// when isAllFiles is true, all files are compacted so we can calculate the smallest
// MVCC value to keep
if (fd.minSeqIdToKeep < file.getMaxMemStoreTS()) {
fd.minSeqIdToKeep = file.getMaxMemStoreTS();
}
}
long seqNum = file.getMaxSequenceId();
fd.maxSeqId = Math.max(fd.maxSeqId, seqNum);
StoreFileReader r = file.getReader();
if (r == null) {
LOG.warn("Null reader for " + file.getPath());
continue;
}
// NOTE: use getEntries when compacting instead of getFilterEntries; otherwise under-sized
// blooms, or a user having switched bloom type (e.g. from ROW to ROWCOL), can cause
// progress to be miscalculated
long keyCount = r.getEntries();
fd.maxKeyCount += keyCount;
// calculate the latest MVCC readpoint in any of the involved store files
Map<byte[], byte[]> fileInfo = r.loadFileInfo();
// calculate the total size of the compacted files
fd.totalCompactedFilesSize += r.length();
byte[] tmp = null;
// Get and set the real MVCCReadpoint for bulk loaded files, which is the
// SeqId number.
if (r.isBulkLoaded()) {
fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, r.getSequenceID());
} else {
tmp = fileInfo.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
if (tmp != null) {
fd.maxMVCCReadpoint = Math.max(fd.maxMVCCReadpoint, Bytes.toLong(tmp));
}
}
tmp = fileInfo.get(HFileInfo.MAX_TAGS_LEN);
if (tmp != null) {
fd.maxTagsLength = Math.max(fd.maxTagsLength, Bytes.toInt(tmp));
}
// If required, calculate the earliest put timestamp of all involved storefiles.
// This is used to remove family delete marker during compaction.
long earliestPutTs = 0;
if (allFiles) {
tmp = fileInfo.get(EARLIEST_PUT_TS);
if (tmp == null) {
// There's a file with no timestamp info; it must be an old one,
// so assume we have very old puts
fd.earliestPutTs = earliestPutTs = PrivateConstants.OLDEST_TIMESTAMP;
} else {
earliestPutTs = Bytes.toLong(tmp);
fd.earliestPutTs = Math.min(fd.earliestPutTs, earliestPutTs);
}
}
tmp = fileInfo.get(TIMERANGE_KEY);
fd.latestPutTs =
tmp == null ? HConstants.LATEST_TIMESTAMP : TimeRangeTracker.parseFrom(tmp).getMax();
LOG.debug(
"Compacting {}, keycount={}, bloomtype={}, size={}, "
+ "encoding={}, compression={}, seqNum={}{}",
(file.getPath() == null ? null : file.getPath().getName()), keyCount,
r.getBloomFilterType().toString(), TraditionalBinaryPrefix.long2String(r.length(), "", 1),
r.getHFileReader().getDataBlockEncoding(),
major ? majorCompactionCompression : minorCompactionCompression, seqNum,
(allFiles ? ", earliestPutTs=" + earliestPutTs : ""));
}
return fd;
}
/**
* Creates file scanners for compaction.
* @param filesToCompact Files.
* @return Scanners.
*/
private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
long smallestReadPoint, boolean useDropBehind) throws IOException {
return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
smallestReadPoint);
}
private long getSmallestReadPoint() {
return store.getSmallestReadPoint();
}
protected interface InternalScannerFactory {
ScanType getScanType(CompactionRequestImpl request);
InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException;
}
protected final InternalScannerFactory defaultScannerFactory = new InternalScannerFactory() {
@Override
public ScanType getScanType(CompactionRequestImpl request) {
return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
}
@Override
public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
fd.earliestPutTs);
}
};
protected final CreateStoreFileWriterParams createParams(FileDetails fd, boolean shouldDropBehind,
boolean major, Consumer<Path> writerCreationTracker) {
return CreateStoreFileWriterParams.create().maxKeyCount(fd.maxKeyCount)
.compression(major ? majorCompactionCompression : minorCompactionCompression)
.isCompaction(true).includeMVCCReadpoint(fd.maxMVCCReadpoint > 0)
.includesTag(fd.maxTagsLength > 0).shouldDropBehind(shouldDropBehind)
.totalCompactedFilesSize(fd.totalCompactedFilesSize)
.writerCreationTracker(writerCreationTracker);
}
/**
* Creates a writer for a new file.
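* <p>
* Illustrative call (assuming a no-op writer creation tracker):
*
* <pre>
* StoreFileWriter writer = createWriter(fd, dropCacheMajor, true, path -&gt; {});
* </pre>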
* @param fd The file details.
* @return Writer for a new StoreFile
* @throws IOException if creation failed
*/
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
boolean major, Consumer<Path> writerCreationTracker) throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
return store.getStoreEngine()
.createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker));
}
protected final StoreFileWriter createWriter(FileDetails fd, boolean shouldDropBehind,
String fileStoragePolicy, boolean major, Consumer<Path> writerCreationTracker)
throws IOException {
return store.getStoreEngine()
.createWriter(createParams(fd, shouldDropBehind, major, writerCreationTracker)
.fileStoragePolicy(fileStoragePolicy));
}
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
User user) throws IOException {
if (store.getCoprocessorHost() == null) {
return store.getScanInfo();
}
return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
request, user);
}
/**
* Calls coprocessor, if any, to create scanners - after normal scanner creation.
* @param request Compaction request.
* @param scanType Scan type.
* @param scanner The default scanner created for compaction.
* @return The scanner to use (usually the default); null if compaction should not proceed.
*/
private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) {
return scanner;
}
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
request, user);
}
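/**
 * Template method that drives one compaction: gathers file details, opens scanners (consulting
 * coprocessors), runs {@code performCompaction}, and commits or aborts the writer. A typical
 * public entry point in a subclass simply delegates here, e.g. (illustrative sketch in the
 * spirit of DefaultCompactor; {@code writerFactory} is a hypothetical CellSinkFactory field):
 *
 * <pre>
 * public List&lt;Path&gt; compact(CompactionRequestImpl request,
 *   ThroughputController throughputController, User user) throws IOException {
 *   return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
 * }
 * </pre>
 */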
protected final List<Path> compact(final CompactionRequestImpl request,
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles(), request.isMajor());
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
boolean dropCache;
if (request.isMajor() || request.isAllFiles()) {
dropCache = this.dropCacheMajor;
} else {
dropCache = this.dropCacheMinor;
}
InternalScanner scanner = null;
boolean finished = false;
List<StoreFileScanner> scanners =
createFileScanners(request.getFiles(), smallestReadPoint, dropCache);
T writer = null;
CompactionProgress progress = new CompactionProgress(fd.maxKeyCount);
progressSet.add(progress);
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
scanner = postCompactScannerOpen(request, scanType,
scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
boolean cleanSeqId = false;
if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
// For mvcc-sensitive family, we never set mvcc to 0.
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
writer = sinkFactory.createWriter(scanner, fd, dropCache, request.isMajor(),
request.getWriterCreationTracker());
finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId,
throughputController, request, progress);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
}
} finally {
// createScanner may fail when seeking hfiles hits an exception, e.g. when even a single
// hfile reader encounters java.io.IOException: Invalid HFile block magic:
// \x00\x00\x00\x00\x00\x00\x00\x00
// In that case scanner will be null, but the scanners for all the hfiles should still be
// closed, or else we leak ESTABLISHED sockets.
if (scanner == null) {
for (StoreFileScanner sfs : scanners) {
sfs.close();
}
} else {
Closeables.close(scanner, true);
}
if (!finished) {
if (writer != null) {
abortWriter(writer);
}
} else {
store.updateCompactedMetrics(request.isMajor(), progress);
}
progressSet.remove(progress);
}
assert finished : "We should have exited the method on all error paths";
assert writer != null : "Writer should be non-null if no error";
return commitWriter(writer, fd, request);
}
protected abstract List<Path> commitWriter(T writer, FileDetails fd,
CompactionRequestImpl request) throws IOException;
protected abstract void abortWriter(T writer) throws IOException;
/**
* Performs the compaction.
* @param fd FileDetails of cell sink writer
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @param cleanSeqId When true, remove the seqId (formerly mvcc) value when it is &lt;=
* smallestReadPoint
* @param request compaction request.
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
CompactionRequestImpl request, CompactionProgress progress) throws IOException {
assert writer instanceof ShipperListener;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Since scanner.next() can return 'false' but still be delivering data,
// we have to use a do/while loop.
List<Cell> cells = new ArrayList<>();
long currentTime = EnvironmentEdgeManager.currentTime();
long lastMillis = 0;
if (LOG.isDebugEnabled()) {
lastMillis = currentTime;
}
CloseChecker closeChecker = new CloseChecker(conf, currentTime);
String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
long now = 0;
boolean hasMore;
ScannerContext scannerContext =
ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
throughputController.start(compactionName);
Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null;
long shippedCallSizeLimit =
(long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();
try {
do {
hasMore = scanner.next(cells, scannerContext);
currentTime = EnvironmentEdgeManager.currentTime();
if (LOG.isDebugEnabled()) {
now = currentTime;
}
if (closeChecker.isTimeLimit(store, currentTime)) {
progress.cancel();
return false;
}
// output to writer:
Cell lastCleanCell = null;
long lastCleanCellSeqId = 0;
for (Cell c : cells) {
if (cleanSeqId && c.getSequenceId() <= smallestReadPoint) {
lastCleanCell = c;
lastCleanCellSeqId = c.getSequenceId();
PrivateCellUtil.setSequenceId(c, 0);
} else {
lastCleanCell = null;
lastCleanCellSeqId = 0;
}
writer.append(c);
int len = c.getSerializedSize();
++progress.currentCompactedKVs;
progress.totalCompactedSize += len;
bytesWrittenProgressForShippedCall += len;
if (LOG.isDebugEnabled()) {
bytesWrittenProgressForLog += len;
}
throughputController.control(compactionName, len);
if (closeChecker.isSizeLimit(store, len)) {
progress.cancel();
return false;
}
}
if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
if (lastCleanCell != null) {
// HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly.
// ShipperListener will clone the last cells it refers to, so we need to set back the
// sequence id before ShipperListener.beforeShipped
PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
}
// Clone the cells held by the writer so that they are freed of any references
// they may be holding.
((ShipperListener) writer).beforeShipped();
// The SHARED block references, being read for compaction, will be kept in the prevBlocks
// list (see HFileScannerImpl#prevBlocks). In the scan flow, after each set of cells is
// returned to the client, we call shipped(), which can clear this list. Here we do the
// same thing: in between the compaction (after every N cells written, with a collective
// size of 'shippedCallSizeLimit') we call shipped(), which may clear the prevBlocks list.
shipper.shipped();
bytesWrittenProgressForShippedCall = 0;
}
if (lastCleanCell != null) {
// HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly
PrivateCellUtil.setSequenceId(lastCleanCell, lastCleanCellSeqId);
}
// Log the progress of long running compactions every minute if
// logging at DEBUG level
if (LOG.isDebugEnabled()) {
if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
String rate = String.format("%.2f",
(bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
compactionName, progress, rate, throughputController);
lastMillis = now;
bytesWrittenProgressForLog = 0;
}
}
cells.clear();
} while (hasMore);
} catch (InterruptedException e) {
progress.cancel();
throw new InterruptedIOException(
"Interrupted while control throughput of compacting " + compactionName);
} finally {
// Clone the last cell here in the finally block because the writer will append it when
// committing. If we don't clone it, then once the scanner is closed the memory backing
// the last cell will be released. (HBASE-22582)
((ShipperListener) writer).beforeShipped();
throughputController.finish(compactionName);
}
progress.complete();
return true;
}
/**
* @param store store
* @param scanners Store file scanners.
* @param scanType Scan type.
* @param smallestReadPoint Smallest MVCC read point.
* @param earliestPutTs Earliest put across all files.
* @return A compaction scanner.
*/
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs)
throws IOException {
return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
}
/**
* @param store The store.
* @param scanners Store file scanners.
* @param smallestReadPoint Smallest MVCC read point.
* @param earliestPutTs Earliest put across all files.
* @param dropDeletesFromRow Drop deletes starting with this row, inclusive. Can be null.
* @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
* @return A compaction scanner.
*/
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
dropDeletesFromRow, dropDeletesToRow);
}
/**
* Return the aggregate progress for all currently active compactions.
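* <p>
* Illustrative use (assuming the {@code getProgressPct} accessor on CompactionProgress):
*
* <pre>
* CompactionProgress progress = compactor.getProgress();
* LOG.info("Compaction is {}% complete", 100 * progress.getProgressPct());
* </pre>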
*/
public CompactionProgress getProgress() {
synchronized (progressSet) {
long totalCompactingKVs = 0;
long currentCompactedKVs = 0;
long totalCompactedSize = 0;
for (CompactionProgress progress : progressSet) {
totalCompactingKVs += progress.totalCompactingKVs;
currentCompactedKVs += progress.currentCompactedKVs;
totalCompactedSize += progress.totalCompactedSize;
}
CompactionProgress result = new CompactionProgress(totalCompactingKVs);
result.currentCompactedKVs = currentCompactedKVs;
result.totalCompactedSize = totalCompactedSize;
return result;
}
}
public boolean isCompacting() {
return !progressSet.isEmpty();
}
}