/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nutch.tools.arc;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
/**
 * The <code>ArcRecordReader</code> class provides a record reader which reads
 * records from arc files.
 * <p>
 * Arc files are essentially concatenations of gzipped records: each record in
 * an arc file is stored as an individually compressed gzip member, and
 * multiple members are concatenated back to back to form a complete arc.</p>
 * <p>
 * For more information on the arc file format see
 * <a href='http://www.archive.org/web/researcher/ArcFileFormat.php'>ArcFileFormat</a>.
 * </p>
*
* <p>
 * Arc files are used by the Internet Archive and Grub projects.
* </p>
*
* @see <a href='http://www.archive.org/'>archive.org</a>
* @see <a href='http://www.grub.org/'>grub.org</a>
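 *
 * <p>
 * A minimal usage sketch (hypothetical driver code; the file name and the
 * single whole-file split below are assumptions for illustration):
 * </p>
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Path arc = new Path("sample.arc"); // hypothetical ARC file
 * long len = arc.getFileSystem(conf).getFileStatus(arc).getLen();
 * ArcRecordReader reader = new ArcRecordReader(conf,
 *     new FileSplit(arc, 0, len, (String[]) null));
 * Text key = reader.createKey();
 * BytesWritable value = reader.createValue();
 * while (reader.next(key, value)) {
 *   // key holds the arc record header line, value the raw record bytes
 * }
 * reader.close();
 * }</pre>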
*/
public class ArcRecordReader extends RecordReader<Text, BytesWritable> {
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
protected Configuration conf;
protected long splitStart = 0;
protected long pos = 0;
protected long splitEnd = 0;
protected long splitLen = 0;
protected long fileLen = 0;
protected FSDataInputStream in;
  private static final byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };
/**
* <p>
* Returns true if the byte array passed matches the gzip header magic number.
* </p>
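 *
 * <p>
 * For example, {@code isMagic(new byte[] { 0x1F, (byte) 0x8B })} returns
 * true (the two-byte magic number defined by the gzip spec, RFC 1952),
 * while any other contents, or an array of any other length, returns false.
 * </p>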
*
* @param input
* The byte array to check.
*
* @return True if the byte array matches the gzip header magic number.
*/
public static boolean isMagic(byte[] input) {
// check for null and incorrect length
if (input == null || input.length != MAGIC.length) {
return false;
}
// check byte by byte
for (int i = 0; i < MAGIC.length; i++) {
if (MAGIC[i] != input[i]) {
return false;
}
}
// must match
return true;
}
/**
* Constructor that sets the configuration and file split.
*
* @param conf
* The job configuration.
* @param split
* The file split to read from.
*
* @throws IOException
* If an IO error occurs while initializing file split.
*/
public ArcRecordReader(Configuration conf, FileSplit split)
throws IOException {
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    fileLen = fs.getFileStatus(path).getLen();
    this.conf = conf;
    this.in = fs.open(path);
this.splitStart = split.getStart();
this.splitEnd = splitStart + split.getLength();
this.splitLen = split.getLength();
in.seek(splitStart);
}
/**
* Closes the record reader resources.
*/
public void close() throws IOException {
this.in.close();
}
/**
* Creates a new instance of the <code>Text</code> object for the key.
*/
public Text createKey() {
return ReflectionUtils.newInstance(Text.class, conf);
}
/**
 * Creates a new instance of the <code>BytesWritable</code> object for the
 * value.
*/
public BytesWritable createValue() {
return ReflectionUtils.newInstance(BytesWritable.class, conf);
}
/**
* Returns the current position in the file.
*
 * @return The current position in the file, as a long.
*/
public long getPos() throws IOException {
return in.getPos();
}
/**
 * Returns the progress made in processing the current split. This is
 * represented as a float from 0 to 1, with 1 meaning fully processed.
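 * For example, with a 1000-byte split of which the first 250 bytes have been
 * read, this method returns 0.25f.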
*
 * @return The fraction of the split processed so far, as a float from 0 to 1.
*/
public float getProgress() throws IOException {
    // a zero-length split has no progress to report
if (splitEnd == splitStart) {
return 0.0f;
} else {
// the progress is current pos - where we started / length of the split
return Math.min(1.0f, (getPos() - splitStart) / (float) splitLen);
}
}
  // The methods below are stubs for the new MapReduce API; this reader
  // populates its key and value through next(Text, BytesWritable) instead.
  @Override
  public BytesWritable getCurrentValue() {
    return new BytesWritable();
  }

  @Override
  public Text getCurrentKey() {
    return new Text();
  }

  @Override
  public boolean nextKeyValue() {
    return false;
  }

  // No-op; all initialization is performed in the constructor.
  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
  }
/**
* <p>
 * Returns true if the next record in the split is read into the key and value
 * pair. The key will be the arc record header and the value will be the raw
 * content bytes of the arc record.
* </p>
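 * <p>
 * For illustration, a version-1 arc record header line takes roughly the
 * form below (field layout per the ArcFileFormat reference linked in the
 * class documentation; the values shown are made up):
 * </p>
 * <pre>
 * http://www.example.com/ 192.0.2.1 20060101000000 text/html 12345
 * </pre>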
*
* @param key
* The record key
* @param value
* The record value
*
* @return True if the next record is read.
*
* @throws IOException
* If an error occurs while reading the record value.
*/
public boolean next(Text key, BytesWritable value) throws IOException {
try {
// get the starting position on the input stream
long startRead = in.getPos();
byte[] magicBuffer = null;
      // we need this loop to handle false positives while scanning for gzip
      // record headers
while (true) {
        // give up once we have passed the end of the split
if (startRead >= splitEnd) {
return false;
}
// scanning for the gzip header
boolean foundStart = false;
while (!foundStart) {
          // start at the current file position and scan 1K at a time,
          // breaking if there is no more to read
startRead = in.getPos();
magicBuffer = new byte[1024];
int read = in.read(magicBuffer);
if (read < 0) {
break;
}
          // scan the byte array for the gzip header magic number, byte by
          // byte
          for (int i = 0; i < read - 1; i++) {
            byte[] testMagic = new byte[2];
            System.arraycopy(magicBuffer, i, testMagic, 0, 2);
            if (isMagic(testMagic)) {
              // set the next start to the current gzip header
              startRead += i;
              foundStart = true;
              break;
            }
          }
          // a magic number can straddle two 1K buffers; back up one byte so
          // the byte pair split across reads is checked on the next pass
          if (!foundStart && read > 1) {
            in.seek(in.getPos() - 1);
          }
}
// seek to the start of the gzip header
in.seek(startRead);
        ByteArrayOutputStream baos = null;
        try {
          // read the gzip record 4K at a time into a byte array
          byte[] buffer = new byte[4096];
          GZIPInputStream zin = new GZIPInputStream(in);
          int gzipRead = -1;
          baos = new ByteArrayOutputStream();
          while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) {
            baos.write(buffer, 0, gzipRead);
          }
} catch (Exception e) {
// there are times we get false positives where the gzip header exists
// but it is not an actual gzip record, so we ignore it and start
// over seeking
          LOG.debug("Ignoring position: {}", startRead);
if (startRead + 1 < fileLen) {
in.seek(startRead + 1);
}
continue;
}
        // convert the buffered output into a byte array
byte[] content = baos.toByteArray();
// the first line of the raw content in arc files is the header
int eol = 0;
        for (int i = 1; i < content.length; i++) {
          if (content[i] == '\n') {
            eol = i;
            break;
          }
        }
// create the header and the raw content minus the header
        String header = new String(content, 0, eol,
            StandardCharsets.UTF_8).trim();
byte[] raw = new byte[(content.length - eol) - 1];
System.arraycopy(content, eol + 1, raw, 0, raw.length);
        // populate the key and value with the header and raw content
        key.set(header);
        value.set(raw, 0, raw.length);
// TODO: It would be best to start at the end of the gzip read but
// the bytes read in gzip don't match raw bytes in the file so we
// overshoot the next header. With this current method you get
// some false positives but don't miss records.
if (startRead + 1 < fileLen) {
in.seek(startRead + 1);
}
// populated the record, now return
return true;
}
} catch (Exception e) {
LOG.error("Failed reading ARC record: ", e);
}
// couldn't populate the record or there is no next record to read
return false;
}
}