/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.dataflow.hadoop.mapreduce;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.util.ReflectionUtils;

import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.dataflow.hadoop.data.HadoopNewPartitionerTuplePartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.data.WritableComparingBinaryComparatorFactory;
import edu.uci.ics.hyracks.dataflow.hadoop.util.DatatypeHelper;
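
/**
 * Helper that exposes a marshalled Hadoop {@link Configuration} as the Hyracks-side
 * building blocks of a MapReduce job: mapper, reducer, combiner, input/output formats,
 * comparators, partitioner, and the record descriptors for map output tuples laid out
 * as [key, value, block id].
 *
 * A minimal usage sketch (illustrative only; producing and shipping the marshalled
 * configuration is the caller's responsibility):
 *
 * <pre>
 * MarshalledWritable&lt;Configuration&gt; mConfig = ...; // serialized job configuration
 * HadoopHelper helper = new HadoopHelper(mConfig);
 * RecordDescriptor mapOutDesc = helper.getMapOutputRecordDescriptor();
 * List&lt;InputSplit&gt; splits = helper.getInputSplits();
 * </pre>
 */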
public class HadoopHelper {
public static final int KEY_FIELD_INDEX = 0;
public static final int VALUE_FIELD_INDEX = 1;
public static final int BLOCKID_FIELD_INDEX = 2;
private static final int[] KEY_SORT_FIELDS = new int[] { 0 };
private MarshalledWritable<Configuration> mConfig;
private Configuration config;
private Job job;
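
    /**
     * Unmarshals the wrapped configuration and constructs the underlying {@link Job}.
     * The thread's context classloader is temporarily swapped to this class's loader
     * so that user classes named in the configuration resolve against the job's
     * classpath.
     */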
public HadoopHelper(MarshalledWritable<Configuration> mConfig) throws HyracksDataException {
this.mConfig = mConfig;
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
config = mConfig.get();
config.setClassLoader(getClass().getClassLoader());
job = new Job(config);
} catch (Exception e) {
throw new HyracksDataException(e);
} finally {
Thread.currentThread().setContextClassLoader(ctxCL);
}
}
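
    /**
     * Builds the Hyracks record descriptor for map output tuples laid out as
     * [key, value, block id], where the trailing integer field carries the block id.
     */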
public RecordDescriptor getMapOutputRecordDescriptor() throws HyracksDataException {
try {
return new RecordDescriptor(
new ISerializerDeserializer[] {
DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
.getMapOutputKeyClass()),
DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
.getMapOutputValueClass()), IntegerSerializerDeserializer.INSTANCE });
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
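
    /**
     * Builds the record descriptor for plain [key, value] map output tuples, without
     * the extra block id field.
     */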
public RecordDescriptor getMapOutputRecordDescriptorWithoutExtraFields() throws HyracksDataException {
try {
return new RecordDescriptor(
new ISerializerDeserializer[] {
DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
.getMapOutputKeyClass()),
DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
.getMapOutputValueClass()) });
} catch (Exception e) {
throw new HyracksDataException(e);
}
}
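
    /**
     * Creates a {@link TaskAttemptContext} for the given attempt id under the
     * configuration's classloader.
     */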
public TaskAttemptContext createTaskAttemptContext(TaskAttemptID taId) {
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(config.getClassLoader());
return new TaskAttemptContext(config, taId);
} finally {
Thread.currentThread().setContextClassLoader(ctxCL);
}
}
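
    /**
     * Creates a {@link JobContext} over the job configuration (with no job id), again
     * under the configuration's classloader.
     */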
public JobContext createJobContext() {
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(config.getClassLoader());
return new JobContext(config, null);
} finally {
Thread.currentThread().setContextClassLoader(ctxCL);
}
}
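
    /**
     * Instantiates the job's configured {@link Mapper} via reflection.
     */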
public <K1, V1, K2, V2> Mapper<K1, V1, K2, V2> getMapper() throws HyracksDataException {
try {
return (Mapper<K1, V1, K2, V2>) HadoopTools.newInstance(job.getMapperClass());
} catch (ClassNotFoundException e) {
throw new HyracksDataException(e);
} catch (InstantiationException e) {
throw new HyracksDataException(e);
} catch (IllegalAccessException e) {
throw new HyracksDataException(e);
}
}
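
    /**
     * Instantiates the job's configured {@link Reducer} via reflection.
     */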
public <K2, V2, K3, V3> Reducer<K2, V2, K3, V3> getReducer() throws HyracksDataException {
try {
return (Reducer<K2, V2, K3, V3>) HadoopTools.newInstance(job.getReducerClass());
} catch (ClassNotFoundException e) {
throw new HyracksDataException(e);
} catch (InstantiationException e) {
throw new HyracksDataException(e);
} catch (IllegalAccessException e) {
throw new HyracksDataException(e);
}
}
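
    /**
     * Instantiates the job's configured combiner, a {@link Reducer} over the map
     * output key/value types.
     */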
public <K2, V2> Reducer<K2, V2, K2, V2> getCombiner() throws HyracksDataException {
try {
return (Reducer<K2, V2, K2, V2>) HadoopTools.newInstance(job.getCombinerClass());
} catch (ClassNotFoundException e) {
throw new HyracksDataException(e);
} catch (InstantiationException e) {
throw new HyracksDataException(e);
} catch (IllegalAccessException e) {
throw new HyracksDataException(e);
}
}
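
    /**
     * Instantiates the job's {@link InputFormat} with the configuration injected.
     */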
public <K, V> InputFormat<K, V> getInputFormat() throws HyracksDataException {
try {
return (InputFormat<K, V>) ReflectionUtils.newInstance(job.getInputFormatClass(), config);
} catch (ClassNotFoundException e) {
throw new HyracksDataException(e);
}
}
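
    /**
     * Computes the job's input splits by delegating to the configured input format's
     * {@code getSplits}, under this class's classloader.
     */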
public <K, V> List<InputSplit> getInputSplits() throws HyracksDataException {
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
InputFormat<K, V> fmt = getInputFormat();
JobContext jCtx = new JobContext(config, null);
try {
return fmt.getSplits(jCtx);
} catch (IOException e) {
throw new HyracksDataException(e);
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
} finally {
Thread.currentThread().setContextClassLoader(ctxCL);
}
}
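
    /**
     * Wraps the job's sort comparator class in a Hyracks binary comparator factory
     * for the key field.
     */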
public IBinaryComparatorFactory[] getSortComparatorFactories() {
WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
.getSortComparator().getClass());
return new IBinaryComparatorFactory[] { comparatorFactory };
}
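
    /**
     * Wraps the job's grouping comparator class in a Hyracks binary comparator factory
     * for the key field.
     */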
public IBinaryComparatorFactory[] getGroupingComparatorFactories() {
WritableComparingBinaryComparatorFactory comparatorFactory = new WritableComparingBinaryComparatorFactory(job
.getGroupingComparator().getClass());
return new IBinaryComparatorFactory[] { comparatorFactory };
}
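
    /**
     * Returns the job's grouping comparator as a raw Hadoop comparator.
     */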
public RawComparator<?> getRawGroupingComparator() {
return job.getGroupingComparator();
}
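
    /**
     * Converts the job's sort buffer size (io.sort.mb, default 100 MB) into a Hyracks
     * frame count: (io.sort.mb * 1024 * 1024) / frameSize.
     */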
public int getSortFrameLimit(IHyracksCommonContext ctx) {
int sortMemory = job.getConfiguration().getInt("io.sort.mb", 100);
return (int) (((long) sortMemory * 1024 * 1024) / ctx.getFrameSize());
}

    public Job getJob() {
        return job;
    }

    public MarshalledWritable<Configuration> getMarshalledConfiguration() {
        return mConfig;
    }

    public Configuration getConfiguration() {
        return config;
    }
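
    /**
     * Builds a tuple partition computer that deserializes the key and value fields and
     * delegates to the job's configured {@link Partitioner}.
     */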
public ITuplePartitionComputerFactory getTuplePartitionComputer() throws HyracksDataException {
int nReducers = job.getNumReduceTasks();
try {
return new HadoopNewPartitionerTuplePartitionComputerFactory<Writable, Writable>(
(Class<? extends Partitioner<Writable, Writable>>) job.getPartitionerClass(),
(ISerializerDeserializer<Writable>) DatatypeHelper
.createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputKeyClass()),
(ISerializerDeserializer<Writable>) DatatypeHelper
.createSerializerDeserializer((Class<? extends Writable>) job.getMapOutputValueClass()));
} catch (ClassNotFoundException e) {
throw new HyracksDataException(e);
}
}
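
    /**
     * Returns the field indexes on which map output is sorted (the key field).
     */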
public int[] getSortFields() {
return KEY_SORT_FIELDS;
}
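
    /**
     * Returns the serializer/deserializer for the map output key class.
     */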
public <K> ISerializerDeserializer<K> getMapOutputKeySerializerDeserializer() {
return (ISerializerDeserializer<K>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
.getMapOutputKeyClass());
}
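
    /**
     * Returns the serializer/deserializer for the map output value class.
     */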
public <V> ISerializerDeserializer<V> getMapOutputValueSerializerDeserializer() {
return (ISerializerDeserializer<V>) DatatypeHelper.createSerializerDeserializer((Class<? extends Writable>) job
.getMapOutputValueClass());
}
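
    /**
     * Returns the default {@link FileSystem} for the job configuration.
     */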
public FileSystem getFilesystem() throws HyracksDataException {
try {
return FileSystem.get(config);
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
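
    /**
     * Instantiates the job's {@link OutputFormat} with the configuration injected.
     */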
public <K, V> OutputFormat<K, V> getOutputFormat() throws HyracksDataException {
try {
return (OutputFormat<K, V>) ReflectionUtils.newInstance(job.getOutputFormatClass(), config);
} catch (ClassNotFoundException e) {
throw new HyracksDataException(e);
}
}
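
    /**
     * Returns whether the job has a combiner configured.
     */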
public boolean hasCombiner() throws HyracksDataException {
try {
return job.getCombinerClass() != null;
} catch (ClassNotFoundException e) {
throw new HyracksDataException(e);
}
}
}