/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
/**
 * The job submitter's view of the Job. It allows the user to configure the
 * job, submit it, control its execution, and query the state. The set methods
 * only work until the job is submitted; afterwards they will throw an
 * IllegalStateException.
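 *
 * <p>A typical usage sketch (<code>MyJob</code>, <code>MyMapper</code> and
 * <code>MyReducer</code> are hypothetical user classes, not part of this
 * file):
 * <p><blockquote><pre>
 *   // Create a new Job with a job-specific Configuration
 *   Job job = Job.getInstance(new Configuration());
 *   job.setJarByClass(MyJob.class);  // MyJob is a hypothetical user class
 *
 *   // Specify various job-specific parameters
 *   job.setJobName("myjob");
 *   job.setMapperClass(MyJob.MyMapper.class);
 *   job.setReducerClass(MyJob.MyReducer.class);
 *
 *   // Submit the job, then poll for progress until the job is complete
 *   job.waitForCompletion(true);
 * </pre></blockquote>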
 */
public class Job extends JobContext {
  public static enum JobState {DEFINE, RUNNING};
  private JobState state = JobState.DEFINE;
  private JobClient jobClient;
  private RunningJob info;

  /**
   * Creates a new {@link Job} with a generic {@link Configuration}.
   *
   * @return the {@link Job}
   * @throws IOException
   */
  public static Job getInstance() throws IOException {
    // create with a null Cluster
    return getInstance(new Configuration());
  }

  /**
   * Creates a new {@link Job} with a given {@link Configuration}.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the {@link Configuration}
   * @return the {@link Job}
   * @throws IOException
   */
  public static Job getInstance(Configuration conf) throws IOException {
    // create with a null Cluster
    JobConf jobConf = new JobConf(conf);
    return new Job(jobConf);
  }

  /**
   * Creates a new {@link Job} with a given {@link Configuration}
   * and a given jobName.
   *
   * The <code>Job</code> makes a copy of the <code>Configuration</code> so
   * that any necessary internal modifications do not reflect on the incoming
   * parameter.
   *
   * @param conf the {@link Configuration}
   * @param jobName the job instance's name
   * @return the {@link Job}
   * @throws IOException
   */
  public static Job getInstance(Configuration conf, String jobName)
      throws IOException {
    // create with a null Cluster
    Job result = getInstance(conf);
    result.setJobName(jobName);
    return result;
  }

  public Job() throws IOException {
    this(new Configuration());
  }

  public Job(Configuration conf) throws IOException {
    super(conf, null);
  }

  public Job(Configuration conf, String jobName) throws IOException {
    this(conf);
    setJobName(jobName);
  }

  JobClient getJobClient() {
    return jobClient;
  }

  private void ensureState(JobState state) throws IllegalStateException {
    if (state != this.state) {
      throw new IllegalStateException("Job in state " + this.state +
                                      " instead of " + state);
    }

    if (state == JobState.RUNNING && jobClient == null) {
      throw new IllegalStateException("Job in state " + JobState.RUNNING +
                                      " however jobClient is not initialized!");
    }
  }

  /**
   * Set the number of reduce tasks for the job.
   * @param tasks the number of reduce tasks
   * @throws IllegalStateException if the job is submitted
   */
  public void setNumReduceTasks(int tasks) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setNumReduceTasks(tasks);
  }

  /**
   * Set the current working directory for the default file system.
   *
   * @param dir the new current working directory.
   * @throws IllegalStateException if the job is submitted
   */
  public void setWorkingDirectory(Path dir) throws IOException {
    ensureState(JobState.DEFINE);
    conf.setWorkingDirectory(dir);
  }

  /**
   * Set the {@link InputFormat} for the job.
   * @param cls the <code>InputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setInputFormatClass(Class<? extends InputFormat> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
  }

  /**
   * Set the {@link OutputFormat} for the job.
   * @param cls the <code>OutputFormat</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputFormatClass(Class<? extends OutputFormat> cls
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
  }

  /**
   * Set the {@link Mapper} for the job.
   * @param cls the <code>Mapper</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapperClass(Class<? extends Mapper> cls
                             ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  }

  /**
   * Set the Jar by finding where a given class came from.
   * @param cls the example class
   */
  public void setJarByClass(Class<?> cls) {
    conf.setJarByClass(cls);
  }

  /**
   * Get the pathname of the job's jar.
   * @return the pathname
   */
  public String getJar() {
    return conf.getJar();
  }

  /**
   * Set the combiner class for the job.
   * @param cls the combiner to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Reducer} for the job.
   * @param cls the <code>Reducer</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setReducerClass(Class<? extends Reducer> cls
                              ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  }

  /**
   * Set the {@link Partitioner} for the job.
   * @param cls the <code>Partitioner</code> to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setPartitionerClass(Class<? extends Partitioner> cls
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
  }

  /**
   * Set the key class for the map output data. This allows the user to
   * specify the map output key class to be different than the final output
   * key class.
   *
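   * <p>A sketch (the <code>Writable</code> types chosen here are purely
   * illustrative):
   * <pre>
   *   // Map emits (Text, IntWritable) pairs while the job's final output
   *   // is (Text, Text), so the map output types are set explicitly.
   *   job.setMapOutputKeyClass(Text.class);
   *   job.setMapOutputValueClass(IntWritable.class);
   *   job.setOutputKeyClass(Text.class);
   *   job.setOutputValueClass(Text.class);
   * </pre>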
   *
   * @param theClass the map output key class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputKeyClass(Class<?> theClass
                                   ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputKeyClass(theClass);
  }

  /**
   * Set the value class for the map output data. This allows the user to
   * specify the map output value class to be different than the final output
   * value class.
   *
   * @param theClass the map output value class.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapOutputValueClass(Class<?> theClass
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setMapOutputValueClass(theClass);
  }

  /**
   * Set the key class for the job output data.
   *
   * @param theClass the key class for the job output data.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputKeyClass(Class<?> theClass
                                ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyClass(theClass);
  }

  /**
   * Set the value class for job outputs.
   *
   * @param theClass the value class for job outputs.
   * @throws IllegalStateException if the job is submitted
   */
  public void setOutputValueClass(Class<?> theClass
                                  ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueClass(theClass);
  }

  /**
   * Define the comparator that controls how the keys are sorted before they
   * are passed to the {@link Reducer}.
   * @param cls the raw comparator
   * @throws IllegalStateException if the job is submitted
   */
  public void setSortComparatorClass(Class<? extends RawComparator> cls
                                     ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputKeyComparatorClass(cls);
  }

  /**
   * Define the comparator that controls which keys are grouped together
   * for a single call to
   * {@link Reducer#reduce(Object, Iterable,
   *        org.apache.hadoop.mapreduce.Reducer.Context)}.
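   *
   * <p>A secondary-sort sketch: sort on a composite key, but group on its
   * natural-key part so that all values for one natural key reach a single
   * <code>reduce</code> call. Both comparator classes named below are
   * hypothetical user-supplied classes, not part of this file.
   * <pre>
   *   job.setSortComparatorClass(CompositeKeyComparator.class);
   *   job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
   * </pre>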
   *
   * @param cls the raw comparator to use
   * @throws IllegalStateException if the job is submitted
   */
  public void setGroupingComparatorClass(Class<? extends RawComparator> cls
                                         ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setOutputValueGroupingComparator(cls);
  }

  /**
   * Set the user-specified job name.
   *
   * @param name the job's new name.
   * @throws IllegalStateException if the job is submitted
   */
  public void setJobName(String name) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setJobName(name);
  }

  /**
   * Turn speculative execution on or off for this job.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on, else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for map tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for map tasks,
   *                             else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setMapSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setMapSpeculativeExecution(speculativeExecution);
  }

  /**
   * Turn speculative execution on or off for this job for reduce tasks.
   *
   * @param speculativeExecution <code>true</code> if speculative execution
   *                             should be turned on for reduce tasks,
   *                             else <code>false</code>.
   * @throws IllegalStateException if the job is submitted
   */
  public void setReduceSpeculativeExecution(boolean speculativeExecution) {
    ensureState(JobState.DEFINE);
    conf.setReduceSpeculativeExecution(speculativeExecution);
  }

  /**
   * Get the URL where some job progress information will be displayed.
   *
   * @return the URL where some job progress information will be displayed.
   */
  public String getTrackingURL() {
    ensureState(JobState.RUNNING);
    return info.getTrackingURL();
  }

  /**
   * Get the <i>progress</i> of the job's setup, as a float between 0.0
   * and 1.0. When the job setup is completed, the function returns 1.0.
   *
   * @return the progress of the job's setup.
   * @throws IOException
   */
  public float setupProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.setupProgress();
  }

  /**
   * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
   * and 1.0. When all map tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's map-tasks.
   * @throws IOException
   */
  public float mapProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.mapProgress();
  }

  /**
   * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
   * and 1.0. When all reduce tasks have completed, the function returns 1.0.
   *
   * @return the progress of the job's reduce-tasks.
   * @throws IOException
   */
  public float reduceProgress() throws IOException {
    ensureState(JobState.RUNNING);
    return info.reduceProgress();
  }

  /**
   * Check if the job is finished or not.
   * This is a non-blocking call.
   *
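   * <p>E.g. a simple progress-polling loop (a sketch; the caller must handle
   * <code>InterruptedException</code> from <code>Thread.sleep</code>):
   * <pre>
   *   while (!job.isComplete()) {
   *     System.out.printf("map %.0f%% reduce %.0f%%%n",
   *         job.mapProgress() * 100, job.reduceProgress() * 100);
   *     Thread.sleep(5000);  // the poll interval is arbitrary
   *   }
   * </pre>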
   *
   * @return <code>true</code> if the job is complete, else <code>false</code>.
   * @throws IOException
   */
  public boolean isComplete() throws IOException {
    ensureState(JobState.RUNNING);
    return info.isComplete();
  }

  /**
   * Check if the job completed successfully.
   *
   * @return <code>true</code> if the job succeeded, else <code>false</code>.
   * @throws IOException
   */
  public boolean isSuccessful() throws IOException {
    ensureState(JobState.RUNNING);
    return info.isSuccessful();
  }

  /**
   * Kill the running job. Blocks until all job tasks have been
   * killed as well. If the job is no longer running, it simply returns.
   *
   * @throws IOException
   */
  public void killJob() throws IOException {
    ensureState(JobState.RUNNING);
    info.killJob();
  }

  /**
   * Get events indicating completion (success/failure) of component tasks.
   *
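   * <p>A polling sketch: fetch batches of events until an empty batch is
   * returned (the batch size per call is chosen by the framework, not the
   * caller):
   * <pre>
   *   int from = 0;
   *   TaskCompletionEvent[] events;
   *   do {
   *     events = job.getTaskCompletionEvents(from);
   *     from += events.length;
   *   } while (events.length > 0);
   * </pre>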
   *
   * @param startFrom index to start fetching events from
   * @return an array of {@link TaskCompletionEvent}s
   * @throws IOException
   */
  public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
                                                       ) throws IOException {
    ensureState(JobState.RUNNING);
    return info.getTaskCompletionEvents(startFrom);
  }

  /**
   * Kill indicated task attempt.
   *
   * @param taskId the id of the task attempt to be terminated.
   * @throws IOException
   */
  public void killTask(TaskAttemptID taskId) throws IOException {
    ensureState(JobState.RUNNING);
    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),
                  false);
  }

  /**
   * Fail indicated task attempt.
   *
   * @param taskId the id of the task attempt to be marked as failed.
   * @throws IOException
   */
  public void failTask(TaskAttemptID taskId) throws IOException {
    ensureState(JobState.RUNNING);
    info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId),
                  true);
  }

  /**
   * Gets the counters for this job.
   *
   * @return the counters for this job.
   * @throws IOException
   */
  public Counters getCounters() throws IOException {
    ensureState(JobState.RUNNING);
    return new Counters(info.getCounters());
  }

  private void ensureNotSet(String attr, String msg) throws IOException {
    if (conf.get(attr) != null) {
      throw new IOException(attr + " is incompatible with " + msg + " mode.");
    }
  }

  /**
   * Sets the flag that will allow the JobTracker to cancel the HDFS delegation
   * tokens upon job completion. Defaults to true.
   *
   * @param value <code>true</code> to cancel the tokens when the job
   *              completes, else <code>false</code>.
   */
  public void setCancelDelegationTokenUponJobCompletion(boolean value) {
    ensureState(JobState.DEFINE);
    conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value);
  }

  /**
   * Default to the new APIs unless they are explicitly set or the old mapper or
   * reduce attributes are used.
   * @throws IOException if the configuration is inconsistent
   */
  private void setUseNewAPI() throws IOException {
    int numReduces = conf.getNumReduceTasks();
    String oldMapperClass = "mapred.mapper.class";
    String oldReduceClass = "mapred.reducer.class";
    conf.setBooleanIfUnset("mapred.mapper.new-api",
                           conf.get(oldMapperClass) == null);
    if (conf.getUseNewMapper()) {
      String mode = "new map API";
      ensureNotSet("mapred.input.format.class", mode);
      ensureNotSet(oldMapperClass, mode);
      if (numReduces != 0) {
        ensureNotSet("mapred.partitioner.class", mode);
      } else {
        ensureNotSet("mapred.output.format.class", mode);
      }
    } else {
      String mode = "map compatibility";
      ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
      ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
      if (numReduces != 0) {
        ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
      } else {
        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
      }
    }
    conf.setBooleanIfUnset("mapred.reducer.new-api",
                           conf.get(oldReduceClass) == null);
    if (numReduces != 0) {
      if (conf.getUseNewReducer()) {
        String mode = "new reduce API";
        ensureNotSet("mapred.output.format.class", mode);
        ensureNotSet(oldReduceClass, mode);
      } else {
        String mode = "reduce compatibility";
        ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
        ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);
      }
    }
  }

  /**
   * Submit the job to the cluster and return immediately.
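   *
   * <p>A non-blocking pattern (a sketch): submit, then track the job through
   * this handle's query methods.
   * <pre>
   *   job.submit();
   *   System.out.println("Tracking URL: " + job.getTrackingURL());
   * </pre>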
   *
   * @throws IOException
   */
  public void submit() throws IOException, InterruptedException,
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();

    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
  }

  /**
   * Open a connection to the JobTracker.
   * @throws IOException
   * @throws InterruptedException
   */
  private void connect() throws IOException, InterruptedException {
    ugi.doAs(new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        jobClient = new JobClient((JobConf) getConfiguration());
        return null;
      }
    });
  }

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
  }
}