blob: 1c16618b6810f05029d4100dab4fcb59de07e048 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.mapred.tether;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
/**
 * Constructs and submits tether jobs. This may be used as an API-based
 * method to launch tether jobs.
 *
 * <p>All helpers are static and communicate through properties stored in the
 * {@link JobConf}; the property names are exposed as the {@code TETHER_*}
 * constants below.
 */
@SuppressWarnings("deprecation")
public class TetherJob extends Configured {

  /** Configuration key under which the path of the tethered executable is stored. */
  public static final String TETHER_EXEC = "avro.tether.executable";

  /**
   * Configuration key under which the executable's extra arguments are stored,
   * each argument followed by a newline character.
   */
  public static final String TETHER_EXEC_ARGS = "avro.tether.executable_args";

  /**
   * Configuration key holding "true" or "false": whether the executable is
   * added to the DistributedCache when the job is set up.
   */
  public static final String TETHER_EXEC_CACHED = "avro.tether.executable_cached";

  /** Configuration key holding the transport protocol name ("http" or "sasl"). */
  public static final String TETHER_PROTOCOL = "avro.tether.protocol";

  /**
   * Get the URI of the application's executable.
   *
   * @param job the job configuration to read {@link #TETHER_EXEC} from
   * @return the configured value parsed as a {@link URI}
   * @throws RuntimeException wrapping the {@link URISyntaxException} if the
   *         configured value is not a syntactically valid URI
   */
  public static URI getExecutable(JobConf job) {
    try {
      // Read through the shared constant so getter and setter cannot drift apart.
      return new URI(job.get(TETHER_EXEC));
    } catch (URISyntaxException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Set the application's executable with no additional arguments and without
   * distributing it through the DistributedCache.
   *
   * @param job the job configuration to update
   * @param executable the executable to run
   */
  public static void setExecutable(JobConf job, File executable) {
    setExecutable(job, executable, new ArrayList<String>(), false);
  }

  /**
   * Set the application's executable (i.e. the program that is run in a
   * subprocess and provides the mapper/reducer).
   *
   * @param job the job configuration to update
   * @param executable the executable; its {@code toString()} form is stored
   *        under {@link #TETHER_EXEC}
   * @param args additional arguments for the executable; if {@code null},
   *        {@link #TETHER_EXEC_ARGS} is left untouched
   * @param cached if true, the executable is added to the DistributedCache
   *        when the job is set up; pass false when the file is already
   *        available on each node (e.g. local file system or an NFS share)
   */
  public static void setExecutable(JobConf job, File executable, List<String> args, boolean cached) {
    job.set(TETHER_EXEC, executable.toString());

    if (args != null) {
      // Arguments are serialized one per line, each terminated by '\n'.
      StringBuilder sb = new StringBuilder();
      for (String a : args) {
        sb.append(a).append('\n');
      }
      job.set(TETHER_EXEC_ARGS, sb.toString());
    }

    // Boolean.toString avoids the deprecated Boolean(boolean) constructor and
    // yields the same "true"/"false" text.
    job.set(TETHER_EXEC_CACHED, Boolean.toString(cached));
  }

  /**
   * Extract from the job configuration the {@link TetheredProcess.Protocol}
   * to use for the communication between parent and subprocess.
   *
   * @param job the job configuration to read {@link #TETHER_PROTOCOL} from
   * @return {@code NONE} if the property is unset, {@code HTTP} for "http",
   *         {@code SASL} for "sasl"
   * @throws RuntimeException if the property holds any other value
   */
  public static TetheredProcess.Protocol getProtocol(JobConf job) {
    // Look the property up once instead of once per comparison.
    String proto = job.get(TETHER_PROTOCOL);
    if (proto == null) {
      return TetheredProcess.Protocol.NONE;
    }
    if ("http".equals(proto)) {
      return TetheredProcess.Protocol.HTTP;
    }
    if ("sasl".equals(proto)) {
      return TetheredProcess.Protocol.SASL;
    }
    throw new RuntimeException("Unknown value for protocol: " + proto);
  }

  /**
   * Submit a job to the map/reduce cluster via {@code JobClient.runJob}. All
   * of the necessary modifications to the job to run under tether are made to
   * the configuration first.
   *
   * @param job the job configuration; modified in place
   * @return the handle returned by {@code JobClient.runJob}
   * @throws IOException if setting up or running the job fails
   */
  public static RunningJob runJob(JobConf job) throws IOException {
    setupTetherJob(job);
    return JobClient.runJob(job);
  }

  /**
   * Submit a job to the Map-Reduce framework via a new {@code JobClient}.
   * The configuration is prepared for tethering before submission.
   *
   * @param conf the job configuration; modified in place
   * @return the handle returned by {@code JobClient.submitJob}
   * @throws IOException if setting up or submitting the job fails
   */
  public static RunningJob submitJob(JobConf conf) throws IOException {
    setupTetherJob(conf);
    return new JobClient(conf).submitJob(conf);
  }

  /**
   * Sets which transport protocol (http or sasl) is used to communicate
   * between the parent and the subprocess.
   *
   * @param job the job configuration to update
   * @param proto the protocol name; surrounding whitespace and letter case
   *        are ignored, the normalized value must be "http" or "sasl"
   * @throws IOException if the normalized value is neither "http" nor "sasl"
   */
  public static void setProtocol(JobConf job, String proto) throws IOException {
    proto = proto.trim().toLowerCase();
    if (!(proto.equals("http") || proto.equals("sasl"))) {
      throw new IOException("protocol must be 'http' or 'sasl'");
    }
    job.set(TETHER_PROTOCOL, proto);
  }

  /**
   * Applies every tether-specific setting to the job: runner, partitioner,
   * reducer, input/output formats, key/value classes, default protocol,
   * key serialization, and (optionally) the cached executable.
   *
   * @param job the job configuration; modified in place
   * @throws IOException declared for callers; may be raised by the invoked
   *         Hadoop APIs
   */
  private static void setupTetherJob(JobConf job) throws IOException {
    job.setMapRunnerClass(TetherMapRunner.class);
    job.setPartitionerClass(TetherPartitioner.class);
    job.setReducerClass(TetherReducer.class);

    job.setInputFormat(TetherInputFormat.class);
    job.setOutputFormat(TetherOutputFormat.class);

    job.setOutputKeyClass(TetherData.class);
    job.setOutputKeyComparatorClass(TetherKeyComparator.class);
    job.setMapOutputValueClass(NullWritable.class);

    // set the map output key class to TetherData
    job.setMapOutputKeyClass(TetherData.class);

    // if the protocol isn't set, fall back to sasl
    if (job.getStrings(TETHER_PROTOCOL) == null) {
      job.set(TETHER_PROTOCOL, "sasl");
    }

    // add TetherKeySerialization to io.serializations (only once)
    Collection<String> serializations =
        job.getStringCollection("io.serializations");
    if (!serializations.contains(TetherKeySerialization.class.getName())) {
      serializations.add(TetherKeySerialization.class.getName());
      job.setStrings("io.serializations",
          serializations.toArray(new String[0]));
    }

    // determine whether the executable should be added to the cache.
    if (job.getBoolean(TETHER_EXEC_CACHED, false)) {
      DistributedCache.addCacheFile(getExecutable(job), job);
    }
  }
}