blob: 4d5be68dc4a701730d3d2b8827379a450d5c0adf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.jobcontrol;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.StringUtils;
/**
* This class encapsulates a MapReduce job and its dependency. It monitors
* the states of the depending jobs and updates the state of this job.
* A job starts in the WAITING state. If it does not have any depending jobs,
* or all of the depending jobs are in SUCCESS state, then the job state
* will become READY. If any depending jobs fail, the job will fail too.
* When in READY state, the job can be submitted to Hadoop for execution, with
* the state changing into RUNNING state. From RUNNING state, the job
* can get into SUCCESS or FAILED state, depending
* the status of the job execution.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class ControlledJob {
private static final Log LOG = LogFactory.getLog(ControlledJob.class);
// A job will be in one of the following states
public static enum State {SUCCESS, WAITING, RUNNING, READY, FAILED,
DEPENDENT_FAILED};
public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
private State state;
private String controlID; // assigned and used by JobControl class
private Job job; // mapreduce job to be executed.
// some info for human consumption, e.g. the reason why the job failed
private String message;
// the jobs the current job depends on
private List<ControlledJob> dependingJobs;
/**
* Construct a job.
* @param job a mapreduce job to be executed.
* @param dependingJobs an array of jobs the current job depends on
*/
public ControlledJob(Job job, List<ControlledJob> dependingJobs)
throws IOException {
this.job = job;
this.dependingJobs = dependingJobs;
this.state = State.WAITING;
this.controlID = "unassigned";
this.message = "just initialized";
}
/**
* Construct a job.
*
* @param conf mapred job configuration representing a job to be executed.
* @throws IOException
*/
public ControlledJob(Configuration conf) throws IOException {
this(Job.getInstance(conf), null);
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("job name:\t").append(this.job.getJobName()).append("\n");
sb.append("job id:\t").append(this.controlID).append("\n");
sb.append("job state:\t").append(this.state).append("\n");
sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n");
sb.append("job message:\t").append(this.message).append("\n");
if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
sb.append("job has no depending job:\t").append("\n");
} else {
sb.append("job has ").append(this.dependingJobs.size()).
append(" dependeng jobs:\n");
for (int i = 0; i < this.dependingJobs.size(); i++) {
sb.append("\t depending job ").append(i).append(":\t");
sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
}
}
return sb.toString();
}
/**
* @return the job name of this job
*/
public String getJobName() {
return job.getJobName();
}
/**
* Set the job name for this job.
* @param jobName the job name
*/
public void setJobName(String jobName) {
job.setJobName(jobName);
}
/**
* @return the job ID of this job assigned by JobControl
*/
public String getJobID() {
return this.controlID;
}
/**
* Set the job ID for this job.
* @param id the job ID
*/
public void setJobID(String id) {
this.controlID = id;
}
/**
* @return the mapred ID of this job as assigned by the mapred framework.
*/
public synchronized JobID getMapredJobId() {
return this.job.getJobID();
}
/**
* @return the mapreduce job
*/
public synchronized Job getJob() {
return this.job;
}
/**
* Set the mapreduce job
* @param job the mapreduce job for this job.
*/
public synchronized void setJob(Job job) {
this.job = job;
}
/**
* @return the state of this job
*/
public synchronized State getJobState() {
return this.state;
}
/**
* Set the state for this job.
* @param state the new state for this job.
*/
protected synchronized void setJobState(State state) {
this.state = state;
}
/**
* @return the message of this job
*/
public synchronized String getMessage() {
return this.message;
}
/**
* Set the message for this job.
* @param message the message for this job.
*/
public synchronized void setMessage(String message) {
this.message = message;
}
/**
* @return the depending jobs of this job
*/
public List<ControlledJob> getDependentJobs() {
return this.dependingJobs;
}
/**
* Add a job to this jobs' dependency list.
* Dependent jobs can only be added while a Job
* is waiting to run, not during or afterwards.
*
* @param dependingJob Job that this Job depends on.
* @return <tt>true</tt> if the Job was added.
*/
public synchronized boolean addDependingJob(ControlledJob dependingJob) {
if (this.state == State.WAITING) { //only allowed to add jobs when waiting
if (this.dependingJobs == null) {
this.dependingJobs = new ArrayList<ControlledJob>();
}
return this.dependingJobs.add(dependingJob);
} else {
return false;
}
}
/**
* @return true if this job is in a complete state
*/
public synchronized boolean isCompleted() {
return this.state == State.FAILED ||
this.state == State.DEPENDENT_FAILED ||
this.state == State.SUCCESS;
}
/**
* @return true if this job is in READY state
*/
public synchronized boolean isReady() {
return this.state == State.READY;
}
public void killJob() throws IOException, InterruptedException {
job.killJob();
}
public synchronized void failJob(String message) throws IOException, InterruptedException {
try {
if(job != null && this.state == State.RUNNING) {
job.killJob();
}
} finally {
this.state = State.FAILED;
this.message = message;
}
}
/**
* Check the state of this running job. The state may
* remain the same, become SUCCESS or FAILED.
*/
private void checkRunningState() throws IOException, InterruptedException {
try {
if (job.isComplete()) {
if (job.isSuccessful()) {
this.state = State.SUCCESS;
} else {
this.state = State.FAILED;
this.message = "Job failed!";
}
}
} catch (IOException ioe) {
this.state = State.FAILED;
this.message = StringUtils.stringifyException(ioe);
try {
if (job != null) {
job.killJob();
}
} catch (IOException e) {}
}
}
/**
* Check and update the state of this job. The state changes
* depending on its current state and the states of the depending jobs.
*/
synchronized State checkState() throws IOException, InterruptedException {
if (this.state == State.RUNNING) {
checkRunningState();
}
if (this.state != State.WAITING) {
return this.state;
}
if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
this.state = State.READY;
return this.state;
}
ControlledJob pred = null;
int n = this.dependingJobs.size();
for (int i = 0; i < n; i++) {
pred = this.dependingJobs.get(i);
State s = pred.checkState();
if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
break; // a pred is still not completed, continue in WAITING
// state
}
if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
this.state = State.DEPENDENT_FAILED;
this.message = "depending job " + i + " with jobID "
+ pred.getJobID() + " failed. " + pred.getMessage();
break;
}
// pred must be in success state
if (i == n - 1) {
this.state = State.READY;
}
}
return this.state;
}
/**
* Submit this job to mapred. The state becomes RUNNING if submission
* is successful, FAILED otherwise.
*/
protected synchronized void submit() {
try {
Configuration conf = job.getConfiguration();
if (conf.getBoolean(CREATE_DIR, false)) {
FileSystem fs = FileSystem.get(conf);
Path inputPaths[] = FileInputFormat.getInputPaths(job);
for (int i = 0; i < inputPaths.length; i++) {
if (!fs.exists(inputPaths[i])) {
try {
fs.mkdirs(inputPaths[i]);
} catch (IOException e) {
}
}
}
}
job.submit();
this.state = State.RUNNING;
} catch (Exception ioe) {
LOG.info(getJobName()+" got an error while submitting ",ioe);
this.state = State.FAILED;
this.message = StringUtils.stringifyException(ioe);
}
}
}