| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.pig.piggybank.storage; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PositionedReadable; |
| import org.apache.hadoop.fs.Seekable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.RecordWriter; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskAttemptID; |
| import org.apache.hadoop.mapreduce.lib.input.FileSplit; |
| import org.apache.pig.IndexableLoadFunc; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat; |
| import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; |
| import org.apache.pig.piggybank.storage.IndexedStorage.IndexedStorageInputFormat.IndexedStorageRecordReader; |
| import org.apache.pig.piggybank.storage.IndexedStorage.IndexedStorageInputFormat.IndexedStorageRecordReader.IndexedStorageRecordReaderComparator; |
| import org.apache.pig.builtin.PigStorage; |
| import org.apache.pig.data.DataReaderWriter; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.data.TupleFactory; |
| import org.apache.pig.impl.util.StorageUtil; |
| import org.apache.pig.data.DataType; |
| import org.apache.pig.data.DataByteArray; |
| import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; |
| |
| /** |
| * <code>IndexedStorage</code> is a form of <code>PigStorage</code> that supports a |
| * per record seek. <code>IndexedStorage</code> creates a separate (hidden) index file for |
| * every data file that is written. The format of the index file is: |
| * <pre> |
| * | Header | |
| * | Index Body | |
| * | Footer | |
| * </pre> |
| * The Header contains the list of record indices (field numbers) that represent index keys. |
| * The Index Body contains a <code>Tuple</code> for each record in the data. |
| * The fields of the <code>Tuple</code> are: |
| * <ul> |
| * <li> The index key(s) <code>Tuple</code> </li> |
| * <li> The number of records that share this index key. </li> |
| * <li> Offset into the data file to read the first matching record. </li> |
| * </ul> |
| * The Footer contains sequentially: |
| * <ul> |
| * <li> The smallest key(s) <code>Tuple</code> in the index. </li> |
| * <li> The largest key(s) <code>Tuple</code> in the index. </li> |
| * <li> The offset in bytes to the start of the footer </li> |
| * </ul> |
| * |
| * <code>IndexStorage</code> implements <code>IndexableLoadFunc</code> and |
| * can be used as the 'right table' in a PIG 'merge' or 'merge-sparse' join. |
| * |
| * <code>IndexStorage</code> does not require the data to be globally partitioned & sorted |
| * by index keys. Each partition (separate index) must be locally sorted. |
| * |
| * Also note IndexStorage is a loader to demonstrate "merge-sparse" join. |
| */ |
| public class IndexedStorage extends PigStorage implements IndexableLoadFunc { |
| |
| /** |
| * Constructs a Pig Storer that uses specified regex as a field delimiter. |
| * @param delimiter - field delimiter to use |
| * @param offsetsToIndexKeys - list of offset into Tuple for index keys (comma separated) |
| */ |
| public IndexedStorage(String delimiter, String offsetsToIndexKeys) { |
| super(delimiter); |
| |
| this.fieldDelimiter = StorageUtil.parseFieldDel(delimiter); |
| |
| String[] stroffsetsToIndexKeys = offsetsToIndexKeys.split(","); |
| this.offsetsToIndexKeys = new int[stroffsetsToIndexKeys.length]; |
| for (int i = 0; i < stroffsetsToIndexKeys.length; ++i) { |
| this.offsetsToIndexKeys[i] = Integer.parseInt(stroffsetsToIndexKeys[i]); |
| } |
| } |
| |
| @Override |
| public OutputFormat getOutputFormat() { |
| return new IndexedStorageOutputFormat(fieldDelimiter, offsetsToIndexKeys); |
| } |
| |
| /** |
| * Assumes this list of readers is already sorted except for the provided element. |
| * This element is bubbled up the array to its appropriate sort location |
| * (faster than doing a Utils sort). |
| */ |
| private void sortReader(int startIndex) { |
| int idx = startIndex; |
| while (idx < this.readers.length - 1) { |
| IndexedStorageRecordReader reader1 = this.readers[idx]; |
| IndexedStorageRecordReader reader2 = this.readers[idx+1]; |
| if (this.readerComparator.compare(reader1, reader2) <= 0) { |
| return; |
| } |
| this.readers[idx] = reader2; |
| this.readers[idx+1] = reader1; |
| idx++; |
| } |
| } |
| |
| /** |
| * Internal OutputFormat class |
| */ |
| public static class IndexedStorageOutputFormat extends PigTextOutputFormat { |
| |
| public IndexedStorageOutputFormat(byte delimiter, int[] offsetsToIndexKeys) { |
| /* Call the base class constructor */ |
| super(delimiter); |
| |
| this.fieldDelimiter = delimiter; |
| this.offsetsToIndexKeys = offsetsToIndexKeys; |
| } |
| |
| @Override |
| public RecordWriter<WritableComparable, Tuple> getRecordWriter( |
| TaskAttemptContext context) throws IOException, |
| InterruptedException { |
| |
| Configuration conf = context.getConfiguration(); |
| |
| FileSystem fs = FileSystem.get(conf); |
| Path file = this.getDefaultWorkFile(context, ""); |
| FSDataOutputStream fileOut = fs.create(file, false); |
| |
| IndexManager indexManager = new IndexManager(offsetsToIndexKeys); |
| indexManager.createIndexFile(fs, file); |
| return new IndexedStorageRecordWriter(fileOut, this.fieldDelimiter, indexManager); |
| } |
| |
| /** |
| * Internal class to do the actual record writing and index generation |
| * |
| */ |
| public static class IndexedStorageRecordWriter extends PigLineRecordWriter { |
| |
| public IndexedStorageRecordWriter(FSDataOutputStream fileOut, byte fieldDel, IndexManager indexManager) throws IOException { |
| super(fileOut, fieldDel); |
| |
| this.fileOut = fileOut; |
| this.indexManager = indexManager; |
| |
| /* Write the index header first */ |
| this.indexManager.WriteIndexHeader(); |
| } |
| |
| @Override |
| public void write(WritableComparable key, Tuple value) throws IOException { |
| /* Write the data */ |
| long offset = this.fileOut.getPos(); |
| super.write(key, value); |
| |
| /* Build index */ |
| this.indexManager.BuildIndex(value, offset); |
| } |
| |
| @Override |
| public void close(TaskAttemptContext context) |
| throws IOException { |
| this.indexManager.WriterIndexFooter(); |
| this.indexManager.Close(); |
| super.close(context); |
| } |
| |
| /** |
| * Output stream for data |
| */ |
| private FSDataOutputStream fileOut; |
| |
| /** |
| * Index builder |
| */ |
| private IndexManager indexManager = null; |
| } |
| |
| /** |
| * Delimiter to use between fields |
| */ |
| final private byte fieldDelimiter; |
| |
| /** |
| * Offsets to index keys in given tuple |
| */ |
| final protected int[] offsetsToIndexKeys; |
| } |
| |
| @Override |
| public InputFormat getInputFormat() { |
| return new IndexedStorageInputFormat(); |
| } |
| |
    @Override
    public Tuple getNext() throws IOException {
        // No readers means we are being used as a plain (non-merge-join)
        // loader; delegate straight to PigStorage.
        if (this.readers == null) {
            return super.getNext();
        }

        // Readers before currentReaderIndexStart are exhausted; the array is
        // kept sorted by each reader's current index key.
        while (currentReaderIndexStart < this.readers.length) {
            IndexedStorageRecordReader r = this.readers[currentReaderIndexStart];

            // Point the PigStorage machinery at this reader and pull a tuple.
            this.prepareToRead(r, null);
            Tuple tuple = super.getNext();
            if (tuple == null) {
                currentReaderIndexStart++;
                r.close();
                continue; //next Reader
            }

            //if we haven't yet initialized the indexManager (by reading the first index key)
            if (r.indexManager.lastIndexKeyTuple == null) {

                //initialize the indexManager
                if (r.indexManager.ReadIndex() == null) {
                    //There should never be a case where there is a non-null record - but no corresponding index.
                    throw new IOException("Missing Index for Tuple: " + tuple);
                }
            }

            // One tuple consumed against the current index entry's count.
            r.indexManager.numberOfTuples--;

            if (r.indexManager.numberOfTuples == 0) {
                // Advance to the next index entry; if none remain this reader
                // is exhausted, otherwise re-sort it into position.
                if (r.indexManager.ReadIndex() == null) {
                    r.close();
                    currentReaderIndexStart++;
                } else {
                    //Since the index of the current reader was increased, we may need to push the
                    //current reader back in the sorted list of readers.
                    sortReader(currentReaderIndexStart);
                }
            }
            return tuple;
        }

        return null;
    }
| |
| /** |
| * IndexableLoadFunc interface implementation |
| */ |
| @Override |
| public void initialize(Configuration conf) throws IOException { |
| try { |
| InputFormat inputFormat = this.getInputFormat(); |
| TaskAttemptID id = HadoopShims.getNewTaskAttemptID(); |
| |
| if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) { |
| conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, System.getenv("HADOOP_TOKEN_FILE_LOCATION")); |
| } |
| List<FileSplit> fileSplits = inputFormat.getSplits(HadoopShims.createJobContext(conf, null)); |
| this.readers = new IndexedStorageRecordReader[fileSplits.size()]; |
| |
| int idx = 0; |
| Iterator<FileSplit> it = fileSplits.iterator(); |
| while (it.hasNext()) { |
| FileSplit fileSplit = it.next(); |
| TaskAttemptContext context = HadoopShims.createTaskAttemptContext(conf, id); |
| IndexedStorageRecordReader r = (IndexedStorageRecordReader) inputFormat.createRecordReader(fileSplit, context); |
| r.initialize(fileSplit, context); |
| this.readers[idx] = r; |
| idx++; |
| } |
| |
| Arrays.sort(this.readers, this.readerComparator); |
| } catch (InterruptedException e) { |
| throw new IOException(e); |
| } |
| } |
| |
    @Override
    /* The list of readers is always sorted before and after this call. */
    public void seekNear(Tuple keys) throws IOException {

        /* Keeps track of the last (if any) reader where seekNear was called */
        int lastIndexModified = -1;

        int idx = currentReaderIndexStart;
        while (idx < this.readers.length) {
            IndexedStorageRecordReader r = this.readers[idx];

            /* The key falls within the range of the reader index */
            if (keys.compareTo(r.indexManager.maxIndexKeyTuple) <= 0 && keys.compareTo(r.indexManager.minIndexKeyTuple) >= 0) {
                r.seekNear(keys);
                lastIndexModified = idx;

            /* The key is greater than the current range of the reader index */
            } else if (keys.compareTo(r.indexManager.maxIndexKeyTuple) > 0) {
                /* This reader can never match this or any later (larger) key:
                 * permanently retire it from the scan window. */
                currentReaderIndexStart++;
            /* DO NOTHING - The key is less than the current range of the reader index */
            } else {
                /* Readers are sorted, so no later reader can contain the key
                 * either: stop scanning. */
                break;
            }
            idx++;
        }

        /*
         * There is something to sort.
         * We can rely on the following invariants that make the following check accurate:
         * - currentReaderIndexStart is always >= 0.
         * - lastIndexModified is only positive if seekNear was called.
         * - lastIndexModified >= currentReaderIndexStart if lastIndexModifed >= 0. This is true because the list
         *   is already sorted.
         */
        if (lastIndexModified - currentReaderIndexStart >= 0) {

            /*
             * The following logic is optimized for the (common) case where there are a tiny number of readers that
             * need to be repositioned relative to the other readers in the much larger sorted list.
             */

            /* First, just sort the readers that were updated relative to one another. */
            Arrays.sort(this.readers, currentReaderIndexStart, lastIndexModified+1, this.readerComparator);

            /* In descending order, push the updated readers back in the the sorted list. */
            for (idx = lastIndexModified; idx >= currentReaderIndexStart; idx--) {
                sortReader(idx);
            }
        }
    }
