/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
import org.apache.hbase.thirdparty.com.google.common.math.LongMath;
/**
 * HBASE-15181 This is a simple implementation of date-based tiered compaction, similar to
 * Cassandra's, with the following benefits:
* <ol>
* <li>Improve date-range-based scan by structuring store files in date-based tiered layout.</li>
* <li>Reduce compaction overhead.</li>
* <li>Improve TTL efficiency.</li>
* </ol>
 * It is a perfect fit for use cases that:
 * <ol>
 * <li>write and scan data mostly by date, with a focus on the most recent data.</li>
 * </ol>
 * Out-of-order writes are handled gracefully. Overlapping time ranges among store files are
 * tolerated and the performance impact is minimized. Configuration can be set in hbase-site or
 * overridden per table or per column family via the hbase shell. The design spec is at
 * https://docs.google.com/document/d/1_AmlNb2N8Us1xICsTeGDLKIqL6T-oHoRLZ323MG_uy8/
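 * <p>
 * As an illustrative sketch only (the table name and age value below are hypothetical),
 * date-tiered compaction can be enabled and tuned per table from the hbase shell:
 *
 * <pre>
 * alter 'orders', CONFIGURATION =&gt; {
 *   'hbase.hstore.engine.class' =&gt;
 *     'org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine',
 *   'hbase.hstore.compaction.date.tiered.max.storefile.age.millis' =&gt; '604800000'}
 * </pre>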
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class DateTieredCompactionPolicy extends SortedCompactionPolicy {
private static final Logger LOG = LoggerFactory.getLogger(DateTieredCompactionPolicy.class);
private final RatioBasedCompactionPolicy compactionPolicyPerWindow;
private final CompactionWindowFactory windowFactory;
public DateTieredCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo)
throws IOException {
super(conf, storeConfigInfo);
try {
compactionPolicyPerWindow = ReflectionUtils.instantiateWithCustomCtor(
comConf.getCompactionPolicyForDateTieredWindow(),
new Class[] { Configuration.class, StoreConfigInformation.class },
new Object[] { conf, storeConfigInfo });
} catch (Exception e) {
throw new IOException("Unable to load configured compaction policy '"
+ comConf.getCompactionPolicyForDateTieredWindow() + "'", e);
}
try {
windowFactory = ReflectionUtils.instantiateWithCustomCtor(
comConf.getDateTieredCompactionWindowFactory(),
new Class[] { CompactionConfiguration.class }, new Object[] { comConf });
} catch (Exception e) {
throw new IOException("Unable to load configured window factory '"
+ comConf.getDateTieredCompactionWindowFactory() + "'", e);
}
}
/**
 * Heuristic used to decide whether a minor compaction is needed.
*/
@Override
@InterfaceAudience.Private
public boolean needsCompaction(Collection<HStoreFile> storeFiles,
List<HStoreFile> filesCompacting) {
ArrayList<HStoreFile> candidates = new ArrayList<>(storeFiles);
try {
return !selectMinorCompaction(candidates, false, true).getFiles().isEmpty();
} catch (Exception e) {
LOG.error("Can not check for compaction: ", e);
return false;
}
}
@Override
public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
throws IOException {
long mcTime = getNextMajorCompactTime(filesToCompact);
if (filesToCompact == null || mcTime == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("filesToCompact: " + filesToCompact + " mcTime: " + mcTime);
}
return false;
}
// TODO: Use better method for determining stamp of last major (HBASE-2990)
long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
long now = EnvironmentEdgeManager.currentTime();
if (lowTimestamp <= 0L || lowTimestamp >= (now - mcTime)) {
if (LOG.isDebugEnabled()) {
LOG.debug("lowTimestamp: " + lowTimestamp + " lowTimestamp: " + lowTimestamp + " now: " +
now + " mcTime: " + mcTime);
}
return false;
}
long cfTTL = this.storeConfigInfo.getStoreFileTtl();
HDFSBlocksDistribution hdfsBlocksDistribution = new HDFSBlocksDistribution();
List<Long> boundaries = getCompactBoundariesForMajor(filesToCompact, now);
boolean[] filesInWindow = new boolean[boundaries.size()];
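    // One flag per boundary/window: a second file falling into an already-occupied window forces
    // a major compaction below.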
for (HStoreFile file: filesToCompact) {
OptionalLong minTimestamp = file.getMinimumTimestamp();
long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
if (cfTTL != Long.MAX_VALUE && oldest >= cfTTL) {
LOG.debug("Major compaction triggered on store " + this
+ "; for TTL maintenance");
return true;
}
if (!file.isMajorCompactionResult() || file.isBulkLoadResult()) {
LOG.debug("Major compaction triggered on store " + this
+ ", because there are new files and time since last major compaction "
+ (now - lowTimestamp) + "ms");
return true;
}
int lowerWindowIndex =
Collections.binarySearch(boundaries, minTimestamp.orElse(Long.MAX_VALUE));
int upperWindowIndex =
Collections.binarySearch(boundaries, file.getMaximumTimestamp().orElse(Long.MAX_VALUE));
// Handle boundary conditions and negative values of binarySearch
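      // Collections.binarySearch returns (-(insertion point) - 1) when the timestamp is not an
      // exact boundary, so Math.abs(result + 2) == insertion point - 1, which is the index of the
      // greatest boundary not exceeding the timestamp (the window containing it).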
lowerWindowIndex = (lowerWindowIndex < 0) ? Math.abs(lowerWindowIndex + 2) : lowerWindowIndex;
upperWindowIndex = (upperWindowIndex < 0) ? Math.abs(upperWindowIndex + 2) : upperWindowIndex;
if (lowerWindowIndex != upperWindowIndex) {
LOG.debug("Major compaction triggered on store " + this + "; because file "
+ file.getPath() + " has data with timestamps cross window boundaries");
return true;
} else if (filesInWindow[upperWindowIndex]) {
LOG.debug("Major compaction triggered on store " + this +
"; because there are more than one file in some windows");
return true;
} else {
filesInWindow[upperWindowIndex] = true;
}
hdfsBlocksDistribution.add(file.getHDFSBlockDistribution());
}
float blockLocalityIndex = hdfsBlocksDistribution
.getBlockLocalityIndex(DNS.getHostname(comConf.conf, DNS.ServerType.REGIONSERVER));
if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
LOG.debug("Major compaction triggered on store " + this
+ "; to make hdfs blocks local, current blockLocalityIndex is "
+ blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
return true;
}
LOG.debug("Skipping major compaction of " + this +
", because the files are already major compacted");
return false;
}
@Override
protected CompactionRequestImpl createCompactionRequest(ArrayList<HStoreFile> candidateSelection,
boolean tryingMajor, boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
CompactionRequestImpl result = tryingMajor ? selectMajorCompaction(candidateSelection)
: selectMinorCompaction(candidateSelection, mayUseOffPeak, mayBeStuck);
if (LOG.isDebugEnabled()) {
LOG.debug("Generated compaction request: " + result);
}
return result;
}
public CompactionRequestImpl selectMajorCompaction(ArrayList<HStoreFile> candidateSelection) {
long now = EnvironmentEdgeManager.currentTime();
List<Long> boundaries = getCompactBoundariesForMajor(candidateSelection, now);
Map<Long, String> boundariesPolicies = getBoundariesStoragePolicyForMajor(boundaries, now);
return new DateTieredCompactionRequest(candidateSelection,
boundaries, boundariesPolicies);
}
/**
 * We receive store files sorted in ascending order by seqId, then scan the list of files. If the
 * current file has a maxTimestamp older than the last known maximum, treat this file as if it
 * carried the last known maximum. This way both seqId and timestamp are in the same order. If
 * files carry the same maxTimestamp, they are ordered by seqId. We then reverse the list so they
 * are ordered by seqId and maxTimestamp in descending order, and build the time windows. This
 * puts all out-of-order data into the same compaction windows, guaranteeing contiguous compaction
 * based on sequence id.
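 * <p>
 * For instance (hypothetical values), files in ascending seqId order with maxTimestamps
 * [10, 5, 20] are treated as carrying maxTimestamps [10, 10, 20], so the out-of-order second file
 * lands in the same window as the first.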
*/
public CompactionRequestImpl selectMinorCompaction(ArrayList<HStoreFile> candidateSelection,
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
long now = EnvironmentEdgeManager.currentTime();
long oldestToCompact = getOldestToCompact(comConf.getDateTieredMaxStoreFileAgeMillis(), now);
List<Pair<HStoreFile, Long>> storefileMaxTimestampPairs =
Lists.newArrayListWithCapacity(candidateSelection.size());
long maxTimestampSeen = Long.MIN_VALUE;
for (HStoreFile storeFile : candidateSelection) {
      // If there is out-of-order data, we put it in the same window as the latest file seen so
      // far (in increasing seqId order).
maxTimestampSeen =
Math.max(maxTimestampSeen, storeFile.getMaximumTimestamp().orElse(Long.MIN_VALUE));
storefileMaxTimestampPairs.add(new Pair<>(storeFile, maxTimestampSeen));
}
Collections.reverse(storefileMaxTimestampPairs);
CompactionWindow window = getIncomingWindow(now);
int minThreshold = comConf.getDateTieredIncomingWindowMin();
PeekingIterator<Pair<HStoreFile, Long>> it =
Iterators.peekingIterator(storefileMaxTimestampPairs.iterator());
while (it.hasNext()) {
if (window.compareToTimestamp(oldestToCompact) < 0) {
break;
}
int compResult = window.compareToTimestamp(it.peek().getSecond());
if (compResult > 0) {
// If the file is too old for the window, switch to the next window
window = window.nextEarlierWindow();
minThreshold = comConf.getMinFilesToCompact();
} else {
// The file is within the target window
ArrayList<HStoreFile> fileList = Lists.newArrayList();
        // Add all files in the same window. For the incoming window we tolerate files with
        // future data, although it is sub-optimal.
while (it.hasNext() && window.compareToTimestamp(it.peek().getSecond()) <= 0) {
fileList.add(it.next().getFirst());
}
if (fileList.size() >= minThreshold) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing files: " + fileList + " for window: " + window);
}
DateTieredCompactionRequest request = generateCompactionRequest(fileList, window,
mayUseOffPeak, mayBeStuck, minThreshold, now);
if (request != null) {
return request;
}
}
}
}
// A non-null file list is expected by HStore
return new CompactionRequestImpl(Collections.emptyList());
}
private DateTieredCompactionRequest generateCompactionRequest(ArrayList<HStoreFile> storeFiles,
CompactionWindow window, boolean mayUseOffPeak, boolean mayBeStuck, int minThreshold,
long now) throws IOException {
    // The files have to be in ascending order for ratio-based compaction to work right
    // and for removeExcessFile to exclude the youngest files.
Collections.reverse(storeFiles);
    // Compact everything in the window if we have more files than comConf.maxBlockingFiles
compactionPolicyPerWindow.setMinThreshold(minThreshold);
ArrayList<HStoreFile> storeFileSelection = mayBeStuck ? storeFiles
: compactionPolicyPerWindow.applyCompactionPolicy(storeFiles, mayUseOffPeak, false);
if (storeFileSelection != null && !storeFileSelection.isEmpty()) {
// If there is any file in the window excluded from compaction,
// only one file will be output from compaction.
boolean singleOutput = storeFiles.size() != storeFileSelection.size() ||
comConf.useDateTieredSingleOutputForMinorCompaction();
List<Long> boundaries = getCompactionBoundariesForMinor(window, singleOutput);
      // Generate the storage policy for each boundary of the minor compaction output.
Map<Long, String> boundaryPolicyMap =
getBoundariesStoragePolicyForMinor(singleOutput, window, now);
DateTieredCompactionRequest result = new DateTieredCompactionRequest(storeFileSelection,
boundaries, boundaryPolicyMap);
return result;
}
return null;
}
/**
* Return a list of boundaries for multiple compaction output in ascending order.
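 * For example, assuming purely for illustration fixed one-hour windows, with now in the 10:00
 * window and the earliest file timestamp at 07:30, the boundaries are
 * [Long.MIN_VALUE, 08:00, 09:00, 10:00] (as epoch millis).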
*/
private List<Long> getCompactBoundariesForMajor(Collection<HStoreFile> filesToCompact, long now) {
long minTimestamp =
filesToCompact.stream().mapToLong(f -> f.getMinimumTimestamp().orElse(Long.MAX_VALUE)).min()
.orElse(Long.MAX_VALUE);
List<Long> boundaries = new ArrayList<>();
// Add startMillis of all windows between now and min timestamp
for (CompactionWindow window = getIncomingWindow(now); window
.compareToTimestamp(minTimestamp) > 0; window = window.nextEarlierWindow()) {
boundaries.add(window.startMillis());
}
boundaries.add(Long.MIN_VALUE);
Collections.reverse(boundaries);
return boundaries;
}
/**
 * @return a list of boundaries for multiple compaction output, in ascending order.
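 *         For example, with {@code singleOutput} false and a window starting at 08:00 (epoch
 *         millis) the boundaries are [Long.MIN_VALUE, 08:00]; with {@code singleOutput} true
 *         they are just [Long.MIN_VALUE].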
*/
private static List<Long> getCompactionBoundariesForMinor(CompactionWindow window,
boolean singleOutput) {
List<Long> boundaries = new ArrayList<>();
boundaries.add(Long.MIN_VALUE);
if (!singleOutput) {
boundaries.add(window.startMillis());
}
return boundaries;
}
private CompactionWindow getIncomingWindow(long now) {
return windowFactory.newIncomingWindow(now);
}
private static long getOldestToCompact(long maxAgeMillis, long now) {
try {
return LongMath.checkedSubtract(now, maxAgeMillis);
} catch (ArithmeticException ae) {
LOG.warn("Value for " + CompactionConfiguration.DATE_TIERED_MAX_AGE_MILLIS_KEY + ": "
+ maxAgeMillis + ". All the files will be eligible for minor compaction.");
return Long.MIN_VALUE;
}
}
private Map<Long, String> getBoundariesStoragePolicyForMinor(boolean singleOutput,
CompactionWindow window, long now) {
Map<Long, String> boundariesPolicy = new HashMap<>();
if (!comConf.isDateTieredStoragePolicyEnable()) {
return boundariesPolicy;
}
String windowStoragePolicy = getWindowStoragePolicy(now, window.startMillis());
if (singleOutput) {
boundariesPolicy.put(Long.MIN_VALUE, windowStoragePolicy);
} else {
boundariesPolicy.put(window.startMillis(), windowStoragePolicy);
}
return boundariesPolicy;
}
private Map<Long, String> getBoundariesStoragePolicyForMajor(List<Long> boundaries, long now) {
Map<Long, String> boundariesPolicy = new HashMap<>();
if (!comConf.isDateTieredStoragePolicyEnable()) {
return boundariesPolicy;
}
for (Long startTs : boundaries) {
boundariesPolicy.put(startTs, getWindowStoragePolicy(now, startTs));
}
return boundariesPolicy;
}
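  /**
   * Pick a storage policy for a window by its age relative to now: hot if the window starts
   * within the hot window age, warm if within the warm window age, otherwise cold. For example
   * (illustrative thresholds), with a hot age of one day and a warm age of seven days, a window
   * starting three days ago maps to the warm policy.
   */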
private String getWindowStoragePolicy(long now, long windowStartMillis) {
if (windowStartMillis >= (now - comConf.getHotWindowAgeMillis())) {
return comConf.getHotWindowStoragePolicy();
} else if (windowStartMillis >= (now - comConf.getWarmWindowAgeMillis())) {
return comConf.getWarmWindowStoragePolicy();
}
return comConf.getColdWindowStoragePolicy();
}
}