/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.net.URLConnection;
import java.net.URI;
import javax.security.auth.login.LoginException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configuration.IntegerRanges;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.util.StringUtils;
/**
* The job submitter's view of the Job.
*
* <p>It allows the user to configure the
* job, submit it, control its execution, and query the state. The set methods
* only work until the job is submitted, afterwards they will throw an
* IllegalStateException. </p>
*
* <p>
* Normally the user creates the application, describes various facets of the
* job via {@link Job}, and then submits the job and monitors its progress.</p>
*
* <p>Here is an example on how to submit a job:</p>
* <p><blockquote><pre>
* // Create a new Job
* Configuration conf = new Configuration();
* Job job = Job.getInstance(new Cluster(conf), conf);
* job.setJarByClass(MyJob.class);
*
* // Specify various job-specific parameters
* job.setJobName("myjob");
*
* FileInputFormat.addInputPath(job, new Path("in"));
* FileOutputFormat.setOutputPath(job, new Path("out"));
*
* job.setMapperClass(MyJob.MyMapper.class);
* job.setReducerClass(MyJob.MyReducer.class);
*
* // Submit the job, then poll for progress until the job is complete
* job.waitForCompletion(true);
* </pre></blockquote></p>
*
*/
public class Job extends JobContextImpl implements JobContext {
private static final Log LOG = LogFactory.getLog(Job.class);
public static enum JobState {DEFINE, RUNNING};
private static final long MAX_JOBSTATUS_AGE = 1000 * 2;
public static final String OUTPUT_FILTER = "mapreduce.client.output.filter";
/** Key in mapred-*.xml that sets completionPollIntervalMillis */
public static final String COMPLETION_POLL_INTERVAL_KEY =
"mapreduce.client.completion.pollinterval";
/** Default completionPollIntervalMillis is 5000 ms. */
static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000;
/** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */
public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY =
"mapreduce.client.progressmonitor.pollinterval";
/** Default progMonitorPollIntervalMillis is 1000 ms. */
static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000;
public static final String USED_GENERIC_PARSER =
"mapreduce.client.genericoptionsparser.used";
public static final String SUBMIT_REPLICATION =
"mapreduce.client.submit.file.replication";
public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
static {
ConfigUtil.loadResources();
}
private JobState state = JobState.DEFINE;
private JobStatus status;
private long statustime;
private Cluster cluster;
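/** @deprecated Use one of the <code>getInstance</code> factory methods instead. */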
@Deprecated
public Job() throws IOException {
this(new Configuration());
}
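/** @deprecated Use one of the <code>getInstance</code> factory methods instead. */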
@Deprecated
public Job(Configuration conf) throws IOException {
this(new Cluster(conf), conf);
}
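/** @deprecated Use one of the <code>getInstance</code> factory methods instead. */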
@Deprecated
public Job(Configuration conf, String jobName) throws IOException {
this(conf);
setJobName(jobName);
}
Job(Cluster cluster) throws IOException {
this(cluster, new Configuration());
}
Job(Cluster cluster, Configuration conf) throws IOException {
super(conf, null);
this.cluster = cluster;
}
Job(Cluster cluster, JobStatus status,
Configuration conf) throws IOException {
this(cluster, conf);
state = JobState.RUNNING;
this.status = status;
}
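/**
* Creates a new {@link Job} with a default {@link Configuration},
* bound to the given cluster.
* @param cluster the cluster to connect to
* @return the new {@link Job}
* @throws IOException
*/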
public static Job getInstance(Cluster cluster) throws IOException {
return new Job(cluster);
}
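/**
* Creates a new {@link Job} with the given cluster and configuration.
* @param cluster the cluster to connect to
* @param conf the job configuration
* @return the new {@link Job}
* @throws IOException
*/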
public static Job getInstance(Cluster cluster, Configuration conf)
throws IOException {
return new Job(cluster, conf);
}
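/**
* Creates a {@link Job} view of a job already known to the cluster,
* initialized with the given status.
* @param cluster the cluster to connect to
* @param status the last known status of the job
* @param conf the job configuration
* @return the new {@link Job}
* @throws IOException
*/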
public static Job getInstance(Cluster cluster, JobStatus status,
Configuration conf) throws IOException {
return new Job(cluster, status, conf);
}
private void ensureState(JobState state) throws IllegalStateException {
if (state != this.state) {
throw new IllegalStateException("Job in state "+ this.state +
" instead of " + state);
}
}
/**
* Some methods rely on having a recent job status object. Refresh
* it, if necessary
*/
synchronized void ensureFreshStatus()
throws IOException, InterruptedException {
if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
updateStatus();
}
}
/** Some methods need the job status to be up to date, so refresh
* it immediately rather than waiting for the cached copy to expire.
* @throws IOException
*/
synchronized void updateStatus() throws IOException, InterruptedException {
this.status = cluster.getClient().getJobStatus(status.getJobID());
if (this.status == null) {
throw new IOException("Job status not available");
}
this.statustime = System.currentTimeMillis();
}
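/**
* Get an up-to-date {@link JobStatus} for this job.
* @return the current status of the job
* @throws IOException
* @throws InterruptedException
*/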
public JobStatus getStatus() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status;
}
/**
* Get the job identifier.
*
* @return the job identifier.
*/
public JobID getID() {
ensureState(JobState.RUNNING);
return status.getJobID();
}
/**
* Returns the current state of the Job.
*
* @return the current {@link JobStatus.State} of the job
* @throws IOException
* @throws InterruptedException
*/
public JobStatus.State getJobState()
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.getState();
}
/**
* Get the URL where some job progress information will be displayed.
*
* @return the URL where some job progress information will be displayed.
*/
public String getTrackingURL(){
ensureState(JobState.RUNNING);
return status.getTrackingUrl().toString();
}
/**
* Get the path of the submitted job configuration.
*
* @return the path of the submitted job configuration.
*/
public String getJobFile() {
ensureState(JobState.RUNNING);
return status.getJobFile();
}
/**
* Get start time of the job.
*
* @return the start time of the job
*/
public long getStartTime() {
ensureState(JobState.RUNNING);
return status.getStartTime();
}
/**
* Get finish time of the job.
*
* @return the finish time of the job
*/
public long getFinishTime() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.getFinishTime();
}
/**
* Get scheduling info of the job.
*
* @return the scheduling info of the job
*/
public String getSchedulingInfo() {
ensureState(JobState.RUNNING);
return status.getSchedulingInfo();
}
/**
* Get the priority of the job.
*
* @return the priority of the job
*/
public JobPriority getPriority() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.getPriority();
}
/**
* The user-specified job name.
*/
public String getJobName() {
if (state == JobState.DEFINE) {
return super.getJobName();
}
ensureState(JobState.RUNNING);
return status.getJobName();
}
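/**
* Get the location of the job's history file, as reported by the cluster.
* @return the location of the history file
* @throws IOException
* @throws InterruptedException
*/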
public String getHistoryUrl() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.getHistoryFile();
}
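/**
* Check whether the job has been retired by the cluster.
* @return <code>true</code> if the job is retired, else <code>false</code>
* @throws IOException
* @throws InterruptedException
*/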
public boolean isRetired() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.isRetired();
}
/**
* Render the job's status and progress as a human-readable string.
*/
@Override
public String toString() {
ensureState(JobState.RUNNING);
try {
updateStatus();
} catch (IOException e) {
// toString() cannot throw; fall back to the cached status
} catch (InterruptedException ie) {
// likewise, report whatever status we already have
}
StringBuilder sb = new StringBuilder();
sb.append("Job: ").append(status.getJobID()).append("\n");
sb.append("Job File: ").append(status.getJobFile()).append("\n");
sb.append("Job Tracking URL : ").append(status.getTrackingUrl());
sb.append("\n");
sb.append("map() completion: ");
sb.append(status.getMapProgress()).append("\n");
sb.append("reduce() completion: ");
sb.append(status.getReduceProgress()).append("\n");
sb.append("Job state: ");
sb.append(status.getState()).append("\n");
sb.append("history URL: ");
sb.append(status.getHistoryFile()).append("\n");
sb.append("retired: ").append(status.isRetired());
return sb.toString();
}
/**
* Get information about the current state of the tasks of the job.
*
* @param type the type of task
* @return an array of {@link TaskReport}s for the tasks of the given type
* @throws IOException
*/
public TaskReport[] getTaskReports(TaskType type)
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().getTaskReports(getID(), type);
}
/**
* Get the <i>progress</i> of the job's map-tasks, as a float between 0.0
* and 1.0. When all map tasks have completed, the function returns 1.0.
*
* @return the progress of the job's map-tasks.
* @throws IOException
*/
public float mapProgress() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
ensureFreshStatus();
return status.getMapProgress();
}
/**
* Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0
* and 1.0. When all reduce tasks have completed, the function returns 1.0.
*
* @return the progress of the job's reduce-tasks.
* @throws IOException
*/
public float reduceProgress() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
ensureFreshStatus();
return status.getReduceProgress();
}
/**
* Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0
* and 1.0. When all cleanup tasks have completed, the function returns 1.0.
*
* @return the progress of the job's cleanup-tasks.
* @throws IOException
*/
public float cleanupProgress() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
ensureFreshStatus();
return status.getCleanupProgress();
}
/**
* Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0
* and 1.0. When all setup tasks have completed, the function returns 1.0.
*
* @return the progress of the job's setup-tasks.
* @throws IOException
*/
public float setupProgress() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
ensureFreshStatus();
return status.getSetupProgress();
}
/**
* Check if the job is finished or not.
* This is a non-blocking call.
*
* @return <code>true</code> if the job is complete, else <code>false</code>.
* @throws IOException
*/
public boolean isComplete() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.isJobComplete();
}
/**
* Check if the job completed successfully.
*
* @return <code>true</code> if the job succeeded, else <code>false</code>.
* @throws IOException
*/
public boolean isSuccessful() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
updateStatus();
return status.getState() == JobStatus.State.SUCCEEDED;
}
/**
* Kill the running job. Blocks until all job tasks have been
* killed as well. If the job is no longer running, it simply returns.
*
* @throws IOException
*/
public void killJob() throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
cluster.getClient().killJob(getID());
}
/**
* Set the priority of a running job.
* @param priority the new priority for the job.
* @throws IOException
*/
public void setPriority(JobPriority priority)
throws IOException, InterruptedException {
if (state == JobState.DEFINE) {
conf.setJobPriority(
org.apache.hadoop.mapred.JobPriority.valueOf(priority.name()));
} else {
ensureState(JobState.RUNNING);
cluster.getClient().setJobPriority(getID(), priority.toString());
}
}
/**
* Get events indicating completion (success/failure) of component tasks.
*
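* <p>For example, a client can page through all events in batches of
* ten, mirroring what {@link #monitorAndPrintJob()} does internally:</p>
* <p><blockquote><pre>
* int from = 0;
* TaskCompletionEvent[] events;
* do {
*   events = job.getTaskCompletionEvents(from, 10);
*   // process the events here
*   from += events.length;
* } while (events.length > 0);
* </pre></blockquote></p>
*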
* @param startFrom index to start fetching events from
* @param numEvents number of events to fetch
* @return an array of {@link TaskCompletionEvent}s
* @throws IOException
*/
public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom,
int numEvents) throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().getTaskCompletionEvents(getID(),
startFrom, numEvents);
}
/**
* Kill indicated task attempt.
*
* @param taskId the id of the task attempt to be killed.
* @return <code>true</code> if the kill request succeeded
* @throws IOException
*/
public boolean killTask(TaskAttemptID taskId)
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().killTask(taskId, false);
}
/**
* Fail indicated task attempt.
*
* @param taskId the id of the task attempt to be failed.
* @return <code>true</code> if the fail request succeeded
* @throws IOException
*/
public boolean failTask(TaskAttemptID taskId)
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().killTask(taskId, true);
}
/**
* Gets the counters for this job.
*
* @return the counters for this job.
* @throws IOException
*/
public Counters getCounters()
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().getJobCounters(getID());
}
/**
* Gets the diagnostic messages for a given task attempt.
* @param taskid the id of the task attempt
* @return the list of diagnostic messages for the task
* @throws IOException
*/
public String[] getTaskDiagnostics(TaskAttemptID taskid)
throws IOException, InterruptedException {
ensureState(JobState.RUNNING);
return cluster.getClient().getTaskDiagnostics(taskid);
}
/**
* Set the number of reduce tasks for the job.
* @param tasks the number of reduce tasks
* @throws IllegalStateException if the job is submitted
*/
public void setNumReduceTasks(int tasks) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setNumReduceTasks(tasks);
}
/**
* Set the current working directory for the default file system.
*
* @param dir the new current working directory.
* @throws IllegalStateException if the job is submitted
*/
public void setWorkingDirectory(Path dir) throws IOException {
ensureState(JobState.DEFINE);
conf.setWorkingDirectory(dir);
}
/**
* Set the {@link InputFormat} for the job.
* @param cls the <code>InputFormat</code> to use
* @throws IllegalStateException if the job is submitted
*/
public void setInputFormatClass(Class<? extends InputFormat> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls,
InputFormat.class);
}
/**
* Set the {@link OutputFormat} for the job.
* @param cls the <code>OutputFormat</code> to use
* @throws IllegalStateException if the job is submitted
*/
public void setOutputFormatClass(Class<? extends OutputFormat> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls,
OutputFormat.class);
}
/**
* Set the {@link Mapper} for the job.
* @param cls the <code>Mapper</code> to use
* @throws IllegalStateException if the job is submitted
*/
public void setMapperClass(Class<? extends Mapper> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
}
/**
* Set the Jar by finding where a given class came from.
* @param cls the example class
*/
public void setJarByClass(Class<?> cls) {
ensureState(JobState.DEFINE);
conf.setJarByClass(cls);
}
/**
* Set the job jar.
* @param jar the path to the job jar
*/
public void setJar(String jar) {
ensureState(JobState.DEFINE);
conf.setJar(jar);
}
/**
* Set the reported username for this job.
*
* @param user the username for this job.
*/
public void setUser(String user) {
ensureState(JobState.DEFINE);
conf.setUser(user);
}
/**
* Set the combiner class for the job.
* @param cls the combiner to use
* @throws IllegalStateException if the job is submitted
*/
public void setCombinerClass(Class<? extends Reducer> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
}
/**
* Set the {@link Reducer} for the job.
* @param cls the <code>Reducer</code> to use
* @throws IllegalStateException if the job is submitted
*/
public void setReducerClass(Class<? extends Reducer> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
}
/**
* Set the {@link Partitioner} for the job.
* @param cls the <code>Partitioner</code> to use
* @throws IllegalStateException if the job is submitted
*/
public void setPartitionerClass(Class<? extends Partitioner> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setClass(PARTITIONER_CLASS_ATTR, cls,
Partitioner.class);
}
/**
* Set the key class for the map output data. This allows the user to
* specify the map output key class to be different than the final output
* key class.
*
* @param theClass the map output key class.
* @throws IllegalStateException if the job is submitted
*/
public void setMapOutputKeyClass(Class<?> theClass
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setMapOutputKeyClass(theClass);
}
/**
* Set the value class for the map output data. This allows the user to
* specify the map output value class to be different than the final output
* value class.
*
* @param theClass the map output value class.
* @throws IllegalStateException if the job is submitted
*/
public void setMapOutputValueClass(Class<?> theClass
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setMapOutputValueClass(theClass);
}
/**
* Set the key class for the job output data.
*
* @param theClass the key class for the job output data.
* @throws IllegalStateException if the job is submitted
*/
public void setOutputKeyClass(Class<?> theClass
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setOutputKeyClass(theClass);
}
/**
* Set the value class for job outputs.
*
* @param theClass the value class for job outputs.
* @throws IllegalStateException if the job is submitted
*/
public void setOutputValueClass(Class<?> theClass
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setOutputValueClass(theClass);
}
/**
* Define the comparator that controls how the keys are sorted before they
* are passed to the {@link Reducer}.
* @param cls the raw comparator
* @throws IllegalStateException if the job is submitted
*/
public void setSortComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setOutputKeyComparatorClass(cls);
}
/**
* Define the comparator that controls which keys are grouped together
* for a single call to
* {@link Reducer#reduce(Object, Iterable,
* org.apache.hadoop.mapreduce.Reducer.Context)}
* @param cls the raw comparator to use
* @throws IllegalStateException if the job is submitted
*/
public void setGroupingComparatorClass(Class<? extends RawComparator> cls
) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setOutputValueGroupingComparator(cls);
}
/**
* Set the user-specified job name.
*
* @param name the job's new name.
* @throws IllegalStateException if the job is submitted
*/
public void setJobName(String name) throws IllegalStateException {
ensureState(JobState.DEFINE);
conf.setJobName(name);
}
/**
* Turn speculative execution on or off for this job.
*
* @param speculativeExecution <code>true</code> if speculative execution
* should be turned on, else <code>false</code>.
*/
public void setSpeculativeExecution(boolean speculativeExecution) {
ensureState(JobState.DEFINE);
conf.setSpeculativeExecution(speculativeExecution);
}
/**
* Turn speculative execution on or off for this job for map tasks.
*
* @param speculativeExecution <code>true</code> if speculative execution
* should be turned on for map tasks,
* else <code>false</code>.
*/
public void setMapSpeculativeExecution(boolean speculativeExecution) {
ensureState(JobState.DEFINE);
conf.setMapSpeculativeExecution(speculativeExecution);
}
/**
* Turn speculative execution on or off for this job for reduce tasks.
*
* @param speculativeExecution <code>true</code> if speculative execution
* should be turned on for reduce tasks,
* else <code>false</code>.
*/
public void setReduceSpeculativeExecution(boolean speculativeExecution) {
ensureState(JobState.DEFINE);
conf.setReduceSpeculativeExecution(speculativeExecution);
}
/**
* Specify whether job-setup and job-cleanup are needed for the job.
*
* @param needed If <code>true</code>, job-setup and job-cleanup will be
* considered from {@link OutputCommitter}
* else ignored.
*/
public void setJobSetupCleanupNeeded(boolean needed) {
ensureState(JobState.DEFINE);
conf.setBoolean(SETUP_CLEANUP_NEEDED, needed);
}
/**
* Set the given set of archives
* @param archives The list of archives that need to be localized
*/
public void setCacheArchives(URI[] archives) {
ensureState(JobState.DEFINE);
DistributedCache.setCacheArchives(archives, conf);
}
/**
* Set the given set of files
* @param files The list of files that need to be localized
*/
public void setCacheFiles(URI[] files) {
ensureState(JobState.DEFINE);
DistributedCache.setCacheFiles(files, conf);
}
/**
* Add an archive to be localized
* @param uri The uri of the cache to be localized
*/
public void addCacheArchive(URI uri) {
ensureState(JobState.DEFINE);
DistributedCache.addCacheArchive(uri, conf);
}
/**
* Add a file to be localized
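* <p>For example (a sketch; the path and fragment are hypothetical, and
* the file is assumed to already exist on the default file system; the
* fragment names the symlink created if {@link #createSymlink()} is
* used):</p>
* <p><blockquote><pre>
* job.addCacheFile(new URI("/user/me/lookup.txt#lookup"));
* </pre></blockquote></p>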
* @param uri The uri of the cache to be localized
*/
public void addCacheFile(URI uri) {
ensureState(JobState.DEFINE);
DistributedCache.addCacheFile(uri, conf);
}
/**
* Add a file path to the current set of classpath entries. It adds the
* file to the cache as well.
*
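* <p>For example (a sketch with a hypothetical jar path on the default
* file system):</p>
* <p><blockquote><pre>
* job.addFileToClassPath(new Path("/lib/parser.jar"));
* </pre></blockquote></p>
*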
* @param file Path of the file to be added
*/
public void addFileToClassPath(Path file)
throws IOException {
ensureState(JobState.DEFINE);
DistributedCache.addFileToClassPath(file, conf);
}
/**
* Add an archive path to the current set of classpath entries. It adds the
* archive to cache as well.
*
* @param archive Path of the archive to be added
*/
public void addArchiveToClassPath(Path archive)
throws IOException {
ensureState(JobState.DEFINE);
DistributedCache.addArchiveToClassPath(archive, conf);
}
/**
* This method allows you to create symlinks in the current working
* directory of the task to all the cache files/archives.
*/
public void createSymlink() {
ensureState(JobState.DEFINE);
DistributedCache.createSymlink(conf);
}
/**
* Expert: Set the maximum number of attempts that will be made to run a
* map task.
*
* @param n the number of attempts per map task.
*/
public void setMaxMapAttempts(int n) {
ensureState(JobState.DEFINE);
conf.setMaxMapAttempts(n);
}
/**
* Expert: Set the maximum number of attempts that will be made to run a
* reduce task.
*
* @param n the number of attempts per reduce task.
*/
public void setMaxReduceAttempts(int n) {
ensureState(JobState.DEFINE);
conf.setMaxReduceAttempts(n);
}
/**
* Set whether the system should collect profiler information for some of
* the tasks in this job. The information is stored in the user log
* directory.
* @param newValue true means it should be gathered
*/
public void setProfileEnabled(boolean newValue) {
ensureState(JobState.DEFINE);
conf.setProfileEnabled(newValue);
}
/**
* Set the profiler configuration arguments. If the string contains a '%s' it
* will be replaced with the name of the profiling output file when the task
* runs.
*
* This value is passed to the task child JVM on the command line.
*
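* <p>For example, an hprof-based configuration (hprof's own option
* syntax; <code>%s</code> is substituted as described above) might be:</p>
* <p><blockquote><pre>
* job.setProfileParams("-agentlib:hprof=cpu=samples,heap=sites," +
*     "force=n,thread=y,verbose=n,file=%s");
* </pre></blockquote></p>
*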
* @param value the configuration string
*/
public void setProfileParams(String value) {
ensureState(JobState.DEFINE);
conf.setProfileParams(value);
}
/**
* Set the ranges of maps or reduces to profile. setProfileEnabled(true)
* must also be called.
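* <p>For example, to profile the first three map tasks:</p>
* <p><blockquote><pre>
* job.setProfileEnabled(true);
* job.setProfileTaskRange(true, "0-2");
* </pre></blockquote></p>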
* @param isMap whether the ranges apply to map tasks (<code>true</code>)
*              or to reduce tasks (<code>false</code>)
* @param newValue a set of integer ranges of the task ids
*/
public void setProfileTaskRange(boolean isMap, String newValue) {
ensureState(JobState.DEFINE);
conf.setProfileTaskRange(isMap, newValue);
}
private void ensureNotSet(String attr, String msg) throws IOException {
if (conf.get(attr) != null) {
throw new IOException(attr + " is incompatible with " + msg + " mode.");
}
}
/**
* Default to the new APIs unless they are explicitly set or the old
* mapper or reducer attributes are used.
* @throws IOException if the configuration is inconsistent
*/
private void setUseNewAPI() throws IOException {
int numReduces = conf.getNumReduceTasks();
String oldMapperClass = "mapred.mapper.class";
String oldReduceClass = "mapred.reducer.class";
conf.setBooleanIfUnset("mapred.mapper.new-api",
conf.get(oldMapperClass) == null);
if (conf.getUseNewMapper()) {
String mode = "new map API";
ensureNotSet("mapred.input.format.class", mode);
ensureNotSet(oldMapperClass, mode);
if (numReduces != 0) {
ensureNotSet("mapred.partitioner.class", mode);
} else {
ensureNotSet("mapred.output.format.class", mode);
}
} else {
String mode = "map compatability";
ensureNotSet(INPUT_FORMAT_CLASS_ATTR, mode);
ensureNotSet(MAP_CLASS_ATTR, mode);
if (numReduces != 0) {
ensureNotSet(PARTITIONER_CLASS_ATTR, mode);
} else {
ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
}
}
if (numReduces != 0) {
conf.setBooleanIfUnset("mapred.reducer.new-api",
conf.get(oldReduceClass) == null);
if (conf.getUseNewReducer()) {
String mode = "new reduce API";
ensureNotSet("mapred.output.format.class", mode);
ensureNotSet(oldReduceClass, mode);
} else {
String mode = "reduce compatability";
ensureNotSet(OUTPUT_FORMAT_CLASS_ATTR, mode);
ensureNotSet(REDUCE_CLASS_ATTR, mode);
}
}
}
/**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit() throws IOException, InterruptedException,
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
status = new JobSubmitter(cluster.getFileSystem(),
cluster.getClient()).submitJobInternal(this, cluster);
state = JobState.RUNNING;
}
/**
* Submit the job to the cluster and wait for it to finish.
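*
* <p>A typical driver submits the job, waits, and exits with the job's
* status, for example:</p>
* <p><blockquote><pre>
* boolean success = job.waitForCompletion(true);
* System.exit(success ? 0 : 1);
* </pre></blockquote></p>
*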
* @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
// ignore the interruption and poll again
}
}
}
return isSuccessful();
}
/**
* Monitor a job and print status in real-time as progress is made and tasks
* fail.
* @return true if the job succeeded
* @throws IOException if communication to the JobTracker fails
*/
public boolean monitorAndPrintJob()
throws IOException, InterruptedException {
String lastReport = null;
Job.TaskStatusFilter filter;
Configuration clientConf = cluster.getConf();
filter = Job.getTaskOutputFilter(clientConf);
JobID jobId = getID();
LOG.info("Running job: " + jobId);
int eventCounter = 0;
boolean profiling = getProfileEnabled();
IntegerRanges mapRanges = getProfileTaskRange(true);
IntegerRanges reduceRanges = getProfileTaskRange(false);
int progMonitorPollIntervalMillis =
Job.getProgressPollInterval(clientConf);
while (!isComplete()) {
Thread.sleep(progMonitorPollIntervalMillis);
String report =
(" map " + StringUtils.formatPercent(mapProgress(), 0)+
" reduce " +
StringUtils.formatPercent(reduceProgress(), 0));
if (!report.equals(lastReport)) {
LOG.info(report);
lastReport = report;
}
TaskCompletionEvent[] events =
getTaskCompletionEvents(eventCounter, 10);
eventCounter += events.length;
printTaskEvents(events, filter, profiling, mapRanges, reduceRanges);
}
LOG.info("Job complete: " + jobId);
Counters counters = getCounters();
if (counters != null) {
LOG.info(counters.toString());
}
return isSuccessful();
}
/**
* @return true if the profile parameters indicate that this is using
* hprof, which generates profile files in a particular location
* that we can retrieve to the client.
*/
private boolean shouldDownloadProfile() {
// Check the argument string that was used to initialize profiling.
// If this indicates hprof and file-based output, then we're ok to
// download.
String profileParams = getProfileParams();
if (null == profileParams) {
return false;
}
// Split this on whitespace.
String [] parts = profileParams.split("[ \\t]+");
// If any of these indicate hprof, and the use of output files, return true.
boolean hprofFound = false;
boolean fileFound = false;
for (String p : parts) {
if (p.startsWith("-agentlib:hprof") || p.startsWith("-Xrunhprof")) {
hprofFound = true;
// This contains a number of comma-delimited components, one of which
// may specify the file to write to. Make sure this is present and
// not empty.
String [] subparts = p.split(",");
for (String sub : subparts) {
if (sub.startsWith("file=") && sub.length() != "file=".length()) {
fileFound = true;
}
}
}
}
return hprofFound && fileFound;
}
private void printTaskEvents(TaskCompletionEvent[] events,
Job.TaskStatusFilter filter, boolean profiling, IntegerRanges mapRanges,
IntegerRanges reduceRanges) throws IOException, InterruptedException {
for (TaskCompletionEvent event : events) {
TaskCompletionEvent.Status status = event.getStatus();
if (profiling && shouldDownloadProfile() &&
(status == TaskCompletionEvent.Status.SUCCEEDED ||
status == TaskCompletionEvent.Status.FAILED) &&
(event.isMapTask() ? mapRanges : reduceRanges).
isIncluded(event.idWithinJob())) {
downloadProfile(event);
}
switch (filter) {
case NONE:
break;
case SUCCEEDED:
if (event.getStatus() ==
TaskCompletionEvent.Status.SUCCEEDED) {
LOG.info(event.toString());
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
}
break;
case FAILED:
if (event.getStatus() ==
TaskCompletionEvent.Status.FAILED) {
LOG.info(event.toString());
// Displaying the task diagnostic information
TaskAttemptID taskId = event.getTaskAttemptId();
String[] taskDiagnostics = getTaskDiagnostics(taskId);
if (taskDiagnostics != null) {
for (String diagnostics : taskDiagnostics) {
System.err.println(diagnostics);
}
}
// Displaying the task logs
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
}
break;
case KILLED:
if (event.getStatus() == TaskCompletionEvent.Status.KILLED){
LOG.info(event.toString());
}
break;
case ALL:
LOG.info(event.toString());
displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
break;
}
}
}
private void downloadProfile(TaskCompletionEvent e) throws IOException {
URLConnection connection = new URL(
getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) +
"&filter=profile").openConnection();
InputStream in = connection.getInputStream();
OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
IOUtils.copyBytes(in, out, 64 * 1024, true);
}
private void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
throws IOException {
// The tasktracker for a 'failed/killed' task attempt might not be around...
if (baseUrl != null) {
// Construct the url for the tasklogs
String taskLogUrl = getTaskLogURL(taskId, baseUrl);
// Copy task's stdout to stdout of the JobClient
getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
// Copy task's stderr to stderr of the JobClient
getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
}
}
private void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
OutputStream out) {
try {
URLConnection connection = taskLogUrl.openConnection();
BufferedReader input =
new BufferedReader(new InputStreamReader(connection.getInputStream()));
BufferedWriter output =
new BufferedWriter(new OutputStreamWriter(out));
try {
String logData = null;
while ((logData = input.readLine()) != null) {
if (logData.length() > 0) {
output.write(taskId + ": " + logData + "\n");
output.flush();
}
}
} finally {
input.close();
}
} catch(IOException ioe) {
LOG.warn("Error reading task output" + ioe.getMessage());
}
}
private String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId);
}
/**
* Set the UGI, user name and the group name for the job.
*
* This method is called by job submission code while submitting the job.
* Internal to MapReduce project.
* @throws IOException
*/
public void setUGIAndUserGroupNames()
throws IOException {
UnixUserGroupInformation ugi = Job.getUGI(conf);
setUser(ugi.getUserName());
if (ugi.getGroupNames().length > 0) {
conf.set("group.name", ugi.getGroupNames()[0]);
}
}
/** The interval at which monitorAndPrintJob() prints status */
public static int getProgressPollInterval(Configuration conf) {
// Read progress monitor poll interval from config. Default is 1 second.
int progMonitorPollIntervalMillis = conf.getInt(
PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL);
if (progMonitorPollIntervalMillis < 1) {
LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY +
" has been set to an invalid value; "
+ "replacing with " + DEFAULT_MONITOR_POLL_INTERVAL);
progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL;
}
return progMonitorPollIntervalMillis;
}
/** The interval at which waitForCompletion() should check. */
public static int getCompletionPollInterval(Configuration conf) {
int completionPollIntervalMillis = conf.getInt(
COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL);
if (completionPollIntervalMillis < 1) {
LOG.warn(COMPLETION_POLL_INTERVAL_KEY +
" has been set to an invalid value; "
+ "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL);
completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL;
}
return completionPollIntervalMillis;
}
/**
* Get the task output filter.
*
* @param conf the configuration.
* @return the filter level.
*/
public static TaskStatusFilter getTaskOutputFilter(Configuration conf) {
return TaskStatusFilter.valueOf(conf.get(Job.OUTPUT_FILTER, "FAILED"));
}
/**
* Modify the Configuration to set the task output filter.
*
* @param conf the Configuration to modify.
* @param newValue the value to set.
*/
public static void setTaskOutputFilter(Configuration conf,
TaskStatusFilter newValue) {
conf.set(Job.OUTPUT_FILTER, newValue.toString());
}
public static UnixUserGroupInformation getUGI(Configuration job)
throws IOException {
UnixUserGroupInformation ugi = null;
try {
ugi = UnixUserGroupInformation.login(job, true);
} catch (LoginException e) {
throw (IOException)(new IOException(
"Failed to get the current user's information.").initCause(e));
}
return ugi;
}
}