// blob: 7a7468973af90ef94daa1b3d1e84f974988c57ec [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.ReaderContext;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
* A Store data file. Stores usually have one or more of these files. They
* are produced by flushing the memstore to disk. To
* create, instantiate a writer using {@link StoreFileWriter.Builder}
* and append data. Be sure to add any metadata before calling close on the
* Writer (Use the appendMetadata convenience methods). On close, a StoreFile
* is sitting in the Filesystem. To refer to it, create a StoreFile instance
* passing filesystem and path. To read, call {@link #initReader()}
* <p>StoreFiles may also reference store files in another Store.
*
* The reason for this weird pattern where you use a different instance for the
* writer and a reader is that we write once but read a lot more.
*/
@InterfaceAudience.Private
public class HStoreFile implements StoreFile {
/** Logger for this class. */
private static final Logger LOG = LoggerFactory.getLogger(HStoreFile.class.getName());
// Keys for fileinfo values in HFile. Each constant is the byte encoding of the quoted name.
/** Max Sequence ID in FileInfo */
public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
/** Major compaction flag in FileInfo */
public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
/** Flag in FileInfo marking a file that should be excluded from minor compactions */
public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY =
Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION");
/**
 * Key for compaction event which contains the compacted storefiles in FileInfo
 */
public static final byte[] COMPACTION_EVENT_KEY = Bytes.toBytes("COMPACTION_EVENT_KEY");
/** Bloom filter Type in FileInfo */
public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
/** Bloom filter param in FileInfo */
public static final byte[] BLOOM_FILTER_PARAM_KEY = Bytes.toBytes("BLOOM_FILTER_PARAM");
/** Delete Family Count in FileInfo */
public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT");
/** Last Bloom filter key in FileInfo */
public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY");
/** Key for Timerange information in metadata */
public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
/** Key for timestamp of earliest-put in metadata */
public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
/** Key for the number of mob cells in metadata */
public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT");
/** Null data: a one-element array holding a single zero byte */
public static final byte[] NULL_VALUE = new byte[] {0};
/** Key for the list of MOB file references */
public static final byte[] MOB_FILE_REFS = Bytes.toBytes("MOB_FILE_REFS");
/** Meta key set when store file is a result of a bulk load */
public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK");
/**
 * Meta key holding the bulk load timestamp. Its presence in the metadata map marks the file as a
 * bulk load result, and its value is decoded as a long by {@link #getBulkLoadTimestamp()}.
 */
public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP");
/**
 * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets
 * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped.
 */
public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
// Descriptor of the underlying file. Supplies the path, file system, reference count and the
// reader-creation calls used by this class.
private final StoreFileInfo fileInfo;
// Pread reader assigned in open(); null until initReader() succeeds and again after
// closeStoreFile(). Volatile because initReader() checks it outside the synchronized block.
private volatile StoreFileReader initialReader;
// Block cache configuration and reference.
private final CacheConfig cacheConf;
// Indicates if the file got compacted. Set to true by markCompactedAway().
private volatile boolean compactedAway = false;
// The fields below cache values read from the metadata stored in the backing HFile.
// Max sequence id. Stays -1 until set when we obtain a Reader.
private long sequenceid = -1;
// max of the MemstoreTS in the KV's in this store
// Stays -1 until set when we obtain a Reader.
private long maxMemstoreTS = -1;
// firstKey, lastKey and comparator are copied from the reader at the end of open() so they can
// still be used after the reader is closed. They have no initializer, so they are null before
// the first successful open().
private Optional<Cell> firstKey;
private Optional<Cell> lastKey;
private CellComparator comparator;
/**
 * @return the block cache configuration this store file was constructed with
 */
public CacheConfig getCacheConf() {
  return cacheConf;
}
/**
 * @return the first key as cached when the reader was last opened; the field is unset (null)
 *         if the reader has never been opened
 */
@Override
public Optional<Cell> getFirstKey() {
  return this.firstKey;
}
/**
 * @return the last key as cached when the reader was last opened; the field is unset (null)
 *         if the reader has never been opened
 */
@Override
public Optional<Cell> getLastKey() {
  return this.lastKey;
}
/**
 * @return the cell comparator cached when the reader was last opened; null if the reader has
 *         never been opened
 */
@Override
public CellComparator getComparator() {
  return this.comparator;
}
/**
 * @return the max memstore timestamp read from the file metadata, or -1 if it has not been
 *         loaded
 */
@Override
public long getMaxMemStoreTS() {
  return this.maxMemstoreTS;
}
// If true, this file was the product of a major compaction. Stays null until open() reads the
// flag from the file metadata; isMajorCompactionResult() rejects the null state.
private AtomicBoolean majorCompaction = null;
// If true, this file should not be included in minor compactions.
// It's set whenever you get a Reader.
private boolean excludeFromMinorCompaction = false;
// This file was the product of these compacted store files. Filled in by open() from the
// compaction event stored in the file metadata.
private final Set<String> compactedStoreFiles = new HashSet<>();
/**
 * Map of the metadata entries in the corresponding HFile. Populated when Reader is opened
 * after which it is not modified again (it is wrapped as an unmodifiable map). Null until then.
 */
private Map<byte[], byte[]> metadataMap;
/**
 * Bloom filter type specified in column family configuration. Does not
 * necessarily correspond to the Bloom filter type present in the HFile.
 */
private final BloomType cfBloomType;
/**
 * Creates a store file handle for the file at the given path by building a
 * {@link StoreFileInfo} and delegating to
 * {@link #HStoreFile(StoreFileInfo, BloomType, CacheConfig)}. This class's reader is not opened
 * here; call {@link #initReader()} before reading.
 * @param fs The current file system to use.
 * @param p The path of the file.
 * @param conf The current configuration.
 * @param cacheConf The cache configuration and block cache reference.
 * @param cfBloomType The bloom type to use for this store file as specified by column family
 *          configuration. This may or may not be the same as the Bloom filter type actually
 *          present in the HFile, because column family configuration might change. If this is
 *          {@link BloomType#NONE}, the existing Bloom filter is ignored.
 * @param primaryReplica true if this is a store file for primary replica, otherwise false.
 * @throws IOException propagated from the {@link StoreFileInfo} constructor
 */
public HStoreFile(FileSystem fs, Path p, Configuration conf, CacheConfig cacheConf,
BloomType cfBloomType, boolean primaryReplica) throws IOException {
this(new StoreFileInfo(conf, fs, p, primaryReplica), cfBloomType, cacheConf);
}
/**
 * Creates a store file handle from an existing {@link StoreFileInfo}. No reader is opened here;
 * call {@link #initReader()} before reading.
 * <p>
 * If general Bloom filters are disabled in the file info's configuration, the requested
 * {@code cfBloomType} is ignored and {@link BloomType#NONE} is used instead.
 * @param fileInfo The store file information.
 * @param cfBloomType The bloom type to use for this store file as specified by column
 *          family configuration. This may or may not be the same as the Bloom filter type
 *          actually present in the HFile, because column family configuration might change. If
 *          this is {@link BloomType#NONE}, the existing Bloom filter is ignored.
 * @param cacheConf The cache configuration and block cache reference.
 */
public HStoreFile(StoreFileInfo fileInfo, BloomType cfBloomType, CacheConfig cacheConf) {
  this.fileInfo = fileInfo;
  this.cacheConf = cacheConf;
  if (BloomFilterFactory.isGeneralBloomEnabled(fileInfo.getConf())) {
    this.cfBloomType = cfBloomType;
  } else {
    // Parameterized logging: the message text is only assembled when INFO is enabled.
    LOG.info("Ignoring bloom filter check for file {}: cfBloomType={} (disabled in config)",
      this.getPath(), cfBloomType);
    this.cfBloomType = BloomType.NONE;
  }
}
/**
 * @return the {@link StoreFileInfo} this store file was constructed with
 */
public StoreFileInfo getFileInfo() {
  return fileInfo;
}
/**
 * @return the path of this store file, as reported by its file info
 */
@Override
public Path getPath() {
  return fileInfo.getPath();
}
/**
 * @return a {@link Path} built from the URL-encoded string form of this file's path
 */
@Override
public Path getEncodedPath() {
  final String rawPath = fileInfo.getPath().toString();
  final String encodedPath;
  try {
    encodedPath = URLEncoder.encode(rawPath, HConstants.UTF8_ENCODING);
  } catch (UnsupportedEncodingException ex) {
    // The named encoding is a constant, so this would indicate a broken runtime.
    throw new RuntimeException("URLEncoder doesn't support UTF-8", ex);
  }
  return new Path(encodedPath);
}
/**
 * @return this file's path qualified against the URI and working directory of its file system
 */
@Override
public Path getQualifiedPath() {
  final FileSystem fileSystem = fileInfo.getFileSystem();
  final Path path = fileInfo.getPath();
  return path.makeQualified(fileSystem.getUri(), fileSystem.getWorkingDirectory());
}
/**
 * @return whether the file info reports this file as a reference
 */
@Override
public boolean isReference() {
  return fileInfo.isReference();
}
/**
 * @return the result of {@code StoreFileInfo.isHFile} applied to this file's path
 */
@Override
public boolean isHFile() {
  final Path path = fileInfo.getPath();
  return StoreFileInfo.isHFile(path);
}
/**
 * @return true if the file metadata marks this file as the product of a major compaction
 * @throws IllegalStateException if the flag has not been loaded yet, i.e. the reader was never
 *           opened
 */
@Override
public boolean isMajorCompactionResult() {
  final AtomicBoolean flag = this.majorCompaction;
  Preconditions.checkState(flag != null, "Major compation has not been set yet");
  return flag.get();
}
/**
 * @return true if the file metadata asks for this file to be left out of minor compactions;
 *         false until the reader has been opened
 */
@Override
public boolean excludeFromMinorCompaction() {
  return excludeFromMinorCompaction;
}
/**
 * @return the max sequence id computed when the reader was opened, or -1 if it was never set
 */
@Override
public long getMaxSequenceId() {
  return sequenceid;
}
/**
 * @return the modification time reported by the file info
 * @throws IOException propagated from the file info lookup
 */
@Override
public long getModificationTimestamp() throws IOException {
  return this.fileInfo.getModificationTime();
}
/**
 * Looks up a value in the metadata read from the HFile.
 * @param key to look up
 * @return value associated with the metadata key, or null if there is no such entry or the
 *         metadata has not been loaded yet (the reader was never opened)
 */
public byte[] getMetadataValue(byte[] key) {
  // metadataMap is only assigned in open(); tolerate the unopened state the same way
  // isBulkLoadResult() does instead of failing with a NullPointerException.
  final Map<byte[], byte[]> metadata = this.metadataMap;
  return metadata == null ? null : metadata.get(key);
}
/**
 * @return true if the file name contains {@code "SeqId_"}, or if the loaded metadata contains
 *         {@link #BULKLOAD_TIME_KEY}
 */
@Override
public boolean isBulkLoadResult() {
  // The file name alone is enough to identify a bulk loaded file.
  if (this.getPath().getName().contains("SeqId_")) {
    return true;
  }
  // Otherwise fall back to the metadata, which is null until the reader has been opened.
  return metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY);
}
/**
 * @return true once {@link #markCompactedAway()} has been called on this file
 */
public boolean isCompactedAway() {
  return this.compactedAway;
}
/**
 * @return the current value of the file info's reference counter
 */
public int getRefCount() {
  return this.fileInfo.refCount.get();
}
/**
 * @return true if the file is still used in reads, i.e. its reference count is positive
 */
public boolean isReferencedInReads() {
  final int refs = this.fileInfo.refCount.get();
  assert refs >= 0; // we should not go negative.
  return refs > 0;
}
/**
 * @return the bulk load timestamp stored under {@link #BULKLOAD_TIME_KEY}, or empty if the
 *         metadata has no such entry or has not been loaded yet (the reader was never opened)
 */
@Override
public OptionalLong getBulkLoadTimestamp() {
  // isBulkLoadResult() can be true from the file name alone, before open() has populated
  // metadataMap; report "no timestamp" in that state rather than throwing.
  final Map<byte[], byte[]> metadata = this.metadataMap;
  if (metadata == null) {
    return OptionalLong.empty();
  }
  final byte[] bulkLoadTimestamp = metadata.get(BULKLOAD_TIME_KEY);
  return bulkLoadTimestamp == null ? OptionalLong.empty()
    : OptionalLong.of(Bytes.toLong(bulkLoadTimestamp));
}
/**
 * @return the HDFS blocks distribution held by the file info. {@code open()} asks the file
 *         info to initialize it when the store file is opened.
 */
public HDFSBlocksDistribution getHDFSBlockDistribution() {
  return fileInfo.getHDFSBlockDistribution();
}
/**
 * Opens the pread reader on this store file and caches the metadata it exposes: sequence id,
 * max memstore timestamp, compaction flags, Bloom filters, time range, compacted store files,
 * first/last key and comparator. Called from {@link #initReader()} while holding this object's
 * monitor; it is not called by the constructors.
 * @throws IOException propagated from the file info and reader calls made below
 * @see #closeStoreFile(boolean)
 */
private void open() throws IOException {
fileInfo.initHDFSBlocksDistribution();
// 0 when the file info asks for no readahead, otherwise -1 (presumably "use the default").
long readahead = fileInfo.isNoReadahead() ? 0L : -1L;
ReaderContext context = fileInfo.createReaderContext(false, readahead, ReaderType.PREAD);
fileInfo.initHFileInfo(context);
// Let the pre-open hook supply a reader; only build one ourselves when it returns null.
StoreFileReader reader = fileInfo.preStoreFileReaderOpen(context, cacheConf);
if (reader == null) {
reader = fileInfo.createReader(context, cacheConf);
fileInfo.getHFileInfo().initMetaAndIndex(reader.getHFileReader());
}
this.initialReader = fileInfo.postStoreFileReaderOpen(context, cacheConf, reader);
// Load up indices and fileinfo. This also loads Bloom filter type.
metadataMap = Collections.unmodifiableMap(initialReader.loadFileInfo());
// Read in our metadata.
byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
if (b != null) {
// By convention, if halfhfile, top half has a sequence number > bottom
// half. That's why we add one below. It's done in case the two halves
// are ever merged back together --rare. Without it, on open of store,
// since store files are distinguished by sequence id, the one half would
// subsume the other.
this.sequenceid = Bytes.toLong(b);
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}
if (isBulkLoadResult()){
// generate the sequenceId from the fileName
// fileName is of the form <randomName>_SeqId_<id-when-loaded>_
String fileName = this.getPath().getName();
// Use lastIndexOf() to get the last, most recent bulk load seqId.
int startPos = fileName.lastIndexOf("SeqId_");
if (startPos != -1) {
// 6 is the length of "SeqId_"; the id runs up to the next '_'.
// NOTE(review): relies on the trailing '_' of the documented name form; without it
// indexOf returns -1 and substring throws -- confirm no such names can reach here.
this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
fileName.indexOf('_', startPos + 6)));
// Handle reference files as done above.
if (fileInfo.isTopReference()) {
this.sequenceid += 1;
}
}
// SKIP_RESET_SEQ_ID only works in bulk loaded file.
// In mob compaction, the hfile where the cells contain the path of a new mob file is bulk
// loaded to hbase, these cells have the same seqIds with the old ones. We do not want
// to reset new seqIds for them since this might make a mess of the visibility of cells that
// have the same row key but different seqIds.
boolean skipResetSeqId = isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID));
if (skipResetSeqId) {
// increase the seqId when it is a bulk loaded file from mob compaction.
this.sequenceid += 1;
}
initialReader.setSkipResetSeqId(skipResetSeqId);
initialReader.setBulkLoaded(true);
}
// Hand the final sequence id (possibly still -1) to the reader.
initialReader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFile.Writer.MAX_MEMSTORE_TS_KEY);
if (b != null) {
this.maxMemstoreTS = Bytes.toLong(b);
}
b = metadataMap.get(MAJOR_COMPACTION_KEY);
if (b != null) {
boolean mc = Bytes.toBoolean(b);
// Reuse the existing AtomicBoolean if a previous open() already created one.
if (this.majorCompaction == null) {
this.majorCompaction = new AtomicBoolean(mc);
} else {
this.majorCompaction.set(mc);
}
} else {
// Presume it is not major compacted if it doesn't explicitly say so
// HFileOutputFormat explicitly sets the major compacted key.
this.majorCompaction = new AtomicBoolean(false);
}
b = metadataMap.get(EXCLUDE_FROM_MINOR_COMPACTION_KEY);
this.excludeFromMinorCompaction = (b != null && Bytes.toBoolean(b));
BloomType hfileBloomType = initialReader.getBloomFilterType();
// The general Bloom filter is only loaded when the column family config enables one.
if (cfBloomType != BloomType.NONE) {
initialReader.loadBloomfilter(BlockType.GENERAL_BLOOM_META);
if (hfileBloomType != cfBloomType) {
LOG.debug("HFile Bloom filter type for "
+ initialReader.getHFileReader().getName() + ": " + hfileBloomType
+ ", but " + cfBloomType + " specified in column family "
+ "configuration");
}
} else if (hfileBloomType != BloomType.NONE) {
LOG.info("Bloom filter turned off by CF config for "
+ initialReader.getHFileReader().getName());
}
// load delete family bloom filter
initialReader.loadBloomfilter(BlockType.DELETE_FAMILY_BLOOM_META);
// A missing or unparseable time range leaves the reader's timeRange null.
try {
byte[] data = metadataMap.get(TIMERANGE_KEY);
initialReader.timeRange = data == null ? null :
TimeRangeTracker.parseFrom(data).toTimeRange();
} catch (IllegalArgumentException e) {
LOG.error("Error reading timestamp range data from meta -- " +
"proceeding without", e);
this.initialReader.timeRange = null;
}
// Failure to read the compacted store files is logged and otherwise ignored.
try {
// NOTE(review): data is null when the key is absent; presumably toCompactedStoreFiles
// tolerates null -- confirm.
byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
} catch (IOException e) {
LOG.error("Error reading compacted storefiles from meta data", e);
}
// initialize so we can reuse them after reader closed.
firstKey = initialReader.getFirstKey();
lastKey = initialReader.getLastKey();
comparator = initialReader.getComparator();
}
/**
 * Initialize the reader used for pread. Does nothing if the reader is already set; otherwise
 * opens it while holding this object's monitor. If opening fails, the store file is closed
 * again (a failure of that close is only logged) and the original exception is rethrown.
 * @throws IOException if opening the reader fails
 */
