/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.io.hfile;
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.KeyValue.KeyComparator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.BloomFilter;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
/**
* File format for HBase.
* A file of sorted key/value pairs. Both keys and values are byte arrays.
* <p>
* The memory footprint of an HFile includes the following (below is taken from the
* <a
* href=https://issues.apache.org/jira/browse/HADOOP-3315>TFile</a> documentation
* but applies also to HFile):
* <ul>
* <li>Some constant overhead of reading or writing a compressed block.
* <ul>
* <li>Each compressed block requires one compression/decompression codec for
* I/O.
* <li>Temporary space to buffer the key.
* <li>Temporary space to buffer the value.
* </ul>
* <li>HFile index, which is proportional to the total number of Data Blocks.
* The total amount of memory needed to hold the index can be estimated as
* (56+AvgKeySize)*NumBlocks.
* </ul>
* Suggestions on performance optimization.
* <ul>
* <li>Minimum block size. We recommend a setting of minimum block size between
* 8KB and 1MB for general usage. Larger block sizes are preferred if files are
* primarily for sequential access. However, they lead to inefficient random
* access (because there is more data to decompress). Smaller blocks are good
* for random access, but require more memory to hold the block index, and may
* be slower to create (because we must flush the compressor stream at the
* conclusion of each data block, which leads to an FS I/O flush). Further, due
* to the internal caching in Compression codec, the smallest possible block
* size would be around 20KB-30KB.
* <li>The current implementation does not offer true multi-threading for
* reading. The implementation uses FSDataInputStream seek()+read(), which is
* shown to be much faster than positioned-read call in single thread mode.
* However, it also means that if multiple threads attempt to access the same
* HFile (using multiple scanners) simultaneously, the actual I/O is carried out
* sequentially even if they access different DFS blocks (Reexamine! pread seems
* to be 10% faster than seek+read in my testing -- stack).
* <li>Compression codec. Use "none" if the data is not very compressible (by
* compressible, we mean a compression ratio of at least 2:1). Generally, use "lzo"
* as the starting point for experimenting. "gz" offers a slightly better
* compression ratio than "lzo" but requires 4x the CPU to compress and 2x the CPU to
* decompress, compared to "lzo".
* </ul>
*
* For more on the background behind HFile, see <a
* href=https://issues.apache.org/jira/browse/HBASE-61>HBASE-61</a>.
* <p>
* File is made of data blocks followed by meta data blocks (if any), a fileinfo
* block, data block index, meta data block index, and a fixed size trailer
* which records the offsets at which file changes content type.
* <pre>&lt;data blocks>&lt;meta blocks>&lt;fileinfo>&lt;data index>&lt;meta index>&lt;trailer></pre>
* Each block has a bit of magic at its start. Blocks are made up of
* key/values. In data blocks, they are both byte arrays. Metadata blocks are
* a String key and a byte array value. An empty file looks like this:
* <pre>&lt;fileinfo>&lt;trailer></pre>. That is, neither data nor meta
* blocks are present.
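* <p>
* A minimal end-to-end usage sketch. The path, the <code>conf</code> instance,
* and the sample keys below are illustrative assumptions, not anything this
* class prescribes:
* <pre>
* // Assumed: conf is a Configuration pointing at the target filesystem.
* FileSystem fs = FileSystem.get(conf);
* Path p = new Path("/tmp/example.hfile");          // hypothetical path
* HFile.Writer w = new HFile.Writer(fs, p);         // default block size and compression
* w.append(Bytes.toBytes("key1"), Bytes.toBytes("value1"));
* w.append(Bytes.toBytes("key2"), Bytes.toBytes("value2"));
* w.close();
* HFile.Reader r = new HFile.Reader(fs, p, null, false);
* r.loadFileInfo();                                 // must precede scanning
* HFileScanner s = r.getScanner(false, false);
* if (s.seekTo()) {
*   do {
*     System.out.println(s.getKeyString() + " = " + s.getValueString());
*   } while (s.next());
* }
* r.close();
* </pre>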
* <p>
* TODO: Do scanners need to be able to take a start and end row?
* TODO: Should BlockIndex know the name of its file? Should it have a Path
* that points at its file say for the case where an index lives apart from
* an HFile instance?
*/
public class HFile {
static final Log LOG = LogFactory.getLog(HFile.class);
/* These values are more or less arbitrary, and they are used as a
* form of check to make sure the file isn't completely corrupt.
*/
final static byte [] DATABLOCKMAGIC =
{'D', 'A', 'T', 'A', 'B', 'L', 'K', 42 };
final static byte [] INDEXBLOCKMAGIC =
{ 'I', 'D', 'X', 'B', 'L', 'K', 41, 43 };
final static byte [] METABLOCKMAGIC =
{ 'M', 'E', 'T', 'A', 'B', 'L', 'K', 99 };
final static byte [] TRAILERBLOCKMAGIC =
{ 'T', 'R', 'A', 'B', 'L', 'K', 34, 36 };
/**
* Maximum length of key in HFile.
*/
public final static int MAXIMUM_KEY_LENGTH = Integer.MAX_VALUE;
/**
* Default blocksize for hfile.
*/
public final static int DEFAULT_BLOCKSIZE = 64 * 1024;
/**
* Default compression: none.
*/
public final static Compression.Algorithm DEFAULT_COMPRESSION_ALGORITHM =
Compression.Algorithm.NONE;
/** Default compression name: none. */
public final static String DEFAULT_COMPRESSION =
DEFAULT_COMPRESSION_ALGORITHM.getName();
// For measuring latency of "typical" reads and writes
private static volatile long readOps;
private static volatile long readTime;
private static volatile long writeOps;
private static volatile long writeTime;
public static final long getReadOps() {
long ret = readOps;
readOps = 0;
return ret;
}
public static final long getReadTime() {
long ret = readTime;
readTime = 0;
return ret;
}
public static final long getWriteOps() {
long ret = writeOps;
writeOps = 0;
return ret;
}
public static final long getWriteTime() {
long ret = writeTime;
writeTime = 0;
return ret;
}
/**
* HFile Writer.
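* <p>
* Keys must be appended in the order given by the comparator passed on
* construction. A hedged usage sketch; the path, block size, and file-info key
* below are illustrative assumptions:
* <pre>
* HFile.Writer w = new HFile.Writer(fs, new Path("/tmp/example.hfile"),
*     8 * 1024, "none", null);      // hypothetical path; 8KB blocks; no compression
* w.append(Bytes.toBytes("key1"), Bytes.toBytes("value1"));
* w.append(Bytes.toBytes("key2"), Bytes.toBytes("value2"));
* // File info keys must not start with the reserved "hfile." prefix.
* w.appendFileInfo(Bytes.toBytes("example.meta"), Bytes.toBytes("v"));
* w.close();
* </pre>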
*/
public static class Writer implements Closeable {
// FileSystem stream to write on.
private FSDataOutputStream outputStream;
// True if we opened the <code>outputStream</code> (and so will close it).
private boolean closeOutputStream;
// Name for this object used when logging or in toString. Is either
// the result of a toString on stream or else toString of passed file Path.
protected String name;
// Total uncompressed bytes, maybe calculate a compression ratio later.
private long totalBytes = 0;
// Total # of key/value entries, i.e. how many times append() was called.
private int entryCount = 0;
// Used when calculating average key and value lengths.
private long keylength = 0;
private long valuelength = 0;
// Used to ensure we write in order.
private final RawComparator<byte []> comparator;
// A stream made per block written.
private DataOutputStream out;
// Target number of uncompressed bytes per block.
private int blocksize;
// Offset where the current block began.
private long blockBegin;
// First key in a block (Not first key in file).
private byte [] firstKey = null;
// Key previously appended. Becomes the last key in the file.
private byte [] lastKeyBuffer = null;
private int lastKeyOffset = -1;
private int lastKeyLength = -1;
// See {@link BlockIndex}. The below three fields are used to write the
// block index.
ArrayList<byte[]> blockKeys = new ArrayList<byte[]>();
// Block offset in backing stream.
ArrayList<Long> blockOffsets = new ArrayList<Long>();
// Raw (decompressed) data size.
ArrayList<Integer> blockDataSizes = new ArrayList<Integer>();
// Meta block system.
private ArrayList<byte []> metaNames = new ArrayList<byte []>();
private ArrayList<Writable> metaData = new ArrayList<Writable>();
// Used compression. Used even if no compression -- 'none'.
private final Compression.Algorithm compressAlgo;
private Compressor compressor;
// Special datastructure to hold fileinfo.
private FileInfo fileinfo = new FileInfo();
// May be null if we were passed a stream.
private Path path = null;
/**
* Constructor that uses all defaults for compression and block size.
* @param fs
* @param path
* @throws IOException
*/
public Writer(FileSystem fs, Path path)
throws IOException {
this(fs, path, DEFAULT_BLOCKSIZE, (Compression.Algorithm) null, null);
}
/**
* Constructor that takes a Path.
* @param fs
* @param path
* @param blocksize
* @param compress
* @param comparator
* @throws IOException
*/
public Writer(FileSystem fs, Path path, int blocksize,
String compress, final KeyComparator comparator)
throws IOException {
this(fs, path, blocksize,
compress == null? DEFAULT_COMPRESSION_ALGORITHM:
Compression.getCompressionAlgorithmByName(compress),
comparator);
}
/**
* Constructor that takes a Path.
* @param fs
* @param path
* @param blocksize
* @param compress
* @param comparator
* @throws IOException
*/
public Writer(FileSystem fs, Path path, int blocksize,
Compression.Algorithm compress,
final KeyComparator comparator)
throws IOException {
this(fs.create(path), blocksize, compress, comparator);
this.closeOutputStream = true;
this.name = path.toString();
this.path = path;
}
/**
* Constructor that takes a stream.
* @param ostream Stream to use.
* @param blocksize
* @param compress
* @param c KeyComparator to use.
* @throws IOException
*/
public Writer(final FSDataOutputStream ostream, final int blocksize,
final String compress, final KeyComparator c)
throws IOException {
this(ostream, blocksize,
Compression.getCompressionAlgorithmByName(compress), c);
}
/**
* Constructor that takes a stream.
* @param ostream Stream to use.
* @param blocksize
* @param compress
* @param c
* @throws IOException
*/
public Writer(final FSDataOutputStream ostream, final int blocksize,
final Compression.Algorithm compress, final KeyComparator c)
throws IOException {
this.outputStream = ostream;
this.closeOutputStream = false;
this.blocksize = blocksize;
this.comparator = c == null? Bytes.BYTES_RAWCOMPARATOR: c;
this.name = this.outputStream.toString();
this.compressAlgo = compress == null?
DEFAULT_COMPRESSION_ALGORITHM: compress;
}
/*
* If at block boundary, opens new block.
* @throws IOException
*/
private void checkBlockBoundary() throws IOException {
if (this.out != null && this.out.size() < blocksize) return;
finishBlock();
newBlock();
}
/*
* Do the cleanup if there is a current block.
* @throws IOException
*/
private void finishBlock() throws IOException {
if (this.out == null) return;
long now = System.currentTimeMillis();
int size = releaseCompressingStream(this.out);
this.out = null;
blockKeys.add(firstKey);
blockOffsets.add(Long.valueOf(blockBegin));
blockDataSizes.add(Integer.valueOf(size));
this.totalBytes += size;
writeTime += System.currentTimeMillis() - now;
writeOps++;
}
/*
* Ready a new block for writing.
* @throws IOException
*/
private void newBlock() throws IOException {
// This is where the next block begins.
blockBegin = outputStream.getPos();
this.out = getCompressingStream();
this.out.write(DATABLOCKMAGIC);
firstKey = null;
}
/*
* Sets up a compressor and creates a compression stream on top of
* this.outputStream. Get one per block written.
* @return A compressing stream; if 'none' compression, returned stream
* does not compress.
* @throws IOException
* @see {@link #releaseCompressingStream(DataOutputStream)}
*/
private DataOutputStream getCompressingStream() throws IOException {
this.compressor = compressAlgo.getCompressor();
// Get a new DOS compression stream. In tfile, the DOS is not closed,
// just finished, and that seems to be fine over there. TODO: Check
// there is no memory retention by the DOS. Should I disable the 'flush' on
// the DOS as the BCFile over in tfile does? It wants to make it so flushes
// don't go through to the underlying compressed stream. Flush on the
// compressed downstream should happen only when done. I was going to, but it
// looks like when we call flush in here, it's a legitimate flush that
// should go through to the compressor.
OutputStream os =
this.compressAlgo.createCompressionStream(this.outputStream,
this.compressor, 0);
return new DataOutputStream(os);
}
/*
* Let go of the block compressor and compressing stream obtained in a call to
* {@link #getCompressingStream}.
* @param dos
* @return How much was written on this stream since it was taken out.
* @see #getCompressingStream()
* @throws IOException
*/
private int releaseCompressingStream(final DataOutputStream dos)
throws IOException {
dos.flush();
this.compressAlgo.returnCompressor(this.compressor);
this.compressor = null;
return dos.size();
}
/**
* Add a meta block to the end of the file. Call before close().
* Metadata blocks are expensive. Fill one with a bunch of serialized data
* rather than do a metadata block per metadata instance. If metadata is
* small, consider adding to file info using
* {@link #appendFileInfo(byte[], byte[])}
* @param metaBlockName name of the block
* @param content will call readFields to get data later (DO NOT REUSE)
*/
public void appendMetaBlock(String metaBlockName, Writable content) {
byte[] key = Bytes.toBytes(metaBlockName);
int i;
for (i = 0; i < metaNames.size(); ++i) {
// stop when the current key is greater than our own
byte[] cur = metaNames.get(i);
if (Bytes.BYTES_RAWCOMPARATOR.compare(cur, 0, cur.length,
key, 0, key.length) > 0) {
break;
}
}
metaNames.add(i, key);
metaData.add(i, content);
}
/**
* Add to the file info. The added key/value can be retrieved from the
* map returned by {@link Reader#loadFileInfo()}.
* @param k Key
* @param v Value
* @throws IOException
*/
public void appendFileInfo(final byte [] k, final byte [] v)
throws IOException {
appendFileInfo(this.fileinfo, k, v, true);
}
static FileInfo appendFileInfo(FileInfo fi, final byte [] k, final byte [] v,
final boolean checkPrefix)
throws IOException {
if (k == null || v == null) {
throw new NullPointerException("Neither key nor value may be null");
}
if (checkPrefix &&
Bytes.startsWith(k, FileInfo.RESERVED_PREFIX_BYTES)) {
throw new IOException("Keys with a " + FileInfo.RESERVED_PREFIX +
" prefix are reserved");
}
fi.put(k, v);
return fi;
}
/**
* @return Path or null if we were passed a stream rather than a Path.
*/
public Path getPath() {
return this.path;
}
@Override
public String toString() {
return "writer=" + this.name + ", compression=" +
this.compressAlgo.getName();
}
/**
* Add key/value to file.
* Keys must be added in an order that agrees with the Comparator passed
* on construction.
* @param kv KeyValue to add. Cannot be empty nor null.
* @throws IOException
*/
public void append(final KeyValue kv)
throws IOException {
append(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength(),
kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
}
/**
* Add key/value to file.
* Keys must be added in an order that agrees with the Comparator passed
* on construction.
* @param key Key to add. Cannot be empty nor null.
* @param value Value to add. Cannot be empty nor null.
* @throws IOException
*/
public void append(final byte [] key, final byte [] value)
throws IOException {
append(key, 0, key.length, value, 0, value.length);
}
/**
* Add key/value to file.
* Keys must be added in an order that agrees with the Comparator passed
* on construction.
* @param key
* @param koffset
* @param klength
* @param value
* @param voffset
* @param vlength
* @throws IOException
*/
private void append(final byte [] key, final int koffset, final int klength,
final byte [] value, final int voffset, final int vlength)
throws IOException {
boolean dupKey = checkKey(key, koffset, klength);
checkValue(value, voffset, vlength);
if (!dupKey) {
checkBlockBoundary();
}
// Write length of key and value and then actual key and value bytes.
this.out.writeInt(klength);
this.keylength += klength;
this.out.writeInt(vlength);
this.valuelength += vlength;
this.out.write(key, koffset, klength);
this.out.write(value, voffset, vlength);
// Are we the first key in this block?
if (this.firstKey == null) {
// Copy the key.
this.firstKey = new byte [klength];
System.arraycopy(key, koffset, this.firstKey, 0, klength);
}
this.lastKeyBuffer = key;
this.lastKeyOffset = koffset;
this.lastKeyLength = klength;
this.entryCount ++;
}
/*
* @param key Key to check.
* @return true if the given key is a duplicate of the previously appended key
* @throws IOException
*/
private boolean checkKey(final byte [] key, final int offset, final int length)
throws IOException {
boolean dupKey = false;
if (key == null || length <= 0) {
throw new IOException("Key cannot be null or empty");
}
if (length > MAXIMUM_KEY_LENGTH) {
throw new IOException("Key length " + length + " > " +
MAXIMUM_KEY_LENGTH);
}
if (this.lastKeyBuffer != null) {
int keyComp = this.comparator.compare(this.lastKeyBuffer, this.lastKeyOffset,
this.lastKeyLength, key, offset, length);
if (keyComp > 0) {
throw new IOException("Added a key not lexically larger than" +
" previous key=" + Bytes.toStringBinary(key, offset, length) +
", lastkey=" + Bytes.toStringBinary(this.lastKeyBuffer, this.lastKeyOffset,
this.lastKeyLength));
} else if (keyComp == 0) {
dupKey = true;
}
}
return dupKey;
}
private void checkValue(final byte [] value, final int offset,
final int length) throws IOException {
if (value == null) {
throw new IOException("Value cannot be null");
}
}
public long getTotalBytes() {
return this.totalBytes;
}
public void close() throws IOException {
if (this.outputStream == null) {
return;
}
// Write out the end of the data blocks, then write meta data blocks.
// followed by fileinfo, data block index and meta block index.
finishBlock();
FixedFileTrailer trailer = new FixedFileTrailer();
// Write out the metadata blocks if any.
ArrayList<Long> metaOffsets = null;
ArrayList<Integer> metaDataSizes = null;
if (metaNames.size() > 0) {
metaOffsets = new ArrayList<Long>(metaNames.size());
metaDataSizes = new ArrayList<Integer>(metaNames.size());
for (int i = 0 ; i < metaNames.size() ; ++ i ) {
// store the beginning offset
long curPos = outputStream.getPos();
metaOffsets.add(curPos);
// write the metadata content
DataOutputStream dos = getCompressingStream();
dos.write(METABLOCKMAGIC);
metaData.get(i).write(dos);
int size = releaseCompressingStream(dos);
// store the metadata size
metaDataSizes.add(size);
}
}
// Write fileinfo.
trailer.fileinfoOffset = writeFileInfo(this.outputStream);
// Write the data block index.
trailer.dataIndexOffset = BlockIndex.writeIndex(this.outputStream,
this.blockKeys, this.blockOffsets, this.blockDataSizes);
// Meta block index.
if (metaNames.size() > 0) {
trailer.metaIndexOffset = BlockIndex.writeIndex(this.outputStream,
this.metaNames, metaOffsets, metaDataSizes);
}
// Now finish off the trailer.
trailer.dataIndexCount = blockKeys.size();
trailer.metaIndexCount = metaNames.size();
trailer.totalUncompressedBytes = totalBytes;
trailer.entryCount = entryCount;
trailer.compressionCodec = this.compressAlgo.ordinal();
trailer.serialize(outputStream);
if (this.closeOutputStream) {
this.outputStream.close();
this.outputStream = null;
}
}
/*
* Add last bits of metadata to fileinfo and then write it out.
* Reader will be expecting to find all below.
* @param o Stream to write on.
* @return Position at which we started writing.
* @throws IOException
*/
private long writeFileInfo(FSDataOutputStream o) throws IOException {
if (this.lastKeyBuffer != null) {
// Make a copy. The copy is stuffed into an HbaseMapWritable. It needs a
// standalone byte array; it won't take an (array, offset, length) tuple.
byte [] b = new byte[this.lastKeyLength];
System.arraycopy(this.lastKeyBuffer, this.lastKeyOffset, b, 0,
this.lastKeyLength);
appendFileInfo(this.fileinfo, FileInfo.LASTKEY, b, false);
}
int avgKeyLen = this.entryCount == 0? 0:
(int)(this.keylength/this.entryCount);
appendFileInfo(this.fileinfo, FileInfo.AVG_KEY_LEN,
Bytes.toBytes(avgKeyLen), false);
int avgValueLen = this.entryCount == 0? 0:
(int)(this.valuelength/this.entryCount);
appendFileInfo(this.fileinfo, FileInfo.AVG_VALUE_LEN,
Bytes.toBytes(avgValueLen), false);
appendFileInfo(this.fileinfo, FileInfo.COMPARATOR,
Bytes.toBytes(this.comparator.getClass().getName()), false);
long pos = o.getPos();
this.fileinfo.write(o);
return pos;
}
}
/**
* HFile Reader.
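* <p>
* {@link #loadFileInfo()} must be called before the scanner or key methods are
* used. A hedged usage sketch; the path and the sample key are illustrative
* assumptions:
* <pre>
* HFile.Reader r = new HFile.Reader(fs, new Path("/tmp/example.hfile"), null, false);
* Map&lt;byte [], byte []&gt; info = r.loadFileInfo();
* byte [] mid = r.midkey();                     // approximate; block-boundary based
* HFileScanner s = r.getScanner(false, true);   // no block caching; use pread
* int res = s.seekTo(Bytes.toBytes("key1"));    // -1: before file, 0: exact, 1: at previous key
* r.close();
* </pre>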
*/
public static class Reader implements Closeable {
// Stream to read from.
private FSDataInputStream istream;
// True if we should close istream when done. We don't close it if we
// didn't open it.
private boolean closeIStream;
// These are read in when the file info is loaded.
HFile.BlockIndex blockIndex;
private BlockIndex metaIndex;
FixedFileTrailer trailer;
private volatile boolean fileInfoLoaded = false;
// Filled when we read in the trailer.
private Compression.Algorithm compressAlgo;
// Last key in the file. Filled in when we read in the file info
private byte [] lastkey = null;
// Stats read in when we load file info.
private int avgKeyLen = -1;
private int avgValueLen = -1;
// Used to ensure we seek correctly.
RawComparator<byte []> comparator;
// Size of this file.
private final long fileSize;
// Block cache to use.
private final BlockCache cache;
public int cacheHits = 0;
public int blockLoads = 0;
public int metaLoads = 0;
// Whether file is from in-memory store
private boolean inMemory = false;
// Name for this object used when logging or in toString. Is either
// the result of a toString on the stream or else is toString of passed
// file Path plus metadata key/value pairs.
protected String name;
/**
* Opens an HFile. You must load the file info before you can
* use it by calling {@link #loadFileInfo()}.
*
* @param fs filesystem to load from
* @param path path within said filesystem
* @param cache block cache. Pass null if none.
* @throws IOException
*/
public Reader(FileSystem fs, Path path, BlockCache cache, boolean inMemory)
throws IOException {
this(fs.open(path), fs.getFileStatus(path).getLen(), cache, inMemory);
this.closeIStream = true;
this.name = path.toString();
}
/**
* Opens an HFile. You must load the file info before you can
* use it by calling {@link #loadFileInfo()}.
*
* @param fsdis input stream. Caller is responsible for closing the passed
* stream.
* @param size Length of the stream.
* @param cache block cache. Pass null if none.
* @throws IOException
*/
public Reader(final FSDataInputStream fsdis, final long size,
final BlockCache cache, final boolean inMemory) {
this.cache = cache;
this.fileSize = size;
this.istream = fsdis;
this.closeIStream = false;
this.name = this.istream == null? "": this.istream.toString();
this.inMemory = inMemory;
}
@Override
public String toString() {
return "reader=" + this.name +
(!isFileInfoLoaded()? "":
", compression=" + this.compressAlgo.getName() +
", inMemory=" + this.inMemory +
", firstKey=" + toStringFirstKey() +
", lastKey=" + toStringLastKey() +
", avgKeyLen=" + this.avgKeyLen +
", avgValueLen=" + this.avgValueLen +
", entries=" + this.trailer.entryCount) +
", length=" + this.fileSize;
}
protected String toStringFirstKey() {
return KeyValue.keyToString(getFirstKey());
}
protected String toStringLastKey() {
return KeyValue.keyToString(getLastKey());
}
public long length() {
return this.fileSize;
}
public boolean inMemory() {
return this.inMemory;
}
private byte[] readAllIndex(final FSDataInputStream in, final long indexOffset,
final int indexSize) throws IOException {
byte[] allIndex = new byte[indexSize];
in.seek(indexOffset);
IOUtils.readFully(in, allIndex, 0, allIndex.length);
return allIndex;
}
/**
* Read in the index and file info.
* @return A map of fileinfo data.
* See {@link Writer#appendFileInfo(byte[], byte[])}.
* @throws IOException
*/
public Map<byte [], byte []> loadFileInfo()
throws IOException {
this.trailer = readTrailer();
// Read in the fileinfo and get what we need from it.
this.istream.seek(this.trailer.fileinfoOffset);
FileInfo fi = new FileInfo();
fi.readFields(this.istream);
this.lastkey = fi.get(FileInfo.LASTKEY);
this.avgKeyLen = Bytes.toInt(fi.get(FileInfo.AVG_KEY_LEN));
this.avgValueLen = Bytes.toInt(fi.get(FileInfo.AVG_VALUE_LEN));
String clazzName = Bytes.toString(fi.get(FileInfo.COMPARATOR));
this.comparator = getComparator(clazzName);
int allIndexSize = (int)(this.fileSize - this.trailer.dataIndexOffset - FixedFileTrailer.trailerSize());
byte[] dataAndMetaIndex = readAllIndex(this.istream, this.trailer.dataIndexOffset, allIndexSize);
ByteArrayInputStream bis = new ByteArrayInputStream(dataAndMetaIndex);
DataInputStream dis = new DataInputStream(bis);
// Read in the data index.
this.blockIndex =
BlockIndex.readIndex(this.comparator, dis, this.trailer.dataIndexCount);
// Read in the metadata index.
if (trailer.metaIndexCount > 0) {
this.metaIndex = BlockIndex.readIndex(Bytes.BYTES_RAWCOMPARATOR, dis,
this.trailer.metaIndexCount);
}
this.fileInfoLoaded = true;
if (null != dis) {
dis.close();
}
return fi;
}
boolean isFileInfoLoaded() {
return this.fileInfoLoaded;
}
@SuppressWarnings("unchecked")
private RawComparator<byte []> getComparator(final String clazzName)
throws IOException {
if (clazzName == null || clazzName.length() == 0) {
return null;
}
try {
return (RawComparator<byte []>)Class.forName(clazzName).newInstance();
} catch (InstantiationException e) {
throw new IOException(e);
} catch (IllegalAccessException e) {
throw new IOException(e);
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
/* Read the trailer off the input stream. As side effect, sets the
* compression algorithm.
* @return Populated FixedFileTrailer.
* @throws IOException
*/
private FixedFileTrailer readTrailer() throws IOException {
FixedFileTrailer fft = new FixedFileTrailer();
long seekPoint = this.fileSize - FixedFileTrailer.trailerSize();
this.istream.seek(seekPoint);
fft.deserialize(this.istream);
// Set up the codec.
this.compressAlgo =
Compression.Algorithm.values()[fft.compressionCodec];
CompressionTest.testCompression(this.compressAlgo);
return fft;
}
/**
* Create a Scanner on this file. No seeks or reads are done on creation.
* Call {@link HFileScanner#seekTo(byte[])} to position and start the read.
* There is nothing to clean up in a Scanner. Letting go of your references
* to the scanner is sufficient.
* @param pread Use positional read rather than seek+read if true (pread is
* better for random reads, seek+read is better for scanning).
* @param cacheBlocks True if we should cache blocks read in by this scanner.
* @return Scanner on this file.
*/
public HFileScanner getScanner(boolean cacheBlocks, final boolean pread) {
return new Scanner(this, cacheBlocks, pread);
}
/**
* @param key Key to search.
* @return Block number of the block containing the key or -1 if not in this
* file.
*/
protected int blockContainingKey(final byte [] key, int offset, int length) {
if (blockIndex == null) {
throw new RuntimeException("Block index not loaded");
}
return blockIndex.blockContainingKey(key, offset, length);
}
/**
* @param metaBlockName
* @param cacheBlock Add block to cache, if found
* @return Block wrapped in a ByteBuffer
* @throws IOException
*/
public ByteBuffer getMetaBlock(String metaBlockName, boolean cacheBlock)
throws IOException {
if (trailer.metaIndexCount == 0) {
return null; // there are no meta blocks
}
if (metaIndex == null) {
throw new IOException("Meta index not loaded");
}
byte [] mbname = Bytes.toBytes(metaBlockName);
int block = metaIndex.blockContainingKey(mbname, 0, mbname.length);
if (block == -1)
return null;
long blockSize;
if (block == metaIndex.count - 1) {
blockSize = trailer.fileinfoOffset - metaIndex.blockOffsets[block];
} else {
blockSize = metaIndex.blockOffsets[block+1] - metaIndex.blockOffsets[block];
}
long now = System.currentTimeMillis();
// Per meta key from any given file, synchronize reads for said block
synchronized (metaIndex.blockKeys[block]) {
metaLoads++;
// Check cache for block. If found return.
if (cache != null) {
ByteBuffer cachedBuf = cache.getBlock(name + "meta" + block,
cacheBlock);
if (cachedBuf != null) {
// Return a distinct 'shallow copy' of the block,
// so its position doesn't get messed up by the scanner
cacheHits++;
return cachedBuf.duplicate();
}
// Cache Miss, please load.
}
ByteBuffer buf = decompress(metaIndex.blockOffsets[block],
longToInt(blockSize), metaIndex.blockDataSizes[block], true);
byte [] magic = new byte[METABLOCKMAGIC.length];
buf.get(magic, 0, magic.length);
if (! Arrays.equals(magic, METABLOCKMAGIC)) {
throw new IOException("Meta magic is bad in block " + block);
}
// Create a new ByteBuffer 'shallow copy' to hide the magic header
buf = buf.slice();
readTime += System.currentTimeMillis() - now;
readOps++;
// Cache the block
if(cacheBlock && cache != null) {
cache.cacheBlock(name + "meta" + block, buf.duplicate(), inMemory);
}
return buf;
}
}
/**
* Read in a file block.
* @param block Index of block to read.
* @param pread Use positional read instead of seek+read (positional is
* better for doing random reads whereas seek+read is better for scanning).
* @return Block wrapped in a ByteBuffer.
* @throws IOException
*/
ByteBuffer readBlock(int block, boolean cacheBlock, final boolean pread)
throws IOException {
if (blockIndex == null) {
throw new IOException("Block index not loaded");
}
if (block < 0 || block >= blockIndex.count) {
throw new IOException("Requested block is out of range: " + block +
", max: " + blockIndex.count);
}
// For any given block from any given file, synchronize reads for said
// block.
// Without a cache, this synchronizing is needless overhead, but really
// the other choice is to duplicate work (which the cache would prevent you from doing).
synchronized (blockIndex.blockKeys[block]) {
blockLoads++;
// Check cache for block. If found return.
if (cache != null) {
ByteBuffer cachedBuf = cache.getBlock(name + block, cacheBlock);
if (cachedBuf != null) {
// Return a distinct 'shallow copy' of the block,
// so its position doesn't get messed up by the scanner
cacheHits++;
return cachedBuf.duplicate();
}
// Carry on, please load.
}
// Load block from filesystem.
long now = System.currentTimeMillis();
long onDiskBlockSize;
if (block == blockIndex.count - 1) {
// last block! The end of the data blocks is the first meta block if there
// is one or, if there isn't, the fileinfo offset.
long offset = this.metaIndex != null?
this.metaIndex.blockOffsets[0]: this.trailer.fileinfoOffset;
onDiskBlockSize = offset - blockIndex.blockOffsets[block];
} else {
onDiskBlockSize = blockIndex.blockOffsets[block+1] -
blockIndex.blockOffsets[block];
}
ByteBuffer buf = decompress(blockIndex.blockOffsets[block],
longToInt(onDiskBlockSize), this.blockIndex.blockDataSizes[block],
pread);
byte [] magic = new byte[DATABLOCKMAGIC.length];
buf.get(magic, 0, magic.length);
if (!Arrays.equals(magic, DATABLOCKMAGIC)) {
throw new IOException("Data magic is bad in block " + block);
}
// 'shallow copy' to hide the header
// NOTE: you WILL GET BIT if you call buf.array() but don't start
// reading at buf.arrayOffset()
buf = buf.slice();
readTime += System.currentTimeMillis() - now;
readOps++;
// Cache the block
if(cacheBlock && cache != null) {
cache.cacheBlock(name + block, buf.duplicate(), inMemory);
}
return buf;
}
}
/*
* Decompress <code>compressedSize</code> bytes off the backing
* FSDataInputStream.
* @param offset
* @param compressedSize
* @param decompressedSize
*
* @return ByteBuffer holding the decompressed block contents.
* @throws IOException
*/
private ByteBuffer decompress(final long offset, final int compressedSize,
final int decompressedSize, final boolean pread)
throws IOException {
Decompressor decompressor = null;
ByteBuffer buf = null;
try {
decompressor = this.compressAlgo.getDecompressor();
// My guess is that the bounded range fis is needed to stop the
// decompressor reading into next block -- IIRC, it just grabs a
// bunch of data w/o regard to whether decompressor is coming to end of a
// decompression.
// We use a buffer of DEFAULT_BLOCKSIZE size. This might be extreme.
// Could maybe do with less. Study and figure it: TODO
InputStream is = this.compressAlgo.createDecompressionStream(
new BufferedInputStream(
new BoundedRangeFileInputStream(this.istream, offset, compressedSize,
pread),
Math.min(DEFAULT_BLOCKSIZE, compressedSize)),
decompressor, 0);
buf = ByteBuffer.allocate(decompressedSize);
IOUtils.readFully(is, buf.array(), 0, buf.capacity());
is.close();
} finally {
if (null != decompressor) {
this.compressAlgo.returnDecompressor(decompressor);
}
}
return buf;
}
/**
* @return First key in the file. May be null if file has no entries.
* Note that this is not the first rowkey, but rather the byte form of
* the first KeyValue.
*/
public byte [] getFirstKey() {
if (blockIndex == null) {
throw new RuntimeException("Block index not loaded");
}
return this.blockIndex.isEmpty()? null: this.blockIndex.blockKeys[0];
}
/**
* @return the first row key, or null if the file is empty.
* TODO move this to StoreFile after Ryan's patch goes in
* to eliminate KeyValue here
*/
public byte[] getFirstRowKey() {
byte[] firstKey = getFirstKey();
if (firstKey == null) return null;
return KeyValue.createKeyValueFromKey(firstKey).getRow();
}
/**
* @return number of KV entries in this HFile
*/
public int getEntries() {
if (!this.isFileInfoLoaded()) {
throw new RuntimeException("File info not loaded");
}
return this.trailer.entryCount;
}
/**
* @return Last key in the file. May be null if file has no entries.
* Note that this is not the last rowkey, but rather the byte form of
* the last KeyValue.
*/
public byte [] getLastKey() {
if (!isFileInfoLoaded()) {
throw new RuntimeException("Load file info first");
}
return this.blockIndex.isEmpty()? null: this.lastkey;
}
/**
* @return the last row key, or null if the file is empty.
* TODO move this to StoreFile after Ryan's patch goes in
* to eliminate KeyValue here
*/
public byte[] getLastRowKey() {
byte[] lastKey = getLastKey();
if (lastKey == null) return null;
return KeyValue.createKeyValueFromKey(lastKey).getRow();
}
/**
* @return number of K entries in this HFile's filter. Returns KV count if no filter.
*/
public int getFilterEntries() {
return getEntries();
}
/**
* @return Comparator.
*/
public RawComparator<byte []> getComparator() {
return this.comparator;
}
/**
* @return index size
*/
public long indexSize() {
return (this.blockIndex != null? this.blockIndex.heapSize(): 0) +
((this.metaIndex != null)? this.metaIndex.heapSize(): 0);
}
/**
* @return Midkey for this file. We work with block boundaries only so
* returned midkey is an approximation only.
* @throws IOException
*/
public byte [] midkey() throws IOException {
if (!isFileInfoLoaded() || this.blockIndex.isEmpty()) {
return null;
}
return this.blockIndex.midkey();
}
public void close() throws IOException {
if (this.closeIStream && this.istream != null) {
this.istream.close();
this.istream = null;
}
}
public String getName() {
return name;
}
/*
* Implementation of {@link HFileScanner} interface.
*/
protected static class Scanner implements HFileScanner {
private final Reader reader;
private ByteBuffer block;
private int currBlock;
private final boolean cacheBlocks;
private final boolean pread;
private int currKeyLen = 0;
private int currValueLen = 0;
public int blockFetches = 0;
public Scanner(Reader r, boolean cacheBlocks, final boolean pread) {
this.reader = r;
this.cacheBlocks = cacheBlocks;
this.pread = pread;
}
public KeyValue getKeyValue() {
if(this.block == null) {
return null;
}
return new KeyValue(this.block.array(),
this.block.arrayOffset() + this.block.position() - 8,
this.currKeyLen+this.currValueLen+8);
}
public ByteBuffer getKey() {
if (this.block == null || this.currKeyLen == 0) {
throw new RuntimeException("you need to seekTo() before calling getKey()");
}
ByteBuffer keyBuff = this.block.slice();
keyBuff.limit(this.currKeyLen);
keyBuff.rewind();
// Do keyBuff.asReadOnly()?
return keyBuff;
}
public ByteBuffer getValue() {
if (block == null || currKeyLen == 0) {
throw new RuntimeException("you need to seekTo() before calling getValue()");
}
// TODO: Could this be done with one ByteBuffer rather than create two?
ByteBuffer valueBuff = this.block.slice();
valueBuff.position(this.currKeyLen);
valueBuff = valueBuff.slice();
valueBuff.limit(currValueLen);
valueBuff.rewind();
return valueBuff;
}
public boolean next() throws IOException {
// LOG.deug("rem:" + block.remaining() + " p:" + block.position() +
// " kl: " + currKeyLen + " kv: " + currValueLen);
if (block == null) {
throw new IOException("Next called on non-seeked scanner");
}
block.position(block.position() + currKeyLen + currValueLen);
if (block.remaining() <= 0) {
// LOG.debug("Fetch next block");
currBlock++;
if (currBlock >= reader.blockIndex.count) {
// damn we are at the end
currBlock = 0;
block = null;
return false;
}
block = reader.readBlock(this.currBlock, this.cacheBlocks, this.pread);
currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4);
currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4);
block.position(block.position()+8);
blockFetches++;
return true;
}
// LOG.debug("rem:" + block.remaining() + " p:" + block.position() +
// " kl: " + currKeyLen + " kv: " + currValueLen);
currKeyLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position(), 4);
currValueLen = Bytes.toInt(block.array(), block.arrayOffset()+block.position()+4, 4);
block.position(block.position()+8);
return true;
}
public int seekTo(byte [] key) throws IOException {
return seekTo(key, 0, key.length);
}
public int seekTo(byte[] key, int offset, int length) throws IOException {
int b = reader.blockContainingKey(key, offset, length);
if (b < 0) return -1; // falls before the beginning of the file! :-(
// Avoid re-reading the same block (that'd be dumb).
loadBlock(b, true);
return blockSeek(key, offset, length, false);
}
public int reseekTo(byte [] key) throws IOException {
return reseekTo(key, 0, key.length);
}
public int reseekTo(byte[] key, int offset, int length)
throws IOException {
if (this.block != null && this.currKeyLen != 0) {
ByteBuffer bb = getKey();
int compared = this.reader.comparator.compare(key, offset, length,
bb.array(), bb.arrayOffset(), bb.limit());
if (compared < 1) {
//If the required key is less than or equal to current key, then
//don't do anything.
return compared;
}
}
int b = reader.blockContainingKey(key, offset, length);
if (b < 0) {
return -1;
}
loadBlock(b, false);
return blockSeek(key, offset, length, false);
}
/**
* Within a loaded block, seek to the last key that is smaller than
* (or equal to) the key we are interested in.
*
* A note on seekBefore: if seekBefore is true AND the first key in the
* block equals <code>key</code>, then an exception gets thrown.
* @param key to find
* @param seekBefore find the key before the exact match.
* @return 0 if an exact match was found, 1 otherwise (the scanner is then
* positioned on the last key smaller than the one requested).
*/
private int blockSeek(byte[] key, int offset, int length, boolean seekBefore) {
int klen, vlen;
int lastLen = 0;
do {
klen = block.getInt();
vlen = block.getInt();
int comp = this.reader.comparator.compare(key, offset, length,
block.array(), block.arrayOffset() + block.position(), klen);
if (comp == 0) {
if (seekBefore) {
block.position(block.position() - lastLen - 16);
currKeyLen = block.getInt();
currValueLen = block.getInt();
return 1; // non exact match.
}
currKeyLen = klen;
currValueLen = vlen;
return 0; // indicate exact match
}
if (comp < 0) {
// go back one key:
block.position(block.position() - lastLen - 16);
currKeyLen = block.getInt();
currValueLen = block.getInt();
return 1;
}
block.position(block.position() + klen + vlen);
lastLen = klen + vlen ;
} while(block.remaining() > 0);
// ok we are at the end, so go back a littleeeeee....
// The 8 in the below is intentionally different from the 16s in the above.
// Do the math and you'll figure it.
block.position(block.position() - lastLen - 8);
currKeyLen = block.getInt();
currValueLen = block.getInt();
return 1; // didn't exactly find it.
}
public boolean seekBefore(byte [] key) throws IOException {
return seekBefore(key, 0, key.length);
}
public boolean seekBefore(byte[] key, int offset, int length)
throws IOException {
int b = reader.blockContainingKey(key, offset, length);
if (b < 0)
return false; // key is before the start of the file.
// Question: does this block begin with 'key'?
if (this.reader.comparator.compare(reader.blockIndex.blockKeys[b],
0, reader.blockIndex.blockKeys[b].length,
key, offset, length) == 0) {
// Ok the key we're interested in is the first of the block, so go back one.
if (b == 0) {
// we have a 'problem', the key we want is the first of the file.
return false;
}
b--;
// TODO shortcut: seek forward in this block to the last key of the block.
}
loadBlock(b, true);
blockSeek(key, offset, length, true);
return true;
}
public String getKeyString() {
return Bytes.toStringBinary(block.array(), block.arrayOffset() +
block.position(), currKeyLen);
}
public String getValueString() {
return Bytes.toString(block.array(), block.arrayOffset() +
block.position() + currKeyLen, currValueLen);
}
public Reader getReader() {
return this.reader;
}
public boolean isSeeked(){
return this.block != null;
}
public boolean seekTo() throws IOException {
if (this.reader.blockIndex.isEmpty()) {
return false;
}
if (block != null && currBlock == 0) {
block.rewind();
currKeyLen = block.getInt();
currValueLen = block.getInt();
return true;
}
currBlock = 0;
block = reader.readBlock(this.currBlock, this.cacheBlocks, this.pread);
currKeyLen = block.getInt();
currValueLen = block.getInt();
blockFetches++;
return true;
}
private void loadBlock(int bloc, boolean rewind) throws IOException {
if (block == null) {
block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
currBlock = bloc;
blockFetches++;
} else {
if (bloc != currBlock) {
block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
currBlock = bloc;
blockFetches++;
} else {
// we are already in the same block, just rewind to seek again.
if (rewind) {
block.rewind();
}
else {
//Go back by (size of rowlength + size of valuelength) = 8 bytes
block.position(block.position()-8);
}
}
}
}
@Override
public String toString() {
return "HFileScanner for reader " + String.valueOf(reader);
}
}
public String getTrailerInfo() {
return trailer.toString();
}
}
/*
* The HFile has a fixed trailer which contains offsets to other variable
* parts of the file. Also includes basic metadata on this file.
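*
* On-disk layout, in the order written by serialize() below:
* TRAILERBLOCKMAGIC, fileinfoOffset (long), dataIndexOffset (long),
* dataIndexCount (int), metaIndexOffset (long), metaIndexCount (int),
* totalUncompressedBytes (long), entryCount (int), compressionCodec (int),
* version (int).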
*/
private static class FixedFileTrailer {
// Offset to the fileinfo data, a small block of vitals.
long fileinfoOffset;
// Offset to the data block index.
long dataIndexOffset;
// How many data index entries there are (aka: the data block count)
int dataIndexCount;
// Offset to the meta block index.
long metaIndexOffset;
// How many meta block index entries (aka: meta block count)
int metaIndexCount;
long totalUncompressedBytes;
int entryCount;
int compressionCodec;
int version = 1;
FixedFileTrailer() {
super();
}
static int trailerSize() {
// Keep this up to date...
return
( Bytes.SIZEOF_INT * 5 ) +
( Bytes.SIZEOF_LONG * 4 ) +
TRAILERBLOCKMAGIC.length;
}
void serialize(DataOutputStream outputStream) throws IOException {
outputStream.write(TRAILERBLOCKMAGIC);
outputStream.writeLong(fileinfoOffset);
outputStream.writeLong(dataIndexOffset);
outputStream.writeInt(dataIndexCount);
outputStream.writeLong(metaIndexOffset);
outputStream.writeInt(metaIndexCount);
outputStream.writeLong(totalUncompressedBytes);
outputStream.writeInt(entryCount);
outputStream.writeInt(compressionCodec);
outputStream.writeInt(version);
}
void deserialize(DataInputStream inputStream) throws IOException {
byte [] header = new byte[TRAILERBLOCKMAGIC.length];
inputStream.readFully(header);
if ( !Arrays.equals(header, TRAILERBLOCKMAGIC)) {
throw new IOException("Trailer 'header' is wrong; does the trailer " +
"size match content?");
}
fileinfoOffset = inputStream.readLong();
dataIndexOffset = inputStream.readLong();
dataIndexCount = inputStream.readInt();
metaIndexOffset = inputStream.readLong();
metaIndexCount = inputStream.readInt();
totalUncompressedBytes = inputStream.readLong();
entryCount = inputStream.readInt();
compressionCodec = inputStream.readInt();
version = inputStream.readInt();
if (version != 1) {
throw new IOException("Wrong version: " + version);
}
}
@Override
public String toString() {
return "fileinfoOffset=" + fileinfoOffset +
", dataIndexOffset=" + dataIndexOffset +
", dataIndexCount=" + dataIndexCount +
", metaIndexOffset=" + metaIndexOffset +
", metaIndexCount=" + metaIndexCount +
", totalBytes=" + totalUncompressedBytes +
", entryCount=" + entryCount +
", version=" + version;
}
}
/*
* The block index for an HFile.
* Used when reading.
*/
static class BlockIndex implements HeapSize {
// How many actual items are there? The next insert location too.
int count = 0;
byte [][] blockKeys;
long [] blockOffsets;
int [] blockDataSizes;
int size = 0;
/* Needed when doing lookups on blocks.
*/
final RawComparator<byte []> comparator;
/*
* Shutdown default constructor
*/
@SuppressWarnings("unused")
private BlockIndex() {
this(null);
}
/**
* @param c comparator used to compare keys.
*/
BlockIndex(final RawComparator<byte []>c) {
this.comparator = c;
// Guess that cost of three arrays + this object is 4 * 8 bytes.
this.size += (4 * 8);
}
/**
* @return True if block index is empty.
*/
boolean isEmpty() {
return this.blockKeys.length <= 0;
}
/**
* Adds a new entry in the block index.
*
* @param key First key in the block
* @param offset file offset where the block is stored
* @param dataSize the uncompressed data size
*/
void add(final byte[] key, final long offset, final int dataSize) {
blockOffsets[count] = offset;
blockKeys[count] = key;
blockDataSizes[count] = dataSize;
count++;
this.size += (Bytes.SIZEOF_INT * 2 + key.length);
}
/**
* @param key Key to find
* @return Index of the block containing <code>key</code> or -1 if this file
* does not contain the request.
*/
int blockContainingKey(final byte[] key, int offset, int length) {
int pos = Bytes.binarySearch(blockKeys, key, offset, length, this.comparator);
if (pos < 0) {
pos ++;
pos *= -1;
if (pos == 0) {
// falls before the beginning of the file.
return -1;
}
// When switched to "first key in block" index, binarySearch now returns
// the block with a firstKey < key. This means the value we want is potentially
// in the next block.
pos --; // in previous block.
return pos;
}
// wow, a perfect hit, how unlikely?
return pos;
}
/*
* @return File midkey. Inexact. Operates on block boundaries. Does
* not go into blocks.
*/
byte [] midkey() throws IOException {
int pos = ((this.count - 1)/2); // middle of the index
if (pos < 0) {
throw new IOException("HFile empty");
}
return this.blockKeys[pos];
}
/*
* Write out index. Whatever we write here must jibe with what
* BlockIndex#readIndex is expecting. Make sure the two ends of the
* index serialization match.
* @param o
* @param keys
* @param offsets
* @param sizes
* @return Position at which we entered the index.
* @throws IOException
*/
static long writeIndex(final FSDataOutputStream o,
final List<byte []> keys, final List<Long> offsets,
final List<Integer> sizes)
throws IOException {
long pos = o.getPos();
// Don't write an index if nothing in the index.
if (keys.size() > 0) {
o.write(INDEXBLOCKMAGIC);
// Write the index.
for (int i = 0; i < keys.size(); ++i) {
o.writeLong(offsets.get(i).longValue());
o.writeInt(sizes.get(i).intValue());
byte [] key = keys.get(i);
Bytes.writeByteArray(o, key);
}
}
return pos;
}
/*
* Read in the index from the passed stream.
* Must match what was written by writeIndex in the Writer.close.
* @param c Comparator to use.
* @param in
* @param indexSize
* @throws IOException
*/
static BlockIndex readIndex(final RawComparator<byte []> c,
DataInputStream in, final int indexSize)
throws IOException {
BlockIndex bi = new BlockIndex(c);
bi.blockOffsets = new long[indexSize];
bi.blockKeys = new byte[indexSize][];
bi.blockDataSizes = new int[indexSize];
// If index size is zero, no index was written.
if (indexSize > 0) {
byte [] magic = new byte[INDEXBLOCKMAGIC.length];
in.readFully(magic);
if (!Arrays.equals(magic, INDEXBLOCKMAGIC)) {
throw new IOException("Index block magic is wrong: " +
Arrays.toString(magic));
}
for (int i = 0; i < indexSize; ++i ) {
long offset = in.readLong();
int dataSize = in.readInt();
byte [] key = Bytes.readByteArray(in);
bi.add(key, offset, dataSize);
}
}
return bi;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("size=" + count);
for (int i = 0; i < count ; i++) {
sb.append(", ");
sb.append("key=").append(Bytes.toStringBinary(blockKeys[i])).
append(", offset=").append(blockOffsets[i]).
append(", dataSize=" + blockDataSizes[i]);
}
return sb.toString();
}
public long heapSize() {
long heapsize = ClassSize.align(ClassSize.OBJECT +
2 * Bytes.SIZEOF_INT + (3 + 1) * ClassSize.REFERENCE);
//Calculating the size of blockKeys
if(blockKeys != null) {
//Adding array + references overhead
heapsize += ClassSize.align(ClassSize.ARRAY +
blockKeys.length * ClassSize.REFERENCE);
//Adding bytes
for(byte [] bs : blockKeys) {
heapsize += ClassSize.align(ClassSize.ARRAY + bs.length);
}
}
if(blockOffsets != null) {
heapsize += ClassSize.align(ClassSize.ARRAY +
blockOffsets.length * Bytes.SIZEOF_LONG);
}
if(blockDataSizes != null) {
heapsize += ClassSize.align(ClassSize.ARRAY +
blockDataSizes.length * Bytes.SIZEOF_INT);
}
return ClassSize.align(heapsize);
}
}
/*
* Metadata for this file. Conjured by the writer. Read in by the reader.
*/
static class FileInfo extends HbaseMapWritable<byte [], byte []> {
static final String RESERVED_PREFIX = "hfile.";
static final byte[] RESERVED_PREFIX_BYTES = Bytes.toBytes(RESERVED_PREFIX);
static final byte [] LASTKEY = Bytes.toBytes(RESERVED_PREFIX + "LASTKEY");
static final byte [] AVG_KEY_LEN =
Bytes.toBytes(RESERVED_PREFIX + "AVG_KEY_LEN");
static final byte [] AVG_VALUE_LEN =
Bytes.toBytes(RESERVED_PREFIX + "AVG_VALUE_LEN");
static final byte [] COMPARATOR =
Bytes.toBytes(RESERVED_PREFIX + "COMPARATOR");
/*
* Constructor.
*/
FileInfo() {
super();
}
}
/**
* Return true if the given file info key is reserved for internal
* use by HFile.
*/
public static boolean isReservedFileInfoKey(byte[] key) {
return Bytes.startsWith(key, FileInfo.RESERVED_PREFIX_BYTES);
}
/**
* Get names of supported compression algorithms. The names are acceptable to
* HFile.Writer.
*
* @return Array of strings, each represents a supported compression
* algorithm. Currently, the following compression algorithms are
* supported.
* <ul>
* <li>"none" - No compression.
* <li>"gz" - GZIP compression.
* </ul>
*/
public static String[] getSupportedCompressionAlgorithms() {
return Compression.getSupportedAlgorithms();
}
// Utility methods.
/*
* @param l Long to convert to an int.
* @return <code>l</code> cast as an int.
*/
static int longToInt(final long l) {
// Expecting the size() of a block not exceeding 4GB. Assuming the
// size() will wrap to negative integer if it exceeds 2GB (From tfile).
return (int)(l & 0x00000000ffffffffL);
}
/**
* Returns all files belonging to the given region directory. Could return an
* empty list.
*
* @param fs The file system reference.
* @param regionDir The region directory to scan.
* @return The list of files found.
* @throws IOException When scanning the files fails.
*/
static List<Path> getStoreFiles(FileSystem fs, Path regionDir)
throws IOException {
List<Path> res = new ArrayList<Path>();
PathFilter dirFilter = new FSUtils.DirFilter(fs);
FileStatus[] familyDirs = fs.listStatus(regionDir, dirFilter);
for(FileStatus dir : familyDirs) {
FileStatus[] files = fs.listStatus(dir.getPath());
for (FileStatus file : files) {
if (!file.isDir()) {
res.add(file.getPath());
}
}
}
return res;
}
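/**
* Command-line tool for dumping HFiles. A hedged invocation sketch; the
* wrapper command and paths below are illustrative assumptions, while the
* options come from the <code>Options</code> built in the method body:
* <pre>
* # Print key/values and meta data of a single file.
* hbase org.apache.hadoop.hbase.io.hfile.HFile -p -m -f hdfs://a:9000/hbase/.META./12/34
* # Scan all store files of a region, checking row order.
* hbase org.apache.hadoop.hbase.io.hfile.HFile -v -k -r '.META.,,1'
* </pre>
*/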
public static void main(String []args) throws IOException {
try {
// create options
Options options = new Options();
options.addOption("v", "verbose", false, "Verbose output; emits file and meta data delimiters");
options.addOption("p", "printkv", false, "Print key/value pairs");
options.addOption("m", "printmeta", false, "Print meta data of file");
options.addOption("k", "checkrow", false,
"Enable row order check; looks for out-of-order keys");
options.addOption("a", "checkfamily", false, "Enable family check");
options.addOption("f", "file", true,
"File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34");
options.addOption("r", "region", true,
"Region to scan. Pass region name; e.g. '.META.,,1'");
if (args.length == 0) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("HFile ", options, true);
System.exit(-1);
}
CommandLineParser parser = new PosixParser();
CommandLine cmd = parser.parse(options, args);
boolean verbose = cmd.hasOption("v");
boolean printKeyValue = cmd.hasOption("p");
boolean printMeta = cmd.hasOption("m");
boolean checkRow = cmd.hasOption("k");
boolean checkFamily = cmd.hasOption("a");
// get configuration, file system and get list of files
Configuration conf = HBaseConfiguration.create();
conf.set("fs.defaultFS",
conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
conf.set("fs.default.name",
conf.get(org.apache.hadoop.hbase.HConstants.HBASE_DIR));
FileSystem fs = FileSystem.get(conf);
ArrayList<Path> files = new ArrayList<Path>();
if (cmd.hasOption("f")) {
files.add(new Path(cmd.getOptionValue("f")));
}
if (cmd.hasOption("r")) {
String regionName = cmd.getOptionValue("r");
byte[] rn = Bytes.toBytes(regionName);
byte[][] hri = HRegionInfo.parseRegionName(rn);
Path rootDir = FSUtils.getRootDir(conf);
Path tableDir = new Path(rootDir, Bytes.toString(hri[0]));
String enc = HRegionInfo.encodeRegionName(rn);
Path regionDir = new Path(tableDir, enc);
if (verbose) System.out.println("region dir -> " + regionDir);
List<Path> regionFiles = getStoreFiles(fs, regionDir);
if (verbose) System.out.println("Number of region files found -> " +
regionFiles.size());
if (verbose) {
int i = 1;
for (Path p : regionFiles) {
System.out.println("Found file[" + i++ + "] -> " + p);
}
}
files.addAll(regionFiles);
}
// iterate over all files found
for (Path file : files) {
if (verbose) System.out.println("Scanning -> " + file);
if (!fs.exists(file)) {
System.err.println("ERROR, file doesn't exist: " + file);
continue;
}
// create reader and load file info
HFile.Reader reader = new HFile.Reader(fs, file, null, false);
Map<byte[],byte[]> fileInfo = reader.loadFileInfo();
// scan over file and read key/value's and check if requested
HFileScanner scanner = reader.getScanner(false, false);
scanner.seekTo();
KeyValue pkv = null;
int count = 0;
do {
KeyValue kv = scanner.getKeyValue();
// dump key value
if (printKeyValue) {
System.out.println("K: " + kv +
" V: " + Bytes.toStringBinary(kv.getValue()));
}
// check if rows are in order
if (checkRow && pkv != null) {
if (Bytes.compareTo(pkv.getRow(), kv.getRow()) > 0) {
System.err.println("WARNING, previous row is greater than" +
" current row\n\tfilename -> " + file +
"\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
"\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
}
}
// check if families are consistent
if (checkFamily) {
String fam = Bytes.toString(kv.getFamily());
if (!file.toString().contains(fam)) {
System.err.println("WARNING, filename does not match kv family," +
"\n\tfilename -> " + file +
"\n\tkeyvalue -> " + Bytes.toStringBinary(kv.getKey()));
}
if (pkv != null && Bytes.compareTo(pkv.getFamily(), kv.getFamily()) != 0) {
System.err.println("WARNING, previous kv has different family" +
" compared to current key\n\tfilename -> " + file +
"\n\tprevious -> " + Bytes.toStringBinary(pkv.getKey()) +
"\n\tcurrent -> " + Bytes.toStringBinary(kv.getKey()));
}
}
pkv = kv;
count++;
} while (scanner.next());
if (verbose || printKeyValue) {
System.out.println("Scanned kv count -> " + count);
}
// print meta data
if (printMeta) {
System.out.println("Block index size as per heapsize: " + reader.indexSize());
System.out.println(reader.toString());
System.out.println(reader.getTrailerInfo());
System.out.println("Fileinfo:");
for (Map.Entry<byte[], byte[]> e : fileInfo.entrySet()) {
System.out.print(Bytes.toString(e.getKey()) + " = " );
if (Bytes.compareTo(e.getKey(), Bytes.toBytes("MAX_SEQ_ID_KEY"))==0) {
long seqid = Bytes.toLong(e.getValue());
System.out.println(seqid);
} else if (Bytes.compareTo(e.getKey(),
Bytes.toBytes("TIMERANGE")) == 0) {
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
Writables.copyWritable(e.getValue(), timeRangeTracker);
System.out.println(timeRangeTracker.getMinimumTimestamp() +
"...." + timeRangeTracker.getMaximumTimestamp());
} else if (Bytes.compareTo(e.getKey(), FileInfo.AVG_KEY_LEN) == 0 ||
Bytes.compareTo(e.getKey(), FileInfo.AVG_VALUE_LEN) == 0) {
System.out.println(Bytes.toInt(e.getValue()));
} else {
System.out.println(Bytes.toStringBinary(e.getValue()));
}
}
//Printing bloom information
ByteBuffer b = reader.getMetaBlock("BLOOM_FILTER_META", false);
if (b!= null) {
BloomFilter bloomFilter = new ByteBloomFilter(b);
System.out.println("BloomSize: " + bloomFilter.getByteSize());
System.out.println("No of Keys in bloom: " +
bloomFilter.getKeyCount());
System.out.println("Max Keys for bloom: " +
bloomFilter.getMaxKeys());
} else {
System.out.println("Could not get bloom data from meta block");
}
}
reader.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}