blob: abe6c7224755c4a243c48a89d97538760eb7b2b8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.OutputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile.ValueBytes;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
import org.apache.hadoop.util.Progressable;
/** This class implements the sort interface using primitive int arrays as
* the data structures (that is why this class is called 'BasicType'SorterBase)
*/
abstract class BasicTypeSorterBase implements BufferSorter {
protected OutputBuffer keyValBuffer; //the buffer used for storing
//key/values
protected int[] startOffsets; //the array used to store the start offsets of
//keys in keyValBuffer
protected int[] keyLengths; //the array used to store the lengths of
//keys
protected int[] valueLengths; //the array used to store the value lengths
protected int[] pointers; //the array of startOffsets's indices. This will
//be sorted at the end to contain a sorted array of
//indices to offsets
protected RawComparator comparator; //the comparator for the map output
protected int count; //the number of key/values
//the overhead of the arrays in memory
//12 => 4 for keyoffsets, 4 for keylengths, 4 for valueLengths, and
//4 for indices into startOffsets array in the
//pointers array (ignored the partpointers list itself)
static private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
static private final int INITIAL_ARRAY_SIZE = 5;
//we maintain the max lengths of the key/val that we encounter. During
//iteration of the sorted results, we will create a DataOutputBuffer to
//return the keys. The max size of the DataOutputBuffer will be the max
//keylength that we encounter. Expose this value to model memory more
//accurately.
private int maxKeyLength = 0;
private int maxValLength = 0;
//Reference to the Progressable object for sending KeepAlive
protected Progressable reporter;
//Implementation of methods of the SorterBase interface
//
public void configure(JobConf conf) {
comparator = conf.getOutputKeyComparator();
}
public void setProgressable(Progressable reporter) {
this.reporter = reporter;
}
public void addKeyValue(int recordOffset, int keyLength, int valLength) {
//Add the start offset of the key in the startOffsets array and the
//length in the keyLengths array.
if (startOffsets == null || count == startOffsets.length)
grow();
startOffsets[count] = recordOffset;
keyLengths[count] = keyLength;
if (keyLength > maxKeyLength) {
maxKeyLength = keyLength;
}
if (valLength > maxValLength) {
maxValLength = valLength;
}
valueLengths[count] = valLength;
pointers[count] = count;
count++;
}
public void setInputBuffer(OutputBuffer buffer) {
//store a reference to the keyValBuffer that we need to read during sort
this.keyValBuffer = buffer;
}
public long getMemoryUtilized() {
//the total length of the arrays + the max{Key,Val}Length (this will be the
//max size of the DataOutputBuffers during the iteration of the sorted
//keys).
if (startOffsets != null) {
return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD +
maxKeyLength + maxValLength;
}
else { //nothing from this yet
return 0;
}
}
public abstract RawKeyValueIterator sort();
public void close() {
//set count to 0; also, we don't reuse the arrays since we want to maintain
//consistency in the memory model
count = 0;
startOffsets = null;
keyLengths = null;
valueLengths = null;
pointers = null;
maxKeyLength = 0;
maxValLength = 0;
//release the large key-value buffer so that the GC, if necessary,
//can collect it away
keyValBuffer = null;
}
private void grow() {
int currLength = 0;
if (startOffsets != null) {
currLength = startOffsets.length;
}
int newLength = (int)(currLength * 1.1) + 1;
startOffsets = grow(startOffsets, newLength);
keyLengths = grow(keyLengths, newLength);
valueLengths = grow(valueLengths, newLength);
pointers = grow(pointers, newLength);
}
private int[] grow(int[] old, int newLength) {
int[] result = new int[newLength];
if(old != null) {
System.arraycopy(old, 0, result, 0, old.length);
}
return result;
}
} //BasicTypeSorterBase
//Implementation of methods of the RawKeyValueIterator interface. These
//methods must be invoked to iterate over key/vals after sort is done.
//
class MRSortResultIterator implements RawKeyValueIterator {
private int count;
private int[] pointers;
private int[] startOffsets;
private int[] keyLengths;
private int[] valLengths;
private int currStartOffsetIndex;
private int currIndexInPointers;
private OutputBuffer keyValBuffer;
private DataOutputBuffer key = new DataOutputBuffer();
private InMemUncompressedBytes value = new InMemUncompressedBytes();
public MRSortResultIterator(OutputBuffer keyValBuffer,
int []pointers, int []startOffsets,
int []keyLengths, int []valLengths) {
this.count = pointers.length;
this.pointers = pointers;
this.startOffsets = startOffsets;
this.keyLengths = keyLengths;
this.valLengths = valLengths;
this.keyValBuffer = keyValBuffer;
}
public Progress getProgress() {
return null;
}
public DataOutputBuffer getKey() throws IOException {
int currKeyOffset = startOffsets[currStartOffsetIndex];
int currKeyLength = keyLengths[currStartOffsetIndex];
//reuse the same key
key.reset();
key.write(keyValBuffer.getData(), currKeyOffset, currKeyLength);
return key;
}
public ValueBytes getValue() throws IOException {
//value[i] is stored in the following byte range:
//startOffsets[i] + keyLengths[i] through valLengths[i]
value.reset(keyValBuffer,
startOffsets[currStartOffsetIndex] + keyLengths[currStartOffsetIndex],
valLengths[currStartOffsetIndex]);
return value;
}
public boolean next() throws IOException {
if (count == currIndexInPointers)
return false;
currStartOffsetIndex = pointers[currIndexInPointers];
currIndexInPointers++;
return true;
}
public void close() {
return;
}
//An implementation of the ValueBytes interface for the in-memory value
//buffers.
private static class InMemUncompressedBytes implements ValueBytes {
private byte[] data;
int start;
int dataSize;
private void reset(OutputBuffer d, int start, int length)
throws IOException {
data = d.getData();
this.start = start;
dataSize = length;
}
public int getSize() {
return dataSize;
}
public void writeUncompressedBytes(DataOutputStream outStream)
throws IOException {
outStream.write(data, start, dataSize);
}
public void writeCompressedBytes(DataOutputStream outStream)
throws IllegalArgumentException, IOException {
throw
new IllegalArgumentException("UncompressedBytes cannot be compressed!");
}
} // InMemUncompressedBytes
} //MRSortResultIterator