blob: 1c00e25be3c2b7ae23426a7366d13a85f9432c3e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mob;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.mob.compactions.MobCompactor;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactionRequest.CompactionPartitionId;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The mob utilities
*/
@InterfaceAudience.Private
public final class MobUtils {
private static final Logger LOG = LoggerFactory.getLogger(MobUtils.class);
private final static long WEEKLY_THRESHOLD_MULTIPLIER = 7;
private final static long MONTHLY_THRESHOLD_MULTIPLIER = 4 * WEEKLY_THRESHOLD_MULTIPLIER;
private static final ThreadLocal<SimpleDateFormat> LOCAL_FORMAT =
new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyyMMdd");
}
};
private static final byte[] REF_DELETE_MARKER_TAG_BYTES;
static {
List<Tag> tags = new ArrayList<>();
tags.add(MobConstants.MOB_REF_TAG);
REF_DELETE_MARKER_TAG_BYTES = TagUtil.fromList(tags);
}
/**
* Private constructor to keep this class from being instantiated.
*/
private MobUtils() {
}
/**
* Formats a date to a string.
* @param date The date.
* @return The string format of the date, it's yyyymmdd.
*/
public static String formatDate(Date date) {
return LOCAL_FORMAT.get().format(date);
}
/**
* Parses the string to a date.
* @param dateString The string format of a date, it's yyyymmdd.
* @return A date.
*/
public static Date parseDate(String dateString) throws ParseException {
return LOCAL_FORMAT.get().parse(dateString);
}
/**
* Get the first day of the input date's month
* @param calendar Calendar object
* @param date The date to find out its first day of that month
* @return The first day in the month
*/
public static Date getFirstDayOfMonth(final Calendar calendar, final Date date) {
calendar.setTime(date);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
calendar.set(Calendar.DAY_OF_MONTH, 1);
Date firstDayInMonth = calendar.getTime();
return firstDayInMonth;
}
/**
* Get the first day of the input date's week
* @param calendar Calendar object
* @param date The date to find out its first day of that week
* @return The first day in the week
*/
public static Date getFirstDayOfWeek(final Calendar calendar, final Date date) {
calendar.setTime(date);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
calendar.set(Calendar.MILLISECOND, 0);
calendar.setFirstDayOfWeek(Calendar.MONDAY);
calendar.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY);
Date firstDayInWeek = calendar.getTime();
return firstDayInWeek;
}
/**
* Whether the current cell is a mob reference cell.
* @param cell The current cell.
* @return True if the cell has a mob reference tag, false if it doesn't.
*/
public static boolean isMobReferenceCell(Cell cell) {
if (cell.getTagsLength() > 0) {
Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_REFERENCE_TAG_TYPE);
if (tag.isPresent()) {
return true;
}
}
return false;
}
/**
* Gets the table name tag.
* @param cell The current cell.
* @return The table name tag.
*/
public static Tag getTableNameTag(Cell cell) {
if (cell.getTagsLength() > 0) {
Optional<Tag> tag = PrivateCellUtil.getTag(cell, TagType.MOB_TABLE_NAME_TAG_TYPE);
if (tag.isPresent()) {
return tag.get();
}
}
return null;
}
/**
* Whether the tag list has a mob reference tag.
* @param tags The tag list.
* @return True if the list has a mob reference tag, false if it doesn't.
*/
public static boolean hasMobReferenceTag(List<Tag> tags) {
if (!tags.isEmpty()) {
for (Tag tag : tags) {
if (tag.getType() == TagType.MOB_REFERENCE_TAG_TYPE) {
return true;
}
}
}
return false;
}
/**
* Indicates whether it's a raw scan.
* The information is set in the attribute "hbase.mob.scan.raw" of scan.
* For a mob cell, in a normal scan the scanners retrieves the mob cell from the mob file.
* In a raw scan, the scanner directly returns cell in HBase without retrieve the one in
* the mob file.
* @param scan The current scan.
* @return True if it's a raw scan.
*/
public static boolean isRawMobScan(Scan scan) {
byte[] raw = scan.getAttribute(MobConstants.MOB_SCAN_RAW);
try {
return raw != null && Bytes.toBoolean(raw);
} catch (IllegalArgumentException e) {
return false;
}
}
/**
* Indicates whether it's a reference only scan.
* The information is set in the attribute "hbase.mob.scan.ref.only" of scan.
* If it's a ref only scan, only the cells with ref tag are returned.
* @param scan The current scan.
* @return True if it's a ref only scan.
*/
public static boolean isRefOnlyScan(Scan scan) {
byte[] refOnly = scan.getAttribute(MobConstants.MOB_SCAN_REF_ONLY);
try {
return refOnly != null && Bytes.toBoolean(refOnly);
} catch (IllegalArgumentException e) {
return false;
}
}
/**
* Indicates whether the scan contains the information of caching blocks.
* The information is set in the attribute "hbase.mob.cache.blocks" of scan.
* @param scan The current scan.
* @return True when the Scan attribute specifies to cache the MOB blocks.
*/
public static boolean isCacheMobBlocks(Scan scan) {
byte[] cache = scan.getAttribute(MobConstants.MOB_CACHE_BLOCKS);
try {
return cache != null && Bytes.toBoolean(cache);
} catch (IllegalArgumentException e) {
return false;
}
}
/**
* Sets the attribute of caching blocks in the scan.
*
* @param scan
* The current scan.
* @param cacheBlocks
* True, set the attribute of caching blocks into the scan, the scanner with this scan
* caches blocks.
* False, the scanner doesn't cache blocks for this scan.
*/
public static void setCacheMobBlocks(Scan scan, boolean cacheBlocks) {
scan.setAttribute(MobConstants.MOB_CACHE_BLOCKS, Bytes.toBytes(cacheBlocks));
}
/**
* Cleans the expired mob files.
* Cleans the files whose creation date is older than (current - columnFamily.ttl), and
* the minVersions of that column family is 0.
* @param fs The current file system.
* @param conf The current configuration.
* @param tableName The current table name.
* @param columnDescriptor The descriptor of the current column family.
* @param cacheConfig The cacheConfig that disables the block cache.
* @param current The current time.
*/
public static void cleanExpiredMobFiles(FileSystem fs, Configuration conf, TableName tableName,
ColumnFamilyDescriptor columnDescriptor, CacheConfig cacheConfig, long current)
throws IOException {
long timeToLive = columnDescriptor.getTimeToLive();
if (Integer.MAX_VALUE == timeToLive) {
// no need to clean, because the TTL is not set.
return;
}
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(current - timeToLive * 1000);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
Date expireDate = calendar.getTime();
LOG.info("MOB HFiles older than " + expireDate.toGMTString() + " will be deleted!");
FileStatus[] stats = null;
Path mobTableDir = FSUtils.getTableDir(getMobHome(conf), tableName);
Path path = getMobFamilyPath(conf, tableName, columnDescriptor.getNameAsString());
try {
stats = fs.listStatus(path);
} catch (FileNotFoundException e) {
LOG.warn("Failed to find the mob file " + path, e);
}
if (null == stats) {
// no file found
return;
}
List<HStoreFile> filesToClean = new ArrayList<>();
int deletedFileCount = 0;
for (FileStatus file : stats) {
String fileName = file.getPath().getName();
try {
if (HFileLink.isHFileLink(file.getPath())) {
HFileLink hfileLink = HFileLink.buildFromHFileLinkPattern(conf, file.getPath());
fileName = hfileLink.getOriginPath().getName();
}
Date fileDate = parseDate(MobFileName.getDateFromName(fileName));
if (LOG.isDebugEnabled()) {
LOG.debug("Checking file " + fileName);
}
if (fileDate.getTime() < expireDate.getTime()) {
if (LOG.isDebugEnabled()) {
LOG.debug(fileName + " is an expired file");
}
filesToClean
.add(new HStoreFile(fs, file.getPath(), conf, cacheConfig, BloomType.NONE, true));
}
} catch (Exception e) {
LOG.error("Cannot parse the fileName " + fileName, e);
}
}
if (!filesToClean.isEmpty()) {
try {
removeMobFiles(conf, fs, tableName, mobTableDir, columnDescriptor.getName(),
filesToClean);
deletedFileCount = filesToClean.size();
} catch (IOException e) {
LOG.error("Failed to delete the mob files " + filesToClean, e);
}
}
LOG.info(deletedFileCount + " expired mob files are deleted");
}
/**
* Gets the root dir of the mob files.
* It's {HBASE_DIR}/mobdir.
* @param conf The current configuration.
* @return the root dir of the mob file.
*/
public static Path getMobHome(Configuration conf) {
Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
return getMobHome(hbaseDir);
}
/**
* Gets the root dir of the mob files under the qualified HBase root dir.
* It's {rootDir}/mobdir.
* @param rootDir The qualified path of HBase root directory.
* @return The root dir of the mob file.
*/
public static Path getMobHome(Path rootDir) {
return new Path(rootDir, MobConstants.MOB_DIR_NAME);
}
/**
* Gets the qualified root dir of the mob files.
* @param conf The current configuration.
* @return The qualified root dir.
*/
public static Path getQualifiedMobRootDir(Configuration conf) throws IOException {
Path hbaseDir = new Path(conf.get(HConstants.HBASE_DIR));
Path mobRootDir = new Path(hbaseDir, MobConstants.MOB_DIR_NAME);
FileSystem fs = mobRootDir.getFileSystem(conf);
return mobRootDir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
}
/**
* Gets the table dir of the mob files under the qualified HBase root dir.
* It's {rootDir}/mobdir/data/${namespace}/${tableName}
* @param rootDir The qualified path of HBase root directory.
* @param tableName The name of table.
* @return The table dir of the mob file.
*/
public static Path getMobTableDir(Path rootDir, TableName tableName) {
return FSUtils.getTableDir(getMobHome(rootDir), tableName);
}
/**
* Gets the region dir of the mob files.
* It's {HBASE_DIR}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}.
* @param conf The current configuration.
* @param tableName The current table name.
* @return The region dir of the mob files.
*/
public static Path getMobRegionPath(Configuration conf, TableName tableName) {
return getMobRegionPath(new Path(conf.get(HConstants.HBASE_DIR)), tableName);
}
/**
* Gets the region dir of the mob files under the specified root dir.
* It's {rootDir}/mobdir/data/{namespace}/{tableName}/{regionEncodedName}.
* @param rootDir The qualified path of HBase root directory.
* @param tableName The current table name.
* @return The region dir of the mob files.
*/
public static Path getMobRegionPath(Path rootDir, TableName tableName) {
Path tablePath = FSUtils.getTableDir(getMobHome(rootDir), tableName);
RegionInfo regionInfo = getMobRegionInfo(tableName);
return new Path(tablePath, regionInfo.getEncodedName());
}
/**
* Gets the family dir of the mob files.
* It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
* @param conf The current configuration.
* @param tableName The current table name.
* @param familyName The current family name.
* @return The family dir of the mob files.
*/
public static Path getMobFamilyPath(Configuration conf, TableName tableName, String familyName) {
return new Path(getMobRegionPath(conf, tableName), familyName);
}
/**
* Gets the family dir of the mob files.
* It's {HBASE_DIR}/mobdir/{namespace}/{tableName}/{regionEncodedName}/{columnFamilyName}.
* @param regionPath The path of mob region which is a dummy one.
* @param familyName The current family name.
* @return The family dir of the mob files.
*/
public static Path getMobFamilyPath(Path regionPath, String familyName) {
return new Path(regionPath, familyName);
}
/**
* Gets the RegionInfo of the mob files.
* This is a dummy region. The mob files are not saved in a region in HBase.
* This is only used in mob snapshot. It's internally used only.
* @param tableName
* @return A dummy mob region info.
*/
public static RegionInfo getMobRegionInfo(TableName tableName) {
return RegionInfoBuilder.newBuilder(tableName)
.setStartKey(MobConstants.MOB_REGION_NAME_BYTES)
.setEndKey(HConstants.EMPTY_END_ROW)
.setSplit(false)
.setRegionId(0)
.build();
}
/**
* Gets whether the current RegionInfo is a mob one.
* @param regionInfo The current RegionInfo.
* @return If true, the current RegionInfo is a mob one.
*/
public static boolean isMobRegionInfo(RegionInfo regionInfo) {
return regionInfo == null ? false : getMobRegionInfo(regionInfo.getTable()).getEncodedName()
.equals(regionInfo.getEncodedName());
}
/**
* Gets whether the current region name follows the pattern of a mob region name.
* @param tableName The current table name.
* @param regionName The current region name.
* @return True if the current region name follows the pattern of a mob region name.
*/
public static boolean isMobRegionName(TableName tableName, byte[] regionName) {
return Bytes.equals(regionName, getMobRegionInfo(tableName).getRegionName());
}
/**
* Gets the working directory of the mob compaction.
* @param root The root directory of the mob compaction.
* @param jobName The current job name.
* @return The directory of the mob compaction for the current job.
*/
public static Path getCompactionWorkingPath(Path root, String jobName) {
return new Path(root, jobName);
}
/**
* Archives the mob files.
* @param conf The current configuration.
* @param fs The current file system.
* @param tableName The table name.
* @param tableDir The table directory.
* @param family The name of the column family.
* @param storeFiles The files to be deleted.
*/
public static void removeMobFiles(Configuration conf, FileSystem fs, TableName tableName,
Path tableDir, byte[] family, Collection<HStoreFile> storeFiles) throws IOException {
HFileArchiver.archiveStoreFiles(conf, fs, getMobRegionInfo(tableName), tableDir, family,
storeFiles);
}
/**
* Creates a mob reference KeyValue.
* The value of the mob reference KeyValue is mobCellValueSize + mobFileName.
* @param cell The original Cell.
* @param fileName The mob file name where the mob reference KeyValue is written.
* @param tableNameTag The tag of the current table name. It's very important in
* cloning the snapshot.
* @return The mob reference KeyValue.
*/
public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag) {
// Append the tags to the KeyValue.
// The key is same, the value is the filename of the mob file
List<Tag> tags = new ArrayList<>();
// Add the ref tag as the 1st one.
tags.add(MobConstants.MOB_REF_TAG);
// Add the tag of the source table name, this table is where this mob file is flushed
// from.
// It's very useful in cloning the snapshot. When reading from the cloning table, we need to
// find the original mob files by this table name. For details please see cloning
// snapshot for mob files.
tags.add(tableNameTag);
return createMobRefCell(cell, fileName, TagUtil.fromList(tags));
}
public static Cell createMobRefCell(Cell cell, byte[] fileName, byte[] refCellTags) {
byte[] refValue = Bytes.add(Bytes.toBytes(cell.getValueLength()), fileName);
return PrivateCellUtil.createCell(cell, refValue, TagUtil.concatTags(refCellTags, cell));
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param date The date string, its format is yyyymmmdd.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The hex string of the start key.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, String startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext, boolean isCompaction)
throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date,
UUID.randomUUID().toString().replaceAll("-", ""));
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig, cryptoContext, isCompaction);
}
/**
* Creates a writer for the ref file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
*/
public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig,
Encryption.Context cryptoContext, boolean isCompaction)
throws IOException {
return createWriter(conf, fs, family,
new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount,
family.getCompactionCompressionType(), cacheConfig, cryptoContext,
HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(),
family.getBloomFilterType(), isCompaction);
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param date The date string, its format is yyyymmmdd.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext, boolean isCompaction)
throws IOException {
MobFileName mobFileName = MobFileName.create(startKey, date,
UUID.randomUUID().toString().replaceAll("-", ""));
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig, cryptoContext, isCompaction);
}
/**
* Creates a writer for the del file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param date The date string, its format is yyyymmmdd.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param startKey The start key.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @return The writer for the del file.
*/
public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, String date, Path basePath, long maxKeyCount,
Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig,
Encryption.Context cryptoContext)
throws IOException {
String suffix = UUID
.randomUUID().toString().replaceAll("-", "") + "_del";
MobFileName mobFileName = MobFileName.create(startKey, date, suffix);
return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression,
cacheConfig, cryptoContext, true);
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param mobFileName The mob file name.
* @param basePath The basic path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount,
Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext,
boolean isCompaction)
throws IOException {
return createWriter(conf, fs, family,
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig,
cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf),
family.getBlocksize(), BloomType.NONE, isCompaction);
}
/**
* Creates a writer for the mob file in temp directory.
* @param conf The current configuration.
* @param fs The current file system.
* @param family The descriptor of the current column family.
* @param path The path for a temp directory.
* @param maxKeyCount The key count.
* @param compression The compression algorithm.
* @param cacheConfig The current cache config.
* @param cryptoContext The encryption context.
* @param checksumType The checksum type.
* @param bytesPerChecksum The bytes per checksum.
* @param blocksize The HFile block size.
* @param bloomType The bloom filter type.
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file.
*/
public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
ColumnFamilyDescriptor family, Path path, long maxKeyCount,
Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext,
ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType,
boolean isCompaction)
throws IOException {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
final CacheConfig writerCacheConf;
if (isCompaction) {
writerCacheConf = new CacheConfig(cacheConfig);
writerCacheConf.setCacheDataOnWrite(false);
} else {
writerCacheConf = cacheConfig;
}
HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
.withIncludesMvcc(true).withIncludesTags(true)
.withCompressTags(family.isCompressTags())
.withChecksumType(checksumType)
.withBytesPerCheckSum(bytesPerChecksum)
.withBlockSize(blocksize)
.withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding())
.withEncryptionContext(cryptoContext)
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs)
.withFilePath(path).withBloomType(bloomType)
.withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
return w;
}
/**
* Commits the mob file.
* @param conf The current configuration.
* @param fs The current file system.
* @param sourceFile The path where the mob file is saved.
* @param targetPath The directory path where the source file is renamed to.
* @param cacheConfig The current cache config.
* @return The target file path the source file is renamed to.
*/
public static Path commitFile(Configuration conf, FileSystem fs, final Path sourceFile,
Path targetPath, CacheConfig cacheConfig) throws IOException {
if (sourceFile == null) {
return null;
}
Path dstPath = new Path(targetPath, sourceFile.getName());
validateMobFile(conf, fs, sourceFile, cacheConfig, true);
String msg = "Renaming flushed file from " + sourceFile + " to " + dstPath;
LOG.info(msg);
Path parent = dstPath.getParent();
if (!fs.exists(parent)) {
fs.mkdirs(parent);
}
if (!fs.rename(sourceFile, dstPath)) {
throw new IOException("Failed rename of " + sourceFile + " to " + dstPath);
}
return dstPath;
}
/**
* Validates a mob file by opening and closing it.
* @param conf The current configuration.
* @param fs The current file system.
* @param path The path where the mob file is saved.
* @param cacheConfig The current cache config.
*/
private static void validateMobFile(Configuration conf, FileSystem fs, Path path,
CacheConfig cacheConfig, boolean primaryReplica) throws IOException {
HStoreFile storeFile = null;
try {
storeFile = new HStoreFile(fs, path, conf, cacheConfig, BloomType.NONE, primaryReplica);
storeFile.initReader();
} catch (IOException e) {
LOG.error("Failed to open mob file[" + path + "], keep it in temp directory.", e);
throw e;
} finally {
if (storeFile != null) {
storeFile.closeStoreFile(false);
}
}
}
/**
* Indicates whether the current mob ref cell has a valid value.
* A mob ref cell has a mob reference tag.
* The value of a mob ref cell consists of two parts, real mob value length and mob file name.
* The real mob value length takes 4 bytes.
* The remaining part is the mob file name.
* @param cell The mob ref cell.
* @return True if the cell has a valid value.
*/
public static boolean hasValidMobRefCellValue(Cell cell) {
return cell.getValueLength() > Bytes.SIZEOF_INT;
}
/**
* Gets the mob value length from the mob ref cell.
* A mob ref cell has a mob reference tag.
* The value of a mob ref cell consists of two parts, real mob value length and mob file name.
* The real mob value length takes 4 bytes.
* The remaining part is the mob file name.
* @param cell The mob ref cell.
* @return The real mob value length.
*/
public static int getMobValueLength(Cell cell) {
return PrivateCellUtil.getValueAsInt(cell);
}
/**
* Gets the mob file name from the mob ref cell.
* A mob ref cell has a mob reference tag.
* The value of a mob ref cell consists of two parts, real mob value length and mob file name.
* The real mob value length takes 4 bytes.
* The remaining part is the mob file name.
* @param cell The mob ref cell.
* @return The mob file name.
*/
public static String getMobFileName(Cell cell) {
return Bytes.toString(cell.getValueArray(), cell.getValueOffset() + Bytes.SIZEOF_INT,
cell.getValueLength() - Bytes.SIZEOF_INT);
}
/**
* Gets the table name used in the table lock.
* The table lock name is a dummy one, it's not a table name. It's tableName + ".mobLock".
* @param tn The table name.
* @return The table name used in table lock.
*/
public static TableName getTableLockName(TableName tn) {
byte[] tableName = tn.getName();
return TableName.valueOf(Bytes.add(tableName, MobConstants.MOB_TABLE_LOCK_SUFFIX));
}
/**
* Performs the mob compaction.
* @param conf the Configuration
* @param fs the file system
* @param tableName the table the compact
* @param hcd the column descriptor
* @param pool the thread pool
* @param allFiles Whether add all mob files into the compaction.
*/
public static void doMobCompaction(Configuration conf, FileSystem fs, TableName tableName,
ColumnFamilyDescriptor hcd, ExecutorService pool, boolean allFiles,
LockManager.MasterLock lock)
throws IOException {
String className = conf.get(MobConstants.MOB_COMPACTOR_CLASS_KEY,
PartitionedMobCompactor.class.getName());
// instantiate the mob compactor.
MobCompactor compactor = null;
try {
compactor = ReflectionUtils.instantiateWithCustomCtor(className, new Class[] {
Configuration.class, FileSystem.class, TableName.class, ColumnFamilyDescriptor.class,
ExecutorService.class }, new Object[] { conf, fs, tableName, hcd, pool });
} catch (Exception e) {
throw new IOException("Unable to load configured mob file compactor '" + className + "'", e);
}
// compact only for mob-enabled column.
// obtain a write table lock before performing compaction to avoid race condition
// with major compaction in mob-enabled column.
try {
lock.acquire();
LOG.info("start MOB compaction of files for table='{}', column='{}', allFiles={}, " +
"compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass());
compactor.compact(allFiles);
} catch (Exception e) {
LOG.error("Failed to compact the mob files for the column " + hcd.getNameAsString()
+ " in the table " + tableName.getNameAsString(), e);
} finally {
LOG.info("end MOB compaction of files for table='{}', column='{}', allFiles={}, " +
"compactor='{}'", tableName, hcd.getNameAsString(), allFiles, compactor.getClass());
lock.release();
}
}
/**
* Creates a thread pool.
* @param conf the Configuration
* @return A thread pool.
*/
public static ExecutorService createMobCompactorThreadPool(Configuration conf) {
int maxThreads = conf.getInt(MobConstants.MOB_COMPACTION_THREADS_MAX,
MobConstants.DEFAULT_MOB_COMPACTION_THREADS_MAX);
if (maxThreads == 0) {
maxThreads = 1;
}
final SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS, queue,
Threads.newDaemonThreadFactory("MobCompactor"), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// waiting for a thread to pick up instead of throwing exceptions.
queue.put(r);
} catch (InterruptedException e) {
throw new RejectedExecutionException(e);
}
}
});
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
return pool;
}
/**
* Checks whether this table has mob-enabled columns.
* @param htd The current table descriptor.
* @return Whether this table has mob-enabled columns.
*/
public static boolean hasMobColumns(TableDescriptor htd) {
ColumnFamilyDescriptor[] hcds = htd.getColumnFamilies();
for (ColumnFamilyDescriptor hcd : hcds) {
if (hcd.isMobEnabled()) {
return true;
}
}
return false;
}
/**
* Indicates whether return null value when the mob file is missing or corrupt.
* The information is set in the attribute "empty.value.on.mobcell.miss" of scan.
* @param scan The current scan.
* @return True if the readEmptyValueOnMobCellMiss is enabled.
*/
public static boolean isReadEmptyValueOnMobCellMiss(Scan scan) {
byte[] readEmptyValueOnMobCellMiss =
scan.getAttribute(MobConstants.EMPTY_VALUE_ON_MOBCELL_MISS);
try {
return readEmptyValueOnMobCellMiss != null && Bytes.toBoolean(readEmptyValueOnMobCellMiss);
} catch (IllegalArgumentException e) {
return false;
}
}
/**
* Creates a mob ref delete marker.
* @param cell The current delete marker.
* @return A delete marker with the ref tag.
*/
public static Cell createMobRefDeleteMarker(Cell cell) {
return PrivateCellUtil.createCell(cell, TagUtil.concatTags(REF_DELETE_MARKER_TAG_BYTES, cell));
}
/**
* Checks if the mob file is expired.
* @param column The descriptor of the current column family.
* @param current The current time.
* @param fileDate The date string parsed from the mob file name.
* @return True if the mob file is expired.
*/
public static boolean isMobFileExpired(ColumnFamilyDescriptor column, long current,
String fileDate) {
if (column.getMinVersions() > 0) {
return false;
}
long timeToLive = column.getTimeToLive();
if (Integer.MAX_VALUE == timeToLive) {
return false;
}
Date expireDate = new Date(current - timeToLive * 1000);
expireDate = new Date(expireDate.getYear(), expireDate.getMonth(), expireDate.getDate());
try {
Date date = parseDate(fileDate);
if (date.getTime() < expireDate.getTime()) {
return true;
}
} catch (ParseException e) {
LOG.warn("Failed to parse the date " + fileDate, e);
return false;
}
return false;
}
/**
* fill out partition id based on compaction policy and date, threshold...
* @param id Partition id to be filled out
* @param firstDayOfCurrentMonth The first day in the current month
* @param firstDayOfCurrentWeek The first day in the current week
* @param dateStr Date string from the mob file
* @param policy Mob compaction policy
* @param calendar Calendar object
* @param threshold Mob compaciton threshold configured
* @return true if the file needs to be excluded from compaction
*/
public static boolean fillPartitionId(final CompactionPartitionId id,
final Date firstDayOfCurrentMonth, final Date firstDayOfCurrentWeek, final String dateStr,
final MobCompactPartitionPolicy policy, final Calendar calendar, final long threshold) {
boolean skipCompcation = false;
id.setThreshold(threshold);
if (threshold <= 0) {
id.setDate(dateStr);
return skipCompcation;
}
long finalThreshold;
Date date;
try {
date = MobUtils.parseDate(dateStr);
} catch (ParseException e) {
LOG.warn("Failed to parse date " + dateStr, e);
id.setDate(dateStr);
return true;
}
/* The algorithm works as follows:
* For monthly policy:
* 1). If the file's date is in past months, apply 4 * 7 * threshold
* 2). If the file's date is in past weeks, apply 7 * threshold
* 3). If the file's date is in current week, exclude it from the compaction
* For weekly policy:
* 1). If the file's date is in past weeks, apply 7 * threshold
* 2). If the file's date in currently, apply threshold
* For daily policy:
* 1). apply threshold
*/
if (policy == MobCompactPartitionPolicy.MONTHLY) {
if (date.before(firstDayOfCurrentMonth)) {
// Check overflow
if (threshold < (Long.MAX_VALUE / MONTHLY_THRESHOLD_MULTIPLIER)) {
finalThreshold = MONTHLY_THRESHOLD_MULTIPLIER * threshold;
} else {
finalThreshold = Long.MAX_VALUE;
}
id.setThreshold(finalThreshold);
// set to the date for the first day of that month
id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfMonth(calendar, date)));
return skipCompcation;
}
}
if ((policy == MobCompactPartitionPolicy.MONTHLY) ||
(policy == MobCompactPartitionPolicy.WEEKLY)) {
// Check if it needs to apply weekly multiplier
if (date.before(firstDayOfCurrentWeek)) {
// Check overflow
if (threshold < (Long.MAX_VALUE / WEEKLY_THRESHOLD_MULTIPLIER)) {
finalThreshold = WEEKLY_THRESHOLD_MULTIPLIER * threshold;
} else {
finalThreshold = Long.MAX_VALUE;
}
id.setThreshold(finalThreshold);
id.setDate(MobUtils.formatDate(MobUtils.getFirstDayOfWeek(calendar, date)));
return skipCompcation;
} else if (policy == MobCompactPartitionPolicy.MONTHLY) {
skipCompcation = true;
}
}
// Rest is daily
id.setDate(dateStr);
return skipCompcation;
}
}