blob: 02665a3454181c0ffb385f41c46d2312ce48a7b4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_PARAM_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.COMPACTION_EVENT_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.util.BloomContext;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.RowBloomContext;
import org.apache.hadoop.hbase.util.RowColBloomContext;
import org.apache.hadoop.hbase.util.RowPrefixFixedLengthBloomContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* A StoreFile writer. Use this to write HBase Store Files. It is marked
* {@link InterfaceAudience.Private} because it is an implementation detail of the HBase
* regionserver.
*/
@InterfaceAudience.Private
public class StoreFileWriter implements CellSink, ShipperListener {
// Logger for this class; only trace-level messages are emitted by the code in this file.
private static final Logger LOG = LoggerFactory.getLogger(StoreFileWriter.class.getName());
// Matches the dashes of a UUID string; used by getUniqueFile to strip them from the file name.
private static final Pattern dash = Pattern.compile("-");
// General Bloom filter writer created in the constructor; null means no general Bloom filter.
private final BloomFilterWriter generalBloomFilterWriter;
// Delete family Bloom filter writer; always null when the Bloom type is ROWCOL, and null-checked
// before every use otherwise.
private final BloomFilterWriter deleteFamilyBloomFilterWriter;
// Effective Bloom type: the requested type when a general Bloom writer exists, NONE otherwise.
private final BloomType bloomType;
// Bloom filter parameter obtained from BloomFilterUtil; written to the file info on close when
// it is not null.
private byte[] bloomParam = null;
// Smallest timestamp seen on a Put cell so far; starts at LATEST_TIMESTAMP.
private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
// Number of delete family / delete family version cells appended so far.
private long deleteFamilyCnt = 0;
// Context through which cells are fed to the general Bloom filter; set only when
// generalBloomFilterWriter is not null.
private BloomContext bloomContext = null;
// Context through which cells are fed to the delete family Bloom filter; set only when the
// Bloom type is not ROWCOL.
private BloomContext deleteFamilyBloomContext = null;
// Tracks the timestamps of every appended cell (created as a NON_SYNC tracker).
private final TimeRangeTracker timeRangeTracker;
// Supplies the store files consulted by toCompactionEventTrackerBytes when deciding which
// compacted file names to carry over.
private final Supplier<Collection<HStoreFile>> compactedFilesSupplier;
// The underlying HFile writer that all cells and file info entries are delegated to.
protected HFile.Writer writer;
/**
* Creates an HFile.Writer that also write helpful meta data.
*
* @param fs file system to write to
* @param path file name to create
* @param conf user configuration
* @param bloomType bloom filter setting
* @param maxKeys the expected maximum number of keys to be added. Was used
* for Bloom filter size in {@link HFile} format version 1.
* @param favoredNodes an array of favored nodes or possibly null
* @param fileContext The HFile context
* @param shouldDropCacheBehind Drop pages written to page cache after writing the store file.
* @param compactedFilesSupplier Returns the {@link HStore} compacted files which not archived
* @throws IOException problem writing to FS
*/
private StoreFileWriter(FileSystem fs, Path path, final Configuration conf, CacheConfig cacheConf,
BloomType bloomType, long maxKeys, InetSocketAddress[] favoredNodes, HFileContext fileContext,
boolean shouldDropCacheBehind, Supplier<Collection<HStoreFile>> compactedFilesSupplier)
throws IOException {
this.compactedFilesSupplier = compactedFilesSupplier;
this.timeRangeTracker = TimeRangeTracker.create(TimeRangeTracker.Type.NON_SYNC);
// TODO : Change all writers to be specifically created for compaction context
writer = HFile.getWriterFactory(conf, cacheConf)
.withPath(fs, path)
.withFavoredNodes(favoredNodes)
.withFileContext(fileContext)
.withShouldDropCacheBehind(shouldDropCacheBehind)
.create();
generalBloomFilterWriter = BloomFilterFactory.createGeneralBloomAtWrite(
conf, cacheConf, bloomType,
(int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
if (generalBloomFilterWriter != null) {
this.bloomType = bloomType;
this.bloomParam = BloomFilterUtil.getBloomFilterParam(bloomType, conf);
if (LOG.isTraceEnabled()) {
LOG.trace("Bloom filter type for " + path + ": " + this.bloomType + ", param: "
+ (bloomType == BloomType.ROWPREFIX_FIXED_LENGTH?
Bytes.toInt(bloomParam):Bytes.toStringBinary(bloomParam))
+ ", " + generalBloomFilterWriter.getClass().getSimpleName());
}
// init bloom context
switch (bloomType) {
case ROW:
bloomContext =
new RowBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
break;
case ROWCOL:
bloomContext =
new RowColBloomContext(generalBloomFilterWriter, fileContext.getCellComparator());
break;
case ROWPREFIX_FIXED_LENGTH:
bloomContext = new RowPrefixFixedLengthBloomContext(generalBloomFilterWriter,
fileContext.getCellComparator(), Bytes.toInt(bloomParam));
break;
default:
throw new IOException(
"Invalid Bloom filter type: " + bloomType + " (ROW or ROWCOL or ROWPREFIX expected)");
}
} else {
// Not using Bloom filters.
this.bloomType = BloomType.NONE;
}
// initialize delete family Bloom filter when there is NO RowCol Bloom
// filter
if (this.bloomType != BloomType.ROWCOL) {
this.deleteFamilyBloomFilterWriter = BloomFilterFactory
.createDeleteBloomAtWrite(conf, cacheConf,
(int) Math.min(maxKeys, Integer.MAX_VALUE), writer);
deleteFamilyBloomContext =
new RowBloomContext(deleteFamilyBloomFilterWriter, fileContext.getCellComparator());
} else {
deleteFamilyBloomFilterWriter = null;
}
if (deleteFamilyBloomFilterWriter != null && LOG.isTraceEnabled()) {
LOG.trace("Delete Family Bloom filter type for " + path + ": " +
deleteFamilyBloomFilterWriter.getClass().getSimpleName());
}
}
/**
 * Writes meta data for a file that was not produced from other store files.
 * Call before {@link #close()} since it is written as meta data to this file.
 * Delegates to {@link #appendMetadata(long, boolean, Collection)} with an empty collection of
 * compacted store files.
 * @param maxSequenceId Maximum sequence id.
 * @param majorCompaction True if this file is product of a major compaction
 * @throws IOException problem writing to FS
 */
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
    throws IOException {
  final Collection<HStoreFile> noCompactedFiles = Collections.emptySet();
  appendMetadata(maxSequenceId, majorCompaction, noCompactedFiles);
}
/**
 * Writes meta data, including the compaction event tracker built from the given store files.
 * Call before {@link #close()} since it is written as meta data to this file.
 * @param maxSequenceId Maximum sequence id.
 * @param majorCompaction True if this file is product of a major compaction
 * @param storeFiles The compacted store files used to generate this new file
 * @throws IOException problem writing to FS
 */
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final Collection<HStoreFile> storeFiles) throws IOException {
  final byte[] maxSeqIdBytes = Bytes.toBytes(maxSequenceId);
  writer.appendFileInfo(MAX_SEQ_ID_KEY, maxSeqIdBytes);

  final byte[] majorCompactionBytes = Bytes.toBytes(majorCompaction);
  writer.appendFileInfo(MAJOR_COMPACTION_KEY, majorCompactionBytes);

  // Computed only after the two entries above have been appended, as before.
  final byte[] compactionEventBytes = toCompactionEventTrackerBytes(storeFiles);
  writer.appendFileInfo(COMPACTION_EVENT_KEY, compactionEventBytes);

  appendTrackedTimestampsToMetadata();
}
/**
 * Builds the value written under {@link HStoreFile#COMPACTION_EVENT_KEY} in the new file's file
 * info. The name of every compacted store file is recorded. If a compacted store file is itself
 * the result of a compaction, the files it was compacted from are recorded as well, but only
 * those still returned by the compacted files supplier (i.e. not archived yet). There is no
 * need to recurse: if files A, B, C were compacted to D, and D was compacted to E, then E
 * already lists A, B, C, D. When E is later compacted to F, F records E plus E's list
 * (A, B, C, D), which already covers D's own compacted files.
 * See HBASE-20724 for more details.
 *
 * @param storeFiles The compacted store files used to generate this new file
 * @return bytes of CompactionEventTracker
 */
private byte[] toCompactionEventTrackerBytes(Collection<HStoreFile> storeFiles) {
  // Names of the compacted files the supplier still reports.
  final Set<String> stillPresentNames = new HashSet<>();
  for (HStoreFile present : this.compactedFilesSupplier.get()) {
    stillPresentNames.add(present.getPath().getName());
  }

  final Set<String> trackedNames = new HashSet<>();
  for (HStoreFile input : storeFiles) {
    trackedNames.add(input.getFileInfo().getPath().getName());
    for (String ancestorName : input.getCompactedStoreFiles()) {
      if (!stillPresentNames.contains(ancestorName)) {
        // Not reported by the supplier any more, so it is not carried over.
        continue;
      }
      trackedNames.add(ancestorName);
    }
  }
  return ProtobufUtil.toCompactionEventTrackerBytes(trackedNames);
}
/**
 * Writes meta data, including the number of mob cells.
 * Call before {@link #close()} since it is written as meta data to this file.
 * @param maxSequenceId Maximum sequence id.
 * @param majorCompaction True if this file is product of a major compaction
 * @param mobCellsCount The number of mob cells.
 * @throws IOException problem writing to FS
 */
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction,
    final long mobCellsCount) throws IOException {
  final byte[] maxSeqIdBytes = Bytes.toBytes(maxSequenceId);
  writer.appendFileInfo(MAX_SEQ_ID_KEY, maxSeqIdBytes);

  final byte[] majorCompactionBytes = Bytes.toBytes(majorCompaction);
  writer.appendFileInfo(MAJOR_COMPACTION_KEY, majorCompactionBytes);

  final byte[] mobCellsCountBytes = Bytes.toBytes(mobCellsCount);
  writer.appendFileInfo(MOB_CELLS_COUNT, mobCellsCountBytes);

  appendTrackedTimestampsToMetadata();
}
/**
 * Adds the tracked time range and the earliest put timestamp to the file info.
 *
 * @throws IOException problem writing to FS
 */
public void appendTrackedTimestampsToMetadata() throws IOException {
  // TODO: The StoreFileReader always converts the byte[] to TimeRange
  // via TimeRangeTracker, so we should write the serialization data of TimeRange directly.
  final byte[] timeRangeBytes = TimeRangeTracker.toByteArray(timeRangeTracker);
  appendFileInfo(TIMERANGE_KEY, timeRangeBytes);

  final byte[] earliestPutBytes = Bytes.toBytes(earliestPutTs);
  appendFileInfo(EARLIEST_PUT_TS, earliestPutBytes);
}
/**
 * Records the timestamp of the given cell.
 *
 * If the cell is a Put, the earliest put timestamp is lowered to the cell's timestamp when
 * that is smaller. The time range tracker is always updated with the cell, whatever its type.
 *
 * @param cell the cell whose timestamp is tracked
 */
public void trackTimestamps(final Cell cell) {
  final boolean isPut = cell.getTypeByte() == KeyValue.Type.Put.getCode();
  if (isPut) {
    final long putTs = cell.getTimestamp();
    if (putTs < earliestPutTs) {
      earliestPutTs = putTs;
    }
  }
  timeRangeTracker.includeTimestamp(cell);
}
/**
 * Feeds the cell to the general Bloom filter through the Bloom context, if a general Bloom
 * filter writer exists; otherwise does nothing.
 *
 * @param cell the cell being appended
 * @throws IOException if writing to the Bloom filter fails
 */
private void appendGeneralBloomfilter(final Cell cell) throws IOException {
  if (this.generalBloomFilterWriter == null) {
    return;
  }
  /*
   * http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
   * Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + Timestamp
   *
   * 3 Types of Filtering:
   * 1. Row = Row
   * 2. RowCol = Row + Qualifier
   * 3. RowPrefixFixedLength = Fixed Length Row Prefix
   */
  bloomContext.writeBloom(cell);
}
/**
 * Counts delete family and delete family version cells and, when a delete family Bloom filter
 * writer exists, feeds them to it. Cells of any other type are ignored.
 *
 * @param cell the cell being appended
 * @throws IOException if writing to the Bloom filter fails
 */
private void appendDeleteFamilyBloomFilter(final Cell cell)
    throws IOException {
  final boolean isFamilyDelete =
      PrivateCellUtil.isDeleteFamily(cell) || PrivateCellUtil.isDeleteFamilyVersion(cell);
  if (isFamilyDelete) {
    // The count is kept even when there is no delete family Bloom filter.
    deleteFamilyCnt++;
    if (this.deleteFamilyBloomFilterWriter != null) {
      deleteFamilyBloomContext.writeBloom(cell);
    }
  }
}
/**
 * Appends a cell to the store file. The cell is first offered to the general Bloom filter and
 * to the delete family Bloom filter, then written through the underlying HFile writer, and
 * finally its timestamp is tracked.
 *
 * @param cell the cell to append
 * @throws IOException if a Bloom filter update or the underlying write fails
 */
@Override
public void append(final Cell cell) throws IOException {
appendGeneralBloomfilter(cell);
appendDeleteFamilyBloomFilter(cell);
writer.append(cell);
// Timestamps are tracked only after the underlying write has returned.
trackTimestamps(cell);
}
/**
 * Forwards the notification to the underlying HFile writer first, then to the general Bloom
 * filter writer and the delete family Bloom filter writer, skipping whichever is null.
 *
 * @throws IOException if any of the notified writers fails
 */
@Override
public void beforeShipped() throws IOException {
  // For now these writer will always be of type ShipperListener true.
  // TODO : Change all writers to be specifically created for compaction context
  writer.beforeShipped();
  final BloomFilterWriter[] bloomWriters =
      { generalBloomFilterWriter, deleteFamilyBloomFilterWriter };
  for (BloomFilterWriter bloomWriter : bloomWriters) {
    if (bloomWriter != null) {
      bloomWriter.beforeShipped();
    }
  }
}
/**
 * @return the path reported by the underlying HFile writer
 */
public Path getPath() {
return this.writer.getPath();
}
/**
 * @return true if a general Bloom filter writer was created for this writer
 */
public boolean hasGeneralBloom() {
return this.generalBloomFilterWriter != null;
}
/**
* For unit testing only.
*
* @return the general Bloom filter writer used by this writer, or null if there is none.
*/
BloomFilterWriter getGeneralBloomWriter() {
return generalBloomFilterWriter;
}
/**
 * Compacts the given Bloom filter writer if it exists and holds at least one key.
 *
 * @param bfw the Bloom filter writer, possibly null
 * @return true if the writer is non-null and has a positive key count, false otherwise
 * @throws IOException declared for callers; the visible calls are made on the Bloom writer
 */
private boolean closeBloomFilter(BloomFilterWriter bfw) throws IOException {
  if (bfw == null || bfw.getKeyCount() <= 0) {
    return false;
  }
  bfw.compactBloom();
  return true;
}
/**
 * Finishes the general Bloom filter. When it holds keys, the filter is added to the HFile
 * writer together with the Bloom type, the Bloom parameter (if any) and the last Bloom key.
 *
 * @return true if a general Bloom filter was added to the file
 * @throws IOException problem writing to FS
 */
private boolean closeGeneralBloomFilter() throws IOException {
  if (!closeBloomFilter(generalBloomFilterWriter)) {
    return false;
  }
  // add the general Bloom filter writer and append file info
  writer.addGeneralBloomFilter(generalBloomFilterWriter);
  final byte[] bloomTypeBytes = Bytes.toBytes(bloomType.toString());
  writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, bloomTypeBytes);
  if (bloomParam != null) {
    writer.appendFileInfo(BLOOM_FILTER_PARAM_KEY, bloomParam);
  }
  bloomContext.addLastBloomKey(writer);
  return true;
}
/**
 * Finishes the delete family Bloom filter and records the delete family count. The count is
 * written to the file info whether or not a delete family Bloom filter was added.
 *
 * @return true if a delete family Bloom filter was added to the file
 * @throws IOException problem writing to FS
 */
