blob: 1b1a85b7af4dadc50ca86219cdb6837126ffc30f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.conf.Configuration;
/**
* A factory to allow applications to deal with inconsistencies between
* MapReduce Context Objects API between hadoop-0.20 and later versions.
*/
public class ContextFactory {
private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
private static final boolean useV21;
private static final Field REPORTER_FIELD;
private static final Field READER_FIELD;
private static final Field WRITER_FIELD;
private static final Field OUTER_MAP_FIELD;
private static final Field WRAPPED_CONTEXT_FIELD;
static {
boolean v21 = true;
final String PACKAGE = "org.apache.hadoop.mapreduce";
try {
Class.forName(PACKAGE + ".task.JobContextImpl");
} catch (ClassNotFoundException cnfe) {
v21 = false;
}
useV21 = v21;
Class<?> jobContextCls;
Class<?> taskContextCls;
Class<?> taskIOContextCls;
Class<?> mapCls;
Class<?> mapContextCls;
Class<?> innerMapContextCls;
try {
if (v21) {
jobContextCls =
Class.forName(PACKAGE+".task.JobContextImpl");
taskContextCls =
Class.forName(PACKAGE+".task.TaskAttemptContextImpl");
taskIOContextCls =
Class.forName(PACKAGE+".task.TaskInputOutputContextImpl");
mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
innerMapContextCls =
Class.forName(PACKAGE+".lib.map.WrappedMapper$Context");
} else {
jobContextCls =
Class.forName(PACKAGE+".JobContext");
taskContextCls =
Class.forName(PACKAGE+".TaskAttemptContext");
taskIOContextCls =
Class.forName(PACKAGE+".TaskInputOutputContext");
mapContextCls = Class.forName(PACKAGE + ".MapContext");
mapCls = Class.forName(PACKAGE + ".Mapper");
innerMapContextCls =
Class.forName(PACKAGE+".Mapper$Context");
}
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Can't find class", e);
}
try {
JOB_CONTEXT_CONSTRUCTOR =
jobContextCls.getConstructor(Configuration.class, JobID.class);
JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
TASK_CONTEXT_CONSTRUCTOR =
taskContextCls.getConstructor(Configuration.class,
TaskAttemptID.class);
TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
if (useV21) {
MAP_CONTEXT_CONSTRUCTOR =
innerMapContextCls.getConstructor(mapCls,
MapContext.class);
MAP_CONTEXT_IMPL_CONSTRUCTOR =
mapContextCls.getDeclaredConstructor(Configuration.class,
TaskAttemptID.class,
RecordReader.class,
RecordWriter.class,
OutputCommitter.class,
StatusReporter.class,
InputSplit.class);
MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
WRAPPED_CONTEXT_FIELD =
innerMapContextCls.getDeclaredField("mapContext");
WRAPPED_CONTEXT_FIELD.setAccessible(true);
} else {
MAP_CONTEXT_CONSTRUCTOR =
innerMapContextCls.getConstructor(mapCls,
Configuration.class,
TaskAttemptID.class,
RecordReader.class,
RecordWriter.class,
OutputCommitter.class,
StatusReporter.class,
InputSplit.class);
MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
WRAPPED_CONTEXT_FIELD = null;
}
MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
REPORTER_FIELD = taskIOContextCls.getDeclaredField("reporter");
REPORTER_FIELD.setAccessible(true);
READER_FIELD = mapContextCls.getDeclaredField("reader");
READER_FIELD.setAccessible(true);
WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
WRITER_FIELD.setAccessible(true);
OUTER_MAP_FIELD = innerMapContextCls.getDeclaredField("this$0");
OUTER_MAP_FIELD.setAccessible(true);
} catch (SecurityException e) {
throw new IllegalArgumentException("Can't run constructor ", e);
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Can't find constructor ", e);
} catch (NoSuchFieldException e) {
throw new IllegalArgumentException("Can't find field ", e);
}
}
/**
* Clone a job or task attempt context with a new configuration.
* @param original the original context
* @param conf the new configuration
* @return a new context object
* @throws InterruptedException
* @throws IOException
*/
@SuppressWarnings("unchecked")
public static JobContext cloneContext(JobContext original,
Configuration conf
) throws IOException,
InterruptedException {
try {
if (original instanceof MapContext<?,?,?,?>) {
return cloneMapContext((Mapper.Context) original, conf, null, null);
} else if (original instanceof ReduceContext<?,?,?,?>) {
throw new IllegalArgumentException("can't clone ReduceContext");
} else if (original instanceof TaskAttemptContext) {
TaskAttemptContext spec = (TaskAttemptContext) original;
return (JobContext)
TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, spec.getTaskAttemptID());
} else {
return (JobContext)
JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, original.getJobID());
}
} catch (InstantiationException e) {
throw new IllegalArgumentException("Can't clone object", e);
} catch (IllegalAccessException e) {
throw new IllegalArgumentException("Can't clone object", e);
} catch (InvocationTargetException e) {
throw new IllegalArgumentException("Can't clone object", e);
}
}
/**
* Copy a mapper context, optionally replacing the input and output.
* @param <K1> input key type
* @param <V1> input value type
* @param <K2> output key type
* @param <V2> output value type
* @param context the context to clone
* @param conf a new configuration
* @param reader Reader to read from. Null means to clone from context.
* @param writer Writer to write to. Null means to clone from context.
* @return a new context. it will not be the same class as the original.
* @throws IOException
* @throws InterruptedException
*/
@SuppressWarnings("unchecked")
public static <K1,V1,K2,V2> Mapper<K1,V1,K2,V2>.Context
cloneMapContext(MapContext<K1,V1,K2,V2> context,
Configuration conf,
RecordReader<K1,V1> reader,
RecordWriter<K2,V2> writer
) throws IOException, InterruptedException {
try {
// get the outer object pointer
Object outer = OUTER_MAP_FIELD.get(context);
// if it is a wrapped 21 context, unwrap it
if ("org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context".equals
(context.getClass().getName())) {
context = (MapContext<K1,V1,K2,V2>) WRAPPED_CONTEXT_FIELD.get(context);
}
// if the reader or writer aren't given, use the same ones
if (reader == null) {
reader = (RecordReader<K1,V1>) READER_FIELD.get(context);
}
if (writer == null) {
writer = (RecordWriter<K2,V2>) WRITER_FIELD.get(context);
}
if (useV21) {
Object basis =
MAP_CONTEXT_IMPL_CONSTRUCTOR.newInstance(conf,
context.getTaskAttemptID(),
reader, writer,
context.getOutputCommitter(),
REPORTER_FIELD.get(context),
context.getInputSplit());
return (Mapper.Context)
MAP_CONTEXT_CONSTRUCTOR.newInstance(outer, basis);
} else {
return (Mapper.Context)
MAP_CONTEXT_CONSTRUCTOR.newInstance(outer,
conf, context.getTaskAttemptID(),
reader, writer,
context.getOutputCommitter(),
REPORTER_FIELD.get(context),
context.getInputSplit());
}
} catch (IllegalAccessException e) {
throw new IllegalArgumentException("Can't access field", e);
} catch (InstantiationException e) {
throw new IllegalArgumentException("Can't create object", e);
} catch (InvocationTargetException e) {
throw new IllegalArgumentException("Can't invoke constructor", e);
}
}
}