/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package edu.uci.ics.hyracks.dataflow.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IRecordWriter;
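
/**
 * Writes Hyracks tuples to a Hadoop OutputFormat. Each writer mirrors the Hadoop
 * task commit protocol: records are written to a file under FileOutputCommitter's
 * temporary directory and the file is renamed to its final part-xxxxx name when
 * the writer is closed.
 */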
public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {

    private class HadoopFileWriter implements IRecordWriter {
        // Either an org.apache.hadoop.mapred.RecordWriter or an
        // org.apache.hadoop.mapreduce.RecordWriter, depending on conf.getUseNewMapper().
        Object recordWriter;
        JobConf conf;
        Path finalOutputFile;
        Path tempOutputFile;
        Path tempDir;

        HadoopFileWriter(Object recordWriter, int index, JobConf conf) throws Exception {
            this.recordWriter = recordWriter;
            this.conf = conf;
            initialize(index, conf);
        }
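
        /**
         * Prepares the output locations for this writer: derives the part-xxxxx file
         * name for the given partition index, creates the temporary output directory,
         * and obtains a RecordWriter from either the new (mapreduce) or the old
         * (mapred) OutputFormat API.
         */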
        private void initialize(int index, JobConf conf) throws Exception {
            if (!(conf.getOutputFormat() instanceof NullOutputFormat)) {
                boolean isMap = conf.getNumReduceTasks() == 0;
                TaskAttemptID taskAttempId = new TaskAttemptID("0", index, isMap, index, index);
                conf.set("mapred.task.id", taskAttempId.toString());
                // Build the "part-00000"-style file name, padding the partition index to five digits.
                String suffix = "part-00000";
                suffix = suffix.substring(0, suffix.length() - ("" + index).length()) + index;
                outputPath = new Path(conf.get("mapred.output.dir"));
                tempDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
                FileSystem fileSys = tempDir.getFileSystem(conf);
                if (!fileSys.mkdirs(tempDir)) {
                    throw new IOException("Mkdirs failed to create " + tempDir.toString());
                }
                // Records are first written to tempDir/_taskAttemptId/suffix and promoted
                // to outputPath/suffix in close().
                tempOutputFile = new Path(tempDir, new Path("_" + taskAttempId.toString()));
                tempOutputFile = new Path(tempOutputFile, suffix);
                finalOutputFile = new Path(outputPath, suffix);
                if (conf.getUseNewMapper()) {
                    org.apache.hadoop.mapreduce.JobContext jobContext = new org.apache.hadoop.mapreduce.JobContext(
                            conf, null);
                    org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils
                            .newInstance(jobContext.getOutputFormatClass(), conf);
                    recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, taskAttempId));
                } else {
                    recordWriter = conf.getOutputFormat().getRecordWriter(FileSystem.get(conf), conf, suffix,
                            new Progressable() {
                                @Override
                                public void progress() {
                                }
                            });
                }
            }
        }
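
        /**
         * Writes one record: record[0] is passed to the RecordWriter as the key and
         * record[1] as the value.
         */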
        @Override
        public void write(Object[] record) throws Exception {
            if (recordWriter != null) {
                if (conf.getUseNewMapper()) {
                    ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).write(record[0], record[1]);
                } else {
                    ((org.apache.hadoop.mapred.RecordWriter) recordWriter).write(record[0], record[1]);
                }
            }
        }
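
        /**
         * Closes the RecordWriter and commits the output: the temporary file is renamed
         * to its final location and the temporary directory is deleted.
         */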
        @Override
        public void close() {
            try {
                if (recordWriter != null) {
                    if (conf.getUseNewMapper()) {
                        ((org.apache.hadoop.mapreduce.RecordWriter) recordWriter).close(new TaskAttemptContext(conf,
                                new TaskAttemptID()));
                    } else {
                        ((org.apache.hadoop.mapred.RecordWriter) recordWriter).close(null);
                    }
                    if (outputPath != null) {
                        FileSystem fileSystem = FileSystem.get(conf);
                        fileSystem.rename(tempOutputFile, finalOutputFile);
                        fileSystem.delete(tempDir, true);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static final long serialVersionUID = 1L;

    // The JobConf is carried as a String-to-String map so that the descriptor itself
    // remains serializable; it is rebuilt via DatatypeHelper.map2JobConf() when needed.
    Map<String, String> jobConfMap;
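
    /**
     * Rebuilds the JobConf from the serialized map and creates a HadoopFileWriter
     * for the given output partition.
     */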
    @Override
    protected IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception {
        JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
        conf.setClassLoader(this.getClass().getClassLoader());
        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
        FileSystem fileSystem = FileSystem.get(conf);
        // The actual Hadoop RecordWriter is created lazily in HadoopFileWriter.initialize().
        Object recordWriter = null;
        return new HadoopFileWriter(recordWriter, index, conf);
    }

    Path outputPath;
    Path outputTempPath;
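
    /**
     * Returns a no-op Reporter for OutputFormats that expect progress and counter
     * callbacks; all methods do nothing or return null.
     */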
    protected Reporter createReporter() {
        return new Reporter() {
            @Override
            public Counter getCounter(Enum<?> name) {
                return null;
            }

            @Override
            public Counter getCounter(String group, String name) {
                return null;
            }

            @Override
            public InputSplit getInputSplit() throws UnsupportedOperationException {
                return null;
            }

            @Override
            public void incrCounter(Enum<?> key, long amount) {
            }

            @Override
            public void incrCounter(String group, String counter, long amount) {
            }

            @Override
            public void progress() {
            }

            @Override
            public void setStatus(String status) {
            }
        };
    }
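
    /**
     * Verifies that none of the planned output files already exist on the target
     * file system; fails fast with an exception if one does.
     */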
    private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
        JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
        try {
            FileSystem fileSystem = FileSystem.get(conf);
            for (FileSplit fileSplit : fileSplits) {
                Path path = new Path(fileSplit.getLocalFile().getFile().getPath());
                if (fileSystem.exists(path)) {
                    throw new Exception("Output path already exists: " + path);
                }
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
            throw ioe;
        }
        return true;
    }
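
    /**
     * Computes one output FileSplit per writer: the number of reduce tasks if there
     * are any, otherwise the number of map tasks. For NullOutputFormat, throw-away
     * paths under /tmp are used; otherwise each writer is assigned a part-xxxxx file
     * under the job's output directory.
     */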
    private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
        int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
        Object outputFormat = null;
        if (conf.getUseNewMapper()) {
            outputFormat = ReflectionUtils.newInstance(new org.apache.hadoop.mapreduce.JobContext(conf, null)
                    .getOutputFormatClass(), conf);
        } else {
            outputFormat = conf.getOutputFormat();
        }
        if (outputFormat instanceof NullOutputFormat) {
            FileSplit[] outputFileSplits = new FileSplit[numOutputters];
            for (int i = 0; i < numOutputters; i++) {
                String outputPath = "/tmp/" + System.currentTimeMillis() + i;
                outputFileSplits[i] = new FileSplit("localhost", new FileReference(new File(outputPath)));
            }
            return outputFileSplits;
        } else {
            FileSplit[] outputFileSplits = new FileSplit[numOutputters];
            String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
            for (int index = 0; index < numOutputters; index++) {
                String suffix = "part-00000";
                suffix = suffix.substring(0, suffix.length() - ("" + index).length()) + index;
                String outputPath = absolutePath + "/" + suffix;
                outputFileSplits[index] = new FileSplit("localhost", outputPath);
            }
            return outputFileSplits;
        }
    }
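
    /**
     * Creates the write operator for the given job: the output splits are derived
     * from the JobConf, the JobConf is serialized into a map, and the target output
     * paths are checked for collisions up front.
     */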
    public HadoopWriteOperatorDescriptor(IOperatorDescriptorRegistry jobSpec, JobConf jobConf, int numMapTasks)
            throws Exception {
        super(jobSpec, getOutputSplits(jobConf, numMapTasks));
        this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
        checkIfCanWriteToHDFS(super.splits);
    }
}