/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.hadoop;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.dataflow.std.file.AbstractFileWriteOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IRecordWriter;
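
/**
 * Hyracks operator that writes the key/value output of a Hadoop-style job to HDFS using the
 * OutputFormat configured in the supplied JobConf (old or new mapred API). Each writer streams
 * records into a temporary part file and renames it to its final location on close.
 *
 * A rough usage sketch; the upstream producer operator and its connector are placeholders and
 * not part of this class:
 *
 * <pre>
 * JobSpecification spec = new JobSpecification();
 * JobConf jobConf = new JobConf();
 * // ... configure the output format and mapred.output.dir on jobConf ...
 * HadoopWriteOperatorDescriptor writer = new HadoopWriteOperatorDescriptor(spec, jobConf, numMapTasks);
 * // connect the producer operator to 'writer' and submit the spec to Hyracks
 * </pre>
 */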
public class HadoopWriteOperatorDescriptor extends AbstractFileWriteOperatorDescriptor {
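/**
 * Wraps a Hadoop RecordWriter (old or new mapred API) and, on close, renames the temporary
 * part file to its final output path.
 */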
private class HadoopFileWriter implements IRecordWriter {
Object recordWriter;
JobConf conf;
Path finalOutputFile;
Path tempOutputFile;
HadoopFileWriter(Object recordWriter, Path tempOutputFile, Path outputFile, JobConf conf) {
this.recordWriter = recordWriter;
this.conf = conf;
this.finalOutputFile = outputFile;
this.tempOutputFile = tempOutputFile;
}
@Override
public void write(Object[] record) throws Exception {
if (conf.getUseNewMapper()){
((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).write(record[0], record[1]);
} else {
((org.apache.hadoop.mapred.RecordWriter)recordWriter).write(record[0], record[1]);
}
}
@Override
public void close() {
try {
if (conf.getUseNewMapper()){
((org.apache.hadoop.mapreduce.RecordWriter)recordWriter).close(new TaskAttemptContext(conf, new TaskAttemptID()));
} else {
((org.apache.hadoop.mapred.RecordWriter)recordWriter).close(null);
}
FileSystem.get(conf).rename(tempOutputFile, finalOutputFile);
} catch (Exception e) {
e.printStackTrace();
}
}
}
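/**
 * Writes key/value records to a Hadoop SequenceFile through the supplied Writer.
 */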
private static class HadoopSequenceWriter implements IRecordWriter {
private Writer writer;
HadoopSequenceWriter(Writer writer) throws Exception {
this.writer = writer;
}
@Override
public void close() {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void write(Object[] record) throws Exception {
Object key = record[0];
Object value = record[1];
writer.append(key, value);
}
}
private static final long serialVersionUID = 1L;
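// The Hadoop JobConf is kept as a plain String map because this descriptor is serialized
// with the job specification and JobConf itself is not java.io.Serializable.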
Map<String, String> jobConfMap;
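/**
 * Builds the record writer for one output partition: re-creates the JobConf from the serialized
 * configuration map, creates the job's _temporary output directory if needed, derives the part
 * file name from the writer index, and obtains a RecordWriter from the configured OutputFormat
 * (new or old mapred API).
 */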
@Override
protected IRecordWriter createRecordWriter(FileSplit fileSplit, int index) throws Exception {
JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
conf.setClassLoader(this.getClass().getClassLoader());
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
// Without a working FileSystem the record writer cannot be created, so let any IOException propagate.
FileSystem fileSystem = FileSystem.get(conf);
Path path = new Path(fileSplit.getPath());
Path tempOutputFile = null;
Path finalOutputFile = null;
checkIfCanWriteToHDFS(new FileSplit[] { fileSplit });
Object recordWriter = null;
String taskAttemptId = new TaskAttemptID().toString();
conf.set("mapred.task.id", taskAttemptId);
outputPath = new Path(conf.get("mapred.output.dir"));
outputTempPath = new Path(outputPath, "_temporary");
if (!fileSystem.exists(outputPath)) {
// mkdirs creates both the output directory and its _temporary child.
fileSystem.mkdirs(outputTempPath);
}
// Derive the part file name for this writer, padding the index into the trailing zeros.
String suffix = "part-r-00000";
suffix = suffix.substring(0, suffix.length() - ("" + index).length()) + index;
if (conf.getNumReduceTasks() == 0) {
// Map-only jobs name their output files part-m-*.
suffix = suffix.replace("-r-", "-m-");
}
tempOutputFile = new Path(outputTempPath, "_" + taskAttemptId + "/" + suffix);
finalOutputFile = new Path(outputPath, suffix);
if (conf.getUseNewMapper()) {
// New (org.apache.hadoop.mapreduce) API: instantiate the configured OutputFormat reflectively.
org.apache.hadoop.mapreduce.OutputFormat newOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) ReflectionUtils.newInstance(new JobContext(conf, null).getOutputFormatClass(), conf);
recordWriter = newOutputFormat.getRecordWriter(new TaskAttemptContext(conf, new TaskAttemptID()));
} else {
// Old (org.apache.hadoop.mapred) API: the OutputFormat comes straight from the JobConf.
recordWriter = conf.getOutputFormat().getRecordWriter(fileSystem, conf, suffix, new Progressable() {
@Override
public void progress() {}
});
}
return new HadoopFileWriter(recordWriter, tempOutputFile, finalOutputFile, conf);
}
Path outputPath;
Path outputTempPath;
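/**
 * Returns a no-op Reporter; counter, progress and status updates are silently discarded.
 */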
protected Reporter createReporter() {
return new Reporter() {
@Override
public Counter getCounter(Enum<?> name) {
return null;
}
@Override
public Counter getCounter(String group, String name) {
return null;
}
@Override
public InputSplit getInputSplit() throws UnsupportedOperationException {
return null;
}
@Override
public void incrCounter(Enum<?> key, long amount) {
}
@Override
public void incrCounter(String group, String counter, long amount) {
}
@Override
public void progress() {
}
@Override
public void setStatus(String status) {
}
};
}
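/**
 * Fails if any of the target output files already exists on the configured FileSystem.
 */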
private boolean checkIfCanWriteToHDFS(FileSplit[] fileSplits) throws Exception {
JobConf conf = DatatypeHelper.map2JobConf((HashMap) jobConfMap);
try {
FileSystem fileSystem = FileSystem.get(conf);
for (FileSplit fileSplit : fileSplits) {
Path path = new Path(fileSplit.getPath());
if (fileSystem.exists(path)) {
throw new Exception(" Output path : already exists : " + path);
}
}
} catch (IOException ioe) {
ioe.printStackTrace();
throw ioe;
}
return true;
}
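/**
 * Computes one output FileSplit per writer: one per reducer, or one per mapper when the job
 * has no reduce tasks. Jobs using NullOutputFormat get throw-away paths under /tmp; all other
 * jobs get part files under the configured output directory.
 */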
private static FileSplit[] getOutputSplits(JobConf conf, int noOfMappers) throws ClassNotFoundException {
int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfMappers;
Object outputFormat = null;
if(conf.getUseNewMapper()) {
outputFormat = ReflectionUtils.newInstance(new JobContext(conf,null).getOutputFormatClass(), conf);
} else {
outputFormat = conf.getOutputFormat();
}
if (outputFormat instanceof NullOutputFormat) {
FileSplit[] outputFileSplits = new FileSplit[numOutputters];
for (int i = 0; i < numOutputters; i++) {
String outputPath = "/tmp/" + System.currentTimeMillis() + i;
outputFileSplits[i] = new FileSplit("localhost", new File(outputPath));
}
return outputFileSplits;
} else {
FileSplit[] outputFileSplits = new FileSplit[numOutputters];
String absolutePath = FileOutputFormat.getOutputPath(conf).toString();
System.out.println("absolute path:" + absolutePath);
for (int index = 0; index < numOutputters; index++) {
String suffix = new String("part-00000");
suffix = new String(suffix.substring(0, suffix.length() - ("" + index).length()));
suffix = suffix + index;
String outputPath = absolutePath + "/" + suffix;
System.out.println("output path :" + outputPath);
outputFileSplits[index] = new FileSplit("localhost", outputPath);
}
return outputFileSplits;
}
}
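/**
 * @param jobSpec the Hyracks job specification this operator is part of
 * @param jobConf the Hadoop job configuration describing the output format and output path
 * @param numMapTasks number of map tasks; used to size the output when the job has no reducers
 */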
public HadoopWriteOperatorDescriptor(JobSpecification jobSpec, JobConf jobConf, int numMapTasks) throws Exception {
super(jobSpec, getOutputSplits(jobConf, numMapTasks));
this.jobConfMap = DatatypeHelper.jobConf2Map(jobConf);
checkIfCanWriteToHDFS(super.splits);
}
}