| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.DataInputBuffer; |
| import org.apache.hadoop.io.RawComparator; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.SequenceFile.CompressionType; |
| import org.apache.hadoop.io.compress.CompressionCodec; |
| import org.apache.hadoop.io.compress.DefaultCodec; |
| import org.apache.hadoop.io.serializer.Deserializer; |
| import org.apache.hadoop.io.serializer.SerializationFactory; |
| import org.apache.hadoop.io.serializer.Serializer; |
| import org.apache.hadoop.mapred.IFile.Writer; |
| import org.apache.hadoop.mapred.Merger.Segment; |
| import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; |
| import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; |
| import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; |
| import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; |
| import org.apache.hadoop.mapreduce.task.MapContextImpl; |
| import org.apache.hadoop.mapreduce.MRConfig; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskCounter; |
| import org.apache.hadoop.util.IndexedSortable; |
| import org.apache.hadoop.util.IndexedSorter; |
| import org.apache.hadoop.util.Progress; |
| import org.apache.hadoop.util.QuickSort; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** A Map task. */ |
| class MapTask extends Task { |
| /** |
| * The size of each record in the index file for the map-outputs: |
| * three 8-byte longs (segment start offset, raw length, part length). |
| */ |
| public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24; |
| |
| private TaskSplitIndex splitMetaInfo = new TaskSplitIndex(); |
| private final static int APPROX_HEADER_LENGTH = 150; |
| |
| private static final Log LOG = LogFactory.getLog(MapTask.class.getName()); |
| |
| private Progress mapPhase; |
| private Progress sortPhase; |
| |
| { // set phase for this task |
| setPhase(TaskStatus.Phase.MAP); |
| getProgress().setStatus("map"); |
| } |
| |
| public MapTask() { |
| super(); |
| } |
| |
| public MapTask(String jobFile, TaskAttemptID taskId, |
| int partition, TaskSplitIndex splitIndex, |
| int numSlotsRequired) { |
| super(jobFile, taskId, partition, numSlotsRequired); |
| this.splitMetaInfo = splitIndex; |
| } |
| |
| @Override |
| public boolean isMapTask() { |
| return true; |
| } |
| |
| @Override |
| public void localizeConfiguration(JobConf conf) |
| throws IOException { |
| super.localizeConfiguration(conf); |
| // split.dta/split.info files are used only by IsolationRunner. |
| // Write the split file to the local disk if it is a normal map task (not a |
| // job-setup or a job-cleanup task) and if the user wishes to run |
| // IsolationRunner, either by setting keep.failed.task.files to true or |
| // by using keep.task.files.pattern. |
| if (supportIsolationRunner(conf) && isMapOrReduce()) { |
| // localize the split meta-information |
| Path localSplitMeta = |
| new LocalDirAllocator(MRConfig.LOCAL_DIR).getLocalPathForWrite( |
| TaskTracker.getLocalSplitMetaFile(conf.getUser(), |
| getJobID().toString(), getTaskID() |
| .toString()), conf); |
| LOG.debug("Writing local split to " + localSplitMeta); |
| DataOutputStream out = FileSystem.getLocal(conf).create(localSplitMeta); |
| splitMetaInfo.write(out); |
| out.close(); |
| } |
| } |
| |
| |
| @Override |
| public TaskRunner createRunner(TaskTracker tracker, |
| TaskTracker.TaskInProgress tip) { |
| return new MapTaskRunner(tip, tracker, this.conf); |
| } |
| |
| @Override |
| public void write(DataOutput out) throws IOException { |
| super.write(out); |
| if (isMapOrReduce()) { |
| splitMetaInfo.write(out); |
| splitMetaInfo = null; |
| } |
| } |
| |
| @Override |
| public void readFields(DataInput in) throws IOException { |
| super.readFields(in); |
| if (isMapOrReduce()) { |
| splitMetaInfo.readFields(in); |
| } |
| } |
| |
| /** |
| * This class wraps the user's record reader to update the counters and |
| * progress as records are read. |
| * @param <K> input key type |
| * @param <V> input value type |
| */ |
| class TrackedRecordReader<K, V> |
| implements RecordReader<K,V> { |
| private RecordReader<K,V> rawIn; |
| private Counters.Counter inputByteCounter; |
| private Counters.Counter inputRecordCounter; |
| private TaskReporter reporter; |
| private long beforePos = -1; |
| private long afterPos = -1; |
| |
| TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter) |
| throws IOException{ |
| rawIn = raw; |
| inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS); |
| inputByteCounter = reporter.getCounter( |
| FileInputFormat.COUNTER_GROUP, |
| FileInputFormat.BYTES_READ); |
| this.reporter = reporter; |
| } |
| |
| public K createKey() { |
| return rawIn.createKey(); |
| } |
| |
| public V createValue() { |
| return rawIn.createValue(); |
| } |
| |
| public synchronized boolean next(K key, V value) |
| throws IOException { |
| boolean ret = moveToNext(key, value); |
| if (ret) { |
| incrCounters(); |
| } |
| return ret; |
| } |
| |
| protected void incrCounters() { |
| inputRecordCounter.increment(1); |
| inputByteCounter.increment(afterPos - beforePos); |
| } |
| |
| protected synchronized boolean moveToNext(K key, V value) |
| throws IOException { |
| beforePos = getPos(); |
| boolean ret = rawIn.next(key, value); |
| afterPos = getPos(); |
| reporter.setProgress(getProgress()); |
| return ret; |
| } |
| |
| public long getPos() throws IOException { return rawIn.getPos(); } |
| public void close() throws IOException { rawIn.close(); } |
| public float getProgress() throws IOException { |
| return rawIn.getProgress(); |
| } |
| TaskReporter getTaskReporter() { |
| return reporter; |
| } |
| } |
| |
| /** |
| * This class skips records that fall within the failed ranges recorded |
| * by previous attempts. |
| */ |
| class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> { |
| private SkipRangeIterator skipIt; |
| private SequenceFile.Writer skipWriter; |
| private boolean toWriteSkipRecs; |
| private TaskUmbilicalProtocol umbilical; |
| private Counters.Counter skipRecCounter; |
| private long recIndex = -1; |
| |
| SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter) throws IOException{ |
| super(raw, reporter); |
| this.umbilical = umbilical; |
| this.skipRecCounter = reporter.getCounter(TaskCounter.MAP_SKIPPED_RECORDS); |
| this.toWriteSkipRecs = toWriteSkipRecs() && |
| SkipBadRecords.getSkipOutputPath(conf)!=null; |
| skipIt = getSkipRanges().skipRangeIterator(); |
| } |
| |
| public synchronized boolean next(K key, V value) |
| throws IOException { |
| if(!skipIt.hasNext()) { |
| LOG.warn("Further records got skipped."); |
| return false; |
| } |
| boolean ret = moveToNext(key, value); |
| long nextRecIndex = skipIt.next(); |
| long skip = 0; |
| while(recIndex<nextRecIndex && ret) { |
| if(toWriteSkipRecs) { |
| writeSkippedRec(key, value); |
| } |
| ret = moveToNext(key, value); |
| skip++; |
| } |
| //close the skip writer once all the ranges are skipped |
| if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) { |
| skipWriter.close(); |
| } |
| skipRecCounter.increment(skip); |
| reportNextRecordRange(umbilical, recIndex); |
| if (ret) { |
| incrCounters(); |
| } |
| return ret; |
| } |
| |
| protected synchronized boolean moveToNext(K key, V value) |
| throws IOException { |
| recIndex++; |
| return super.moveToNext(key, value); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void writeSkippedRec(K key, V value) throws IOException{ |
| if(skipWriter==null) { |
| Path skipDir = SkipBadRecords.getSkipOutputPath(conf); |
| Path skipFile = new Path(skipDir, getTaskID().toString()); |
| skipWriter = |
| SequenceFile.createWriter( |
| skipFile.getFileSystem(conf), conf, skipFile, |
| (Class<K>) createKey().getClass(), |
| (Class<V>) createValue().getClass(), |
| CompressionType.BLOCK, getTaskReporter()); |
| } |
| skipWriter.append(key, value); |
| } |
| } |
| |
| @Override |
| public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) |
| throws IOException, ClassNotFoundException, InterruptedException { |
| this.umbilical = umbilical; |
| |
| if (isMapTask()) { |
| mapPhase = getProgress().addPhase("map", 0.667f); |
| sortPhase = getProgress().addPhase("sort", 0.333f); |
| } |
| TaskReporter reporter = startReporter(umbilical); |
| |
| boolean useNewApi = job.getUseNewMapper(); |
| initialize(job, getJobID(), reporter, useNewApi); |
| |
| // check if it is a cleanupJobTask |
| if (jobCleanup) { |
| runJobCleanupTask(umbilical, reporter); |
| return; |
| } |
| if (jobSetup) { |
| runJobSetupTask(umbilical, reporter); |
| return; |
| } |
| if (taskCleanup) { |
| runTaskCleanupTask(umbilical, reporter); |
| return; |
| } |
| |
| if (useNewApi) { |
| runNewMapper(job, splitMetaInfo, umbilical, reporter); |
| } else { |
| runOldMapper(job, splitMetaInfo, umbilical, reporter); |
| } |
| done(umbilical, reporter); |
| } |
| |
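| /** |
| * Locate the serialized split at the given offset of the split file, |
| * read its class name, and deserialize it via the configured |
| * serialization framework, charging the bytes read to SPLIT_RAW_BYTES. |
| */ |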
| @SuppressWarnings("unchecked") |
| private <T> T getSplitDetails(Path file, long offset) |
| throws IOException { |
| FileSystem fs = file.getFileSystem(conf); |
| FSDataInputStream inFile = fs.open(file); |
| inFile.seek(offset); |
| String className = Text.readString(inFile); |
| Class<T> cls; |
| try { |
| cls = (Class<T>) conf.getClassByName(className); |
| } catch (ClassNotFoundException ce) { |
| IOException wrap = new IOException("Split class " + className + |
| " not found"); |
| wrap.initCause(ce); |
| throw wrap; |
| } |
| SerializationFactory factory = new SerializationFactory(conf); |
| Deserializer<T> deserializer = |
| (Deserializer<T>) factory.getDeserializer(cls); |
| deserializer.open(inFile); |
| T split = deserializer.deserialize(null); |
| long pos = inFile.getPos(); |
| getCounters().findCounter( |
| TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset); |
| inFile.close(); |
| return split; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <INKEY,INVALUE,OUTKEY,OUTVALUE> |
| void runOldMapper(final JobConf job, |
| final TaskSplitIndex splitIndex, |
| final TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter |
| ) throws IOException, InterruptedException, |
| ClassNotFoundException { |
| InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()), |
| splitIndex.getStartOffset()); |
| |
| updateJobWithSplit(job, inputSplit); |
| reporter.setInputSplit(inputSplit); |
| |
| RecordReader<INKEY,INVALUE> rawIn = // open input |
| job.getInputFormat().getRecordReader(inputSplit, job, reporter); |
| RecordReader<INKEY,INVALUE> in = isSkipping() ? |
| new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) : |
| new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter); |
| job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); |
| |
| int numReduceTasks = conf.getNumReduceTasks(); |
| LOG.info("numReduceTasks: " + numReduceTasks); |
| MapOutputCollector collector = null; |
| if (numReduceTasks > 0) { |
| collector = new MapOutputBuffer(umbilical, job, reporter); |
| } else { |
| collector = new DirectMapOutputCollector(umbilical, job, reporter); |
| } |
| MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner = |
| ReflectionUtils.newInstance(job.getMapRunnerClass(), job); |
| |
| try { |
| runner.run(in, new OldOutputCollector(collector, conf), reporter); |
| mapPhase.complete(); |
| setPhase(TaskStatus.Phase.SORT); |
| statusUpdate(umbilical); |
| collector.flush(); |
| } finally { |
| //close |
| in.close(); // close input |
| collector.close(); |
| } |
| } |
| |
| /** |
| * Update the job with details about the file split |
| * @param job the job configuration to update |
| * @param inputSplit the file split |
| */ |
| private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) { |
| if (inputSplit instanceof FileSplit) { |
| FileSplit fileSplit = (FileSplit) inputSplit; |
| job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString()); |
| job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart()); |
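| // Note: JobContext.MAP_INPUT_PATH historically resolves to the |
| // map-input *length* key, so setting the split length here is intended. |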
| job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength()); |
| } |
| } |
| |
| static class NewTrackingRecordReader<K,V> |
| extends org.apache.hadoop.mapreduce.RecordReader<K,V> { |
| private final org.apache.hadoop.mapreduce.RecordReader<K,V> real; |
| private final org.apache.hadoop.mapreduce.Counter inputRecordCounter; |
| private final TaskReporter reporter; |
| |
| NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real, |
| TaskReporter reporter) { |
| this.real = real; |
| this.reporter = reporter; |
| this.inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| real.close(); |
| } |
| |
| @Override |
| public K getCurrentKey() throws IOException, InterruptedException { |
| return real.getCurrentKey(); |
| } |
| |
| @Override |
| public V getCurrentValue() throws IOException, InterruptedException { |
| return real.getCurrentValue(); |
| } |
| |
| @Override |
| public float getProgress() throws IOException, InterruptedException { |
| return real.getProgress(); |
| } |
| |
| @Override |
| public void initialize(org.apache.hadoop.mapreduce.InputSplit split, |
| org.apache.hadoop.mapreduce.TaskAttemptContext context |
| ) throws IOException, InterruptedException { |
| real.initialize(split, context); |
| } |
| |
| @Override |
| public boolean nextKeyValue() throws IOException, InterruptedException { |
| boolean result = real.nextKeyValue(); |
| if (result) { |
| inputRecordCounter.increment(1); |
| } |
| reporter.setProgress(getProgress()); |
| return result; |
| } |
| } |
| |
| /** |
| * Since the mapred and mapreduce Partitioners don't share a common |
| * interface (JobConfigurable is deprecated and a subtype of |
| * mapred.Partitioner), the partitioner lives in Old/NewOutputCollector. |
| * Note that, for map-only jobs, the configured partitioner should not be |
| * called: it's common for partitioners to compute a result mod |
| * numReduces, which would cause a div0 error when there are no reduces. |
| */ |
| private static class OldOutputCollector<K,V> implements OutputCollector<K,V> { |
| private final Partitioner<K,V> partitioner; |
| private final MapOutputCollector<K,V> collector; |
| private final int numPartitions; |
| |
| @SuppressWarnings("unchecked") |
| OldOutputCollector(MapOutputCollector<K,V> collector, JobConf conf) { |
| numPartitions = conf.getNumReduceTasks(); |
| if (numPartitions > 1) { |
| partitioner = (Partitioner<K,V>) |
| ReflectionUtils.newInstance(conf.getPartitionerClass(), conf); |
| } else { |
| partitioner = new Partitioner<K,V>() { |
| @Override |
| public void configure(JobConf job) { } |
| @Override |
| public int getPartition(K key, V value, int numPartitions) { |
| return numPartitions - 1; |
| } |
| }; |
| } |
| this.collector = collector; |
| } |
| |
| @Override |
| public void collect(K key, V value) throws IOException { |
| try { |
| collector.collect(key, value, |
| partitioner.getPartition(key, value, numPartitions)); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("interrupt exception", ie); |
| } |
| } |
| } |
| |
| private class NewDirectOutputCollector<K,V> |
| extends org.apache.hadoop.mapreduce.RecordWriter<K,V> { |
| private final org.apache.hadoop.mapreduce.RecordWriter out; |
| |
| private final TaskReporter reporter; |
| |
| private final Counters.Counter mapOutputRecordCounter; |
| |
| @SuppressWarnings("unchecked") |
| NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, |
| JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) |
| throws IOException, ClassNotFoundException, InterruptedException { |
| this.reporter = reporter; |
| out = outputFormat.getRecordWriter(taskContext); |
| mapOutputRecordCounter = |
| reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public void write(K key, V value) |
| throws IOException, InterruptedException { |
| reporter.progress(); |
| out.write(key, value); |
| mapOutputRecordCounter.increment(1); |
| } |
| |
| @Override |
| public void close(TaskAttemptContext context) |
| throws IOException,InterruptedException { |
| reporter.progress(); |
| if (out != null) { |
| out.close(context); |
| } |
| } |
| } |
| |
| private class NewOutputCollector<K,V> |
| extends org.apache.hadoop.mapreduce.RecordWriter<K,V> { |
| private final MapOutputCollector<K,V> collector; |
| private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner; |
| private final int partitions; |
| |
| @SuppressWarnings("unchecked") |
| NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext, |
| JobConf job, |
| TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter |
| ) throws IOException, ClassNotFoundException { |
| collector = new MapOutputBuffer<K,V>(umbilical, job, reporter); |
| partitions = jobContext.getNumReduceTasks(); |
| if (partitions > 1) { |
| partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) |
| ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); |
| } else { |
| partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() { |
| @Override |
| public int getPartition(K key, V value, int numPartitions) { |
| return partitions - 1; |
| } |
| }; |
| } |
| } |
| |
| @Override |
| public void write(K key, V value) throws IOException, InterruptedException { |
| collector.collect(key, value, |
| partitioner.getPartition(key, value, partitions)); |
| } |
| |
| @Override |
| public void close(TaskAttemptContext context |
| ) throws IOException,InterruptedException { |
| try { |
| collector.flush(); |
| } catch (ClassNotFoundException cnf) { |
| throw new IOException("can't find class ", cnf); |
| } |
| collector.close(); |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| private <INKEY,INVALUE,OUTKEY,OUTVALUE> |
| void runNewMapper(final JobConf job, |
| final TaskSplitIndex splitIndex, |
| final TaskUmbilicalProtocol umbilical, |
| TaskReporter reporter |
| ) throws IOException, ClassNotFoundException, |
| InterruptedException { |
| // make a task context so we can get the classes |
| org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = |
| new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, |
| getTaskID()); |
| // make a mapper |
| org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = |
| (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) |
| ReflectionUtils.newInstance(taskContext.getMapperClass(), job); |
| // make the input format |
| org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = |
| (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) |
| ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); |
| // rebuild the input split |
| org.apache.hadoop.mapreduce.InputSplit split = null; |
| split = getSplitDetails(new Path(splitIndex.getSplitLocation()), |
| splitIndex.getStartOffset()); |
| |
| org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = |
| new NewTrackingRecordReader<INKEY,INVALUE> |
| (inputFormat.createRecordReader(split, taskContext), reporter); |
| |
| job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); |
| org.apache.hadoop.mapreduce.RecordWriter output = null; |
| |
| // get an output object |
| if (job.getNumReduceTasks() == 0) { |
| output = |
| new NewDirectOutputCollector(taskContext, job, umbilical, reporter); |
| } else { |
| output = new NewOutputCollector(taskContext, job, umbilical, reporter); |
| } |
| |
| org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> |
| mapContext = |
| new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), |
| input, output, |
| committer, |
| reporter, split); |
| |
| org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context |
| mapperContext = |
| new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( |
| mapContext); |
| |
| input.initialize(split, mapperContext); |
| mapper.run(mapperContext); |
| mapPhase.complete(); |
| setPhase(TaskStatus.Phase.SORT); |
| statusUpdate(umbilical); |
| input.close(); |
| output.close(mapperContext); |
| } |
| |
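| /** |
| * Sink for map output pairs: either buffered, sorted, and spilled for |
| * the reduce phase (MapOutputBuffer) or written straight to the final |
| * OutputFormat for map-only jobs (DirectMapOutputCollector). |
| */ |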
| interface MapOutputCollector<K, V> { |
| |
| public void collect(K key, V value, int partition |
| ) throws IOException, InterruptedException; |
| public void close() throws IOException, InterruptedException; |
| |
| public void flush() throws IOException, InterruptedException, |
| ClassNotFoundException; |
| |
| } |
| |
| class DirectMapOutputCollector<K, V> |
| implements MapOutputCollector<K, V> { |
| |
| private RecordWriter<K, V> out = null; |
| |
| private TaskReporter reporter = null; |
| |
| private final Counters.Counter mapOutputRecordCounter; |
| |
| @SuppressWarnings("unchecked") |
| public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical, |
| JobConf job, TaskReporter reporter) throws IOException { |
| this.reporter = reporter; |
| String finalName = getOutputName(getPartition()); |
| FileSystem fs = FileSystem.get(job); |
| |
| out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); |
| |
| mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); |
| } |
| |
| public void close() throws IOException { |
| if (this.out != null) { |
| out.close(this.reporter); |
| } |
| |
| } |
| |
| public void flush() throws IOException, InterruptedException, |
| ClassNotFoundException { |
| } |
| |
| public void collect(K key, V value, int partition) throws IOException { |
| reporter.progress(); |
| out.write(key, value); |
| mapOutputRecordCounter.increment(1); |
| } |
| |
| } |
| |
| class MapOutputBuffer<K extends Object, V extends Object> |
| implements MapOutputCollector<K, V>, IndexedSortable { |
| private final int partitions; |
| private final JobConf job; |
| private final TaskReporter reporter; |
| private final Class<K> keyClass; |
| private final Class<V> valClass; |
| private final RawComparator<K> comparator; |
| private final SerializationFactory serializationFactory; |
| private final Serializer<K> keySerializer; |
| private final Serializer<V> valSerializer; |
| private final CombinerRunner<K,V> combinerRunner; |
| private final CombineOutputCollector<K, V> combineCollector; |
| |
| // Compression for map-outputs |
| private CompressionCodec codec = null; |
| |
| // k/v accounting |
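| // Records are tracked in two parallel circular structures: kvoffsets |
| // holds one int per record (an index into kvindices) and is what gets |
| // permuted by the sort, while kvindices holds ACCTSIZE ints per record |
| // (partition, key start, value start). Serialized bytes live in |
| // kvbuffer, which wraps as well. |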
| private volatile int kvstart = 0; // marks beginning of spill |
| private volatile int kvend = 0; // marks beginning of collectable |
| private int kvindex = 0; // marks end of collected |
| private final int[] kvoffsets; // indices into kvindices |
| private final int[] kvindices; // partition, k/v offsets into kvbuffer |
| private volatile int bufstart = 0; // marks beginning of spill |
| private volatile int bufend = 0; // marks beginning of collectable |
| private volatile int bufvoid = 0; // marks the point where we should stop |
| // reading at the end of the buffer |
| private int bufindex = 0; // marks end of collected |
| private int bufmark = 0; // marks end of record |
| private byte[] kvbuffer; // main output buffer |
| private static final int PARTITION = 0; // partition offset in acct |
| private static final int KEYSTART = 1; // key offset in acct |
| private static final int VALSTART = 2; // val offset in acct |
| private static final int ACCTSIZE = 3; // total #fields in acct |
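| // each record costs ACCTSIZE ints in kvindices plus one int in |
| // kvoffsets, four bytes apiece: |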
| private static final int RECSIZE = |
| (ACCTSIZE + 1) * 4; // acct bytes per record |
| |
| // spill accounting |
| private volatile int numSpills = 0; |
| private volatile Throwable sortSpillException = null; |
| private final int softRecordLimit; |
| private final int softBufferLimit; |
| private int recordRemaining; |
| private int bufferRemaining; |
| private final int minSpillsForCombine; |
| private final IndexedSorter sorter; |
| private final ReentrantLock spillLock = new ReentrantLock(); |
| private final Condition spillDone = spillLock.newCondition(); |
| private final Condition spillReady = spillLock.newCondition(); |
| private final BlockingBuffer bb = new BlockingBuffer(); |
| private volatile boolean spillThreadRunning = false; |
| private final SpillThread spillThread = new SpillThread(); |
| |
| private final FileSystem localFs; |
| private final FileSystem rfs; |
| |
| private final Counters.Counter mapOutputByteCounter; |
| private final Counters.Counter mapOutputRecordCounter; |
| private final Counters.Counter combineOutputCounter; |
| |
| private ArrayList<SpillRecord> indexCacheList; |
| private int totalIndexCacheMemory; |
| private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024; |
| |
| @SuppressWarnings("unchecked") |
| public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job, |
| TaskReporter reporter |
| ) throws IOException, ClassNotFoundException { |
| this.job = job; |
| this.reporter = reporter; |
| localFs = FileSystem.getLocal(job); |
| partitions = job.getNumReduceTasks(); |
| |
| rfs = ((LocalFileSystem)localFs).getRaw(); |
| |
| indexCacheList = new ArrayList<SpillRecord>(); |
| |
| //sanity checks |
| final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT,(float)0.8); |
| final float recper = job.getFloat(JobContext.MAP_SORT_RECORD_PERCENT,(float)0.05); |
| final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); |
| if (spillper > (float)1.0 || spillper < (float)0.0) { |
| throw new IOException("Invalid \"mapreduce.map.sort.spill.percent\": " + spillper); |
| } |
| if (recper > (float)1.0 || recper < (float)0.01) { |
| throw new IOException("Invalid \"mapreduce.map.sort.record.percent\": " + recper); |
| } |
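| // sortmb must fit in 11 bits (0..2047) so that (sortmb << 20), the |
| // buffer size in bytes, cannot overflow a signed int |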
| if ((sortmb & 0x7FF) != sortmb) { |
| throw new IOException("Invalid " + JobContext.IO_SORT_MB + ": " + sortmb); |
| } |
| sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class", |
| QuickSort.class, IndexedSorter.class), job); |
| LOG.info(JobContext.IO_SORT_MB + " = " + sortmb); |
| // buffers and accounting |
| int maxMemUsage = sortmb << 20; |
| int recordCapacity = (int)(maxMemUsage * recper); |
| recordCapacity -= recordCapacity % RECSIZE; |
| kvbuffer = new byte[maxMemUsage - recordCapacity]; |
| bufvoid = kvbuffer.length; |
| recordCapacity /= RECSIZE; |
| kvoffsets = new int[recordCapacity]; |
| kvindices = new int[recordCapacity * ACCTSIZE]; |
| softBufferLimit = (int)(kvbuffer.length * spillper); |
| softRecordLimit = (int)(kvoffsets.length * spillper); |
| recordRemaining = softRecordLimit; |
| bufferRemaining = softBufferLimit; |
| LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length); |
| LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length); |
| // k/v serialization |
| comparator = job.getOutputKeyComparator(); |
| keyClass = (Class<K>)job.getMapOutputKeyClass(); |
| valClass = (Class<V>)job.getMapOutputValueClass(); |
| serializationFactory = new SerializationFactory(job); |
| keySerializer = serializationFactory.getSerializer(keyClass); |
| keySerializer.open(bb); |
| valSerializer = serializationFactory.getSerializer(valClass); |
| valSerializer.open(bb); |
| // counters |
| mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES); |
| mapOutputRecordCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS); |
| Counters.Counter combineInputCounter = |
| reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS); |
| combineOutputCounter = reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); |
| // compression |
| if (job.getCompressMapOutput()) { |
| Class<? extends CompressionCodec> codecClass = |
| job.getMapOutputCompressorClass(DefaultCodec.class); |
| codec = ReflectionUtils.newInstance(codecClass, job); |
| } |
| // combiner |
| combinerRunner = CombinerRunner.create(job, getTaskID(), |
| combineInputCounter, |
| reporter, null); |
| if (combinerRunner != null) { |
| combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter); |
| } else { |
| combineCollector = null; |
| } |
| minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3); |
| spillThread.setDaemon(true); |
| spillThread.setName("SpillThread"); |
| spillLock.lock(); |
| try { |
| spillThread.start(); |
| while (!spillThreadRunning) { |
| spillDone.await(); |
| } |
| } catch (InterruptedException e) { |
| throw (IOException)new IOException("Spill thread failed to initialize" |
| ).initCause(e); |
| } finally { |
| spillLock.unlock(); |
| } |
| if (sortSpillException != null) { |
| throw (IOException)new IOException("Spill thread failed to initialize" |
| ).initCause(sortSpillException); |
| } |
| } |
| |
| public synchronized void collect(K key, V value, int partition |
| ) throws IOException { |
| reporter.progress(); |
| if (key.getClass() != keyClass) { |
| throw new IOException("Type mismatch in key from map: expected " |
| + keyClass.getName() + ", received " |
| + key.getClass().getName()); |
| } |
| if (value.getClass() != valClass) { |
| throw new IOException("Type mismatch in value from map: expected " |
| + valClass.getName() + ", received " |
| + value.getClass().getName()); |
| } |
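| // advance the record index circularly; kvnext == kvstart below |
| // means the accounting buffers are completely full |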
| final int kvnext = (kvindex + 1) % kvoffsets.length; |
| if (--recordRemaining <= 0) { |
| // The counter may remain below zero if the soft limit is still in |
| // force but cannot be satisfied while a spill is in progress. |
| spillLock.lock(); |
| try { |
| boolean kvfull; |
| do { |
| if (sortSpillException != null) { |
| throw (IOException)new IOException("Spill failed" |
| ).initCause(sortSpillException); |
| } |
| // sufficient acct space |
| kvfull = kvnext == kvstart; |
| final boolean kvsoftlimit = ((kvnext > kvend) |
| ? kvnext - kvend > softRecordLimit |
| : kvend - kvnext <= kvoffsets.length - softRecordLimit); |
| if (kvstart == kvend && kvsoftlimit) { |
| LOG.info("Spilling map output: record full = " + kvfull); |
| startSpill(); |
| } |
| if (kvfull) { |
| try { |
| while (kvstart != kvend) { |
| reporter.progress(); |
| spillDone.await(); |
| } |
| } catch (InterruptedException e) { |
| throw (IOException)new IOException( |
| "Collector interrupted while waiting for the writer" |
| ).initCause(e); |
| } |
| } |
| } while (kvfull); |
| final int softOff = kvend + softRecordLimit; |
| recordRemaining = Math.min( |
| // out of acct space |
| (kvnext < kvstart |
| ? kvstart - kvnext |
| : kvoffsets.length - kvnext + kvstart), |
| // soft limit |
| (kvend < kvnext |
| ? softOff - kvnext |
| : kvnext + (softOff - kvoffsets.length))); |
| } finally { |
| spillLock.unlock(); |
| } |
| } |
| |
| try { |
| // serialize key bytes into buffer |
| int keystart = bufindex; |
| keySerializer.serialize(key); |
| if (bufindex < keystart) { |
| // wrapped the key; reset required |
| bb.reset(); |
| keystart = 0; |
| } |
| // serialize value bytes into buffer |
| final int valstart = bufindex; |
| valSerializer.serialize(value); |
| int valend = bb.markRecord(); |
| |
| if (partition < 0 || partition >= partitions) { |
| throw new IOException("Illegal partition for " + key + " (" + |
| partition + ")"); |
| } |
| |
| mapOutputRecordCounter.increment(1); |
| mapOutputByteCounter.increment(valend >= keystart |
| ? valend - keystart |
| : (bufvoid - keystart) + valend); |
| |
| // update accounting info |
| int ind = kvindex * ACCTSIZE; |
| kvoffsets[kvindex] = ind; |
| kvindices[ind + PARTITION] = partition; |
| kvindices[ind + KEYSTART] = keystart; |
| kvindices[ind + VALSTART] = valstart; |
| kvindex = kvnext; |
| } catch (MapBufferTooSmallException e) { |
| LOG.info("Record too large for in-memory buffer: " + e.getMessage()); |
| spillSingleRecord(key, value, partition); |
| mapOutputRecordCounter.increment(1); |
| return; |
| } |
| |
| } |
| |
| /** |
| * Compare the logical records at positions i and j (taken modulo the |
| * offset capacity): first by partition, then by key. |
| * @see IndexedSortable#compare |
| */ |
| public int compare(int i, int j) { |
| final int ii = kvoffsets[i % kvoffsets.length]; |
| final int ij = kvoffsets[j % kvoffsets.length]; |
| // sort by partition |
| if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) { |
| return kvindices[ii + PARTITION] - kvindices[ij + PARTITION]; |
| } |
| // sort by key |
| return comparator.compare(kvbuffer, |
| kvindices[ii + KEYSTART], |
| kvindices[ii + VALSTART] - kvindices[ii + KEYSTART], |
| kvbuffer, |
| kvindices[ij + KEYSTART], |
| kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]); |
| } |
| |
| /** |
| * Swap the logical records at positions i and j (taken modulo the |
| * offset capacity). |
| * @see IndexedSortable#swap |
| */ |
| public void swap(int i, int j) { |
| i %= kvoffsets.length; |
| j %= kvoffsets.length; |
| int tmp = kvoffsets[i]; |
| kvoffsets[i] = kvoffsets[j]; |
| kvoffsets[j] = tmp; |
| } |
| |
| /** |
| * Blocking stream through which records are serialized into the |
| * collection buffer; writes block while the spill thread is draining. |
| */ |
| protected class BlockingBuffer extends DataOutputStream { |
| |
| public BlockingBuffer() { |
| this(new Buffer()); |
| } |
| |
| private BlockingBuffer(OutputStream out) { |
| super(out); |
| } |
| |
| /** |
| * Mark end of record. Note that this is required if the buffer is to |
| * cut the spill in the proper place. |
| */ |
| public int markRecord() { |
| bufmark = bufindex; |
| return bufindex; |
| } |
| |
| /** |
| * Set position from last mark to end of writable buffer, then rewrite |
| * the data between last mark and bufindex. |
| * This handles a special case where the key wraps around the buffer. |
| * If the key is to be passed to a RawComparator, then it must be |
| * contiguous in the buffer. This recopies the data in the buffer back |
| * into itself, but starting at the beginning of the buffer. Note that |
| * reset() should <b>only</b> be called immediately after detecting |
| * this condition. To call it at any other time is undefined and would |
| * likely result in data loss or corruption. |
| * @see #markRecord() |
| */ |
| protected void reset() throws IOException { |
| // spillLock unnecessary; If spill wraps, then |
| // bufindex < bufstart < bufend so contention is impossible |
| // a stale value for bufstart does not affect correctness, since |
| // we can only get false negatives that force the more |
| // conservative path |
| int headbytelen = bufvoid - bufmark; |
| bufvoid = bufmark; |
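| // Two cases: either there is room before bufstart to slide both key |
| // fragments to the front in place, or the wrapped head must be |
| // rewritten through the (possibly blocking) stream. |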
| if (bufindex + headbytelen < bufstart) { |
| System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex); |
| System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen); |
| bufindex += headbytelen; |
| } else { |
| byte[] keytmp = new byte[bufindex]; |
| System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex); |
| bufindex = 0; |
| out.write(kvbuffer, bufmark, headbytelen); |
| out.write(keytmp); |
| } |
| } |
| } |
| |
| public class Buffer extends OutputStream { |
| private final byte[] scratch = new byte[1]; |
| |
| @Override |
| public void write(int v) |
| throws IOException { |
| scratch[0] = (byte)v; |
| write(scratch, 0, 1); |
| } |
| |
| /** |
| * Attempt to write a sequence of bytes to the collection buffer. |
| * This method will block if the spill thread is running and it |
| * cannot write. |
| * @throws MapBufferTooSmallException if record is too large to |
| * serialize into the collection buffer. |
| */ |
| @Override |
| public void write(byte b[], int off, int len) |
| throws IOException { |
| boolean buffull = false; |
| boolean wrap = false; |
| bufferRemaining -= len; |
| if (bufferRemaining <= 0) { |
| // writing these bytes could exhaust available buffer space |
| // check if spill or blocking is necessary |
| spillLock.lock(); |
| try { |
| do { |
| if (sortSpillException != null) { |
| throw (IOException)new IOException("Spill failed" |
| ).initCause(sortSpillException); |
| } |
| |
| // sufficient buffer space? |
| if (bufstart <= bufend && bufend <= bufindex) { |
| buffull = bufindex + len > bufvoid; |
| wrap = (bufvoid - bufindex) + bufstart > len; |
| } else { |
| // bufindex <= bufstart <= bufend |
| // bufend <= bufindex <= bufstart |
| wrap = false; |
| buffull = bufindex + len > bufstart; |
| } |
| |
| if (kvstart == kvend) { |
| // spill thread not running |
| if (kvend != kvindex) { |
| // we have records we can spill |
| final boolean bufsoftlimit = (bufindex > bufend) |
| ? bufindex - bufend > softBufferLimit |
| : bufend - bufindex < bufvoid - softBufferLimit; |
| if (bufsoftlimit || (buffull && !wrap)) { |
| LOG.info("Spilling map output: buffer full= " + (buffull && !wrap)); |
| startSpill(); |
| } |
| } else if (buffull && !wrap) { |
| // We have no buffered records, and this record is too large |
| // to write into kvbuffer. We must spill it directly from |
| // collect |
| final int size = ((bufend <= bufindex) |
| ? bufindex - bufend |
| : (bufvoid - bufend) + bufindex) + len; |
| bufstart = bufend = bufindex = bufmark = 0; |
| kvstart = kvend = kvindex = 0; |
| bufvoid = kvbuffer.length; |
| throw new MapBufferTooSmallException(size + " bytes"); |
| } |
| } |
| |
| if (buffull && !wrap) { |
| try { |
| while (kvstart != kvend) { |
| reporter.progress(); |
| spillDone.await(); |
| } |
| } catch (InterruptedException e) { |
| throw (IOException)new IOException( |
| "Buffer interrupted while waiting for the writer" |
| ).initCause(e); |
| } |
| } |
| } while (buffull && !wrap); |
| final int softOff = bufend + softBufferLimit; |
| bufferRemaining = Math.min( |
| // out of buffer space |
| (bufindex < bufstart |
| ? bufstart - bufindex |
| : kvbuffer.length - bufindex + bufstart), |
| // soft limit |
| (bufend < bufindex |
| ? softOff - bufindex |
| : bufindex + (softOff - kvbuffer.length))); |
| } finally { |
| spillLock.unlock(); |
| } |
| } else { |
| buffull = bufindex + len > bufvoid; |
| } |
| // here, we know that we have sufficient space to write |
| if (buffull) { |
| final int gaplen = bufvoid - bufindex; |
| System.arraycopy(b, off, kvbuffer, bufindex, gaplen); |
| len -= gaplen; |
| off += gaplen; |
| bufindex = 0; |
| } |
| System.arraycopy(b, off, kvbuffer, bufindex, len); |
| bufindex += len; |
| bufferRemaining -= len; |
| } |
| } |
| |
| public void flush() throws IOException, ClassNotFoundException, |
| InterruptedException { |
| LOG.info("Starting flush of map output"); |
| spillLock.lock(); |
| try { |
| while (kvstart != kvend) { |
| reporter.progress(); |
| spillDone.await(); |
| } |
| if (sortSpillException != null) { |
| throw (IOException)new IOException("Spill failed" |
| ).initCause(sortSpillException); |
| } |
| if (kvend != kvindex) { |
| kvend = kvindex; |
| bufend = bufmark; |
| sortAndSpill(); |
| } |
| } catch (InterruptedException e) { |
| throw (IOException)new IOException( |
| "Buffer interrupted while waiting for the writer" |
| ).initCause(e); |
| } finally { |
| spillLock.unlock(); |
| } |
| assert !spillLock.isHeldByCurrentThread(); |
| // shut down spill thread and wait for it to exit. Since the preceding |
| // ensures that it is finished with its work (and sortAndSpill did not |
| // throw), we elect to use an interrupt instead of setting a flag. |
| // Spilling simultaneously from this thread while the spill thread |
| // finishes its work might be both a useful way to extend this and also |
| // sufficient motivation for the latter approach. |
| try { |
| spillThread.interrupt(); |
| spillThread.join(); |
| } catch (InterruptedException e) { |
| throw (IOException)new IOException("Spill failed" |
| ).initCause(e); |
| } |
| // release sort buffer before the merge |
| kvbuffer = null; |
| mergeParts(); |
| } |
| |
| public void close() { } |
| |
| protected class SpillThread extends Thread { |
| |
| @Override |
| public void run() { |
| spillLock.lock(); |
| spillThreadRunning = true; |
| try { |
| while (true) { |
| spillDone.signal(); |
| while (kvstart == kvend) { |
| spillReady.await(); |
| } |
| try { |
| spillLock.unlock(); |
| sortAndSpill(); |
| } catch (Exception e) { |
| sortSpillException = e; |
| } catch (Throwable t) { |
| sortSpillException = t; |
| String logMsg = "Task " + getTaskID() + " failed : " |
| + StringUtils.stringifyException(t); |
| reportFatalError(getTaskID(), t, logMsg); |
| } finally { |
| spillLock.lock(); |
| if (bufend < bufindex && bufindex < bufstart) { |
| bufvoid = kvbuffer.length; |
| } |
| kvstart = kvend; |
| bufstart = bufend; |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } finally { |
| spillLock.unlock(); |
| spillThreadRunning = false; |
| } |
| } |
| } |
| |
| private void startSpill() { |
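| // publish the collected range to the spill thread: records in |
| // [kvstart, kvend) and bytes in [bufstart, bufend) form the spill |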
| LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark + |
| "; bufvoid = " + bufvoid); |
| LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex + |
| "; length = " + kvoffsets.length); |
| kvend = kvindex; |
| bufend = bufmark; |
| spillReady.signal(); |
| } |
| |
| private void sortAndSpill() throws IOException, ClassNotFoundException, |
| InterruptedException { |
| //approximate the length of the output file to be the length of the |
| //buffer + header lengths for the partitions |
| long size = (bufend >= bufstart |
| ? bufend - bufstart |
| : (bufvoid - bufend) + bufstart) + |
| partitions * APPROX_HEADER_LENGTH; |
| FSDataOutputStream out = null; |
| try { |
| // create spill file |
| final SpillRecord spillRec = new SpillRecord(partitions); |
| final Path filename = |
| mapOutputFile.getSpillFileForWrite(numSpills, size); |
| out = rfs.create(filename); |
| |
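| // the collected range may wrap around the end of kvoffsets; express |
| // it as the linear range [kvstart, endPosition) for the sorter |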
| final int endPosition = (kvend > kvstart) |
| ? kvend |
| : kvoffsets.length + kvend; |
| sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter); |
| int spindex = kvstart; |
| IndexRecord rec = new IndexRecord(); |
| InMemValBytes value = new InMemValBytes(); |
| for (int i = 0; i < partitions; ++i) { |
| IFile.Writer<K, V> writer = null; |
| try { |
| long segmentStart = out.getPos(); |
| writer = new Writer<K, V>(job, out, keyClass, valClass, codec, |
| spilledRecordsCounter); |
| if (combinerRunner == null) { |
| // spill directly |
| DataInputBuffer key = new DataInputBuffer(); |
| while (spindex < endPosition && |
| kvindices[kvoffsets[spindex % kvoffsets.length] |
| + PARTITION] == i) { |
| final int kvoff = kvoffsets[spindex % kvoffsets.length]; |
| getVBytesForOffset(kvoff, value); |
| key.reset(kvbuffer, kvindices[kvoff + KEYSTART], |
| (kvindices[kvoff + VALSTART] - |
| kvindices[kvoff + KEYSTART])); |
| writer.append(key, value); |
| ++spindex; |
| } |
| } else { |
| int spstart = spindex; |
| while (spindex < endPosition && |
| kvindices[kvoffsets[spindex % kvoffsets.length] |
| + PARTITION] == i) { |
| ++spindex; |
| } |
| // Note: we would like to avoid the combiner if we have fewer |
| // than some threshold of records for a partition |
| if (spstart != spindex) { |
| combineCollector.setWriter(writer); |
| RawKeyValueIterator kvIter = |
| new MRResultIterator(spstart, spindex); |
| combinerRunner.combine(kvIter, combineCollector); |
| } |
| } |
| |
| // close the writer |
| writer.close(); |
| |
| // record offsets |
| rec.startOffset = segmentStart; |
| rec.rawLength = writer.getRawLength(); |
| rec.partLength = writer.getCompressedLength(); |
| spillRec.putIndex(rec, i); |
| |
| writer = null; |
| } finally { |
| if (null != writer) writer.close(); |
| } |
| } |
| |
| if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { |
| // create spill index file |
| Path indexFilename = |
| mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions |
| * MAP_OUTPUT_INDEX_RECORD_LENGTH); |
| spillRec.writeToFile(indexFilename, job); |
| } else { |
| indexCacheList.add(spillRec); |
| totalIndexCacheMemory += |
| spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; |
| } |
| LOG.info("Finished spill " + numSpills); |
| ++numSpills; |
| } finally { |
| if (out != null) out.close(); |
| } |
| } |
| |
| /** |
| * Handles the degenerate case where serialization fails to fit in |
| * the in-memory buffer, so we must spill the record from collect |
| * directly to a spill file. Consider this "losing". |
| */ |
| private void spillSingleRecord(final K key, final V value, |
| int partition) throws IOException { |
| long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH; |
| FSDataOutputStream out = null; |
| try { |
| // create spill file |
| final SpillRecord spillRec = new SpillRecord(partitions); |
| final Path filename = |
| mapOutputFile.getSpillFileForWrite(numSpills, size); |
| out = rfs.create(filename); |
| |
| // we don't run the combiner for a single record |
| IndexRecord rec = new IndexRecord(); |
| for (int i = 0; i < partitions; ++i) { |
| IFile.Writer<K, V> writer = null; |
| try { |
| long segmentStart = out.getPos(); |
| // Create a new codec, don't care! |
| writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec, |
| spilledRecordsCounter); |
| |
| if (i == partition) { |
| final long recordStart = out.getPos(); |
| writer.append(key, value); |
| // Note that our map byte count will not be accurate with |
| // compression |
| mapOutputByteCounter.increment(out.getPos() - recordStart); |
| } |
| writer.close(); |
| |
| // record offsets |
| rec.startOffset = segmentStart; |
| rec.rawLength = writer.getRawLength(); |
| rec.partLength = writer.getCompressedLength(); |
| spillRec.putIndex(rec, i); |
| |
| writer = null; |
| } catch (IOException e) { |
| if (null != writer) writer.close(); |
| throw e; |
| } |
| } |
| if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) { |
| // create spill index file |
| Path indexFilename = |
| mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions |
| * MAP_OUTPUT_INDEX_RECORD_LENGTH); |
| spillRec.writeToFile(indexFilename, job); |
| } else { |
| indexCacheList.add(spillRec); |
| totalIndexCacheMemory += |
| spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH; |
| } |
| ++numSpills; |
| } finally { |
| if (out != null) out.close(); |
| } |
| } |
| |
| /** |
| * Given an offset, populate vbytes with the associated set of |
| * serialized value bytes. Should only be called during a spill. |
| */ |
| private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) { |
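| // A record's value bytes end where the next record's key begins, |
| // except for the last record collected, whose value ends at bufend. |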
| final int nextindex = (kvoff / ACCTSIZE == |
| (kvend - 1 + kvoffsets.length) % kvoffsets.length) |
| ? bufend |
| : kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length]; |
| int vallen = (nextindex >= kvindices[kvoff + VALSTART]) |
| ? nextindex - kvindices[kvoff + VALSTART] |
| : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex; |
| vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen); |
| } |
| |
| /** |
| * Inner class wrapping valuebytes, used for appendRaw. |
| */ |
| protected class InMemValBytes extends DataInputBuffer { |
| private byte[] buffer; |
| private int start; |
| private int length; |
| |
| public void reset(byte[] buffer, int start, int length) { |
| this.buffer = buffer; |
| this.start = start; |
| this.length = length; |
| |
| if (start + length > bufvoid) { |
| this.buffer = new byte[this.length]; |
| final int taillen = bufvoid - start; |
| System.arraycopy(buffer, start, this.buffer, 0, taillen); |
| System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen); |
| this.start = 0; |
| } |
| |
| super.reset(this.buffer, this.start, this.length); |
| } |
| } |
| |
| protected class MRResultIterator implements RawKeyValueIterator { |
| private final DataInputBuffer keybuf = new DataInputBuffer(); |
| private final InMemValBytes vbytes = new InMemValBytes(); |
| private final int end; |
| private int current; |
| public MRResultIterator(int start, int end) { |
| this.end = end; |
| current = start - 1; |
| } |
| public boolean next() throws IOException { |
| return ++current < end; |
| } |
| public DataInputBuffer getKey() throws IOException { |
| final int kvoff = kvoffsets[current % kvoffsets.length]; |
| keybuf.reset(kvbuffer, kvindices[kvoff + KEYSTART], |
| kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]); |
| return keybuf; |
| } |
| public DataInputBuffer getValue() throws IOException { |
| getVBytesForOffset(kvoffsets[current % kvoffsets.length], vbytes); |
| return vbytes; |
| } |
| public Progress getProgress() { |
| return null; |
| } |
| public void close() { } |
| } |
| |
| private void mergeParts() throws IOException, InterruptedException, |
| ClassNotFoundException { |
| // get the approximate size of the final output/index files |
| long finalOutFileSize = 0; |
| long finalIndexFileSize = 0; |
| final Path[] filename = new Path[numSpills]; |
| final TaskAttemptID mapId = getTaskID(); |
| |
| for(int i = 0; i < numSpills; i++) { |
| filename[i] = mapOutputFile.getSpillFile(i); |
| finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); |
| } |
| if (numSpills == 1) { //the spill is the final output |
| rfs.rename(filename[0], |
| new Path(filename[0].getParent(), "file.out")); |
| if (indexCacheList.size() == 0) { |
| rfs.rename(mapOutputFile.getSpillIndexFile(0), |
| new Path(filename[0].getParent(),"file.out.index")); |
| } else { |
| indexCacheList.get(0).writeToFile( |
| new Path(filename[0].getParent(),"file.out.index"), job); |
| } |
| return; |
| } |
| |
| // read in paged indices |
| for (int i = indexCacheList.size(); i < numSpills; ++i) { |
| Path indexFileName = mapOutputFile.getSpillIndexFile(i); |
| indexCacheList.add(new SpillRecord(indexFileName, job)); |
| } |
| |
| //make correction in the length to include the sequence file header |
| //lengths for each partition |
| finalOutFileSize += partitions * APPROX_HEADER_LENGTH; |
| finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH; |
| Path finalOutputFile = |
| mapOutputFile.getOutputFileForWrite(finalOutFileSize); |
| Path finalIndexFile = |
| mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); |
| |
| //The output stream for the final single output file |
| FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); |
| |
| if (numSpills == 0) { |
| //create dummy files |
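| // write an empty segment for every partition so reducers fetching |
| // from this (record-less) map still find a valid zero-length part |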
| IndexRecord rec = new IndexRecord(); |
| SpillRecord sr = new SpillRecord(partitions); |
| try { |
| for (int i = 0; i < partitions; i++) { |
| long segmentStart = finalOut.getPos(); |
| Writer<K, V> writer = |
| new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null); |
| writer.close(); |
| rec.startOffset = segmentStart; |
| rec.rawLength = writer.getRawLength(); |
| rec.partLength = writer.getCompressedLength(); |
| sr.putIndex(rec, i); |
| } |
| sr.writeToFile(finalIndexFile, job); |
| } finally { |
| finalOut.close(); |
| } |
| return; |
| } |
| { |
| sortPhase.addPhases(partitions); // Divide sort phase into sub-phases |
| Merger.considerFinalMergeForProgress(); |
| |
| IndexRecord rec = new IndexRecord(); |
| final SpillRecord spillRec = new SpillRecord(partitions); |
| for (int parts = 0; parts < partitions; parts++) { |
| //create the segments to be merged |
| List<Segment<K,V>> segmentList = |
| new ArrayList<Segment<K, V>>(numSpills); |
| for(int i = 0; i < numSpills; i++) { |
| IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts); |
| |
| Segment<K,V> s = |
| new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset, |
| indexRecord.partLength, codec, true); |
| segmentList.add(i, s); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("MapId=" + mapId + " Reducer=" + parts + |
| "Spill =" + i + "(" + indexRecord.startOffset + "," + |
| indexRecord.rawLength + ", " + indexRecord.partLength + ")"); |
| } |
| } |
| |
| int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100); |
| // sort the segments only if there are intermediate merges |
| boolean sortSegments = segmentList.size() > mergeFactor; |
| //merge |
| @SuppressWarnings("unchecked") |
| RawKeyValueIterator kvIter = Merger.merge(job, rfs, |
| keyClass, valClass, codec, |
| segmentList, mergeFactor, |
| new Path(mapId.toString()), |
| job.getOutputKeyComparator(), reporter, sortSegments, |
| null, spilledRecordsCounter, sortPhase.phase()); |
| |
| //write merged output to disk |
| long segmentStart = finalOut.getPos(); |
| Writer<K, V> writer = |
| new Writer<K, V>(job, finalOut, keyClass, valClass, codec, |
| spilledRecordsCounter); |
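| // re-run the combiner during the final merge only when enough |
| // spills accumulated for recombining to pay off |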
| if (combinerRunner == null || numSpills < minSpillsForCombine) { |
| Merger.writeFile(kvIter, writer, reporter, job); |
| } else { |
| combineCollector.setWriter(writer); |
| combinerRunner.combine(kvIter, combineCollector); |
| } |
| |
| //close |
| writer.close(); |
| |
| sortPhase.startNextPhase(); |
| |
| // record offsets |
| rec.startOffset = segmentStart; |
| rec.rawLength = writer.getRawLength(); |
| rec.partLength = writer.getCompressedLength(); |
| spillRec.putIndex(rec, parts); |
| } |
| spillRec.writeToFile(finalIndexFile, job); |
| finalOut.close(); |
| for(int i = 0; i < numSpills; i++) { |
| rfs.delete(filename[i],true); |
| } |
| } |
| } |
| |
| } // MapOutputBuffer |
| |
| /** |
| * Exception indicating that the allocated sort buffer is insufficient |
| * to hold the current record. |
| */ |
| @SuppressWarnings("serial") |
| private static class MapBufferTooSmallException extends IOException { |
| public MapBufferTooSmallException(String s) { |
| super(s); |
| } |
| } |
| |
| } |