| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapred.jobcontrol; |
| |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapred.FileInputFormat; |
| import org.apache.hadoop.mapred.JobClient; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobID; |
| import org.apache.hadoop.mapred.RunningJob; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** This class encapsulates a MapReduce job and its dependency. It monitors |
| * the states of the depending jobs and updates the state of this job. |
| * A job starts in the WAITING state. If it does not have any depending jobs, or |
| * all of the depending jobs are in SUCCESS state, then the job state will become |
| * READY. If any depending jobs fail, the job will fail too. |
| * When in READY state, the job can be submitted to Hadoop for execution, with |
| * the state changing into RUNNING state. From RUNNING state, the job can get into |
| * SUCCESS or FAILED state, depending the status of the job execution. |
| * |
| */ |
| |
| public class Job { |
| |
| // A job will be in one of the following states |
| final public static int SUCCESS = 0; |
| final public static int WAITING = 1; |
| final public static int RUNNING = 2; |
| final public static int READY = 3; |
| final public static int FAILED = 4; |
| final public static int DEPENDENT_FAILED = 5; |
| |
| |
| private JobConf theJobConf; |
| private int state; |
| private String jobID; // assigned and used by JobControl class |
| private JobID mapredJobID; // the job ID assigned by map/reduce |
| private String jobName; // external name, assigned/used by client app |
| private String message; // some info for human consumption, |
| // e.g. the reason why the job failed |
| private ArrayList<Job> dependingJobs; // the jobs the current job depends on |
| |
| private JobClient jc = null; // the map reduce job client |
| |
| /** |
| * Construct a job. |
| * @param jobConf a mapred job configuration representing a job to be executed. |
| * @param dependingJobs an array of jobs the current job depends on |
| */ |
| public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException { |
| this.theJobConf = jobConf; |
| this.dependingJobs = dependingJobs; |
| this.state = Job.WAITING; |
| this.jobID = "unassigned"; |
| this.mapredJobID = null; //not yet assigned |
| this.jobName = "unassigned"; |
| this.message = "just initialized"; |
| this.jc = new JobClient(jobConf); |
| } |
| |
| /** |
| * Construct a job. |
| * |
| * @param jobConf mapred job configuration representing a job to be executed. |
| * @throws IOException |
| */ |
| public Job(JobConf jobConf) throws IOException { |
| this(jobConf, null); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuffer sb = new StringBuffer(); |
| sb.append("job name:\t").append(this.jobName).append("\n"); |
| sb.append("job id:\t").append(this.jobID).append("\n"); |
| sb.append("job state:\t").append(this.state).append("\n"); |
| sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" |
| : this.mapredJobID).append("\n"); |
| sb.append("job message:\t").append(this.message).append("\n"); |
| |
| if (this.dependingJobs == null || this.dependingJobs.size() == 0) { |
| sb.append("job has no depending job:\t").append("\n"); |
| } else { |
| sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n"); |
| for (int i = 0; i < this.dependingJobs.size(); i++) { |
| sb.append("\t depending job ").append(i).append(":\t"); |
| sb.append((this.dependingJobs.get(i)).getJobName()).append("\n"); |
| } |
| } |
| return sb.toString(); |
| } |
| |
| /** |
| * @return the job name of this job |
| */ |
| public String getJobName() { |
| return this.jobName; |
| } |
| |
| /** |
| * Set the job name for this job. |
| * @param jobName the job name |
| */ |
| public void setJobName(String jobName) { |
| this.jobName = jobName; |
| } |
| |
| /** |
| * @return the job ID of this job assigned by JobControl |
| */ |
| public String getJobID() { |
| return this.jobID; |
| } |
| |
| /** |
| * Set the job ID for this job. |
| * @param id the job ID |
| */ |
| public void setJobID(String id) { |
| this.jobID = id; |
| } |
| |
| /** |
| * @return the mapred ID of this job |
| * @deprecated use {@link #getAssignedJobID()} instead |
| */ |
| @Deprecated |
| public String getMapredJobID() { |
| return this.mapredJobID.toString(); |
| } |
| |
| /** |
| * Set the mapred ID for this job. |
| * @param mapredJobID the mapred job ID for this job. |
| * @deprecated use {@link #setAssignedJobID(JobID)} instead |
| */ |
| @Deprecated |
| public void setMapredJobID(String mapredJobID) { |
| this.mapredJobID = JobID.forName(mapredJobID); |
| } |
| |
| /** |
| * @return the mapred ID of this job as assigned by the |
| * mapred framework. |
| */ |
| public JobID getAssignedJobID() { |
| return this.mapredJobID; |
| } |
| |
| /** |
| * Set the mapred ID for this job as assigned by the |
| * mapred framework. |
| * @param mapredJobID the mapred job ID for this job. |
| */ |
| public void setAssignedJobID(JobID mapredJobID) { |
| this.mapredJobID = mapredJobID; |
| } |
| |
| /** |
| * @return the mapred job conf of this job |
| */ |
| public JobConf getJobConf() { |
| return this.theJobConf; |
| } |
| |
| |
| /** |
| * Set the mapred job conf for this job. |
| * @param jobConf the mapred job conf for this job. |
| */ |
| public void setJobConf(JobConf jobConf) { |
| this.theJobConf = jobConf; |
| } |
| |
| /** |
| * @return the state of this job |
| */ |
| public synchronized int getState() { |
| return this.state; |
| } |
| |
| /** |
| * Set the state for this job. |
| * @param state the new state for this job. |
| */ |
| protected synchronized void setState(int state) { |
| this.state = state; |
| } |
| |
| /** |
| * @return the message of this job |
| */ |
| public String getMessage() { |
| return this.message; |
| } |
| |
| /** |
| * Set the message for this job. |
| * @param message the message for this job. |
| */ |
| public void setMessage(String message) { |
| this.message = message; |
| } |
| |
| |
| /** |
| * @return the job client of this job |
| */ |
| public JobClient getJobClient(){ |
| return this.jc; |
| } |
| |
| /** |
| * @return the depending jobs of this job |
| */ |
| public ArrayList<Job> getDependingJobs() { |
| return this.dependingJobs; |
| } |
| |
| /** |
| * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job |
| * is waiting to run, not during or afterwards. |
| * |
| * @param dependingJob Job that this Job depends on. |
| * @return <tt>true</tt> if the Job was added. |
| */ |
| public synchronized boolean addDependingJob(Job dependingJob) { |
| if (this.state == Job.WAITING) { //only allowed to add jobs when waiting |
| if (this.dependingJobs == null) { |
| this.dependingJobs = new ArrayList<Job>(); |
| } |
| return this.dependingJobs.add(dependingJob); |
| } else { |
| return false; |
| } |
| } |
| |
| /** |
| * @return true if this job is in a complete state |
| */ |
| public boolean isCompleted() { |
| return this.state == Job.FAILED || |
| this.state == Job.DEPENDENT_FAILED || |
| this.state == Job.SUCCESS; |
| } |
| |
| /** |
| * @return true if this job is in READY state |
| */ |
| public boolean isReady() { |
| return this.state == Job.READY; |
| } |
| |
| /** |
| * Check the state of this running job. The state may |
| * remain the same, become SUCCESS or FAILED. |
| */ |
| private void checkRunningState() { |
| RunningJob running = null; |
| try { |
| running = jc.getJob(this.mapredJobID); |
| if (running.isComplete()) { |
| if (running.isSuccessful()) { |
| this.state = Job.SUCCESS; |
| } else { |
| this.state = Job.FAILED; |
| this.message = "Job failed! Error - " + running.getFailureInfo(); |
| try { |
| running.killJob(); |
| } catch (IOException e1) { |
| |
| } |
| try { |
| this.jc.close(); |
| } catch (IOException e2) { |
| |
| } |
| } |
| } |
| |
| } catch (IOException ioe) { |
| this.state = Job.FAILED; |
| this.message = StringUtils.stringifyException(ioe); |
| try { |
| if (running != null) |
| running.killJob(); |
| } catch (IOException e1) { |
| |
| } |
| try { |
| this.jc.close(); |
| } catch (IOException e1) { |
| |
| } |
| } |
| } |
| |
| /** |
| * Check and update the state of this job. The state changes |
| * depending on its current state and the states of the depending jobs. |
| */ |
| synchronized int checkState() { |
| if (this.state == Job.RUNNING) { |
| checkRunningState(); |
| } |
| if (this.state != Job.WAITING) { |
| return this.state; |
| } |
| if (this.dependingJobs == null || this.dependingJobs.size() == 0) { |
| this.state = Job.READY; |
| return this.state; |
| } |
| Job pred = null; |
| int n = this.dependingJobs.size(); |
| for (int i = 0; i < n; i++) { |
| pred = this.dependingJobs.get(i); |
| int s = pred.checkState(); |
| if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) { |
| break; // a pred is still not completed, continue in WAITING |
| // state |
| } |
| if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) { |
| this.state = Job.DEPENDENT_FAILED; |
| this.message = "depending job " + i + " with jobID " |
| + pred.getJobID() + " failed. " + pred.getMessage(); |
| break; |
| } |
| // pred must be in success state |
| if (i == n - 1) { |
| this.state = Job.READY; |
| } |
| } |
| |
| return this.state; |
| } |
| |
| /** |
| * Submit this job to mapred. The state becomes RUNNING if submission |
| * is successful, FAILED otherwise. |
| */ |
| protected synchronized void submit() { |
| try { |
| if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) { |
| FileSystem fs = FileSystem.get(theJobConf); |
| Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf); |
| for (int i = 0; i < inputPaths.length; i++) { |
| if (!fs.exists(inputPaths[i])) { |
| try { |
| fs.mkdirs(inputPaths[i]); |
| } catch (IOException e) { |
| |
| } |
| } |
| } |
| } |
| RunningJob running = jc.submitJob(theJobConf); |
| this.mapredJobID = running.getID(); |
| this.state = Job.RUNNING; |
| } catch (IOException ioe) { |
| this.state = Job.FAILED; |
| this.message = StringUtils.stringifyException(ioe); |
| } |
| } |
| |
| } |