/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Hash;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* A Store data file. Stores usually have one or more of these files. They
* are produced by flushing the memstore to disk. To
* create, call {@link #createWriter(FileSystem, Path, int)} and append data.
* Be sure to add any metadata before calling close on the Writer
* (use the appendMetadata convenience methods). On close, a StoreFile is
* sitting in the filesystem. To refer to it, create a StoreFile instance
* passing filesystem and path. To read, call {@link #createReader()}.
* <p>StoreFiles may also reference store files in another Store.
*
* The reason for this split between Writer and Reader instances is that we
* write a file once but read it many more times.
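* <p>A minimal usage sketch (illustrative only; in practice the enclosing
* Store drives this, and <code>fs</code>, <code>familyDir</code>,
* <code>conf</code>, <code>kv</code> and <code>maxSeqId</code> are assumed
* to be in scope):
* <pre>
* StoreFile.Writer w = StoreFile.createWriter(fs, familyDir, 8 * 1024);
* w.append(kv);                        // append KeyValues in sorted order
* w.appendMetadata(maxSeqId, false);   // metadata must precede close()
* w.close();
* StoreFile sf = new StoreFile(fs, w.getPath(), true, conf,
*     StoreFile.BloomType.NONE, false);
* StoreFile.Reader r = sf.createReader();
* </pre>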
*/
public class StoreFile {
static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
// Config keys.
static final String IO_STOREFILE_BLOOM_ERROR_RATE = "io.storefile.bloom.error.rate";
static final String IO_STOREFILE_BLOOM_MAX_FOLD = "io.storefile.bloom.max.fold";
static final String IO_STOREFILE_BLOOM_MAX_KEYS = "io.storefile.bloom.max.keys";
static final String IO_STOREFILE_BLOOM_ENABLED = "io.storefile.bloom.enabled";
static final String HFILE_BLOCK_CACHE_SIZE_KEY = "hfile.block.cache.size";
public static enum BloomType {
/**
* Bloom filters disabled
*/
NONE,
/**
* Bloom filter enabled with the table row as key
*/
ROW,
/**
* Bloom filter enabled with the table row & column (family+qualifier) as key
*/
ROWCOL
}
// Keys for fileinfo values in HFile
/** Max Sequence ID in FileInfo */
public static final byte [] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY");
/** Major compaction flag in FileInfo */
public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
/** Bloom filter Type in FileInfo */
static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
/** Key for Timerange information in metadata*/
static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
/** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
/** Meta data block name for bloom filter data (ie: bloom bits) */
static final String BLOOM_FILTER_DATA_KEY = "BLOOM_FILTER_DATA";
// Make default block size for StoreFiles 8k while testing. TODO: FIX!
public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
private static BlockCache hfileBlockCache = null;
private final FileSystem fs;
// This file's path.
private final Path path;
// If this storefile references another, this is the reference instance.
private Reference reference;
// If this StoreFile references another, this is the other file's path.
private Path referencePath;
// Should the block cache be used or not.
private boolean blockcache;
// Is this from an in-memory store
private boolean inMemory;
// This file's maximum edit sequence id, from metadata stored in the
// backing HFile. Set when we obtain a Reader.
private long sequenceid = -1;
// If true, this file was the product of a major compaction. It's set
// whenever you get a Reader.
private AtomicBoolean majorCompaction = null;
/** Meta key set when store file is a result of a bulk load */
public static final byte[] BULKLOAD_TASK_KEY =
Bytes.toBytes("BULKLOAD_SOURCE_TASK");
public static final byte[] BULKLOAD_TIME_KEY =
Bytes.toBytes("BULKLOAD_TIMESTAMP");
/**
* Map of the metadata entries in the corresponding HFile
*/
private Map<byte[], byte[]> metadataMap;
/*
* Regex that will work for straight filenames and for reference names.
* If a reference, then the regex has more than just one group. Group 1 is
* this file's id. Group 2 is the referenced region's name.
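* For illustration (hypothetical names): "1278203223" is a straight
* filename, while "1278203223.otherRegionName" references file
* "1278203223" in region "otherRegionName".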
*/
private static final Pattern REF_NAME_PARSER =
Pattern.compile("^(\\d+)(?:\\.(.+))?$");
// StoreFile.Reader
private volatile Reader reader;
// Used making file ids.
private final static Random rand = new Random();
private final Configuration conf;
private final BloomType bloomType;
/**
* Constructor. Loads a reader and its indices, etc. May allocate a
* substantial amount of ram depending on the underlying files (10-20MB?).
*
* @param fs The current file system to use.
* @param p The path of the file.
* @param blockcache <code>true</code> if the block cache is enabled.
* @param conf The current configuration.
* @param bt The bloom type to use for this store file
* @throws IOException When opening the reader fails.
*/
StoreFile(final FileSystem fs,
final Path p,
final boolean blockcache,
final Configuration conf,
final BloomType bt,
final boolean inMemory)
throws IOException {
this.conf = conf;
this.fs = fs;
this.path = p;
this.blockcache = blockcache;
this.inMemory = inMemory;
if (isReference(p)) {
this.reference = Reference.read(fs, p);
this.referencePath = getReferredToFile(this.path);
}
// ignore if the column family config says "no bloom filter"
// even if there is one in the hfile.
if (conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) {
this.bloomType = bt;
} else {
this.bloomType = BloomType.NONE;
LOG.info("Ignoring bloom filter check for file (disabled in config)");
}
}
/**
* @return Path or null if this StoreFile was made with a Stream.
*/
Path getPath() {
return this.path;
}
/**
* @return The Store/ColumnFamily this file belongs to.
*/
byte [] getFamily() {
return Bytes.toBytes(this.path.getParent().getName());
}
/**
* @return True if this is a StoreFile Reference; call after {@link #open()},
* else you may get a wrong answer.
*/
boolean isReference() {
return this.reference != null;
}
/**
* @param p Path to check.
* @return True if the path has format of a HStoreFile reference.
*/
public static boolean isReference(final Path p) {
return !p.getName().startsWith("_") &&
isReference(p, REF_NAME_PARSER.matcher(p.getName()));
}
/**
* @param p Path to check.
* @param m Matcher to use.
* @return True if the path has format of a HStoreFile reference.
*/
public static boolean isReference(final Path p, final Matcher m) {
if (m == null || !m.matches()) {
LOG.warn("Failed match of store file name " + p.toString());
throw new RuntimeException("Failed match of store file name " +
p.toString());
}
return m.groupCount() > 1 && m.group(2) != null;
}
/*
* Return path to the file referred to by a Reference. Presumes a directory
* hierarchy of <code>${hbase.rootdir}/tablename/regionname/familyname</code>.
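* For example (hypothetical names), a reference at
* <code>rootdir/tablename/regionB/familyname/1234.regionA</code> resolves to
* <code>rootdir/tablename/regionA/familyname/1234</code>.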
* @param p Path to a Reference file.
* @return Calculated path to parent region file.
* @throws RuntimeException if the file name fails to parse as a reference
*/
static Path getReferredToFile(final Path p) {
Matcher m = REF_NAME_PARSER.matcher(p.getName());
if (m == null || !m.matches()) {
LOG.warn("Failed match of store file name " + p.toString());
throw new RuntimeException("Failed match of store file name " +
p.toString());
}
// Other region name is suffix on the passed Reference file name
String otherRegion = m.group(2);
// Tabledir is up two directories from where Reference was written.
Path tableDir = p.getParent().getParent().getParent();
String nameStrippedOfSuffix = m.group(1);
// Build up new path with the referenced region in place of our current
// region in the reference path. Also strip regionname suffix from name.
return new Path(new Path(new Path(tableDir, otherRegion),
p.getParent().getName()), nameStrippedOfSuffix);
}
/**
* @return True if this file was made by a major compaction.
*/
boolean isMajorCompaction() {
if (this.majorCompaction == null) {
throw new NullPointerException("This has not been set yet");
}
return this.majorCompaction.get();
}
/**
* @return This files maximum edit sequence id.
*/
public long getMaxSequenceId() {
return this.sequenceid;
}
/**
* Return the highest sequence ID found across all storefiles in
* the given list. Store files that were created by a mapreduce
* bulk load are ignored, as they do not correspond to any edit
* log items.
* @return 0 if no non-bulk-load files are provided, or if this is a Store
* that does not yet have any store files.
*/
public static long getMaxSequenceIdInList(List<StoreFile> sfs) {
long max = 0;
for (StoreFile sf : sfs) {
if (!sf.isBulkLoadResult()) {
max = Math.max(max, sf.getMaxSequenceId());
}
}
return max;
}
/**
* @return true if this storefile was created by HFileOutputFormat
* for a bulk load.
*/
boolean isBulkLoadResult() {
return metadataMap.containsKey(BULKLOAD_TIME_KEY);
}
/**
* Return the timestamp at which this bulk load file was generated.
*/
public long getBulkLoadTimestamp() {
return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
}
/**
* Returns the block cache or <code>null</code> in case none should be used.
*
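* For example (illustrative numbers), with the default
* <code>hfile.block.cache.size=0.2</code> and a 1 GB max heap, an
* LruBlockCache of roughly 200 MB is allocated on first use.
*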
* @param conf The current configuration.
* @return The block cache or <code>null</code>.
*/
public static synchronized BlockCache getBlockCache(Configuration conf) {
if (hfileBlockCache != null) return hfileBlockCache;
float cachePercentage = conf.getFloat(HFILE_BLOCK_CACHE_SIZE_KEY, 0.2f);
// There should be a better way to optimize this. But oh well.
if (cachePercentage == 0f) return null;
if (cachePercentage > 1.0) {
throw new IllegalArgumentException(HFILE_BLOCK_CACHE_SIZE_KEY +
" must be between 0.0 and 1.0, not > 1.0");
}
// Calculate the amount of heap to give the block cache.
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
long cacheSize = (long)(mu.getMax() * cachePercentage);
LOG.info("Allocating LruBlockCache with maximum size " +
StringUtils.humanReadableInt(cacheSize));
hfileBlockCache = new LruBlockCache(cacheSize, DEFAULT_BLOCKSIZE_SMALL);
return hfileBlockCache;
}
/**
* @return the blockcache
*/
public BlockCache getBlockCache() {
return blockcache ? getBlockCache(conf) : null;
}
/**
* Opens reader on this store file. Called by Constructor.
* @return Reader for the store file.
* @throws IOException
* @see #closeReader()
*/
private Reader open() throws IOException {
if (this.reader != null) {
throw new IllegalAccessError("Already open");
}
if (isReference()) {
this.reader = new HalfStoreFileReader(this.fs, this.referencePath,
getBlockCache(), this.reference);
} else {
this.reader = new Reader(this.fs, this.path, getBlockCache(),
this.inMemory);
}
// Load up indices and fileinfo.
metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
// Read in our metadata.
byte [] b = metadataMap.get(MAX_SEQ_ID_KEY);
if (b != null) {
// By convention, if halfhfile, the top half has a sequence number greater
// than the bottom half. That's why we add one below. It's done in case the
// two halves are ever merged back together -- rare. Without it, on open of
// a store, since store files are distinguished by sequence id, the one
// half would subsume the other.
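// e.g. (illustrative) a parent file with MAX_SEQ_ID 10 opens as a bottom
// half with seqid 10 and a top half with seqid 11.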
this.sequenceid = Bytes.toLong(b);
if (isReference()) {
if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
this.sequenceid += 1;
}
}
}
this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(MAJOR_COMPACTION_KEY);
if (b != null) {
boolean mc = Bytes.toBoolean(b);
if (this.majorCompaction == null) {
this.majorCompaction = new AtomicBoolean(mc);
} else {
this.majorCompaction.set(mc);
}
} else {
// Presume it is not major compacted if it doesn't explicitly say so.
// HFileOutputFormat explicitly sets the major compacted key.
this.majorCompaction = new AtomicBoolean(false);
}
if (this.bloomType != BloomType.NONE) {
this.reader.loadBloomfilter();
}
try {
byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
if (timerangeBytes != null) {
this.reader.timeRangeTracker = new TimeRangeTracker();
Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
}
} catch (IllegalArgumentException e) {
LOG.error("Error reading timestamp range data from meta -- " +
"proceeding without", e);
this.reader.timeRangeTracker = null;
}
return this.reader;
}
/**
* @return Reader for this StoreFile. Creates one if necessary.
* @throws IOException
*/
public Reader createReader() throws IOException {
if (this.reader == null) {
this.reader = open();
}
return this.reader;
}
/**
* @return Current reader. Must call {@link #createReader()} first, else returns null.
* @throws IOException
* @see #createReader()
*/
public Reader getReader() {
return this.reader;
}
/**
* @throws IOException
*/
public synchronized void closeReader() throws IOException {
if (this.reader != null) {
this.reader.close();
this.reader = null;
}
}
/**
* Delete this file
* @throws IOException
*/
public void deleteReader() throws IOException {
closeReader();
this.fs.delete(getPath(), true);
}
@Override
public String toString() {
return this.path.toString() +
(isReference()? "-" + this.referencePath + "-" + reference.toString(): "");
}
/**
* @return a lengthy description of this StoreFile, suitable for debug output
*/
public String toStringDetailed() {
StringBuilder sb = new StringBuilder();
sb.append(this.path.toString());
sb.append(", isReference=").append(isReference());
sb.append(", isBulkLoadResult=").append(isBulkLoadResult());
if (isBulkLoadResult()) {
sb.append(", bulkLoadTS=").append(getBulkLoadTimestamp());
} else {
sb.append(", seqid=").append(getMaxSequenceId());
}
sb.append(", majorCompaction=").append(isMajorCompaction());
return sb.toString();
}
/**
* Utility to help with rename.
* @param fs
* @param src
* @param tgt
* @return The <code>tgt</code> path after a successful rename.
* @throws IOException
*/
public static Path rename(final FileSystem fs,
final Path src,
final Path tgt)
throws IOException {
if (!fs.exists(src)) {
throw new FileNotFoundException(src.toString());
}
if (!fs.rename(src, tgt)) {
throw new IOException("Failed rename of " + src + " to " + tgt);
}
return tgt;
}
/**
* Get a store file writer. Client is responsible for closing file when done.
*
* @param fs
* @param dir Path to family directory. Makes the directory if it doesn't exist.
* Creates a file with a unique name in this directory.
* @param blocksize size per filesystem block
* @return StoreFile.Writer
* @throws IOException
*/
public static Writer createWriter(final FileSystem fs,
final Path dir,
final int blocksize)
throws IOException {
return createWriter(fs, dir, blocksize, null, null, null, BloomType.NONE, 0);
}
/**
* Create a store file writer. Client is responsible for closing the file when done.
* If you need metadata, add it BEFORE closing, using appendMetadata().
* @param fs
* @param dir Path to family directory. Makes the directory if it doesn't exist.
* Creates a file with a unique name in this directory.
* @param blocksize size per filesystem block
* @param algorithm Compression algorithm. Pass null to get the default.
* @param c Comparator. Pass null to get the default.
* @param conf HBase system configuration. Used with bloom filters.
* @param bloomType column family setting for bloom filters
* @param maxKeySize maximum number of keys the bloom filter should hold
* (maintains the error rate)
* @return StoreFile.Writer
* @throws IOException
*/
public static StoreFile.Writer createWriter(final FileSystem fs,
final Path dir,
final int blocksize,
final Compression.Algorithm algorithm,
final KeyValue.KVComparator c,
final Configuration conf,
BloomType bloomType,
int maxKeySize)
throws IOException {
if (!fs.exists(dir)) {
fs.mkdirs(dir);
}
Path path = getUniqueFile(fs, dir);
if (conf == null || !conf.getBoolean(IO_STOREFILE_BLOOM_ENABLED, true)) {
bloomType = BloomType.NONE;
}
return new Writer(fs, path, blocksize,
algorithm == null? HFile.DEFAULT_COMPRESSION_ALGORITHM: algorithm,
conf, c == null? KeyValue.COMPARATOR: c, bloomType, maxKeySize);
}
/**
* @param fs
* @param dir Directory to create file in.
* @return random filename inside passed <code>dir</code>
*/
public static Path getUniqueFile(final FileSystem fs, final Path dir)
throws IOException {
if (!fs.getFileStatus(dir).isDir()) {
throw new IOException("Expecting " + dir.toString() +
" to be a directory");
}
return getRandomFilename(fs, dir);
}
/**
* @param fs
* @param dir Directory to create file in.
* @return Path to a file that doesn't exist at time of this invocation.
* @throws IOException
*/
static Path getRandomFilename(final FileSystem fs, final Path dir)
throws IOException {
return getRandomFilename(fs, dir, null);
}
/**
* @param fs
* @param dir Directory to create file in.
* @param suffix Optional suffix for the file name; may be null or empty.
* @return Path to a file that doesn't exist at time of this invocation.
* @throws IOException
*/
static Path getRandomFilename(final FileSystem fs,
final Path dir,
final String suffix)
throws IOException {
long id = -1;
Path p = null;
do {
id = rand.nextLong() & Long.MAX_VALUE; // Math.abs(Long.MIN_VALUE) overflows
p = new Path(dir, Long.toString(id) +
((suffix == null || suffix.length() <= 0)? "": suffix));
} while(fs.exists(p));
return p;
}
/**
* Write out a split reference.
*
* Package local so it doesn't leak out of regionserver.
*
* @param fs
* @param splitDir Presumes path format is actually
* <code>SOME_DIRECTORY/REGIONNAME/FAMILY</code>.
* @param f File to split.
* @param splitRow
* @param range
* @return Path to created reference.
* @throws IOException
*/
static Path split(final FileSystem fs,
final Path splitDir,
final StoreFile f,
final byte [] splitRow,
final Reference.Range range)
throws IOException {
// A reference to the top or bottom half of the store file, per the passed range.
Reference r = new Reference(splitRow, range);
// Add the referred-to regions name as a dot separated suffix.
// See REF_NAME_PARSER regex above. The referred-to regions name is
// up in the path of the passed in <code>f</code> -- parentdir is family,
// then the directory above is the region name.
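// e.g. (hypothetical names) splitting file "1234" of region "regionA"
// writes a reference named "1234.regionA" under splitDir.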
String parentRegionName = f.getPath().getParent().getParent().getName();
// Write reference with same file id only with the other region name as
// suffix and into the new region location (under same family).
Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
return r.write(fs, p);
}
/**
* A StoreFile writer. Use this to write HBase Store Files. It is package
* local because it is an implementation detail of the HBase regionserver.
*/
public static class Writer {
private final BloomFilter bloomFilter;
private final BloomType bloomType;
private KVComparator kvComparator;
private KeyValue lastKv = null;
private byte[] lastByteArray = null;
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
/* isTimeRangeTrackerSet keeps track of whether the timeRange has already
* been set. When flushing a memstore, we set the TimeRange and use this
* variable to indicate that it doesn't need to be recalculated while
* appending KeyValues.
* It is not set during compactions, when it is recalculated using only
* the appended KeyValues. */
boolean isTimeRangeTrackerSet = false;
protected HFile.Writer writer;
/**
* Creates an HFile.Writer that also writes helpful meta data.
* @param fs file system to write to
* @param path file name to create
* @param blocksize HDFS block size
* @param compress HDFS block compression
* @param conf user configuration
* @param comparator key comparator
* @param bloomType bloom filter setting
* @param maxKeys maximum number of keys to add (for blooms)
* @throws IOException problem writing to FS
*/
public Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress, final Configuration conf,
final KVComparator comparator, BloomType bloomType, int maxKeys)
throws IOException {
writer = new HFile.Writer(fs, path, blocksize, compress, comparator.getRawComparator());
this.kvComparator = comparator;
BloomFilter bloom = null;
BloomType bt = BloomType.NONE;
if (bloomType != BloomType.NONE && conf != null) {
float err = conf.getFloat(IO_STOREFILE_BLOOM_ERROR_RATE, (float)0.01);
// Since in row+col blooms we have 2 calls to shouldSeek() instead of 1
// and the false positives are adding up, we should keep the error rate
// twice as low in order to maintain the number of false positives as
// desired by the user
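// e.g. with the default error rate of 0.01, ROWCOL blooms are built at 0.005.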
if (bloomType == BloomType.ROWCOL) {
err /= 2;
}
int maxFold = conf.getInt(IO_STOREFILE_BLOOM_MAX_FOLD, 7);
int tooBig = conf.getInt(IO_STOREFILE_BLOOM_MAX_KEYS, 128*1000*1000);
if (maxKeys < tooBig) {
try {
bloom = new ByteBloomFilter(maxKeys, err,
Hash.getHashType(conf), maxFold);
bloom.allocBloom();
bt = bloomType;
} catch (IllegalArgumentException iae) {
LOG.warn(String.format(
"Parse error while creating bloom for %s (%d, %f)",
path, maxKeys, err), iae);
bloom = null;
bt = BloomType.NONE;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping bloom filter because max keysize too large: "
+ maxKeys);
}
}
}
this.bloomFilter = bloom;
this.bloomType = bt;
}
/**
* Writes meta data.
* Call before {@link #close()} since it's written as meta data to this file.
* @param maxSequenceId Maximum sequence id.
* @param majorCompaction True if this file is product of a major compaction
* @throws IOException problem writing to FS
*/
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
throws IOException {
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
writer.appendFileInfo(MAJOR_COMPACTION_KEY,
Bytes.toBytes(majorCompaction));
appendTimeRangeMetadata();
}
/**
* Add TimestampRange to Metadata
*/
public void appendTimeRangeMetadata() throws IOException {
appendFileInfo(TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker));
}
/**
* Set TimeRangeTracker
* @param trt
*/
public void setTimeRangeTracker(final TimeRangeTracker trt) {
this.timeRangeTracker = trt;
isTimeRangeTrackerSet = true;
}
/**
* If the timeRangeTracker is not set,
* update the TimeRangeTracker to include the timestamp of this KeyValue.
* @param kv
*/
public void includeInTimeRangeTracker(final KeyValue kv) {
if (!isTimeRangeTrackerSet) {
timeRangeTracker.includeTimestamp(kv);
}
}
/**
* If the timeRangeTracker is not set,
* update the TimeRangeTracker to include the timestamp of this key.
* @param key
*/
public void includeInTimeRangeTracker(final byte [] key) {
if (!isTimeRangeTrackerSet) {
timeRangeTracker.includeTimestamp(key);
}
}
public void append(final KeyValue kv) throws IOException {
if (this.bloomFilter != null) {
// only add to the bloom filter on a new, unique key
boolean newKey = true;
if (this.lastKv != null) {
switch(bloomType) {
case ROW:
newKey = ! kvComparator.matchingRows(kv, lastKv);
break;
case ROWCOL:
newKey = ! kvComparator.matchingRowColumn(kv, lastKv);
break;
case NONE:
newKey = false;
}
}
if (newKey) {
/*
* http://2.bp.blogspot.com/_Cib_A77V54U/StZMrzaKufI/AAAAAAAAADo/ZhK7bGoJdMQ/s400/KeyValue.png
* Key = RowLen + Row + FamilyLen + Column [Family + Qualifier] + TimeStamp
*
* 2 Types of Filtering:
* 1. Row = Row
* 2. RowCol = Row + Qualifier
*/
switch (bloomType) {
case ROW:
this.bloomFilter.add(kv.getBuffer(), kv.getRowOffset(),
kv.getRowLength());
break;
case ROWCOL:
// merge(row, qualifier)
int ro = kv.getRowOffset();
int rl = kv.getRowLength();
int qo = kv.getQualifierOffset();
int ql = kv.getQualifierLength();
byte [] result = new byte[rl + ql];
System.arraycopy(kv.getBuffer(), ro, result, 0, rl);
System.arraycopy(kv.getBuffer(), qo, result, rl, ql);
this.bloomFilter.add(result);
break;
default:
}
this.lastKv = kv;
}
}
writer.append(kv);
includeInTimeRangeTracker(kv);
}
public Path getPath() {
return this.writer.getPath();
}
boolean hasBloom() {
return this.bloomFilter != null;
}
public void append(final byte [] key, final byte [] value) throws IOException {
if (this.bloomFilter != null) {
// only add to the bloom filter on a new row
if (this.lastByteArray == null || !Arrays.equals(key, lastByteArray)) {
this.bloomFilter.add(key);
this.lastByteArray = key;
}
}
writer.append(key, value);
includeInTimeRangeTracker(key);
}
public void close() throws IOException {
// make sure we wrote something to the bloom before adding it
if (this.bloomFilter != null && this.bloomFilter.getKeyCount() > 0) {
bloomFilter.compactBloom();
if (this.bloomFilter.getMaxKeys() > 0) {
int b = this.bloomFilter.getByteSize();
int k = this.bloomFilter.getKeyCount();
int m = this.bloomFilter.getMaxKeys();
StoreFile.LOG.info("Bloom added to HFile (" +
getPath() + "): " + StringUtils.humanReadableInt(b) + ", " +
k + "/" + m + " (" + NumberFormat.getPercentInstance().format(
((double)k) / ((double)m)) + ")");
}
writer.appendMetaBlock(BLOOM_FILTER_META_KEY, bloomFilter.getMetaWriter());
writer.appendMetaBlock(BLOOM_FILTER_DATA_KEY, bloomFilter.getDataWriter());
writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString()));
}
writer.close();
}
public void appendFileInfo(byte[] key, byte[] value) throws IOException {
writer.appendFileInfo(key, value);
}
}
/**
* Reader for a StoreFile.
*/
public static class Reader {
static final Log LOG = LogFactory.getLog(Reader.class.getName());
protected BloomFilter bloomFilter = null;
protected BloomType bloomFilterType;
private final HFile.Reader reader;
protected TimeRangeTracker timeRangeTracker = null;
protected long sequenceID = -1;
public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
throws IOException {
reader = new HFile.Reader(fs, path, blockCache, inMemory);
bloomFilterType = BloomType.NONE;
}
public RawComparator<byte []> getComparator() {
return reader.getComparator();
}
/**
* Get a scanner to scan over this StoreFile.
*
* @param cacheBlocks should this scanner cache blocks?
* @param pread use pread (for highly concurrent small readers)
* @return a scanner
*/
public StoreFileScanner getStoreFileScanner(boolean cacheBlocks, boolean pread) {
return new StoreFileScanner(this, getScanner(cacheBlocks, pread));
}
/**
* Warning: Do not write further code that depends on this call. Instead
* use getStoreFileScanner(), which returns a StoreFileScanner, the
* preferred way to scan a store with higher-level concepts.
*
* @param cacheBlocks should we cache the blocks?
* @param pread use pread (for concurrent small readers)
* @return the underlying HFileScanner
*/
@Deprecated
public HFileScanner getScanner(boolean cacheBlocks, boolean pread) {
return reader.getScanner(cacheBlocks, pread);
}
public void close() throws IOException {
reader.close();
}
public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
return (passesTimerangeFilter(scan) && passesBloomFilter(scan, columns));
}
/**
* Check if this StoreFile may contain keys within the given TimeRange.
* @param scan
* @return False if no keys in this StoreFile can fall within the scan's time range.
*/
private boolean passesTimerangeFilter(Scan scan) {
if (timeRangeTracker == null) {
return true;
} else {
return timeRangeTracker.includesTimeRange(scan.getTimeRange());
}
}
private boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
if (this.bloomFilter == null || !scan.isGetScan()) {
return true;
}
byte[] row = scan.getStartRow();
byte[] key;
switch (this.bloomFilterType) {
case ROW:
key = row;
break;
case ROWCOL:
if (columns != null && columns.size() == 1) {
byte[] col = columns.first();
key = Bytes.add(row, col);
break;
}
//$FALL-THROUGH$
default:
return true;
}
try {
ByteBuffer bloom = reader.getMetaBlock(BLOOM_FILTER_DATA_KEY, true);
if (bloom != null) {
if (this.bloomFilterType == BloomType.ROWCOL) {
// Since a Row Delete is essentially a DeleteFamily applied to all
// columns, a file might be skipped if using row+col Bloom filter.
// In order to ensure this file is included an additional check is
// required looking only for a row bloom.
return this.bloomFilter.contains(key, bloom) ||
this.bloomFilter.contains(row, bloom);
}
else {
return this.bloomFilter.contains(key, bloom);
}
}
} catch (IOException e) {
LOG.error("Error reading bloom filter data -- proceeding without",
e);
setBloomFilterFaulty();
} catch (IllegalArgumentException e) {
LOG.error("Bad bloom filter data -- proceeding without", e);
setBloomFilterFaulty();
}
return true;
}
public Map<byte[], byte[]> loadFileInfo() throws IOException {
Map<byte [], byte []> fi = reader.loadFileInfo();
byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY);
if (b != null) {
bloomFilterType = BloomType.valueOf(Bytes.toString(b));
}
return fi;
}
public void loadBloomfilter() {
if (this.bloomFilter != null) {
return; // already loaded
}
try {
ByteBuffer b = reader.getMetaBlock(BLOOM_FILTER_META_KEY, false);
if (b != null) {
if (bloomFilterType == BloomType.NONE) {
throw new IOException("valid bloom filter type not found in FileInfo");
}
this.bloomFilter = new ByteBloomFilter(b);
LOG.info("Loaded " + (bloomFilterType== BloomType.ROW? "row":"col")
+ " bloom filter metadata for " + reader.getName());
}
} catch (IOException e) {
LOG.error("Error reading bloom filter meta -- proceeding without", e);
this.bloomFilter = null;
} catch (IllegalArgumentException e) {
LOG.error("Bad bloom filter meta -- proceeding without", e);
this.bloomFilter = null;
}
}
public int getFilterEntries() {
return (this.bloomFilter != null) ? this.bloomFilter.getKeyCount()
: reader.getFilterEntries();
}
public ByteBuffer getMetaBlock(String metaBlockKey, boolean cacheBlock) throws IOException {
return reader.getMetaBlock(metaBlockKey, cacheBlock);
}
public void setBloomFilterFaulty() {
bloomFilter = null;
}
public byte[] getLastKey() {
return reader.getLastKey();
}
public byte[] midkey() throws IOException {
return reader.midkey();
}
public long length() {
return reader.length();
}
public int getEntries() {
return reader.getEntries();
}
public byte[] getFirstKey() {
return reader.getFirstKey();
}
public long indexSize() {
return reader.indexSize();
}
public BloomType getBloomFilterType() {
return this.bloomFilterType;
}
public long getSequenceID() {
return sequenceID;
}
public void setSequenceID(long sequenceID) {
this.sequenceID = sequenceID;
}
}
/**
* Useful comparators for comparing StoreFiles.
*/
abstract static class Comparators {
/**
* Comparator that compares based on the flush time of the StoreFiles.
* All bulk loads are placed before all non-bulk loads, sorted among
* themselves by bulk-load timestamp; non-bulk files are then sorted by
* sequence ID. If there are ties, the path name is used as a tie-breaker.
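* For example, bulk-load files with timestamps 5 and 9 sort in that order
* and both precede flushed files with sequence ids 3 and 7.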
*/
static final Comparator<StoreFile> FLUSH_TIME =
Ordering.compound(ImmutableList.of(
Ordering.natural().onResultOf(new GetBulkTime()),
Ordering.natural().onResultOf(new GetSeqId()),
Ordering.natural().onResultOf(new GetPathName())
));
private static class GetBulkTime implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
return sf.getBulkLoadTimestamp();
}
}
private static class GetSeqId implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
if (sf.isBulkLoadResult()) return -1L;
return sf.getMaxSequenceId();
}
}
private static class GetPathName implements Function<StoreFile, String> {
@Override
public String apply(StoreFile sf) {
return sf.getPath().getName();
}
}
}
}