public void initReader() throws IOException {
  if (initialReader != null) {
    return;
  }
  synchronized (this) {
    // Re-check under the lock: another thread may have opened the reader meanwhile.
    if (initialReader != null) {
      return;
    }
    try {
      open();
    } catch (Exception e) {
      // open() may have assigned the reader before failing; release it before propagating.
      try {
        final boolean evictOnClose = cacheConf == null || cacheConf.shouldEvictOnClose();
        this.closeStoreFile(evictOnClose);
      } catch (IOException ee) {
        LOG.warn("failed to close reader", ee);
      }
      throw e;
    }
  }
}
/**
 * Creates a new stream-type reader for this file, making sure the pread reader is initialized
 * first.
 * @param canUseDropBehind whether drop-behind may be used; it is only applied when the cache
 *          configuration also enables drop-behind for compactions
 * @return the reader returned by the file info's post-open hook
 * @throws IOException propagated from reader initialization or creation
 */
private StoreFileReader createStreamReader(boolean canUseDropBehind) throws IOException {
  initReader();
  final boolean dropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
  final ReaderContext streamContext =
    fileInfo.createReaderContext(dropBehind, -1, ReaderType.STREAM);
  StoreFileReader streamReader = fileInfo.preStoreFileReaderOpen(streamContext, cacheConf);
  if (streamReader == null) {
    streamReader = fileInfo.createReader(streamContext, cacheConf);
    // The stream reader needs to copy fields from the pread reader.
    streamReader.copyFields(initialReader);
  }
  return fileInfo.postStoreFileReaderOpen(streamContext, cacheConf, streamReader);
}
/**
 * Get a scanner which uses pread, backed by the current pread reader.
 * <p>
 * Must be called after initReader, otherwise the reader is null.
 */
