| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with this |
| * work for additional information regarding copyright ownership. The ASF |
| * licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.hadoop.io.file.tfile; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.Closeable; |
| import java.io.DataInput; |
| import java.io.DataInputStream; |
| import java.io.DataOutput; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.io.BoundedByteArrayOutputStream; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.RawComparator; |
| import org.apache.hadoop.io.WritableComparator; |
| import org.apache.hadoop.io.file.tfile.BCFile.Reader.BlockReader; |
| import org.apache.hadoop.io.file.tfile.BCFile.Writer.BlockAppender; |
| import org.apache.hadoop.io.file.tfile.Chunk.ChunkDecoder; |
| import org.apache.hadoop.io.file.tfile.Chunk.ChunkEncoder; |
| import org.apache.hadoop.io.file.tfile.CompareUtils.BytesComparator; |
| import org.apache.hadoop.io.file.tfile.CompareUtils.MemcmpRawComparator; |
| import org.apache.hadoop.io.file.tfile.Utils.Version; |
| import org.apache.hadoop.io.serializer.JavaSerializationComparator; |
| |
| /** |
| * A TFile is a container of key-value pairs. Both keys and values are type-less |
| * bytes. Keys are restricted to 64KB, value length is not restricted |
| * (practically limited to the available disk storage). TFile further provides |
| * the following features: |
| * <ul> |
| * <li>Block Compression. |
| * <li>Named meta data blocks. |
| * <li>Sorted or unsorted keys. |
| * <li>Seek by key or by file offset. |
| * </ul> |
| * The memory footprint of a TFile includes the following: |
| * <ul> |
| * <li>Some constant overhead of reading or writing a compressed block. |
| * <ul> |
| * <li>Each compressed block requires one compression/decompression codec for |
| * I/O. |
| * <li>Temporary space to buffer the key. |
| * <li>Temporary space to buffer the value (for TFile.Writer only). Values are |
| * chunk encoded, so that we buffer at most one chunk of user data. By default, |
| * the chunk buffer is 1MB. Reading chunked value does not require additional |
| * memory. |
| * </ul> |
| * <li>TFile index, which is proportional to the total number of Data Blocks. |
| * The total amount of memory needed to hold the index can be estimated as |
| * (56+AvgKeySize)*NumBlocks. |
| * <li>MetaBlock index, which is proportional to the total number of Meta |
| * Blocks. The total amount of memory needed to hold the index for Meta Blocks |
| * can be estimated as (40+AvgMetaBlockName)*NumMetaBlock. |
| * </ul> |
| * <p> |
| * The behavior of TFile can be customized by the following variables through |
| * Configuration: |
| * <ul> |
| * <li><b>tfile.io.chunk.size</b>: Value chunk size. Integer (in bytes). |
| * Defaults to 1MB. Values whose length is less than the chunk size are |
| * guaranteed to have a known value length at read time (see |
| * {@link TFile.Reader.Scanner.Entry#isValueLengthKnown()}). |
| * <li><b>tfile.fs.output.buffer.size</b>: Buffer size used for |
| * FSDataOutputStream. Integer (in bytes). Default to 256KB. |
| * <li><b>tfile.fs.input.buffer.size</b>: Buffer size used for |
| * FSDataInputStream. Integer (in bytes). Default to 256KB. |
| * </ul> |
| * <p> |
| * Suggestions on performance optimization. |
| * <ul> |
| * <li>Minimum block size. We recommend a setting of minimum block size between |
| * 256KB to 1MB for general usage. Larger block size is preferred if files are |
| * primarily for sequential access. However, it would lead to inefficient random |
| * access (because there are more data to decompress). Smaller blocks are good |
| * for random access, but require more memory to hold the block index, and may |
| * be slower to create (because we must flush the compressor stream at the |
| * conclusion of each data block, which leads to an FS I/O flush). Further, due |
| * to the internal caching in Compression codec, the smallest possible block |
| * size would be around 20KB-30KB. |
| * <li>The current implementation does not offer true multi-threading for |
| * reading. The implementation uses FSDataInputStream seek()+read(), which is |
| * shown to be much faster than positioned-read call in single thread mode. |
| * However, it also means that if multiple threads attempt to access the same |
| * TFile (using multiple scanners) simultaneously, the actual I/O is carried out |
| * sequentially even if they access different DFS blocks. |
| * <li>Compression codec. Use "none" if the data is not very compressible (by |
| * compressible, I mean a compression ratio of at least 2:1). Generally, use |
| * "lzo" as the starting point for experimenting. "gz" offers a slightly better |
| * compression ratio than "lzo" but requires 4x CPU to compress and 2x CPU to |
| * decompress, compared to "lzo". |
| * <li>File system buffering. If the underlying FSDataInputStream and |
| * FSDataOutputStream are already adequately buffered, or if applications |
| * read/write keys and values in large buffers, we can reduce the sizes of |
| * input/output buffering in the TFile layer by setting the configuration |
| * parameters "tfile.fs.input.buffer.size" and "tfile.fs.output.buffer.size". |
| * </ul> |
| * |
| * Some design rationale behind TFile can be found at <a |
| * href=https://issues.apache.org/jira/browse/HADOOP-3315>Hadoop-3315</a>. |
| */ |
| @InterfaceAudience.Public |
| @InterfaceStability.Evolving |
| public class TFile { |
| static final Log LOG = LogFactory.getLog(TFile.class); |
| |
| private static final String CHUNK_BUF_SIZE_ATTR = "tfile.io.chunk.size"; |
| private static final String FS_INPUT_BUF_SIZE_ATTR = |
| "tfile.fs.input.buffer.size"; |
| private static final String FS_OUTPUT_BUF_SIZE_ATTR = |
| "tfile.fs.output.buffer.size"; |
| |
| static int getChunkBufferSize(Configuration conf) { |
| int ret = conf.getInt(CHUNK_BUF_SIZE_ATTR, 1024 * 1024); |
| return (ret > 0) ? ret : 1024 * 1024; |
| } |
| |
| static int getFSInputBufferSize(Configuration conf) { |
| return conf.getInt(FS_INPUT_BUF_SIZE_ATTR, 256 * 1024); |
| } |
| |
| static int getFSOutputBufferSize(Configuration conf) { |
| return conf.getInt(FS_OUTPUT_BUF_SIZE_ATTR, 256 * 1024); |
| } |
| |
| private static final int MAX_KEY_SIZE = 64 * 1024; // 64KB; bound of the Writer's two key buffers |
| static final Version API_VERSION = new Version((short) 1, (short) 0); |
| |
| /** Compression algorithm name: gzip. */ |
| public static final String COMPRESSION_GZ = "gz"; |
| /** Compression algorithm name: lzo. */ |
| public static final String COMPRESSION_LZO = "lzo"; |
| /** Compression algorithm name: no compression. */ |
| public static final String COMPRESSION_NONE = "none"; |
| /** Comparator name: memcmp, the language-independent binary comparator. */ |
| public static final String COMPARATOR_MEMCMP = "memcmp"; |
| /** Comparator name prefix: followed by the class name of a Java RawComparator. */ |
| public static final String COMPARATOR_JCLASS = "jclass:"; |
| |
| /** |
| * Build a key comparator from its string name. The work is delegated to |
| * {@code TFileMeta.makeComparator(name)}. |
| * |
| * @param name |
| * Name of the comparator. |
| * @return A comparator over RawComparable objects. |
| */ |
| public static Comparator<RawComparable> makeComparator(String name) { |
| final Comparator<RawComparable> keyComparator = |
| TFileMeta.makeComparator(name); |
| return keyComparator; |
| } |
| |
| // Private constructor: TFile only hosts static members and nested classes, so it is never instantiated. |
| private TFile() { |
| // intentionally empty |
| } |
| |
| /** |
| * List the names of the supported compression algorithms. Any of the |
| * returned names may be passed to TFile.Writer. |
| * |
| * @return Array of strings, one per supported compression algorithm. |
| * Currently, the following compression algorithms are supported. |
| * <ul> |
| * <li>"none" - No compression. |
| * <li>"lzo" - LZO compression. |
| * <li>"gz" - GZIP compression. |
| * </ul> |
| */ |
| public static String[] getSupportedCompressionAlgorithms() { |
| final String[] algorithmNames = Compression.getSupportedAlgorithms(); |
| return algorithmNames; |
| } |
| |
| /** |
| * TFile Writer. |
| */ |
| @InterfaceStability.Evolving |
| public static class Writer implements Closeable { |
| // Minimum compressed size of a data block; finishDataBlock closes a block once it reaches this size (or when forced). |
| private final int sizeMinBlock; |
| |
| // Index and meta information; both are written out as meta blocks by close(). |
| final TFileIndex tfileIndex; |
| final TFileMeta tfileMeta; |
| |
| // Reference to the underlying BCFile writer; set to null by close(). |
| private BCFile.Writer writerBCF; |
| |
| // Appender of the currently open data block (null when no block is open) and the number of records appended to it. |
| BlockAppender blkAppender; |
| long blkRecordCount; |
| |
| // Buffers caching the key being written and the previously written key; KeyRegister.close() swaps them after each key. |
| BoundedByteArrayOutputStream currentKeyBufferOS; |
| BoundedByteArrayOutputStream lastKeyBufferOS; |
| |
| // Buffer used by the chunk codec; allocated lazily by prepareAppendValue for values of unknown length. |
| private byte[] valueBuffer; |
| |
| /** |
| * Writer states. During key-value insertion the state transits in circles: |
| * READY -> IN_KEY -> END_KEY -> IN_VALUE -> READY. CLOSED is set by |
| * {@link #close()}. |
| */ |
| private enum State { |
| READY, // Ready to start a new key-value pair insertion. |
| IN_KEY, // In the middle of key insertion (a key append stream is open). |
| END_KEY, // Key insertion complete, ready to insert value. |
| IN_VALUE, // In the middle of value insertion (a value append stream is open). |
| // ERROR, // Error encountered, cannot continue. |
| CLOSED, // TFile already closed. |
| }; |
| |
| // Current state of the Writer, its configuration, and a counter that stays non-zero after a failed write so close() skips the normal finish. |
| State state = State.READY; |
| Configuration conf; |
| long errorCount = 0; |
| |
| /** |
| * Create a TFile Writer on top of an output stream. |
| * |
| * @param fsdos |
| * Output stream to write to. Must be at position 0. |
| * @param minBlockSize |
| * Minimum compressed block size in bytes. Except for the last |
| * block, a compression block is not closed before it reaches this |
| * size. |
| * @param compressName |
| * Name of the compression algorithm. Must be one of the strings |
| * returned by {@link TFile#getSupportedCompressionAlgorithms()}. |
| * @param comparator |
| * Null or the empty string for an unsorted TFile. Otherwise, the |
| * string name of the comparison algorithm for keys. Two kinds of |
| * comparators are supported. |
| * <ul> |
| * <li>Algorithmic comparator: a binary comparator that is |
| * language independent. Currently, only "memcmp" is supported. |
| * <li>Language-specific comparator: a binary comparator that can |
| * only be constructed in a specific language. For Java, the |
| * syntax is "jclass:", followed by the class name of the |
| * RawComparator. Currently, only RawComparators that can be |
| * constructed through the default (no-argument) constructor are |
| * supported. Parameterized RawComparators such as |
| * {@link WritableComparator} or |
| * {@link JavaSerializationComparator} may not be used directly; |
| * write a wrapper class that inherits from such a class and |
| * performs the proper initialization in its default constructor. |
| * </ul> |
| * @param conf |
| * The configuration object. |
| * @throws IOException |
| */ |
| public Writer(FSDataOutputStream fsdos, int minBlockSize, |
| String compressName, String comparator, Configuration conf) |
| throws IOException { |
| this.conf = conf; |
| this.sizeMinBlock = minBlockSize; |
| |
| // The index shares the comparator resolved by the meta block. |
| this.tfileMeta = new TFileMeta(comparator); |
| this.tfileIndex = new TFileIndex(this.tfileMeta.getComparator()); |
| |
| this.writerBCF = new BCFile.Writer(fsdos, compressName, conf); |
| |
| // One buffer for the key being written, one for the previous key. |
| this.currentKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE); |
| this.lastKeyBufferOS = new BoundedByteArrayOutputStream(MAX_KEY_SIZE); |
| } |
| |
| /** |
| * Close the Writer. Resources are released regardless of any exception |
| * being thrown, and further close calls have no effect. |
| * |
| * The underlying FSDataOutputStream is not closed. |
| * |
| * @throws IOException |
| * if finishing the data block or writing the meta blocks fails. |
| * @throws IllegalStateException |
| * if no error was recorded and a key-value insertion is still in |
| * progress. |
| */ |
| @Override |
| public void close() throws IOException { |
| if (state == State.CLOSED) { |
| return; |
| } |
| |
| try { |
| // After a failed append only the cleanup in the finally clause runs. |
| if (errorCount != 0) { |
| return; |
| } |
| |
| if (state != State.READY) { |
| throw new IllegalStateException( |
| "Cannot close TFile in the middle of key-value insertion."); |
| } |
| |
| finishDataBlock(true); |
| |
| // data:TFile.meta goes first and is never compressed |
| BlockAppender metaAppender = |
| writerBCF.prepareMetaBlock(TFileMeta.BLOCK_NAME, COMPRESSION_NONE); |
| try { |
| tfileMeta.write(metaAppender); |
| } finally { |
| metaAppender.close(); |
| } |
| |
| // data:TFile.index follows, using the writer's default compression |
| BlockAppender indexAppender = |
| writerBCF.prepareMetaBlock(TFileIndex.BLOCK_NAME); |
| try { |
| tfileIndex.write(indexAppender); |
| } finally { |
| indexAppender.close(); |
| } |
| |
| writerBCF.close(); |
| } finally { |
| // Always release resources and mark the writer closed. |
| IOUtils.cleanup(LOG, blkAppender, writerBCF); |
| blkAppender = null; |
| writerBCF = null; |
| state = State.CLOSED; |
| } |
| } |
| |
| /** |
| * Add a new key-value pair to the TFile, using the whole of both buffers. |
| * Equivalent to append(key, 0, key.length, value, 0, value.length). |
| * |
| * @param key |
| * Buffer for key. |
| * @param value |
| * Buffer for value. |
| * @throws IOException |
| */ |
| public void append(byte[] key, byte[] value) throws IOException { |
| final int keyLength = key.length; |
| final int valueLength = value.length; |
| append(key, 0, keyLength, value, 0, valueLength); |
| } |
| |
| /** |
| * Add a new key-value pair to the TFile. |
| * |
| * @param key |
| * buffer for key. |
| * @param koff |
| * offset in key buffer. |
| * @param klen |
| * length of key. |
| * @param value |
| * buffer for value. |
| * @param voff |
| * offset in value buffer. |
| * @param vlen |
| * length of value. |
| * @throws IndexOutOfBoundsException |
| * if an offset or length is negative, or an offset-length pair |
| * does not fit inside its buffer. |
| * @throws IllegalStateException |
| * if the Writer is not ready to start a new key (for example, it |
| * is already closed). A closed Writer stays closed. |
| * @throws IOException |
| * Upon IO errors. |
| * <p> |
| * If an exception is thrown, the TFile will be in an inconsistent |
| * state. The only legitimate call after that would be close |
| */ |
| public void append(byte[] key, int koff, int klen, byte[] value, int voff, |
| int vlen) throws IOException { |
| // OR-ing the four ints yields a negative number iff any of them is negative. |
| if ((koff | klen | (koff + klen) | (key.length - (koff + klen))) < 0) { |
| throw new IndexOutOfBoundsException( |
| "Bad key buffer offset-length combination."); |
| } |
| |
| if ((voff | vlen | (voff + vlen) | (value.length - (voff + vlen))) < 0) { |
| throw new IndexOutOfBoundsException( |
| "Bad value buffer offset-length combination."); |
| } |
| |
| try { |
| DataOutputStream dosKey = prepareAppendKey(klen); |
| try { |
| // errorCount stays incremented if the write throws, which makes |
| // close() skip the normal finish. |
| ++errorCount; |
| dosKey.write(key, koff, klen); |
| --errorCount; |
| } finally { |
| dosKey.close(); |
| } |
| |
| DataOutputStream dosValue = prepareAppendValue(vlen); |
| try { |
| ++errorCount; |
| dosValue.write(value, voff, vlen); |
| --errorCount; |
| } finally { |
| dosValue.close(); |
| } |
| } finally { |
| // Return to READY so that close() remains callable after a failure, |
| // but never resurrect a Writer that has already been closed: its |
| // underlying BCFile writer reference has been cleared. |
| if (state != State.CLOSED) { |
| state = State.READY; |
| } |
| } |
| } |
| |
| /** |
| * Key append stream. Bytes written to it are collected in the current key |
| * buffer; the key is verified and registered with the TFile when the stream |
| * is closed. |
| */ |
| private class KeyRegister extends DataOutputStream { |
| private final int expectedLength; |
| private boolean closed = false; |
| |
| /** |
| * @param len |
| * Expected key length; a negative value means the length is not |
| * known in advance. |
| */ |
| public KeyRegister(int len) { |
| super(currentKeyBufferOS); |
| expectedLength = len; |
| if (len < 0) { |
| currentKeyBufferOS.reset(); |
| } else { |
| currentKeyBufferOS.reset(len); |
| } |
| } |
| |
| /** |
| * Verify the key length, write the key into the current data block, check |
| * the sort order against the previous key, and swap the key buffers. Only |
| * the first call has any effect. |
| */ |
| @Override |
| public void close() throws IOException { |
| if (closed) { |
| return; |
| } |
| |
| try { |
| // Stays incremented if anything below throws. |
| ++errorCount; |
| final byte[] keyBytes = currentKeyBufferOS.getBuffer(); |
| final int actualLength = currentKeyBufferOS.size(); |
| |
| // verify length. |
| if (expectedLength >= 0 && actualLength != expectedLength) { |
| throw new IOException("Incorrect key length: expected=" |
| + expectedLength + " actual=" + actualLength); |
| } |
| |
| Utils.writeVInt(blkAppender, actualLength); |
| blkAppender.write(keyBytes, 0, actualLength); |
| if (tfileIndex.getFirstKey() == null) { |
| tfileIndex.setFirstKey(keyBytes, 0, actualLength); |
| } |
| |
| final boolean mustCheckOrder = |
| tfileMeta.isSorted() && tfileMeta.getRecordCount() > 0; |
| if (mustCheckOrder) { |
| final byte[] previousKey = lastKeyBufferOS.getBuffer(); |
| final int previousLength = lastKeyBufferOS.size(); |
| final int order = |
| tfileMeta.getComparator().compare(keyBytes, 0, actualLength, |
| previousKey, 0, previousLength); |
| if (order < 0) { |
| throw new IOException("Keys are not added in sorted order"); |
| } |
| } |
| |
| // The key just written becomes the "last key" for the next round. |
| final BoundedByteArrayOutputStream justWritten = currentKeyBufferOS; |
| currentKeyBufferOS = lastKeyBufferOS; |
| lastKeyBufferOS = justWritten; |
| --errorCount; |
| } finally { |
| closed = true; |
| state = State.END_KEY; |
| } |
| } |
| } |
| |
| /** |
| * Value append stream. The record is registered with the TFile when the |
| * stream is closed. |
| */ |
| private class ValueRegister extends DataOutputStream { |
| private boolean closed = false; |
| |
| public ValueRegister(OutputStream os) { |
| super(os); |
| } |
| |
| /** |
| * Deliberately a no-op, so that flush calls are not passed down stream. |
| */ |
| @Override |
| public void flush() { |
| // intentionally empty |
| } |
| |
| /** |
| * Close the wrapped stream, count the record for both the current block |
| * and the whole file, and finish the data block if it has grown large |
| * enough. Only the first call has any effect. |
| */ |
| @Override |
| public void close() throws IOException { |
| if (closed) { |
| return; |
| } |
| |
| try { |
| // Stays incremented if anything below throws. |
| ++errorCount; |
| super.close(); |
| ++blkRecordCount; |
| // bump up the total record count in the whole file |
| tfileMeta.incRecordCount(); |
| finishDataBlock(false); |
| --errorCount; |
| } finally { |
| closed = true; |
| state = State.READY; |
| } |
| } |
| } |
| |
| /** |
| * Obtain an output stream for writing a key into the TFile. May only be |
| * called while the Writer is ready for a new key-value pair, i.e. when no |
| * key append stream or value append stream is active. |
| * |
| * @param length |
| * The expected length of the key, or -1 if it is not known. When a |
| * length is given, the application must write exactly that many |
| * bytes before calling close on the returned output stream. |
| * @return The key appending output stream. |
| * @throws IOException |
| * @throws IllegalStateException |
| * if the Writer is not in the READY state. |
| */ |
| public DataOutputStream prepareAppendKey(int length) throws IOException { |
| if (state != State.READY) { |
| throw new IllegalStateException("Incorrect state to start a new key: " |
| + state.name()); |
| } |
| |
| // make sure there is an open data block to write into |
| initDataBlock(); |
| final DataOutputStream keyStream = new KeyRegister(length); |
| state = State.IN_KEY; |
| return keyStream; |
| } |
| |
| /** |
| * Obtain an output stream for writing a value into the TFile. May only be |
| * called right after a key appending operation (the key append stream must |
| * already be closed). |
| * |
| * @param length |
| * The expected length of the value, or -1 if it is not known. When |
| * a length is given, the application must write exactly that many |
| * bytes before calling close on the returned output stream. |
| * Advertising the value size up-front guarantees that the value is |
| * encoded in one chunk, and avoids intermediate chunk buffering. |
| * @return The value appending output stream. |
| * @throws IOException |
| * @throws IllegalStateException |
| * if the Writer is not in the END_KEY state. |
| */ |
| public DataOutputStream prepareAppendValue(int length) throws IOException { |
| if (state != State.END_KEY) { |
| throw new IllegalStateException( |
| "Incorrect state to start a new value: " + state.name()); |
| } |
| |
| final OutputStream encoder; |
| if (length >= 0) { |
| // known length: a single chunk, no intermediate buffer |
| encoder = new Chunk.SingleChunkEncoder(blkAppender, length); |
| } else { |
| // unknown length: chunk-encode through a lazily allocated buffer |
| if (valueBuffer == null) { |
| valueBuffer = new byte[getChunkBufferSize(conf)]; |
| } |
| encoder = new ChunkEncoder(blkAppender, valueBuffer); |
| } |
| |
| final DataOutputStream valueStream = new ValueRegister(encoder); |
| state = State.IN_VALUE; |
| return valueStream; |
| } |
| |
| /** |
| * Obtain an output stream for creating a meta block with an explicitly |
| * chosen compression algorithm. May not be called while a key append stream |
| * or value append stream is active. The current data block, if any, is |
| * finished first; no more key-value insertion is allowed after a meta data |
| * block has been added to the TFile. |
| * |
| * @param name |
| * Name of the meta block. |
| * @param compressName |
| * Name of the compression algorithm to be used. Must be one of the |
| * strings returned by |
| * {@link TFile#getSupportedCompressionAlgorithms()}. |
| * @return A DataOutputStream that can be used to write Meta Block data. |
| * Closing the stream signals the end of the block. |
| * @throws IOException |
| * @throws MetaBlockAlreadyExists |
| * the Meta Block with the same name already exists. |
| */ |
| public DataOutputStream prepareMetaBlock(String name, String compressName) |
| throws IOException, MetaBlockAlreadyExists { |
| if (state != State.READY) { |
| throw new IllegalStateException( |
| "Incorrect state to start a Meta Block: " + state.name()); |
| } |
| |
| // force the open data block (if any) to be finished |
| finishDataBlock(true); |
| return writerBCF.prepareMetaBlock(name, compressName); |
| } |
| |
| /** |
| * Obtain an output stream for creating a meta block, compressed with the |
| * default compressor as defined in the Writer's constructor. May not be |
| * called while a key append stream or value append stream is active. The |
| * current data block, if any, is finished first; no more key-value insertion |
| * is allowed after a meta data block has been added to the TFile. |
| * |
| * @param name |
| * Name of the meta block. |
| * @return A DataOutputStream that can be used to write Meta Block data. |
| * Closing the stream signals the end of the block. |
| * @throws IOException |
| * @throws MetaBlockAlreadyExists |
| * the Meta Block with the same name already exists. |
| */ |
| public DataOutputStream prepareMetaBlock(String name) throws IOException, |
| MetaBlockAlreadyExists { |
| if (state != State.READY) { |
| throw new IllegalStateException( |
| "Incorrect state to start a Meta Block: " + state.name()); |
| } |
| |
| // force the open data block (if any) to be finished |
| finishDataBlock(true); |
| final DataOutputStream metaStream = writerBCF.prepareMetaBlock(name); |
| return metaStream; |
| } |
| |
| /** |
| * Open a new data block unless one is already open. |
| * |
| * @throws IOException |
| */ |
| private void initDataBlock() throws IOException { |
| if (blkAppender != null) { |
| // a data block is already open; keep appending to it |
| return; |
| } |
| blkAppender = writerBCF.prepareDataBlock(); |
| } |
| |
| /** |
| * Close the current data block if necessary. |
| * |
| * @param bForceFinish |
| * Force the closure regardless of the block size. |
| * @throws IOException |
| */ |
| void finishDataBlock(boolean bForceFinish) throws IOException { |
| if (blkAppender == null) { |
| return; |
| } |
| |
| // exceeded the size limit, do the compression and finish the block |
| if (bForceFinish || blkAppender.getCompressedSize() >= sizeMinBlock) { |
| // keep tracks of the last key of each data block, no padding |
| // for now |
| TFileIndexEntry keyLast = |
| new TFileIndexEntry(lastKeyBufferOS.getBuffer(), 0, lastKeyBufferOS |
| .size(), blkRecordCount); |
| tfileIndex.addEntry(keyLast); |
| // close the appender |
| blkAppender.close(); |
| blkAppender = null; |
| blkRecordCount = 0; |
| } |
| } |
| } |
| |
| /** |
| * TFile Reader. Users may only read TFiles by creating TFile.Reader.Scanner |
| * objects. A scanner may scan the whole TFile ({@link Reader#createScanner()}), |
| * a portion of the TFile based on byte offsets |
| * ({@link Reader#createScannerByByteRange(long, long)}), or a portion of the |
| * TFile whose keys fall in a certain key range (for sorted TFiles only, |
| * {@link Reader#createScannerByKey(byte[], byte[])} or |
| * {@link Reader#createScannerByKey(RawComparable, RawComparable)}). |
| */ |
| @InterfaceStability.Evolving |
| public static class Reader implements Closeable { |
| // The underlying BCFile reader; closed by close(). |
| final BCFile.Reader readerBCF; |
| |
| // TFile index (loaded lazily by checkTFileDataIndex), TFile meta (read in the constructor) and the key comparator taken from the meta. |
| TFileIndex tfileIndex = null; |
| final TFileMeta tfileMeta; |
| final BytesComparator comparator; |
| |
| // Global begin (block 0, record 0) and end (block count, record 0) locations. |
| private final Location begin; |
| private final Location end; |
| |
| /** |
| * Location representing a virtual position in the TFile: a block index plus |
| * a record index within that block. Both components are checked to be |
| * non-negative whenever they are set. Locations are ordered by block index |
| * first and by record index second. |
| */ |
| static final class Location implements Comparable<Location>, Cloneable { |
| private int blockIndex; |
| // distance/offset from the beginning of the block |
| private long recordIndex; |
| |
| Location(int blockIndex, long recordIndex) { |
| set(blockIndex, recordIndex); |
| } |
| |
| Location(Location other) { |
| set(other); |
| } |
| |
| void incRecordIndex() { |
| ++recordIndex; |
| } |
| |
| int getBlockIndex() { |
| return blockIndex; |
| } |
| |
| long getRecordIndex() { |
| return recordIndex; |
| } |
| |
| /** |
| * Move this location. |
| * |
| * @throws IllegalArgumentException |
| * if either component is negative. |
| */ |
| void set(int blockIndex, long recordIndex) { |
| // the OR of the two values is negative iff at least one of them is |
| if ((blockIndex | recordIndex) < 0) { |
| throw new IllegalArgumentException( |
| "Illegal parameter for BlockLocation."); |
| } |
| this.blockIndex = blockIndex; |
| this.recordIndex = recordIndex; |
| } |
| |
| void set(Location other) { |
| set(other.blockIndex, other.recordIndex); |
| } |
| |
| /** |
| * @see java.lang.Comparable#compareTo(java.lang.Object) |
| */ |
| @Override |
| public int compareTo(Location other) { |
| return compareTo(other.blockIndex, other.recordIndex); |
| } |
| |
| /** |
| * Compare this location with the position (bid, rid). |
| * |
| * @param bid |
| * block index to compare against. |
| * @param rid |
| * record index to compare against. |
| * @return -1, 0 or 1 when this location is before, at or after the given |
| * position. The values are compared directly rather than |
| * subtracted, so the sign is correct for every argument value, |
| * including ones for which a subtraction would overflow. |
| */ |
| int compareTo(int bid, long rid) { |
| if (this.blockIndex != bid) { |
| return (this.blockIndex < bid) ? -1 : 1; |
| } |
| if (this.recordIndex != rid) { |
| return (this.recordIndex < rid) ? -1 : 1; |
| } |
| return 0; |
| } |
| |
| /** |
| * @see java.lang.Object#clone() |
| */ |
| @Override |
| protected Location clone() { |
| return new Location(blockIndex, recordIndex); |
| } |
| |
| /** |
| * @see java.lang.Object#hashCode() |
| */ |
| @Override |
| public int hashCode() { |
| final int prime = 31; |
| int result = prime + blockIndex; |
| result = (int) (prime * result + recordIndex); |
| return result; |
| } |
| |
| /** |
| * @see java.lang.Object#equals(java.lang.Object) |
| */ |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) { |
| return true; |
| } |
| if (obj == null || getClass() != obj.getClass()) { |
| return false; |
| } |
| Location other = (Location) obj; |
| return blockIndex == other.blockIndex |
| && recordIndex == other.recordIndex; |
| } |
| } |
| |
| /** |
| * Constructor |
| * |
| * @param fsdis |
| * FS input stream of the TFile. |
| * @param fileLength |
| * The length of TFile. This is required because we have no easy |
| * way of knowing the actual size of the input file through the |
| * File input stream. |
| * @param conf |
| * @throws IOException |
| */ |
| public Reader(FSDataInputStream fsdis, long fileLength, Configuration conf) |
| throws IOException { |
| readerBCF = new BCFile.Reader(fsdis, fileLength, conf); |
| |
| // first, read TFile meta |
| BlockReader brMeta = readerBCF.getMetaBlock(TFileMeta.BLOCK_NAME); |
| try { |
| tfileMeta = new TFileMeta(brMeta); |
| } finally { |
| brMeta.close(); |
| } |
| |
| comparator = tfileMeta.getComparator(); |
| // Set begin and end locations. |
| begin = new Location(0, 0); |
| end = new Location(readerBCF.getBlockCount(), 0); |
| } |
| |
| /** |
| * Close the reader by closing the underlying BCFile reader. The state of |
| * the Reader object is undefined after close. Calling close() multiple |
| * times has no effect. |
| * |
| * @throws IOException |
| */ |
| @Override |
| public void close() throws IOException { |
| this.readerBCF.close(); |
| } |
| |
| /** |
| * Get the begin location of the TFile. |
| * |
| * @return The location of the first key-value pair if the TFile is not |
| * empty; otherwise a location equal to end(). |
| */ |
| Location begin() { |
| return this.begin; |
| } |
| |
| /** |
| * Get the end location of the TFile. |
| * |
| * @return The location right after the last key-value pair in the TFile. |
| */ |
| Location end() { |
| return this.end; |
| } |
| |
| /** |
| * Get the string representation of the comparator. |
| * |
| * @return An empty string if the TFile is not sorted by keys; otherwise the |
| * comparator string that was provided when the TFile was created. |
| */ |
| public String getComparatorName() { |
| final String comparatorName = tfileMeta.getComparatorString(); |
| return comparatorName; |
| } |
| |
| /** |
| * Tell whether the TFile is sorted, as recorded in the TFile meta. |
| * |
| * @return true if the TFile is sorted. |
| */ |
| public boolean isSorted() { |
| final boolean sorted = tfileMeta.isSorted(); |
| return sorted; |
| } |
| |
| /** |
| * Get the number of key-value pair entries in the TFile, as recorded in the |
| * TFile meta. |
| * |
| * @return the number of key-value pairs in the TFile |
| */ |
| public long getEntryCount() { |
| final long recordCount = tfileMeta.getRecordCount(); |
| return recordCount; |
| } |
| |
| /** |
| * Lazily loading the TFile index. |
| * |
| * @throws IOException |
| */ |
| synchronized void checkTFileDataIndex() throws IOException { |
| if (tfileIndex == null) { |
| BlockReader brIndex = readerBCF.getMetaBlock(TFileIndex.BLOCK_NAME); |
| try { |
| tfileIndex = |
| new TFileIndex(readerBCF.getBlockCount(), brIndex, tfileMeta |
| .getComparator()); |
| } finally { |
| brIndex.close(); |
| } |
| } |
| } |
| |
| /** |
| * Get the first key in the TFile. The TFile index is loaded first if |
| * necessary. |
| * |
| * @return The first key in the TFile, as held by the TFile index. |
| * @throws IOException |
| */ |
| public RawComparable getFirstKey() throws IOException { |
| checkTFileDataIndex(); |
| final RawComparable firstKey = tfileIndex.getFirstKey(); |
| return firstKey; |
| } |
| |
| /** |
| * Get the last key in the TFile. The TFile index is loaded first if |
| * necessary. |
| * |
| * @return The last key in the TFile, as held by the TFile index. |
| * @throws IOException |
| */ |
| public RawComparable getLastKey() throws IOException { |
| checkTFileDataIndex(); |
| final RawComparable lastKey = tfileIndex.getLastKey(); |
| return lastKey; |
| } |
| |
| /** |
| * Get a Comparator object to compare Entries. It is useful when you want to |
| * store entries in a collection (such as PriorityQueue) and sort or compare |
| * them by key without copying the keys out. |
| * |
| * @return An Entry Comparator. |
| * @throws RuntimeException |
| * if the TFile is not sorted. |
| */ |
| public Comparator<Scanner.Entry> getEntryComparator() { |
| if (isSorted()) { |
| return new Comparator<Scanner.Entry>() { |
| /** |
| * Compare two Entry objects by their key bytes, using the comparator |
| * of this TFile. If the Entry objects come from different TFiles, |
| * users must ensure that those TFiles share the same RawComparator. |
| */ |
| @Override |
| public int compare(Scanner.Entry o1, Scanner.Entry o2) { |
| return comparator.compare( |
| o1.getKeyBuffer(), 0, o1.getKeyLength(), |
| o2.getKeyBuffer(), 0, o2.getKeyLength()); |
| } |
| }; |
| } |
| |
| throw new RuntimeException( |
| "Entries are not comparable for unsorted TFiles"); |
| } |
| |
| /** |
| * Get the comparator instance that was constructed from the comparator |
| * string recorded in the TFile meta. |
| * |
| * @return a Comparator that can compare RawComparable's. |
| */ |
| public Comparator<RawComparable> getComparator() { |
| return this.comparator; |
| } |
| |
| /** |
| * Stream access to a meta block. The request is passed to the underlying |
| * BCFile reader. |
| * |
| * @param name |
| * The name of the meta block. |
| * @return The input stream. |
| * @throws IOException |
| * on I/O error. |
| * @throws MetaBlockDoesNotExist |
| * If the meta block with the name does not exist. |
| */ |
| public DataInputStream getMetaBlock(String name) throws IOException, |
| MetaBlockDoesNotExist { |
| final DataInputStream metaStream = readerBCF.getMetaBlock(name); |
| return metaStream; |
| } |
| |
| /** |
| * if greater is true then returns the beginning location of the block |
| * containing the key strictly greater than input key. if greater is false |
| * then returns the beginning location of the block greater than equal to |
| * the input key |
| * |
| * @param key |
| * the input key |
| * @param greater |
| * boolean flag |
| * @return |
| * @throws IOException |
| */ |
| Location getBlockContainsKey(RawComparable key, boolean greater) |
| throws IOException { |
| if (!isSorted()) { |
| throw new RuntimeException("Seeking in unsorted TFile"); |
| } |
| checkTFileDataIndex(); |
| int blkIndex = |
| (greater) ? tfileIndex.upperBound(key) : tfileIndex.lowerBound(key); |
| if (blkIndex < 0) return end; |
| return new Location(blkIndex, 0); |
| } |
| |
| Location getLocationByRecordNum(long recNum) throws IOException { |
| checkTFileDataIndex(); |
| return tfileIndex.getLocationByRecordNum(recNum); |
| } |
| |
| long getRecordNumByLocation(Location location) throws IOException { |
| checkTFileDataIndex(); |
| return tfileIndex.getRecordNumByLocation(location); |
| } |
| |
| int compareKeys(byte[] a, int o1, int l1, byte[] b, int o2, int l2) { |
| if (!isSorted()) { |
| throw new RuntimeException("Cannot compare keys for unsorted TFiles."); |
| } |
| return comparator.compare(a, o1, l1, b, o2, l2); |
| } |
| |
| int compareKeys(RawComparable a, RawComparable b) { |
| if (!isSorted()) { |
| throw new RuntimeException("Cannot compare keys for unsorted TFiles."); |
| } |
| return comparator.compare(a, b); |
| } |
| |
| /** |
| * Get the location pointing to the beginning of the first key-value pair in |
| * a compressed block whose byte offset in the TFile is greater than or |
| * equal to the specified offset. |
| * |
| * @param offset |
| * the user supplied offset. |
| * @return the location to the corresponding entry; or end() if no such |
| * entry exists. |
| */ |
| Location getLocationNear(long offset) { |
| int blockIndex = readerBCF.getBlockIndexNear(offset); |
| if (blockIndex == -1) return end; |
| return new Location(blockIndex, 0); |
| } |
| |
| /** |
| * Get the RecordNum for the first key-value pair in a compressed block |
| * whose byte offset in the TFile is greater than or equal to the specified |
| * offset. |
| * |
| * @param offset |
| * the user supplied offset. |
| * @return the RecordNum to the corresponding entry. If no such entry |
| * exists, it returns the total entry count. |
| * @throws IOException |
| */ |
| public long getRecordNumNear(long offset) throws IOException { |
| return getRecordNumByLocation(getLocationNear(offset)); |
| } |
| |
| /** |
| * Get a sample key that is within a block whose starting offset is greater |
| * than or equal to the specified offset. |
| * |
| * @param offset |
| * The file offset. |
| * @return the key that fits the requirement; or null if no such key exists |
| * (which could happen if the offset is close to the end of the |
| * TFile). |
| * @throws IOException |
| */ |
| public RawComparable getKeyNear(long offset) throws IOException { |
| int blockIndex = readerBCF.getBlockIndexNear(offset); |
| if (blockIndex == -1) return null; |
| checkTFileDataIndex(); |
| return new ByteArray(tfileIndex.getEntry(blockIndex).key); |
| } |
| |
| /** |
| * Get a scanner than can scan the whole TFile. |
| * |
| * @return The scanner object. A valid Scanner is always returned even if |
| * the TFile is empty. |
| * @throws IOException |
| */ |
| public Scanner createScanner() throws IOException { |
| return new Scanner(this, begin, end); |
| } |
| |
| /** |
| * Get a scanner that covers a portion of TFile based on byte offsets. |
| * |
| * @param offset |
| * The beginning byte offset in the TFile. |
| * @param length |
| * The length of the region. |
| * @return The actual coverage of the returned scanner tries to match the |
| * specified byte-region but always round up to the compression |
| * block boundaries. It is possible that the returned scanner |
| * contains zero key-value pairs even if length is positive. |
| * @throws IOException |
| */ |
| public Scanner createScannerByByteRange(long offset, long length) throws IOException { |
| return new Scanner(this, offset, offset + length); |
| } |
| |
| /** |
| * Get a scanner that covers a portion of TFile based on keys. |
| * |
| * @param beginKey |
| * Begin key of the scan (inclusive). If null, scan from the first |
| * key-value entry of the TFile. |
| * @param endKey |
| * End key of the scan (exclusive). If null, scan up to the last |
| * key-value entry of the TFile. |
| * @return The actual coverage of the returned scanner will cover all keys |
| * greater than or equal to the beginKey and less than the endKey. |
| * @throws IOException |
| * |
| * @deprecated Use {@link #createScannerByKey(byte[], byte[])} instead. |
| */ |
| @Deprecated |
| public Scanner createScanner(byte[] beginKey, byte[] endKey) |
| throws IOException { |
| return createScannerByKey(beginKey, endKey); |
| } |
| |
| /** |
| * Get a scanner that covers a portion of TFile based on keys. |
| * |
| * @param beginKey |
| * Begin key of the scan (inclusive). If null, scan from the first |
| * key-value entry of the TFile. |
| * @param endKey |
| * End key of the scan (exclusive). If null, scan up to the last |
| * key-value entry of the TFile. |
| * @return The actual coverage of the returned scanner will cover all keys |
| * greater than or equal to the beginKey and less than the endKey. |
| * @throws IOException |
| */ |
| public Scanner createScannerByKey(byte[] beginKey, byte[] endKey) |
| throws IOException { |
| return createScannerByKey((beginKey == null) ? null : new ByteArray(beginKey, |
| 0, beginKey.length), (endKey == null) ? null : new ByteArray(endKey, |
| 0, endKey.length)); |
| } |
| |
| /** |
| * Get a scanner that covers a specific key range. |
| * |
| * @param beginKey |
| * Begin key of the scan (inclusive). If null, scan from the first |
| * key-value entry of the TFile. |
| * @param endKey |
| * End key of the scan (exclusive). If null, scan up to the last |
| * key-value entry of the TFile. |
| * @return The actual coverage of the returned scanner will cover all keys |
| * greater than or equal to the beginKey and less than the endKey. |
| * @throws IOException |
| * |
| * @deprecated Use {@link #createScannerByKey(RawComparable, RawComparable)} |
| * instead. |
| */ |
| @Deprecated |
| public Scanner createScanner(RawComparable beginKey, RawComparable endKey) |
| throws IOException { |
| return createScannerByKey(beginKey, endKey); |
| } |
| |
| /** |
| * Get a scanner that covers a specific key range. |
| * |
| * @param beginKey |
| * Begin key of the scan (inclusive). If null, scan from the first |
| * key-value entry of the TFile. |
| * @param endKey |
| * End key of the scan (exclusive). If null, scan up to the last |
| * key-value entry of the TFile. |
| * @return The actual coverage of the returned scanner will cover all keys |
| * greater than or equal to the beginKey and less than the endKey. |
| * @throws IOException |
| */ |
| public Scanner createScannerByKey(RawComparable beginKey, RawComparable endKey) |
| throws IOException { |
| if ((beginKey != null) && (endKey != null) |
| && (compareKeys(beginKey, endKey) >= 0)) { |
| return new Scanner(this, beginKey, beginKey); |
| } |
| return new Scanner(this, beginKey, endKey); |
| } |
| |
| /** |
| * Create a scanner that covers a range of records. |
| * |
| * @param beginRecNum |
| * The RecordNum for the first record (inclusive). |
| * @param endRecNum |
| * The RecordNum for the last record (exclusive). To scan the whole |
| * file, either specify endRecNum==-1 or endRecNum==getEntryCount(). |
| * @return The TFile scanner that covers the specified range of records. |
| * @throws IOException |
| */ |
| public Scanner createScannerByRecordNum(long beginRecNum, long endRecNum) |
| throws IOException { |
| if (beginRecNum < 0) beginRecNum = 0; |
| if (endRecNum < 0 || endRecNum > getEntryCount()) { |
| endRecNum = getEntryCount(); |
| } |
| return new Scanner(this, getLocationByRecordNum(beginRecNum), |
| getLocationByRecordNum(endRecNum)); |
| } |
| |
| /** |
| * The TFile Scanner. The Scanner has an implicit cursor, which, upon |
| * creation, points to the first key-value pair in the scan range. If the |
| * scan range is empty, the cursor will point to the end of the scan range. |
| * <p> |
| * Use {@link Scanner#atEnd()} to test whether the cursor is at the end |
| * location of the scanner. |
| * <p> |
| * Use {@link Scanner#advance()} to move the cursor to the next key-value |
| * pair (or end if none exists). Use seekTo methods ( |
| * {@link Scanner#seekTo(byte[])} or |
| * {@link Scanner#seekTo(byte[], int, int)}) to seek to any arbitrary |
| * location in the covered range (including backward seeking). Use |
| * {@link Scanner#rewind()} to seek back to the beginning of the scanner. |
| * Use {@link Scanner#seekToEnd()} to seek to the end of the scanner. |
| * <p> |
| * Actual keys and values may be obtained through {@link Scanner.Entry} |
| * object, which is obtained through {@link Scanner#entry()}. |
| */ |
| public static class Scanner implements Closeable { |
| // The underlying TFile reader. |
| final Reader reader; |
| // current block (null if reaching end) |
| private BlockReader blkReader; |
| |
| Location beginLocation; |
| Location endLocation; |
| Location currentLocation; |
| |
| // flag to ensure value is only examined once. |
| boolean valueChecked = false; |
| // reusable buffer for keys. |
| final byte[] keyBuffer; |
| // length of key, -1 means key is invalid. |
| int klen = -1; |
| |
| static final int MAX_VAL_TRANSFER_BUF_SIZE = 128 * 1024; |
| BytesWritable valTransferBuffer; |
| |
| DataInputBuffer keyDataInputStream; |
| ChunkDecoder valueBufferInputStream; |
| DataInputStream valueDataInputStream; |
| // vlen == -1 if unknown. |
| int vlen; |
| |
/**
 * Constructor for a scan bounded by byte offsets. Each offset is converted
 * with {@link Reader#getLocationNear(long)}, i.e. rounded to the beginning
 * of a compressed block whose offset is greater than or equal to the
 * specified offset.
 *
 * @param reader
 *          The TFile reader object.
 * @param offBegin
 *          Begin byte-offset of the scan.
 * @param offEnd
 *          End byte-offset of the scan.
 * @throws IOException
 *           if the scanner cannot be set up.
 */
protected Scanner(Reader reader, long offBegin, long offEnd)
    throws IOException {
  this(reader,
      reader.getLocationNear(offBegin),
      reader.getLocationNear(offEnd));
}
| |
/**
 * Constructor for a scan bounded by two locations. The cursor is placed on
 * the begin location, or parked at the end location when the range is
 * empty (begin is not before end).
 *
 * @param reader
 *          The TFile reader object.
 * @param begin
 *          Begin location of the scan. Copied; the caller's object is never
 *          modified by this scanner.
 * @param end
 *          End location of the scan. Copied; the caller's object is never
 *          modified by this scanner.
 * @throws IOException
 *           if the TFile index or the first block cannot be loaded.
 */
Scanner(Reader reader, Location begin, Location end) throws IOException {
  this.reader = reader;
  // ensure the TFile index is loaded throughout the life of scanner.
  reader.checkTFileDataIndex();
  // Take private copies. Callers hand in Location objects they still own
  // (createScanner() passes the Reader's own begin/end), and the key-range
  // constructor later narrows beginLocation/endLocation in place via
  // set(). Without copies that narrowing would leak into the caller's
  // objects.
  beginLocation = new Location(begin);
  endLocation = new Location(end);

  valTransferBuffer = new BytesWritable();
  // TODO: remember the longest key in a TFile, and use it to replace
  // MAX_KEY_SIZE.
  keyBuffer = new byte[MAX_KEY_SIZE];
  keyDataInputStream = new DataInputBuffer();
  valueBufferInputStream = new ChunkDecoder();
  valueDataInputStream = new DataInputStream(valueBufferInputStream);

  if (beginLocation.compareTo(endLocation) >= 0) {
    // Empty range: park the cursor at the end without loading a block.
    currentLocation = new Location(endLocation);
  } else {
    currentLocation = new Location(0, 0);
    initBlock(beginLocation.getBlockIndex());
    inBlockAdvance(beginLocation.getRecordIndex());
  }
}
| |
/**
 * Constructor for a scan bounded by keys. The scanner is first opened from
 * the block that may contain the begin key (or from the reader's begin)
 * up to the reader's end; the begin and end locations are then narrowed
 * to the exact entries by seeking.
 *
 * @param reader
 *          The TFile reader object.
 * @param beginKey
 *          Begin key of the scan. If null, scan from the first
 *          {@code <K,V>} entry of the TFile.
 * @param endKey
 *          End key of the scan. If null, scan up to the last {@code <K,V>}
 *          entry of the TFile.
 * @throws IOException
 *           if the index or a block cannot be read.
 */
protected Scanner(Reader reader, RawComparable beginKey,
    RawComparable endKey) throws IOException {
  this(reader,
      (beginKey != null) ? reader.getBlockContainsKey(beginKey, false)
          : reader.begin(),
      reader.end());
  if (beginKey != null) {
    // Walk inside the starting block to the first key >= beginKey and make
    // that the begin of the range.
    inBlockAdvance(beginKey, false);
    beginLocation.set(currentLocation);
  }
  if (endKey != null) {
    // Find the first key >= endKey, record it as the (exclusive) end, and
    // return the cursor to the start of the range.
    seekTo(endKey, false);
    endLocation.set(currentLocation);
    seekTo(beginLocation);
  }
}
| |
| /** |
| * Move the cursor to the first entry whose key is greater than or equal |
| * to the input key. Synonymous to seekTo(key, 0, key.length). The entry |
| * returned by the previous entry() call will be invalid. |
| * |
| * @param key |
| * The input key |
| * @return true if we find an equal key. |
| * @throws IOException |
| */ |
| public boolean seekTo(byte[] key) throws IOException { |
| return seekTo(key, 0, key.length); |
| } |
| |
| /** |
| * Move the cursor to the first entry whose key is greater than or equal |
| * to the input key. The entry returned by the previous entry() call will |
| * be invalid. |
| * |
| * @param key |
| * The input key |
| * @param keyOffset |
| * offset in the key buffer. |
| * @param keyLen |
| * key buffer length. |
| * @return true if we find an equal key; false otherwise. |
| * @throws IOException |
| */ |
| public boolean seekTo(byte[] key, int keyOffset, int keyLen) |
| throws IOException { |
| return seekTo(new ByteArray(key, keyOffset, keyLen), false); |
| } |
| |
| private boolean seekTo(RawComparable key, boolean beyond) |
| throws IOException { |
| Location l = reader.getBlockContainsKey(key, beyond); |
| if (l.compareTo(beginLocation) < 0) { |
| l = beginLocation; |
| } else if (l.compareTo(endLocation) >= 0) { |
| seekTo(endLocation); |
| return false; |
| } |
| |
| // check if what we are seeking is in the later part of the current |
| // block. |
| if (atEnd() || (l.getBlockIndex() != currentLocation.getBlockIndex()) |
| || (compareCursorKeyTo(key) >= 0)) { |
| // sorry, we must seek to a different location first. |
| seekTo(l); |
| } |
| |
| return inBlockAdvance(key, beyond); |
| } |
| |
| /** |
| * Move the cursor to the new location. The entry returned by the previous |
| * entry() call will be invalid. |
| * |
| * @param l |
| * new cursor location. It must fall between the begin and end |
| * location of the scanner. |
| * @throws IOException |
| */ |
| private void seekTo(Location l) throws IOException { |
| if (l.compareTo(beginLocation) < 0) { |
| throw new IllegalArgumentException( |
| "Attempt to seek before the begin location."); |
| } |
| |
| if (l.compareTo(endLocation) > 0) { |
| throw new IllegalArgumentException( |
| "Attempt to seek after the end location."); |
| } |
| |
| if (l.compareTo(endLocation) == 0) { |
| parkCursorAtEnd(); |
| return; |
| } |
| |
| if (l.getBlockIndex() != currentLocation.getBlockIndex()) { |
| // going to a totally different block |
| initBlock(l.getBlockIndex()); |
| } else { |
| if (valueChecked) { |
| // may temporarily go beyond the last record in the block (in which |
| // case the next if loop will always be true). |
| inBlockAdvance(1); |
| } |
| if (l.getRecordIndex() < currentLocation.getRecordIndex()) { |
| initBlock(l.getBlockIndex()); |
| } |
| } |
| |
| inBlockAdvance(l.getRecordIndex() - currentLocation.getRecordIndex()); |
| |
| return; |
| } |
| |
| /** |
| * Rewind to the first entry in the scanner. The entry returned by the |
| * previous entry() call will be invalid. |
| * |
| * @throws IOException |
| */ |
| public void rewind() throws IOException { |
| seekTo(beginLocation); |
| } |
| |
| /** |
| * Seek to the end of the scanner. The entry returned by the previous |
| * entry() call will be invalid. |
| * |
| * @throws IOException |
| */ |
| public void seekToEnd() throws IOException { |
| parkCursorAtEnd(); |
| } |
| |
| /** |
| * Move the cursor to the first entry whose key is greater than or equal |
| * to the input key. Synonymous to lowerBound(key, 0, key.length). The |
| * entry returned by the previous entry() call will be invalid. |
| * |
| * @param key |
| * The input key |
| * @throws IOException |
| */ |
| public void lowerBound(byte[] key) throws IOException { |
| lowerBound(key, 0, key.length); |
| } |
| |
| /** |
| * Move the cursor to the first entry whose key is greater than or equal |
| * to the input key. The entry returned by the previous entry() call will |
| * be invalid. |
| * |
| * @param key |
| * The input key |
| * @param keyOffset |
| * offset in the key buffer. |
| * @param keyLen |
| * key buffer length. |
| * @throws IOException |
| */ |
| public void lowerBound(byte[] key, int keyOffset, int keyLen) |
| throws IOException { |
| seekTo(new ByteArray(key, keyOffset, keyLen), false); |
| } |
| |
| /** |
| * Move the cursor to the first entry whose key is strictly greater than |
| * the input key. Synonymous to upperBound(key, 0, key.length). The entry |
| * returned by the previous entry() call will be invalid. |
| * |
| * @param key |
| * The input key |
| * @throws IOException |
| */ |
| public void upperBound(byte[] key) throws IOException { |
| upperBound(key, 0, key.length); |
| } |
| |
| /** |
| * Move the cursor to the first entry whose key is strictly greater than |
| * the input key. The entry returned by the previous entry() call will be |
| * invalid. |
| * |
| * @param key |
| * The input key |
| * @param keyOffset |
| * offset in the key buffer. |
| * @param keyLen |
| * key buffer length. |
| * @throws IOException |
| */ |
| public void upperBound(byte[] key, int keyOffset, int keyLen) |
| throws IOException { |
| seekTo(new ByteArray(key, keyOffset, keyLen), true); |
| } |
| |
| /** |
| * Move the cursor to the next key-value pair. The entry returned by the |
| * previous entry() call will be invalid. |
| * |
| * @return true if the cursor successfully moves. False when cursor is |
| * already at the end location and cannot be advanced. |
| * @throws IOException |
| */ |
| public boolean advance() throws IOException { |
| if (atEnd()) { |
| return false; |
| } |
| |
| int curBid = currentLocation.getBlockIndex(); |
| long curRid = currentLocation.getRecordIndex(); |
| long entriesInBlock = reader.getBlockEntryCount(curBid); |
| if (curRid + 1 >= entriesInBlock) { |
| if (endLocation.compareTo(curBid + 1, 0) <= 0) { |
| // last entry in TFile. |
| parkCursorAtEnd(); |
| } else { |
| // last entry in Block. |
| initBlock(curBid + 1); |
| } |
| } else { |
| inBlockAdvance(1); |
| } |
| return true; |
| } |
| |
| /** |
| * Load a compressed block for reading. Expecting blockIndex is valid. |
| * |
| * @throws IOException |
| */ |
| private void initBlock(int blockIndex) throws IOException { |
| klen = -1; |
| if (blkReader != null) { |
| try { |
| blkReader.close(); |
| } finally { |
| blkReader = null; |
| } |
| } |
| blkReader = reader.getBlockReader(blockIndex); |
| currentLocation.set(blockIndex, 0); |
| } |
| |
| private void parkCursorAtEnd() throws IOException { |
| klen = -1; |
| currentLocation.set(endLocation); |
| if (blkReader != null) { |
| try { |
| blkReader.close(); |
| } finally { |
| blkReader = null; |
| } |
| } |
| } |
| |
| /** |
| * Close the scanner. Release all resources. The behavior of using the |
| * scanner after calling close is not defined. The entry returned by the |
| * previous entry() call will be invalid. |
| */ |
| @Override |
| public void close() throws IOException { |
| parkCursorAtEnd(); |
| } |
| |
| /** |
| * Is cursor at the end location? |
| * |
| * @return true if the cursor is at the end location. |
| */ |
| public boolean atEnd() { |
| return (currentLocation.compareTo(endLocation) >= 0); |
| } |
| |
| /** |
| * check whether we have already successfully obtained the key. It also |
| * initializes the valueInputStream. |
| */ |
| void checkKey() throws IOException { |
| if (klen >= 0) return; |
| if (atEnd()) { |
| throw new EOFException("No key-value to read"); |
| } |
| klen = -1; |
| vlen = -1; |
| valueChecked = false; |
| |
| klen = Utils.readVInt(blkReader); |
| blkReader.readFully(keyBuffer, 0, klen); |
| valueBufferInputStream.reset(blkReader); |
| if (valueBufferInputStream.isLastChunk()) { |
| vlen = valueBufferInputStream.getRemain(); |
| } |
| } |
| |
| /** |
| * Get an entry to access the key and value. |
| * |
| * @return The Entry object to access the key and value. |
| * @throws IOException |
| */ |
| public Entry entry() throws IOException { |
| checkKey(); |
| return new Entry(); |
| } |
| |
| /** |
| * Get the RecordNum corresponding to the entry pointed by the cursor. |
| * @return The RecordNum corresponding to the entry pointed by the cursor. |
| * @throws IOException |
| */ |
| public long getRecordNum() throws IOException { |
| return reader.getRecordNumByLocation(currentLocation); |
| } |
| |
| /** |
| * Internal API. Comparing the key at cursor to user-specified key. |
| * |
| * @param other |
| * user-specified key. |
| * @return negative if key at cursor is smaller than user key; 0 if equal; |
| * and positive if key at cursor greater than user key. |
| * @throws IOException |
| */ |
| int compareCursorKeyTo(RawComparable other) throws IOException { |
| checkKey(); |
| return reader.compareKeys(keyBuffer, 0, klen, other.buffer(), other |
| .offset(), other.size()); |
| } |
| |
| /** |
| * Entry to a <Key, Value> pair. |
| */ |
| public class Entry implements Comparable<RawComparable> { |
| /** |
| * Get the length of the key. |
| * |
| * @return the length of the key. |
| */ |
| public int getKeyLength() { |
| return klen; |
| } |
| |
| byte[] getKeyBuffer() { |
| return keyBuffer; |
| } |
| |
| /** |
| * Copy the key and value in one shot into BytesWritables. This is |
| * equivalent to getKey(key); getValue(value); |
| * |
| * @param key |
| * BytesWritable to hold key. |
| * @param value |
| * BytesWritable to hold value |
| * @throws IOException |
| */ |
| public void get(BytesWritable key, BytesWritable value) |
| throws IOException { |
| getKey(key); |
| getValue(value); |
| } |
| |
| /** |
| * Copy the key into BytesWritable. The input BytesWritable will be |
| * automatically resized to the actual key size. |
| * |
| * @param key |
| * BytesWritable to hold the key. |
| * @throws IOException |
| */ |
| public int getKey(BytesWritable key) throws IOException { |
| key.setSize(getKeyLength()); |
| getKey(key.getBytes()); |
| return key.getLength(); |
| } |
| |
| /** |
| * Copy the value into BytesWritable. The input BytesWritable will be |
| * automatically resized to the actual value size. The implementation |
| * directly uses the buffer inside BytesWritable for storing the value. |
| * The call does not require the value length to be known. |
| * |
| * @param value |
| * @throws IOException |
| */ |
| public long getValue(BytesWritable value) throws IOException { |
| DataInputStream dis = getValueStream(); |
| int size = 0; |
| try { |
| int remain; |
| while ((remain = valueBufferInputStream.getRemain()) > 0) { |
| value.setSize(size + remain); |
| dis.readFully(value.getBytes(), size, remain); |
| size += remain; |
| } |
| return value.getLength(); |
| } finally { |
| dis.close(); |
| } |
| } |
| |
| /** |
| * Writing the key to the output stream. This method avoids copying key |
| * buffer from Scanner into user buffer, then writing to the output |
| * stream. |
| * |
| * @param out |
| * The output stream |
| * @return the length of the key. |
| * @throws IOException |
| */ |
| public int writeKey(OutputStream out) throws IOException { |
| out.write(keyBuffer, 0, klen); |
| return klen; |
| } |
| |
| /** |
| * Writing the value to the output stream. This method avoids copying |
| * value data from Scanner into user buffer, then writing to the output |
| * stream. It does not require the value length to be known. |
| * |
| * @param out |
| * The output stream |
| * @return the length of the value |
| * @throws IOException |
| */ |
| public long writeValue(OutputStream out) throws IOException { |
| DataInputStream dis = getValueStream(); |
| long size = 0; |
| try { |
| int chunkSize; |
| while ((chunkSize = valueBufferInputStream.getRemain()) > 0) { |
| chunkSize = Math.min(chunkSize, MAX_VAL_TRANSFER_BUF_SIZE); |
| valTransferBuffer.setSize(chunkSize); |
| dis.readFully(valTransferBuffer.getBytes(), 0, chunkSize); |
| out.write(valTransferBuffer.getBytes(), 0, chunkSize); |
| size += chunkSize; |
| } |
| return size; |
| } finally { |
| dis.close(); |
| } |
| } |
| |
| /** |
| * Copy the key into user supplied buffer. |
| * |
| * @param buf |
| * The buffer supplied by user. The length of the buffer must |
| * not be shorter than the key length. |
| * @return The length of the key. |
| * |
| * @throws IOException |
| */ |
| public int getKey(byte[] buf) throws IOException { |
| return getKey(buf, 0); |
| } |
| |
| /** |
| * Copy the key into user supplied buffer. |
| * |
| * @param buf |
| * The buffer supplied by user. |
| * @param offset |
| * The starting offset of the user buffer where we should copy |
| * the key into. Requiring the key-length + offset no greater |
| * than the buffer length. |
| * @return The length of the key. |
| * @throws IOException |
| */ |
| public int getKey(byte[] buf, int offset) throws IOException { |
| if ((offset | (buf.length - offset - klen)) < 0) { |
| throw new IndexOutOfBoundsException( |
| "Bufer not enough to store the key"); |
| } |
| System.arraycopy(keyBuffer, 0, buf, offset, klen); |
| return klen; |
| } |
| |
| /** |
| * Streaming access to the key. Useful for desrializing the key into |
| * user objects. |
| * |
| * @return The input stream. |
| */ |
| public DataInputStream getKeyStream() { |
| keyDataInputStream.reset(keyBuffer, klen); |
| return keyDataInputStream; |
| } |
| |
| /** |
| * Get the length of the value. isValueLengthKnown() must be tested |
| * true. |
| * |
| * @return the length of the value. |
| */ |
| public int getValueLength() { |
| if (vlen >= 0) { |
| return vlen; |
| } |
| |
| throw new RuntimeException("Value length unknown."); |
| } |
| |
| /** |
| * Copy value into user-supplied buffer. User supplied buffer must be |
| * large enough to hold the whole value. The value part of the key-value |
| * pair pointed by the current cursor is not cached and can only be |
| * examined once. Calling any of the following functions more than once |
| * without moving the cursor will result in exception: |
| * {@link #getValue(byte[])}, {@link #getValue(byte[], int)}, |
| * {@link #getValueStream}. |
| * |
| * @return the length of the value. Does not require |
| * isValueLengthKnown() to be true. |
| * @throws IOException |
| * |
| */ |
| public int getValue(byte[] buf) throws IOException { |
| return getValue(buf, 0); |
| } |
| |
| /** |
| * Copy value into user-supplied buffer. User supplied buffer must be |
| * large enough to hold the whole value (starting from the offset). The |
| * value part of the key-value pair pointed by the current cursor is not |
| * cached and can only be examined once. Calling any of the following |
| * functions more than once without moving the cursor will result in |
| * exception: {@link #getValue(byte[])}, {@link #getValue(byte[], int)}, |
| * {@link #getValueStream}. |
| * |
| * @return the length of the value. Does not require |
| * isValueLengthKnown() to be true. |
| * @throws IOException |
| */ |
| public int getValue(byte[] buf, int offset) throws IOException { |
| DataInputStream dis = getValueStream(); |
| try { |
| if (isValueLengthKnown()) { |
| if ((offset | (buf.length - offset - vlen)) < 0) { |
| throw new IndexOutOfBoundsException( |
| "Buffer too small to hold value"); |
| } |
| dis.readFully(buf, offset, vlen); |
| return vlen; |
| } |
| |
| int nextOffset = offset; |
| while (nextOffset < buf.length) { |
| int n = dis.read(buf, nextOffset, buf.length - nextOffset); |
| if (n < 0) { |
| break; |
| } |
| nextOffset += n; |
| } |
| if (dis.read() >= 0) { |
| // attempt to read one more byte to determine whether we reached |
| // the |
| // end or not. |
| throw new IndexOutOfBoundsException( |
| "Buffer too small to hold value"); |
| } |
| return nextOffset - offset; |
| } finally { |
| dis.close(); |
| } |
| } |
| |
| /** |
| * Stream access to value. The value part of the key-value pair pointed |
| * by the current cursor is not cached and can only be examined once. |
| * Calling any of the following functions more than once without moving |
| * the cursor will result in exception: {@link #getValue(byte[])}, |
| * {@link #getValue(byte[], int)}, {@link #getValueStream}. |
| * |
| * @return The input stream for reading the value. |
| * @throws IOException |
| */ |
| public DataInputStream getValueStream() throws IOException { |
| if (valueChecked == true) { |
| throw new IllegalStateException( |
| "Attempt to examine value multiple times."); |
| } |
| valueChecked = true; |
| return valueDataInputStream; |
| } |
| |
| /** |
| * Check whether it is safe to call getValueLength(). |
| * |
| * @return true if value length is known before hand. Values less than |
| * the chunk size will always have their lengths known before |
| * hand. Values that are written out as a whole (with advertised |
| * length up-front) will always have their lengths known in |
| * read. |
| */ |
| public boolean isValueLengthKnown() { |
| return (vlen >= 0); |
| } |
| |
| /** |
| * Compare the entry key to another key. Synonymous to compareTo(key, 0, |
| * key.length). |
| * |
| * @param buf |
| * The key buffer. |
| * @return comparison result between the entry key with the input key. |
| */ |
| public int compareTo(byte[] buf) { |
| return compareTo(buf, 0, buf.length); |
| } |
| |
| /** |
| * Compare the entry key to another key. Synonymous to compareTo(new |
| * ByteArray(buf, offset, length) |
| * |
| * @param buf |
| * The key buffer |
| * @param offset |
| * offset into the key buffer. |
| * @param length |
| * the length of the key. |
| * @return comparison result between the entry key with the input key. |
| */ |
| public int compareTo(byte[] buf, int offset, int length) { |
| return compareTo(new ByteArray(buf, offset, length)); |
| } |
| |
| /** |
| * Compare an entry with a RawComparable object. This is useful when |
| * Entries are stored in a collection, and we want to compare a user |
| * supplied key. |
| */ |
| @Override |
| public int compareTo(RawComparable key) { |
| return reader.compareKeys(keyBuffer, 0, getKeyLength(), key.buffer(), |
| key.offset(), key.size()); |
| } |
| |
| /** |
| * Compare whether this and other points to the same key value. |
| */ |
| @Override |
| public boolean equals(Object other) { |
| if (this == other) return true; |
| if (!(other instanceof Entry)) return false; |
| return ((Entry) other).compareTo(keyBuffer, 0, getKeyLength()) == 0; |
| } |
| |
| @Override |
| public int hashCode() { |
| return WritableComparator.hashBytes(keyBuffer, 0, getKeyLength()); |
| } |
| } |
| |
| /** |
| * Advance cursor by n positions within the block. |
| * |
| * @param n |
| * Number of key-value pairs to skip in block. |
| * @throws IOException |
| */ |
| private void inBlockAdvance(long n) throws IOException { |
| for (long i = 0; i < n; ++i) { |
| checkKey(); |
| if (!valueBufferInputStream.isClosed()) { |
| valueBufferInputStream.close(); |
| } |
| klen = -1; |
| currentLocation.incRecordIndex(); |
| } |
| } |
| |
| /** |
| * Advance cursor in block until we find a key that is greater than or |
| * equal to the input key. |
| * |
| * @param key |
| * Key to compare. |
| * @param greater |
| * advance until we find a key greater than the input key. |
| * @return true if we find a equal key. |
| * @throws IOException |
| */ |
| private boolean inBlockAdvance(RawComparable key, boolean greater) |
| throws IOException { |
| int curBid = currentLocation.getBlockIndex(); |
| long entryInBlock = reader.getBlockEntryCount(curBid); |
| if (curBid == endLocation.getBlockIndex()) { |
| entryInBlock = endLocation.getRecordIndex(); |
| } |
| |
| while (currentLocation.getRecordIndex() < entryInBlock) { |
| int cmp = compareCursorKeyTo(key); |
| if (cmp > 0) return false; |
| if (cmp == 0 && !greater) return true; |
| if (!valueBufferInputStream.isClosed()) { |
| valueBufferInputStream.close(); |
| } |
| klen = -1; |
| currentLocation.incRecordIndex(); |
| } |
| |
| throw new RuntimeException("Cannot find matching key in block."); |
| } |
| } |
| |
| long getBlockEntryCount(int curBid) { |
| return tfileIndex.getEntry(curBid).entries(); |
| } |
| |
| BlockReader getBlockReader(int blockIndex) throws IOException { |
| return readerBCF.getDataBlock(blockIndex); |
| } |
| } |
| |
| /** |
| * Data structure representing "TFile.meta" meta block. |
| */ |
| static final class TFileMeta { |
| final static String BLOCK_NAME = "TFile.meta"; |
| final Version version; |
| private long recordCount; |
| private final String strComparator; |
| private final BytesComparator comparator; |
| |
| // ctor for writes |
| public TFileMeta(String comparator) { |
| // set fileVersion to API version when we create it. |
| version = TFile.API_VERSION; |
| recordCount = 0; |
| strComparator = (comparator == null) ? "" : comparator; |
| this.comparator = makeComparator(strComparator); |
| } |
| |
| // ctor for reads |
| public TFileMeta(DataInput in) throws IOException { |
| version = new Version(in); |
| if (!version.compatibleWith(TFile.API_VERSION)) { |
| throw new RuntimeException("Incompatible TFile fileVersion."); |
| } |
| recordCount = Utils.readVLong(in); |
| strComparator = Utils.readString(in); |
| comparator = makeComparator(strComparator); |
| } |
| |
| @SuppressWarnings("unchecked") |
| static BytesComparator makeComparator(String comparator) { |
| if (comparator.length() == 0) { |
| // unsorted keys |
| return null; |
| } |
| if (comparator.equals(COMPARATOR_MEMCMP)) { |
| // default comparator |
| return new BytesComparator(new MemcmpRawComparator()); |
| } else if (comparator.startsWith(COMPARATOR_JCLASS)) { |
| String compClassName = |
| comparator.substring(COMPARATOR_JCLASS.length()).trim(); |
| try { |
| Class compClass = Class.forName(compClassName); |
| // use its default ctor to create an instance |
| return new BytesComparator((RawComparator<Object>) compClass |
| .newInstance()); |
| } catch (Exception e) { |
| throw new IllegalArgumentException( |
| "Failed to instantiate comparator: " + comparator + "(" |
| + e.toString() + ")"); |
| } |
| } else { |
| throw new IllegalArgumentException("Unsupported comparator: " |
| + comparator); |
| } |
| } |
| |
| public void write(DataOutput out) throws IOException { |
| TFile.API_VERSION.write(out); |
| Utils.writeVLong(out, recordCount); |
| Utils.writeString(out, strComparator); |
| } |
| |
| public long getRecordCount() { |
| return recordCount; |
| } |
| |
| public void incRecordCount() { |
| ++recordCount; |
| } |
| |
| public boolean isSorted() { |
| return !strComparator.equals(""); |
| } |
| |
| public String getComparatorString() { |
| return strComparator; |
| } |
| |
| public BytesComparator getComparator() { |
| return comparator; |
| } |
| |
| public Version getVersion() { |
| return version; |
| } |
| } // END: class MetaTFileMeta |
| |
| /** |
| * Data structure representing "TFile.index" meta block. |
| */ |
| static class TFileIndex { |
| final static String BLOCK_NAME = "TFile.index"; |
| private ByteArray firstKey; |
| private final ArrayList<TFileIndexEntry> index; |
| private final ArrayList<Long> recordNumIndex; |
| private final BytesComparator comparator; |
| private long sum = 0; |
| |
| /** |
| * For reading from file. |
| * |
| * @throws IOException |
| */ |
| public TFileIndex(int entryCount, DataInput in, BytesComparator comparator) |
| throws IOException { |
| index = new ArrayList<TFileIndexEntry>(entryCount); |
| recordNumIndex = new ArrayList<Long>(entryCount); |
| int size = Utils.readVInt(in); // size for the first key entry. |
| if (size > 0) { |
| byte[] buffer = new byte[size]; |
| in.readFully(buffer); |
| DataInputStream firstKeyInputStream = |
| new DataInputStream(new ByteArrayInputStream(buffer, 0, size)); |
| |
| int firstKeyLength = Utils.readVInt(firstKeyInputStream); |
| firstKey = new ByteArray(new byte[firstKeyLength]); |
| firstKeyInputStream.readFully(firstKey.buffer()); |
| |
| for (int i = 0; i < entryCount; i++) { |
| size = Utils.readVInt(in); |
| if (buffer.length < size) { |
| buffer = new byte[size]; |
| } |
| in.readFully(buffer, 0, size); |
| TFileIndexEntry idx = |
| new TFileIndexEntry(new DataInputStream(new ByteArrayInputStream( |
| buffer, 0, size))); |
| index.add(idx); |
| sum += idx.entries(); |
| recordNumIndex.add(sum); |
| } |
| } else { |
| if (entryCount != 0) { |
| throw new RuntimeException("Internal error"); |
| } |
| } |
| this.comparator = comparator; |
| } |
| |
| /** |
| * @param key |
| * input key. |
| * @return the ID of the first block that contains key >= input key. Or -1 |
| * if no such block exists. |
| */ |
| public int lowerBound(RawComparable key) { |
| if (comparator == null) { |
| throw new RuntimeException("Cannot search in unsorted TFile"); |
| } |
| |
| if (firstKey == null) { |
| return -1; // not found |
| } |
| |
| int ret = Utils.lowerBound(index, key, comparator); |
| if (ret == index.size()) { |
| return -1; |
| } |
| return ret; |
| } |
| |
| /** |
| * @param key |
| * input key. |
| * @return the ID of the first block that contains key > input key. Or -1 |
| * if no such block exists. |
| */ |
| public int upperBound(RawComparable key) { |
| if (comparator == null) { |
| throw new RuntimeException("Cannot search in unsorted TFile"); |
| } |
| |
| if (firstKey == null) { |
| return -1; // not found |
| } |
| |
| int ret = Utils.upperBound(index, key, comparator); |
| if (ret == index.size()) { |
| return -1; |
| } |
| return ret; |
| } |
| |
| /** |
| * For writing to file. |
| */ |
| public TFileIndex(BytesComparator comparator) { |
| index = new ArrayList<TFileIndexEntry>(); |
| recordNumIndex = new ArrayList<Long>(); |
| this.comparator = comparator; |
| } |
| |
| public RawComparable getFirstKey() { |
| return firstKey; |
| } |
| |
| public Reader.Location getLocationByRecordNum(long recNum) { |
| int idx = Utils.upperBound(recordNumIndex, recNum); |
| long lastRecNum = (idx == 0)? 0: recordNumIndex.get(idx-1); |
| return new Reader.Location(idx, recNum-lastRecNum); |
| } |
| |
| public long getRecordNumByLocation(Reader.Location location) { |
| int blkIndex = location.getBlockIndex(); |
| long lastRecNum = (blkIndex == 0) ? 0: recordNumIndex.get(blkIndex-1); |
| return lastRecNum + location.getRecordIndex(); |
| } |
| |
| public void setFirstKey(byte[] key, int offset, int length) { |
| firstKey = new ByteArray(new byte[length]); |
| System.arraycopy(key, offset, firstKey.buffer(), 0, length); |
| } |
| |
| public RawComparable getLastKey() { |
| if (index.size() == 0) { |
| return null; |
| } |
| return new ByteArray(index.get(index.size() - 1).buffer()); |
| } |
| |
| public void addEntry(TFileIndexEntry keyEntry) { |
| index.add(keyEntry); |
| sum += keyEntry.entries(); |
| recordNumIndex.add(sum); |
| } |
| |
| public TFileIndexEntry getEntry(int bid) { |
| return index.get(bid); |
| } |
| |
| public void write(DataOutput out) throws IOException { |
| if (firstKey == null) { |
| Utils.writeVInt(out, 0); |
| return; |
| } |
| |
| DataOutputBuffer dob = new DataOutputBuffer(); |
| Utils.writeVInt(dob, firstKey.size()); |
| dob.write(firstKey.buffer()); |
| Utils.writeVInt(out, dob.size()); |
| out.write(dob.getData(), 0, dob.getLength()); |
| |
| for (TFileIndexEntry entry : index) { |
| dob.reset(); |
| entry.write(dob); |
| Utils.writeVInt(out, dob.getLength()); |
| out.write(dob.getData(), 0, dob.getLength()); |
| } |
| } |
| } |
| |
| /** |
| * TFile Data Index entry. We should try to make the memory footprint of each |
| * index entry as small as possible. |
| */ |
| static final class TFileIndexEntry implements RawComparable { |
| final byte[] key; |
| // count of <key, value> entries in the block. |
| final long kvEntries; |
| |
| public TFileIndexEntry(DataInput in) throws IOException { |
| int len = Utils.readVInt(in); |
| key = new byte[len]; |
| in.readFully(key, 0, len); |
| kvEntries = Utils.readVLong(in); |
| } |
| |
| // default entry, without any padding |
| public TFileIndexEntry(byte[] newkey, int offset, int len, long entries) { |
| key = new byte[len]; |
| System.arraycopy(newkey, offset, key, 0, len); |
| this.kvEntries = entries; |
| } |
| |
| @Override |
| public byte[] buffer() { |
| return key; |
| } |
| |
| @Override |
| public int offset() { |
| return 0; |
| } |
| |
| @Override |
| public int size() { |
| return key.length; |
| } |
| |
| long entries() { |
| return kvEntries; |
| } |
| |
| public void write(DataOutput out) throws IOException { |
| Utils.writeVInt(out, key.length); |
| out.write(key, 0, key.length); |
| Utils.writeVLong(out, kvEntries); |
| } |
| } |
| |
| /** |
| * Dumping the TFile information. |
| * |
| * @param args |
| * A list of TFile paths. |
| */ |
| public static void main(String[] args) { |
| System.out.printf("TFile Dumper (TFile %s, BCFile %s)\n", TFile.API_VERSION |
| .toString(), BCFile.API_VERSION.toString()); |
| if (args.length == 0) { |
| System.out |
| .println("Usage: java ... org.apache.hadoop.io.file.tfile.TFile tfile-path [tfile-path ...]"); |
| System.exit(0); |
| } |
| Configuration conf = new Configuration(); |
| |
| for (String file : args) { |
| System.out.println("===" + file + "==="); |
| try { |
| TFileDumper.dumpInfo(file, System.out, conf); |
| } catch (IOException e) { |
| e.printStackTrace(System.err); |
| } |
| } |
| } |
| } |