| |
| @Override |
| public void close() throws IOException { |
| for (IndexedStorageRecordReader reader : this.readers) { |
| reader.close(); |
| } |
| } |
| |
| /** |
| * <code>IndexManager</code> manages the index file (both writing and reading) |
| * It keeps track of the last index read during reading. |
| */ |
| public static class IndexManager { |
| |
        /**
         * Constructor (called during reading)
         * @param ifile index file to read
         */
        public IndexManager(FileStatus ifile) {
            this.indexFile = ifile;
            this.offsetToFooter = -1;  // footer offset discovered lazily on first seek
        }

        /**
         * Constructor (called during writing)
         * @param offsetsToIndexKeys field positions within each tuple that form the index key
         */
        public IndexManager(int[] offsetsToIndexKeys) {
            this.offsetsToIndexKeys = offsetsToIndexKeys;
            this.offsetToFooter = -1;  // footer offset recorded when the footer is written
        }
| |
| /** |
| * Construct index file path for a given a data file |
| * @param file - Data file |
| * @return - Index file path for given data file |
| */ |
| private static Path getIndexFileName(Path file) { |
| return new Path(file.getParent(), "." + file.getName() + ".index"); |
| } |
| |
        /**
         * Open the index file for writing for given data file
         * (fails if an index file already exists, since overwrite is false).
         * @param fs filesystem to create the index on
         * @param file the data file the index belongs to
         * @throws IOException if the index file cannot be created
         */
        public void createIndexFile(FileSystem fs, Path file) throws IOException {
            this.indexOut = fs.create(IndexManager.getIndexFileName(file), false);
        }

        /**
         * Opens the index file (supplied to the reading constructor) for reading.
         * @throws IOException if the index file cannot be opened
         */
        public void openIndexFile(FileSystem fs) throws IOException {
            this.indexIn = fs.open(this.indexFile.getPath());
        }
| |
| /** |
| * Close the index file |
| * @throws IOException |
| */ |
| public void Close() throws IOException { |
| this.indexOut.close(); |
| } |
| |
        /**
         * Folds one written record into the index.  Consecutive records that
         * share the same key are collapsed into a single index entry carrying
         * a count, so an entry is only flushed when the key changes (or when
         * the footer is written).
         *
         * @param t the record just written to the data file
         * @param offset byte offset of the record in the data file
         * @throws IOException on write failure or invalid key offsets
         */
        private void BuildIndex(Tuple t, long offset) throws IOException {
            /* Build index key tuple */
            Tuple indexKeyTuple = tupleFactory.newTuple(this.offsetsToIndexKeys.length);
            for (int i = 0; i < this.offsetsToIndexKeys.length; ++i) {
                indexKeyTuple.set(i, t.get(this.offsetsToIndexKeys[i]));
            }

            /* Check if we have already seen Tuple(s) with same index keys */
            if (indexKeyTuple.compareTo(this.lastIndexKeyTuple) == 0) {
                /* We have seen Tuple(s) with given index keys, update the tuple count */
                this.numberOfTuples += 1;
            }
            else {
                /* Key changed: flush the previous pending entry (if any) first. */
                if (this.lastIndexKeyTuple != null)
                    this.WriteIndex();

                this.lastIndexKeyTuple = indexKeyTuple;
                /* Track the global min/max keys for the footer. */
                this.minIndexKeyTuple = ((this.minIndexKeyTuple == null) || (indexKeyTuple.compareTo(this.minIndexKeyTuple) < 0)) ? indexKeyTuple : this.minIndexKeyTuple;
                this.maxIndexKeyTuple = ((this.maxIndexKeyTuple == null) || (indexKeyTuple.compareTo(this.maxIndexKeyTuple) > 0)) ? indexKeyTuple : this.maxIndexKeyTuple;

                /* New index tuple for newly seen index key */
                this.indexTuple = tupleFactory.newTuple(3);

                /* Add index keys to index Tuple */
                this.indexTuple.set(0, indexKeyTuple);

                /* Reset Tuple count for index key */
                this.numberOfTuples = 1;

                /* Remember offset to Tuple with new index keys */
                this.indexTuple.set(2, offset);
            }
        }
| |
| /** |
| * Write index header |
| * @param indexOut - Stream to write to |
| * @param ih - Index header to write |
| * @throws IOException |
| */ |
| public void WriteIndexHeader() throws IOException { |
| /* Number of index keys */ |
| indexOut.writeInt(this.offsetsToIndexKeys.length); |
| |
| /* Offset to index keys */ |
| for (int i = 0; i < this.offsetsToIndexKeys.length; ++i) { |
| indexOut.writeInt(this.offsetsToIndexKeys[i]); |
| } |
| } |
| |
| /** |
| * Read index header |
| * @param indexIn - Stream to read from |
| * @return Index header |
| * @throws IOException |
| */ |
| public void ReadIndexHeader() throws IOException { |
| /* Number of index keys */ |
| int nkeys = this.indexIn.readInt(); |
| |
| /* Offset to index keys */ |
| this.offsetsToIndexKeys = new int[nkeys]; |
| for (int i = 0; i < nkeys; ++i) { |
| offsetsToIndexKeys[i] = this.indexIn.readInt(); |
| } |
| } |
| |
        /**
         * Writes the index footer: the min and max key tuples followed by a
         * trailing long holding the byte offset of the footer itself.
         * (The "Writer..." method name is a historical typo kept for
         * compatibility with existing callers.)
         * @throws IOException on write failure
         */
        public void WriterIndexFooter() throws IOException {
            /* Flush indexes for remaining records */
            this.WriteIndex();

            /* record the offset to footer */
            this.offsetToFooter = this.indexOut.getPos();

            /* Write index footer */
            DataReaderWriter.writeDatum(indexOut, this.minIndexKeyTuple);
            DataReaderWriter.writeDatum(indexOut, this.maxIndexKeyTuple);

            /* Offset to footer */
            indexOut.writeLong(this.offsetToFooter);
        }
| |
| /** |
| * Reads the index footer |
| */ |
| public void ReadIndexFooter() throws IOException { |
| long currentOffset = this.indexIn.getPos(); |
| |
| this.SeekToIndexFooter(); |
| this.minIndexKeyTuple = (Tuple)DataReaderWriter.readDatum(this.indexIn); |
| this.maxIndexKeyTuple = (Tuple)DataReaderWriter.readDatum(this.indexIn); |
| |
| this.indexIn.seek(currentOffset); |
| } |
| |
        /**
         * Seeks to the index footer, lazily discovering its offset from the
         * trailing long of the index file on first call.
         */
        public void SeekToIndexFooter() throws IOException {
            if (this.offsetToFooter < 0) {
                /* offset to footer is at last long (8 bytes) in the file */
                this.indexIn.seek(this.indexFile.getLen()-8);
                this.offsetToFooter = this.indexIn.readLong();
            }
            this.indexIn.seek(this.offsetToFooter);
        }

        /**
         * Writes the current (pending) index entry, stamping in the count of
         * tuples accumulated for its key.
         */
        public void WriteIndex() throws IOException {
            this.indexTuple.set(1, this.numberOfTuples);
            DataReaderWriter.writeDatum(this.indexOut, this.indexTuple);
        }
| |
| /** |
| * Extracts the index key from the index tuple |
| */ |
| public Tuple getIndexKeyTuple(Tuple indexTuple) throws IOException { |
| if (indexTuple.size() == 3) |
| return (Tuple)indexTuple.get(0); |
| else |
| throw new IOException("Invalid index record with size " + indexTuple.size()); |
| } |
| |
| /** |
| * Extracts the number of records that share the current key from the index tuple. |
| */ |
| public long getIndexKeyTupleCount(Tuple indexTuple) throws IOException { |
| if (indexTuple.size() == 3) |
| return (Long)indexTuple.get(1); |
| else |
| throw new IOException("Invalid index record with size " + indexTuple.size()); |
| } |
| |
| /** |
| * Extracts the offset into the data file from the index tuple. |
| */ |
| public long getOffset(Tuple indexTuple) throws IOException { |
| if (indexTuple.size() == 3) |
| return (Long)indexTuple.get(2); |
| else |
| throw new IOException("Invalid index record with size " + indexTuple.size()); |
| } |
| |
        /**
         * Reads the next index entry from the index file, or null once the
         * index body is exhausted (i.e. the footer has been reached), and
         * caches the entry's key and tuple count.
         */
        public Tuple ReadIndex() throws IOException {
            if (this.indexIn.getPos() < this.offsetToFooter) {
                indexTuple = (Tuple)DataReaderWriter.readDatum(this.indexIn);
                if (indexTuple != null) {
                    this.lastIndexKeyTuple = this.getIndexKeyTuple(indexTuple);
                    this.numberOfTuples = this.getIndexKeyTupleCount(indexTuple);
                }
                return indexTuple;
            }
            return null;
        }

        /**
         * Scans the index looking for a given key.  The index is locally
         * sorted, so the scan only ever moves forward from the current
         * position.
         * @return the matching index tuple OR the first index tuple with a
         * key greater than the requested key if no exact match is found
         * (null at end of index).
         */
        public Tuple ScanIndex(Tuple keys) throws IOException {
            /* Already positioned at or past the requested key: nothing to do. */
            if (lastIndexKeyTuple != null && keys.compareTo(this.lastIndexKeyTuple) <= 0) {
                return indexTuple;
            }

            /* Scan the index looking for given key */
            while ((indexTuple = this.ReadIndex()) != null) {
                if (keys.compareTo(this.lastIndexKeyTuple) > 0)
                    continue;
                else
                    break;
            }

            return indexTuple;
        }
| |
        /**
         * Stores the list of record indices (field positions) that form the index key.
         */
        private int[] offsetsToIndexKeys = null;

        /**
         * Offset in bytes to the start of the footer of the index (-1 until known).
         */
        private long offsetToFooter = -1;

        /**
         * Output stream when writing the index (write path only; null otherwise).
         */
        FSDataOutputStream indexOut;

        /**
         * Input stream when reading the index (read path only; null otherwise).
         */
        FSDataInputStream indexIn;

        /**
         * Tuple factory to create index tuples
         */
        private TupleFactory tupleFactory = TupleFactory.getInstance();

        /**
         * Current index entry, of the form
         * ((Tuple of index keys), count of tuples with index keys, offset to first tuple with index keys)
         */
        private Tuple indexTuple = tupleFactory.newTuple(3);

        /**
         * "Smallest" index key tuple seen
         */
        private Tuple minIndexKeyTuple = null;

        /**
         * "Biggest" index key tuple seen
         */
        private Tuple maxIndexKeyTuple = null;

        /**
         * Last seen index key tuple (writer: last key written; reader: key of
         * the current entry)
         */
        private Tuple lastIndexKeyTuple = null;

        /**
         * Number of tuples recorded for the current index key (counts down
         * while reading)
         */
        private long numberOfTuples = 0;

        /**
         * The index file (read path only).
         */
        private FileStatus indexFile;
| } |
| |
| /** |
| * Internal InputFormat class |
| */ |
| public static class IndexedStorageInputFormat extends PigTextInputFormat { |
| |
| @Override |
| public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) { |
| IndexManager im = null; |
| try { |
| FileSystem fs = FileSystem.get(context.getConfiguration()); |
| Path indexFile = IndexManager.getIndexFileName(((FileSplit)split).getPath()); |
| im = new IndexManager(fs.getFileStatus(indexFile)); |
| im.openIndexFile(fs); |
| im.ReadIndexHeader(); |
| im.ReadIndexFooter(); |
| } catch (IOException e) { |
| // TODO Auto-generated catch block |
| e.printStackTrace(); |
| } |
| |
| return new IndexedStorageRecordReader(im); |
| } |
| |
        @Override
        public boolean isSplitable(JobContext context, Path filename) {
            /* Never split: index offsets are absolute positions within a whole
             * data file, so each file must be read by a single reader. */
            return false;
        }