public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
  boolean canOptimizeForNonNullColumn) {
  final StoreFileReader preadReader = getReader();
  return preadReader.getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
    canOptimizeForNonNullColumn);
}
/**
 * Get a scanner which uses streaming read. Each call creates a new stream reader.
 * <p>
 * Must be called after initReader.
 * @throws IOException propagated from creating the stream reader
 */
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
  boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
  throws IOException {
  final StoreFileReader streamReader = createStreamReader(canUseDropBehind);
  return streamReader.getStoreFileScanner(cacheBlocks, false, isCompaction, readPt,
    scannerOrder, canOptimizeForNonNullColumn);
}
/**
 * @return Current pread reader, or null if {@link #initReader()} has not been called or the
 *         store file has been closed.
 * @see #initReader()
 */
public StoreFileReader getReader() {
  return initialReader;
}
/**
 * Closes the pread reader, if one is open, and clears the reference to it.
 * @param evictOnClose whether to evict blocks belonging to this file
 * @throws IOException propagated from closing the reader; the reference is not cleared then
 */
public synchronized void closeStoreFile(boolean evictOnClose) throws IOException {
  if (this.initialReader == null) {
    return;
  }
  this.initialReader.close(evictOnClose);
  this.initialReader = null;
}
/**
 * Delete this file: closes the store file, then asks the file system to delete its path.
 * Blocks are evicted on close when there is no cache configuration or when the cache
 * configuration asks for eviction on close. The result of the delete call is not checked.
 * @throws IOException propagated from closing the reader or from the file system
 */
