/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.input;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
/**
 * This class supports MapReduce jobs that have multiple input paths with
 * a different {@link InputFormat} and {@link Mapper} for each path.
 *
 * <p>The per-path settings are kept in the job {@link Configuration} under
 * {@link #DIR_FORMATS} and {@link #DIR_MAPPERS}, each as a comma-separated
 * list of {@code path;className} entries. Because {@code ,} and {@code ;}
 * act as separators, a path whose string form contains either character
 * cannot be read back correctly by this class.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultipleInputs {
  /** Configuration key holding the {@code path;inputFormatClass} entries. */
  public static final String DIR_FORMATS =
      "mapreduce.input.multipleinputs.dir.formats";

  /** Configuration key holding the {@code path;mapperClass} entries. */
  public static final String DIR_MAPPERS =
      "mapreduce.input.multipleinputs.dir.mappers";

  /**
   * Add a {@link Path} with a custom {@link InputFormat} to the list of
   * inputs for the map-reduce job. The job's input format is switched to
   * {@link DelegatingInputFormat}.
   *
   * @param job The {@link Job}
   * @param path {@link Path} to be added to the list of inputs for the job
   * @param inputFormatClass {@link InputFormat} class to use for this path
   */
  @SuppressWarnings("unchecked")
  public static void addInputPath(Job job, Path path,
      Class<? extends InputFormat> inputFormatClass) {
    appendMapping(job.getConfiguration(), DIR_FORMATS, path, inputFormatClass);
    job.setInputFormatClass(DelegatingInputFormat.class);
  }

  /**
   * Add a {@link Path} with a custom {@link InputFormat} and
   * {@link Mapper} to the list of inputs for the map-reduce job. The job's
   * input format is switched to {@link DelegatingInputFormat} and its mapper
   * to {@link DelegatingMapper}.
   *
   * @param job The {@link Job}
   * @param path {@link Path} to be added to the list of inputs for the job
   * @param inputFormatClass {@link InputFormat} class to use for this path
   * @param mapperClass {@link Mapper} class to use for this path
   */
  @SuppressWarnings("unchecked")
  public static void addInputPath(Job job, Path path,
      Class<? extends InputFormat> inputFormatClass,
      Class<? extends Mapper> mapperClass) {
    // Register the input format first, exactly as the three-argument overload.
    addInputPath(job, path, inputFormatClass);
    appendMapping(job.getConfiguration(), DIR_MAPPERS, path, mapperClass);
    job.setMapperClass(DelegatingMapper.class);
  }

  /**
   * Appends a {@code path;className} entry to the comma-separated list stored
   * under {@code key}, creating the list when the key is not set yet.
   *
   * @param conf configuration to update
   * @param key configuration key holding the list
   * @param path path the entry applies to
   * @param cls class whose name is recorded for the path
   */
  private static void appendMapping(Configuration conf, String key,
      Path path, Class<?> cls) {
    String mapping = path.toString() + ";" + cls.getName();
    String existing = conf.get(key);
    conf.set(key, existing == null ? mapping : existing + "," + mapping);
  }

  /**
   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
   * that should be used for them.
   *
   * @param job The {@link JobContext}
   * @see #addInputPath(Job, Path, Class)
   * @return A map of paths to inputformats for the job; empty when
   *         {@link #DIR_FORMATS} is not set
   * @throws RuntimeException if a configured input format class cannot be
   *         found
   */
  @SuppressWarnings("unchecked")
  static Map<Path, InputFormat> getInputFormatMap(JobContext job) {
    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
    Configuration conf = job.getConfiguration();
    String formats = conf.get(DIR_FORMATS);
    if (formats == null) {
      // No input format was registered: return an empty map, as
      // getMapperTypeMap does, instead of failing with a NullPointerException.
      return m;
    }
    for (String pathMapping : formats.split(",")) {
      // Each entry is "path;className".
      String[] split = pathMapping.split(";");
      InputFormat inputFormat;
      try {
        inputFormat = (InputFormat) ReflectionUtils.newInstance(
            conf.getClassByName(split[1]), conf);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
      m.put(new Path(split[0]), inputFormat);
    }
    return m;
  }

  /**
   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
   * should be used for them.
   *
   * @param job The {@link JobContext}
   * @see #addInputPath(Job, Path, Class, Class)
   * @return A map of paths to mappers for the job; empty when
   *         {@link #DIR_MAPPERS} is not set
   * @throws RuntimeException if a configured mapper class cannot be found
   */
  @SuppressWarnings("unchecked")
  static Map<Path, Class<? extends Mapper>>
      getMapperTypeMap(JobContext job) {
    Configuration conf = job.getConfiguration();
    // Read the property once and reuse it for both the check and the parse.
    String mappers = conf.get(DIR_MAPPERS);
    if (mappers == null) {
      return Collections.emptyMap();
    }
    Map<Path, Class<? extends Mapper>> m =
        new HashMap<Path, Class<? extends Mapper>>();
    for (String pathMapping : mappers.split(",")) {
      // Each entry is "path;className".
      String[] split = pathMapping.split(";");
      Class<? extends Mapper> mapClass;
      try {
        mapClass =
            (Class<? extends Mapper>) conf.getClassByName(split[1]);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
      m.put(new Path(split[0]), mapClass);
    }
    return m;
  }
}