| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nutch.tools.arc; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.zip.GZIPInputStream; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.mapred.FileSplit; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.util.ReflectionUtils; |
| |
| /** |
| * The <code>ArchRecordReader</code> class provides a record reader which reads |
| * records from arc files. |
| * <p> |
| * Arc files are essentially tars of gzips. Each record in an arc file is a |
| * compressed gzip. Multiple records are concatenated together to form a |
| * complete arc.</p> |
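 * <p>
 * To make the concatenation concrete, here is a minimal, purely illustrative
 * sketch (using plain <code>java.util.zip</code>; real arc writers also
 * prepend a header line to each record) of how such a stream of gzip members
 * could be produced:
 * </p>
 * <pre>{@code
 * ByteArrayOutputStream out = new ByteArrayOutputStream();
 * for (String doc : new String[] { "record one", "record two" }) {
 *   GZIPOutputStream gz = new GZIPOutputStream(out);
 *   gz.write(doc.getBytes(StandardCharsets.UTF_8));
 *   gz.finish(); // ends this gzip member without closing the stream
 * }
 * // out now holds two concatenated gzip members, each starting with the
 * // gzip magic bytes 0x1F 0x8B that this reader scans for
 * }</pre>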
 * <p>For more information on the arc file format see
 * <a href='http://www.archive.org/web/researcher/ArcFileFormat.php'>ArcFileFormat</a>.
 * </p>
| * |
 * <p>
 * Arc files are used by the Internet Archive and Grub projects.
 * </p>
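 *
 * <p>
 * A minimal usage sketch, assuming a hypothetical local
 * <code>sample.arc.gz</code> file and driving the reader directly rather
 * than through a MapReduce job:
 * </p>
 * <pre>{@code
 * Configuration conf = new Configuration();
 * Path arc = new Path("/tmp/sample.arc.gz"); // hypothetical input file
 * FileSystem fs = arc.getFileSystem(conf);
 * long len = fs.getFileStatus(arc).getLen();
 * ArcRecordReader reader = new ArcRecordReader(conf,
 *     new FileSplit(arc, 0, len, new String[0]));
 * Text key = reader.createKey();
 * BytesWritable value = reader.createValue();
 * while (reader.next(key, value)) {
 *   System.out.println(key); // the arc record header line
 * }
 * reader.close();
 * }</pre>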
| * |
| * @see <a href='http://www.archive.org/'>archive.org</a> |
| * @see <a href='http://www.grub.org/'>grub.org</a> |
| */ |
| public class ArcRecordReader extends RecordReader<Text, BytesWritable> { |
| |
| private static final Logger LOG = LoggerFactory |
| .getLogger(MethodHandles.lookup().lookupClass()); |
| |
| protected Configuration conf; |
| protected long splitStart = 0; |
| protected long pos = 0; |
| protected long splitEnd = 0; |
| protected long splitLen = 0; |
| protected long fileLen = 0; |
| protected FSDataInputStream in; |
| |
  // the two-byte gzip member header magic number: 0x1F 0x8B
  private static final byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };
| |
| /** |
| * <p> |
| * Returns true if the byte array passed matches the gzip header magic number. |
| * </p> |
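   * <p>
   * For example, {@code isMagic(new byte[] { (byte) 0x1F, (byte) 0x8B })}
   * returns true, while a null array, an array of any other length, or any
   * other two bytes returns false.
   * </p>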
| * |
| * @param input |
| * The byte array to check. |
| * |
| * @return True if the byte array matches the gzip header magic number. |
| */ |
| public static boolean isMagic(byte[] input) { |
| |
| // check for null and incorrect length |
| if (input == null || input.length != MAGIC.length) { |
| return false; |
| } |
| |
| // check byte by byte |
| for (int i = 0; i < MAGIC.length; i++) { |
| if (MAGIC[i] != input[i]) { |
| return false; |
| } |
| } |
| |
| // must match |
| return true; |
| } |
| |
| /** |
| * Constructor that sets the configuration and file split. |
| * |
| * @param conf |
| * The job configuration. |
| * @param split |
| * The file split to read from. |
| * |
| * @throws IOException |
   *           If an IO error occurs while initializing the file split.
| */ |
| public ArcRecordReader(Configuration conf, FileSplit split) |
| throws IOException { |
| |
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.fileLen = fs.getFileStatus(path).getLen();
    this.conf = conf;
    this.in = fs.open(path);
    this.splitStart = split.getStart();
    this.splitEnd = splitStart + split.getLength();
    this.splitLen = split.getLength();
    in.seek(splitStart);
| } |
| |
| /** |
| * Closes the record reader resources. |
| */ |
| public void close() throws IOException { |
| this.in.close(); |
| } |
| |
| /** |
| * Creates a new instance of the <code>Text</code> object for the key. |
| */ |
| public Text createKey() { |
| return ReflectionUtils.newInstance(Text.class, conf); |
| } |
| |
| /** |
| * Creates a new instance of the <code>BytesWritable</code> object for the key |
| */ |
| public BytesWritable createValue() { |
| return ReflectionUtils.newInstance(BytesWritable.class, conf); |
| } |
| |
| /** |
| * Returns the current position in the file. |
| * |
| * @return The long of the current position in the file. |
| */ |
| public long getPos() throws IOException { |
| return in.getPos(); |
| } |
| |
| /** |
| * Returns the percentage of progress in processing the file. This will be |
| * represented as a float from 0 to 1 with 1 being 100% completed. |
| * |
| * @return The percentage of progress as a float from 0 to 1. |
| */ |
| public float getProgress() throws IOException { |
| |
    // guard against an empty split, which would cause a divide by zero
    if (splitEnd == splitStart) {
| return 0.0f; |
| } else { |
| // the progress is current pos - where we started / length of the split |
| return Math.min(1.0f, (getPos() - splitStart) / (float) splitLen); |
| } |
| } |
| |
  private Text key = null;
  private BytesWritable value = null;

  @Override
  public BytesWritable getCurrentValue() {
    return value;
  }

  @Override
  public Text getCurrentKey() {
    return key;
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    // lazily create the key and value, then delegate to
    // next(Text, BytesWritable), which does the actual reading
    if (key == null) {
      key = createKey();
    }
    if (value == null) {
      value = createValue();
    }
    return next(key, value);
  }

  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) {
    // nothing to do here: all initialization happens in the constructor,
    // which receives the configuration and file split directly
  }
| |
| /** |
| * <p> |
| * Returns true if the next record in the split is read into the key and value |
| * pair. The key will be the arc record header and the values will be the raw |
| * content bytes of the arc record. |
| * </p> |
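   * <p>
   * As an illustration, a version-1 arc record header is a whitespace
   * separated line of the form
   * {@code <url> <ip-address> <archive-date> <content-type> <length>},
   * for example (hypothetical values):
   * </p>
   * <pre>
   * http://www.example.com/ 192.0.2.1 20060101120000 text/html 5120
   * </pre>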
| * |
| * @param key |
| * The record key |
| * @param value |
| * The record value |
| * |
| * @return True if the next record is read. |
| * |
| * @throws IOException |
| * If an error occurs while reading the record value. |
| */ |
| public boolean next(Text key, BytesWritable value) throws IOException { |
| |
| try { |
| |
| // get the starting position on the input stream |
| long startRead = in.getPos(); |
| byte[] magicBuffer = null; |
| |
| // we need this loop to handle false positives in reading of gzip records |
| while (true) { |
| |
| // while we haven't passed the end of the split |
| if (startRead >= splitEnd) { |
| return false; |
| } |
| |
| // scanning for the gzip header |
| boolean foundStart = false; |
| while (!foundStart) { |
| |
| // start at the current file position and scan for 1K at time, break |
| // if there is no more to read |
| startRead = in.getPos(); |
| magicBuffer = new byte[1024]; |
| int read = in.read(magicBuffer); |
| if (read < 0) { |
| break; |
| } |
| |
        // scan the byte array for the gzip header magic number, byte by byte
        for (int i = 0; i < read - 1; i++) {
          byte[] testMagic = new byte[2];
          System.arraycopy(magicBuffer, i, testMagic, 0, 2);
          if (isMagic(testMagic)) {
            // set the next start to the current gzip header
            startRead += i;
            foundStart = true;
            break;
          }
        }

        // the magic number can straddle the 1K buffer boundary, so back up
        // one byte before scanning the next buffer
        if (!foundStart && read > 1) {
          in.seek(in.getPos() - 1);
        }
      }
| |
| // seek to the start of the gzip header |
| in.seek(startRead); |
      ByteArrayOutputStream baos = null;

      try {

        // decompress the gzip record, reading up to 4K at a time into a
        // byte array output stream
        byte[] buffer = new byte[4096];
        GZIPInputStream zin = new GZIPInputStream(in);
        int gzipRead = -1;
        baos = new ByteArrayOutputStream();
        while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) {
          baos.write(buffer, 0, gzipRead);
        }
| } catch (Exception e) { |
| |
| // there are times we get false positives where the gzip header exists |
| // but it is not an actual gzip record, so we ignore it and start |
| // over seeking |
        LOG.debug("Ignoring position: {}", startRead);
| if (startRead + 1 < fileLen) { |
| in.seek(startRead + 1); |
| } |
| continue; |
| } |
| |
      // grab the decompressed content as a byte array
      byte[] content = baos.toByteArray();
| |
| // the first line of the raw content in arc files is the header |
| int eol = 0; |
| for (int i = 0; i < content.length; i++) { |
| if (i > 0 && content[i] == '\n') { |
| eol = i; |
| break; |
| } |
| } |
| |
| // create the header and the raw content minus the header |
| String header = new String(content, 0, eol).trim(); |
| byte[] raw = new byte[(content.length - eol) - 1]; |
| System.arraycopy(content, eol + 1, raw, 0, raw.length); |
| |
      // populate the key and value with the header and raw content
      key.set(header);
      value.set(raw, 0, raw.length);
| |
| // TODO: It would be best to start at the end of the gzip read but |
| // the bytes read in gzip don't match raw bytes in the file so we |
| // overshoot the next header. With this current method you get |
| // some false positives but don't miss records. |
| if (startRead + 1 < fileLen) { |
| in.seek(startRead + 1); |
| } |
| |
| // populated the record, now return |
| return true; |
| } |
| } catch (Exception e) { |
| LOG.error("Failed reading ARC record: ", e); |
| } |
| |
| // couldn't populate the record or there is no next record to read |
| return false; |
| } |
| } |