package edu.uci.ics.hyracks.hadoop.compat.util;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.constraints.ExplicitPartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraint;
import edu.uci.ics.hyracks.api.constraints.PartitionCountConstraint;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopPartitionerTuplePartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNHashPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
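/**
 * Bridges Hadoop MapReduce job configurations and Hyracks job specifications: given one or more
 * {@link JobConf}s, it builds the corresponding Hyracks {@link JobSpecification} out of mapper,
 * sorter, combiner, reducer and writer operators, wired together with one-to-one and M-to-N
 * hash-partitioning connectors. Mapper/reducer parallelism and the external-sort frame limit can
 * be overridden through the MAX_MAPPERS_KEY, MAX_REDUCERS_KEY and EX_SORT_FRAME_LIMIT_KEY
 * environment variables.
 *
 * A hypothetical usage sketch (the namenode URL and the jobConfs list are illustrative only):
 *
 *   HadoopAdapter adapter = new HadoopAdapter("hdfs://namenode:8020");
 *   JobSpecification spec = adapter.getJobSpecification(jobConfs);
 */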
public class HadoopAdapter {
public static final String FS_DEFAULT_NAME = "fs.default.name";
private JobConf jobConf;
public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
public static final int DEFAULT_MAX_MAPPERS = 40;
public static final int DEFAULT_MAX_REDUCERS = 40;
public static final String MAX_MAPPERS_KEY = "maxMappers";
public static final String MAX_REDUCERS_KEY = "maxReducers";
public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
private int maxMappers = DEFAULT_MAX_MAPPERS;
private int maxReducers = DEFAULT_MAX_REDUCERS;
private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
class NewHadoopConstants {
public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
}
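/**
 * Creates an adapter whose JobConf points at the given namenode (fs.default.name) and picks up
 * optional overrides for the maximum number of mappers, maximum number of reducers and the
 * external-sort frame limit from the environment.
 */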
public HadoopAdapter(String namenodeUrl) {
jobConf = new JobConf(true);
jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
if (System.getenv(MAX_MAPPERS_KEY) != null) {
maxMappers = Integer.parseInt(System.getenv(MAX_MAPPERS_KEY));
}
if (System.getenv(MAX_REDUCERS_KEY) != null) {
maxReducers = Integer.parseInt(System.getenv(MAX_REDUCERS_KEY));
}
if (System.getenv(EX_SORT_FRAME_LIMIT_KEY) != null) {
exSortFrame = Integer.parseInt(System.getenv(EX_SORT_FRAME_LIMIT_KEY));
}
}
private String getEnvironmentVariable(String key, String def) {
String ret = System.getenv(key);
return ret != null ? ret : def;
}
public JobConf getConf() {
return jobConf;
}
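/**
 * Obtains an RPC proxy for the given protocol class at the given address. Note that the proxy is
 * always requested with ClientProtocol.versionID, regardless of the protocol class passed in.
 */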
public static VersionedProtocol getProtocol(Class protocolClass, InetSocketAddress inetAddress, JobConf jobConf)
throws IOException {
VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass, ClientProtocol.versionID, inetAddress,
jobConf);
return versionedProtocol;
}
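/**
 * Builds a two-field (key, value) RecordDescriptor from the given Writable class names; returns
 * null (after logging the stack trace) if either class cannot be loaded.
 */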
private static RecordDescriptor getHadoopRecordDescriptor(String className1, String className2) {
RecordDescriptor recordDescriptor = null;
try {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor((Class<? extends Writable>) Class
.forName(className1), (Class<? extends Writable>) Class.forName(className2));
} catch (ClassNotFoundException cnfe) {
cnfe.printStackTrace();
}
return recordDescriptor;
}
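/**
 * Computes the input splits for the job, dispatching to the new (org.apache.hadoop.mapreduce) or
 * old (org.apache.hadoop.mapred) InputFormat API depending on conf.getUseNewMapper().
 */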
private Object[] getInputSplits(JobConf conf) throws IOException, ClassNotFoundException, InterruptedException {
if (conf.getUseNewMapper()) {
return getNewInputSplits(conf);
} else {
return getOldInputSplits(conf);
}
}
private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(JobConf conf) throws ClassNotFoundException,
IOException, InterruptedException {
JobContext context = new JobContext(conf, null);
org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils.newInstance(context.getInputFormatClass(), conf);
List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat.getSplits(context);
return inputSplits.toArray(new org.apache.hadoop.mapreduce.InputSplit[]{});
}
private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
InputFormat inputFormat = conf.getInputFormat();
return inputFormat.getSplits(conf, conf.getNumMapTasks());
}
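/**
 * Creates the mapper operator. When there is no upstream operator the mapper reads its own input
 * splits and is constrained to one partition per split; otherwise it inherits the partition
 * constraint of the previous operator.
 */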
public HadoopMapperOperatorDescriptor getMapper(JobConf conf,JobSpecification spec, IOperatorDescriptor previousOp)
throws Exception {
boolean selfRead = previousOp == null;
IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
HadoopMapperOperatorDescriptor mapOp = null;
PartitionConstraint constraint;
if(selfRead) {
Object [] splits = getInputSplits(conf,maxMappers);
mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,classFactory);
mapOp.setPartitionConstraint(new PartitionCountConstraint(splits.length));
System.out.println("No of mappers :" + splits.length);
} else {
// construct the operator before applying the inherited constraint
mapOp = new HadoopMapperOperatorDescriptor(spec, conf, classFactory);
constraint = previousOp.getPartitionConstraint();
mapOp.setPartitionConstraint(constraint);
}
return mapOp;
}
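/**
 * Creates a reducer operator for the given job configuration. The same operator type is also used
 * as the map-side combiner.
 */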
public HadoopReducerOperatorDescriptor getReducer(JobConf conf, JobSpecification spec) {
HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(spec, conf, null,
new ClasspathBasedHadoopClassFactory());
return reduceOp;
}
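/**
 * Returns a FileSystem client for the configured namenode, or null if it cannot be created.
 */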
public FileSystem getHDFSClient() {
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(jobConf);
} catch (IOException ioe) {
ioe.printStackTrace();
}
return fileSystem;
}
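/**
 * Builds a Hyracks job specification for a single MapReduce job or, when several JobConfs are
 * supplied, a pipelined specification chaining them.
 */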
public JobSpecification getJobSpecification(List<JobConf> jobConfs) throws Exception {
JobSpecification spec = null;
if (jobConfs.size() == 1) {
spec = getJobSpecification(jobConfs.get(0));
} else {
spec = getPipelinedSpec(jobConfs);
}
return spec;
}
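/**
 * Appends the HDFS writer operator behind the given operator. The number of output files follows
 * the number of reduce tasks, falling back to the previous operator's partition count for
 * map-only jobs.
 */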
private IOperatorDescriptor configureOutput(IOperatorDescriptor previousOperator, JobConf conf,
JobSpecification spec) throws Exception {
PartitionConstraint previousOpConstraint = previousOperator.getPartitionConstraint();
int noOfInputs = previousOpConstraint instanceof PartitionCountConstraint
? ((PartitionCountConstraint) previousOpConstraint).getCount()
: ((ExplicitPartitionConstraint) previousOpConstraint).getLocationConstraints().length;
int numOutputters = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : noOfInputs;
HadoopWriteOperatorDescriptor writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
writer.setPartitionConstraint(previousOpConstraint);
spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, writer, 0);
return writer;
}
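/**
 * If a combiner class is configured, inserts a map-side sort followed by a combine (reduce)
 * operator, both co-located with the mapper partitions, and returns the combiner's output
 * operator; otherwise returns the previous operator unchanged.
 */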
private IOperatorDescriptor addCombiner(IOperatorDescriptor previousOperator, JobConf jobConf,
JobSpecification spec) throws Exception {
boolean useCombiner = (jobConf.getCombinerClass() != null);
IOperatorDescriptor mapSideOutputOp = previousOperator;
if (useCombiner) {
System.out.println("Using Combiner:" + jobConf.getCombinerClass().getName());
PartitionConstraint mapperPartitionConstraint = previousOperator.getPartitionConstraint();
IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(jobConf, spec);
mapSideCombineSortOp.setPartitionConstraint(mapperPartitionConstraint);
HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(jobConf, spec);
mapSideCombineReduceOp.setPartitionConstraint(mapperPartitionConstraint);
spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator, 0, mapSideCombineSortOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
mapSideOutputOp = mapSideCombineReduceOp; // feed the combiner's output, not the sorter's, to the shuffle
}
return mapSideOutputOp;
}
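/**
 * If the job has reduce tasks, appends an external sort and a reducer behind the given operator,
 * connected through an M-to-N hash-partitioning connector; the reducer count is capped at
 * maxReducers.
 */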
private IOperatorDescriptor addReducer(IOperatorDescriptor previousOperator, JobConf jobConf,
JobSpecification spec) throws Exception {
IOperatorDescriptor mrOutputOperator = previousOperator;
if (jobConf.getNumReduceTasks() != 0) {
IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec);
int numReduceTasks = Math.min(maxReducers,jobConf.getNumReduceTasks());
System.out.println("No of Reducers :" + numReduceTasks);
PartitionConstraint reducerPartitionConstraint = new PartitionCountConstraint(numReduceTasks);
sorter.setPartitionConstraint(reducerPartitionConstraint);
reducer.setPartitionConstraint(reducerPartitionConstraint);
IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(jobConf, spec);
spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, reducer, 0);
mrOutputOperator = reducer;
}
return mrOutputOperator;
}
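/**
 * Sums the lengths of the given input splits, handling both the new and the old InputSplit APIs.
 */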
private long getInputSize(Object[] splits,JobConf conf) throws IOException, InterruptedException {
long totalInputSize = 0;
if (conf.getUseNewMapper()) {
for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[])splits) {
totalInputSize += split.getLength();
}
} else {
for (InputSplit split : (InputSplit[])splits) {
totalInputSize += split.getLength();
}
}
return totalInputSize;
}
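/**
 * Computes input splits and, if there are more splits than the desired number of mappers, raises
 * mapred.min.split.size to roughly totalInputSize / desiredMaxMappers and recomputes the splits.
 */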
private Object[] getInputSplits(JobConf conf, int desiredMaxMappers) throws Exception {
Object[] splits = getInputSplits(conf);
System.out.println(" initial split count :" + splits.length);
System.out.println(" desired mappers :" + desiredMaxMappers);
if (splits.length > desiredMaxMappers) {
long totalInputSize = getInputSize(splits,conf);
long goalSize = (totalInputSize/desiredMaxMappers);
System.out.println(" total input length :" + totalInputSize);
System.out.println(" goal size :" + goalSize);
conf.setLong("mapred.min.split.size", goalSize);
conf.setNumMapTasks(desiredMaxMappers);
splits = getInputSplits(conf);
System.out.println(" revised split count :" + splits.length);
}
return splits;
}
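/**
 * Builds one Hyracks specification that pipelines several MapReduce jobs: the map-reduce stages
 * are chained in order and the output of the last job is written via configureOutput.
 */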
public JobSpecification getPipelinedSpec(List<JobConf> jobConfs) throws Exception {
JobSpecification spec = new JobSpecification();
Iterator<JobConf> iterator = jobConfs.iterator();
JobConf firstMR = iterator.next();
IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec,firstMR);
while (iterator.hasNext()) {
mrOutputOp = configureMapReduce(mrOutputOp, spec, iterator.next());
}
configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
return spec;
}
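/**
 * Builds the Hyracks specification for a single MapReduce job: mapper, optional combiner,
 * optional reducer and the HDFS writer as the root.
 */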
public JobSpecification getJobSpecification(JobConf conf) throws Exception {
JobSpecification spec = new JobSpecification();
IOperatorDescriptor mrOutput = configureMapReduce(null,spec, conf);
IOperatorDescriptor printer = configureOutput(mrOutput, conf, spec);
spec.addRoot(printer);
System.out.println(spec);
return spec;
}
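/**
 * Wires one map-reduce stage (mapper, optional combiner, optional reducer) into the specification
 * and returns the stage's output operator.
 */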
private IOperatorDescriptor configureMapReduce(IOperatorDescriptor previousOutputOp, JobSpecification spec,
JobConf conf) throws Exception {
IOperatorDescriptor mapper = getMapper(conf, spec, previousOutputOp);
IOperatorDescriptor mapSideOutputOp = addCombiner(mapper, conf, spec);
return addReducer(mapSideOutputOp, conf, spec);
}
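/**
 * Creates an in-memory sort operator keyed on the map output key, using a WritableComparator-based
 * binary comparator.
 */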
public static InMemorySortOperatorDescriptor getInMemorySorter(JobConf conf, JobSpecification spec) {
RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
.getMapOutputValueClass().getName());
WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
WritableComparable.class));
WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
writableComparator.getClass());
return new InMemorySortOperatorDescriptor(spec, new int[] { 0 },
new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
}
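/**
 * Creates an external sort operator keyed on the map output key; the frame limit is read from the
 * HYRACKS_EX_SORT_FRAME_LIMIT configuration key (default 4096 frames).
 */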
public static ExternalSortOperatorDescriptor getExternalSorter(JobConf conf, JobSpecification spec) {
RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf.getMapOutputKeyClass().getName(), conf
.getMapOutputValueClass().getName());
WritableComparator writableComparator = WritableComparator.get(conf.getMapOutputKeyClass().asSubclass(
WritableComparable.class));
WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
writableComparator.getClass());
return new ExternalSortOperatorDescriptor(spec, conf.getInt(HYRACKS_EX_SORT_FRAME_LIMIT, DEFAULT_EX_SORT_FRAME_LIMIT),
new int[] { 0 }, new IBinaryComparatorFactory[] { comparatorFactory }, recordDescriptor);
}
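/**
 * Creates the M-to-N hash-partitioning connector between map and reduce. A user-supplied
 * Partitioner (any class outside org.apache.hadoop) is honored; otherwise the map output key's
 * hash determines the target partition.
 */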
public static MToNHashPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(JobConf conf,
JobSpecification spec) {
Class mapOutputKeyClass = conf.getMapOutputKeyClass();
Class mapOutputValueClass = conf.getMapOutputValueClass();
ITuplePartitionComputerFactory factory;
if (conf.getPartitionerClass() != null && !conf.getPartitionerClass().getName().startsWith("org.apache.hadoop")) {
Class<? extends Partitioner> partitioner = conf.getPartitionerClass();
factory = new HadoopPartitionerTuplePartitionComputerFactory(partitioner, DatatypeHelper
.createSerializerDeserializer(mapOutputKeyClass), DatatypeHelper
.createSerializerDeserializer(mapOutputValueClass));
} else {
ISerializerDeserializer mapOutputKeySerializerDeserializer = DatatypeHelper
.createSerializerDeserializer(mapOutputKeyClass);
factory = new HadoopHashTuplePartitionComputerFactory(mapOutputKeySerializerDeserializer);
}
return new MToNHashPartitioningConnectorDescriptor(spec, factory);
}
}