private boolean closeDeleteFamilyBloomFilter() throws IOException {
  final boolean bloomAdded = closeBloomFilter(deleteFamilyBloomFilterWriter);
  if (bloomAdded) {
    // add the delete family Bloom filter writer
    writer.addDeleteFamilyBloomFilter(deleteFamilyBloomFilterWriter);
  }

  // append file info about the number of delete family kvs
  // even if there is no delete family Bloom.
  final byte[] deleteFamilyCountBytes = Bytes.toBytes(this.deleteFamilyCnt);
  writer.appendFileInfo(DELETE_FAMILY_COUNT, deleteFamilyCountBytes);
  return bloomAdded;
}
/**
 * Finishes both Bloom filters, then closes the underlying HFile writer. Any meta data must
 * have been appended before this call.
 *
 * @throws IOException problem writing to FS
 */
public void close() throws IOException {
  final boolean generalBloomAdded = closeGeneralBloomFilter();
  final boolean deleteFamilyBloomAdded = closeDeleteFamilyBloomFilter();

  writer.close();

  // Log final Bloom filter statistics. This needs to be done after close()
  // because compound Bloom filters might be finalized as part of closing.
  if (!LOG.isTraceEnabled()) {
    return;
  }
  LOG.trace((generalBloomAdded ? "" : "NO ") + "General Bloom and " +
      (deleteFamilyBloomAdded ? "" : "NO ") + "DeleteFamily" + " was added to HFile " +
      getPath());
}
/**
 * Appends a key/value entry to the file info by delegating to the underlying HFile writer.
 *
 * @param key file info key
 * @param value file info value
 * @throws IOException if the underlying writer fails
 */
