package org.apache.tez.mapreduce.output;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.library.api.KVWriter;
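/**
 * {@link LogicalOutput} that writes key-value pairs through the configured
 * MapReduce {@link org.apache.hadoop.mapreduce.OutputFormat} (new API) or
 * {@link org.apache.hadoop.mapred.OutputFormat} (old API), depending on the
 * job configuration.
 *
 * A Processor is expected to drive it roughly as follows (illustrative
 * sketch only; the local variable names are hypothetical):
 * <pre>
 *   KVWriter writer = mrOutput.getWriter();
 *   writer.write(key, value);
 *   if (mrOutput.isCommitRequired()) {
 *     mrOutput.commit();   // or mrOutput.abort() on failure
 *   }
 * </pre>
 */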
public class MROutput implements LogicalOutput {
private static final Log LOG = LogFactory.getLog(MROutput.class);
private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
static {
NUMBER_FORMAT.setMinimumIntegerDigits(5);
NUMBER_FORMAT.setGroupingUsed(false);
}
private TezOutputContext outputContext;
private JobConf jobConf;
boolean useNewApi;
private AtomicBoolean closed = new AtomicBoolean(false);
@SuppressWarnings("rawtypes")
org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
@SuppressWarnings("rawtypes")
org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
@SuppressWarnings("rawtypes")
org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
@SuppressWarnings("rawtypes")
org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
private TezCounter outputRecordCounter;
private TezCounter fileOutputByteCounter;
private List<Statistics> fsStats;
private TaskAttemptContext newApiTaskAttemptContext;
private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
private boolean isMapperOutput;
private OutputCommitter committer;
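  /**
   * Initializes the output: rebuilds the {@link JobConf} from the user
   * payload, sets up the output-record and bytes-written counters, creates
   * the RecordWriter through the old or new MapReduce API, and initializes
   * the OutputCommitter for this task attempt.
   */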
@Override
public List<Event> initialize(TezOutputContext outputContext)
throws IOException, InterruptedException {
LOG.info("Initializing Simple Output");
this.outputContext = outputContext;
Configuration conf = TezUtils.createConfFromUserPayload(
outputContext.getUserPayload());
this.jobConf = new JobConf(conf);
this.useNewApi = this.jobConf.getUseNewMapper();
this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
false);
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
outputContext.getDAGAttemptNumber());
outputRecordCounter = outputContext.getCounters().findCounter(
TaskCounter.MAP_OUTPUT_RECORDS);
fileOutputByteCounter = outputContext.getCounters().findCounter(
FileOutputFormatCounter.BYTES_WRITTEN);
if (useNewApi) {
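      // New (mapreduce) API: instantiate the configured OutputFormat
      // reflectively, capture FileSystem statistics for file-based outputs,
      // and create the RecordWriter.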
newApiTaskAttemptContext = createTaskAttemptContext();
try {
newOutputFormat =
ReflectionUtils.newInstance(
newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
} catch (ClassNotFoundException cnfe) {
throw new IOException(cnfe);
}
List<Statistics> matchedStats = null;
if (newOutputFormat instanceof
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
matchedStats =
Utils.getFsStatistics(
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
.getOutputPath(newApiTaskAttemptContext),
jobConf);
}
fsStats = matchedStats;
long bytesOutPrev = getOutputBytes();
try {
newRecordWriter =
newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
} catch (InterruptedException e) {
throw new IOException("Interrupted while creating record writer", e);
}
long bytesOutCurr = getOutputBytes();
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
} else {
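      // Old (mapred) API: construct a classic TaskAttemptID and populate the
      // mapred job properties before asking the OutputFormat for a
      // RecordWriter.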
TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
outputContext.getApplicationId().getClusterTimestamp()),
outputContext.getApplicationId().getId(),
(isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
outputContext.getTaskIndex()),
outputContext.getTaskAttemptNumber());
jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
jobConf.setInt(JobContext.TASK_PARTITION,
taskAttemptId.getTaskID().getId());
jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
oldApiTaskAttemptContext =
new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
jobConf, taskAttemptId,
new MRTaskReporter(outputContext));
oldOutputFormat = jobConf.getOutputFormat();
List<Statistics> matchedStats = null;
if (oldOutputFormat
instanceof org.apache.hadoop.mapred.FileOutputFormat) {
matchedStats =
Utils.getFsStatistics(
org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
jobConf),
jobConf);
}
fsStats = matchedStats;
FileSystem fs = FileSystem.get(jobConf);
String finalName = getOutputName();
long bytesOutPrev = getOutputBytes();
oldRecordWriter =
oldOutputFormat.getRecordWriter(
fs, jobConf, finalName, new MRReporter(outputContext));
long bytesOutCurr = getOutputBytes();
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
}
initCommitter(jobConf, useNewApi);
LOG.info("Initialized Simple Output"
+ ", using_new_api: " + useNewApi);
return null;
}
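  /**
   * Creates the {@link OutputCommitter} for this output (via the
   * OutputFormat for the new API, or directly from the JobConf for the old
   * API), points the work output path at the committer's task attempt path
   * when a FileOutputCommitter is in use, and runs task setup.
   */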
public void initCommitter(JobConf job, boolean useNewApi)
throws IOException, InterruptedException {
if (useNewApi) {
if (LOG.isDebugEnabled()) {
LOG.debug("using new api for output committer");
}
OutputFormat<?, ?> outputFormat = null;
try {
outputFormat = ReflectionUtils.newInstance(
newApiTaskAttemptContext.getOutputFormatClass(), job);
} catch (ClassNotFoundException cnfe) {
throw new IOException("Unknown OutputFormat", cnfe);
}
this.committer = outputFormat.getOutputCommitter(
newApiTaskAttemptContext);
} else {
this.committer = job.getOutputCommitter();
}
Path outputPath = FileOutputFormat.getOutputPath(job);
if (outputPath != null) {
if ((this.committer instanceof FileOutputCommitter)) {
FileOutputFormat.setWorkOutputPath(job,
((FileOutputCommitter) this.committer).getTaskAttemptPath(
oldApiTaskAttemptContext));
} else {
FileOutputFormat.setWorkOutputPath(job, outputPath);
}
}
if (useNewApi) {
this.committer.setupTask(newApiTaskAttemptContext);
} else {
this.committer.setupTask(oldApiTaskAttemptContext);
}
}
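  /**
   * @return true if the committer reports that this task attempt has output
   *         that needs to be committed.
   */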
public boolean isCommitRequired() throws IOException {
if (useNewApi) {
return committer.needsTaskCommit(newApiTaskAttemptContext);
} else {
return committer.needsTaskCommit(oldApiTaskAttemptContext);
}
}
private TaskAttemptContext createTaskAttemptContext() {
return new TaskAttemptContextImpl(this.jobConf, outputContext,
isMapperOutput);
}
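  // Sums bytes written across the FileSystem statistics matched for the
  // output path; returns 0 when no statistics were matched.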
private long getOutputBytes() {
if (fsStats == null) return 0;
long bytesWritten = 0;
for (Statistics stat: fsStats) {
bytesWritten = bytesWritten + stat.getBytesWritten();
}
return bytesWritten;
}
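  // Standard MapReduce part file name, e.g. part-00000 for task index 0.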
private String getOutputName() {
return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
}
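  /**
   * Returns a {@link KVWriter} that delegates each write to the underlying
   * RecordWriter and updates the bytes-written and output-record counters
   * around every call.
   */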
@Override
public KVWriter getWriter() throws IOException {
return new KVWriter() {
private final boolean useNewWriter = useNewApi;
@SuppressWarnings("unchecked")
@Override
public void write(Object key, Object value) throws IOException {
long bytesOutPrev = getOutputBytes();
if (useNewWriter) {
try {
newRecordWriter.write(key, value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while writing next key-value",e);
}
} else {
oldRecordWriter.write(key, value);
}
long bytesOutCurr = getOutputBytes();
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
outputRecordCounter.increment(1);
}
};
}
@Override
public void handleEvents(List<Event> outputEvents) {
// Not expecting any events at the moment.
}
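  /**
   * Closes the RecordWriter exactly once (guarded by the closed flag) and
   * records any remaining bytes written against the file output counter.
   */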
@Override
public synchronized List<Event> close() throws IOException {
if (closed.getAndSet(true)) {
return null;
}
LOG.info("Closing Simple Output");
long bytesOutPrev = getOutputBytes();
if (useNewApi) {
try {
newRecordWriter.close(newApiTaskAttemptContext);
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing record writer", e);
}
} else {
oldRecordWriter.close(null);
}
long bytesOutCurr = getOutputBytes();
fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
LOG.info("Closed Simple Output");
return null;
}
@Override
public void setNumPhysicalOutputs(int numOutputs) {
// Nothing to do for now
}
  /**
   * MROutput expects the Processor to call commit prior to the Processor's
   * completion.
   * @throws IOException
   */
public void commit() throws IOException {
close();
if (useNewApi) {
committer.commitTask(newApiTaskAttemptContext);
} else {
committer.commitTask(oldApiTaskAttemptContext);
}
}
  /**
   * MROutput expects the Processor to call abort in case of any error
   * (including an error during commit) prior to the Processor's completion.
   * @throws IOException
   */
public void abort() throws IOException {
close();
if (useNewApi) {
committer.abortTask(newApiTaskAttemptContext);
} else {
committer.abortTask(oldApiTaskAttemptContext);
}
}
}