blob: 04909fdb89e9d07a2d1e842b43aecbaaaf08ecc3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.util.Comparator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.util.MergeSort;
import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
/** This class implements the sort method from BasicTypeSorterBase class as
* MergeSort. Note that this class is really a wrapper over the actual
* mergesort implementation that is there in the util package. The main intent
* of providing this class is to setup the input data for the util.MergeSort
* algo so that the latter doesn't need to bother about the various data
* structures that have been created for the Map output but rather concentrate
* on the core algorithm (thereby allowing easy integration of a mergesort
* implementation). The bridge between this class and the util.MergeSort class
* is the Comparator.
*/
class MergeSorter extends BasicTypeSorterBase
implements Comparator<IntWritable> {
private static int progressUpdateFrequency = 10000;
private int progressCalls = 0;
/** The sort method derived from BasicTypeSorterBase and overridden here*/
public RawKeyValueIterator sort() {
MergeSort m = new MergeSort(this);
int count = super.count;
if (count == 0) return null;
int [] pointers = super.pointers;
int [] pointersCopy = new int[count];
System.arraycopy(pointers, 0, pointersCopy, 0, count);
m.mergeSort(pointers, pointersCopy, 0, count);
return new MRSortResultIterator(super.keyValBuffer, pointersCopy,
super.startOffsets, super.keyLengths, super.valueLengths);
}
/** The implementation of the compare method from Comparator. Note that
* Comparator.compare takes objects as inputs and so the int values are
* wrapped in (reusable) IntWritables from the class util.MergeSort
* @param i
* @param j
* @return int as per the specification of Comparator.compare
*/
public int compare (IntWritable i, IntWritable j) {
// indicate we're making progress but do a batch update
if (progressCalls < progressUpdateFrequency) {
progressCalls++;
} else {
progressCalls = 0;
reporter.progress();
}
return comparator.compare(keyValBuffer.getData(), startOffsets[i.get()],
keyLengths[i.get()],
keyValBuffer.getData(), startOffsets[j.get()],
keyLengths[j.get()]);
}
/** Add the extra memory that will be utilized by the sort method */
public long getMemoryUtilized() {
//this is memory that will be actually utilized (considering the temp
//array that will be allocated by the sort() method (mergesort))
return super.getMemoryUtilized() + super.count * 4;
}
}