blob: 57e74df819c274f8b79502017a4e82225d28691b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* <code>IFile</code> is the simple <key-len, value-len, key, value> format
* for the intermediate map-outputs in Map-Reduce.
*
* There is a <code>Writer</code> to write out map-outputs in this format and
* a <code>Reader</code> to read files of this format.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class IFile {
private static final Log LOG = LogFactory.getLog(IFile.class);
public static final int EOF_MARKER = -1; // End of File Marker
/**
* <code>IFile.Writer</code> to write out intermediate map-outputs.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static class Writer<K extends Object, V extends Object> {
FSDataOutputStream out;
boolean ownOutputStream = false;
long start = 0;
FSDataOutputStream rawOut;
CompressionOutputStream compressedOut;
Compressor compressor;
boolean compressOutput = false;
long decompressedBytesWritten = 0;
long compressedBytesWritten = 0;
// Count records written to disk
private long numRecordsWritten = 0;
private final Counters.Counter writtenRecordsCounter;
IFileOutputStream checksumOut;
Class<K> keyClass;
Class<V> valueClass;
Serializer<K> keySerializer;
Serializer<V> valueSerializer;
DataOutputBuffer buffer = new DataOutputBuffer();
public Writer(Configuration conf, FileSystem fs, Path file,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec,
Counters.Counter writesCounter) throws IOException {
this(conf, fs.create(file), keyClass, valueClass, codec,
writesCounter);
ownOutputStream = true;
}
protected Writer(Counters.Counter writesCounter) {
writtenRecordsCounter = writesCounter;
}
public Writer(Configuration conf, FSDataOutputStream out,
Class<K> keyClass, Class<V> valueClass,
CompressionCodec codec, Counters.Counter writesCounter)
throws IOException {
this.writtenRecordsCounter = writesCounter;
this.checksumOut = new IFileOutputStream(out);
this.rawOut = out;
this.start = this.rawOut.getPos();
if (codec != null) {
this.compressor = CodecPool.getCompressor(codec);
if (this.compressor != null) {
this.compressor.reset();
this.compressedOut = codec.createOutputStream(checksumOut, compressor);
this.out = new FSDataOutputStream(this.compressedOut, null);
this.compressOutput = true;
} else {
LOG.warn("Could not obtain compressor from CodecPool");
this.out = new FSDataOutputStream(checksumOut,null);
}
} else {
this.out = new FSDataOutputStream(checksumOut,null);
}
this.keyClass = keyClass;
this.valueClass = valueClass;
if (keyClass != null) {
SerializationFactory serializationFactory =
new SerializationFactory(conf);
this.keySerializer = serializationFactory.getSerializer(keyClass);
this.keySerializer.open(buffer);
this.valueSerializer = serializationFactory.getSerializer(valueClass);
this.valueSerializer.open(buffer);
}
}
public Writer(Configuration conf, FileSystem fs, Path file)
throws IOException {
this(conf, fs, file, null, null, null, null);
}
public void close() throws IOException {
// When IFile writer is created by BackupStore, we do not have
// Key and Value classes set. So, check before closing the
// serializers
if (keyClass != null) {
keySerializer.close();
valueSerializer.close();
}
// Write EOF_MARKER for key/value length
WritableUtils.writeVInt(out, EOF_MARKER);
WritableUtils.writeVInt(out, EOF_MARKER);
decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
//Flush the stream
out.flush();
if (compressOutput) {
// Flush
compressedOut.finish();
compressedOut.resetState();
}
// Close the underlying stream iff we own it...
if (ownOutputStream) {
out.close();
}
else {
// Write the checksum
checksumOut.finish();
}
compressedBytesWritten = rawOut.getPos() - start;
if (compressOutput) {
// Return back the compressor
CodecPool.returnCompressor(compressor);
compressor = null;
}
out = null;
if(writtenRecordsCounter != null) {
writtenRecordsCounter.increment(numRecordsWritten);
}
}
public void append(K key, V value) throws IOException {
if (key.getClass() != keyClass)
throw new IOException("wrong key class: "+ key.getClass()
+" is not "+ keyClass);
if (value.getClass() != valueClass)
throw new IOException("wrong value class: "+ value.getClass()
+" is not "+ valueClass);
// Append the 'key'
keySerializer.serialize(key);
int keyLength = buffer.getLength();
if (keyLength < 0) {
throw new IOException("Negative key-length not allowed: " + keyLength +
" for " + key);
}
// Append the 'value'
valueSerializer.serialize(value);
int valueLength = buffer.getLength() - keyLength;
if (valueLength < 0) {
throw new IOException("Negative value-length not allowed: " +
valueLength + " for " + value);
}
// Write the record out
WritableUtils.writeVInt(out, keyLength); // key length
WritableUtils.writeVInt(out, valueLength); // value length
out.write(buffer.getData(), 0, buffer.getLength()); // data
// Reset
buffer.reset();
// Update bytes written
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
++numRecordsWritten;
}
public void append(DataInputBuffer key, DataInputBuffer value)
throws IOException {
int keyLength = key.getLength() - key.getPosition();
if (keyLength < 0) {
throw new IOException("Negative key-length not allowed: " + keyLength +
" for " + key);
}
int valueLength = value.getLength() - value.getPosition();
if (valueLength < 0) {
throw new IOException("Negative value-length not allowed: " +
valueLength + " for " + value);
}
WritableUtils.writeVInt(out, keyLength);
WritableUtils.writeVInt(out, valueLength);
out.write(key.getData(), key.getPosition(), keyLength);
out.write(value.getData(), value.getPosition(), valueLength);
// Update bytes written
decompressedBytesWritten += keyLength + valueLength +
WritableUtils.getVIntSize(keyLength) +
WritableUtils.getVIntSize(valueLength);
++numRecordsWritten;
}
// Required for mark/reset
public DataOutputStream getOutputStream () {
return out;
}
// Required for mark/reset
public void updateCountersForExternalAppend(long length) {
++numRecordsWritten;
decompressedBytesWritten += length;
}
public long getRawLength() {
return decompressedBytesWritten;
}
public long getCompressedLength() {
return compressedBytesWritten;
}
}
/**
* <code>IFile.Reader</code> to read intermediate map-outputs.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static class Reader<K extends Object, V extends Object> {
private static final int DEFAULT_BUFFER_SIZE = 128*1024;
private static final int MAX_VINT_SIZE = 9;
// Count records read from disk
private long numRecordsRead = 0;
private final Counters.Counter readRecordsCounter;
final InputStream in; // Possibly decompressed stream that we read
Decompressor decompressor;
public long bytesRead = 0;
protected final long fileLength;
protected boolean eof = false;
final IFileInputStream checksumIn;
protected byte[] buffer = null;
protected int bufferSize = DEFAULT_BUFFER_SIZE;
protected DataInputStream dataIn;
protected int recNo = 1;
protected int currentKeyLength;
protected int currentValueLength;
byte keyBytes[] = new byte[0];
/**
* Construct an IFile Reader.
*
* @param conf Configuration File
* @param fs FileSystem
* @param file Path of the file to be opened. This file should have
* checksum bytes for the data at the end of the file.
* @param codec codec
* @param readsCounter Counter for records read from disk
* @throws IOException
*/
public Reader(Configuration conf, FileSystem fs, Path file,
CompressionCodec codec,
Counters.Counter readsCounter) throws IOException {
this(conf, fs.open(file),
fs.getFileStatus(file).getLen(),
codec, readsCounter);
}
/**
* Construct an IFile Reader.
*
* @param conf Configuration File
* @param in The input stream
* @param length Length of the data in the stream, including the checksum
* bytes.
* @param codec codec
* @param readsCounter Counter for records read from disk
* @throws IOException
*/
public Reader(Configuration conf, FSDataInputStream in, long length,
CompressionCodec codec,
Counters.Counter readsCounter) throws IOException {
readRecordsCounter = readsCounter;
checksumIn = new IFileInputStream(in,length);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
if (decompressor != null) {
this.in = codec.createInputStream(checksumIn, decompressor);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
this.in = checksumIn;
}
} else {
this.in = checksumIn;
}
this.dataIn = new DataInputStream(this.in);
this.fileLength = length;
if (conf != null) {
bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
}
}
public long getLength() {
return fileLength - checksumIn.getSize();
}
public long getPosition() throws IOException {
return checksumIn.getPosition();
}
/**
* Read upto len bytes into buf starting at offset off.
*
* @param buf buffer
* @param off offset
* @param len length of buffer
* @return the no. of bytes read
* @throws IOException
*/
private int readData(byte[] buf, int off, int len) throws IOException {
int bytesRead = 0;
while (bytesRead < len) {
int n = in.read(buf, off+bytesRead, len-bytesRead);
if (n < 0) {
return bytesRead;
}
bytesRead += n;
}
return len;
}
protected boolean positionToNextRecord(DataInput dIn) throws IOException {
// Sanity check
if (eof) {
throw new EOFException("Completed reading " + bytesRead);
}
// Read key and value lengths
currentKeyLength = WritableUtils.readVInt(dIn);
currentValueLength = WritableUtils.readVInt(dIn);
bytesRead += WritableUtils.getVIntSize(currentKeyLength) +
WritableUtils.getVIntSize(currentValueLength);
// Check for EOF
if (currentKeyLength == EOF_MARKER && currentValueLength == EOF_MARKER) {
eof = true;
return false;
}
// Sanity check
if (currentKeyLength < 0) {
throw new IOException("Rec# " + recNo + ": Negative key-length: " +
currentKeyLength);
}
if (currentValueLength < 0) {
throw new IOException("Rec# " + recNo + ": Negative value-length: " +
currentValueLength);
}
return true;
}
public boolean nextRawKey(DataInputBuffer key) throws IOException {
if (!positionToNextRecord(dataIn)) {
return false;
}
if (keyBytes.length < currentKeyLength) {
keyBytes = new byte[currentKeyLength << 1];
}
int i = readData(keyBytes, 0, currentKeyLength);
if (i != currentKeyLength) {
throw new IOException ("Asked for " + currentKeyLength + " Got: " + i);
}
key.reset(keyBytes, currentKeyLength);
bytesRead += currentKeyLength;
return true;
}
public void nextRawValue(DataInputBuffer value) throws IOException {
final byte[] valBytes = (value.getData().length < currentValueLength)
? new byte[currentValueLength << 1]
: value.getData();
int i = readData(valBytes, 0, currentValueLength);
if (i != currentValueLength) {
throw new IOException ("Asked for " + currentValueLength + " Got: " + i);
}
value.reset(valBytes, currentValueLength);
// Record the bytes read
bytesRead += currentValueLength;
++recNo;
++numRecordsRead;
}
public void close() throws IOException {
// Close the underlying stream
in.close();
// Release the buffer
dataIn = null;
buffer = null;
if(readRecordsCounter != null) {
readRecordsCounter.increment(numRecordsRead);
}
// Return the decompressor
if (decompressor != null) {
decompressor.reset();
CodecPool.returnDecompressor(decompressor);
decompressor = null;
}
}
public void reset(int offset) {
return;
}
public void disableChecksumValidation() {
checksumIn.disableChecksumValidation();
}
}
}