| |
| /** |
| * Internal RecordReader class |
| */ |
| public static class IndexedStorageRecordReader extends RecordReader<LongWritable, Text> { |
            private long start;             // first byte of the split in the data file
            private long pos;               // current read position (byte offset)
            private long end;               // one past the last byte of the split
            private IndexedStorageLineReader in;  // buffered, seekable line reader over the data
            private int maxLineLength;      // longest line accepted before truncation
            private LongWritable key = null;      // byte offset of the current record
            private Text value = null;            // text of the current record
            /** Index for this reader's data file (opened by the input format). */
            private IndexManager indexManager = null;
| |
| @Override |
| public String toString() { |
| return indexManager.minIndexKeyTuple + "|" + indexManager.lastIndexKeyTuple + "|" + indexManager.maxIndexKeyTuple; |
| } |
| |
            /** Creates a reader bound to an already-opened index manager. */
            public IndexedStorageRecordReader(IndexManager im) {
                this.indexManager = im;
            }
| |
| /** |
| * Class to compare record readers using underlying indexes |
| * |
| */ |
| public static class IndexedStorageRecordReaderComparator implements Comparator<IndexedStorageRecordReader> { |
| @Override |
| public int compare(IndexedStorageRecordReader o1, IndexedStorageRecordReader o2) { |
| Tuple t1 = (o1.indexManager.lastIndexKeyTuple == null) ? o1.indexManager.minIndexKeyTuple : o1.indexManager.lastIndexKeyTuple; |
| Tuple t2 = (o2.indexManager.lastIndexKeyTuple == null) ? o2.indexManager.minIndexKeyTuple : o2.indexManager.lastIndexKeyTuple; |
| return t1.compareTo(t2); |
| } |
| } |
| |
| public static class IndexedStorageLineReader { |
                private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
                private int bufferSize = DEFAULT_BUFFER_SIZE;
                /** Underlying stream; guaranteed Seekable + PositionedReadable by the constructor. */
                private InputStream in;
                private byte[] buffer;
                // the number of bytes of real data in the buffer
                private int bufferLength = 0;
                // the current position in the buffer
                private int bufferPosn = 0;
                // file offset corresponding to buffer[0]; lets seek() stay in-buffer
                private long bufferOffset = 0;

                private static final byte CR = '\r';
                private static final byte LF = '\n';
| |
                /**
                 * Create a line reader that reads from the given stream using the
                 * default buffer-size (64k).
                 * @param in The input stream (must be Seekable and PositionedReadable)
                 * @throws IllegalArgumentException via the delegated constructor if
                 * the stream is not seekable
                 */
                public IndexedStorageLineReader(InputStream in) {
                    this(in, DEFAULT_BUFFER_SIZE);
                }
| |
| /** |
| * Create a line reader that reads from the given stream using the |
| * given buffer-size. |
| * @param in The input stream |
| * @param bufferSize Size of the read buffer |
| * @throws IOException |
| */ |
| public IndexedStorageLineReader(InputStream in, int bufferSize) { |
| if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) { |
| throw new IllegalArgumentException( |
| "In is not an instance of Seekable or PositionedReadable"); |
| } |
| |
| this.in = in; |
| this.bufferSize = bufferSize; |
| this.buffer = new byte[this.bufferSize]; |
| } |
| |
                /**
                 * Create a line reader that reads from the given stream using the
                 * <code>io.file.buffer.size</code> specified in the given
                 * <code>Configuration</code>.
                 * @param in input stream
                 * @param conf configuration
                 * @throws IOException declared for API compatibility
                 */
                public IndexedStorageLineReader(InputStream in, Configuration conf) throws IOException {
                    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
                }
| |
                /**
                 * Close the underlying stream.
                 * @throws IOException if the underlying close fails
                 */
                public void close() throws IOException {
                    in.close();
                }
| |
                /**
                 * Read one line from the InputStream into the given Text. A line
                 * can be terminated by one of the following: '\n' (LF) , '\r' (CR),
                 * or '\r\n' (CR+LF). EOF also terminates an otherwise unterminated
                 * line.
                 *
                 * @param str the object to store the given line (without newline)
                 * @param maxLineLength the maximum number of bytes to store into str;
                 *  the rest of the line is silently discarded.
                 * @param maxBytesToConsume the maximum number of bytes to consume
                 *  in this call. This is only a hint, because if the line cross
                 *  this threshold, we allow it to happen. It can overshoot
                 *  potentially by as much as one buffer length.
                 *
                 * @return the number of bytes read including the (longest) newline
                 * found.
                 *
                 * @throws IOException if the underlying stream throws
                 */
                public int readLine(Text str, int maxLineLength,
                                    int maxBytesToConsume) throws IOException {
                    /* We're reading data from in, but the head of the stream may be
                     * already buffered in buffer, so we have several cases:
                     * 1. No newline characters are in the buffer, so we need to copy
                     *    everything and read another buffer from the stream.
                     * 2. An unambiguously terminated line is in buffer, so we just
                     *    copy to str.
                     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
                     *    in CR. In this case we copy everything up to CR to str, but
                     *    we also need to see what follows CR: if it's LF, then we
                     *    need consume LF as well, so next call to readLine will read
                     *    from after that.
                     * We use a flag prevCharCR to signal if previous character was CR
                     * and, if it happens to be at the end of the buffer, delay
                     * consuming it until we have a chance to look at the char that
                     * follows.
                     */
                    str.clear();
                    int txtLength = 0; //tracks str.getLength(), as an optimization
                    int newlineLength = 0; //length of terminating newline
                    boolean prevCharCR = false; //true of prev char was CR
                    long bytesConsumed = 0;
                    do {
                        int startPosn = bufferPosn; //starting from where we left off the last time
                        if (bufferPosn >= bufferLength) {
                            startPosn = bufferPosn = 0;
                            if (prevCharCR)
                                ++bytesConsumed; //account for CR from previous read

                            // Record the file offset of the refilled buffer so that
                            // seek() can later hit the buffer instead of the stream.
                            bufferOffset = ((Seekable)in).getPos();
                            bufferLength = in.read(buffer);

                            if (bufferLength <= 0)
                                break; // EOF
                        }
                        for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
                            if (buffer[bufferPosn] == LF) {
                                newlineLength = (prevCharCR) ? 2 : 1;
                                ++bufferPosn; // at next invocation proceed from following byte
                                break;
                            }
                            if (prevCharCR) { //CR + notLF, we are at notLF
                                newlineLength = 1;
                                break;
                            }
                            prevCharCR = (buffer[bufferPosn] == CR);
                        }
                        int readLength = bufferPosn - startPosn;
                        if (prevCharCR && newlineLength == 0)
                            --readLength; //CR at the end of the buffer
                        bytesConsumed += readLength;
                        int appendLength = readLength - newlineLength;
                        if (appendLength > maxLineLength - txtLength) {
                            appendLength = maxLineLength - txtLength;
                        }
                        if (appendLength > 0) {
                            str.append(buffer, startPosn, appendLength);
                            txtLength += appendLength;
                        }
                    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

                    if (bytesConsumed > (long)Integer.MAX_VALUE)
                        throw new IOException("Too many bytes before newline: " + bytesConsumed);
                    return (int)bytesConsumed;
                }
| |
                /**
                 * Read from the InputStream into the given Text.
                 * @param str the object to store the given line
                 * @param maxLineLength the maximum number of bytes to store into str.
                 * @return the number of bytes read including the newline
                 * @throws IOException if the underlying stream throws
                 */
                public int readLine(Text str, int maxLineLength) throws IOException {
                    return readLine(str, maxLineLength, Integer.MAX_VALUE);
                }

                /**
                 * Read from the InputStream into the given Text.
                 * @param str the object to store the given line
                 * @return the number of bytes read including the newline
                 * @throws IOException if the underlying stream throws
                 */
                public int readLine(Text str) throws IOException {
                    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
                }

                /**
                 * If given offset is within the buffer, adjust the buffer position to read from
                 * otherwise seek to the given offset from start of the file.
                 * @param offset absolute byte offset in the underlying file
                 * @throws IOException if the underlying seek fails
                 */
                public void seek(long offset) throws IOException {
                    if ((offset >= bufferOffset) && (offset < (bufferOffset + bufferLength)))
                        // Target byte is already buffered: just move the cursor.
                        bufferPosn = (int) (offset - bufferOffset);
                    else {
                        // Invalidate the buffer and reposition the raw stream.
                        bufferPosn = bufferLength;
                        ((Seekable)in).seek(offset);
                    }
                }
| } |
| |
            /**
             * Standard line-record-reader initialization: opens the data file
             * and positions the stream at the start of the split.  Since
             * isSplitable() returns false, each split is a whole file, so the
             * skip-first-line path below is effectively dormant.
             */
            @Override
            public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                    throws IOException, InterruptedException {

                FileSplit split = (FileSplit) genericSplit;
                Configuration job = context.getConfiguration();
                this.maxLineLength = job.getInt(MRConfiguration.LINERECORDREADER_MAXLENGTH, Integer.MAX_VALUE);
                start = split.getStart();
                end = start + split.getLength();
                final Path file = split.getPath();

                FileSystem fs = file.getFileSystem(job);
                FSDataInputStream fileIn = fs.open(split.getPath());
                boolean skipFirstLine = false;
                if (start != 0) {
                    // Mid-file split: back up one byte and discard the partial
                    // first line (it belongs to the previous split).
                    skipFirstLine = true;
                    --start;
                    fileIn.seek(start);
                }
                in = new IndexedStorageLineReader(fileIn, job);
                if (skipFirstLine) {
                    start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start));
                }
                this.pos = start;
            }
