/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.io;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
* An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
* writing to multiple files within a single MapReduce job.
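 * <p>
 * A hedged usage sketch from within a Mapper whose output types are
 * {@code LongWritable}/{@code Text} (the "text" output name is illustrative
 * and must match a prior {@code addNamedOutput} call):
 * <pre>{@code
 * private CrunchOutputs<LongWritable, Text> outputs;
 *
 * protected void setup(Context context) {
 *   outputs = new CrunchOutputs<LongWritable, Text>(context);
 * }
 *
 * protected void map(LongWritable key, Text value, Context context)
 *     throws IOException, InterruptedException {
 *   outputs.write("text", key, value);
 * }
 *
 * protected void cleanup(Context context) throws IOException, InterruptedException {
 *   outputs.close();
 * }
 * }</pre>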
*/
public class CrunchOutputs<K, V> {
public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir";
public static final String CRUNCH_DISABLE_OUTPUT_COUNTERS = "crunch.disable.output.counters";
private static final char RECORD_SEP = ',';
private static final char FIELD_SEP = ';';
private static final Joiner JOINER = Joiner.on(FIELD_SEP);
private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
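
  // Named outputs are serialized into a single configuration value: the fields
  // of each output are joined with FIELD_SEP and the outputs themselves with
  // RECORD_SEP, e.g. (illustrative):
  //   text;<serialized bundle>;org.apache.hadoop.io.LongWritable;org.apache.hadoop.io.Text,avro;...

  /**
   * Registers a named output for the job. A minimal sketch, assuming a plain
   * {@code TextOutputFormat} (the name and key/value classes are illustrative):
   * <pre>{@code
   * Job job = new Job(conf);
   * CrunchOutputs.addNamedOutput(job, "text",
   *     TextOutputFormat.class, LongWritable.class, Text.class);
   * }</pre>
   */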
public static void addNamedOutput(Job job, String name,
Class<? extends OutputFormat> outputFormatClass,
Class keyClass, Class valueClass) {
addNamedOutput(job, name, FormatBundle.forOutput(outputFormatClass), keyClass, valueClass);
}
public static void addNamedOutput(Job job, String name,
FormatBundle<? extends OutputFormat> outputBundle,
Class keyClass, Class valueClass) {
Configuration conf = job.getConfiguration();
    String output = JOINER.join(name, outputBundle.serialize(), keyClass.getName(), valueClass.getName());
    String existing = conf.get(CRUNCH_OUTPUTS);
    conf.set(CRUNCH_OUTPUTS, existing == null ? output : existing + RECORD_SEP + output);
}
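  /**
   * Runs {@link OutputFormat#checkOutputSpecs(JobContext)} for every named
   * output registered in the job's configuration.
   */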
public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException {
Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration());
for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
String namedOutput = e.getKey();
      Job job = getJob(jc.getJobID(), namedOutput, jc.getConfiguration());
OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
fmt.checkOutputSpecs(job);
}
}
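  /**
   * Builds a committer for each named output and returns a composite
   * {@link OutputCommitter} that fans out every lifecycle call to all of them.
   */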
public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException {
Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration());
Map<String, OutputCommitter> committers = Maps.newHashMap();
for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) {
String namedOutput = e.getKey();
      Job job = getJob(tac.getJobID(), namedOutput, tac.getConfiguration());
OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue());
TaskAttemptContext taskContext = getTaskContext(tac, job);
OutputCommitter oc = fmt.getOutputCommitter(taskContext);
committers.put(namedOutput, oc);
}
return new CompositeOutputCommitter(outputs, committers);
}
public static class OutputConfig<K, V> {
public FormatBundle<OutputFormat<K, V>> bundle;
public Class<K> keyClass;
public Class<V> valueClass;
public OutputConfig(FormatBundle<OutputFormat<K, V>> bundle,
Class<K> keyClass, Class<V> valueClass) {
this.bundle = bundle;
this.keyClass = keyClass;
this.valueClass = valueClass;
}
}
private static Map<String, OutputConfig> getNamedOutputs(
TaskInputOutputContext<?, ?, ?, ?> context) {
return getNamedOutputs(context.getConfiguration());
}
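  /**
   * Deserializes the named outputs stored under {@link #CRUNCH_OUTPUTS} into a
   * map from output name to its {@link OutputConfig}.
   */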
public static Map<String, OutputConfig> getNamedOutputs(Configuration conf) {
Map<String, OutputConfig> out = Maps.newHashMap();
String serOut = conf.get(CRUNCH_OUTPUTS);
if (serOut == null || serOut.isEmpty()) {
return out;
}
    for (String record : Splitter.on(RECORD_SEP).split(serOut)) {
      List<String> fields = Lists.newArrayList(SPLITTER.split(record));
String name = fields.get(0);
FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1), conf);
try {
Class<?> keyClass = Class.forName(fields.get(2));
Class<?> valueClass = Class.forName(fields.get(3));
out.put(name, new OutputConfig(bundle, keyClass, valueClass));
} catch (ClassNotFoundException e) {
throw new CrunchRuntimeException(e);
}
}
return out;
}
private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
private TaskInputOutputContext<?, ?, K, V> baseContext;
private Configuration baseConf;
private final Map<String, OutputConfig> namedOutputs;
private final Map<String, OutputState<K, V>> outputStates;
private final boolean disableOutputCounters;
/**
   * Creates and initializes multiple outputs support; it should be
   * instantiated in the Mapper/Reducer setup method.
*
* @param context the TaskInputOutputContext object
*/
public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
this(context.getConfiguration());
this.baseContext = context;
}
public CrunchOutputs(Configuration conf) {
this.baseConf = conf;
this.namedOutputs = getNamedOutputs(conf);
this.outputStates = Maps.newHashMap();
this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false);
}
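  /**
   * Writes a key/value pair to the given named output, incrementing a
   * per-output counter unless counters are disabled.
   *
   * @throws IllegalArgumentException if the named output was not registered
   */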
@SuppressWarnings("unchecked")
public void write(String namedOutput, K key, V value)
throws IOException, InterruptedException {
if (!namedOutputs.containsKey(namedOutput)) {
throw new IllegalArgumentException("Undefined named output '" +
namedOutput + "'");
}
if (!disableOutputCounters) {
baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
}
getOutputState(namedOutput).write(key, value);
}
public void close() throws IOException, InterruptedException {
for (OutputState<?, ?> out : outputStates.values()) {
out.close();
}
}
private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException {
OutputState<?, ?> out = outputStates.get(namedOutput);
if (out != null) {
return (OutputState<K, V>) out;
}
    // Instantiating the record writer through a per-output copy of the job
    // lets us support arbitrary output formats.
    Job job = getJob(baseContext.getJobID(), namedOutput, baseConf);
    OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput));
    TaskAttemptContext taskContext = getTaskContext(baseContext, job);
    RecordWriter<K, V> recordWriter = fmt.getRecordWriter(taskContext);
    OutputState<K, V> outputState = new OutputState<K, V>(taskContext, recordWriter);
this.outputStates.put(namedOutput, outputState);
return outputState;
}
private static Job getJob(JobID jobID, String namedOutput, Configuration baseConf)
throws IOException {
Job job = new Job(new JobConf(baseConf));
job.getConfiguration().set("crunch.namedoutput", namedOutput);
setJobID(job, jobID, namedOutput);
return job;
}
  private static TaskAttemptContext getTaskContext(TaskAttemptContext baseContext, Job job) {
    TaskAttemptID baseTaskId = baseContext.getTaskAttemptID();
    // Build a task attempt ID that carries the specialized, per-output job ID.
    TaskAttemptID taskId = new TaskAttemptID(job.getJobID().getJtIdentifier(),
        job.getJobID().getId(),
        baseTaskId.isMap(),
        baseTaskId.getTaskID().getId(),
        baseTaskId.getId());
    return new TaskAttemptContextWrapper(baseContext, job.getConfiguration(), taskId);
}
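  // Derives a per-output job ID so that committers for different named outputs
  // do not collide: e.g. (illustrative) a jtIdentifier of "local" with named
  // output "text" yields "local_text", keeping the same numeric job ID.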
private static void setJobID(Job job, JobID jobID, String namedOutput) {
JobID newJobID = jobID == null || jobID.getJtIdentifier().contains(namedOutput) ?
jobID :
new JobID(jobID.getJtIdentifier() + "_" + namedOutput, jobID.getId());
job.setJobID(newJobID);
}
private static void configureJob(
String namedOutput,
Job job,
OutputConfig outConfig) throws IOException {
job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
job.setOutputFormatClass(outConfig.bundle.getFormatClass());
job.setOutputKeyClass(outConfig.keyClass);
job.setOutputValueClass(outConfig.valueClass);
outConfig.bundle.configure(job.getConfiguration());
}
private static OutputFormat getOutputFormat(
String namedOutput,
Job job,
OutputConfig outConfig) throws IOException {
configureJob(namedOutput, job, outConfig);
try {
return ReflectionUtils.newInstance(
job.getOutputFormatClass(),
job.getConfiguration());
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
}
private static class OutputState<K, V> {
private final TaskAttemptContext context;
private final RecordWriter<K, V> recordWriter;
public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) {
this.context = context;
this.recordWriter = recordWriter;
}
public void write(K key, V value) throws IOException, InterruptedException {
recordWriter.write(key, value);
}
public void close() throws IOException, InterruptedException {
recordWriter.close(context);
}
}
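  /**
   * An {@link OutputCommitter} that delegates each lifecycle call to the
   * committer of every named output, re-deriving a per-output job and task
   * context for each delegate.
   */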
private static class CompositeOutputCommitter extends OutputCommitter {
private final Map<String, OutputConfig> outputs;
private final Map<String, OutputCommitter> committers;
public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) {
this.outputs = outputs;
this.committers = committers;
}
private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException {
Job job = getJob(baseContext.getJobID(), namedOutput, baseContext.getConfiguration());
configureJob(namedOutput, job, outputs.get(namedOutput));
return getTaskContext(baseContext, job);
}
@Override
public void setupJob(JobContext jobContext) throws IOException {
Configuration conf = jobContext.getConfiguration();
for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
configureJob(e.getKey(), job, outputs.get(e.getKey()));
e.getValue().setupJob(job);
}
}
@Override
public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext));
}
}
@Override
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) {
return true;
}
}
return false;
}
@Override
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext));
}
}
@Override
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext));
}
}
@Override
public void commitJob(JobContext jobContext) throws IOException {
Configuration conf = jobContext.getConfiguration();
Set<Path> handledPaths = Sets.newHashSet();
for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
OutputCommitter oc = e.getValue();
Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
configureJob(e.getKey(), job, outputs.get(e.getKey()));
        if (oc instanceof FileOutputCommitter) {
          Path outputPath = ((FileOutputCommitter) oc).getWorkPath().getParent();
          // Multiple named outputs may share an output directory; commit it only once.
          if (!handledPaths.add(outputPath)) {
            continue;
          }
        }
oc.commitJob(job);
}
}
@Override
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
Configuration conf = jobContext.getConfiguration();
for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) {
Job job = getJob(jobContext.getJobID(), e.getKey(), conf);
configureJob(e.getKey(), job, outputs.get(e.getKey()));
e.getValue().abortJob(job, state);
}
}
}
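  /**
   * A {@link TaskAttemptContext} whose configuration and task ID reflect a
   * single named output, while counter lookups are routed back to the real
   * task context so that all counters land in one place.
   */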
private static class TaskAttemptContextWrapper extends TaskAttemptContextImpl {
private final TaskAttemptContext baseContext;
    public TaskAttemptContextWrapper(TaskAttemptContext baseContext, Configuration config, TaskAttemptID taskId) {
super(config, taskId);
this.baseContext = baseContext;
}
@Override
public Counter getCounter(Enum<?> counterName) {
return baseContext.getCounter(counterName);
}
@Override
public Counter getCounter(String groupName, String counterName) {
return baseContext.getCounter(groupName, counterName);
}
}
}