public void deleteStoreFile() throws IOException {
  final boolean evictOnClose = cacheConf == null || cacheConf.shouldEvictOnClose();
  closeStoreFile(evictOnClose);
  final FileSystem fileSystem = this.fileInfo.getFileSystem();
  fileSystem.delete(getPath(), true);
}
/**
 * Flags this file as compacted away; afterwards {@link #isCompactedAway()} returns true.
 */
public void markCompactedAway() {
  compactedAway = true;
}
/**
 * @return the string form of the underlying file info
 */
@Override
public String toString() {
  return fileInfo.toString();
}
/**
 * Builds a one-line description: path, reference flag, bulk load flag, then either the bulk
 * load timestamp (for bulk loaded files) or the max sequence id, and finally the major
 * compaction flag.
 * @return the detailed description
 */
@Override
public String toStringDetailed() {
  final StringBuilder detail = new StringBuilder();
  detail.append(this.getPath().toString());
  detail.append(", isReference=").append(isReference());
  final boolean bulkLoaded = isBulkLoadResult();
  detail.append(", isBulkLoadResult=").append(bulkLoaded);
  if (!bulkLoaded) {
    detail.append(", seqid=").append(getMaxSequenceId());
  } else {
    detail.append(", bulkLoadTS=");
    final OptionalLong bulkLoadTS = getBulkLoadTimestamp();
    if (bulkLoadTS.isPresent()) {
      detail.append(bulkLoadTS.getAsLong());
    } else {
      detail.append("NotPresent");
    }
  }
  detail.append(", majorCompaction=").append(isMajorCompactionResult());
  return detail.toString();
}
/**
 * Gets whether to skip resetting the sequence id for cells.
 * @param skipResetSeqId The byte array of boolean; anything other than a one-byte array is
 *          treated as false.
 * @return Whether to skip resetting the sequence id.
 */
private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
  return skipResetSeqId != null
    && skipResetSeqId.length == 1
    && Bytes.toBoolean(skipResetSeqId);
}
/**
 * @return the minimum of the reader's time range, or empty if the reader has no time range.
 *         Requires an open reader.
 */
@Override
public OptionalLong getMinimumTimestamp() {
  final TimeRange range = getReader().timeRange;
  if (range == null) {
    return OptionalLong.empty();
  }
  return OptionalLong.of(range.getMin());
}
/**
 * @return the maximum of the reader's time range, or empty if the reader has no time range.
 *         Requires an open reader.
 */
@Override
public OptionalLong getMaximumTimestamp() {
  final TimeRange range = getReader().timeRange;
  if (range == null) {
    return OptionalLong.empty();
  }
  return OptionalLong.of(range.getMax());
}
/**
 * @return a read-only view of the names of the store files this file was compacted from
 */
Set<String> getCompactedStoreFiles() {
  return Collections.unmodifiableSet(compactedStoreFiles);
}
}