public void appendFileInfo(byte[] key, byte[] value) throws IOException {
writer.appendFileInfo(key, value);
}
/**
* For use in testing.
*
* @return the underlying HFile writer
*/
HFile.Writer getHFileWriter() {
return writer;
}
/**
 * Picks a random file name inside a directory. The name is a random UUID with its dashes
 * removed.
 *
 * @param fs file system used to check that <code>dir</code> is a directory
 * @param dir Directory to create file in.
 * @return random filename inside passed <code>dir</code>
 * @throws IOException if <code>dir</code> is not a directory or its status cannot be read
 */
static Path getUniqueFile(final FileSystem fs, final Path dir) throws IOException {
  final boolean isDirectory = fs.getFileStatus(dir).isDirectory();
  if (!isDirectory) {
    throw new IOException("Expecting " + dir.toString() + " to be a directory");
  }
  final String uuidText = UUID.randomUUID().toString();
  final String fileName = dash.matcher(uuidText).replaceAll("");
  return new Path(dir, fileName);
}
/**
 * Builder for {@link StoreFileWriter}. Exactly one of {@link #withOutputDir} and
 * {@link #withFilePath} must be used before calling {@link #build()}.
 * <p>
 * {@link #build()} does not modify the builder's own settings, so the same builder can be
 * used to build more than one writer.
 */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ICAST_INTEGER_MULTIPLY_CAST_TO_LONG",
    justification="Will not overflow")
