/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.library.common.sort.impl;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.runtime.library.utils.BufferUtils;
import org.apache.tez.runtime.library.utils.CodecUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.tez.common.counters.TezCounter;
/**
* <code>IFile</code> is the simple &lt;key-len, value-len, key, value&gt; format
* for the intermediate map-outputs in Map-Reduce.
*
* There is a <code>Writer</code> to write out map-outputs in this format and
* a <code>Reader</code> to read files of this format.
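* <p>
* A minimal usage sketch (illustrative only; assumes Writable keys/values, no
* compression and no counters):
* <pre>{@code
*   WritableSerialization ser = new WritableSerialization();
*   IFile.Writer writer = new IFile.Writer(ser, ser, fs, path,
*       Text.class, Text.class, null, null, null);
*   writer.append(new Text("apple"), new Text("1"));
*   writer.close();
*
*   IFile.Reader reader = new IFile.Reader(fs, path, null, null, null, false, 0, 64 * 1024);
*   DataInputBuffer key = new DataInputBuffer();
*   DataInputBuffer value = new DataInputBuffer();
*   while (reader.nextRawKey(key)) {
*     reader.nextRawValue(value);
*     // deserialize key/value bytes from the buffers as needed
*   }
*   reader.close();
* }</pre>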
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class IFile {
private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
public static final int EOF_MARKER = -1; // End of File Marker
public static final int RLE_MARKER = -2; // Repeat same key marker
public static final int V_END_MARKER = -3; // End of values marker
public static final DataInputBuffer REPEAT_KEY = new DataInputBuffer();
static final byte[] HEADER = new byte[] { (byte) 'T', (byte) 'I',
(byte) 'F' , (byte) 0};
private static final String INCOMPLETE_READ = "Requested to read %d got %d";
private static final String REQ_BUFFER_SIZE_TOO_LARGE = "Size of data %d is greater than the max allowed of %d";
/**
* IFileWriter which buffers data in memory up to a specified limit, beyond
* which it falls back to a file based writer. The spill file is created lazily,
* so no disk I/O happens when the data fits entirely in memory.
* <p>
* This class should not make any changes to IFile logic; it only flips the
* underlying stream from memory to disk when needed.
* <p>
* During writes, it checks whether the uncompressed payload still fits in memory.
* If so, the data is stored in the in-memory buffer; otherwise it falls back to the
* file based writer. Note that the data stored internally is in compressed format
* (if a codec is provided). The size check is intentionally done on the uncompressed
* payload, for easier comparison and spill over, since the compressed data length
* cannot be known up front.
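* <p>
* A minimal usage sketch (illustrative only; cacheSize is the in-memory limit in bytes
* and taskOutput supplies the lazily created spill file):
* <pre>{@code
*   FileBackedInMemIFileWriter writer = new FileBackedInMemIFileWriter(
*       keySerialization, valSerialization, fs, taskOutput,
*       Text.class, Text.class, codec, null, null, 512 * 1024);
*   writer.append(new Text("apple"), new Text("1"));
*   writer.close();
*   if (!writer.isDataFlushedToDisk()) {
*     ByteBuffer data = writer.getData();      // IFile contents still in memory
*   } else {
*     Path spillFile = writer.getOutputPath(); // contents spilled to this file
*   }
* }</pre>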
*/
public static class FileBackedInMemIFileWriter extends Writer {
private FileSystem fs;
private boolean bufferFull;
// For lazy creation of file
private TezTaskOutput taskOutput;
private int totalSize;
private Path outputPath;
private CompressionCodec fileCodec;
private BoundedByteArrayOutputStream cacheStream;
private static final int checksumSize = IFileOutputStream.getCheckSumSize();
/**
* Note that compression is not applied to the in-memory stream; it is only
* enabled once data spills over to the file.
*
* @param keySerialization serialization used for keys
* @param valSerialization serialization used for values
* @param fs file system on which the spill file is created
* @param taskOutput provides the path of the lazily created spill file
* @param keyClass key class used to obtain the key serializer
* @param valueClass value class used to obtain the value serializer
* @param codec compression codec, applied only after spilling to disk
* @param writesCounter counter for records written
* @param serializedBytesCounter counter for serialized (uncompressed) bytes
* @param cacheSize in-memory buffer limit in bytes
* @throws IOException
*/
public FileBackedInMemIFileWriter(Serialization<?> keySerialization,
Serialization<?> valSerialization, FileSystem fs, TezTaskOutput taskOutput,
Class<?> keyClass, Class<?> valueClass, CompressionCodec codec, TezCounter writesCounter,
TezCounter serializedBytesCounter, int cacheSize) throws IOException {
super(keySerialization, valSerialization, new FSDataOutputStream(createBoundedBuffer(cacheSize), null),
keyClass, valueClass, null, writesCounter, serializedBytesCounter);
this.fs = fs;
this.cacheStream = (BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
this.taskOutput = taskOutput;
this.bufferFull = (cacheStream == null);
this.totalSize = getBaseCacheSize();
this.fileCodec = codec;
}
/**
* For basic cache size checks: header + checksum + EOF marker
*
* @return size of the base cache needed
*/
static int getBaseCacheSize() {
return (HEADER.length + checksumSize
+ (2 * WritableUtils.getVIntSize(EOF_MARKER)));
}
boolean shouldWriteToDisk() {
return totalSize >= cacheStream.getLimit();
}
/**
* Create the in-memory stream. If the requested size is too small, it is
* adjusted up to the minimum base cache size.
*
* @param size requested buffer size in bytes
* @return in-memory stream bounded to max(size, base cache size)
*/
public static BoundedByteArrayOutputStream createBoundedBuffer(int size) {
int resize = Math.max(getBaseCacheSize(), size);
return new BoundedByteArrayOutputStream(resize);
}
/**
* Flip over from memory to file based writer.
*
* 1. Content format: HEADER + real data + CHECKSUM. Checksum is for real
* data.
* 2. Before flipping, close checksum stream, so that checksum is written
* out.
* 3. Create relevant file based writer.
* 4. Write header and then real data.
*
* @throws IOException
*/
private void resetToFileBasedWriter() throws IOException {
// Close out stream, so that data checksums are written.
// Buf contents = HEADER + real data + CHECKSUM
this.out.close();
// Get the buffer which contains data in memory
BoundedByteArrayOutputStream bout =
(BoundedByteArrayOutputStream) this.rawOut.getWrappedStream();
// Create new file based writer
if (outputPath == null) {
outputPath = taskOutput.getOutputFileForWrite();
}
LOG.info("Switching from mem stream to disk stream. File: " + outputPath);
FSDataOutputStream newRawOut = fs.create(outputPath);
this.rawOut = newRawOut;
this.ownOutputStream = true;
setupOutputStream(fileCodec);
// Write header to file
headerWritten = false;
writeHeader(newRawOut);
// write real data
int sPos = HEADER.length;
int len = (bout.size() - checksumSize - HEADER.length);
this.out.write(bout.getBuffer(), sPos, len);
bufferFull = true;
bout.reset();
}
@Override
protected void writeKVPair(byte[] keyData, int keyPos, int keyLength,
byte[] valueData, int valPos, int valueLength) throws IOException {
if (!bufferFull) {
// Compute actual payload size: write RLE marker, length info and then entire data.
totalSize += ((prevKey == REPEAT_KEY) ? V_END_MARKER_SIZE : 0)
+ WritableUtils.getVIntSize(keyLength) + keyLength
+ WritableUtils.getVIntSize(valueLength) + valueLength;
if (shouldWriteToDisk()) {
resetToFileBasedWriter();
}
}
super.writeKVPair(keyData, keyPos, keyLength, valueData, valPos, valueLength);
}
@Override
protected void writeValue(byte[] data, int offset, int length) throws IOException {
if (!bufferFull) {
totalSize += ((prevKey != REPEAT_KEY) ? RLE_MARKER_SIZE : 0)
+ WritableUtils.getVIntSize(length) + length;
if (shouldWriteToDisk()) {
resetToFileBasedWriter();
}
}
super.writeValue(data, offset, length);
}
/**
* Check if data was flushed to disk.
*
* @return whether data was flushed to disk or not
*/
public boolean isDataFlushedToDisk() {
return bufferFull;
}
/**
* Get the cached data, if any.
*
* @return the in-memory contents if data has not been flushed to disk; null otherwise
*/
public ByteBuffer getData() {
if (!isDataFlushedToDisk()) {
return ByteBuffer.wrap(cacheStream.getBuffer(), 0, cacheStream.size());
}
return null;
}
@VisibleForTesting
void setOutputPath(Path outputPath) {
this.outputPath = outputPath;
}
public Path getOutputPath() {
return this.outputPath;
}
}
/**
* <code>IFile.Writer</code> to write out intermediate map-outputs.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
public static class Writer {
protected DataOutputStream out;
boolean ownOutputStream = false;
long start = 0;
FSDataOutputStream rawOut;
final AtomicBoolean closed = new AtomicBoolean(false);
CompressionOutputStream compressedOut;
Compressor compressor;
boolean compressOutput = false;
long decompressedBytesWritten = 0;
long compressedBytesWritten = 0;
// Count records written to disk
private long numRecordsWritten = 0;
private long rleWritten = 0; //number of RLE markers written
private long totalKeySaving = 0; //number of keys saved due to multi KV writes + RLE
private final TezCounter writtenRecordsCounter;
private final TezCounter serializedUncompressedBytes;
IFileOutputStream checksumOut;
boolean closeSerializers = false;
Serializer keySerializer = null;
Serializer valueSerializer = null;
final DataOutputBuffer buffer = new DataOutputBuffer();
final DataOutputBuffer previous = new DataOutputBuffer();
Object prevKey = null;
boolean headerWritten = false;
@VisibleForTesting
boolean sameKey = false;
final int RLE_MARKER_SIZE = WritableUtils.getVIntSize(RLE_MARKER);
final int V_END_MARKER_SIZE = WritableUtils.getVIntSize(V_END_MARKER);
// de-dup keys or not
protected final boolean rle;
public Writer(Serialization keySerialization, Serialization valSerialization, FileSystem fs, Path file,
Class keyClass, Class valueClass,
CompressionCodec codec,
TezCounter writesCounter,
TezCounter serializedBytesCounter) throws IOException {
this(keySerialization, valSerialization, fs.create(file), keyClass, valueClass, codec,
writesCounter, serializedBytesCounter);
ownOutputStream = true;
}
protected Writer(TezCounter writesCounter, TezCounter serializedBytesCounter, boolean rle) {
writtenRecordsCounter = writesCounter;
serializedUncompressedBytes = serializedBytesCounter;
this.rle = rle;
}
public Writer(Serialization keySerialization, Serialization valSerialization, FSDataOutputStream outputStream,
Class keyClass, Class valueClass, CompressionCodec codec, TezCounter writesCounter,
TezCounter serializedBytesCounter) throws IOException {
this(keySerialization, valSerialization, outputStream, keyClass, valueClass, codec, writesCounter,
serializedBytesCounter, false);
}
public Writer(Serialization keySerialization, Serialization valSerialization, FSDataOutputStream outputStream,
Class keyClass, Class valueClass,
CompressionCodec codec, TezCounter writesCounter, TezCounter serializedBytesCounter,
boolean rle) throws IOException {
this.rawOut = outputStream;
this.writtenRecordsCounter = writesCounter;
this.serializedUncompressedBytes = serializedBytesCounter;
this.start = this.rawOut.getPos();
this.rle = rle;
setupOutputStream(codec);
writeHeader(outputStream);
if (keyClass != null) {
this.closeSerializers = true;
this.keySerializer = keySerialization.getSerializer(keyClass);
this.keySerializer.open(buffer);
this.valueSerializer = valSerialization.getSerializer(valueClass);
this.valueSerializer.open(buffer);
} else {
this.closeSerializers = false;
}
}
void setupOutputStream(CompressionCodec codec) throws IOException {
this.checksumOut = new IFileOutputStream(this.rawOut);
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
if (this.compressor != null) {
this.compressor.reset();
this.compressedOut = codec.createOutputStream(checksumOut, compressor);
this.out = new FSDataOutputStream(this.compressedOut, null);
this.compressOutput = true;
} else {
LOG.warn("Could not obtain compressor from CodecPool");
this.out = new FSDataOutputStream(checksumOut,null);
}
} else {
this.out = new FSDataOutputStream(checksumOut,null);
}
}
public Writer(Serialization keySerialization, Serialization valSerialization, FileSystem fs, Path file) throws IOException {
this(keySerialization, valSerialization, fs, file, null, null, null, null, null);
}
protected void writeHeader(OutputStream outputStream) throws IOException {
if (!headerWritten) {
outputStream.write(HEADER, 0, HEADER.length - 1);
outputStream.write((compressOutput) ? (byte) 1 : (byte) 0);
headerWritten = true;
}
}
public void close() throws IOException {
if (closed.getAndSet(true)) {
throw new IOException("Writer was already closed earlier");
}
// When IFile writer is created by BackupStore, we do not have
// Key and Value classes set. So, check before closing the
// serializers
if (closeSerializers) {
keySerializer.close();
valueSerializer.close();
}
// write V_END_MARKER as needed
writeValueMarker(out);
// Write EOF_MARKER for key/value length
WritableUtils.writeVInt(out, EOF_MARKER);
WritableUtils.writeVInt(out, EOF_MARKER);
decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
//account for header bytes
decompressedBytesWritten += HEADER.length;
// Close the underlying stream iff we own it...
if (ownOutputStream) {
out.close();
} else {
if (compressOutput) {
// Flush
compressedOut.finish();
compressedOut.resetState();
}
// Write the checksum and flush the buffer
checksumOut.finish();
}
//header bytes are already included in rawOut
compressedBytesWritten = rawOut.getPos() - start;
if (compressOutput) {
// Return back the compressor
CodecPool.returnCompressor(compressor);
compressor = null;
}
out = null;
if (writtenRecordsCounter != null) {
writtenRecordsCounter.increment(numRecordsWritten);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Total keys written=" + numRecordsWritten + "; rleEnabled=" + rle + "; Savings" +
"(due to multi-kv/rle)=" + totalKeySaving + "; number of RLEs written=" +
rleWritten + "; compressedLen=" + compressedBytesWritten + "; rawLen="
+ decompressedBytesWritten);
}
}
/**
* Append a key/value pair to the IFile. To indicate that the key is the same as the
* previous one, pass IFile.REPEAT_KEY as the key parameter; this method must not be
* called with IFile.REPEAT_KEY as the first key. It is the caller's responsibility to
* perform the relevant key/value type checks and to ensure that key/value lengths are
* non-negative.
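* <p>
* For example (illustrative only):
* <pre>{@code
*   writer.append(new Text("k1"), new Text("v1"));    // new key
*   writer.append(IFile.REPEAT_KEY, new Text("v2"));  // same key as previous, value only
* }</pre>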
*
* @param key
* @param value
* @throws IOException
*/
public void append(Object key, Object value) throws IOException {
int keyLength = 0;
sameKey = (key == REPEAT_KEY);
if (!sameKey) {
keySerializer.serialize(key);
keyLength = buffer.getLength();
assert(keyLength >= 0);
if (rle && (keyLength == previous.getLength())) {
sameKey = (BufferUtils.compare(previous, buffer) == 0);
}
}
// Append the 'value'
valueSerializer.serialize(value);
int valueLength = buffer.getLength() - keyLength;
assert(valueLength >= 0);
if (!sameKey) {
//dump entire key value pair
writeKVPair(buffer.getData(), 0, keyLength, buffer.getData(),
keyLength, buffer.getLength() - keyLength);
if (rle) {
previous.reset();
previous.write(buffer.getData(), 0, keyLength); //store the key
}
} else {
writeValue(buffer.getData(), keyLength, valueLength);
}
prevKey = (sameKey) ? REPEAT_KEY : key;
// Reset
buffer.reset();
++numRecordsWritten;
}
/**
* Appends the value to the previous key. Assumes that the caller has already done the
* relevant checks for identical keys; no validation is done in this method.
*
* @param value
* @throws IOException
*/
public void appendValue(Object value) throws IOException {
valueSerializer.serialize(value);
int valueLength = buffer.getLength();
writeValue(buffer.getData(), 0, valueLength);
buffer.reset();
++numRecordsWritten;
prevKey = REPEAT_KEY;
}
/**
* Appends the value to the previous key. Assumes that the caller has already done the
* relevant checks for identical keys; no validation is done in this method. It is the
* caller's responsibility to pass a non-negative value length, otherwise an
* IndexOutOfBoundsException may be thrown at runtime.
*
* @param value
* @throws IOException
*/
public void appendValue(DataInputBuffer value) throws IOException {
int valueLength = value.getLength() - value.getPosition();
assert(valueLength >= 0);
writeValue(value.getData(), value.getPosition(), valueLength);
buffer.reset();
++numRecordsWritten;
prevKey = REPEAT_KEY;
}
/**
* Appends all values from the iterator to the previous key. Assumes that the caller has
* already done the relevant checks for identical keys; no validation is done in this method.
*
* @param valuesItr
* @throws IOException
*/
public <V> void appendValues(Iterator<V> valuesItr) throws IOException {
while(valuesItr.hasNext()) {
appendValue(valuesItr.next());
}
}
/**
* Append key and its associated set of values.
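* <p>
* For example (illustrative only): {@code writer.appendKeyValues(key, values.iterator());}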
*
* @param key
* @param valuesItr
* @param <K>
* @param <V>
* @throws IOException
*/
public <K, V> void appendKeyValues(K key, Iterator<V> valuesItr) throws IOException {
if (valuesItr.hasNext()) {
append(key, valuesItr.next()); //append first KV pair
}
//append the remaining values
while(valuesItr.hasNext()) {
appendValue(valuesItr.next());
}
}
/**
* Append a serialized key/value pair to the IFile. To indicate that the key is the same
* as the previous one, pass IFile.REPEAT_KEY as the key parameter; this method must not
* be called with IFile.REPEAT_KEY as the first key. It is the caller's responsibility to
* pass non-negative key/value lengths, otherwise an IndexOutOfBoundsException may be
* thrown at runtime.
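* <p>
* For example (illustrative only), with key/value bytes already serialized into
* {@link DataInputBuffer}s:
* <pre>{@code
*   writer.append(keyBuffer, valueBuffer);            // new key
*   writer.append(IFile.REPEAT_KEY, nextValueBuffer); // repeat the previous key
* }</pre>
*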
* @param key
* @param value
* @throws IOException
*/
public void append(DataInputBuffer key, DataInputBuffer value) throws IOException {
int keyLength = key.getLength() - key.getPosition();
assert(key == REPEAT_KEY || keyLength >=0);
int valueLength = value.getLength() - value.getPosition();
assert(valueLength >= 0);
sameKey = (key == REPEAT_KEY);
if (!sameKey && rle) {
sameKey = (keyLength != 0) && (BufferUtils.compare(previous, key) == 0);
}
if (!sameKey) {
writeKVPair(key.getData(), key.getPosition(), keyLength,
value.getData(), value.getPosition(), valueLength);
if (rle) {
BufferUtils.copy(key, previous);
}
} else {
writeValue(value.getData(), value.getPosition(), valueLength);
}
prevKey = (sameKey) ? REPEAT_KEY : key;
++numRecordsWritten;
}
protected void writeValue(byte[] data, int offset, int length) throws IOException {
writeRLE(out);
WritableUtils.writeVInt(out, length); // value length
out.write(data, offset, length);
// Update bytes written
decompressedBytesWritten +=
length + WritableUtils.getVIntSize(length);
if (serializedUncompressedBytes != null) {
serializedUncompressedBytes.increment(length);
}
totalKeySaving++;
}
protected void writeKVPair(byte[] keyData, int keyPos, int keyLength,
byte[] valueData, int valPos, int valueLength) throws IOException {
writeValueMarker(out);
WritableUtils.writeVInt(out, keyLength);
WritableUtils.writeVInt(out, valueLength);
out.write(keyData, keyPos, keyLength);
out.write(valueData, valPos, valueLength);
// Update bytes written
decompressedBytesWritten +=
keyLength + valueLength + WritableUtils.getVIntSize(keyLength)
+ WritableUtils.getVIntSize(valueLength);
if (serializedUncompressedBytes != null) {
serializedUncompressedBytes.increment(keyLength + valueLength);
}
}
protected void writeRLE(DataOutputStream out) throws IOException {
/**
* To strike a balance between two use cases (many unique keys in the stream vs. many
* identical keys in the stream), we start off by writing the full KV pair. If subsequent
* keys are identical, we write an RLE marker followed by the values, terminated by
* V_END_MARKER:
* {KL1, VL1, K1, V1}
* {RLE, VL2, V2, VL3, V3, ... V_END_MARKER}
*/
if (prevKey != REPEAT_KEY) {
WritableUtils.writeVInt(out, RLE_MARKER);
decompressedBytesWritten += RLE_MARKER_SIZE;
rleWritten++;
}
}
protected void writeValueMarker(DataOutputStream out) throws IOException {
/**
* Write V_END_MARKER only in RLE scenario. This will
* save space in conditions where lots of unique KV pairs are found in the
* stream.
*/
if (prevKey == REPEAT_KEY) {
WritableUtils.writeVInt(out, V_END_MARKER);
decompressedBytesWritten += V_END_MARKER_SIZE;
}
}
// Required for mark/reset
public DataOutputStream getOutputStream () {
return out;
}
// Required for mark/reset
public void updateCountersForExternalAppend(long length) {
++numRecordsWritten;
decompressedBytesWritten += length;
}
public long getRawLength() {
return decompressedBytesWritten;
}
public long getCompressedLength() {
return compressedBytesWritten;
}
}
/**
* <code>IFile.Reader</code> to read intermediate map-outputs.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static class Reader {
public enum KeyState {NO_KEY, NEW_KEY, SAME_KEY}
private static final int DEFAULT_BUFFER_SIZE = 128*1024;
@VisibleForTesting
// Not final for testing
protected static int MAX_BUFFER_SIZE
= Integer.MAX_VALUE - 8; // The maximum array size is a little less than the
// max integer value. Trying to create a larger array
// will result in an OOM exception. The exact value
// is JVM dependent so setting it to max int - 8 to be safe.
// Count records read from disk
private long numRecordsRead = 0;
private final TezCounter readRecordsCounter;
private final TezCounter bytesReadCounter;
final InputStream in; // Possibly decompressed stream that we read
Decompressor decompressor;
public long bytesRead = 0;
final long fileLength;
protected boolean eof = false;
IFileInputStream checksumIn;
protected byte[] buffer = null;
protected int bufferSize = DEFAULT_BUFFER_SIZE;
protected DataInputStream dataIn = null;
protected int recNo = 1;
protected int originalKeyLength;
protected int prevKeyLength;
byte keyBytes[] = new byte[0];
protected int currentKeyLength;
protected int currentValueLength;
long startPos;
/**
* Construct an IFile Reader.
*
* @param fs FileSystem
* @param file Path of the file to be opened. This file should have
* checksum bytes for the data at the end of the file.
* @param codec codec
* @param readsCounter Counter for records read from disk
* @throws IOException
*/
public Reader(FileSystem fs, Path file,
CompressionCodec codec,
TezCounter readsCounter, TezCounter bytesReadCounter, boolean ifileReadAhead,
int ifileReadAheadLength, int bufferSize) throws IOException {
this(fs.open(file), fs.getFileStatus(file).getLen(), codec,
readsCounter, bytesReadCounter, ifileReadAhead,
ifileReadAheadLength, bufferSize);
}
/**
* Construct an IFile Reader.
*
* @param in The input stream
* @param length Length of the data in the stream, including the checksum
* bytes.
* @param codec codec
* @param readsCounter Counter for records read from disk
* @throws IOException
*/
public Reader(InputStream in, long length,
CompressionCodec codec,
TezCounter readsCounter, TezCounter bytesReadCounter,
boolean readAhead, int readAheadLength,
int bufferSize) throws IOException {
this(in, ((in != null) ? (length - HEADER.length) : length), codec,
readsCounter, bytesReadCounter, readAhead, readAheadLength,
bufferSize, ((in != null) ? isCompressedFlagEnabled(in) : false));
if (in != null && bytesReadCounter != null) {
bytesReadCounter.increment(IFile.HEADER.length);
}
}
/**
* Construct an IFile Reader.
*
* @param in The input stream
* @param length Length of the data in the stream, including the checksum
* bytes.
* @param codec codec
* @param readsCounter Counter for records read from disk
* @throws IOException
*/
public Reader(InputStream in, long length,
CompressionCodec codec,
TezCounter readsCounter, TezCounter bytesReadCounter,
boolean readAhead, int readAheadLength,
int bufferSize, boolean isCompressed) throws IOException {
if (in != null) {
checksumIn = new IFileInputStream(in, length, readAhead,
readAheadLength/* , isCompressed */);
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
this.in = codec.createInputStream(checksumIn, decompressor);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
this.in = checksumIn;
}
} else {
this.in = checksumIn;
}
startPos = checksumIn.getPosition();
} else {
this.in = null;
}
if (in != null) {
this.dataIn = new DataInputStream(this.in);
}
this.readRecordsCounter = readsCounter;
this.bytesReadCounter = bytesReadCounter;
this.fileLength = length;
this.bufferSize = Math.max(0, bufferSize);
}
/**
* Read the entire IFile content into memory.
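* <p>
* A minimal sketch (illustrative only): the buffer is assumed to be sized to the
* uncompressed IFile length (e.g. the writer's {@code getRawLength()}), while
* {@code compressedLength} is the on-disk length including the header
* (e.g. {@code getCompressedLength()}):
* <pre>{@code
*   byte[] buffer = new byte[(int) rawLength];
*   IFile.Reader.readToMemory(buffer, in, (int) compressedLength, codec, false, 0);
* }</pre>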
*
* @param buffer destination buffer for the uncompressed IFile contents
* @param in input stream positioned at the start of the IFile (header included)
* @param compressedLength length of the compressed IFile data, including the header
* @param codec compression codec, or null if the data is uncompressed
* @param ifileReadAhead whether read-ahead is enabled
* @param ifileReadAheadLength read-ahead length in bytes
* @throws IOException
*/
public static void readToMemory(byte[] buffer, InputStream in, int compressedLength,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength)
throws IOException {
boolean isCompressed = IFile.Reader.isCompressedFlagEnabled(in);
IFileInputStream checksumIn = new IFileInputStream(in,
compressedLength - IFile.HEADER.length, ifileReadAhead,
ifileReadAheadLength);
in = checksumIn;
Decompressor decompressor = null;
if (isCompressed && codec != null) {
decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
decompressor.reset();
in = CodecUtils.getDecompressedInputStreamWithBufferSize(codec, checksumIn, decompressor,
compressedLength);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
in = checksumIn;
}
}
try {
IOUtils.readFully(in, buffer, 0, buffer.length - IFile.HEADER.length);
/*
* We've gotten the amount of data we were expecting. Verify the
* decompressor has nothing more to offer. This action also forces the
* decompressor to read any trailing bytes that weren't critical for
* decompression, which is necessary to keep the stream in sync.
*/
if (in.read() >= 0) {
throw new IOException("Unexpected extra bytes from input stream");
}
} catch (IOException ioe) {
if(in != null) {
try {
in.close();
} catch(IOException e) {
if(LOG.isDebugEnabled()) {
LOG.debug("Exception in closing " + in, e);
}
}
}
throw ioe;
} finally {
if (decompressor != null) {
decompressor.reset();
CodecPool.returnDecompressor(decompressor);
}
}
}
/**
* Read entire IFile content to disk.
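* <p>
* A minimal sketch (illustrative only), copying {@code length} bytes of IFile data
* (header, payload and trailing checksum) from a fetched stream into a local file:
* <pre>{@code
*   try (FSDataOutputStream localOut = localFs.create(localPath)) {
*     IFile.Reader.readToDisk(localOut, in, length, false, 0);
*   }
* }</pre>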
*
* @param out the output stream that will receive the data
* @param in the input stream containing the IFile data
* @param length the amount of data to read from the input
* @return the number of bytes copied
* @throws IOException
*/
public static long readToDisk(OutputStream out, InputStream in, long length,
boolean ifileReadAhead, int ifileReadAheadLength)
throws IOException {
final int BYTES_TO_READ = 64 * 1024;
byte[] buf = new byte[BYTES_TO_READ];
// copy the IFile header
if (length < HEADER.length) {
throw new IOException("Missing IFile header");
}
IOUtils.readFully(in, buf, 0, HEADER.length);
verifyHeaderMagic(buf);
out.write(buf, 0, HEADER.length);
long bytesLeft = length - HEADER.length;
@SuppressWarnings("resource")
IFileInputStream ifInput = new IFileInputStream(in, bytesLeft,
ifileReadAhead, ifileReadAheadLength);
while (bytesLeft > 0) {
int n = ifInput.readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
if (n < 0) {
throw new IOException("read past end of stream");
}
out.write(buf, 0, n);
bytesLeft -= n;
}
return length - bytesLeft;
}
public long getLength() {
return fileLength - checksumIn.getSize();
}
public long getPosition() throws IOException {
return checksumIn.getPosition();
}
/**
* Read up to len bytes into buf starting at offset off.
*
* @param buf buffer
* @param off offset
* @param len number of bytes to read
* @return the number of bytes read
* @throws IOException
*/
private int readData(byte[] buf, int off, int len) throws IOException {
int bytesRead = 0;
while (bytesRead < len) {
int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
len - bytesRead);
if (n < 0) {
return bytesRead;
}
bytesRead += n;
}
return len;
}
protected void readValueLength(DataInput dIn) throws IOException {
currentValueLength = WritableUtils.readVInt(dIn);
bytesRead += WritableUtils.getVIntSize(currentValueLength);
if (currentValueLength == V_END_MARKER) {
readKeyValueLength(dIn);
}
}
protected void readKeyValueLength(DataInput dIn) throws IOException {
currentKeyLength = WritableUtils.readVInt(dIn);
currentValueLength = WritableUtils.readVInt(dIn);
if (currentKeyLength != RLE_MARKER) {
// original key length
originalKeyLength = currentKeyLength;
}
bytesRead +=
WritableUtils.getVIntSize(currentKeyLength)
+ WritableUtils.getVIntSize(currentValueLength);
}
/**
* Reset the key length and value length for the next record in the file.
*
* @param dIn stream to read from
* @return true if the key and value lengths were set for the next record;
*         false if the end-of-file (EOF) marker was reached
* @throws IOException
*/
protected boolean positionToNextRecord(DataInput dIn) throws IOException {
// Sanity check
if (eof) {
throw new IOException(String.format("Reached EOF. Completed reading %d", bytesRead));
}
prevKeyLength = currentKeyLength;
if (prevKeyLength == RLE_MARKER) {
// Same key as previous one. Just read value length alone
readValueLength(dIn);
} else {
readKeyValueLength(dIn);
}
// Check for EOF
if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
eof = true;
return false;
}
// Sanity check
if (currentKeyLength != RLE_MARKER && currentKeyLength < 0) {
throw new IOException("Rec# " + recNo + ": Negative key-length: " +
currentKeyLength + " PreviousKeyLen: " + prevKeyLength);
}
if (currentValueLength < 0) {
throw new IOException("Rec# " + recNo + ": Negative value-length: " +
currentValueLength);
}
return true;
}
public final boolean nextRawKey(DataInputBuffer key) throws IOException {
return readRawKey(key) != KeyState.NO_KEY;
}
private static byte[] createLargerArray(int currentLength) {
if (currentLength > MAX_BUFFER_SIZE) {
throw new IllegalArgumentException(
String.format(REQ_BUFFER_SIZE_TOO_LARGE, currentLength, MAX_BUFFER_SIZE));
}
int newLength;
if (currentLength > (MAX_BUFFER_SIZE - currentLength)) {
// possible overflow: if (2*currentLength > MAX_BUFFER_SIZE)
newLength = currentLength;
} else {
newLength = currentLength << 1;
}
return new byte[newLength];
}
public KeyState readRawKey(DataInputBuffer key) throws IOException {
if (!positionToNextRecord(dataIn)) {
if (LOG.isDebugEnabled()) {
LOG.debug("currentKeyLength=" + currentKeyLength +
", currentValueLength=" + currentValueLength +
", bytesRead=" + bytesRead +
", length=" + fileLength);
}
return KeyState.NO_KEY;
}
if(currentKeyLength == RLE_MARKER) {
// get key length from original key
key.reset(keyBytes, originalKeyLength);
return KeyState.SAME_KEY;
}
if (keyBytes.length < currentKeyLength) {
keyBytes = createLargerArray(currentKeyLength);
}
int i = readData(keyBytes, 0, currentKeyLength);
if (i != currentKeyLength) {
throw new IOException(String.format(INCOMPLETE_READ, currentKeyLength, i));
}
key.reset(keyBytes, currentKeyLength);
bytesRead += currentKeyLength;
return KeyState.NEW_KEY;
}
public void nextRawValue(DataInputBuffer value) throws IOException {
final byte[] valBytes;
if ((value.getData().length < currentValueLength) || (value.getData() == keyBytes)) {
valBytes = createLargerArray(currentValueLength);
} else {
valBytes = value.getData();
}
int i = readData(valBytes, 0, currentValueLength);
if (i != currentValueLength) {
throw new IOException(String.format(INCOMPLETE_READ, currentValueLength, i));
}
value.reset(valBytes, currentValueLength);
// Record the bytes read
bytesRead += currentValueLength;
++recNo;
++numRecordsRead;
}
private static void verifyHeaderMagic(byte[] header) throws IOException {
if (!(header[0] == 'T' && header[1] == 'I'
&& header[2] == 'F')) {
throw new IOException("Not a valid ifile header");
}
}
public static boolean isCompressedFlagEnabled(InputStream in) throws IOException {
byte[] header = new byte[HEADER.length];
IOUtils.readFully(in, header, 0, HEADER.length);
verifyHeaderMagic(header);
return (header[3] == 1);
}
public void close() throws IOException {
// Close the underlying stream
in.close();
// Release the buffer
dataIn = null;
buffer = null;
if (readRecordsCounter != null) {
readRecordsCounter.increment(numRecordsRead);
}
if (bytesReadCounter != null) {
bytesReadCounter.increment(checksumIn.getPosition() - startPos + checksumIn.getSize());
}
// Return the decompressor
if (decompressor != null) {
decompressor.reset();
CodecPool.returnDecompressor(decompressor);
decompressor = null;
}
}
public void reset(int offset) {
return;
}
public void disableChecksumValidation() {
checksumIn.disableChecksumValidation();
}
}
}