/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobACLsManager;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
/**
* Loads the basic job-level data upfront.
* Data from the job history file is loaded lazily.
*/
public class CompletedJob implements org.apache.hadoop.mapreduce.v2.app.job.Job {
static final Log LOG = LogFactory.getLog(CompletedJob.class);
private final Counters counters;
private final Configuration conf;
private final JobId jobId;
private final List<String> diagnostics = new ArrayList<String>();
private final JobReport report;
private final Map<TaskId, Task> tasks = new HashMap<TaskId, Task>();
private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
private final String user;
private final Path confFile;
private JobACLsManager aclsMgr;
private List<TaskAttemptCompletionEvent> completionEvents = null;
private JobInfo jobInfo;
public CompletedJob(Configuration conf, JobId jobId, Path historyFile,
boolean loadTasks, String userName, Path confFile, JobACLsManager aclsMgr)
throws IOException {
LOG.info("Loading job: " + jobId + " from file: " + historyFile);
this.conf = conf;
this.jobId = jobId;
this.confFile = confFile;
this.aclsMgr = aclsMgr;
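// Parse the history file up front; per-task data is only materialized
// into Task objects when loadTasks is true.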
loadFullHistoryData(loadTasks, historyFile);
user = userName;
counters = jobInfo.getTotalCounters();
diagnostics.add(jobInfo.getErrorInfo());
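// Build the JobReport eagerly from the parsed JobInfo.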
report =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(
JobReport.class);
report.setJobId(jobId);
report.setJobState(JobState.valueOf(jobInfo.getJobStatus()));
report.setSubmitTime(jobInfo.getSubmitTime());
report.setStartTime(jobInfo.getLaunchTime());
report.setFinishTime(jobInfo.getFinishTime());
report.setJobName(jobInfo.getJobname());
report.setUser(jobInfo.getUsername());
report.setMapProgress(getTotalMaps() == 0 ? 1.0f
: (float) getCompletedMaps() / getTotalMaps());
report.setReduceProgress(getTotalReduces() == 0 ? 1.0f
: (float) getCompletedReduces() / getTotalReduces());
report.setJobFile(confFile.toString());
report.setTrackingUrl(JobHistoryUtils.getHistoryUrl(conf, TypeConverter
.toYarn(TypeConverter.fromYarn(jobId)).getAppId()));
report.setAMInfos(getAMInfos());
report.setIsUber(isUber());
}
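// Illustrative usage sketch (hypothetical call site; in practice the job
// history server's storage layer constructs CompletedJob instances, and the
// variable names below are assumptions for this example only):
//
//   CompletedJob job = new CompletedJob(conf, jobId, historyFilePath,
//       true /* loadTasks */, "someUser", confFilePath, aclsManager);
//   JobReport report = job.getReport();                  // built eagerly in the constructor
//   Map<TaskId, Task> maps = job.getTasks(TaskType.MAP); // populated when loadTasks was true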
@Override
public int getCompletedMaps() {
return (int) jobInfo.getFinishedMaps();
}
@Override
public int getCompletedReduces() {
return (int) jobInfo.getFinishedReduces();
}
@Override
public Counters getAllCounters() {
return counters;
}
@Override
public JobId getID() {
return jobId;
}
@Override
public JobReport getReport() {
return report;
}
@Override
public float getProgress() {
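// A job loaded from history has already completed, so progress is always 1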
return 1.0f;
}
@Override
public JobState getState() {
return report.getJobState();
}
@Override
public Task getTask(TaskId taskId) {
return tasks.get(taskId);
}
@Override
public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
int fromEventId, int maxEvents) {
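// Completion events are built lazily from the loaded task data on the first
// request; a window of at most maxEvents events starting at fromEventId is
// returned.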
if (completionEvents == null) {
constructTaskAttemptCompletionEvents();
}
TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
if (completionEvents.size() > fromEventId) {
int actualMax = Math.min(maxEvents,
(completionEvents.size() - fromEventId));
events = completionEvents.subList(fromEventId, actualMax + fromEventId)
.toArray(events);
}
return events;
}
private void constructTaskAttemptCompletionEvents() {
completionEvents = new LinkedList<TaskAttemptCompletionEvent>();
List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>();
for (TaskId taskId : tasks.keySet()) {
Task task = tasks.get(taskId);
for (TaskAttemptId taskAttemptId : task.getAttempts().keySet()) {
TaskAttempt taskAttempt = task.getAttempts().get(taskAttemptId);
allTaskAttempts.add(taskAttempt);
}
}
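// Sort attempts by ascending finish time; unfinished attempts fall back to
// launch time and sort after finished ones, with never-launched attempts last.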
Collections.sort(allTaskAttempts, new Comparator<TaskAttempt>() {
@Override
public int compare(TaskAttempt o1, TaskAttempt o2) {
if (o1.getFinishTime() == 0 || o2.getFinishTime() == 0) {
if (o1.getFinishTime() == 0 && o2.getFinishTime() == 0) {
if (o1.getLaunchTime() == 0 || o2.getLaunchTime() == 0) {
if (o1.getLaunchTime() == 0 && o2.getLaunchTime() == 0) {
return 0;
} else {
long res = o1.getLaunchTime() - o2.getLaunchTime();
return res > 0 ? -1 : 1;
}
} else {
return o1.getLaunchTime() < o2.getLaunchTime() ? -1
: (o1.getLaunchTime() > o2.getLaunchTime() ? 1 : 0);
}
} else {
long res = o1.getFinishTime() - o2.getFinishTime();
return res > 0 ? -1 : 1;
}
} else {
return o1.getFinishTime() < o2.getFinishTime() ? -1
: (o1.getFinishTime() > o2.getFinishTime() ? 1 : 0);
}
}
});
int eventId = 0;
for (TaskAttempt taskAttempt : allTaskAttempts) {
TaskAttemptCompletionEvent tace = RecordFactoryProvider.getRecordFactory(
null).newRecordInstance(TaskAttemptCompletionEvent.class);
int attemptRunTime = -1;
if (taskAttempt.getLaunchTime() != 0 && taskAttempt.getFinishTime() != 0) {
attemptRunTime =
(int) (taskAttempt.getFinishTime() - taskAttempt.getLaunchTime());
}
// Default to KILLED
TaskAttemptCompletionEventStatus taceStatus =
TaskAttemptCompletionEventStatus.KILLED;
String taStateString = taskAttempt.getState().toString();
try {
taceStatus = TaskAttemptCompletionEventStatus.valueOf(taStateString);
} catch (Exception e) {
LOG.warn("Cannot constuct TACEStatus from TaskAtemptState: ["
+ taStateString + "] for taskAttemptId: [" + taskAttempt.getID()
+ "]. Defaulting to KILLED");
}
tace.setAttemptId(taskAttempt.getID());
tace.setAttemptRunTime(attemptRunTime);
tace.setEventId(eventId++);
tace.setMapOutputServerAddress(taskAttempt
.getAssignedContainerMgrAddress());
tace.setStatus(taceStatus);
completionEvents.add(tace);
}
}
@Override
public Map<TaskId, Task> getTasks() {
return tasks;
}
// History data is lazily loaded when task-level data is requested
private synchronized void loadFullHistoryData(boolean loadTasks,
Path historyFileAbsolute) throws IOException {
LOG.info("Loading history file: [" + historyFileAbsolute + "]");
if (jobInfo != null) {
return; //data already loaded
}
if (historyFileAbsolute != null) {
JobHistoryParser parser = null;
try {
parser =
new JobHistoryParser(historyFileAbsolute.getFileSystem(conf),
historyFileAbsolute);
jobInfo = parser.parse();
} catch (IOException e) {
throw new YarnException("Could not load history file "
+ historyFileAbsolute, e);
}
IOException parseException = parser.getParseException();
if (parseException != null) {
throw new YarnException(
"Could not parse history file " + historyFileAbsolute,
parseException);
}
} else {
throw new IOException("History file not found");
}
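// Optionally convert the parsed per-task records into Task objects,
// partitioned into map and reduce tasks.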
if (loadTasks) {
for (Map.Entry<org.apache.hadoop.mapreduce.TaskID, TaskInfo> entry : jobInfo
.getAllTasks().entrySet()) {
TaskId yarnTaskID = TypeConverter.toYarn(entry.getKey());
TaskInfo taskInfo = entry.getValue();
Task task = new CompletedTask(yarnTaskID, taskInfo);
tasks.put(yarnTaskID, task);
if (task.getType() == TaskType.MAP) {
mapTasks.put(task.getID(), task);
} else if (task.getType() == TaskType.REDUCE) {
reduceTasks.put(task.getID(), task);
}
}
}
LOG.info("TaskInfo loaded");
}
@Override
public List<String> getDiagnostics() {
return diagnostics;
}
@Override
public String getName() {
return jobInfo.getJobname();
}
@Override
public String getQueueName() {
return jobInfo.getJobQueueName();
}
@Override
public int getTotalMaps() {
return (int) jobInfo.getTotalMaps();
}
@Override
public int getTotalReduces() {
return (int) jobInfo.getTotalReduces();
}
@Override
public boolean isUber() {
return jobInfo.getUberized();
}
@Override
public Map<TaskId, Task> getTasks(TaskType taskType) {
if (TaskType.MAP.equals(taskType)) {
return mapTasks;
} else {//we have only two types of tasks
return reduceTasks;
}
}
@Override
public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
Map<JobACL, AccessControlList> jobACLs = jobInfo.getJobACLs();
AccessControlList jobACL = jobACLs.get(jobOperation);
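// No ACL was recorded for this operation; allow access.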
if (jobACL == null) {
return true;
}
return aclsMgr.checkAccess(callerUGI, jobOperation,
jobInfo.getUsername(), jobACL);
}
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getJobACLs()
*/
@Override
public Map<JobACL, AccessControlList> getJobACLs() {
return jobInfo.getJobACLs();
}
@Override
public String getUserName() {
return user;
}
/*
* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.v2.app.job.Job#getConfFile()
*/
@Override
public Path getConfFile() {
return confFile;
}
@Override
public List<AMInfo> getAMInfos() {
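// Convert the history parser's AMInfo records into the v2 API records.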
List<AMInfo> amInfos = new LinkedList<AMInfo>();
for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
.getAMInfos()) {
AMInfo amInfo =
MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
jhAmInfo.getNodeManagerHttpPort());
amInfos.add(amInfo);
}
return amInfos;
}
}