public static class Builder {
  private final Configuration conf;
  private final CacheConfig cacheConf;
  private final FileSystem fs;

  private BloomType bloomType = BloomType.NONE;
  private long maxKeyCount = 0;
  private Path dir;
  private Path filePath;
  private InetSocketAddress[] favoredNodes;
  private HFileContext fileContext;
  private boolean shouldDropCacheBehind;
  private Supplier<Collection<HStoreFile>> compactedFilesSupplier = () -> Collections.emptySet();

  /**
   * Creates a Builder with the given cache configuration.
   *
   * @param conf user configuration
   * @param cacheConf cache configuration passed to the writer
   * @param fs file system to write to
   */
  public Builder(Configuration conf, CacheConfig cacheConf,
      FileSystem fs) {
    this.conf = conf;
    this.cacheConf = cacheConf;
    this.fs = fs;
  }

  /**
   * Creates Builder with cache configuration disabled
   *
   * @param conf user configuration
   * @param fs file system to write to
   */
  public Builder(Configuration conf, FileSystem fs) {
    this(conf, CacheConfig.DISABLED, fs);
  }

  /**
   * Use either this method or {@link #withFilePath}, but not both.
   * @param dir Path to column family directory. The directory is created if
   *          does not exist. The file is given a unique name within this
   *          directory.
   * @return this (for chained invocation)
   */
  public Builder withOutputDir(Path dir) {
    Preconditions.checkNotNull(dir);
    this.dir = dir;
    return this;
  }

