| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hcatalog.mapreduce; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import org.apache.commons.lang.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.JobStatus.State; |
| import org.apache.hadoop.mapreduce.OutputCommitter; |
| import org.apache.hadoop.mapreduce.OutputFormat; |
| import org.apache.hadoop.mapreduce.RecordWriter; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskInputOutputContext; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hcatalog.shims.HCatHadoopShims; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The MultiOutputFormat class simplifies writing output data to multiple |
| * outputs. |
| * <p> |
| * Multiple output formats can be defined each with its own |
| * <code>OutputFormat</code> class, own key class and own value class. Any |
| * configuration on these output format classes can be done without interfering |
| * with other output format's configuration. |
| * <p> |
| * Usage pattern for job submission: |
| * |
| * <pre> |
| * |
| * Job job = new Job(); |
| * |
| * FileInputFormat.setInputPath(job, inDir); |
| * |
| * job.setMapperClass(WordCountMap.class); |
| * job.setReducerClass(WordCountReduce.class); |
| * job.setInputFormatClass(TextInputFormat.class); |
| * job.setOutputFormatClass(MultiOutputFormat.class); |
| * // Need not define OutputKeyClass and OutputValueClass. They default to |
| * // Writable.class |
| * job.setMapOutputKeyClass(Text.class); |
| * job.setMapOutputValueClass(IntWritable.class); |
| * |
| * |
| * // Create a JobConfigurer that will configure the job with the multiple |
| * // output format information. |
| * JobConfigurer configurer = MultiOutputFormat.createConfigurer(job); |
| * |
| * // Defines additional single text based output 'text' for the job. |
| * // Any configuration for the defined OutputFormat should be done with |
| * // the Job obtained with configurer.getJob() method. |
| * configurer.addOutputFormat("text", TextOutputFormat.class, |
| * IntWritable.class, Text.class); |
| * FileOutputFormat.setOutputPath(configurer.getJob("text"), textOutDir); |
| * |
| * // Defines additional sequence-file based output 'sequence' for the job |
| * configurer.addOutputFormat("sequence", SequenceFileOutputFormat.class, |
| * Text.class, IntWritable.class); |
| * FileOutputFormat.setOutputPath(configurer.getJob("sequence"), seqOutDir); |
| * ... |
| * // configure method to be called on the JobConfigurer once all the |
| * // output formats have been defined and configured. |
| * configurer.configure(); |
| * |
| * job.waitForCompletion(true); |
| * ... |
| * </pre> |
| * <p> |
| * Usage in Reducer: |
| * |
| * <pre> |
| * public class WordCountReduce extends |
| * Reducer<Text, IntWritable, Writable, Writable> { |
| * |
| * private IntWritable count = new IntWritable(); |
| * |
 * public void reduce(Text word, Iterable<IntWritable> values,
| * Context context) |
| * throws IOException { |
| * int sum = 0; |
| * for (IntWritable val : values) { |
| * sum += val.get(); |
| * } |
| * count.set(sum); |
| * MultiOutputFormat.write("text", count, word, context); |
| * MultiOutputFormat.write("sequence", word, count, context); |
| * } |
| * |
| * } |
| * |
| * </pre> |
| * |
| * Map only jobs: |
| * <p> |
| * MultiOutputFormat.write("output", key, value, context); can be called similar |
| * to a reducer in map only jobs. |
| * |
| */ |
| public class MultiOutputFormat extends OutputFormat<Writable, Writable> { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(MultiOutputFormat.class.getName()); |
| private static final String MO_ALIASES = "mapreduce.multiout.aliases"; |
| private static final String MO_ALIAS = "mapreduce.multiout.alias"; |
| private static final String CONF_KEY_DELIM = "%%"; |
| private static final String CONF_VALUE_DELIM = ";;"; |
| private static final String COMMA_DELIM = ","; |
| private static final List<String> configsToOverride = new ArrayList<String>(); |
| private static final Map<String, String> configsToMerge = new HashMap<String, String>(); |
| |
| static { |
| configsToOverride.add("mapred.output.dir"); |
| configsToOverride.add(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_SYMLINK)); |
| configsToMerge.put(JobContext.JOB_NAMENODES, COMMA_DELIM); |
| configsToMerge.put("tmpfiles", COMMA_DELIM); |
| configsToMerge.put("tmpjars", COMMA_DELIM); |
| configsToMerge.put("tmparchives", COMMA_DELIM); |
| configsToMerge.put(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_ARCHIVES), COMMA_DELIM); |
| configsToMerge.put(HCatHadoopShims.Instance.get().getPropertyName(HCatHadoopShims.PropertyName.CACHE_FILES), COMMA_DELIM); |
| configsToMerge.put("mapred.job.classpath.archives", System.getProperty("path.separator")); |
| configsToMerge.put("mapred.job.classpath.files", System.getProperty("path.separator")); |
| } |
| |
| /** |
| * Get a JobConfigurer instance that will support configuration of the job |
| * for multiple output formats. |
| * |
| * @param job the mapreduce job to be submitted |
| * @return JobConfigurer |
| */ |
| public static JobConfigurer createConfigurer(Job job) { |
| return JobConfigurer.create(job); |
| } |
| |
| /** |
| * Get the JobContext with the related OutputFormat configuration populated given the alias |
| * and the actual JobContext |
| * @param alias the name given to the OutputFormat configuration |
| * @param context the JobContext |
| * @return a copy of the JobContext with the alias configuration populated |
| */ |
| public static JobContext getJobContext(String alias, JobContext context) { |
| String aliasConf = context.getConfiguration().get(getAliasConfName(alias)); |
| JobContext aliasContext = HCatHadoopShims.Instance.get().createJobContext(context.getConfiguration(), context.getJobID()); |
| addToConfig(aliasConf, aliasContext.getConfiguration()); |
| return aliasContext; |
| } |
| |
| /** |
| * Get the TaskAttemptContext with the related OutputFormat configuration populated given the alias |
| * and the actual TaskAttemptContext |
| * @param alias the name given to the OutputFormat configuration |
| * @param context the Mapper or Reducer Context |
| * @return a copy of the TaskAttemptContext with the alias configuration populated |
| */ |
| public static TaskAttemptContext getTaskAttemptContext(String alias, TaskAttemptContext context) { |
| String aliasConf = context.getConfiguration().get(getAliasConfName(alias)); |
| TaskAttemptContext aliasContext = HCatHadoopShims.Instance.get().createTaskAttemptContext( |
| context.getConfiguration(), context.getTaskAttemptID()); |
| addToConfig(aliasConf, aliasContext.getConfiguration()); |
| return aliasContext; |
| } |
| |
| /** |
| * Write the output key and value using the OutputFormat defined by the |
| * alias. |
| * |
| * @param alias the name given to the OutputFormat configuration |
| * @param key the output key to be written |
| * @param value the output value to be written |
| * @param context the Mapper or Reducer Context |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| public static <K, V> void write(String alias, K key, V value, TaskInputOutputContext context) |
| throws IOException, InterruptedException { |
| KeyValue<K, V> keyval = new KeyValue<K, V>(key, value); |
| context.write(new Text(alias), keyval); |
| } |
| |
| @Override |
| public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { |
| for (String alias : getOutputFormatAliases(context)) { |
| LOGGER.debug("Calling checkOutputSpecs for alias: " + alias); |
| JobContext aliasContext = getJobContext(alias, context); |
| OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext); |
| outputFormat.checkOutputSpecs(aliasContext); |
| // Copy credentials and any new config added back to JobContext |
| context.getCredentials().addAll(aliasContext.getCredentials()); |
| setAliasConf(alias, context, aliasContext); |
| } |
| } |
| |
| @Override |
| public RecordWriter<Writable, Writable> getRecordWriter(TaskAttemptContext context) |
| throws IOException, |
| InterruptedException { |
| return new MultiRecordWriter(context); |
| } |
| |
| @Override |
| public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, |
| InterruptedException { |
| return new MultiOutputCommitter(context); |
| } |
| |
| private static OutputFormat<?, ?> getOutputFormatInstance(JobContext context) { |
| OutputFormat<?, ?> outputFormat; |
| try { |
| outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(), |
| context.getConfiguration()); |
| } catch (ClassNotFoundException e) { |
| throw new IllegalStateException(e); |
| } |
| return outputFormat; |
| } |
| |
| private static String[] getOutputFormatAliases(JobContext context) { |
| return context.getConfiguration().getStrings(MO_ALIASES); |
| } |
| |
| /** |
| * Compare the aliasContext with userJob and add the differing configuration |
| * as mapreduce.multiout.alias.<aliasname>.conf to the userJob. |
| * <p> |
| * Merge config like tmpjars, tmpfile, tmparchives, |
| * mapreduce.job.hdfs-servers that are directly handled by JobClient and add |
| * them to userJob. |
| * <p> |
| * Add mapred.output.dir config to userJob. |
| * |
| * @param alias alias name associated with a OutputFormat |
| * @param userJob reference to Job that the user is going to submit |
| * @param aliasContext JobContext populated with OutputFormat related |
| * configuration. |
| */ |
| private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) { |
| Configuration userConf = userJob.getConfiguration(); |
| StringBuilder builder = new StringBuilder(); |
| for (Entry<String, String> conf : aliasContext.getConfiguration()) { |
| String key = conf.getKey(); |
| String value = conf.getValue(); |
| String jobValue = userConf.getRaw(key); |
| if (jobValue == null || !jobValue.equals(value)) { |
| if (configsToMerge.containsKey(key)) { |
| String mergedValue = getMergedConfValue(jobValue, value, configsToMerge.get(key)); |
| userConf.set(key, mergedValue); |
| } else { |
| if (configsToOverride.contains(key)) { |
| userConf.set(key, value); |
| } |
| builder.append(key).append(CONF_KEY_DELIM).append(value) |
| .append(CONF_VALUE_DELIM); |
| } |
| } |
| } |
| builder.delete(builder.length() - CONF_VALUE_DELIM.length(), builder.length()); |
| userConf.set(getAliasConfName(alias), builder.toString()); |
| } |
| |
| private static String getMergedConfValue(String originalValues, String newValues, String separator) { |
| if (originalValues == null) { |
| return newValues; |
| } |
| Set<String> mergedValues = new LinkedHashSet<String>(); |
| mergedValues.addAll(Arrays.asList(StringUtils.split(originalValues, separator))); |
| mergedValues.addAll(Arrays.asList(StringUtils.split(newValues, separator))); |
| StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2); |
| for (String value : mergedValues) { |
| builder.append(value).append(separator); |
| } |
| return builder.substring(0, builder.length() - separator.length()); |
| } |
| |
| private static String getAliasConfName(String alias) { |
| return MO_ALIAS + "." + alias + ".conf"; |
| } |
| |
| private static void addToConfig(String aliasConf, Configuration conf) { |
| String[] config = aliasConf.split(CONF_KEY_DELIM + "|" + CONF_VALUE_DELIM); |
| for (int i = 0; i < config.length; i += 2) { |
| conf.set(config[i], config[i + 1]); |
| } |
| } |
| |
| /** |
| * Class that supports configuration of the job for multiple output formats. |
| */ |
| public static class JobConfigurer { |
| |
| private final Job job; |
| private Map<String, Job> outputConfigs = new LinkedHashMap<String, Job>(); |
| |
| private JobConfigurer(Job job) { |
| this.job = job; |
| } |
| |
| private static JobConfigurer create(Job job) { |
| JobConfigurer configurer = new JobConfigurer(job); |
| return configurer; |
| } |
| |
| /** |
| * Add a OutputFormat configuration to the Job with a alias name. |
| * |
| * @param alias the name to be given to the OutputFormat configuration |
| * @param outputFormatClass OutputFormat class |
| * @param keyClass the key class for the output data |
| * @param valueClass the value class for the output data |
| * @throws IOException |
| */ |
| public void addOutputFormat(String alias, |
| Class<? extends OutputFormat> outputFormatClass, |
| Class<?> keyClass, Class<?> valueClass) throws IOException { |
| Job copy = new Job(this.job.getConfiguration()); |
| outputConfigs.put(alias, copy); |
| copy.setOutputFormatClass(outputFormatClass); |
| copy.setOutputKeyClass(keyClass); |
| copy.setOutputValueClass(valueClass); |
| } |
| |
| /** |
| * Get the Job configuration for a OutputFormat defined by the alias |
| * name. The job returned by this method should be passed to the |
| * OutputFormat for any configuration instead of the Job that will be |
| * submitted to the JobClient. |
| * |
| * @param alias the name used for the OutputFormat during |
| * addOutputFormat |
| * @return Job |
| */ |
| public Job getJob(String alias) { |
| Job copy = outputConfigs.get(alias); |
| if (copy == null) { |
| throw new IllegalArgumentException("OutputFormat with alias " + alias |
| + " has not beed added"); |
| } |
| return copy; |
| } |
| |
| /** |
| * Configure the job with the multiple output formats added. This method |
| * should be called after all the output formats have been added and |
| * configured and before the job submission. |
| */ |
| public void configure() { |
| StringBuilder aliases = new StringBuilder(); |
| Configuration jobConf = job.getConfiguration(); |
| for (Entry<String, Job> entry : outputConfigs.entrySet()) { |
| // Copy credentials |
| job.getCredentials().addAll(entry.getValue().getCredentials()); |
| String alias = entry.getKey(); |
| aliases.append(alias).append(COMMA_DELIM); |
| // Store the differing configuration for each alias in the job |
| // as a setting. |
| setAliasConf(alias, job, entry.getValue()); |
| } |
| aliases.delete(aliases.length() - COMMA_DELIM.length(), aliases.length()); |
| jobConf.set(MO_ALIASES, aliases.toString()); |
| } |
| |
| } |
| |
| private static class KeyValue<K, V> implements Writable { |
| private final K key; |
| private final V value; |
| |
| public KeyValue(K key, V value) { |
| this.key = key; |
| this.value = value; |
| } |
| |
| public K getKey() { |
| return key; |
| } |
| |
| public V getValue() { |
| return value; |
| } |
| |
| @Override |
| public void write(DataOutput out) throws IOException { |
| // Ignore. Not required as this will be never |
| // serialized/deserialized. |
| } |
| |
| @Override |
| public void readFields(DataInput in) throws IOException { |
| // Ignore. Not required as this will be never |
| // serialized/deserialized. |
| } |
| } |
| |
| private static class MultiRecordWriter extends RecordWriter<Writable, Writable> { |
| |
| private final Map<String, BaseRecordWriterContainer> baseRecordWriters; |
| |
| public MultiRecordWriter(TaskAttemptContext context) throws IOException, |
| InterruptedException { |
| baseRecordWriters = new LinkedHashMap<String, BaseRecordWriterContainer>(); |
| String[] aliases = getOutputFormatAliases(context); |
| for (String alias : aliases) { |
| LOGGER.info("Creating record writer for alias: " + alias); |
| TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context); |
| Configuration aliasConf = aliasContext.getConfiguration(); |
| // Create output directory if not already created. |
| String outDir = aliasConf.get("mapred.output.dir"); |
| if (outDir != null) { |
| Path outputDir = new Path(outDir); |
| FileSystem fs = outputDir.getFileSystem(aliasConf); |
| if (!fs.exists(outputDir)) { |
| fs.mkdirs(outputDir); |
| } |
| } |
| OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext); |
| baseRecordWriters.put(alias, |
| new BaseRecordWriterContainer(outputFormat.getRecordWriter(aliasContext), |
| aliasContext)); |
| } |
| } |
| |
| @Override |
| public void write(Writable key, Writable value) throws IOException, InterruptedException { |
| Text _key = (Text) key; |
| KeyValue _value = (KeyValue) value; |
| String alias = new String(_key.getBytes(), 0, _key.getLength()); |
| BaseRecordWriterContainer baseRWContainer = baseRecordWriters.get(alias); |
| if (baseRWContainer == null) { |
| throw new IllegalArgumentException("OutputFormat with alias " + alias |
| + " has not been added"); |
| } |
| baseRWContainer.getRecordWriter().write(_value.getKey(), _value.getValue()); |
| } |
| |
| @Override |
| public void close(TaskAttemptContext context) throws IOException, InterruptedException { |
| for (Entry<String, BaseRecordWriterContainer> entry : baseRecordWriters.entrySet()) { |
| BaseRecordWriterContainer baseRWContainer = entry.getValue(); |
| LOGGER.info("Closing record writer for alias: " + entry.getKey()); |
| baseRWContainer.getRecordWriter().close(baseRWContainer.getContext()); |
| } |
| } |
| |
| } |
| |
| private static class BaseRecordWriterContainer { |
| |
| private final RecordWriter recordWriter; |
| private final TaskAttemptContext context; |
| |
| public BaseRecordWriterContainer(RecordWriter recordWriter, TaskAttemptContext context) { |
| this.recordWriter = recordWriter; |
| this.context = context; |
| } |
| |
| public RecordWriter getRecordWriter() { |
| return recordWriter; |
| } |
| |
| public TaskAttemptContext getContext() { |
| return context; |
| } |
| } |
| |
| public class MultiOutputCommitter extends OutputCommitter { |
| |
| private final Map<String, BaseOutputCommitterContainer> outputCommitters; |
| |
| public MultiOutputCommitter(TaskAttemptContext context) throws IOException, |
| InterruptedException { |
| outputCommitters = new LinkedHashMap<String, MultiOutputFormat.BaseOutputCommitterContainer>(); |
| String[] aliases = getOutputFormatAliases(context); |
| for (String alias : aliases) { |
| LOGGER.info("Creating output committer for alias: " + alias); |
| TaskAttemptContext aliasContext = getTaskAttemptContext(alias, context); |
| OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext) |
| .getOutputCommitter(aliasContext); |
| outputCommitters.put(alias, |
| new BaseOutputCommitterContainer(baseCommitter, aliasContext)); |
| } |
| } |
| |
| @Override |
| public void setupJob(JobContext jobContext) throws IOException { |
| for (String alias : outputCommitters.keySet()) { |
| LOGGER.info("Calling setupJob for alias: " + alias); |
| BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); |
| outputContainer.getBaseCommitter().setupJob(outputContainer.getContext()); |
| } |
| } |
| |
| @Override |
| public void setupTask(TaskAttemptContext taskContext) throws IOException { |
| for (String alias : outputCommitters.keySet()) { |
| LOGGER.info("Calling setupTask for alias: " + alias); |
| BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); |
| outputContainer.getBaseCommitter().setupTask(outputContainer.getContext()); |
| } |
| } |
| |
| @Override |
| public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException { |
| boolean needTaskCommit = false; |
| for (String alias : outputCommitters.keySet()) { |
| BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); |
| needTaskCommit = needTaskCommit |
| || outputContainer.getBaseCommitter().needsTaskCommit( |
| outputContainer.getContext()); |
| } |
| return needTaskCommit; |
| } |
| |
| @Override |
| public void commitTask(TaskAttemptContext taskContext) throws IOException { |
| for (String alias : outputCommitters.keySet()) { |
| BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); |
| OutputCommitter baseCommitter = outputContainer.getBaseCommitter(); |
| TaskAttemptContext committerContext = outputContainer.getContext(); |
| if (baseCommitter.needsTaskCommit(committerContext)) { |
| LOGGER.info("Calling commitTask for alias: " + alias); |
| baseCommitter.commitTask(committerContext); |
| } |
| } |
| } |
| |
| @Override |
| public void abortTask(TaskAttemptContext taskContext) throws IOException { |
| for (String alias : outputCommitters.keySet()) { |
| LOGGER.info("Calling abortTask for alias: " + alias); |
| BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); |
| outputContainer.getBaseCommitter().abortTask(outputContainer.getContext()); |
| } |
| } |
| |
| @Override |
| public void commitJob(JobContext jobContext) throws IOException { |
| for (String alias : outputCommitters.keySet()) { |
| LOGGER.info("Calling commitJob for alias: " + alias); |
| BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); |
| outputContainer.getBaseCommitter().commitJob(outputContainer.getContext()); |
| } |
| } |
| |
| @Override |
| public void abortJob(JobContext jobContext, State state) throws IOException { |
| for (String alias : outputCommitters.keySet()) { |
| LOGGER.info("Calling abortJob for alias: " + alias); |
| BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias); |
| outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state); |
| } |
| } |
| } |
| |
| private static class BaseOutputCommitterContainer { |
| |
| private final OutputCommitter outputCommitter; |
| private final TaskAttemptContext context; |
| |
| public BaseOutputCommitterContainer(OutputCommitter outputCommitter, |
| TaskAttemptContext context) { |
| this.outputCommitter = outputCommitter; |
| this.context = context; |
| } |
| |
| public OutputCommitter getBaseCommitter() { |
| return outputCommitter; |
| } |
| |
| public TaskAttemptContext getContext() { |
| return context; |
| } |
| } |
| |
| } |