blob: 37cf0f788fb069c2549f207c81da3f70737feb8d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.mapred;
import java.util.Collection;
import java.net.URLEncoder;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.avro.Schema;
/** Static helpers that configure Hadoop jobs to read and write Avro data. */
public class AvroJob {

  // Utility class: never instantiated.
  private AvroJob() {}

  static final String MAPPER = "avro.mapper";
  static final String COMBINER = "avro.combiner";
  static final String REDUCER = "avro.reducer";

  /** Configuration key holding a job's input schema. */
  public static final String INPUT_SCHEMA = "avro.input.schema";
  /** Configuration key holding a job's intermediate (shuffle) schema. */
  public static final String MAP_OUTPUT_SCHEMA = "avro.map.output.schema";
  /** Configuration key holding a job's output schema. */
  public static final String OUTPUT_SCHEMA = "avro.output.schema";
  /** Key prefix under which textual output metadata is stored. */
  public static final String TEXT_PREFIX = "avro.meta.text.";
  /** Key prefix under which binary output metadata is stored. */
  public static final String BINARY_PREFIX = "avro.meta.binary.";
  /** Configuration key selecting the reflect representation for input. */
  public static final String INPUT_IS_REFLECT = "avro.input.is.reflect";
  /** Configuration key selecting the reflect representation for map output. */
  public static final String MAP_OUTPUT_IS_REFLECT = "avro.map.output.is.reflect";

  /** Set a job's map input schema and wire up Avro input handling. */
  public static void setInputSchema(JobConf job, Schema s) {
    job.set(INPUT_SCHEMA, s.toString());
    configureAvroInput(job);
  }

  /** Get a job's map input schema. */
  public static Schema getInputSchema(Configuration job) {
    return Schema.parse(job.get(INPUT_SCHEMA));
  }

  /**
   * Set a job's map output schema.  This defaults to the output schema, so
   * it need only be given when the two differ; it must be a {@link Pair}
   * schema.
   */
  public static void setMapOutputSchema(JobConf job, Schema s) {
    job.set(MAP_OUTPUT_SCHEMA, s.toString());
    configureAvroShuffle(job);
  }

  /** Get a job's map output key schema, falling back to the output schema. */
  public static Schema getMapOutputSchema(Configuration job) {
    String json = job.get(MAP_OUTPUT_SCHEMA, job.get(OUTPUT_SCHEMA));
    return Schema.parse(json);
  }

  /**
   * Set a job's output schema.  Unless the job is map-only this must be a
   * {@link Pair} schema.
   */
  public static void setOutputSchema(JobConf job, Schema s) {
    job.set(OUTPUT_SCHEMA, s.toString());
    configureAvroOutput(job);
  }

  /** Get a job's output key schema. */
  public static Schema getOutputSchema(Configuration job) {
    return Schema.parse(job.get(OUTPUT_SCHEMA));
  }

  /** Attach a textual metadata entry to the job's output files. */
  public static void setOutputMeta(JobConf job, String key, String value) {
    job.set(TEXT_PREFIX + key, value);
  }

  /** Attach a numeric metadata entry to the job's output files. */
  public static void setOutputMeta(JobConf job, String key, long value) {
    job.set(TEXT_PREFIX + key, Long.toString(value));
  }

  /** Attach a binary metadata entry to the job's output files. */
  public static void setOutputMeta(JobConf job, String key, byte[] value) {
    // Binary values are URL-encoded so they survive the text-only
    // Configuration; ISO-8859-1 maps bytes to chars one-to-one.
    try {
      String encoded =
        URLEncoder.encode(new String(value, "ISO-8859-1"), "ISO-8859-1");
      job.set(BINARY_PREFIX + key, encoded);
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException(e);     // ISO-8859-1 is always supported
    }
  }

  /** Declare that a job's input files are in SequenceFile format. */
  public static void setInputSequenceFile(JobConf job) {
    job.setInputFormat(SequenceFileInputFormat.class);
  }

  /** Use the reflect representation for all of a job's data. */
  public static void setReflect(JobConf job) {
    setInputReflect(job);
    setMapOutputReflect(job);
  }

  /** Use the reflect representation for a job's input data. */
  public static void setInputReflect(JobConf job) {
    job.setBoolean(INPUT_IS_REFLECT, true);
  }

  /** Use the reflect representation for a job's map output data. */
  public static void setMapOutputReflect(JobConf job) {
    job.setBoolean(MAP_OUTPUT_IS_REFLECT, true);
  }

  /** Configure a job's mapper implementation. */
  public static void setMapperClass(JobConf job,
                                    Class<? extends AvroMapper> c) {
    job.set(MAPPER, c.getName());
  }

  /** Configure a job's combiner implementation. */
  public static void setCombinerClass(JobConf job,
                                      Class<? extends AvroReducer> c) {
    job.set(COMBINER, c.getName());
    job.setCombinerClass(HadoopCombiner.class);
  }

  /** Configure a job's reducer implementation. */
  public static void setReducerClass(JobConf job,
                                     Class<? extends AvroReducer> c) {
    job.set(REDUCER, c.getName());
  }

  // Install Avro's input format and mapper shim unless the user has
  // already supplied their own, then configure the shuffle.
  private static void configureAvroInput(JobConf job) {
    if (job.get("mapred.input.format.class") == null) {
      job.setInputFormat(AvroInputFormat.class);
    }
    if (job.getMapperClass() == IdentityMapper.class) {
      job.setMapperClass(HadoopMapper.class);
    }
    configureAvroShuffle(job);
  }

  // Install Avro's output format and reducer shim unless the user has
  // already supplied their own, then configure the shuffle.
  private static void configureAvroOutput(JobConf job) {
    if (job.get("mapred.output.format.class") == null) {
      job.setOutputFormat(AvroOutputFormat.class);
    }
    if (job.getReducerClass() == IdentityReducer.class) {
      job.setReducerClass(HadoopReducer.class);
    }
    job.setOutputKeyClass(AvroWrapper.class);
    configureAvroShuffle(job);
  }

  // Route the shuffle through Avro's key/value wrappers and comparator,
  // and register AvroSerialization with Hadoop's serialization framework.
  private static void configureAvroShuffle(JobConf job) {
    job.setOutputKeyComparatorClass(AvroKeyComparator.class);
    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(AvroValue.class);
    String avroSer = AvroSerialization.class.getName();
    Collection<String> sers = job.getStringCollection("io.serializations");
    if (!sers.contains(avroSer)) {
      sers.add(avroSer);
      job.setStrings("io.serializations",
                     sers.toArray(new String[sers.size()]));
    }
  }
}