  /**
   * Use either this method or {@link #withOutputDir}, but not both.
   * @param filePath the StoreFile path to write
   * @return this (for chained invocation)
   */
  public Builder withFilePath(Path filePath) {
    Preconditions.checkNotNull(filePath);
    this.filePath = filePath;
    return this;
  }

  /**
   * @param favoredNodes an array of favored nodes or possibly null
   * @return this (for chained invocation)
   */
  public Builder withFavoredNodes(InetSocketAddress[] favoredNodes) {
    this.favoredNodes = favoredNodes;
    return this;
  }

  /**
   * @param bloomType the Bloom filter type to request; must not be null
   * @return this (for chained invocation)
   */
  public Builder withBloomType(BloomType bloomType) {
    Preconditions.checkNotNull(bloomType);
    this.bloomType = bloomType;
    return this;
  }

  /**
   * @param maxKeyCount estimated maximum number of keys we expect to add
   * @return this (for chained invocation)
   */
  public Builder withMaxKeyCount(long maxKeyCount) {
    this.maxKeyCount = maxKeyCount;
    return this;
  }

  /**
   * @param fileContext the HFile context passed to the writer
   * @return this (for chained invocation)
   */
  public Builder withFileContext(HFileContext fileContext) {
    this.fileContext = fileContext;
    return this;
  }

