blob: a70752a1860cdf448e23007300c58564ebc3f470 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.bzip2r;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFileInputFormat;
import org.apache.tools.bzip2r.CBZip2InputStream;
@SuppressWarnings("unchecked")
public class Bzip2TextInputFormat extends PigFileInputFormat {
/**
* Treats keys as offset in file and value as line. Since the input file is
* compressed, the offset for a particular line is not well-defined. This
* implementation returns the starting position of a compressed block as the
* key for every line in that block.
*/
private static class BZip2LineRecordReader extends RecordReader<LongWritable,Text> {
private long start;
private long end;
private long pos;
private CBZip2InputStream in;
private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);
// flag to indicate if previous character read was Carriage Return ('\r')
// and the next character was not Line Feed ('\n')
private boolean CRFollowedByNonLF = false;
// in the case where a Carriage Return ('\r') was not followed by a
// Line Feed ('\n'), this variable will hold that non Line Feed character
// that was read from the underlying stream.
private byte nonLFChar;
/**
* Provide a bridge to get the bytes from the ByteArrayOutputStream without
* creating a new byte array.
*/
private static class TextStuffer extends OutputStream {
public Text target;
@Override
public void write(int b) {
throw new UnsupportedOperationException("write(byte) not supported");
}
@Override
public void write(byte[] data, int offset, int len) throws IOException {
target.clear();
target.set(data, offset, len);
}
}
private TextStuffer bridge = new TextStuffer();
private LongWritable key = new LongWritable();
private Text value = new Text();
public BZip2LineRecordReader(Configuration job, FileSplit split)
throws IOException {
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
fileIn.seek(start);
in = new CBZip2InputStream(fileIn, 9, end);
if (start != 0) {
// skip first line and re-establish "start".
// LineRecordReader.readLine(this.in, null);
readLine(this.in, null);
start = in.getPos();
}
pos = in.getPos();
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
/*
* LineRecordReader.readLine() is depricated in HAdoop 0.17. So it is added here
* locally.
*/
private long readLine(InputStream in,
OutputStream out) throws IOException {
long bytes = 0;
while (true) {
int b = -1;
if(CRFollowedByNonLF) {
// In the previous call, a Carriage Return ('\r') was followed
// by a non Line Feed ('\n') character - in that call we would
// have not returned the non Line Feed character but would have
// read it from the stream - lets use that already read character
// now
b = nonLFChar;
CRFollowedByNonLF = false;
} else {
b = in.read();
}
if (b == -1) {
break;
}
bytes += 1;
byte c = (byte)b;
if (c == '\n') {
break;
}
if (c == '\r') {
byte nextC = (byte)in.read();
if (nextC != '\n') {
CRFollowedByNonLF = true;
nonLFChar = nextC;
} else {
bytes += 1;
}
break;
}
if (out != null) {
out.write(c);
}
}
return bytes;
}
/** Read a line. */
public boolean next(LongWritable key, Text value)
throws IOException {
if (pos > end)
return false;
key.set(pos); // key is position
buffer.reset();
// long bytesRead = LineRecordReader.readLine(in, buffer);
long bytesRead = readLine(in, buffer);
if (bytesRead == 0) {
return false;
}
pos = in.getPos();
// if we have read ahead because we encountered a carriage return
// char followed by a non line feed char, decrement the pos
if(CRFollowedByNonLF) {
pos--;
}
bridge.target = value;
buffer.writeTo(bridge);
return true;
}
/**
* Get the progress within the split
*/
@Override
public float getProgress() {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float) (end - start));
}
}
public long getPos() throws IOException {
return pos;
}
@Override
public void close() throws IOException {
in.close();
}
@Override
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// no op
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return next(key, value);
}
}
@Override
public RecordReader createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
return new BZip2LineRecordReader(context.getConfiguration(),
(FileSplit) split);
}
}