| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce; |
| |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.OutputStreamWriter; |
| import java.net.URL; |
| import java.net.URLConnection; |
| import java.net.URI; |
| import java.security.PrivilegedExceptionAction; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configuration.IntegerRanges; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.RawComparator; |
| import org.apache.hadoop.mapreduce.filecache.DistributedCache; |
| import org.apache.hadoop.mapreduce.task.JobContextImpl; |
| import org.apache.hadoop.mapreduce.util.ConfigUtil; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** |
| * The job submitter's view of the Job. |
| * |
| * <p>It allows the user to configure the |
| * job, submit it, control its execution, and query the state. The set methods |
 * only work until the job is submitted; afterwards they will throw an
 * {@link IllegalStateException}.</p>
| * |
| * <p> |
 * Normally the user creates the application, describes various facets of the
 * job via {@link Job}, and then submits the job and monitors its progress.</p>
| * |
| * <p>Here is an example on how to submit a job:</p> |
| * <p><blockquote><pre> |
 * // Create a new Job
 * Job job = Job.getInstance(new Configuration());
 * job.setJarByClass(MyJob.class);
 *
 * // Specify various job-specific parameters
 * job.setJobName("myjob");
 *
 * FileInputFormat.setInputPaths(job, new Path("in"));
 * FileOutputFormat.setOutputPath(job, new Path("out"));
| * |
| * job.setMapperClass(MyJob.MyMapper.class); |
| * job.setReducerClass(MyJob.MyReducer.class); |
| * |
| * // Submit the job, then poll for progress until the job is complete |
| * job.waitForCompletion(true); |
| * </pre></blockquote></p> |
| * |
| * |
| */ |
| @InterfaceAudience.Public |
| @InterfaceStability.Evolving |
| public class Job extends JobContextImpl implements JobContext { |
| private static final Log LOG = LogFactory.getLog(Job.class); |
| |
| @InterfaceStability.Evolving |
| public static enum JobState {DEFINE, RUNNING}; |
| private static final long MAX_JOBSTATUS_AGE = 1000 * 2; |
| public static final String OUTPUT_FILTER = "mapreduce.client.output.filter"; |
  /** Key in mapred-*.xml that sets completionPollIntervalMillis */
| public static final String COMPLETION_POLL_INTERVAL_KEY = |
| "mapreduce.client.completion.pollinterval"; |
| |
| /** Default completionPollIntervalMillis is 5000 ms. */ |
| static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000; |
| /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */ |
| public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY = |
| "mapreduce.client.progressmonitor.pollinterval"; |
| /** Default progMonitorPollIntervalMillis is 1000 ms. */ |
| static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000; |
| |
| public static final String USED_GENERIC_PARSER = |
| "mapreduce.client.genericoptionsparser.used"; |
| public static final String SUBMIT_REPLICATION = |
| "mapreduce.client.submit.file.replication"; |
| private static final String TASKLOG_PULL_TIMEOUT_KEY = |
| "mapreduce.client.tasklog.timeout"; |
| private static final int DEFAULT_TASKLOG_TIMEOUT = 60000; |
| |
| @InterfaceStability.Evolving |
| public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } |
| |
| static { |
| ConfigUtil.loadResources(); |
| } |
| |
| private JobState state = JobState.DEFINE; |
| private JobStatus status; |
| private long statustime; |
| private Cluster cluster; |
| |
| @Deprecated |
| public Job() throws IOException { |
| this(new Configuration()); |
| } |
| |
| @Deprecated |
| public Job(Configuration conf) throws IOException { |
| this(new Cluster(conf), conf); |
| } |
| |
| @Deprecated |
| public Job(Configuration conf, String jobName) throws IOException { |
| this(conf); |
| setJobName(jobName); |
| } |
| |
| Job(Cluster cluster) throws IOException { |
| this(cluster, new Configuration()); |
| } |
| |
| Job(Cluster cluster, Configuration conf) throws IOException { |
| super(conf, null); |
| this.cluster = cluster; |
| } |
| |
| Job(Cluster cluster, JobStatus status, |
| Configuration conf) throws IOException { |
| this(cluster, conf); |
| setJobID(status.getJobID()); |
| this.status = status; |
| state = JobState.RUNNING; |
| } |
| |
| |
| /** |
   * Creates a new {@link Job} with no particular {@link Cluster}.
   * A Cluster will be created from a new {@link Configuration} only
   * when it is needed.
   *
   * @return the {@link Job}, with no connection to a cluster yet.
| * @throws IOException |
| */ |
| public static Job getInstance() throws IOException { |
| // create with a null Cluster |
| return getInstance(new Configuration()); |
| } |
| |
| /** |
   * Creates a new {@link Job} with no particular {@link Cluster}.
| * A Cluster will be created from the conf parameter only when it's needed. |
| * |
| * @param conf the configuration |
   * @return the {@link Job}, with no connection to a cluster yet.
| * @throws IOException |
| */ |
| public static Job getInstance(Configuration conf) throws IOException { |
| // create with a null Cluster |
| return new Job(null, conf); |
| } |
| |
| |
| /** |
   * Creates a new {@link Job} with no particular {@link Cluster} and a given
   * job name. A Cluster will be created from the conf parameter only when
   * it's needed.
   *
   * @param conf the configuration
   * @param jobName the job name
   * @return the {@link Job}, with no connection to a cluster yet.
| * @throws IOException |
| */ |
| public static Job getInstance(Configuration conf, String jobName) |
| throws IOException { |
| // create with a null Cluster |
| Job result = new Job(null, conf); |
| result.setJobName(jobName); |
| return result; |
| } |
| |
| public static Job getInstance(Cluster cluster) throws IOException { |
| return new Job(cluster); |
| } |
| |
| public static Job getInstance(Cluster cluster, Configuration conf) |
| throws IOException { |
| return new Job(cluster, conf); |
| } |
| |
| public static Job getInstance(Cluster cluster, JobStatus status, |
| Configuration conf) throws IOException { |
| return new Job(cluster, status, conf); |
| } |
| |
| private void ensureState(JobState state) throws IllegalStateException { |
| if (state != this.state) { |
| throw new IllegalStateException("Job in state "+ this.state + |
| " instead of " + state); |
| } |
| |
| if (state == JobState.RUNNING && cluster == null) { |
| throw new IllegalStateException |
| ("Job in state " + this.state |
| + ", but it isn't attached to any job tracker!"); |
| } |
| } |
| |
| /** |
| * Some methods rely on having a recent job status object. Refresh |
   * it, if necessary.
| */ |
| synchronized void ensureFreshStatus() |
| throws IOException, InterruptedException { |
| if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) { |
| updateStatus(); |
| } |
| } |
| |
  /** Some methods need an up-to-date job status object, so refresh it
   * immediately.
| * @throws IOException |
| */ |
| synchronized void updateStatus() throws IOException, InterruptedException { |
| this.status = cluster.getClient().getJobStatus(status.getJobID()); |
| if (this.status == null) { |
| throw new IOException("Job status not available "); |
| } |
| this.statustime = System.currentTimeMillis(); |
| } |
| |
| public JobStatus getStatus() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| updateStatus(); |
| return status; |
| } |
| |
| /** |
| * Returns the current state of the Job. |
| * |
   * @return the current {@link JobStatus.State} of the job
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| public JobStatus.State getJobState() |
| throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| updateStatus(); |
| return status.getState(); |
| } |
| |
| /** |
| * Get the URL where some job progress information will be displayed. |
| * |
| * @return the URL where some job progress information will be displayed. |
| */ |
| public String getTrackingURL(){ |
| ensureState(JobState.RUNNING); |
| return status.getTrackingUrl().toString(); |
| } |
| |
| /** |
| * Get the path of the submitted job configuration. |
| * |
| * @return the path of the submitted job configuration. |
| */ |
| public String getJobFile() { |
| ensureState(JobState.RUNNING); |
| return status.getJobFile(); |
| } |
| |
| /** |
| * Get start time of the job. |
| * |
| * @return the start time of the job |
| */ |
| public long getStartTime() { |
| ensureState(JobState.RUNNING); |
| return status.getStartTime(); |
| } |
| |
| /** |
| * Get finish time of the job. |
| * |
| * @return the finish time of the job |
| */ |
| public long getFinishTime() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| updateStatus(); |
| return status.getFinishTime(); |
| } |
| |
| /** |
| * Get scheduling info of the job. |
| * |
| * @return the scheduling info of the job |
| */ |
| public String getSchedulingInfo() { |
| ensureState(JobState.RUNNING); |
| return status.getSchedulingInfo(); |
| } |
| |
| /** |
   * Get the priority of the job.
   *
   * @return the priority of the job
| */ |
| public JobPriority getPriority() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| updateStatus(); |
| return status.getPriority(); |
| } |
| |
| /** |
| * The user-specified job name. |
| */ |
| public String getJobName() { |
| if (state == JobState.DEFINE) { |
| return super.getJobName(); |
| } |
| ensureState(JobState.RUNNING); |
| return status.getJobName(); |
| } |
| |
| public String getHistoryUrl() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| updateStatus(); |
| return status.getHistoryFile(); |
| } |
| |
| public boolean isRetired() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| updateStatus(); |
| return status.isRetired(); |
| } |
| |
| /** |
   * Returns a human-readable summary of the job's current status.
| */ |
| @Override |
| public String toString() { |
| ensureState(JobState.RUNNING); |
    try {
      updateStatus();
    } catch (IOException e) {
      // Fall back to the last cached status.
    } catch (InterruptedException ie) {
      // Fall back to the last cached status.
    }
    StringBuilder sb = new StringBuilder();
| sb.append("Job: ").append(status.getJobID()).append("\n"); |
| sb.append("Job File: ").append(status.getJobFile()).append("\n"); |
| sb.append("Job Tracking URL : ").append(status.getTrackingUrl()); |
| sb.append("\n"); |
| sb.append("map() completion: "); |
| sb.append(status.getMapProgress()).append("\n"); |
| sb.append("reduce() completion: "); |
| sb.append(status.getReduceProgress()).append("\n"); |
| sb.append("Job state: "); |
| sb.append(status.getState()).append("\n"); |
| sb.append("history URL: "); |
| sb.append(status.getHistoryFile()).append("\n"); |
| sb.append("retired: ").append(status.isRetired()); |
| return sb.toString(); |
| } |
| |
| /** |
   * Get reports describing the current state of the job's tasks.
   *
   * @param type the type of task
   * @return the task reports for tasks of the given type
| * @throws IOException |
| */ |
| public TaskReport[] getTaskReports(TaskType type) |
| throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| return cluster.getClient().getTaskReports(getJobID(), type); |
| } |
| |
| /** |
| * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 |
| * and 1.0. When all map tasks have completed, the function returns 1.0. |
| * |
| * @return the progress of the job's map-tasks. |
| * @throws IOException |
| */ |
| public float mapProgress() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| ensureFreshStatus(); |
| return status.getMapProgress(); |
| } |
| |
| /** |
| * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 |
| * and 1.0. When all reduce tasks have completed, the function returns 1.0. |
| * |
| * @return the progress of the job's reduce-tasks. |
| * @throws IOException |
| */ |
| public float reduceProgress() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| ensureFreshStatus(); |
| return status.getReduceProgress(); |
| } |
| |
| /** |
| * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 |
| * and 1.0. When all cleanup tasks have completed, the function returns 1.0. |
| * |
| * @return the progress of the job's cleanup-tasks. |
| * @throws IOException |
| */ |
| public float cleanupProgress() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| ensureFreshStatus(); |
| return status.getCleanupProgress(); |
| } |
| |
| /** |
| * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 |
| * and 1.0. When all setup tasks have completed, the function returns 1.0. |
| * |
| * @return the progress of the job's setup-tasks. |
| * @throws IOException |
| */ |
| public float setupProgress() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| ensureFreshStatus(); |
| return status.getSetupProgress(); |
| } |
| |
| /** |
| * Check if the job is finished or not. |
| * This is a non-blocking call. |
| * |
| * @return <code>true</code> if the job is complete, else <code>false</code>. |
| * @throws IOException |
| */ |
| public boolean isComplete() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| updateStatus(); |
| return status.isJobComplete(); |
| } |
| |
| /** |
| * Check if the job completed successfully. |
| * |
| * @return <code>true</code> if the job succeeded, else <code>false</code>. |
| * @throws IOException |
| */ |
| public boolean isSuccessful() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| updateStatus(); |
| return status.getState() == JobStatus.State.SUCCEEDED; |
| } |
| |
| /** |
| * Kill the running job. Blocks until all job tasks have been |
| * killed as well. If the job is no longer running, it simply returns. |
| * |
| * @throws IOException |
| */ |
| public void killJob() throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| cluster.getClient().killJob(getJobID()); |
| } |
| |
| /** |
| * Set the priority of a running job. |
| * @param priority the new priority for the job. |
| * @throws IOException |
| */ |
| public void setPriority(JobPriority priority) |
| throws IOException, InterruptedException { |
| if (state == JobState.DEFINE) { |
| conf.setJobPriority( |
| org.apache.hadoop.mapred.JobPriority.valueOf(priority.name())); |
| } else { |
| ensureState(JobState.RUNNING); |
| cluster.getClient().setJobPriority(getJobID(), priority.toString()); |
| } |
| } |
| |
| /** |
| * Get events indicating completion (success/failure) of component tasks. |
| * |
| * @param startFrom index to start fetching events from |
| * @param numEvents number of events to fetch |
| * @return an array of {@link TaskCompletionEvent}s |
| * @throws IOException |
| */ |
| public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom, |
| int numEvents) throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| return cluster.getClient().getTaskCompletionEvents(getJobID(), |
| startFrom, numEvents); |
| } |
| |
| /** |
| * Kill indicated task attempt. |
| * |
   * @param taskId the id of the task attempt to be killed.
   * @return <code>true</code> if the task attempt was successfully killed
   * @throws IOException
| */ |
| public boolean killTask(TaskAttemptID taskId) |
| throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| return cluster.getClient().killTask(taskId, false); |
| } |
| |
| /** |
| * Fail indicated task attempt. |
| * |
   * @param taskId the id of the task attempt to be failed.
   * @return <code>true</code> if the task attempt was successfully failed
   * @throws IOException
| */ |
| public boolean failTask(TaskAttemptID taskId) |
| throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| return cluster.getClient().killTask(taskId, true); |
| } |
| |
| /** |
| * Gets the counters for this job. May return null if the job has been |
| * retired and the job is no longer in the completed job store. |
| * |
| * @return the counters for this job. |
| * @throws IOException |
| */ |
| public Counters getCounters() |
| throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| return cluster.getClient().getJobCounters(getJobID()); |
| } |
| |
| /** |
| * Gets the diagnostic messages for a given task attempt. |
   * @param taskid the id of the task attempt
| * @return the list of diagnostic messages for the task |
| * @throws IOException |
| */ |
| public String[] getTaskDiagnostics(TaskAttemptID taskid) |
| throws IOException, InterruptedException { |
| ensureState(JobState.RUNNING); |
| return cluster.getClient().getTaskDiagnostics(taskid); |
| } |
| |
| /** |
| * Set the number of reduce tasks for the job. |
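   *
   * <p>Setting this to zero produces a map-only job whose map output is
   * written directly to the output path; for example:</p>
   * <p><blockquote><pre>
   * job.setNumReduceTasks(0);  // map-only job
   * </pre></blockquote></p>
   *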
| * @param tasks the number of reduce tasks |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setNumReduceTasks(int tasks) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setNumReduceTasks(tasks); |
| } |
| |
| /** |
| * Set the current working directory for the default file system. |
| * |
| * @param dir the new current working directory. |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setWorkingDirectory(Path dir) throws IOException { |
| ensureState(JobState.DEFINE); |
| conf.setWorkingDirectory(dir); |
| } |
| |
| /** |
| * Set the {@link InputFormat} for the job. |
| * @param cls the <code>InputFormat</code> to use |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setInputFormatClass(Class<? extends InputFormat> cls |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, |
| InputFormat.class); |
| } |
| |
| /** |
| * Set the {@link OutputFormat} for the job. |
| * @param cls the <code>OutputFormat</code> to use |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setOutputFormatClass(Class<? extends OutputFormat> cls |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, |
| OutputFormat.class); |
| } |
| |
| /** |
| * Set the {@link Mapper} for the job. |
| * @param cls the <code>Mapper</code> to use |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setMapperClass(Class<? extends Mapper> cls |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class); |
| } |
| |
| /** |
| * Set the Jar by finding where a given class came from. |
| * @param cls the example class |
| */ |
| public void setJarByClass(Class<?> cls) { |
| ensureState(JobState.DEFINE); |
| conf.setJarByClass(cls); |
| } |
| |
| /** |
   * Set the job jar.
   * @param jar the location of the job jar
| */ |
| public void setJar(String jar) { |
| ensureState(JobState.DEFINE); |
| conf.setJar(jar); |
| } |
| |
| /** |
| * Set the reported username for this job. |
| * |
| * @param user the username for this job. |
| */ |
| public void setUser(String user) { |
| ensureState(JobState.DEFINE); |
| conf.setUser(user); |
| } |
| |
| /** |
| * Set the combiner class for the job. |
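   *
   * <p>When the reduce function is associative and commutative, the reducer
   * class itself is often reused as the combiner; for example (sketch):</p>
   * <p><blockquote><pre>
   * job.setCombinerClass(MyJob.MyReducer.class);
   * </pre></blockquote></p>
   *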
| * @param cls the combiner to use |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setCombinerClass(Class<? extends Reducer> cls |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class); |
| } |
| |
| /** |
| * Set the {@link Reducer} for the job. |
| * @param cls the <code>Reducer</code> to use |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setReducerClass(Class<? extends Reducer> cls |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class); |
| } |
| |
| /** |
| * Set the {@link Partitioner} for the job. |
| * @param cls the <code>Partitioner</code> to use |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setPartitionerClass(Class<? extends Partitioner> cls |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setClass(PARTITIONER_CLASS_ATTR, cls, |
| Partitioner.class); |
| } |
| |
| /** |
| * Set the key class for the map output data. This allows the user to |
| * specify the map output key class to be different than the final output |
   * key class.
| * |
| * @param theClass the map output key class. |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setMapOutputKeyClass(Class<?> theClass |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setMapOutputKeyClass(theClass); |
| } |
| |
| /** |
| * Set the value class for the map output data. This allows the user to |
| * specify the map output value class to be different than the final output |
| * value class. |
| * |
| * @param theClass the map output value class. |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setMapOutputValueClass(Class<?> theClass |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setMapOutputValueClass(theClass); |
| } |
| |
| /** |
| * Set the key class for the job output data. |
| * |
| * @param theClass the key class for the job output data. |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setOutputKeyClass(Class<?> theClass |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setOutputKeyClass(theClass); |
| } |
| |
| /** |
| * Set the value class for job outputs. |
| * |
| * @param theClass the value class for job outputs. |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setOutputValueClass(Class<?> theClass |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setOutputValueClass(theClass); |
| } |
| |
| /** |
| * Define the comparator that controls how the keys are sorted before they |
| * are passed to the {@link Reducer}. |
| * @param cls the raw comparator |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setSortComparatorClass(Class<? extends RawComparator> cls |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setOutputKeyComparatorClass(cls); |
| } |
| |
| /** |
| * Define the comparator that controls which keys are grouped together |
| * for a single call to |
| * {@link Reducer#reduce(Object, Iterable, |
| * org.apache.hadoop.mapreduce.Reducer.Context)} |
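   *
   * <p>For example, a secondary-sort setup typically pairs this with
   * {@link #setSortComparatorClass(Class)}; a sketch, where both comparator
   * classes are hypothetical user-supplied {@link RawComparator}s:</p>
   * <p><blockquote><pre>
   * job.setSortComparatorClass(CompositeKeyComparator.class);
   * job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
   * </pre></blockquote></p>
   *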
| * @param cls the raw comparator to use |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setGroupingComparatorClass(Class<? extends RawComparator> cls |
| ) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setOutputValueGroupingComparator(cls); |
| } |
| |
| /** |
| * Set the user-specified job name. |
| * |
| * @param name the job's new name. |
| * @throws IllegalStateException if the job is submitted |
| */ |
| public void setJobName(String name) throws IllegalStateException { |
| ensureState(JobState.DEFINE); |
| conf.setJobName(name); |
| } |
| |
| /** |
| * Turn speculative execution on or off for this job. |
| * |
| * @param speculativeExecution <code>true</code> if speculative execution |
| * should be turned on, else <code>false</code>. |
| */ |
| public void setSpeculativeExecution(boolean speculativeExecution) { |
| ensureState(JobState.DEFINE); |
| conf.setSpeculativeExecution(speculativeExecution); |
| } |
| |
| /** |
| * Turn speculative execution on or off for this job for map tasks. |
| * |
| * @param speculativeExecution <code>true</code> if speculative execution |
| * should be turned on for map tasks, |
| * else <code>false</code>. |
| */ |
| public void setMapSpeculativeExecution(boolean speculativeExecution) { |
| ensureState(JobState.DEFINE); |
| conf.setMapSpeculativeExecution(speculativeExecution); |
| } |
| |
| /** |
| * Turn speculative execution on or off for this job for reduce tasks. |
| * |
| * @param speculativeExecution <code>true</code> if speculative execution |
| * should be turned on for reduce tasks, |
| * else <code>false</code>. |
| */ |
| public void setReduceSpeculativeExecution(boolean speculativeExecution) { |
| ensureState(JobState.DEFINE); |
| conf.setReduceSpeculativeExecution(speculativeExecution); |
| } |
| |
| /** |
   * Specify whether job-setup and job-cleanup are needed for the job.
| * |
   * @param needed If <code>true</code>, job-setup and job-cleanup will be
   *               performed via the {@link OutputCommitter}; otherwise they
   *               are skipped.
| */ |
| public void setJobSetupCleanupNeeded(boolean needed) { |
| ensureState(JobState.DEFINE); |
| conf.setBoolean(SETUP_CLEANUP_NEEDED, needed); |
| } |
| |
| /** |
| * Set the given set of archives |
| * @param archives The list of archives that need to be localized |
| */ |
| public void setCacheArchives(URI[] archives) { |
| ensureState(JobState.DEFINE); |
| DistributedCache.setCacheArchives(archives, conf); |
| } |
| |
| /** |
| * Set the given set of files |
| * @param files The list of files that need to be localized |
| */ |
| public void setCacheFiles(URI[] files) { |
| ensureState(JobState.DEFINE); |
| DistributedCache.setCacheFiles(files, conf); |
| } |
| |
| /** |
   * Add an archive to be localized.
   * @param uri The uri of the archive to be localized
| */ |
| public void addCacheArchive(URI uri) { |
| ensureState(JobState.DEFINE); |
| DistributedCache.addCacheArchive(uri, conf); |
| } |
| |
| /** |
| * Add a file to be localized |
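   *
   * <p>A URI fragment can name the symlink created in the task's working
   * directory when {@link #createSymlink()} is used; for example
   * (illustrative path):</p>
   * <p><blockquote><pre>
   * job.addCacheFile(new URI("/data/lookup.dat#lookup"));
   * job.createSymlink();
   * </pre></blockquote></p>
   *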
   * @param uri The uri of the file to be localized
| */ |
| public void addCacheFile(URI uri) { |
| ensureState(JobState.DEFINE); |
| DistributedCache.addCacheFile(uri, conf); |
| } |
| |
| /** |
   * Add a file path to the current set of classpath entries. The file is
   * added to the cache as well.
| * |
| * @param file Path of the file to be added |
| */ |
| public void addFileToClassPath(Path file) |
| throws IOException { |
| ensureState(JobState.DEFINE); |
| DistributedCache.addFileToClassPath(file, conf); |
| } |
| |
| /** |
| * Add an archive path to the current set of classpath entries. It adds the |
| * archive to cache as well. |
| * |
| * @param archive Path of the archive to be added |
| */ |
| public void addArchiveToClassPath(Path archive) |
| throws IOException { |
| ensureState(JobState.DEFINE); |
| DistributedCache.addArchiveToClassPath(archive, conf); |
| } |
| |
| /** |
| * This method allows you to create symlinks in the current working directory |
| * of the task to all the cache files/archives |
| */ |
| public void createSymlink() { |
| ensureState(JobState.DEFINE); |
| DistributedCache.createSymlink(conf); |
| } |
| |
| /** |
   * Expert: Set the maximum number of attempts that will be made to run a
   * map task.
| * |
| * @param n the number of attempts per map task. |
| */ |
| public void setMaxMapAttempts(int n) { |
| ensureState(JobState.DEFINE); |
| conf.setMaxMapAttempts(n); |
| } |
| |
| /** |
   * Expert: Set the maximum number of attempts that will be made to run a
   * reduce task.
| * |
| * @param n the number of attempts per reduce task. |
| */ |
| public void setMaxReduceAttempts(int n) { |
| ensureState(JobState.DEFINE); |
| conf.setMaxReduceAttempts(n); |
| } |
| |
| /** |
   * Set whether the system should collect profiler information for some of
   * the tasks in this job. The information is stored in the user log
| * directory. |
| * @param newValue true means it should be gathered |
| */ |
| public void setProfileEnabled(boolean newValue) { |
| ensureState(JobState.DEFINE); |
| conf.setProfileEnabled(newValue); |
| } |
| |
| /** |
| * Set the profiler configuration arguments. If the string contains a '%s' it |
| * will be replaced with the name of the profiling output file when the task |
| * runs. |
| * |
| * This value is passed to the task child JVM on the command line. |
| * |
| * @param value the configuration string |
| */ |
| public void setProfileParams(String value) { |
| ensureState(JobState.DEFINE); |
| conf.setProfileParams(value); |
| } |
| |
| /** |
| * Set the ranges of maps or reduces to profile. setProfileEnabled(true) |
| * must also be called. |
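   *
   * <p>For example (illustrative):</p>
   * <p><blockquote><pre>
   * job.setProfileEnabled(true);
   * job.setProfileTaskRange(true, "0-2");  // profile the first three maps
   * </pre></blockquote></p>
   *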
   * @param isMap whether the ranges apply to map tasks (else to reduce tasks)
   * @param newValue a set of integer ranges of the task ids, e.g. "0-2,5"
| */ |
| public void setProfileTaskRange(boolean isMap, String newValue) { |
| ensureState(JobState.DEFINE); |
| conf.setProfileTaskRange(isMap, newValue); |
| } |
| |
| private void ensureNotSet(String attr, String msg) throws IOException { |
| if (conf.get(attr) != null) { |
| throw new IOException(attr + " is incompatible with " + msg + " mode."); |
| } |
| } |
| |
| /** |
| * Sets the flag that will allow the JobTracker to cancel the HDFS delegation |
| * tokens upon job completion. Defaults to true. |
| */ |
| public void setCancelDelegationTokenUponJobCompletion(boolean value) { |
| ensureState(JobState.DEFINE); |
| conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value); |
| } |
| |
| /** |
   * Default to the new APIs unless they are explicitly set or the old mapper
   * or reducer attributes are used.
   * @throws IOException if the configuration is inconsistent
| */ |
| private void setUseNewAPI() throws IOException { |
| int numReduces = conf.getNumReduceTasks(); |
| String oldMapperClass = "mapred.mapper.class"; |
| String oldReduceClass = "mapred.reducer.class"; |
| conf.setBooleanIfUnset("mapred.mapper.new-api", |
| conf.get(oldMapperClass) == null); |
| if (conf.getUseNewMapper()) { |
| String mode = "new map API"; |
| ensureNotSet("mapred.input.format.class", mode); |
| ensureNotSet(oldMapperClass, mode); |
| if (numReduces != 0) { |
| ensureNotSet("mapred.partitioner.class", mode); |
| } else { |
| ensureNotSet("mapred.output.format.class", mode); |
| } |
| } else { |
| String mode = "map compatability"; |
| ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode); |
| ensureNotSet(MAP_CLASS_ATTR, mode); |
| if (numReduces != 0) { |
| ensureNotSet(PARTITIONER_CLASS_ATTR, mode); |
| } else { |
| ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode); |
| } |
| } |
| if (numReduces != 0) { |
| conf.setBooleanIfUnset("mapred.reducer.new-api", |
| conf.get(oldReduceClass) == null); |
| if (conf.getUseNewReducer()) { |
| String mode = "new reduce API"; |
| ensureNotSet("mapred.output.format.class", mode); |
| ensureNotSet(oldReduceClass, mode); |
| } else { |
| String mode = "reduce compatability"; |
| ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode); |
| ensureNotSet(REDUCE_CLASS_ATTR, mode); |
| } |
| } |
| } |
| |
| private synchronized void connect() |
| throws IOException, InterruptedException, ClassNotFoundException { |
| if (cluster == null) { |
| cluster = |
| ugi.doAs(new PrivilegedExceptionAction<Cluster>() { |
| public Cluster run() |
| throws IOException, InterruptedException, |
| ClassNotFoundException { |
| return new Cluster(getConfiguration()); |
| } |
| }); |
| } |
| } |
| |
| boolean isConnected() { |
| return cluster != null; |
| } |
| |
| /** |
| * Submit the job to the cluster and return immediately. |
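   *
   * <p>A caller that wants non-blocking submission can poll afterwards; a
   * minimal sketch (interrupt handling omitted):</p>
   * <p><blockquote><pre>
   * job.submit();
   * while (!job.isComplete()) {
   *   Thread.sleep(5000);
   * }
   * System.out.println(job.isSuccessful() ? "succeeded" : "failed");
   * </pre></blockquote></p>
   *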
| * @throws IOException |
| */ |
| public void submit() |
| throws IOException, InterruptedException, ClassNotFoundException { |
| ensureState(JobState.DEFINE); |
| setUseNewAPI(); |
| connect(); |
| final JobSubmitter submitter = new JobSubmitter(cluster.getFileSystem(), |
| cluster.getClient()); |
| status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { |
| public JobStatus run() throws IOException, InterruptedException, |
| ClassNotFoundException { |
| return submitter.submitJobInternal(Job.this, cluster); |
| } |
| }); |
| state = JobState.RUNNING; |
| } |
| |
| /** |
| * Submit the job to the cluster and wait for it to finish. |
| * @param verbose print the progress to the user |
| * @return true if the job succeeded |
| * @throws IOException thrown if the communication with the |
| * <code>JobTracker</code> is lost |
| */ |
| public boolean waitForCompletion(boolean verbose |
| ) throws IOException, InterruptedException, |
| ClassNotFoundException { |
| if (state == JobState.DEFINE) { |
| submit(); |
| } |
| if (verbose) { |
| monitorAndPrintJob(); |
| } else { |
| // get the completion poll interval from the client. |
| int completionPollIntervalMillis = |
| Job.getCompletionPollInterval(cluster.getConf()); |
| while (!isComplete()) { |
| try { |
| Thread.sleep(completionPollIntervalMillis); |
        } catch (InterruptedException ie) {
          // Ignore the interruption and keep polling for completion.
        }
| } |
| } |
| return isSuccessful(); |
| } |
| |
| /** |
| * Monitor a job and print status in real-time as progress is made and tasks |
| * fail. |
| * @return true if the job succeeded |
| * @throws IOException if communication to the JobTracker fails |
| */ |
| public boolean monitorAndPrintJob() |
| throws IOException, InterruptedException { |
| String lastReport = null; |
| Job.TaskStatusFilter filter; |
| Configuration clientConf = cluster.getConf(); |
| filter = Job.getTaskOutputFilter(clientConf); |
| JobID jobId = getJobID(); |
| LOG.info("Running job: " + jobId); |
| int eventCounter = 0; |
| boolean profiling = getProfileEnabled(); |
| IntegerRanges mapRanges = getProfileTaskRange(true); |
| IntegerRanges reduceRanges = getProfileTaskRange(false); |
| int progMonitorPollIntervalMillis = |
| Job.getProgressPollInterval(clientConf); |
| while (!isComplete()) { |
| Thread.sleep(progMonitorPollIntervalMillis); |
| String report = |
| (" map " + StringUtils.formatPercent(mapProgress(), 0)+ |
| " reduce " + |
| StringUtils.formatPercent(reduceProgress(), 0)); |
| if (!report.equals(lastReport)) { |
| LOG.info(report); |
| lastReport = report; |
| } |
| |
| TaskCompletionEvent[] events = |
| getTaskCompletionEvents(eventCounter, 10); |
| eventCounter += events.length; |
| printTaskEvents(events, filter, profiling, mapRanges, reduceRanges); |
| } |
| LOG.info("Job complete: " + jobId); |
| Counters counters = getCounters(); |
| if (counters != null) { |
| LOG.info(counters.toString()); |
| } |
| return isSuccessful(); |
| } |
| |
| /** |
| * @return true if the profile parameters indicate that this is using |
| * hprof, which generates profile files in a particular location |
| * that we can retrieve to the client. |
| */ |
| private boolean shouldDownloadProfile() { |
| // Check the argument string that was used to initialize profiling. |
| // If this indicates hprof and file-based output, then we're ok to |
| // download. |
| String profileParams = getProfileParams(); |
| |
| if (null == profileParams) { |
| return false; |
| } |
| |
| // Split this on whitespace. |
| String [] parts = profileParams.split("[ \\t]+"); |
| |
| // If any of these indicate hprof, and the use of output files, return true. |
| boolean hprofFound = false; |
| boolean fileFound = false; |
| for (String p : parts) { |
| if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) { |
| hprofFound = true; |
| |
| // This contains a number of comma-delimited components, one of which |
| // may specify the file to write to. Make sure this is present and |
| // not empty. |
| String [] subparts = p.split(","); |
| for (String sub : subparts) { |
| if (sub.startsWith("file=") && sub.length() != "file=".length()) { |
| fileFound = true; |
| } |
| } |
| } |
| } |
| |
| return hprofFound && fileFound; |
| } |
| |
| private void printTaskEvents(TaskCompletionEvent[] events, |
| Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges, |
| IntegerRanges reduceRanges) throws IOException, InterruptedException { |
| for (TaskCompletionEvent event : events) { |
| TaskCompletionEvent.Status status = event.getStatus(); |
| if (profiling && shouldDownloadProfile() && |
| (status == TaskCompletionEvent.Status.SUCCEEDED || |
| status == TaskCompletionEvent.Status.FAILED) && |
| (event.isMapTask() ? mapRanges : reduceRanges). |
| isIncluded(event.idWithinJob())) { |
| downloadProfile(event); |
| } |
| switch (filter) { |
| case NONE: |
| break; |
| case SUCCEEDED: |
| if (event.getStatus() == |
| TaskCompletionEvent.Status.SUCCEEDED) { |
| LOG.info(event.toString()); |
| displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp()); |
| } |
| break; |
| case FAILED: |
| if (event.getStatus() == |
| TaskCompletionEvent.Status.FAILED) { |
| LOG.info(event.toString()); |
| // Displaying the task diagnostic information |
| TaskAttemptID taskId = event.getTaskAttemptId(); |
| String[] taskDiagnostics = getTaskDiagnostics(taskId); |
| if (taskDiagnostics != null) { |
| for (String diagnostics : taskDiagnostics) { |
| System.err.println(diagnostics); |
| } |
| } |
| // Displaying the task logs |
| displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp()); |
| } |
| break; |
| case KILLED: |
| if (event.getStatus() == TaskCompletionEvent.Status.KILLED){ |
| LOG.info(event.toString()); |
| } |
| break; |
| case ALL: |
| LOG.info(event.toString()); |
| displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp()); |
| break; |
| } |
| } |
| } |
| |
| private void downloadProfile(TaskCompletionEvent e) throws IOException { |
| URLConnection connection = new URL( |
| getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) + |
| "&filter=profile").openConnection(); |
| InputStream in = connection.getInputStream(); |
| OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile"); |
| IOUtils.copyBytes(in, out, 64 * 1024, true); |
| } |
| |
| private void displayTaskLogs(TaskAttemptID taskId, String baseUrl) |
| throws IOException { |
    // The tasktracker for a 'failed/killed' task attempt might not be around...
| if (baseUrl != null) { |
| // Construct the url for the tasklogs |
| String taskLogUrl = getTaskLogURL(taskId, baseUrl); |
| |
      // Copy the task's stdout to stdout of the JobClient
| getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out); |
| |
| // Copy task's stderr to stderr of the JobClient |
| getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err); |
| } |
| } |
| |
| private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, |
| OutputStream out) { |
| try { |
| int tasklogtimeout = cluster.getConf().getInt( |
| TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT); |
| URLConnection connection = taskLogUrl.openConnection(); |
| connection.setReadTimeout(tasklogtimeout); |
| connection.setConnectTimeout(tasklogtimeout); |
| BufferedReader input = |
| new BufferedReader(new InputStreamReader(connection.getInputStream())); |
| BufferedWriter output = |
| new BufferedWriter(new OutputStreamWriter(out)); |
| try { |
| String logData = null; |
| while ((logData = input.readLine()) != null) { |
| if (logData.length() > 0) { |
| output.write(taskId + ": " + logData + "\n"); |
| output.flush(); |
| } |
| } |
| } finally { |
| input.close(); |
| } |
| } catch(IOException ioe) { |
| LOG.warn("Error reading task output" + ioe.getMessage()); |
| } |
| } |
| |
| private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) { |
| return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId); |
| } |
| |
| /** The interval at which monitorAndPrintJob() prints status */ |
| public static int getProgressPollInterval(Configuration conf) { |
| // Read progress monitor poll interval from config. Default is 1 second. |
| int progMonitorPollIntervalMillis = conf.getInt( |
| PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL); |
| if (progMonitorPollIntervalMillis < 1) { |
| LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + |
| " has been set to an invalid value; " |
| + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL); |
| progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL; |
| } |
| return progMonitorPollIntervalMillis; |
| } |
| |
| /** The interval at which waitForCompletion() should check. */ |
| public static int getCompletionPollInterval(Configuration conf) { |
| int completionPollIntervalMillis = conf.getInt( |
| COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL); |
| if (completionPollIntervalMillis < 1) { |
| LOG.warn(COMPLETION_POLL_INTERVAL_KEY + |
| " has been set to an invalid value; " |
| + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL); |
| completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL; |
| } |
| return completionPollIntervalMillis; |
| } |
| |
| /** |
| * Get the task output filter. |
| * |
| * @param conf the configuration. |
| * @return the filter level. |
| */ |
| public static TaskStatusFilter getTaskOutputFilter(Configuration conf) { |
| return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED")); |
| } |
| |
| /** |
| * Modify the Configuration to set the task output filter. |
| * |
| * @param conf the Configuration to modify. |
| * @param newValue the value to set. |
| */ |
| public static void setTaskOutputFilter(Configuration conf, |
| TaskStatusFilter newValue) { |
| conf.set(Job.OUTPUT_FILTER, newValue.toString()); |
| } |
| |
| } |