  /**
   * @param shouldDropCacheBehind whether pages written to the page cache should be dropped
   *          after writing the store file
   * @return this (for chained invocation)
   */
  public Builder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
    this.shouldDropCacheBehind = shouldDropCacheBehind;
    return this;
  }

  /**
   * @param compactedFilesSupplier supplier of the compacted store files that are consulted
   *          when the compaction event tracker is written; defaults to an empty set
   * @return this (for chained invocation)
   */
  public Builder withCompactedFilesSupplier(
      Supplier<Collection<HStoreFile>> compactedFilesSupplier) {
    this.compactedFilesSupplier = compactedFilesSupplier;
    return this;
  }

  /**
   * Create a store file writer. Client is responsible for closing file when
   * done. If metadata, add BEFORE closing using
   * {@link StoreFileWriter#appendMetadata}.
   *
   * @return the new writer
   * @throws IllegalArgumentException unless exactly one of output directory and file path
   *           has been specified
   * @throws IOException problem accessing the FS
   */
  public StoreFileWriter build() throws IOException {
    // Exactly one of dir / filePath must have been set.
    if ((dir == null) == (filePath == null)) {
      throw new IllegalArgumentException("Either specify parent directory " +
          "or file path");
    }

    // Resolve into locals rather than overwriting the builder's fields. Overwriting them
    // left both dir and filePath set after the first call, so every later build() failed
    // the check above.
    final Path outputDir = (dir != null) ? dir : filePath.getParent();

    if (!fs.exists(outputDir)) {
      // Handle permission for non-HDFS filesystem properly
      // See HBASE-17710
      HRegionFileSystem.mkdirs(fs, conf, outputDir);
    }

    // set block storage policy for temp path
    String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY);
    if (null == policyName) {
      policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY);
    }
    FSUtils.setStoragePolicy(this.fs, outputDir, policyName);

    Path targetPath = filePath;
    BloomType effectiveBloomType = bloomType;
    if (targetPath == null) {
      targetPath = getUniqueFile(fs, outputDir);
      if (!BloomFilterFactory.isGeneralBloomEnabled(conf)) {
        effectiveBloomType = BloomType.NONE;
      }
    }

    return new StoreFileWriter(fs, targetPath, conf, cacheConf, effectiveBloomType, maxKeyCount,
        favoredNodes, fileContext, shouldDropCacheBehind, compactedFilesSupplier);
  }
}
}