package edu.uci.ics.hyracks.hadoop.compat.util;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ReflectionUtils;
import edu.uci.ics.hyracks.api.constraints.PartitionConstraintHelper;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.hadoop.HadoopMapperOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.hadoop.HadoopReducerOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.hadoop.HadoopWriteOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopHashTuplePartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopPartitionerTuplePartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.ClasspathBasedHadoopClassFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
import edu.uci.ics.hyracks.dataflow.hadoop.util.IHadoopClassFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
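/**
 * Translates Hadoop (org.apache.hadoop.mapred) job configurations into Hyracks
 * {@link JobSpecification}s. Each map/reduce pair is compiled into a pipeline of
 * Hyracks operators (mapper, optional combiner, sort, reducer, HDFS writer), and
 * consecutive JobConfs can be chained into a single pipelined specification.
 */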
public class HadoopAdapter {
public static final String FS_DEFAULT_NAME = "fs.default.name";
private JobConf jobConf;
private Map<OperatorDescriptorId, Integer> operatorInstanceCount = new HashMap<OperatorDescriptorId, Integer>();
public static final String HYRACKS_EX_SORT_FRAME_LIMIT = "HYRACKS_EX_SORT_FRAME_LIMIT";
public static final int DEFAULT_EX_SORT_FRAME_LIMIT = 4096;
public static final int DEFAULT_MAX_MAPPERS = 40;
public static final int DEFAULT_MAX_REDUCERS = 40;
public static final String MAX_MAPPERS_KEY = "maxMappers";
public static final String MAX_REDUCERS_KEY = "maxReducers";
public static final String EX_SORT_FRAME_LIMIT_KEY = "sortFrameLimit";
private int maxMappers = DEFAULT_MAX_MAPPERS;
private int maxReducers = DEFAULT_MAX_REDUCERS;
private int exSortFrame = DEFAULT_EX_SORT_FRAME_LIMIT;
class NewHadoopConstants {
public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class";
public static final String MAP_CLASS_ATTR = "mapreduce.map.class";
public static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
public static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class";
public static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class";
}
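/**
 * Creates an adapter bound to the given HDFS namenode. The maximum number of
 * mappers and reducers and the external-sort frame limit can be overridden via
 * the MAX_MAPPERS_KEY, MAX_REDUCERS_KEY and EX_SORT_FRAME_LIMIT_KEY environment
 * variables.
 */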
public HadoopAdapter(String namenodeUrl) {
jobConf = new JobConf(true);
jobConf.set(FS_DEFAULT_NAME, namenodeUrl);
maxMappers = Integer.parseInt(getEnvironmentVariable(MAX_MAPPERS_KEY,
String.valueOf(DEFAULT_MAX_MAPPERS)));
maxReducers = Integer.parseInt(getEnvironmentVariable(MAX_REDUCERS_KEY,
String.valueOf(DEFAULT_MAX_REDUCERS)));
exSortFrame = Integer.parseInt(getEnvironmentVariable(EX_SORT_FRAME_LIMIT_KEY,
String.valueOf(DEFAULT_EX_SORT_FRAME_LIMIT)));
}
private String getEnvironmentVariable(String key, String def) {
String ret = System.getenv(key);
return ret != null ? ret : def;
}
public JobConf getConf() {
return jobConf;
}
public static VersionedProtocol getProtocol(Class protocolClass,
InetSocketAddress inetAddress, JobConf jobConf) throws IOException {
VersionedProtocol versionedProtocol = RPC.getProxy(protocolClass,
ClientProtocol.versionID, inetAddress, jobConf);
return versionedProtocol;
}
private static RecordDescriptor getHadoopRecordDescriptor(
String className1, String className2) {
RecordDescriptor recordDescriptor = null;
try {
recordDescriptor = DatatypeHelper.createKeyValueRecordDescriptor(
(Class<? extends Writable>) Class.forName(className1),
(Class<? extends Writable>) Class.forName(className2));
} catch (ClassNotFoundException cnfe) {
cnfe.printStackTrace();
}
return recordDescriptor;
}
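// Dispatches to the new (org.apache.hadoop.mapreduce) or old (org.apache.hadoop.mapred)
// InputFormat API, depending on which mapper API the job is configured to use.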
private Object[] getInputSplits(JobConf conf) throws IOException,
ClassNotFoundException, InterruptedException {
if (conf.getUseNewMapper()) {
return getNewInputSplits(conf);
} else {
return getOldInputSplits(conf);
}
}
private org.apache.hadoop.mapreduce.InputSplit[] getNewInputSplits(
JobConf conf) throws ClassNotFoundException, IOException,
InterruptedException {
JobContext context = new JobContext(conf, null);
org.apache.hadoop.mapreduce.InputFormat inputFormat = ReflectionUtils
.newInstance(context.getInputFormatClass(), conf);
List<org.apache.hadoop.mapreduce.InputSplit> inputSplits = inputFormat
.getSplits(context);
return inputSplits
.toArray(new org.apache.hadoop.mapreduce.InputSplit[] {});
}
private InputSplit[] getOldInputSplits(JobConf conf) throws IOException {
InputFormat inputFormat = conf.getInputFormat();
return inputFormat.getSplits(conf, conf.getNumMapTasks());
}
private void configurePartitionCountConstraint(JobSpecification spec,
IOperatorDescriptor operator, int instanceCount) {
PartitionConstraintHelper.addPartitionCountConstraint(spec, operator,
instanceCount);
operatorInstanceCount.put(operator.getOperatorId(), instanceCount);
}
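/**
 * Creates the Hyracks mapper operator. When there is no upstream operator the
 * mapper reads its own input splits and one mapper instance is scheduled per
 * split; otherwise the mapper consumes the previous operator's output over a
 * one-to-one connector and inherits its partition count.
 */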
public HadoopMapperOperatorDescriptor getMapper(JobConf conf,
JobSpecification spec, IOperatorDescriptor previousOp)
throws Exception {
boolean selfRead = previousOp == null;
IHadoopClassFactory classFactory = new ClasspathBasedHadoopClassFactory();
HadoopMapperOperatorDescriptor mapOp = null;
if (selfRead) {
Object[] splits = getInputSplits(conf, maxMappers);
mapOp = new HadoopMapperOperatorDescriptor(spec, conf, splits,
classFactory);
configurePartitionCountConstraint(spec, mapOp, splits.length);
} else {
// Create the mapper first so its operator id is available when the
// partition-count constraint is registered.
mapOp = new HadoopMapperOperatorDescriptor(spec, conf, classFactory);
configurePartitionCountConstraint(spec, mapOp,
getInstanceCount(previousOp));
spec.connect(new OneToOneConnectorDescriptor(spec), previousOp, 0,
mapOp, 0);
}
return mapOp;
}
public HadoopReducerOperatorDescriptor getReducer(JobConf conf,
IOperatorDescriptorRegistry spec, boolean useAsCombiner) {
HadoopReducerOperatorDescriptor reduceOp = new HadoopReducerOperatorDescriptor(
spec, conf, null, new ClasspathBasedHadoopClassFactory(),
useAsCombiner);
return reduceOp;
}
public FileSystem getHDFSClient() {
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(jobConf);
} catch (IOException ioe) {
ioe.printStackTrace();
}
return fileSystem;
}
public JobSpecification getJobSpecification(List<JobConf> jobConfs)
throws Exception {
JobSpecification spec = null;
if (jobConfs.size() == 1) {
spec = getJobSpecification(jobConfs.get(0));
} else {
spec = getPipelinedSpec(jobConfs);
}
return spec;
}
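// Appends an HDFS writer behind the given operator. The number of writer
// instances follows the reduce-task count, or the previous operator's
// partition count for map-only jobs.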
private IOperatorDescriptor configureOutput(
IOperatorDescriptor previousOperator, JobConf conf,
JobSpecification spec) throws Exception {
int instanceCountPreviousOperator = operatorInstanceCount
.get(previousOperator.getOperatorId());
int numOutputters = conf.getNumReduceTasks() != 0 ? conf
.getNumReduceTasks() : instanceCountPreviousOperator;
HadoopWriteOperatorDescriptor writer = null;
writer = new HadoopWriteOperatorDescriptor(spec, conf, numOutputters);
configurePartitionCountConstraint(spec, writer, numOutputters);
spec.connect(new OneToOneConnectorDescriptor(spec), previousOperator,
0, writer, 0);
return writer;
}
private int getInstanceCount(IOperatorDescriptor operator) {
return operatorInstanceCount.get(operator.getOperatorId());
}
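// If the job defines a combiner, inserts a map-side sort followed by a reducer
// operator running in combiner mode, both with the same partition count as the
// mapper output.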
private IOperatorDescriptor addCombiner(
IOperatorDescriptor previousOperator, JobConf jobConf,
JobSpecification spec) throws Exception {
boolean useCombiner = (jobConf.getCombinerClass() != null);
IOperatorDescriptor mapSideOutputOp = previousOperator;
if (useCombiner) {
System.out.println("Using Combiner:"
+ jobConf.getCombinerClass().getName());
IOperatorDescriptor mapSideCombineSortOp = getExternalSorter(
jobConf, spec);
configurePartitionCountConstraint(spec, mapSideCombineSortOp,
getInstanceCount(previousOperator));
HadoopReducerOperatorDescriptor mapSideCombineReduceOp = getReducer(
jobConf, spec, true);
configurePartitionCountConstraint(spec, mapSideCombineReduceOp,
getInstanceCount(previousOperator));
spec.connect(new OneToOneConnectorDescriptor(spec),
previousOperator, 0, mapSideCombineSortOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec),
mapSideCombineSortOp, 0, mapSideCombineReduceOp, 0);
mapSideOutputOp = mapSideCombineReduceOp;
}
return mapSideOutputOp;
}
private int getNumReduceTasks(JobConf jobConf) {
int numReduceTasks = Math.min(maxReducers, jobConf.getNumReduceTasks());
return numReduceTasks;
}
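// For jobs with reduce tasks, hash-partitions the map output across the
// reducers and sorts each partition before it reaches the reducer operator.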
private IOperatorDescriptor addReducer(
IOperatorDescriptor previousOperator, JobConf jobConf,
JobSpecification spec) throws Exception {
IOperatorDescriptor mrOutputOperator = previousOperator;
if (jobConf.getNumReduceTasks() != 0) {
IOperatorDescriptor sorter = getExternalSorter(jobConf, spec);
HadoopReducerOperatorDescriptor reducer = getReducer(jobConf, spec,
false);
int numReduceTasks = getNumReduceTasks(jobConf);
configurePartitionCountConstraint(spec, sorter, numReduceTasks);
configurePartitionCountConstraint(spec, reducer, numReduceTasks);
IConnectorDescriptor mToNConnectorDescriptor = getMtoNHashPartitioningConnector(
jobConf, spec);
spec.connect(mToNConnectorDescriptor, previousOperator, 0, sorter,
0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0,
reducer, 0);
mrOutputOperator = reducer;
}
return mrOutputOperator;
}
private long getInputSize(Object[] splits, JobConf conf)
throws IOException, InterruptedException {
long totalInputSize = 0;
if (conf.getUseNewMapper()) {
for (org.apache.hadoop.mapreduce.InputSplit split : (org.apache.hadoop.mapreduce.InputSplit[]) splits) {
totalInputSize += split.getLength();
}
} else {
for (InputSplit split : (InputSplit[]) splits) {
totalInputSize += split.getLength();
}
}
return totalInputSize;
}
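// Caps the number of splits (and hence mappers) at desiredMaxMappers by raising
// mapred.min.split.size to the average split size and recomputing the splits.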
private Object[] getInputSplits(JobConf conf, int desiredMaxMappers)
throws Exception {
Object[] splits = getInputSplits(conf);
if (splits.length > desiredMaxMappers) {
long totalInputSize = getInputSize(splits, conf);
long goalSize = (totalInputSize / desiredMaxMappers);
conf.setLong("mapred.min.split.size", goalSize);
conf.setNumMapTasks(desiredMaxMappers);
splits = getInputSplits(conf);
}
return splits;
}
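/**
 * Compiles a chain of JobConfs into a single Hyracks job: the output of each
 * map/reduce stage feeds the mapper of the next stage, and only the final stage
 * writes to HDFS.
 */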
public JobSpecification getPipelinedSpec(List<JobConf> jobConfs)
throws Exception {
JobSpecification spec = new JobSpecification();
Iterator<JobConf> iterator = jobConfs.iterator();
JobConf firstMR = iterator.next();
IOperatorDescriptor mrOutputOp = configureMapReduce(null, spec, firstMR);
while (iterator.hasNext()) {
// Chain each remaining stage onto the output of the previous one.
JobConf currentJobConf = iterator.next();
mrOutputOp = configureMapReduce(mrOutputOp, spec, currentJobConf);
}
configureOutput(mrOutputOp, jobConfs.get(jobConfs.size() - 1), spec);
return spec;
}
public JobSpecification getJobSpecification(JobConf conf) throws Exception {
JobSpecification spec = new JobSpecification();
IOperatorDescriptor mrOutput = configureMapReduce(null, spec, conf);
IOperatorDescriptor writer = configureOutput(mrOutput, conf, spec);
spec.addRoot(writer);
System.out.println(spec);
return spec;
}
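// Builds one map/(combine)/reduce stage and returns its last operator so that
// the next stage, or the output writer, can be connected to it.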
private IOperatorDescriptor configureMapReduce(
IOperatorDescriptor previousOuputOp, JobSpecification spec,
JobConf conf) throws Exception {
IOperatorDescriptor mapper = getMapper(conf, spec, previousOuputOp);
IOperatorDescriptor mapSideOutputOp = addCombiner(mapper, conf, spec);
IOperatorDescriptor reducer = addReducer(mapSideOutputOp, conf, spec);
return reducer;
}
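/**
 * Creates an in-memory sort operator keyed on the map output key, using the
 * WritableComparator registered for the key class.
 */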
public static InMemorySortOperatorDescriptor getInMemorySorter(
JobConf conf, IOperatorDescriptorRegistry spec) {
InMemorySortOperatorDescriptor inMemorySortOp = null;
RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
.getMapOutputKeyClass().getName(), conf
.getMapOutputValueClass().getName());
WritableComparator writableComparator = WritableComparator.get(conf
.getMapOutputKeyClass().asSubclass(WritableComparable.class));
WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
writableComparator.getClass());
inMemorySortOp = new InMemorySortOperatorDescriptor(spec,
new int[] { 0 },
new IBinaryComparatorFactory[] { comparatorFactory },
recordDescriptor);
return inMemorySortOp;
}
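/**
 * Creates an external (disk-backed) sort operator keyed on the map output key.
 * The frame limit is read from the HYRACKS_EX_SORT_FRAME_LIMIT setting in the
 * JobConf, falling back to DEFAULT_EX_SORT_FRAME_LIMIT.
 */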
public static ExternalSortOperatorDescriptor getExternalSorter(
JobConf conf, IOperatorDescriptorRegistry spec) {
ExternalSortOperatorDescriptor externalSortOp = null;
RecordDescriptor recordDescriptor = getHadoopRecordDescriptor(conf
.getMapOutputKeyClass().getName(), conf
.getMapOutputValueClass().getName());
WritableComparator writableComparator = WritableComparator.get(conf
.getMapOutputKeyClass().asSubclass(WritableComparable.class));
WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(
writableComparator.getClass());
externalSortOp = new ExternalSortOperatorDescriptor(spec, conf.getInt(
HYRACKS_EX_SORT_FRAME_LIMIT, DEFAULT_EX_SORT_FRAME_LIMIT),
new int[] { 0 },
new IBinaryComparatorFactory[] { comparatorFactory },
recordDescriptor);
return externalSortOp;
}
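/**
 * Creates the M-to-N connector that redistributes map output to the reducers.
 * A user-supplied Partitioner (any class outside org.apache.hadoop) is honored;
 * otherwise the key's hash code determines the target partition.
 */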
public static MToNPartitioningConnectorDescriptor getMtoNHashPartitioningConnector(
JobConf conf, IConnectorDescriptorRegistry spec) {
Class mapOutputKeyClass = conf.getMapOutputKeyClass();
Class mapOutputValueClass = conf.getMapOutputValueClass();
MToNPartitioningConnectorDescriptor connectorDescriptor = null;
ITuplePartitionComputerFactory factory = null;
if (conf.getPartitionerClass() != null
&& !conf.getPartitionerClass().getName().startsWith(
"org.apache.hadoop")) {
Class<? extends Partitioner> partitioner = conf
.getPartitionerClass();
factory = new HadoopPartitionerTuplePartitionComputerFactory(
partitioner, DatatypeHelper
.createSerializerDeserializer(mapOutputKeyClass),
DatatypeHelper
.createSerializerDeserializer(mapOutputValueClass));
} else {
RecordDescriptor recordDescriptor = DatatypeHelper
.createKeyValueRecordDescriptor(mapOutputKeyClass,
mapOutputValueClass);
ISerializerDeserializer mapOutputKeySerializerDerserializer = DatatypeHelper
.createSerializerDeserializer(mapOutputKeyClass);
factory = new HadoopHashTuplePartitionComputerFactory(
mapOutputKeySerializerDerserializer);
}
connectorDescriptor = new MToNPartitioningConnectorDescriptor(spec,
factory);
return connectorDescriptor;
}
}