blob: 69f738741eb12ed9929dc503f019f5b5fda5af39 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
/**
* This class provides shims for HBase to interact with the Hadoop 1.0.x and the
* Hadoop 0.23.x series.
*
* NOTE: No testing has been done against 0.22.x or 0.21.x.
*/
public abstract class MapreduceTestingShim {
  // Each public static method below forwards to the shim implementation that
  // was selected once, at class-initialization time, by selectShim().

  /** Empty parameter-type list used for reflective lookups of no-arg methods. */
  private static final Class<?>[] EMPTY_PARAM = new Class<?>[] {};

  /** This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x. */
  private static final String V2_MARKER_CLASS =
      "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl";

  /** The shim chosen for the Hadoop version found on the classpath. */
  private static final MapreduceTestingShim instance = selectShim();

  /**
   * Picks the shim implementation by probing for {@link #V2_MARKER_CLASS}.
   *
   * @return a {@code MapreduceV2Shim} if the marker class can be loaded,
   *         otherwise a {@code MapreduceV1Shim}
   */
  private static MapreduceTestingShim selectShim() {
    try {
      Class.forName(V2_MARKER_CLASS);
      return new MapreduceV2Shim();
    } catch (ClassNotFoundException ignored) {
      // Expected when the marker class is absent: fall back to the V1 shim.
      return new MapreduceV1Shim();
    }
  }

  /**
   * Creates a new {@link JobContext} for the given configuration.
   *
   * @param jobConf the configuration to build the context from
   * @return the new job context
   * @throws IOException declared for implementations that need it
   */
  public abstract JobContext newJobContext(Configuration jobConf)
      throws IOException;

  /**
   * Creates a new {@link Job} for the given configuration.
   *
   * @param conf the configuration to build the job from
   * @return the new job
   * @throws IOException declared for implementations that need it
   */
  public abstract Job newJob(Configuration conf) throws IOException;

  /**
   * Reflectively obtains a {@link JobConf} from a mini MR cluster.
   *
   * @param cluster the cluster to query; may be null
   * @return the job configuration, or null if {@code cluster} is null or the
   *         reflective lookup/invocation fails
   */
  public abstract JobConf obtainJobConf(MiniMRCluster cluster);

  /**
   * @return the name of the configuration property this shim uses for the
   *         MapReduce output directory
   */
  public abstract String obtainMROutputDirProp();

  /**
   * Static entry point for {@link #newJobContext(Configuration)} on the
   * selected shim.
   *
   * @param jobConf the configuration to build the context from
   * @return the new job context
   * @throws IOException if the selected shim throws it
   */
  public static JobContext createJobContext(Configuration jobConf)
      throws IOException {
    return instance.newJobContext(jobConf);
  }

  /**
   * Static entry point for {@link #obtainJobConf(MiniMRCluster)} on the
   * selected shim.
   *
   * @param cluster the cluster to query; may be null
   * @return the job configuration, or null if it could not be obtained
   */
  public static JobConf getJobConf(MiniMRCluster cluster) {
    return instance.obtainJobConf(cluster);
  }

  /**
   * Static entry point for {@link #newJob(Configuration)} on the selected shim.
   *
   * @param conf the configuration to build the job from
   * @return the new job
   * @throws IOException if the selected shim throws it
   */
  public static Job createJob(Configuration conf) throws IOException {
    return instance.newJob(conf);
  }

  /**
   * Static entry point for {@link #obtainMROutputDirProp()} on the selected
   * shim.
   *
   * @return the output-directory property name
   */
  public static String getMROutputDirProp() {
    return instance.obtainMROutputDirProp();
  }

  /** Shim used when the V2 marker class is not on the classpath. */
  private static class MapreduceV1Shim extends MapreduceTestingShim {
    @Override
    public JobContext newJobContext(Configuration jobConf) throws IOException {
      // Implementing:
      // return new JobContext(jobConf, new JobID());
      // Done reflectively so the constructor is resolved at runtime rather
      // than at compile time.
      JobID jobId = new JobID();
      try {
        Constructor<JobContext> ctor =
            JobContext.class.getConstructor(Configuration.class, JobID.class);
        return ctor.newInstance(jobConf, jobId);
      } catch (Exception e) {
        throw new IllegalStateException(
            "Failed to instantiate new JobContext(jobConf, new JobID())", e);
      }
    }

    @Override
    public Job newJob(Configuration conf) throws IOException {
      // Implementing:
      // return new Job(conf);
      try {
        Constructor<Job> ctor = Job.class.getConstructor(Configuration.class);
        return ctor.newInstance(conf);
      } catch (Exception e) {
        throw new IllegalStateException(
            "Failed to instantiate new Job(conf)", e);
      }
    }

    @Override
    public JobConf obtainJobConf(MiniMRCluster cluster) {
      if (cluster == null) {
        return null;
      }
      try {
        // Implementing:
        // return cluster.getJobTrackerRunner().getJobTracker().getConf();
        Object runner = cluster.getJobTrackerRunner();
        Method getTracker =
            runner.getClass().getDeclaredMethod("getJobTracker", EMPTY_PARAM);
        Object tracker = getTracker.invoke(runner);
        Method getConf =
            tracker.getClass().getDeclaredMethod("getConf", EMPTY_PARAM);
        return (JobConf) getConf.invoke(tracker);
      } catch (NoSuchMethodException ignored) {
        // Best effort: callers receive null when the conf cannot be reached.
        return null;
      } catch (InvocationTargetException ignored) {
        return null;
      } catch (IllegalAccessException ignored) {
        return null;
      }
    }

    @Override
    public String obtainMROutputDirProp() {
      return "mapred.output.dir";
    }
  }

  /** Shim used when the V2 marker class is on the classpath. */
  private static class MapreduceV2Shim extends MapreduceTestingShim {
    @Override
    public JobContext newJobContext(Configuration jobConf) {
      // The Job returned by newJob is itself used as the JobContext.
      return newJob(jobConf);
    }

    @Override
    public Job newJob(Configuration jobConf) {
      // Implementing:
      // return Job.getInstance(jobConf);
      try {
        Method getInstance =
            Job.class.getMethod("getInstance", Configuration.class);
        return (Job) getInstance.invoke(null, jobConf); // static method, then arg
      } catch (Exception e) {
        // Keep the cause on the thrown exception instead of printing it to
        // stderr, so callers and test reports see the real failure.
        throw new IllegalStateException(
            "Failed to return from Job.getInstance(jobConf)", e);
      }
    }

    @Override
    public JobConf obtainJobConf(MiniMRCluster cluster) {
      // Same null contract as the V1 shim; without this guard the reflective
      // invoke of an instance method on null would throw NullPointerException.
      if (cluster == null) {
        return null;
      }
      try {
        // Implementing:
        // return cluster.getJobTrackerConf();
        Method getTrackerConf =
            MiniMRCluster.class.getMethod("getJobTrackerConf", EMPTY_PARAM);
        return (JobConf) getTrackerConf.invoke(cluster);
      } catch (NoSuchMethodException ignored) {
        // Best effort: callers receive null when the conf cannot be reached.
        return null;
      } catch (InvocationTargetException ignored) {
        return null;
      } catch (IllegalAccessException ignored) {
        return null;
      }
    }

    @Override
    public String obtainMROutputDirProp() {
      // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR
      // from Hadoop 0.23.x. If we use the source directly we break the hadoop 1.x compile.
      return "mapreduce.output.fileoutputformat.outputdir";
    }
  }
}