blob: 284c46a9187e9dd2d073411db98b28ff7108ffa0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hcatalog.shims.HCatHadoopShims;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The MultiOutputFormat class simplifies writing output data to multiple
* outputs.
* <p>
* Multiple output formats can be defined each with its own
* <code>OutputFormat</code> class, own key class and own value class. Any
* configuration on these output format classes can be done without interfering
* with other output format's configuration.
* <p>
* Usage pattern for job submission:
*
* <pre>
*
* Job job = new Job();
*
* FileInputFormat.setInputPath(job, inDir);
*
* job.setMapperClass(WordCountMap.class);
* job.setReducerClass(WordCountReduce.class);
* job.setInputFormatClass(TextInputFormat.class);
* job.setOutputFormatClass(MultiOutputFormat.class);
* // Need not define OutputKeyClass and OutputValueClass. They default to
* // Writable.class
* job.setMapOutputKeyClass(Text.class);
* job.setMapOutputValueClass(IntWritable.class);
*
*
* // Create a JobConfigurer that will configure the job with the multiple
* // output format information.
* JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
*
* // Defines additional single text based output 'text' for the job.
* // Any configuration for the defined OutputFormat should be done with
* // the Job obtained with configurer.getJob() method.
* configurer.addOutputFormat("text", TextOutputFormat.class,
* IntWritable.class, Text.class);
* FileOutputFormat.setOutputPath(configurer.getJob("text"), textOutDir);
*
* // Defines additional sequence-file based output 'sequence' for the job
* configurer.addOutputFormat("sequence", SequenceFileOutputFormat.class,
* Text.class, IntWritable.class);
* FileOutputFormat.setOutputPath(configurer.getJob("sequence"), seqOutDir);
* ...
* // configure method to be called on the JobConfigurer once all the
* // output formats have been defined and configured.
* configurer.configure();
*
* job.waitForCompletion(true);
* ...
* </pre>
* <p>
* Usage in Reducer:
*
* <pre>
* public class WordCountReduce extends
* Reducer&lt;Text, IntWritable, Writable, Writable&gt; {
*
* private IntWritable count = new IntWritable();
*
* public void reduce(Text word, Iterator&lt;IntWritable&gt; values,
* Context context)
* throws IOException {
* int sum = 0;
* for (IntWritable val : values) {
* sum += val.get();
* }
* count.set(sum);
* MultiOutputFormat.write(&quot;text&quot;, count, word, context);
* MultiOutputFormat.write(&quot;sequence&quot;, word, count, context);
* }
*
* }
*
* </pre>
*
* Map only jobs:
* <p>
* MultiOutputFormat.write("output", key, value, context); can be called similar
* to a reducer in map only jobs.
*
*/
public class MultiOutputFormat extends OutputFormat<Writable, Writable> {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiOutputFormat.class.getName());
private static final String MO_ALIASES = "mapreduce.multiout.aliases";
private static final String MO_ALIAS = "mapreduce.multiout.alias";
private static final String CONF_KEY_DELIM = "%%";
private static final String CONF_VALUE_DELIM = ";;";
private static final String COMMA_DELIM = ",";
private static final List<String> configsToOverride = new ArrayList<String>();
private static final Map<String, String> configsToMerge = new HashMap<String, String>();
static {
configsToOverride.add("mapred.output.dir");
configsToOverride.add(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_SYMLINK));
configsToMerge.put(JobContext.JOB_NAMENODES, COMMA_DELIM);
configsToMerge.put("tmpfiles", COMMA_DELIM);
configsToMerge.put("tmpjars", COMMA_DELIM);
configsToMerge.put("tmparchives", COMMA_DELIM);
configsToMerge.put(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_ARCHIVES), COMMA_DELIM);
configsToMerge.put(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_FILES), COMMA_DELIM);
configsToMerge.put("mapred.job.classpath.archives", System.getProperty("path.separator"));
configsToMerge.put("mapred.job.classpath.files", System.getProperty("path.separator"));
}
/**
* Get a JobConfigurer instance that will support configuration of the job
* for multiple output formats.
*
* @param job the mapreduce job to be submitted
* @return JobConfigurer
*/
public static JobConfigurer createConfigurer(Job job) {
return JobConfigurer.create(job);
}
/**
* Get the JobContext with the related OutputFormat configuration populated given the alias
* and the actual JobContext
* @param alias the name given to the OutputFormat configuration
* @param context the JobContext
* @return a copy of the JobContext with the alias configuration populated
*/
public static JobContext getJobContext(String alias, JobContext context) {
String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
JobContext aliasContext = HCatHadoopShims.Instance.get().createJobContext(context.getConfiguration(), context.getJobID());
addToConfig(aliasConf, aliasContext.getConfiguration());
return aliasContext;
}
/**
* Get the TaskAttemptContext with the related OutputFormat configuration populated given the alias
* and the actual TaskAttemptContext
* @param alias the name given to the OutputFormat configuration
* @param context the Mapper or Reducer Context
* @return a copy of the TaskAttemptContext with the alias configuration populated
*/
public static TaskAttemptContext getTaskAttemptContext(String alias, TaskAttemptContext context) {
String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
TaskAttemptContext aliasContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(
context.getConfiguration(), context.getTaskAttemptID());
addToConfig(aliasConf, aliasContext.getConfiguration());
return aliasContext;
}
/**
* Write the output key and value using the OutputFormat defined by the
* alias.
*
* @param alias the name given to the OutputFormat configuration
* @param key the output key to be written
* @param value the output value to be written
* @param context the Mapper or Reducer Context
* @throws IOException
* @throws InterruptedException
*/
public static <K, V> void write(String alias, K key, V value, TaskInputOutputContext context)
throws IOException, InterruptedException {
KeyValue<K, V> keyval = new KeyValue<K, V>(key, value);
context.write(new Text(alias), keyval);
}
@Override
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
for (String alias : getOutputFormatAliases(context)) {
LOGGER.debug("Calling checkOutputSpecs for alias: " + alias);
JobContext aliasContext = getJobContext(alias, context);
OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
outputFormat.checkOutputSpecs(aliasContext);
// Copy credentials and any new config added back to JobContext
context.getCredentials().addAll(aliasContext.getCredentials());
setAliasConf(alias, context, aliasContext);
}
}
@Override
public RecordWriter<Writable, Writable> getRecordWriter(TaskAttemptContext context)
throws IOException,
InterruptedException {
return new MultiRecordWriter(context);
}
@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
InterruptedException {
return new MultiOutputCommitter(context);
}
private static OutputFormat<?, ?> getOutputFormatInstance(JobContext context) {
OutputFormat<?, ?> outputFormat;
try {
outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(),
context.getConfiguration());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e);
}
return outputFormat;
}
private static String[] getOutputFormatAliases(JobContext context) {
return context.getConfiguration().getStrings(MO_ALIASES);
}
/**
* Compare the aliasContext with userJob and add the differing configuration
* as mapreduce.multiout.alias.<aliasname>.conf to the userJob.
* <p>
* Merge config like tmpjars, tmpfile, tmparchives,
* mapreduce.job.hdfs-servers that are directly handled by JobClient and add
* them to userJob.
* <p>
* Add mapred.output.dir config to userJob.
*
* @param alias alias name associated with a OutputFormat
* @param userJob reference to Job that the user is going to submit
* @param aliasContext JobContext populated with OutputFormat related
* configuration.
*/
private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) {
Configuration userConf = userJob.getConfiguration();
StringBuilder builder = new StringBuilder();
for (Entry<String, String> conf : aliasContext.getConfiguration()) {
String key = conf.getKey();
String value = conf.getValue();
String jobValue = userConf.getRaw(key);
if (jobValue == null || !jobValue.equals(value)) {
if (configsToMerge.containsKey(key)) {
String mergedValue = getMergedConfValue(jobValue, value, configsToMerge.get(key));
userConf.set(key, mergedValue);
} else {
if (configsToOverride.contains(key)) {
userConf.set(key, value);
}
builder.append(key).append(CONF_KEY_DELIM).append(value)
.append(CONF_VALUE_DELIM);
}
}
}
builder.delete(builder.length() - CONF_VALUE_DELIM.length(), builder.length());
userConf.set(getAliasConfName(alias), builder.toString());
}
private static String getMergedConfValue(String originalValues, String newValues, String separator) {
if (originalValues == null) {
return newValues;
}
Set<String> mergedValues = new LinkedHashSet<String>();
mergedValues.addAll(Arrays.asList(StringUtils.split(originalValues, separator)));
mergedValues.addAll(Arrays.asList(StringUtils.split(newValues, separator)));
StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2);
for (String value : mergedValues) {
builder.append(value).append(separator);
}
return builder.substring(0, builder.length() - separator.length());
}
private static String getAliasConfName(String alias) {
return MO_ALIAS + "." + alias + ".conf";
}
private static void addToConfig(String aliasConf, Configuration conf) {
String[] config = aliasConf.split(CONF_KEY_DELIM + "|" + CONF_VALUE_DELIM);
for (int i = 0; i < config.length; i += 2) {
conf.set(config[i], config[i + 1]);
}
}
/**
* Class that supports configuration of the job for multiple output formats.
*/
public static class JobConfigurer {
private final Job job;
private Map<String, Job> outputConfigs = new LinkedHashMap<String, Job>();
private JobConfigurer(Job job) {
this.job = job;
}
private static JobConfigurer create(Job job) {
JobConfigurer configurer = new JobConfigurer(job);
return configurer;
}
/**
* Add a OutputFormat configuration to the Job with a alias name.
*
* @param alias the name to be given to the OutputFormat configuration
* @param outputFormatClass OutputFormat class
* @param keyClass the key class for the output data
* @param valueClass the value class for the output data
* @throws IOException
*/
public void addOutputFormat(String alias,
Class<? extends OutputFormat> outputFormatClass,
Class<?> keyClass, Class<?> valueClass) throws IOException {
Job copy = new Job(this.job.getConfiguration());
outputConfigs.put(alias, copy);
copy.setOutputFormatClass(outputFormatClass);
copy.setOutputKeyClass(keyClass);
copy.setOutputValueClass(valueClass);
}
/**
* Get the Job configuration for a OutputFormat defined by the alias
* name. The job returned by this method should be passed to the
* OutputFormat for any configuration instead of the Job that will be
* submitted to the JobClient.
*
* @param alias the name used for the OutputFormat during
* addOutputFormat
* @return Job
*/
public Job getJob(String alias) {
Job copy = outputConfigs.get(alias);
if (copy == null) {
throw new IllegalArgumentException("OutputFormat with alias " + alias
+ " has not beed added");
}
return copy;
}
/**
* Configure the job with the multiple output formats added. This method
* should be called after all the output formats have been added and
* configured and before the job submission.
*/
public void configure() {
StringBuilder aliases = new StringBuilder();
Configuration jobConf = job.getConfiguration();
for (Entry<String, Job> entry : outputConfigs.entrySet()) {
// Copy credentials
job.getCredentials().addAll(entry.getValue().getCredentials());
String alias = entry.getKey();
aliases.append(alias).append(COMMA_DELIM);
// Store the differing configuration for each alias in the job
// as a setting.
setAliasConf(alias, job, entry.getValue());
}
aliases.delete(aliases.length() - COMMA_DELIM.length(), aliases.length());
jobConf.set(MO_ALIASES, aliases.toString());
}
}
private static class KeyValue<K, V> implements Writable {
private final K key;
private final V value;
public KeyValue(K key, V value) {
this.key = key;
this.value = value;
}
public K getKey() {
return key;
}
public V getValue() {
return value;
}
@Override
public void write(DataOutput out) throws IOException {
// Ignore. Not required as this will be never
// serialized/deserialized.
}
@Override
public void readFields(DataInput in) throws IOException {
// Ignore. Not required as this will be never
// serialized/deserialized.
}
}
private static class MultiRecordWriter extends RecordWriter<Writable, Writable> {
private final Map<String, BaseRecordWriterContainer> baseRecordWriters;
public MultiRecordWriter(TaskAttemptContext context) throws IOException,
InterruptedException {
baseRecordWriters = new LinkedHashMap<String, BaseRecordWriterContainer>();
String[] aliases = getOutputFormatAliases(context);
for (String alias : aliases) {
LOGGER.info("Creating record writer for alias: " + alias);
TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
Configuration aliasConf = aliasContext.getConfiguration();
// Create output directory if not already created.
String outDir = aliasConf.get("mapred.output.dir");
if (outDir != null) {
Path outputDir = new Path(outDir);
FileSystem fs = outputDir.getFileSystem(aliasConf);
if (!fs.exists(outputDir)) {
fs.mkdirs(outputDir);
}
}
OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
baseRecordWriters.put(alias,
new BaseRecordWriterContainer(outputFormat.getRecordWriter(aliasContext),
aliasContext));
}
}
@Override
public void write(Writable key, Writable value) throws IOException, InterruptedException {
Text _key = (Text) key;
KeyValue _value = (KeyValue) value;
String alias = new String(_key.getBytes(), 0, _key.getLength());
BaseRecordWriterContainer baseRWContainer = baseRecordWriters.get(alias);
if (baseRWContainer == null) {
throw new IllegalArgumentException("OutputFormat with alias " + alias
+ " has not been added");
}
baseRWContainer.getRecordWriter().write(_value.getKey(), _value.getValue());
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
for (Entry<String, BaseRecordWriterContainer> entry : baseRecordWriters.entrySet()) {
BaseRecordWriterContainer baseRWContainer = entry.getValue();
LOGGER.info("Closing record writer for alias: " + entry.getKey());
baseRWContainer.getRecordWriter().close(baseRWContainer.getContext());
}
}
}
private static class BaseRecordWriterContainer {
private final RecordWriter recordWriter;
private final TaskAttemptContext context;
public BaseRecordWriterContainer(RecordWriter recordWriter, TaskAttemptContext context) {
this.recordWriter = recordWriter;
this.context = context;
}
public RecordWriter getRecordWriter() {
return recordWriter;
}
public TaskAttemptContext getContext() {
return context;
}
}
public class MultiOutputCommitter extends OutputCommitter {
private final Map<String, BaseOutputCommitterContainer> outputCommitters;
public MultiOutputCommitter(TaskAttemptContext context) throws IOException,
InterruptedException {
outputCommitters = new LinkedHashMap<String, MultiOutputFormat.BaseOutputCommitterContainer>();
String[] aliases = getOutputFormatAliases(context);
for (String alias : aliases) {
LOGGER.info("Creating output committer for alias: " + alias);
TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context);
OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext)
.getOutputCommitter(aliasContext);
outputCommitters.put(alias,
new BaseOutputCommitterContainer(baseCommitter, aliasContext));
}
}
@Override
public void setupJob(JobContext jobContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
LOGGER.info("Calling setupJob for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().setupJob(outputContainer.getContext());
}
}
@Override
public void setupTask(TaskAttemptContext taskContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
LOGGER.info("Calling setupTask for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().setupTask(outputContainer.getContext());
}
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
boolean needTaskCommit = false;
for (String alias : outputCommitters.keySet()) {
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
needTaskCommit = needTaskCommit
|| outputContainer.getBaseCommitter().needsTaskCommit(
outputContainer.getContext());
}
return needTaskCommit;
}
@Override
public void commitTask(TaskAttemptContext taskContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
OutputCommitter baseCommitter = outputContainer.getBaseCommitter();
TaskAttemptContext committerContext = outputContainer.getContext();
if (baseCommitter.needsTaskCommit(committerContext)) {
LOGGER.info("Calling commitTask for alias: " + alias);
baseCommitter.commitTask(committerContext);
}
}
}
@Override
public void abortTask(TaskAttemptContext taskContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
LOGGER.info("Calling abortTask for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().abortTask(outputContainer.getContext());
}
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
for (String alias : outputCommitters.keySet()) {
LOGGER.info("Calling commitJob for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().commitJob(outputContainer.getContext());
}
}
@Override
public void abortJob(JobContext jobContext, State state) throws IOException {
for (String alias : outputCommitters.keySet()) {
LOGGER.info("Calling abortJob for alias: " + alias);
BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state);
}
}
}
private static class BaseOutputCommitterContainer {
private final OutputCommitter outputCommitter;
private final TaskAttemptContext context;
public BaseOutputCommitterContainer(OutputCommitter outputCommitter,
TaskAttemptContext context) {
this.outputCommitter = outputCommitter;
this.context = context;
}
public OutputCommitter getBaseCommitter() {
return outputCommitter;
}
public TaskAttemptContext getContext() {
return context;
}
}
}