| package org.apache.tez.mapreduce.output; |
| |
| import java.io.IOException; |
| import java.text.NumberFormat; |
| import java.util.List; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.FileSystem.Statistics; |
| import org.apache.hadoop.mapred.FileOutputCommitter; |
| import org.apache.hadoop.mapred.FileOutputFormat; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobContext; |
| import org.apache.hadoop.mapred.TaskAttemptID; |
| import org.apache.hadoop.mapred.TaskID; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.tez.common.TezUtils; |
| import org.apache.tez.common.counters.TaskCounter; |
| import org.apache.tez.common.counters.TezCounter; |
| import org.apache.tez.mapreduce.common.Utils; |
| import org.apache.tez.mapreduce.hadoop.MRConfig; |
| import org.apache.tez.mapreduce.hadoop.MRJobConfig; |
| import org.apache.tez.mapreduce.hadoop.mapred.MRReporter; |
| import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl; |
| import org.apache.tez.mapreduce.processor.MRTaskReporter; |
| import org.apache.tez.runtime.api.Event; |
| import org.apache.tez.runtime.api.LogicalOutput; |
| import org.apache.tez.runtime.api.TezOutputContext; |
| import org.apache.tez.runtime.library.api.KVWriter; |
| |
| public class MROutput implements LogicalOutput { |
| |
| private static final Log LOG = LogFactory.getLog(MROutput.class); |
| |
| private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance(); |
| static { |
| NUMBER_FORMAT.setMinimumIntegerDigits(5); |
| NUMBER_FORMAT.setGroupingUsed(false); |
| } |
| |
| private TezOutputContext outputContext; |
| private JobConf jobConf; |
| boolean useNewApi; |
| private AtomicBoolean closed = new AtomicBoolean(false); |
| |
| @SuppressWarnings("rawtypes") |
| org.apache.hadoop.mapreduce.OutputFormat newOutputFormat; |
| @SuppressWarnings("rawtypes") |
| org.apache.hadoop.mapreduce.RecordWriter newRecordWriter; |
| |
| @SuppressWarnings("rawtypes") |
| org.apache.hadoop.mapred.OutputFormat oldOutputFormat; |
| @SuppressWarnings("rawtypes") |
| org.apache.hadoop.mapred.RecordWriter oldRecordWriter; |
| |
| private TezCounter outputRecordCounter; |
| private TezCounter fileOutputByteCounter; |
| private List<Statistics> fsStats; |
| |
| private TaskAttemptContext newApiTaskAttemptContext; |
| private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext; |
| |
| private boolean isMapperOutput; |
| |
| private OutputCommitter committer; |
| |
| @Override |
| public List<Event> initialize(TezOutputContext outputContext) |
| throws IOException, InterruptedException { |
| LOG.info("Initializing Simple Output"); |
| this.outputContext = outputContext; |
| Configuration conf = TezUtils.createConfFromUserPayload( |
| outputContext.getUserPayload()); |
| this.jobConf = new JobConf(conf); |
| this.useNewApi = this.jobConf.getUseNewMapper(); |
| this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR, |
| false); |
| jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, |
| outputContext.getDAGAttemptNumber()); |
| |
| outputRecordCounter = outputContext.getCounters().findCounter( |
| TaskCounter.MAP_OUTPUT_RECORDS); |
| fileOutputByteCounter = outputContext.getCounters().findCounter( |
| FileOutputFormatCounter.BYTES_WRITTEN); |
| |
| if (useNewApi) { |
| newApiTaskAttemptContext = createTaskAttemptContext(); |
| try { |
| newOutputFormat = |
| ReflectionUtils.newInstance( |
| newApiTaskAttemptContext.getOutputFormatClass(), jobConf); |
| } catch (ClassNotFoundException cnfe) { |
| throw new IOException(cnfe); |
| } |
| |
| List<Statistics> matchedStats = null; |
| if (newOutputFormat instanceof |
| org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) { |
| matchedStats = |
| Utils.getFsStatistics( |
| org.apache.hadoop.mapreduce.lib.output.FileOutputFormat |
| .getOutputPath(newApiTaskAttemptContext), |
| jobConf); |
| } |
| fsStats = matchedStats; |
| |
| long bytesOutPrev = getOutputBytes(); |
| try { |
| newRecordWriter = |
| newOutputFormat.getRecordWriter(newApiTaskAttemptContext); |
| } catch (InterruptedException e) { |
| throw new IOException("Interrupted while creating record writer", e); |
| } |
| long bytesOutCurr = getOutputBytes(); |
| fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); |
| } else { |
| TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString( |
| outputContext.getApplicationId().getClusterTimestamp()), |
| outputContext.getApplicationId().getId(), |
| (isMapperOutput ? TaskType.MAP : TaskType.REDUCE), |
| outputContext.getTaskIndex()), |
| outputContext.getTaskAttemptNumber()); |
| jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString()); |
| jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString()); |
| jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput); |
| jobConf.setInt(JobContext.TASK_PARTITION, |
| taskAttemptId.getTaskID().getId()); |
| jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString()); |
| |
| oldApiTaskAttemptContext = |
| new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl( |
| jobConf, taskAttemptId, |
| new MRTaskReporter(outputContext)); |
| oldOutputFormat = jobConf.getOutputFormat(); |
| |
| List<Statistics> matchedStats = null; |
| if (oldOutputFormat |
| instanceof org.apache.hadoop.mapred.FileOutputFormat) { |
| matchedStats = |
| Utils.getFsStatistics( |
| org.apache.hadoop.mapred.FileOutputFormat.getOutputPath( |
| jobConf), |
| jobConf); |
| } |
| fsStats = matchedStats; |
| |
| FileSystem fs = FileSystem.get(jobConf); |
| String finalName = getOutputName(); |
| |
| long bytesOutPrev = getOutputBytes(); |
| oldRecordWriter = |
| oldOutputFormat.getRecordWriter( |
| fs, jobConf, finalName, new MRReporter(outputContext)); |
| long bytesOutCurr = getOutputBytes(); |
| fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); |
| } |
| initCommitter(jobConf, useNewApi); |
| |
| LOG.info("Initialized Simple Output" |
| + ", using_new_api: " + useNewApi); |
| return null; |
| } |
| |
| public void initCommitter(JobConf job, boolean useNewApi) |
| throws IOException, InterruptedException { |
| |
| if (useNewApi) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("using new api for output committer"); |
| } |
| |
| OutputFormat<?, ?> outputFormat = null; |
| try { |
| outputFormat = ReflectionUtils.newInstance( |
| newApiTaskAttemptContext.getOutputFormatClass(), job); |
| } catch (ClassNotFoundException cnfe) { |
| throw new IOException("Unknown OutputFormat", cnfe); |
| } |
| this.committer = outputFormat.getOutputCommitter( |
| newApiTaskAttemptContext); |
| } else { |
| this.committer = job.getOutputCommitter(); |
| } |
| |
| Path outputPath = FileOutputFormat.getOutputPath(job); |
| if (outputPath != null) { |
| if ((this.committer instanceof FileOutputCommitter)) { |
| FileOutputFormat.setWorkOutputPath(job, |
| ((FileOutputCommitter) this.committer).getTaskAttemptPath( |
| oldApiTaskAttemptContext)); |
| } else { |
| FileOutputFormat.setWorkOutputPath(job, outputPath); |
| } |
| } |
| if (useNewApi) { |
| this.committer.setupTask(newApiTaskAttemptContext); |
| } else { |
| this.committer.setupTask(oldApiTaskAttemptContext); |
| } |
| } |
| |
| public boolean isCommitRequired() throws IOException { |
| if (useNewApi) { |
| return committer.needsTaskCommit(newApiTaskAttemptContext); |
| } else { |
| return committer.needsTaskCommit(oldApiTaskAttemptContext); |
| } |
| } |
| |
| private TaskAttemptContext createTaskAttemptContext() { |
| return new TaskAttemptContextImpl(this.jobConf, outputContext, |
| isMapperOutput); |
| } |
| |
| private long getOutputBytes() { |
| if (fsStats == null) return 0; |
| long bytesWritten = 0; |
| for (Statistics stat: fsStats) { |
| bytesWritten = bytesWritten + stat.getBytesWritten(); |
| } |
| return bytesWritten; |
| } |
| |
| private String getOutputName() { |
| return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex()); |
| } |
| |
| @Override |
| public KVWriter getWriter() throws IOException { |
| return new KVWriter() { |
| private final boolean useNewWriter = useNewApi; |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public void write(Object key, Object value) throws IOException { |
| long bytesOutPrev = getOutputBytes(); |
| if (useNewWriter) { |
| try { |
| newRecordWriter.write(key, value); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("Interrupted while writing next key-value",e); |
| } |
| } else { |
| oldRecordWriter.write(key, value); |
| } |
| |
| long bytesOutCurr = getOutputBytes(); |
| fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); |
| outputRecordCounter.increment(1); |
| } |
| }; |
| } |
| |
| @Override |
| public void handleEvents(List<Event> outputEvents) { |
| // Not expecting any events at the moment. |
| } |
| |
| @Override |
| public synchronized List<Event> close() throws IOException { |
| if (closed.getAndSet(true)) { |
| return null; |
| } |
| |
| LOG.info("Closing Simple Output"); |
| long bytesOutPrev = getOutputBytes(); |
| if (useNewApi) { |
| try { |
| newRecordWriter.close(newApiTaskAttemptContext); |
| } catch (InterruptedException e) { |
| throw new IOException("Interrupted while closing record writer", e); |
| } |
| } else { |
| oldRecordWriter.close(null); |
| } |
| long bytesOutCurr = getOutputBytes(); |
| fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev); |
| LOG.info("Closed Simple Output"); |
| return null; |
| } |
| |
| @Override |
| public void setNumPhysicalOutputs(int numOutputs) { |
| // Nothing to do for now |
| } |
| |
| /** |
| * MROutput expects that a Processor call commit prior to the |
| * Processor's completion |
| * @throws IOException |
| */ |
| public void commit() throws IOException { |
| close(); |
| if (useNewApi) { |
| committer.commitTask(newApiTaskAttemptContext); |
| } else { |
| committer.commitTask(oldApiTaskAttemptContext); |
| } |
| } |
| |
| |
| /** |
| * MROutput expects that a Processor call abort in case of any error |
| * ( including an error during commit ) prior to the Processor's completion |
| * @throws IOException |
| */ |
| public void abort() throws IOException { |
| close(); |
| if (useNewApi) { |
| committer.abortTask(newApiTaskAttemptContext); |
| } else { |
| committer.abortTask(oldApiTaskAttemptContext); |
| } |
| } |
| |
| } |