blob: facd75945a5fbc6cf7e7d31b4704b10024771e26 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.tools;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TIPStatus;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.HistoryViewer;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import com.google.common.base.Charsets;
/**
* Interprets the map reduce cli options
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CLI extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(CLI.class);
protected Cluster cluster;
private static final Set<String> taskTypes = new HashSet<String>(
Arrays.asList("MAP", "REDUCE"));
private final Set<String> taskStates = new HashSet<String>(Arrays.asList(
"running", "completed", "pending", "failed", "killed"));
public CLI() {
}
public CLI(Configuration conf) {
setConf(conf);
}
public int run(String[] argv) throws Exception {
int exitCode = -1;
if (argv.length < 1) {
displayUsage("");
return exitCode;
}
// process arguments
String cmd = argv[0];
String submitJobFile = null;
String jobid = null;
String taskid = null;
String historyFileOrJobId = null;
String historyOutFile = null;
String historyOutFormat = HistoryViewer.HUMAN_FORMAT;
String counterGroupName = null;
String counterName = null;
JobPriority jp = null;
String taskType = null;
String taskState = null;
int fromEvent = 0;
int nEvents = 0;
int jpvalue = 0;
String configOutFile = null;
boolean getStatus = false;
boolean getCounter = false;
boolean killJob = false;
boolean listEvents = false;
boolean viewHistory = false;
boolean viewAllHistory = false;
boolean listJobs = false;
boolean listAllJobs = false;
boolean listActiveTrackers = false;
boolean listBlacklistedTrackers = false;
boolean displayTasks = false;
boolean killTask = false;
boolean failTask = false;
boolean setJobPriority = false;
boolean logs = false;
boolean downloadConfig = false;
if ("-submit".equals(cmd)) {
if (argv.length != 2) {
displayUsage(cmd);
return exitCode;
}
submitJobFile = argv[1];
} else if ("-status".equals(cmd)) {
if (argv.length != 2) {
displayUsage(cmd);
return exitCode;
}
jobid = argv[1];
getStatus = true;
} else if("-counter".equals(cmd)) {
if (argv.length != 4) {
displayUsage(cmd);
return exitCode;
}
getCounter = true;
jobid = argv[1];
counterGroupName = argv[2];
counterName = argv[3];
} else if ("-kill".equals(cmd)) {
if (argv.length != 2) {
displayUsage(cmd);
return exitCode;
}
jobid = argv[1];
killJob = true;
} else if ("-set-priority".equals(cmd)) {
if (argv.length != 3) {
displayUsage(cmd);
return exitCode;
}
jobid = argv[1];
try {
jp = JobPriority.valueOf(argv[2]);
} catch (IllegalArgumentException iae) {
try {
jpvalue = Integer.parseInt(argv[2]);
} catch (NumberFormatException ne) {
LOG.info(ne);
displayUsage(cmd);
return exitCode;
}
}
setJobPriority = true;
} else if ("-events".equals(cmd)) {
if (argv.length != 4) {
displayUsage(cmd);
return exitCode;
}
jobid = argv[1];
fromEvent = Integer.parseInt(argv[2]);
nEvents = Integer.parseInt(argv[3]);
listEvents = true;
} else if ("-history".equals(cmd)) {
viewHistory = true;
if (argv.length < 2 || argv.length > 7) {
displayUsage(cmd);
return exitCode;
}
// Some arguments are optional while others are not, and some require
// second arguments. Due to this, the indexing can vary depending on
// what's specified and what's left out, as summarized in the below table:
// [all] <jobHistoryFile|jobId> [-outfile <file>] [-format <human|json>]
// 1 2 3 4 5 6
// 1 2 3 4
// 1 2 3 4
// 1 2
// 1 2 3 4 5
// 1 2 3
// 1 2 3
// 1
// "all" is optional, but comes first if specified
int index = 1;
if ("all".equals(argv[index])) {
index++;
viewAllHistory = true;
if (argv.length == 2) {
displayUsage(cmd);
return exitCode;
}
}
// Get the job history file or job id argument
historyFileOrJobId = argv[index++];
// "-outfile" is optional, but if specified requires a second argument
if (argv.length > index + 1 && "-outfile".equals(argv[index])) {
index++;
historyOutFile = argv[index++];
}
// "-format" is optional, but if specified required a second argument
if (argv.length > index + 1 && "-format".equals(argv[index])) {
index++;
historyOutFormat = argv[index++];
}
// Check for any extra arguments that don't belong here
if (argv.length > index) {
displayUsage(cmd);
return exitCode;
}
} else if ("-list".equals(cmd)) {
if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
displayUsage(cmd);
return exitCode;
}
if (argv.length == 2 && "all".equals(argv[1])) {
listAllJobs = true;
} else {
listJobs = true;
}
} else if("-kill-task".equals(cmd)) {
if (argv.length != 2) {
displayUsage(cmd);
return exitCode;
}
killTask = true;
taskid = argv[1];
} else if("-fail-task".equals(cmd)) {
if (argv.length != 2) {
displayUsage(cmd);
return exitCode;
}
failTask = true;
taskid = argv[1];
} else if ("-list-active-trackers".equals(cmd)) {
if (argv.length != 1) {
displayUsage(cmd);
return exitCode;
}
listActiveTrackers = true;
} else if ("-list-blacklisted-trackers".equals(cmd)) {
if (argv.length != 1) {
displayUsage(cmd);
return exitCode;
}
listBlacklistedTrackers = true;
} else if ("-list-attempt-ids".equals(cmd)) {
if (argv.length != 4) {
displayUsage(cmd);
return exitCode;
}
jobid = argv[1];
taskType = argv[2];
taskState = argv[3];
displayTasks = true;
if (!taskTypes.contains(
org.apache.hadoop.util.StringUtils.toUpperCase(taskType))) {
System.out.println("Error: Invalid task-type: " + taskType);
displayUsage(cmd);
return exitCode;
}
if (!taskStates.contains(
org.apache.hadoop.util.StringUtils.toLowerCase(taskState))) {
System.out.println("Error: Invalid task-state: " + taskState);
displayUsage(cmd);
return exitCode;
}
} else if ("-logs".equals(cmd)) {
if (argv.length == 2 || argv.length ==3) {
logs = true;
jobid = argv[1];
if (argv.length == 3) {
taskid = argv[2];
} else {
taskid = null;
}
} else {
displayUsage(cmd);
return exitCode;
}
} else if ("-config".equals(cmd)) {
downloadConfig = true;
if (argv.length != 3) {
displayUsage(cmd);
return exitCode;
}
jobid = argv[1];
configOutFile = argv[2];
} else {
displayUsage(cmd);
return exitCode;
}
// initialize cluster
cluster = createCluster();
// Submit the request
try {
if (submitJobFile != null) {
Job job = Job.getInstance(new JobConf(submitJobFile));
job.submit();
System.out.println("Created job " + job.getJobID());
exitCode = 0;
} else if (getStatus) {
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
Counters counters = job.getCounters();
System.out.println();
System.out.println(job);
if (counters != null) {
System.out.println(counters);
} else {
System.out.println("Counters not available. Job is retired.");
}
exitCode = 0;
}
} else if (getCounter) {
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
Counters counters = job.getCounters();
if (counters == null) {
System.out.println("Counters not available for retired job " +
jobid);
exitCode = -1;
} else {
System.out.println(getCounter(counters,
counterGroupName, counterName));
exitCode = 0;
}
}
} else if (killJob) {
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
JobStatus jobStatus = job.getStatus();
if (jobStatus.getState() == JobStatus.State.FAILED) {
System.out.println("Could not mark the job " + jobid
+ " as killed, as it has already failed.");
exitCode = -1;
} else if (jobStatus.getState() == JobStatus.State.KILLED) {
System.out
.println("The job " + jobid + " has already been killed.");
exitCode = -1;
} else if (jobStatus.getState() == JobStatus.State.SUCCEEDED) {
System.out.println("Could not kill the job " + jobid
+ ", as it has already succeeded.");
exitCode = -1;
} else {
job.killJob();
System.out.println("Killed job " + jobid);
exitCode = 0;
}
}
} else if (setJobPriority) {
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
if (jp != null) {
job.setPriority(jp);
} else {
job.setPriorityAsInteger(jpvalue);
}
System.out.println("Changed job priority.");
exitCode = 0;
}
} else if (viewHistory) {
// If it ends with .jhist, assume it's a jhist file; otherwise, assume
// it's a Job ID
if (historyFileOrJobId.endsWith(".jhist")) {
viewHistory(historyFileOrJobId, viewAllHistory, historyOutFile,
historyOutFormat);
exitCode = 0;
} else {
Job job = getJob(JobID.forName(historyFileOrJobId));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
String historyUrl = job.getHistoryUrl();
if (historyUrl == null || historyUrl.isEmpty()) {
System.out.println("History file for job " + historyFileOrJobId +
" is currently unavailable.");
} else {
viewHistory(historyUrl, viewAllHistory, historyOutFile,
historyOutFormat);
exitCode = 0;
}
}
}
} else if (listEvents) {
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
listEvents(job, fromEvent, nEvents);
exitCode = 0;
}
} else if (listJobs) {
listJobs(cluster);
exitCode = 0;
} else if (listAllJobs) {
listAllJobs(cluster);
exitCode = 0;
} else if (listActiveTrackers) {
listActiveTrackers(cluster);
exitCode = 0;
} else if (listBlacklistedTrackers) {
listBlacklistedTrackers(cluster);
exitCode = 0;
} else if (displayTasks) {
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
exitCode = 0;
}
} else if(killTask) {
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
Job job = getJob(taskID.getJobID());
if (job == null) {
System.out.println("Could not find job " + jobid);
} else if (job.killTask(taskID, false)) {
System.out.println("Killed task " + taskid);
exitCode = 0;
} else {
System.out.println("Could not kill task " + taskid);
exitCode = -1;
}
} else if(failTask) {
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
Job job = getJob(taskID.getJobID());
if (job == null) {
System.out.println("Could not find job " + jobid);
} else if(job.killTask(taskID, true)) {
System.out.println("Killed task " + taskID + " by failing it");
exitCode = 0;
} else {
System.out.println("Could not fail task " + taskid);
exitCode = -1;
}
} else if (logs) {
JobID jobID = JobID.forName(jobid);
if (getJob(jobID) == null) {
System.out.println("Could not find job " + jobid);
} else {
try {
TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskid);
LogParams logParams = cluster.getLogParams(jobID, taskAttemptID);
LogCLIHelpers logDumper = new LogCLIHelpers();
logDumper.setConf(getConf());
exitCode = logDumper.dumpAContainersLogs(
logParams.getApplicationId(), logParams.getContainerId(),
logParams.getNodeId(), logParams.getOwner());
} catch (IOException e) {
if (e instanceof RemoteException) {
throw e;
}
System.out.println(e.getMessage());
}
}
} else if (downloadConfig) {
Job job = getJob(JobID.forName(jobid));
if (job == null) {
System.out.println("Could not find job " + jobid);
} else {
String jobFile = job.getJobFile();
if (jobFile == null || jobFile.isEmpty()) {
System.out.println("Config file for job " + jobFile +
" could not be found.");
} else {
Path configPath = new Path(jobFile);
FileSystem fs = FileSystem.get(getConf());
fs.copyToLocalFile(configPath, new Path(configOutFile));
exitCode = 0;
}
}
}
} catch (RemoteException re) {
IOException unwrappedException = re.unwrapRemoteException();
if (unwrappedException instanceof AccessControlException) {
System.out.println(unwrappedException.getMessage());
} else {
throw re;
}
} finally {
cluster.close();
}
return exitCode;
}
Cluster createCluster() throws IOException {
return new Cluster(getConf());
}
private String getJobPriorityNames() {
StringBuffer sb = new StringBuffer();
for (JobPriority p : JobPriority.values()) {
// UNDEFINED_PRIORITY need not to be displayed in usage
if (JobPriority.UNDEFINED_PRIORITY == p) {
continue;
}
sb.append(p.name()).append(" ");
}
return sb.substring(0, sb.length()-1);
}
private String getTaskTypes() {
return StringUtils.join(taskTypes, " ");
}
/**
* Display usage of the command-line tool and terminate execution.
*/
private void displayUsage(String cmd) {
String prefix = "Usage: job ";
String jobPriorityValues = getJobPriorityNames();
String taskStates = "pending, running, completed, failed, killed";
if ("-submit".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-file>]");
} else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-id>]");
} else if ("-counter".equals(cmd)) {
System.err.println(prefix + "[" + cmd +
" <job-id> <group-name> <counter-name>]");
} else if ("-events".equals(cmd)) {
System.err.println(prefix + "[" + cmd +
" <job-id> <from-event-#> <#-of-events>]. Event #s start from 1.");
} else if ("-history".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " [all] <jobHistoryFile|jobId> " +
"[-outfile <file>] [-format <human|json>]]");
} else if ("-list".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " [all]]");
} else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <task-attempt-id>]");
} else if ("-set-priority".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
"Valid values for priorities are: "
+ jobPriorityValues
+ ". In addition to this, integers also can be used.");
} else if ("-list-active-trackers".equals(cmd)) {
System.err.println(prefix + "[" + cmd + "]");
} else if ("-list-blacklisted-trackers".equals(cmd)) {
System.err.println(prefix + "[" + cmd + "]");
} else if ("-list-attempt-ids".equals(cmd)) {
System.err.println(prefix + "[" + cmd +
" <job-id> <task-type> <task-state>]. " +
"Valid values for <task-type> are " + getTaskTypes() + ". " +
"Valid values for <task-state> are " + taskStates);
} else if ("-logs".equals(cmd)) {
System.err.println(prefix + "[" + cmd +
" <job-id> <task-attempt-id>]. " +
" <task-attempt-id> is optional to get task attempt logs.");
} else if ("-config".equals(cmd)) {
System.err.println(prefix + "[" + cmd + " <job-id> <file>]");
} else {
System.err.printf(prefix + "<command> <args>%n");
System.err.printf("\t[-submit <job-file>]%n");
System.err.printf("\t[-status <job-id>]%n");
System.err.printf("\t[-counter <job-id> <group-name> <counter-name>]%n");
System.err.printf("\t[-kill <job-id>]%n");
System.err.printf("\t[-set-priority <job-id> <priority>]. " +
"Valid values for priorities are: " + jobPriorityValues +
". In addition to this, integers also can be used." + "%n");
System.err.printf("\t[-events <job-id> <from-event-#> <#-of-events>]%n");
System.err.printf("\t[-history [all] <jobHistoryFile|jobId> " +
"[-outfile <file>] [-format <human|json>]]%n");
System.err.printf("\t[-list [all]]%n");
System.err.printf("\t[-list-active-trackers]%n");
System.err.printf("\t[-list-blacklisted-trackers]%n");
System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
"<task-state>]. " +
"Valid values for <task-type> are " + getTaskTypes() + ". " +
"Valid values for <task-state> are " + taskStates);
System.err.printf("\t[-kill-task <task-attempt-id>]%n");
System.err.printf("\t[-fail-task <task-attempt-id>]%n");
System.err.printf("\t[-logs <job-id> <task-attempt-id>]%n");
System.err.printf("\t[-config <job-id> <file>%n%n");
ToolRunner.printGenericCommandUsage(System.out);
}
}
private void viewHistory(String historyFile, boolean all,
String historyOutFile, String format) throws IOException {
HistoryViewer historyViewer = new HistoryViewer(historyFile,
getConf(), all, format);
PrintStream ps = System.out;
if (historyOutFile != null) {
ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(
new File(historyOutFile))), true, "UTF-8");
}
historyViewer.print(ps);
}
protected long getCounter(Counters counters, String counterGroupName,
String counterName) throws IOException {
return counters.findCounter(counterGroupName, counterName).getValue();
}
/**
* List the events for the given job
* @param job the job to list
* @param fromEventId event id for the job's events to list from
* @param numEvents number of events we want to list
* @throws IOException
*/
private void listEvents(Job job, int fromEventId, int numEvents)
throws IOException, InterruptedException {
TaskCompletionEvent[] events = job.
getTaskCompletionEvents(fromEventId, numEvents);
System.out.println("Task completion events for " + job.getJobID());
System.out.println("Number of events (from " + fromEventId + ") are: "
+ events.length);
for(TaskCompletionEvent event: events) {
System.out.println(event.getStatus() + " " +
event.getTaskAttemptId() + " " +
getTaskLogURL(event.getTaskAttemptId(), event.getTaskTrackerHttp()));
}
}
protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
}
@VisibleForTesting
Job getJob(JobID jobid) throws IOException, InterruptedException {
int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
long retryInterval = getConf()
.getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
Job job = cluster.getJob(jobid);
for (int i = 0; i < maxRetry; ++i) {
if (job != null) {
return job;
}
LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
+ " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
+ " seconds and retrying.");
Thread.sleep(retryInterval);
job = cluster.getJob(jobid);
}
return job;
}
/**
* Dump a list of currently running jobs
* @throws IOException
*/
private void listJobs(Cluster cluster)
throws IOException, InterruptedException {
List<JobStatus> runningJobs = new ArrayList<JobStatus>();
for (JobStatus job : cluster.getAllJobStatuses()) {
if (!job.isJobComplete()) {
runningJobs.add(job);
}
}
displayJobList(runningJobs.toArray(new JobStatus[0]));
}
/**
* Dump a list of all jobs submitted.
* @throws IOException
*/
private void listAllJobs(Cluster cluster)
throws IOException, InterruptedException {
displayJobList(cluster.getAllJobStatuses());
}
/**
* Display the list of active trackers
*/
private void listActiveTrackers(Cluster cluster)
throws IOException, InterruptedException {
TaskTrackerInfo[] trackers = cluster.getActiveTaskTrackers();
for (TaskTrackerInfo tracker : trackers) {
System.out.println(tracker.getTaskTrackerName());
}
}
/**
* Display the list of blacklisted trackers
*/
private void listBlacklistedTrackers(Cluster cluster)
throws IOException, InterruptedException {
TaskTrackerInfo[] trackers = cluster.getBlackListedTaskTrackers();
if (trackers.length > 0) {
System.out.println("BlackListedNode \t Reason");
}
for (TaskTrackerInfo tracker : trackers) {
System.out.println(tracker.getTaskTrackerName() + "\t" +
tracker.getReasonForBlacklist());
}
}
private void printTaskAttempts(TaskReport report) {
if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
System.out.println(report.getSuccessfulTaskAttemptId());
} else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
for (TaskAttemptID t :
report.getRunningTaskAttemptIds()) {
System.out.println(t);
}
}
}
/**
* Display the information about a job's tasks, of a particular type and
* in a particular state
*
* @param job the job
* @param type the type of the task (map/reduce/setup/cleanup)
* @param state the state of the task
* (pending/running/completed/failed/killed)
*/
protected void displayTasks(Job job, String type, String state)
throws IOException, InterruptedException {
TaskReport[] reports = job.getTaskReports(TaskType.valueOf(
org.apache.hadoop.util.StringUtils.toUpperCase(type)));
for (TaskReport report : reports) {
TIPStatus status = report.getCurrentStatus();
if ((state.equalsIgnoreCase("pending") && status ==TIPStatus.PENDING) ||
(state.equalsIgnoreCase("running") && status ==TIPStatus.RUNNING) ||
(state.equalsIgnoreCase("completed") && status == TIPStatus.COMPLETE) ||
(state.equalsIgnoreCase("failed") && status == TIPStatus.FAILED) ||
(state.equalsIgnoreCase("killed") && status == TIPStatus.KILLED)) {
printTaskAttempts(report);
}
}
}
public void displayJobList(JobStatus[] jobs)
throws IOException, InterruptedException {
displayJobList(jobs, new PrintWriter(new OutputStreamWriter(System.out,
Charsets.UTF_8)));
}
@Private
public static String headerPattern = "%23s\t%10s\t%14s\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
@Private
public static String dataPattern = "%23s\t%10s\t%14d\t%12s\t%12s\t%10s\t%15s\t%15s\t%8s\t%8s\t%10s\t%10s\n";
private static String memPattern = "%dM";
private static String UNAVAILABLE = "N/A";
@Private
public void displayJobList(JobStatus[] jobs, PrintWriter writer) {
writer.println("Total jobs:" + jobs.length);
writer.printf(headerPattern, "JobId", "State", "StartTime", "UserName",
"Queue", "Priority", "UsedContainers",
"RsvdContainers", "UsedMem", "RsvdMem", "NeededMem", "AM info");
for (JobStatus job : jobs) {
int numUsedSlots = job.getNumUsedSlots();
int numReservedSlots = job.getNumReservedSlots();
long usedMem = job.getUsedMem();
long rsvdMem = job.getReservedMem();
long neededMem = job.getNeededMem();
writer.printf(dataPattern,
job.getJobID().toString(), job.getState(), job.getStartTime(),
job.getUsername(), job.getQueue(),
job.getPriority().name(),
numUsedSlots < 0 ? UNAVAILABLE : numUsedSlots,
numReservedSlots < 0 ? UNAVAILABLE : numReservedSlots,
usedMem < 0 ? UNAVAILABLE : String.format(memPattern, usedMem),
rsvdMem < 0 ? UNAVAILABLE : String.format(memPattern, rsvdMem),
neededMem < 0 ? UNAVAILABLE : String.format(memPattern, neededMem),
job.getSchedulingInfo());
}
writer.flush();
}
public static void main(String[] argv) throws Exception {
int res = ToolRunner.run(new CLI(), argv);
ExitUtil.terminate(res);
}
}