| |
            /** Seeks the underlying line reader and updates the reported position. */
            public void seek(long offset) throws IOException {
                in.seek(offset);
                pos = offset;
            }

            /**
             * Scan the index for given key and seek to appropriate offset in the data
             * @param keys to look for
             * @return true if the given key was found, false otherwise (the
             * stream is then positioned at the first key greater than the
             * requested one, if any)
             * @throws IOException
             */
            public boolean seekNear(Tuple keys) throws IOException {
                boolean ret = false;
                Tuple indexTuple = this.indexManager.ScanIndex(keys);
                if (indexTuple != null) {
                    long offset = this.indexManager.getOffset(indexTuple);
                    // NOTE(review): unlike seek(), 'pos' is not updated here, so
                    // offsets reported via getCurrentKey may lag — confirm intended.
                    in.seek(offset);

                    if (keys.compareTo(this.indexManager.getIndexKeyTuple(indexTuple)) == 0) {
                        ret = true;
                    }
                }

                return ret;
            }
| |
            /**
             * Reads the next line into (key = byte offset, value = line text).
             * @return false at end of split / end of file
             */
            @Override
            public boolean nextKeyValue() throws IOException,
                    InterruptedException {
                if (key == null) {
                    key = new LongWritable();
                }
                key.set(pos);
                if (value == null) {
                    value = new Text();
                }
                int newSize = 0;
                while (pos < end) {
                    newSize = in.readLine(value, maxLineLength,
                                          Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
                                                   maxLineLength));
                    if (newSize == 0) {
                        break;
                    }
                    pos += newSize;
                    if (newSize < maxLineLength) {
                        break;
                    }
                    // Line may exceed maxLineLength: keep looping to consume the
                    // remainder before returning.
                }
                if (newSize == 0) {
                    key = null;
                    value = null;
                    return false;
                } else {
                    return true;
                }
            }
