blob: 8489f83f4e860f4921bb6e0e5e042f2c7cb33154 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
import org.apache.hama.bsp.message.queue.MessageQueue;
/**
* A BSP job configuration.
*
* BSPJob is the primary interface for a user to describe a BSP job to the Hama
* BSP framework for execution.
*/
public class BSPJob extends BSPJobContext {
public static enum JobState {
DEFINE, RUNNING
}
private JobState state = JobState.DEFINE;
private BSPJobClient jobClient;
private RunningJob info;
public BSPJob() throws IOException {
this(new HamaConfiguration());
}
public BSPJob(HamaConfiguration conf) throws IOException {
super(conf, null);
jobClient = new BSPJobClient(conf);
}
public BSPJob(HamaConfiguration conf, BSPJobID jobID) throws IOException {
super(conf, jobID);
}
public BSPJob(HamaConfiguration conf, String jobName) throws IOException {
this(conf);
setJobName(jobName);
}
public BSPJob(BSPJobID jobID, String jobFile) throws IOException {
super(new Path(jobFile), jobID);
}
public BSPJob(HamaConfiguration conf, Class<?> exampleClass)
throws IOException {
this(conf);
setJarByClass(exampleClass);
}
public BSPJob(HamaConfiguration conf, int numPeer) {
super(conf, null);
this.setNumBspTask(numPeer);
}
public void ensureState(JobState state) throws IllegalStateException {
if (state != this.state) {
throw new IllegalStateException("Job in state " + this.state
+ " instead of " + state);
}
}
// /////////////////////////////////////
// Setter for Job Submission
// /////////////////////////////////////
public void setWorkingDirectory(Path pDir) {
ensureState(JobState.DEFINE);
Path dir = new Path(getWorkingDirectory(), pDir);
conf.set(WORKING_DIR, dir.toString());
}
/**
* Set the BSP algorithm class for the job.
*
* @param cls the class implementing the BSP job
* @throws IllegalStateException
*/
@SuppressWarnings("rawtypes")
public void setBspClass(Class<? extends BSP> cls)
throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(WORK_CLASS_ATTR, cls, BSP.class);
}
@SuppressWarnings("rawtypes")
public void setSupersteps(Class<? extends Superstep>... classes) {
ensureState(JobState.DEFINE);
String clazzString = "";
for (Class<? extends Superstep> s : classes)
clazzString += s.getName() + ",";
conf.set("hama.supersteps.class", clazzString);
this.setBspClass(SuperstepBSP.class);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public Class<? extends BSP> getBspClass() {
return (Class<? extends BSP>) conf.getClass(WORK_CLASS_ATTR, BSP.class);
}
public void setCombinerClass(Class<? extends Combiner<? extends Writable>> cls) {
ensureState(JobState.DEFINE);
conf.setClass(Constants.COMBINER_CLASS, cls, Combiner.class);
}
@SuppressWarnings("unchecked")
public Class<? extends Combiner<? extends Writable>> getCombinerClass() {
return (Class<? extends Combiner<? extends Writable>>) conf.getClass(
Constants.COMBINER_CLASS, Combiner.class);
}
public void setJar(String jar) {
conf.set("bsp.jar", jar);
}
public void setJarByClass(Class<?> cls) {
String jar = findContainingJar(cls);
if (jar != null) {
conf.set("bsp.jar", jar);
}
}
private static String findContainingJar(Class<?> my_class) {
ClassLoader loader = my_class.getClassLoader();
String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
try {
for (Enumeration<URL> itr = loader.getResources(class_file); itr
.hasMoreElements();) {
URL url = itr.nextElement();
if ("jar".equals(url.getProtocol())) {
String toReturn = url.getPath();
if (toReturn.startsWith("file:")) {
toReturn = toReturn.substring("file:".length());
}
toReturn = URLDecoder.decode(toReturn, "UTF-8");
return toReturn.replaceAll("!.*$", "");
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
public void setJobName(String name) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.set("bsp.job.name", name);
}
public void setUser(String user) {
conf.set("user.name", user);
}
// /////////////////////////////////////
// Methods for Job Control
// /////////////////////////////////////
public long progress() throws IOException {
ensureState(JobState.RUNNING);
return info.progress();
}
public Counters getCounters() throws IOException {
ensureState(JobState.RUNNING);
return info.getCounters();
}
public boolean isComplete() throws IOException {
ensureState(JobState.RUNNING);
return info.isComplete();
}
public boolean isSuccessful() throws IOException {
ensureState(JobState.RUNNING);
return info.isSuccessful();
}
public void killJob() throws IOException {
ensureState(JobState.RUNNING);
info.killJob();
}
public void killTask(TaskAttemptID taskId) throws IOException {
ensureState(JobState.RUNNING);
info.killTask(taskId, false);
}
public void failTask(TaskAttemptID taskId) throws IOException {
ensureState(JobState.RUNNING);
info.killTask(taskId, true);
}
public void submit() throws IOException, InterruptedException {
ensureState(JobState.DEFINE);
info = jobClient.submitJob(this);
state = JobState.RUNNING;
}
public boolean waitForCompletion(boolean verbose) throws IOException,
InterruptedException, ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
jobClient.monitorAndPrintJob(this, info);
} else {
info.waitForCompletion();
}
return isSuccessful();
}
// for the testcase
BSPJobClient getJobClient() {
return jobClient;
}
public void set(String name, String value) {
conf.set(name, value);
}
public void setBoolean(String name, boolean value) {
conf.setBoolean(name, value);
}
public boolean getBoolean(String name, boolean defaultValue) {
return conf.getBoolean(name, defaultValue);
}
public void setNumBspTask(int tasks) {
conf.setInt("bsp.peers.num", tasks);
}
public int getNumBspTask() {
// default is 1, because with zero, we will hang in infinity
return conf.getInt("bsp.peers.num", 1);
}
@SuppressWarnings({ "rawtypes" })
public InputFormat getInputFormat() {
return ReflectionUtils.newInstance(
conf.getClass(Constants.INPUT_FORMAT_CLASS, TextInputFormat.class,
InputFormat.class), conf);
}
@SuppressWarnings({ "rawtypes" })
public void setInputFormat(Class<? extends InputFormat> cls) {
conf.setClass(Constants.INPUT_FORMAT_CLASS, cls, InputFormat.class);
}
/**
* Sets the compression codec that should be used to compress messages.
*/
@SuppressWarnings({ "rawtypes" })
public void setCompressionCodec(Class<? extends BSPMessageCompressor> clazz) {
conf.setClass(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS, clazz,
BSPMessageCompressor.class);
}
/**
* Get the key class for the job input data.
*
* @return the key class for the job input data.
*/
public Class<?> getInputKeyClass() {
return conf.getClass("bsp.input.key.class", LongWritable.class,
Object.class);
}
/**
* Set the key class for the job input data.
*
* @param theClass the key class for the job input data.
*/
public void setInputKeyClass(Class<?> theClass) {
conf.setClass("bsp.input.key.class", theClass, Object.class);
}
/**
* Get the value class for job input.
*
* @return the value class for job input.
*/
public Class<?> getInputValueClass() {
return conf.getClass("bsp.input.value.class", Text.class, Object.class);
}
/**
* Set the value class for job input.
*
* @param theClass the value class for job input.
*/
public void setInputValueClass(Class<?> theClass) {
conf.setClass("bsp.input.value.class", theClass, Object.class);
}
/**
* Get the key class for the job output data.
*
* @return the key class for the job output data.
*/
public Class<?> getOutputKeyClass() {
return conf.getClass("bsp.output.key.class", LongWritable.class,
Object.class);
}
/**
* Set the key class for the job output data.
*
* @param theClass the key class for the job output data.
*/
public void setOutputKeyClass(Class<?> theClass) {
conf.setClass("bsp.output.key.class", theClass, Object.class);
}
/**
* Get the value class for job outputs.
*
* @return the value class for job outputs.
*/
public Class<?> getOutputValueClass() {
return conf.getClass("bsp.output.value.class", Text.class, Object.class);
}
/**
* Set the value class for job outputs.
*
* @param theClass the value class for job outputs.
*/
public void setOutputValueClass(Class<?> theClass) {
conf.setClass("bsp.output.value.class", theClass, Object.class);
}
/**
* Get the message class.
*
* @return the message class.
*/
public Class<?> getMessageClass() {
return conf.getClass(Constants.MESSAGE_CLASS, Text.class, Object.class);
}
/**
* Set the message class.
*
* @param theClass the message class.
*/
public void setMessageClass(Class<?> theClass) {
conf.setClass(Constants.MESSAGE_CLASS, theClass, Object.class);
}
/**
* Sets the output path for the job.
*
* @param path where the output gets written.
*/
public void setOutputPath(Path path) {
conf.set("bsp.output.dir", path.toString());
}
/**
* Sets the input path for the job.
*
*/
public void setInputPath(Path path) {
conf.set("bsp.input.dir", path.toString());
}
/**
* Sets the output format for the job.
*/
@SuppressWarnings("rawtypes")
public void setOutputFormat(Class<? extends OutputFormat> theClass) {
conf.setClass(Constants.OUTPUT_FORMAT_CLASS, theClass, OutputFormat.class);
}
/**
* Sets the partitioner for the input of the job.
*/
@SuppressWarnings("rawtypes")
public void setPartitioner(Class<? extends Partitioner> theClass) {
conf.setClass(Constants.RUNTIME_PARTITIONING_CLASS, theClass,
Partitioner.class);
}
@SuppressWarnings("rawtypes")
public OutputFormat getOutputFormat() {
return ReflectionUtils.newInstance(conf.getClass(
Constants.OUTPUT_FORMAT_CLASS, TextOutputFormat.class,
OutputFormat.class), conf);
}
protected void setCheckPointInterval(int checkPointInterval) {
conf.setInt(Constants.CHECKPOINT_INTERVAL, checkPointInterval);
}
protected void setCheckPointFlag(boolean enableCheckPoint) {
conf.setBoolean(Constants.CHECKPOINT_ENABLED, enableCheckPoint);
}
public void setMessageQueueBehaviour(String queueBehaviour) {
if (queueBehaviour.equals(MessageQueue.PERSISTENT_QUEUE))
conf.setBoolean(MessageQueue.PERSISTENT_QUEUE, true);
}
}