/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
import org.apache.hadoop.mapreduce.task.MapContextImpl;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.IndexedSorter;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.QuickSort;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
/** A Map task. */
class MapTask extends Task {
/**
 * The size of each record in the index file for the map-outputs:
 * three 8-byte longs (start offset, raw length, part length).
 */
public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
private BytesWritable split = new BytesWritable();
private String splitClass;
private final static int APPROX_HEADER_LENGTH = 150;
private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
private Progress mapPhase;
private Progress sortPhase;
{ // set phase for this task
setPhase(TaskStatus.Phase.MAP);
getProgress().setStatus("map");
}
public MapTask() {
super();
}
public MapTask(String jobFile, TaskAttemptID taskId,
int partition, String splitClass, BytesWritable split,
int numSlotsRequired) {
super(jobFile, taskId, partition, numSlotsRequired);
this.splitClass = splitClass;
this.split = split;
}
@Override
public boolean isMapTask() {
return true;
}
@Override
public void localizeConfiguration(JobConf conf)
throws IOException {
super.localizeConfiguration(conf);
// split.dta file is used only by IsolationRunner.
// Write the split file to the local disk if it is a normal map task (not a
// job-setup or a job-cleanup task) and if the user wishes to run
// IsolationRunner either by setting keep.failed.tasks.files to true or by
// using keep.tasks.files.pattern
if (isMapOrReduce()
&& (conf.getKeepTaskFilesPattern() != null || conf
.getKeepFailedTaskFiles())) {
Path localSplit =
new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite(
TaskTracker.getLocalSplitFile(conf.getUser(), getJobID()
.toString(), getTaskID().toString()), conf);
LOG.debug("Writing local split to " + localSplit);
DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
Text.writeString(out, splitClass);
split.write(out);
out.close();
}
}
@Override
public TaskRunner createRunner(TaskTracker tracker,
TaskTracker.TaskInProgress tip) {
return new MapTaskRunner(tip, tracker, this.conf);
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
if (isMapOrReduce()) {
Text.writeString(out, splitClass);
split.write(out);
split = null;
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
if (isMapOrReduce()) {
splitClass = Text.readString(in);
split.readFields(in);
}
}
/**
 * This class wraps the user's record reader to update the counters and
 * progress as records are read.
 * @param <K> the key type read by the wrapped reader
 * @param <V> the value type read by the wrapped reader
 */
class TrackedRecordReader<K, V>
implements RecordReader<K,V> {
private RecordReader<K,V> rawIn;
private Counters.Counter inputByteCounter;
private Counters.Counter inputRecordCounter;
private TaskReporter reporter;
private long beforePos = -1;
private long afterPos = -1;
TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter)
throws IOException{
rawIn = raw;
inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
inputByteCounter = reporter.getCounter(
FileInputFormat.COUNTER_GROUP,
FileInputFormat.BYTES_READ);
this.reporter = reporter;
}
public K createKey() {
return rawIn.createKey();
}
public V createValue() {
return rawIn.createValue();
}
public synchronized boolean next(K key, V value)
throws IOException {
boolean ret = moveToNext(key, value);
if (ret) {
incrCounters();
}
return ret;
}
protected void incrCounters() {
inputRecordCounter.increment(1);
inputByteCounter.increment(afterPos - beforePos);
}
protected synchronized boolean moveToNext(K key, V value)
throws IOException {
beforePos = getPos();
boolean ret = rawIn.next(key, value);
afterPos = getPos();
reporter.setProgress(getProgress());
return ret;
}
public long getPos() throws IOException { return rawIn.getPos(); }
public void close() throws IOException { rawIn.close(); }
public float getProgress() throws IOException {
return rawIn.getProgress();
}
TaskReporter getTaskReporter() {
return reporter;
}
}
/**
* This class skips the records based on the failed ranges from previous
* attempts.
*/
class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
private SkipRangeIterator skipIt;
private SequenceFile.Writer skipWriter;
private boolean toWriteSkipRecs;
private TaskUmbilicalProtocol umbilical;
private Counters.Counter skipRecCounter;
private long recIndex = -1;
SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
TaskReporter reporter) throws IOException{
super(raw, reporter);
this.umbilical = umbilical;
this.skipRecCounter = reporter.getCounter(TaskCounter.MAP_SKIPPED_RECORDS);
this.toWriteSkipRecs = toWriteSkipRecs() &&
SkipBadRecords.getSkipOutputPath(conf)!=null;
skipIt = getSkipRanges().skipRangeIterator();
}
public synchronized boolean next(K key, V value)
throws IOException {
if(!skipIt.hasNext()) {
LOG.warn("Further records got skipped.");
return false;
}
boolean ret = moveToNext(key, value);
long nextRecIndex = skipIt.next();
long skip = 0;
while(recIndex<nextRecIndex && ret) {
if(toWriteSkipRecs) {
writeSkippedRec(key, value);
}
ret = moveToNext(key, value);
skip++;
}
//close the skip writer once all the ranges are skipped
if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
skipWriter.close();
}
skipRecCounter.increment(skip);
reportNextRecordRange(umbilical, recIndex);
if (ret) {
incrCounters();
}
return ret;
}
protected synchronized boolean moveToNext(K key, V value)
throws IOException {
recIndex++;
return super.moveToNext(key, value);
}
@SuppressWarnings("unchecked")
private void writeSkippedRec(K key, V value) throws IOException{
if(skipWriter==null) {
Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
Path skipFile = new Path(skipDir, getTaskID().toString());
skipWriter =
SequenceFile.createWriter(
skipFile.getFileSystem(conf), conf, skipFile,
(Class<K>) createKey().getClass(),
(Class<V>) createValue().getClass(),
CompressionType.BLOCK, getTaskReporter());
}
skipWriter.append(key, value);
}
}
@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException {
this.umbilical = umbilical;
if (isMapTask()) {
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase = getProgress().addPhase("sort", 0.333f);
}
TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper();
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
if (useNewApi) {
runNewMapper(job, split, umbilical, reporter);
} else {
runOldMapper(job, split, umbilical, reporter);
}
done(umbilical, reporter);
}
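// The old-API path: reinstantiate the InputSplit from its serialized bytes,
// wrap the RecordReader for progress reporting (and record skipping, if
// enabled), then drive the configured MapRunnable against a MapOutputBuffer,
// or a DirectMapOutputCollector when there are no reduces.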
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runOldMapper(final JobConf job,
final BytesWritable rawSplit,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException,
ClassNotFoundException {
InputSplit inputSplit = null;
// reinstantiate the split
try {
inputSplit = (InputSplit)
ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
} catch (ClassNotFoundException exp) {
IOException wrap = new IOException("Split class " + splitClass +
" not found");
wrap.initCause(exp);
throw wrap;
}
DataInputBuffer splitBuffer = new DataInputBuffer();
splitBuffer.reset(split.getBytes(), 0, split.getLength());
inputSplit.readFields(splitBuffer);
updateJobWithSplit(job, inputSplit);
reporter.setInputSplit(inputSplit);
RecordReader<INKEY,INVALUE> rawIn = // open input
job.getInputFormat().getRecordReader(inputSplit, job, reporter);
RecordReader<INKEY,INVALUE> in = isSkipping() ?
new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
MapOutputCollector collector = null;
if (numReduceTasks > 0) {
collector = new MapOutputBuffer(umbilical, job, reporter);
} else {
collector = new DirectMapOutputCollector(umbilical, job, reporter);
}
MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
try {
runner.run(in, new OldOutputCollector(collector, conf), reporter);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
collector.flush();
} finally {
//close
in.close(); // close input
collector.close();
}
}
/**
* Update the job with details about the file split
* @param job the job configuration to update
* @param inputSplit the file split
*/
private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
if (inputSplit instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) inputSplit;
job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
}
}
static class NewTrackingRecordReader<K,V>
extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
private final TaskReporter reporter;
NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
TaskReporter reporter) {
this.real = real;
this.reporter = reporter;
this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
}
@Override
public void close() throws IOException {
real.close();
}
@Override
public K getCurrentKey() throws IOException, InterruptedException {
return real.getCurrentKey();
}
@Override
public V getCurrentValue() throws IOException, InterruptedException {
return real.getCurrentValue();
}
@Override
public float getProgress() throws IOException, InterruptedException {
return real.getProgress();
}
@Override
public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
org.apache.hadoop.mapreduce.TaskAttemptContext context
) throws IOException, InterruptedException {
real.initialize(split, context);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
boolean result = real.nextKeyValue();
if (result) {
inputRecordCounter.increment(1);
}
reporter.setProgress(getProgress());
return result;
}
}
/**
 * Since the mapred and mapreduce Partitioners don't share a common interface
 * (JobConfigurable is deprecated, and mapred.Partitioner extends it), the
 * partitioner lives in Old/NewOutputCollector. Note that, for map-only jobs,
 * the configured partitioner should not be called. It's common for
 * partitioners to compute a result modulo numReduces, which would cause a
 * divide-by-zero error when there are no reduces.
 */
private static class OldOutputCollector<K,V> implements OutputCollector<K,V> {
private final Partitioner<K,V> partitioner;
private final MapOutputCollector<K,V> collector;
private final int numPartitions;
@SuppressWarnings("unchecked")
OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) {
numPartitions = conf.getNumReduceTasks();
if (numPartitions > 0) {
partitioner = (Partitioner<K,V>)
ReflectionUtils.newInstance(conf.getPartitionerClass(), conf);
} else {
partitioner = new Partitioner<K,V>() {
@Override
public void configure(JobConf job) { }
@Override
public int getPartition(K key, V value, int numPartitions) {
return -1;
}
};
}
this.collector = collector;
}
@Override
public void collect(K key, V value) throws IOException {
try {
collector.collect(key, value,
partitioner.getPartition(key, value, numPartitions));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new IOException("interrupt exception", ie);
}
}
}
private class NewDirectOutputCollector<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
private final org.apache.hadoop.mapreduce.RecordWriter out;
private final TaskReporter reporter;
private final Counters.Counter mapOutputRecordCounter;
@SuppressWarnings("unchecked")
NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
throws IOException, ClassNotFoundException, InterruptedException {
this.reporter = reporter;
out = outputFormat.getRecordWriter(taskContext);
mapOutputRecordCounter =
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
}
@Override
@SuppressWarnings("unchecked")
public void write(K key, V value)
throws IOException, InterruptedException {
reporter.progress();
out.write(key, value);
mapOutputRecordCounter.increment(1);
}
@Override
public void close(TaskAttemptContext context)
throws IOException,InterruptedException {
reporter.progress();
if (out != null) {
out.close(context);
}
}
}
private class NewOutputCollector<K,V>
extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
private final MapOutputCollector<K,V> collector;
private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
private final int partitions;
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
partitions = jobContext.getNumReduceTasks();
if (partitions > 0) {
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return -1;
}
};
}
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
@Override
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush();
} catch (ClassNotFoundException cnf) {
throw new IOException("can't find class ", cnf);
}
collector.close();
}
}
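// The new-API path: rebuild the serialized InputSplit via the
// SerializationFactory, wrap the user's RecordReader and RecordWriter with
// counter-updating versions, and drive the user's Mapper through a
// WrappedMapper context.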
@SuppressWarnings("unchecked")
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
final BytesWritable rawSplit,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID());
// make a mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
// make the input format
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
DataInputBuffer splitBuffer = new DataInputBuffer();
splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
SerializationFactory factory = new SerializationFactory(job);
Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
deserializer =
(Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>)
factory.getDeserializer(job.getClassByName(splitClass));
deserializer.open(splitBuffer);
split = deserializer.deserialize(null);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(inputFormat.createRecordReader(split, taskContext), reporter);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
input, output,
committer,
reporter, split);
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
mapContext);
input.initialize(split, mapperContext);
mapper.run(mapperContext);
mapPhase.complete();
setPhase(TaskStatus.Phase.SORT);
statusUpdate(umbilical);
input.close();
output.close(mapperContext);
}
interface MapOutputCollector<K, V> {
public void collect(K key, V value, int partition
) throws IOException, InterruptedException;
public void close() throws IOException, InterruptedException;
public void flush() throws IOException, InterruptedException,
ClassNotFoundException;
}
class DirectMapOutputCollector<K, V>
implements MapOutputCollector<K, V> {
private RecordWriter<K, V> out = null;
private TaskReporter reporter = null;
private final Counters.Counter mapOutputRecordCounter;
@SuppressWarnings("unchecked")
public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
JobConf job, TaskReporter reporter) throws IOException {
this.reporter = reporter;
String finalName = getOutputName(getPartition());
FileSystem fs = FileSystem.get(job);
out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
}
public void close() throws IOException {
if (this.out != null) {
out.close(this.reporter);
}
}
public void flush() throws IOException, InterruptedException,
ClassNotFoundException {
}
public void collect(K key, V value, int partition) throws IOException {
reporter.progress();
out.write(key, value);
mapOutputRecordCounter.increment(1);
}
}
class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
private final int partitions;
private final JobConf job;
private final TaskReporter reporter;
private final Class<K> keyClass;
private final Class<V> valClass;
private final RawComparator<K> comparator;
private final SerializationFactory serializationFactory;
private final Serializer<K> keySerializer;
private final Serializer<V> valSerializer;
private final CombinerRunner<K,V> combinerRunner;
private final CombineOutputCollector<K, V> combineCollector;
// Compression for map-outputs
private CompressionCodec codec = null;
// k/v accounting
private volatile int kvstart = 0; // marks beginning of spill
private volatile int kvend = 0; // marks beginning of collectable
private int kvindex = 0; // marks end of collected
private final int[] kvoffsets; // indices into kvindices
private final int[] kvindices; // partition, k/v offsets into kvbuffer
private volatile int bufstart = 0; // marks beginning of spill
private volatile int bufend = 0; // marks beginning of collectable
private volatile int bufvoid = 0; // marks the point where we should stop
// reading at the end of the buffer
private int bufindex = 0; // marks end of collected
private int bufmark = 0; // marks end of record
private byte[] kvbuffer; // main output buffer
private static final int PARTITION = 0; // partition offset in acct
private static final int KEYSTART = 1; // key offset in acct
private static final int VALSTART = 2; // val offset in acct
private static final int ACCTSIZE = 3; // total #fields in acct
private static final int RECSIZE =
(ACCTSIZE + 1) * 4; // acct bytes per record
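// Layout: for each collected record, kvoffsets holds one int pointing into
// kvindices, and kvindices holds ACCTSIZE (3) ints: the partition number and
// the key/value start offsets into kvbuffer. RECSIZE (16 bytes) is the total
// accounting cost per record: the three kvindices ints plus the kvoffsets int.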
// spill accounting
private volatile int numSpills = 0;
private volatile Throwable sortSpillException = null;
private final int softRecordLimit;
private final int softBufferLimit;
private int recordRemaining;
private int bufferRemaining;
private final int minSpillsForCombine;
private final IndexedSorter sorter;
private final ReentrantLock spillLock = new ReentrantLock();
private final Condition spillDone = spillLock.newCondition();
private final Condition spillReady = spillLock.newCondition();
private final BlockingBuffer bb = new BlockingBuffer();
private volatile boolean spillThreadRunning = false;
private final SpillThread spillThread = new SpillThread();
private final FileSystem localFs;
private final FileSystem rfs;
private final Counters.Counter mapOutputByteCounter;
private final Counters.Counter mapOutputRecordCounter;
private final Counters.Counter combineOutputCounter;
private ArrayList<SpillRecord> indexCacheList;
private int totalIndexCacheMemory;
private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
@SuppressWarnings("unchecked")
public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
this.job = job;
this.reporter = reporter;
localFs = FileSystem.getLocal(job);
partitions = job.getNumReduceTasks();
rfs = ((LocalFileSystem)localFs).getRaw();
indexCacheList = new ArrayList<SpillRecord>();
//sanity checks
final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,(float)0.8);
final float recper = job.getFloat(JobContext.MAP_SORT_RECORD_PERCENT,(float)0.05);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
if (spillper > (float)1.0 || spillper < (float)0.0) {
throw new IOException("Invalid \"mapreduce.map.sort.spill.percent\": " + spillper);
}
if (recper > (float)1.0 || recper < (float)0.01) {
throw new IOException("Invalid \"mapreduce.map.sort.record.percent\": " + recper);
}
if ((sortmb & 0x7FF) != sortmb) {
throw new IOException("Invalid \"mapreduce.task.mapreduce.task.io.sort.mb\": " + sortmb);
}
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job);
LOG.info("mapreduce.task.mapreduce.task.io.sort.mb = " + sortmb);
// buffers and accounting
int maxMemUsage = sortmb << 20;
int recordCapacity = (int)(maxMemUsage * recper);
recordCapacity -= recordCapacity % RECSIZE;
kvbuffer = new byte[maxMemUsage - recordCapacity];
bufvoid = kvbuffer.length;
recordCapacity /= RECSIZE;
kvoffsets = new int[recordCapacity];
kvindices = new int[recordCapacity * ACCTSIZE];
softBufferLimit = (int)(kvbuffer.length * spillper);
softRecordLimit = (int)(kvoffsets.length * spillper);
recordRemaining = softRecordLimit;
bufferRemaining = softBufferLimit;
LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
// k/v serialization
comparator = job.getOutputKeyComparator();
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
// counters
mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
Counters.Counter combineInputCounter =
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
// compression
if (job.getCompressMapOutput()) {
Class<? extends CompressionCodec> codecClass =
job.getMapOutputCompressorClass(DefaultCodec.class);
codec = ReflectionUtils.newInstance(codecClass, job);
}
// combiner
combinerRunner = CombinerRunner.create(job, getTaskID(),
combineInputCounter,
reporter, null);
if (combinerRunner != null) {
combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
} else {
combineCollector = null;
}
minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
spillThread.setDaemon(true);
spillThread.setName("SpillThread");
spillLock.lock();
try {
spillThread.start();
while (!spillThreadRunning) {
spillDone.await();
}
} catch (InterruptedException e) {
throw (IOException)new IOException("Spill thread failed to initialize"
).initCause(sortSpillException);
} finally {
spillLock.unlock();
}
if (sortSpillException != null) {
throw (IOException)new IOException("Spill thread failed to initialize"
).initCause(sortSpillException);
}
}
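// collect() serializes the key and value into the circular kvbuffer (via
// BlockingBuffer) and records (partition, keystart, valstart) in the
// accounting arrays. When a soft limit is crossed it wakes the spill thread;
// when the accounting space is exhausted it blocks until the in-progress
// spill completes.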
public synchronized void collect(K key, V value, int partition
) throws IOException {
reporter.progress();
if (key.getClass() != keyClass) {
throw new IOException("Type mismatch in key from map: expected "
+ keyClass.getName() + ", received "
+ key.getClass().getName());
}
if (value.getClass() != valClass) {
throw new IOException("Type mismatch in value from map: expected "
+ valClass.getName() + ", received "
+ value.getClass().getName());
}
final int kvnext = (kvindex + 1) % kvoffsets.length;
if (--recordRemaining <= 0) {
// The counter may stay below zero if the soft limit remains in force
// but cannot be satisfied because a spill is already in progress
spillLock.lock();
try {
boolean kvfull;
do {
if (sortSpillException != null) {
throw (IOException)new IOException("Spill failed"
).initCause(sortSpillException);
}
// sufficient acct space
kvfull = kvnext == kvstart;
final boolean kvsoftlimit = ((kvnext > kvend)
? kvnext - kvend > softRecordLimit
: kvend - kvnext <= kvoffsets.length - softRecordLimit);
if (kvstart == kvend && kvsoftlimit) {
LOG.info("Spilling map output: record full = " + kvfull);
startSpill();
}
if (kvfull) {
try {
while (kvstart != kvend) {
reporter.progress();
spillDone.await();
}
} catch (InterruptedException e) {
throw (IOException)new IOException(
"Collector interrupted while waiting for the writer"
).initCause(e);
}
}
} while (kvfull);
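// Recompute how many records may be collected before the next check:
// the distance to whichever comes first, the end of accounting space
// (kvstart) or the soft limit measured from kvend.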
final int softOff = kvend + softRecordLimit;
recordRemaining = Math.min(
// out of acct space
(kvnext < kvstart
? kvstart - kvnext
: kvoffsets.length - kvnext + kvstart),
// soft limit
(kvend < kvnext
? softOff - kvnext
: kvnext + (softOff - kvoffsets.length)));
} finally {
spillLock.unlock();
}
}
try {
// serialize key bytes into buffer
int keystart = bufindex;
keySerializer.serialize(key);
if (bufindex < keystart) {
// wrapped the key; reset required
bb.reset();
keystart = 0;
}
// serialize value bytes into buffer
final int valstart = bufindex;
valSerializer.serialize(value);
int valend = bb.markRecord();
if (partition < 0 || partition >= partitions) {
throw new IOException("Illegal partition for " + key + " (" +
partition + ")");
}
mapOutputRecordCounter.increment(1);
mapOutputByteCounter.increment(valend >= keystart
? valend - keystart
: (bufvoid - keystart) + valend);
// update accounting info
int ind = kvindex * ACCTSIZE;
kvoffsets[kvindex] = ind;
kvindices[ind + PARTITION] = partition;
kvindices[ind + KEYSTART] = keystart;
kvindices[ind + VALSTART] = valstart;
kvindex = kvnext;
} catch (MapBufferTooSmallException e) {
LOG.info("Record too large for in-memory buffer: " + e.getMessage());
spillSingleRecord(key, value, partition);
mapOutputRecordCounter.increment(1);
return;
}
}
/**
 * Compare the records at logical positions i and j, taken modulo the
 * offset capacity. Compare by partition, then by key.
 * @see IndexedSortable#compare
 */
public int compare(int i, int j) {
final int ii = kvoffsets[i % kvoffsets.length];
final int ij = kvoffsets[j % kvoffsets.length];
// sort by partition
if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
}
// sort by key
return comparator.compare(kvbuffer,
kvindices[ii + KEYSTART],
kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
kvbuffer,
kvindices[ij + KEYSTART],
kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
}
/**
 * Swap the records at logical indices i and j, taken modulo the
 * offset capacity.
 * @see IndexedSortable#swap
 */
public void swap(int i, int j) {
i %= kvoffsets.length;
j %= kvoffsets.length;
int tmp = kvoffsets[i];
kvoffsets[i] = kvoffsets[j];
kvoffsets[j] = tmp;
}
/**
 * Inner class that accepts serialized records into the circular collection
 * buffer, blocking while a spill is in progress.
 */
protected class BlockingBuffer extends DataOutputStream {
public BlockingBuffer() {
this(new Buffer());
}
private BlockingBuffer(OutputStream out) {
super(out);
}
/**
* Mark end of record. Note that this is required if the buffer is to
* cut the spill in the proper place.
*/
public int markRecord() {
bufmark = bufindex;
return bufindex;
}
/**
* Set position from last mark to end of writable buffer, then rewrite
* the data between last mark and kvindex.
* This handles a special case where the key wraps around the buffer.
* If the key is to be passed to a RawComparator, then it must be
* contiguous in the buffer. This recopies the data in the buffer back
* into itself, but starting at the beginning of the buffer. Note that
* reset() should <b>only</b> be called immediately after detecting
* this condition. To call it at any other time is undefined and would
* likely result in data loss or corruption.
* @see #markRecord()
*/
protected void reset() throws IOException {
// spillLock unnecessary; If spill wraps, then
// bufindex < bufstart < bufend so contention is impossible
// a stale value for bufstart does not affect correctness, since
// we can only get false negatives that force the more
// conservative path
int headbytelen = bufvoid - bufmark;
bufvoid = bufmark;
if (bufindex + headbytelen < bufstart) {
System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
bufindex += headbytelen;
} else {
byte[] keytmp = new byte[bufindex];
System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
bufindex = 0;
out.write(kvbuffer, bufmark, headbytelen);
out.write(keytmp);
}
}
}
public class Buffer extends OutputStream {
private final byte[] scratch = new byte[1];
@Override
public void write(int v)
throws IOException {
scratch[0] = (byte)v;
write(scratch, 0, 1);
}
/**
* Attempt to write a sequence of bytes to the collection buffer.
* This method will block if the spill thread is running and it
* cannot write.
* @throws MapBufferTooSmallException if the record is too large to
* serialize into the collection buffer.
*/
@Override
public void write(byte b[], int off, int len)
throws IOException {
boolean buffull = false;
boolean wrap = false;
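// buffull: the write would run past the end of usable space or into
// unspilled data; wrap: the write can wrap around to free space at the
// front of the circular buffer.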
bufferRemaining -= len;
if (bufferRemaining <= 0) {
// writing these bytes could exhaust available buffer space
// check if spill or blocking is necessary
spillLock.lock();
try {
do {
if (sortSpillException != null) {
throw (IOException)new IOException("Spill failed"
).initCause(sortSpillException);
}
// sufficient buffer space?
if (bufstart <= bufend && bufend <= bufindex) {
buffull = bufindex + len > bufvoid;
wrap = (bufvoid - bufindex) + bufstart > len;
} else {
// bufindex <= bufstart <= bufend
// bufend <= bufindex <= bufstart
wrap = false;
buffull = bufindex + len > bufstart;
}
if (kvstart == kvend) {
// spill thread not running
if (kvend != kvindex) {
// we have records we can spill
final boolean bufsoftlimit = (bufindex > bufend)
? bufindex - bufend > softBufferLimit
: bufend - bufindex < bufvoid - softBufferLimit;
if (bufsoftlimit || (buffull && !wrap)) {
LOG.info("Spilling map output: buffer full= " + (buffull && !wrap));
startSpill();
}
} else if (buffull && !wrap) {
// We have no buffered records, and this record is too large
// to write into kvbuffer. We must spill it directly from
// collect
final int size = ((bufend <= bufindex)
? bufindex - bufend
: (bufvoid - bufend) + bufindex) + len;
bufstart = bufend = bufindex = bufmark = 0;
kvstart = kvend = kvindex = 0;
bufvoid = kvbuffer.length;
throw new MapBufferTooSmallException(size + " bytes");
}
}
if (buffull && !wrap) {
try {
while (kvstart != kvend) {
reporter.progress();
spillDone.await();
}
} catch (InterruptedException e) {
throw (IOException)new IOException(
"Buffer interrupted while waiting for the writer"
).initCause(e);
}
}
} while (buffull && !wrap);
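// Recompute how many bytes may be written before the next check: the
// distance to whichever comes first, the start of unspilled data
// (bufstart) or the soft limit measured from bufend.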
final int softOff = bufend + softBufferLimit;
bufferRemaining = Math.min(
// out of buffer space
(bufindex < bufstart
? bufstart - bufindex
: kvbuffer.length - bufindex + bufstart),
// soft limit
(bufend < bufindex
? softOff - bufindex
: bufindex + (softOff - kvbuffer.length)));
} finally {
spillLock.unlock();
}
} else {
buffull = bufindex + len > bufvoid;
}
// here, we know that we have sufficient space to write
if (buffull) {
final int gaplen = bufvoid - bufindex;
System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
len -= gaplen;
off += gaplen;
bufindex = 0;
}
System.arraycopy(b, off, kvbuffer, bufindex, len);
bufindex += len;
bufferRemaining -= len;
}
}
public void flush() throws IOException, ClassNotFoundException,
InterruptedException {
LOG.info("Starting flush of map output");
spillLock.lock();
try {
while (kvstart != kvend) {
reporter.progress();
spillDone.await();
}
if (sortSpillException != null) {
throw (IOException)new IOException("Spill failed"
).initCause(sortSpillException);
}
if (kvend != kvindex) {
kvend = kvindex;
bufend = bufmark;
sortAndSpill();
}
} catch (InterruptedException e) {
throw (IOException)new IOException(
"Buffer interrupted while waiting for the writer"
).initCause(e);
} finally {
spillLock.unlock();
}
assert !spillLock.isHeldByCurrentThread();
// shut down spill thread and wait for it to exit. Since the preceding
// ensures that it is finished with its work (and sortAndSpill did not
// throw), we elect to use an interrupt instead of setting a flag.
// Spilling simultaneously from this thread while the spill thread
// finishes its work might be both a useful way to extend this and also
// sufficient motivation for the latter approach.
try {
spillThread.interrupt();
spillThread.join();
} catch (InterruptedException e) {
throw (IOException)new IOException("Spill failed"
).initCause(e);
}
// release sort buffer before the merge
kvbuffer = null;
mergeParts();
}
public void close() { }
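// The spill thread loops forever: it waits on spillReady until startSpill()
// marks a region (kvstart != kvend), runs sortAndSpill() outside the lock,
// then resets the markers and signals spillDone so blocked collectors can
// continue. It exits when flush() interrupts it.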
protected class SpillThread extends Thread {
@Override
public void run() {
spillLock.lock();
spillThreadRunning = true;
try {
while (true) {
spillDone.signal();
while (kvstart == kvend) {
spillReady.await();
}
try {
spillLock.unlock();
sortAndSpill();
} catch (Exception e) {
sortSpillException = e;
} catch (Throwable t) {
sortSpillException = t;
String logMsg = "Task " + getTaskID() + " failed : "
+ StringUtils.stringifyException(t);
reportFatalError(getTaskID(), t, logMsg);
} finally {
spillLock.lock();
if (bufend < bufindex && bufindex < bufstart) {
bufvoid = kvbuffer.length;
}
kvstart = kvend;
bufstart = bufend;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
spillLock.unlock();
spillThreadRunning = false;
}
}
}
private void startSpill() {
LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
"; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
"; length = " + kvoffsets.length);
kvend = kvindex;
bufend = bufmark;
spillReady.signal();
}
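// Sort the collected records in the marked region by (partition, key) and
// write them to a single spill file, one IFile segment per partition,
// running the combiner per partition when one is configured. Segment
// offsets are cached in memory (indexCacheList) until the cache limit is
// reached, after which spill index files are written to disk.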
private void sortAndSpill() throws IOException, ClassNotFoundException,
InterruptedException {
//approximate the length of the output file to be the length of the
//buffer + header lengths for the partitions
long size = (bufend >= bufstart
? bufend - bufstart
: (bufvoid - bufend) + bufstart) +
partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
final int endPosition = (kvend > kvstart)
? kvend
: kvoffsets.length + kvend;
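// The sort range is [kvstart, endPosition); when the accounting buffer has
// wrapped (kvend <= kvstart), endPosition is kvoffsets.length + kvend and
// indices are taken modulo kvoffsets.length.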
sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
int spindex = kvstart;
IndexRecord rec = new IndexRecord();
InMemValBytes value = new InMemValBytes();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null) {
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < endPosition &&
kvindices[kvoffsets[spindex % kvoffsets.length]
+ PARTITION] == i) {
final int kvoff = kvoffsets[spindex % kvoffsets.length];
getVBytesForOffset(kvoff, value);
key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
(kvindices[kvoff + VALSTART] -
kvindices[kvoff + KEYSTART]));
writer.append(key, value);
++spindex;
}
} else {
int spstart = spindex;
while (spindex < endPosition &&
kvindices[kvoffsets[spindex % kvoffsets.length]
+ PARTITION] == i) {
++spindex;
}
// Note: we would like to avoid the combiner if we have fewer
// than some threshold of records for a partition
if (spstart != spindex) {
combineCollector.setWriter(writer);
RawKeyValueIterator kvIter =
new MRResultIterator(spstart, spindex);
combinerRunner.combine(kvIter, combineCollector);
}
}
// close the writer
writer.close();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, i);
writer = null;
} finally {
if (null != writer) writer.close();
}
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
LOG.info("Finished spill " + numSpills);
++numSpills;
} finally {
if (out != null) out.close();
}
}
/**
* Handles the degenerate case where serialization fails to fit in
* the in-memory buffer, so we must spill the record from collect
* directly to a spill file. Consider this "losing".
*/
private void spillSingleRecord(final K key, final V value,
int partition) throws IOException {
long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
FSDataOutputStream out = null;
try {
// create spill file
final SpillRecord spillRec = new SpillRecord(partitions);
final Path filename =
mapOutputFile.getSpillFileForWrite(numSpills, size);
out = rfs.create(filename);
// we don't run the combiner for a single record
IndexRecord rec = new IndexRecord();
for (int i = 0; i < partitions; ++i) {
IFile.Writer<K, V> writer = null;
try {
long segmentStart = out.getPos();
// Create a new codec, don't care!
writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
spilledRecordsCounter);
if (i == partition) {
final long recordStart = out.getPos();
writer.append(key, value);
// Note that our map byte count will not be accurate with
// compression
mapOutputByteCounter.increment(out.getPos() - recordStart);
}
writer.close();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, i);
writer = null;
} catch (IOException e) {
if (null != writer) writer.close();
throw e;
}
}
if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
// create spill index file
Path indexFilename =
mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
* MAP_OUTPUT_INDEX_RECORD_LENGTH);
spillRec.writeToFile(indexFilename, job);
} else {
indexCacheList.add(spillRec);
totalIndexCacheMemory +=
spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
}
++numSpills;
} finally {
if (out != null) out.close();
}
}
/**
 * Given an offset into kvindices, populate vbytes with the corresponding
 * value bytes. The value ends where the next record's key begins, unless
 * this is the last record of the spill, in which case it ends at bufend.
 * Should only be called during a spill.
 */
private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
final int nextindex = (kvoff / ACCTSIZE ==
(kvend - 1 + kvoffsets.length) % kvoffsets.length)
? bufend
: kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length];
int vallen = (nextindex >= kvindices[kvoff + VALSTART])
? nextindex - kvindices[kvoff + VALSTART]
: (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
}
/**
* Inner class wrapping valuebytes, used for appendRaw.
*/
protected class InMemValBytes extends DataInputBuffer {
private byte[] buffer;
private int start;
private int length;
public void reset(byte[] buffer, int start, int length) {
this.buffer = buffer;
this.start = start;
this.length = length;
if (start + length > bufvoid) {
this.buffer = new byte[this.length];
final int taillen = bufvoid - start;
System.arraycopy(buffer, start, this.buffer, 0, taillen);
System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
this.start = 0;
}
super.reset(this.buffer, this.start, this.length);
}
}
protected class MRResultIterator implements RawKeyValueIterator {
private final DataInputBuffer keybuf = new DataInputBuffer();
private final InMemValBytes vbytes = new InMemValBytes();
private final int end;
private int current;
public MRResultIterator(int start, int end) {
this.end = end;
current = start - 1;
}
public boolean next() throws IOException {
return ++current < end;
}
public DataInputBuffer getKey() throws IOException {
final int kvoff = kvoffsets[current % kvoffsets.length];
keybuf.reset(kvbuffer, kvindices[kvoff + KEYSTART],
kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
return keybuf;
}
public DataInputBuffer getValue() throws IOException {
getVBytesForOffset(kvoffsets[current % kvoffsets.length], vbytes);
return vbytes;
}
public Progress getProgress() {
return null;
}
public void close() { }
}
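// Merge all spill files into a single output file (file.out) plus an index
// file (file.out.index). A single spill is simply renamed; zero spills
// produce empty per-partition segments. Otherwise each partition's segments
// are merged (running the combiner when numSpills >= minSpillsForCombine)
// and the original spill files are deleted.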
private void mergeParts() throws IOException, InterruptedException,
ClassNotFoundException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
final Path[] filename = new Path[numSpills];
final TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(i);
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
}
if (numSpills == 1) { //the spill is the final output
rfs.rename(filename[0],
new Path(filename[0].getParent(), "file.out"));
if (indexCacheList.size() == 0) {
rfs.rename(mapOutputFile.getSpillIndexFile(0),
new Path(filename[0].getParent(),"file.out.index"));
} else {
indexCacheList.get(0).writeToFile(
new Path(filename[0].getParent(),"file.out.index"), job);
}
return;
}
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
}
// adjust the length to include the per-partition IFile header lengths
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
Path finalOutputFile =
mapOutputFile.getOutputFileForWrite(finalOutFileSize);
Path finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
if (numSpills == 0) {
//create dummy files
IndexRecord rec = new IndexRecord();
SpillRecord sr = new SpillRecord(partitions);
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
writer.close();
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile, job);
} finally {
finalOut.close();
}
return;
}
{
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
Merger.considerFinalMergeForProgress();
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
//create the segments to be merged
List<Segment<K,V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
for(int i = 0; i < numSpills; i++) {
IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment<K,V> s =
new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
indexRecord.partLength, codec, true);
segmentList.add(i, s);
if (LOG.isDebugEnabled()) {
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
"Spill =" + i + "(" + indexRecord.startOffset + "," +
indexRecord.rawLength + ", " + indexRecord.partLength + ")");
}
}
int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
// sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor;
//merge
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass, codec,
segmentList, mergeFactor,
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter, sortSegments,
null, spilledRecordsCounter, sortPhase.phase());
//write merged output to disk
long segmentStart = finalOut.getPos();
Writer<K, V> writer =
new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) {
Merger.writeFile(kvIter, writer, reporter, job);
} else {
combineCollector.setWriter(writer);
combinerRunner.combine(kvIter, combineCollector);
}
//close
writer.close();
sortPhase.startNextPhase();
// record offsets
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength();
rec.partLength = writer.getCompressedLength();
spillRec.putIndex(rec, parts);
}
spillRec.writeToFile(finalIndexFile, job);
finalOut.close();
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
}
}
}
} // MapOutputBuffer
/**
* Exception indicating that the allocated sort buffer is insufficient
* to hold the current record.
*/
@SuppressWarnings("serial")
private static class MapBufferTooSmallException extends IOException {
public MapBufferTooSmallException(String s) {
super(s);
}
}
}