| |
            /** @return the byte offset of the current record (null after EOF). */
            @Override
            public LongWritable getCurrentKey() throws IOException,
                    InterruptedException {
                return key;
            }

            /** @return the text of the current record (null after EOF). */
            @Override
            public Text getCurrentValue() throws IOException,
                    InterruptedException {
                return value;
            }
| |
| @Override |
| public float getProgress() throws IOException, InterruptedException { |
| if (start == end) { |
| return 0.0f; |
| } else { |
| return Math.min(1.0f, (pos - start) / (float)(end - start)); |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (in != null) { |
| in.close(); |
| } |
| } |
| } |
| } |
| |
    /**
     * List of record readers (one per underlying file), kept sorted by each
     * reader's current index key; null when used as a plain loader.
     */
    protected IndexedStorageRecordReader[] readers = null;

    /**
     * Index into the list of readers to the current reader.
     * Readers before this index have been fully scanned for keys.
     */
    protected int currentReaderIndexStart = 0;

    /**
     * Delimiter to use between fields
     */
    protected byte fieldDelimiter = '\t';

    /**
     * Offsets to index keys in tuple
     */
    final protected int[] offsetsToIndexKeys;

    /**
     * Comparator used to order readers by their current index key tuples.
     */
    protected Comparator<IndexedStorageRecordReader> readerComparator = new IndexedStorageInputFormat.IndexedStorageRecordReader.IndexedStorageRecordReaderComparator();
| } |