/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.jobhistory;
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.util.JobHistoryEventUtils;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.JsonNodeFactory;
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.api.client.ClientHandlerException;
/**
* Job history events get routed to this class, which writes them to HDFS:
* first into a per-job staging directory, and on job completion the files are
* moved to the intermediate done directory. The JobHistory implementation is
* in this package so that it can access package-private classes.
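*
* <p>Typical lifecycle, as driven by the MR ApplicationMaster (a minimal
* sketch; variable names are illustrative and the actual wiring lives in
* {@link MRAppMaster}):
* <pre>
*   JobHistoryEventHandler jheh = new JobHistoryEventHandler(context, startCount);
*   jheh.init(conf);    // resolve/create the staging and intermediate done dirs
*   jheh.start();       // start the event handling thread
*   jheh.handle(new JobHistoryEvent(jobId, historyEvent));
*   ...
*   jheh.stop();        // drain the queue, close writers, move files to done dir
* </pre>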
*/
public class JobHistoryEventHandler extends AbstractService
implements EventHandler<JobHistoryEvent> {
private static final JsonNodeFactory FACTORY =
new ObjectMapper().getNodeFactory();
private final AppContext context;
private final int startCount;
private int eventCounter;
// These file systems may differ from the default file system of the job configuration
// See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
// #ensurePathInDefaultFileSystem
private FileSystem stagingDirFS; // log Dir FileSystem
private FileSystem doneDirFS; // done Dir FileSystem
private Path stagingDirPath = null;
private Path doneDirPrefixPath = null; // folder for completed jobs
private int maxUnflushedCompletionEvents;
private int postJobCompletionMultiplier;
private long flushTimeout;
private int minQueueSizeForBatchingFlushes; // TODO: Rename
private int numUnflushedCompletionEvents = 0;
private boolean isTimerActive;
private EventWriter.WriteMode jhistMode =
EventWriter.WriteMode.JSON;
protected BlockingQueue<JobHistoryEvent> eventQueue =
new LinkedBlockingQueue<JobHistoryEvent>();
protected Thread eventHandlingThread;
private volatile boolean stopped;
private final Object lock = new Object();
private static final Log LOG = LogFactory.getLog(
JobHistoryEventHandler.class);
protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
// Should job completion be forced when the AM shuts down?
protected volatile boolean forceJobCompletion = false;
protected TimelineClient timelineClient;
private boolean timelineServiceV2Enabled = false;
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
private static final String MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE =
"MAPREDUCE_TASK_ATTEMPT";
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
this.context = context;
this.startCount = startCount;
}
/* (non-Javadoc)
* @see org.apache.hadoop.service.AbstractService#serviceInit(
* org.apache.hadoop.conf.Configuration)
* Initializes the FileSystem and Path objects for the log and done directories.
* Creates these directories if they do not already exist.
*/
@Override
protected void serviceInit(Configuration conf) throws Exception {
String jobId =
TypeConverter.fromYarn(context.getApplicationID()).toString();
String stagingDirStr = null;
String doneDirStr = null;
String userDoneDirStr = null;
try {
stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf,
jobId);
doneDirStr =
JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf);
userDoneDirStr =
JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf);
} catch (IOException e) {
LOG.error("Failed while getting the configured log directories", e);
throw new YarnRuntimeException(e);
}
//Check for the existence of the history staging dir. Maybe create it.
try {
stagingDirPath =
FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
mkdir(stagingDirFS, stagingDirPath, new FsPermission(
JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
} catch (IOException e) {
LOG.error("Failed while checking for/creating history staging path: ["
+ stagingDirPath + "]", e);
throw new YarnRuntimeException(e);
}
//Check for the existence of intermediate done dir.
Path doneDirPath = null;
try {
doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
// This directory will be in a common location, or this may be a cluster
// meant for a single user. Creating based on the conf. Should ideally be
// created by the JobHistoryServer or as part of deployment.
if (!doneDirFS.exists(doneDirPath)) {
if (JobHistoryUtils.shouldCreateNonUserDirectory(conf)) {
LOG.info("Creating intermediate history logDir: ["
+ doneDirPath
+ "] + based on conf. Should ideally be created by the JobHistoryServer: "
+ MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR);
mkdir(
doneDirFS,
doneDirPath,
new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS
.toShort()));
// TODO Temporary toShort till new FsPermission(FsPermission)
// respects sticky
} else {
String message = "Not creating intermediate history logDir: ["
+ doneDirPath
+ "] based on conf: "
+ MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR
+ ". Either set to true or pre-create this directory with" +
" appropriate permissions";
LOG.error(message);
throw new YarnRuntimeException(message);
}
}
} catch (IOException e) {
LOG.error("Failed checking for the existance of history intermediate " +
"done directory: [" + doneDirPath + "]");
throw new YarnRuntimeException(e);
}
//Check/create user directory under intermediate done dir.
try {
doneDirPrefixPath =
FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
} catch (IOException e) {
LOG.error("Error creating user intermediate history done directory: [ "
+ doneDirPrefixPath + "]", e);
throw new YarnRuntimeException(e);
}
// Maximum number of unflushed completion-events that can stay in the queue
// before flush kicks in.
maxUnflushedCompletionEvents =
conf.getInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS,
MRJobConfig.DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS);
// We want to cut down flushes after job completes so as to write quicker,
// so we increase maxUnflushedEvents post Job completion by using the
// following multiplier.
postJobCompletionMultiplier =
conf.getInt(
MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER,
MRJobConfig.DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER);
// Max time until which flush doesn't take place.
flushTimeout =
conf.getLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
MRJobConfig.DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS);
minQueueSizeForBatchingFlushes =
conf.getInt(
MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD,
MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD);
// TODO replace MR specific configurations on timeline service with getting
// configuration from RM through registerApplicationMaster() in
// ApplicationMasterProtocol with return value for timeline service
// configuration status: off, on_with_v1 or on_with_v2.
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
LOG.info("Emitting job history data to the timeline service is enabled");
if (YarnConfiguration.timelineServiceEnabled(conf)) {
timelineClient =
((MRAppMaster.RunningAppContext)context).getTimelineClient();
timelineClient.init(conf);
timelineServiceV2Enabled =
YarnConfiguration.timelineServiceV2Enabled(conf);
LOG.info("Timeline service is enabled; version: " +
YarnConfiguration.getTimelineServiceVersion(conf));
} else {
LOG.info("Timeline service is not enabled");
}
} else {
LOG.info("Emitting job history data to the timeline server is not " +
"enabled");
}
// Determine the serialization format used when writing .jhist event files.
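// For example, a cluster can opt into the binary format via mapred-site.xml
// (a sketch; the property key is the one defined by
// JHAdminConfig.MR_HS_JHIST_FORMAT):
//   <property>
//     <name>mapreduce.jobhistory.jhist.format</name>
//     <value>binary</value>
//   </property>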
String jhistFormat = conf.get(JHAdminConfig.MR_HS_JHIST_FORMAT,
JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT);
if (jhistFormat.equals("json")) {
jhistMode = EventWriter.WriteMode.JSON;
} else if (jhistFormat.equals("binary")) {
jhistMode = EventWriter.WriteMode.BINARY;
} else {
LOG.warn("Unrecognized value '" + jhistFormat + "' for property " +
JHAdminConfig.MR_HS_JHIST_FORMAT + ". Valid values are " +
"'json' or 'binary'. Falling back to default value '" +
JHAdminConfig.DEFAULT_MR_HS_JHIST_FORMAT + "'.");
}
super.serviceInit(conf);
}
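// Create the given directory with the requested permissions if it does not
// already exist, and explicitly set the permissions afterwards in case the
// file system applied a umask during creation.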
private void mkdir(FileSystem fs, Path path, FsPermission fsp)
throws IOException {
if (!fs.exists(path)) {
try {
fs.mkdirs(path, fsp);
FileStatus fsStatus = fs.getFileStatus(path);
LOG.info("Perms after creating " + fsStatus.getPermission().toShort()
+ ", Expected: " + fsp.toShort());
if (fsStatus.getPermission().toShort() != fsp.toShort()) {
LOG.info("Explicitly setting permissions to : " + fsp.toShort()
+ ", " + fsp);
fs.setPermission(path, fsp);
}
} catch (FileAlreadyExistsException e) {
LOG.info("Directory: [" + path + "] already exists.");
}
}
}
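// serviceStart starts the timeline client (if configured) and a single
// consumer thread that drains eventQueue and hands each event to
// handleEvent() under 'lock'.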
@Override
protected void serviceStart() throws Exception {
if (timelineClient != null) {
timelineClient.start();
}
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
JobHistoryEvent event = null;
while (!stopped && !Thread.currentThread().isInterrupted()) {
// Log the size of the history-event-queue every so often.
if (eventCounter != 0 && eventCounter % 1000 == 0) {
eventCounter = 0;
LOG.info("Size of the JobHistory event queue is "
+ eventQueue.size());
} else {
eventCounter++;
}
try {
event = eventQueue.take();
} catch (InterruptedException e) {
LOG.info("EventQueue take interrupted. Returning");
return;
}
// An event has been removed from the queue; handle it.
// Any events remaining in the queue are handled via stop().
// Clear the interrupt status before calling handleEvent and restore it
// afterwards if it was set.
// Interrupts received from other threads during handleEvent cannot be
// dealt with - Shell.runCommand() ignores them.
synchronized (lock) {
boolean isInterrupted = Thread.interrupted();
handleEvent(event);
if (isInterrupted) {
LOG.debug("Event handling interrupted");
Thread.currentThread().interrupt();
}
}
}
}
}, "eventHandlingThread");
eventHandlingThread.start();
super.serviceStart();
}
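// serviceStop cancels the per-job flush timers, drains and processes any
// events still in the queue, writes a JobUnsuccessfulCompletionEvent for jobs
// whose writers are still open when forceJobCompletion is set, and then
// closes all writers and the timeline client.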
@Override
protected void serviceStop() throws Exception {
LOG.info("Stopping JobHistoryEventHandler. "
+ "Size of the outstanding queue size is " + eventQueue.size());
stopped = true;
//do not interrupt while event handling is in progress
synchronized(lock) {
if (eventHandlingThread != null) {
LOG.debug("Interrupting Event Handling thread");
eventHandlingThread.interrupt();
} else {
LOG.debug("Null event handling thread");
}
}
try {
if (eventHandlingThread != null) {
LOG.debug("Waiting for Event Handling thread to complete");
eventHandlingThread.join();
}
} catch (InterruptedException ie) {
LOG.info("Interrupted Exception while stopping", ie);
}
// Cancel all timers - so that they aren't invoked during or after
// the metaInfo object is wrapped up.
for (MetaInfo mi : fileMap.values()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Shutting down timer for " + mi);
}
mi.shutDownTimer();
} catch (IOException e) {
LOG.info("Exception while cancelling delayed flush timer. "
+ "Likely caused by a failed flush " + e.getMessage());
}
}
//write all the events remaining in queue
Iterator<JobHistoryEvent> it = eventQueue.iterator();
while(it.hasNext()) {
JobHistoryEvent ev = it.next();
LOG.info("In stop, writing event " + ev.getType());
handleEvent(ev);
}
// Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
// closed their event writers
if(forceJobCompletion) {
for (Map.Entry<JobId,MetaInfo> jobIt : fileMap.entrySet()) {
JobId toClose = jobIt.getKey();
MetaInfo mi = jobIt.getValue();
if(mi != null && mi.isWriterActive()) {
LOG.warn("Found jobId " + toClose
+ " to have not been closed. Will close");
//Create a JobFinishEvent so that it is written to the job history
final Job job = context.getJob(toClose);
JobUnsuccessfulCompletionEvent jucEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
System.currentTimeMillis(), job.getCompletedMaps(),
job.getCompletedReduces(),
createJobStateForJobUnsuccessfulCompletionEvent(
mi.getForcedJobStateOnShutDown()),
job.getDiagnostics());
JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
//Bypass the queue mechanism which might wait. Call the method directly
handleEvent(jfEvent);
}
}
}
//close all file handles
for (MetaInfo mi : fileMap.values()) {
try {
mi.closeWriter();
} catch (IOException e) {
LOG.info("Exception while closing file " + e.getMessage());
}
}
if (timelineClient != null) {
timelineClient.stop();
}
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
super.serviceStop();
}
protected EventWriter createEventWriter(Path historyFilePath)
throws IOException {
FSDataOutputStream out = stagingDirFS.create(historyFilePath, true);
return new EventWriter(out, this.jhistMode);
}
/**
* Create an event writer for the Job represented by the jobID.
* Writes out the job configuration to the log directory.
* This should be the first call to history for a job.
*
* @param jobId the jobId.
* @param amStartedEvent the AM_STARTED event that triggered this setup.
* @throws IOException if the history file or the configuration file cannot
* be created.
*/
protected void setupEventWriter(JobId jobId, AMStartedEvent amStartedEvent)
throws IOException {
if (stagingDirPath == null) {
LOG.error("Log Directory is null, returning");
throw new IOException("Missing Log Directory for History");
}
MetaInfo oldFi = fileMap.get(jobId);
Configuration conf = getConfig();
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
Path historyFile = JobHistoryUtils.getStagingJobHistoryFile(
stagingDirPath, jobId, startCount);
String user = UserGroupInformation.getCurrentUser().getShortUserName();
if (user == null) {
throw new IOException(
"User is null while setting up jobhistory eventwriter");
}
String jobName = context.getJob(jobId).getName();
EventWriter writer = (oldFi == null) ? null : oldFi.writer;
Path logDirConfPath =
JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount);
if (writer == null) {
try {
writer = createEventWriter(historyFile);
LOG.info("Event Writer setup for JobId: " + jobId + ", File: "
+ historyFile);
} catch (IOException ioe) {
LOG.info("Could not create log file: [" + historyFile + "] + for job "
+ "[" + jobName + "]");
throw ioe;
}
//Write out conf only if the writer isn't already setup.
if (conf != null) {
// TODO Ideally this should be written out to the job dir
// (.staging/jobid/files - RecoveryService will need to be patched)
FSDataOutputStream jobFileOut = null;
try {
if (logDirConfPath != null) {
jobFileOut = stagingDirFS.create(logDirConfPath, true);
conf.writeXml(jobFileOut);
jobFileOut.close();
}
} catch (IOException e) {
LOG.info("Failed to write the job configuration file", e);
throw e;
}
}
}
String queueName = JobConf.DEFAULT_QUEUE_NAME;
if (conf != null) {
queueName = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
}
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
user, jobName, jobId, amStartedEvent.getForcedJobStateOnShutDown(),
queueName);
fi.getJobSummary().setJobId(jobId);
fi.getJobSummary().setJobLaunchTime(amStartedEvent.getStartTime());
fi.getJobSummary().setJobSubmitTime(amStartedEvent.getSubmitTime());
fi.getJobIndexInfo().setJobStartTime(amStartedEvent.getStartTime());
fi.getJobIndexInfo().setSubmitTime(amStartedEvent.getSubmitTime());
fileMap.put(jobId, fi);
}
/** Close the event writer for this job id.
* @throws IOException if closing the writer fails */
public void closeWriter(JobId id) throws IOException {
try {
final MetaInfo mi = fileMap.get(id);
if (mi != null) {
mi.closeWriter();
}
} catch (IOException e) {
LOG.error("Error closing writer for JobID: " + id);
throw e;
}
}
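// Producer side: events are enqueued here and consumed asynchronously by the
// event handling thread. Once a job completion event is seen, the unflushed
// event threshold is multiplied so that the remaining events are written with
// fewer flushes.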
@Override
public void handle(JobHistoryEvent event) {
try {
if (isJobCompletionEvent(event.getHistoryEvent())) {
// When the job is complete, flush slower but write faster.
maxUnflushedCompletionEvents =
maxUnflushedCompletionEvents * postJobCompletionMultiplier;
}
eventQueue.put(event);
} catch (InterruptedException e) {
throw new YarnRuntimeException(e);
}
}
private boolean isJobCompletionEvent(HistoryEvent historyEvent) {
if (EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED,
EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
return true;
}
return false;
}
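// Consumer side: does the actual work for a single event - sets up the
// writer on AM_STARTED, writes the event to the .jhist file, updates the job
// summary and index info, publishes to ATS if enabled, and on job completion
// closes the writer and moves the files to the done directory.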
@Private
public void handleEvent(JobHistoryEvent event) {
synchronized (lock) {
// If this is the AM_STARTED event, set up the event writer
if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
try {
AMStartedEvent amStartedEvent =
(AMStartedEvent) event.getHistoryEvent();
setupEventWriter(event.getJobID(), amStartedEvent);
} catch (IOException ioe) {
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
ioe);
throw new YarnRuntimeException(ioe);
}
}
// For all events
// (1) Write it out
// (2) Process it for JobSummary
// (3) Process it for ATS (if enabled)
MetaInfo mi = fileMap.get(event.getJobID());
try {
HistoryEvent historyEvent = event.getHistoryEvent();
if (! (historyEvent instanceof NormalizedResourceEvent)) {
mi.writeEvent(historyEvent);
}
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
event.getJobID());
if (timelineClient != null) {
if (timelineServiceV2Enabled) {
processEventForNewTimelineService(historyEvent, event.getJobID(),
event.getTimestamp());
} else {
processEventForTimelineServer(historyEvent, event.getJobID(),
event.getTimestamp());
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("In HistoryEventHandler "
+ event.getHistoryEvent().getEventType());
}
} catch (IOException e) {
LOG.error("Error writing History Event: " + event.getHistoryEvent(),
e);
throw new YarnRuntimeException(e);
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) {
JobSubmittedEvent jobSubmittedEvent =
(JobSubmittedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime());
mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName());
}
//initialize the launchTime in the JobIndexInfo of MetaInfo
if(event.getHistoryEvent().getEventType() == EventType.JOB_INITED ){
JobInitedEvent jie = (JobInitedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setJobStartTime(jie.getLaunchTime());
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_QUEUE_CHANGED) {
JobQueueChangeEvent jQueueEvent =
(JobQueueChangeEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setQueueName(jQueueEvent.getJobQueueName());
}
// If this is JobFinishedEvent, close the writer and setup the job-index
if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
try {
JobFinishedEvent jFinishedEvent =
(JobFinishedEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps());
mi.getJobIndexInfo().setNumReduces(
jFinishedEvent.getFinishedReduces());
mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
// In case of JOB_ERROR, only process the done files (e.g. job
// summary, job history file etc.) if this is the last AM retry.
if (event.getHistoryEvent().getEventType() == EventType.JOB_ERROR) {
try {
JobUnsuccessfulCompletionEvent jucEvent =
(JobUnsuccessfulCompletionEvent) event.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
if(context.isLastAMRetry())
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
|| event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
try {
JobUnsuccessfulCompletionEvent jucEvent =
(JobUnsuccessfulCompletionEvent) event
.getHistoryEvent();
mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus());
closeEventWriter(event.getJobID());
processDoneFiles(event.getJobID());
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
}
}
}
public void processEventForJobSummary(HistoryEvent event, JobSummary summary,
JobId jobId) {
// context.getJob could be used for some of this info as well.
switch (event.getEventType()) {
case JOB_SUBMITTED:
JobSubmittedEvent jse = (JobSubmittedEvent) event;
summary.setUser(jse.getUserName());
summary.setQueue(jse.getJobQueueName());
summary.setJobSubmitTime(jse.getSubmitTime());
summary.setJobName(jse.getJobName());
break;
case NORMALIZED_RESOURCE:
NormalizedResourceEvent normalizedResourceEvent =
(NormalizedResourceEvent) event;
if (normalizedResourceEvent.getTaskType() == TaskType.MAP) {
summary.setResourcesPerMap((int) normalizedResourceEvent.getMemory());
} else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) {
summary.setResourcesPerReduce((int) normalizedResourceEvent.getMemory());
}
break;
case JOB_INITED:
JobInitedEvent jie = (JobInitedEvent) event;
summary.setJobLaunchTime(jie.getLaunchTime());
break;
case MAP_ATTEMPT_STARTED:
TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent) event;
if (summary.getFirstMapTaskLaunchTime() == 0)
summary.setFirstMapTaskLaunchTime(mtase.getStartTime());
break;
case REDUCE_ATTEMPT_STARTED:
TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent) event;
if (summary.getFirstReduceTaskLaunchTime() == 0)
summary.setFirstReduceTaskLaunchTime(rtase.getStartTime());
break;
case JOB_FINISHED:
JobFinishedEvent jfe = (JobFinishedEvent) event;
summary.setJobFinishTime(jfe.getFinishTime());
summary.setNumFinishedMaps(jfe.getFinishedMaps());
summary.setNumFailedMaps(jfe.getFailedMaps());
summary.setNumFinishedReduces(jfe.getFinishedReduces());
summary.setNumFailedReduces(jfe.getFailedReduces());
if (summary.getJobStatus() == null)
summary
.setJobStatus(org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED
.toString());
// TODO JOB_FINISHED does not have state. Effectively job history does not
// have state about the finished job.
setSummarySlotSeconds(summary, jfe.getTotalCounters());
break;
case JOB_FAILED:
case JOB_KILLED:
JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event;
summary.setJobStatus(juce.getStatus());
summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps());
summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces());
summary.setJobFinishTime(juce.getFinishTime());
setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
break;
default:
break;
}
}
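// Publish a HistoryEvent to the Timeline Server (ATS v1) by mapping it to a
// TimelineEntity/TimelineEvent and putting it synchronously via
// timelineClient.putEntities().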
private void processEventForTimelineServer(HistoryEvent event, JobId jobId,
long timestamp) {
TimelineEvent tEvent = new TimelineEvent();
tEvent.setEventType(StringUtils.toUpperCase(event.getEventType().name()));
tEvent.setTimestamp(timestamp);
TimelineEntity tEntity = new TimelineEntity();
switch (event.getEventType()) {
case JOB_SUBMITTED:
JobSubmittedEvent jse =
(JobSubmittedEvent) event;
tEvent.addEventInfo("SUBMIT_TIME", jse.getSubmitTime());
tEvent.addEventInfo("QUEUE_NAME", jse.getJobQueueName());
tEvent.addEventInfo("JOB_NAME", jse.getJobName());
tEvent.addEventInfo("USER_NAME", jse.getUserName());
tEvent.addEventInfo("JOB_CONF_PATH", jse.getJobConfPath());
tEvent.addEventInfo("ACLS", jse.getJobAcls());
tEvent.addEventInfo("JOB_QUEUE_NAME", jse.getJobQueueName());
tEvent.addEventInfo("WORKFLOW_ID", jse.getWorkflowId());
tEvent.addEventInfo("WORKFLOW_NAME", jse.getWorkflowName());
tEvent.addEventInfo("WORKFLOW_NAME_NAME", jse.getWorkflowNodeName());
tEvent.addEventInfo("WORKFLOW_ADJACENCIES",
jse.getWorkflowAdjacencies());
tEvent.addEventInfo("WORKFLOW_TAGS", jse.getWorkflowTags());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
case JOB_STATUS_CHANGED:
JobStatusChangedEvent jsce = (JobStatusChangedEvent) event;
tEvent.addEventInfo("STATUS", jsce.getStatus());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
case JOB_INFO_CHANGED:
JobInfoChangeEvent jice = (JobInfoChangeEvent) event;
tEvent.addEventInfo("SUBMIT_TIME", jice.getSubmitTime());
tEvent.addEventInfo("LAUNCH_TIME", jice.getLaunchTime());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
case JOB_INITED:
JobInitedEvent jie = (JobInitedEvent) event;
tEvent.addEventInfo("START_TIME", jie.getLaunchTime());
tEvent.addEventInfo("STATUS", jie.getStatus());
tEvent.addEventInfo("TOTAL_MAPS", jie.getTotalMaps());
tEvent.addEventInfo("TOTAL_REDUCES", jie.getTotalReduces());
tEvent.addEventInfo("UBERIZED", jie.getUberized());
tEntity.setStartTime(jie.getLaunchTime());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
case JOB_PRIORITY_CHANGED:
JobPriorityChangeEvent jpce = (JobPriorityChangeEvent) event;
tEvent.addEventInfo("PRIORITY", jpce.getPriority().toString());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
case JOB_QUEUE_CHANGED:
JobQueueChangeEvent jqe = (JobQueueChangeEvent) event;
tEvent.addEventInfo("QUEUE_NAMES", jqe.getJobQueueName());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
case JOB_FAILED:
case JOB_KILLED:
case JOB_ERROR:
JobUnsuccessfulCompletionEvent juce =
(JobUnsuccessfulCompletionEvent) event;
tEvent.addEventInfo("FINISH_TIME", juce.getFinishTime());
tEvent.addEventInfo("NUM_MAPS", juce.getFinishedMaps());
tEvent.addEventInfo("NUM_REDUCES", juce.getFinishedReduces());
tEvent.addEventInfo("JOB_STATUS", juce.getStatus());
tEvent.addEventInfo("DIAGNOSTICS", juce.getDiagnostics());
tEvent.addEventInfo("FINISHED_MAPS", juce.getFinishedMaps());
tEvent.addEventInfo("FINISHED_REDUCES", juce.getFinishedReduces());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
case JOB_FINISHED:
JobFinishedEvent jfe = (JobFinishedEvent) event;
tEvent.addEventInfo("FINISH_TIME", jfe.getFinishTime());
tEvent.addEventInfo("NUM_MAPS", jfe.getFinishedMaps());
tEvent.addEventInfo("NUM_REDUCES", jfe.getFinishedReduces());
tEvent.addEventInfo("FAILED_MAPS", jfe.getFailedMaps());
tEvent.addEventInfo("FAILED_REDUCES", jfe.getFailedReduces());
tEvent.addEventInfo("FINISHED_MAPS", jfe.getFinishedMaps());
tEvent.addEventInfo("FINISHED_REDUCES", jfe.getFinishedReduces());
tEvent.addEventInfo("MAP_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(jfe.getMapCounters()));
tEvent.addEventInfo("REDUCE_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(jfe.getReduceCounters()));
tEvent.addEventInfo("TOTAL_COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(jfe.getTotalCounters()));
tEvent.addEventInfo("JOB_STATUS", JobState.SUCCEEDED.toString());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
case TASK_STARTED:
TaskStartedEvent tse = (TaskStartedEvent) event;
tEvent.addEventInfo("TASK_TYPE", tse.getTaskType().toString());
tEvent.addEventInfo("START_TIME", tse.getStartTime());
tEvent.addEventInfo("SPLIT_LOCATIONS", tse.getSplitLocations());
tEntity.addEvent(tEvent);
tEntity.setEntityId(tse.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case TASK_FAILED:
TaskFailedEvent tfe = (TaskFailedEvent) event;
tEvent.addEventInfo("TASK_TYPE", tfe.getTaskType().toString());
tEvent.addEventInfo("STATUS", TaskStatus.State.FAILED.toString());
tEvent.addEventInfo("FINISH_TIME", tfe.getFinishTime());
tEvent.addEventInfo("ERROR", tfe.getError());
tEvent.addEventInfo("FAILED_ATTEMPT_ID",
tfe.getFailedAttemptID() == null ?
"" : tfe.getFailedAttemptID().toString());
tEvent.addEventInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(tfe.getCounters()));
tEntity.addEvent(tEvent);
tEntity.setEntityId(tfe.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case TASK_UPDATED:
TaskUpdatedEvent tue = (TaskUpdatedEvent) event;
tEvent.addEventInfo("FINISH_TIME", tue.getFinishTime());
tEntity.addEvent(tEvent);
tEntity.setEntityId(tue.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case TASK_FINISHED:
TaskFinishedEvent tfe2 = (TaskFinishedEvent) event;
tEvent.addEventInfo("TASK_TYPE", tfe2.getTaskType().toString());
tEvent.addEventInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(tfe2.getCounters()));
tEvent.addEventInfo("FINISH_TIME", tfe2.getFinishTime());
tEvent.addEventInfo("STATUS", TaskStatus.State.SUCCEEDED.toString());
tEvent.addEventInfo("SUCCESSFUL_TASK_ATTEMPT_ID",
tfe2.getSuccessfulTaskAttemptId() == null ?
"" : tfe2.getSuccessfulTaskAttemptId().toString());
tEntity.addEvent(tEvent);
tEntity.setEntityId(tfe2.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case MAP_ATTEMPT_STARTED:
case CLEANUP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
TaskAttemptStartedEvent tase = (TaskAttemptStartedEvent) event;
tEvent.addEventInfo("TASK_TYPE", tase.getTaskType().toString());
tEvent.addEventInfo("TASK_ATTEMPT_ID",
tase.getTaskAttemptId().toString());
tEvent.addEventInfo("START_TIME", tase.getStartTime());
tEvent.addEventInfo("HTTP_PORT", tase.getHttpPort());
tEvent.addEventInfo("TRACKER_NAME", tase.getTrackerName());
tEvent.addEventInfo("SHUFFLE_PORT", tase.getShufflePort());
tEvent.addEventInfo("CONTAINER_ID", tase.getContainerId() == null ?
"" : tase.getContainerId().toString());
tEntity.addEvent(tEvent);
tEntity.setEntityId(tase.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case MAP_ATTEMPT_FAILED:
case CLEANUP_ATTEMPT_FAILED:
case REDUCE_ATTEMPT_FAILED:
case SETUP_ATTEMPT_FAILED:
case MAP_ATTEMPT_KILLED:
case CLEANUP_ATTEMPT_KILLED:
case REDUCE_ATTEMPT_KILLED:
case SETUP_ATTEMPT_KILLED:
TaskAttemptUnsuccessfulCompletionEvent tauce =
(TaskAttemptUnsuccessfulCompletionEvent) event;
tEvent.addEventInfo("TASK_TYPE", tauce.getTaskType().toString());
tEvent.addEventInfo("TASK_ATTEMPT_ID",
tauce.getTaskAttemptId() == null ?
"" : tauce.getTaskAttemptId().toString());
tEvent.addEventInfo("FINISH_TIME", tauce.getFinishTime());
tEvent.addEventInfo("ERROR", tauce.getError());
tEvent.addEventInfo("STATUS", tauce.getTaskStatus());
tEvent.addEventInfo("HOSTNAME", tauce.getHostname());
tEvent.addEventInfo("PORT", tauce.getPort());
tEvent.addEventInfo("RACK_NAME", tauce.getRackName());
tEvent.addEventInfo("SHUFFLE_FINISH_TIME", tauce.getFinishTime());
tEvent.addEventInfo("SORT_FINISH_TIME", tauce.getFinishTime());
tEvent.addEventInfo("MAP_FINISH_TIME", tauce.getFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(tauce.getCounters()));
tEntity.addEvent(tEvent);
tEntity.setEntityId(tauce.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case MAP_ATTEMPT_FINISHED:
MapAttemptFinishedEvent mafe = (MapAttemptFinishedEvent) event;
tEvent.addEventInfo("TASK_TYPE", mafe.getTaskType().toString());
tEvent.addEventInfo("FINISH_TIME", mafe.getFinishTime());
tEvent.addEventInfo("STATUS", mafe.getTaskStatus());
tEvent.addEventInfo("STATE", mafe.getState());
tEvent.addEventInfo("MAP_FINISH_TIME", mafe.getMapFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(mafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", mafe.getHostname());
tEvent.addEventInfo("PORT", mafe.getPort());
tEvent.addEventInfo("RACK_NAME", mafe.getRackName());
tEvent.addEventInfo("ATTEMPT_ID", mafe.getAttemptId() == null ?
"" : mafe.getAttemptId().toString());
tEntity.addEvent(tEvent);
tEntity.setEntityId(mafe.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case REDUCE_ATTEMPT_FINISHED:
ReduceAttemptFinishedEvent rafe = (ReduceAttemptFinishedEvent) event;
tEvent.addEventInfo("TASK_TYPE", rafe.getTaskType().toString());
tEvent.addEventInfo("ATTEMPT_ID", rafe.getAttemptId() == null ?
"" : rafe.getAttemptId().toString());
tEvent.addEventInfo("FINISH_TIME", rafe.getFinishTime());
tEvent.addEventInfo("STATUS", rafe.getTaskStatus());
tEvent.addEventInfo("STATE", rafe.getState());
tEvent.addEventInfo("SHUFFLE_FINISH_TIME", rafe.getShuffleFinishTime());
tEvent.addEventInfo("SORT_FINISH_TIME", rafe.getSortFinishTime());
tEvent.addEventInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(rafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", rafe.getHostname());
tEvent.addEventInfo("PORT", rafe.getPort());
tEvent.addEventInfo("RACK_NAME", rafe.getRackName());
tEntity.addEvent(tEvent);
tEntity.setEntityId(rafe.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
TaskAttemptFinishedEvent tafe = (TaskAttemptFinishedEvent) event;
tEvent.addEventInfo("TASK_TYPE", tafe.getTaskType().toString());
tEvent.addEventInfo("ATTEMPT_ID", tafe.getAttemptId() == null ?
"" : tafe.getAttemptId().toString());
tEvent.addEventInfo("FINISH_TIME", tafe.getFinishTime());
tEvent.addEventInfo("STATUS", tafe.getTaskStatus());
tEvent.addEventInfo("STATE", tafe.getState());
tEvent.addEventInfo("COUNTERS_GROUPS",
JobHistoryEventUtils.countersToJSON(tafe.getCounters()));
tEvent.addEventInfo("HOSTNAME", tafe.getHostname());
tEntity.addEvent(tEvent);
tEntity.setEntityId(tafe.getTaskId().toString());
tEntity.setEntityType(MAPREDUCE_TASK_ENTITY_TYPE);
tEntity.addRelatedEntity(MAPREDUCE_JOB_ENTITY_TYPE, jobId.toString());
break;
case AM_STARTED:
AMStartedEvent ase = (AMStartedEvent) event;
tEvent.addEventInfo("APPLICATION_ATTEMPT_ID",
ase.getAppAttemptId() == null ?
"" : ase.getAppAttemptId().toString());
tEvent.addEventInfo("CONTAINER_ID", ase.getContainerId() == null ?
"" : ase.getContainerId().toString());
tEvent.addEventInfo("NODE_MANAGER_HOST", ase.getNodeManagerHost());
tEvent.addEventInfo("NODE_MANAGER_PORT", ase.getNodeManagerPort());
tEvent.addEventInfo("NODE_MANAGER_HTTP_PORT",
ase.getNodeManagerHttpPort());
tEvent.addEventInfo("START_TIME", ase.getStartTime());
tEvent.addEventInfo("SUBMIT_TIME", ase.getSubmitTime());
tEntity.addEvent(tEvent);
tEntity.setEntityId(jobId.toString());
tEntity.setEntityType(MAPREDUCE_JOB_ENTITY_TYPE);
break;
default:
break;
}
try {
TimelinePutResponse response = timelineClient.putEntities(tEntity);
List<TimelinePutResponse.TimelinePutError> errors = response.getErrors();
if (errors.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Timeline entities are successfully put in event " + event
.getEventType());
}
} else {
for (TimelinePutResponse.TimelinePutError error : errors) {
LOG.error(
"Error when publishing entity [" + error.getEntityType() + ","
+ error.getEntityId() + "], server side error code: "
+ error.getErrorCode());
}
}
} catch (YarnException | IOException | ClientHandlerException ex) {
LOG.error("Error putting entity " + tEntity.getEntityId() + " to Timeline"
+ "Server", ex);
}
}
// Create a JobEntity from a HistoryEvent, adding other info such as:
// jobId, timestamp and entityType.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createJobEntity(HistoryEvent event, long timestamp, JobId jobId,
String entityType, boolean setCreatedTime) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(jobId.toString());
return entity;
}
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createJobEntity(JobId jobId) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.setId(jobId.toString());
entity.setType(MAPREDUCE_JOB_ENTITY_TYPE);
return entity;
}
// Create an ApplicationEntity with the finished job's metrics from the HistoryEvent.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createAppEntityWithJobMetrics(HistoryEvent event, JobId jobId) {
ApplicationEntity entity = new ApplicationEntity();
entity.setId(jobId.getAppId().toString());
entity.setMetrics(event.getTimelineMetrics());
return entity;
}
// Create a base TimelineEntity from a HistoryEvent, adding other info such
// as: timestamp and entityType.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createBaseEntity(HistoryEvent event, long timestamp, String entityType,
boolean setCreatedTime) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent tEvent =
event.toTimelineEvent();
tEvent.setTimestamp(timestamp);
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
entity.addEvent(tEvent);
entity.setType(entityType);
if (setCreatedTime) {
entity.setCreatedTime(timestamp);
}
Set<TimelineMetric> timelineMetrics = event.getTimelineMetrics();
if (timelineMetrics != null) {
entity.setMetrics(timelineMetrics);
}
return entity;
}
// Create a TaskEntity from a HistoryEvent, adding other info such as:
// taskId, jobId, timestamp, entityType and relatedJobEntity.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskEntity(HistoryEvent event, long timestamp, String taskId,
String entityType, String relatedJobEntity, JobId jobId,
boolean setCreatedTime) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskId);
if (event.getEventType() == EventType.TASK_STARTED) {
entity.addInfo("TASK_TYPE",
((TaskStartedEvent)event).getTaskType().toString());
}
entity.addIsRelatedToEntity(relatedJobEntity, jobId.toString());
return entity;
}
// Create a TaskAttemptEntity from a HistoryEvent, adding other info such as:
// timestamp, taskAttemptId, entityType, relatedTaskEntity and taskId.
private org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
createTaskAttemptEntity(HistoryEvent event, long timestamp,
String taskAttemptId, String entityType, String relatedTaskEntity,
String taskId, boolean setCreatedTime) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity =
createBaseEntity(event, timestamp, entityType, setCreatedTime);
entity.setId(taskAttemptId);
entity.addIsRelatedToEntity(relatedTaskEntity, taskId);
return entity;
}
private void publishConfigsOnJobSubmittedEvent(JobSubmittedEvent event,
JobId jobId) {
if (event.getJobConf() == null) {
return;
}
// Publish job configurations both as job and app entity.
// Configs are split into multiple entities if they exceed 100kb in size.
org.apache.hadoop.yarn.api.records.timelineservice.
TimelineEntity jobEntityForConfigs = createJobEntity(jobId);
ApplicationEntity appEntityForConfigs = new ApplicationEntity();
String appId = jobId.getAppId().toString();
appEntityForConfigs.setId(appId);
try {
int configSize = 0;
for (Map.Entry<String, String> entry : event.getJobConf()) {
int size = entry.getKey().length() + entry.getValue().length();
configSize += size;
if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
if (jobEntityForConfigs.getConfigs().size() > 0) {
timelineClient.putEntities(jobEntityForConfigs);
timelineClient.putEntities(appEntityForConfigs);
jobEntityForConfigs = createJobEntity(jobId);
appEntityForConfigs = new ApplicationEntity();
appEntityForConfigs.setId(appId);
}
configSize = size;
}
jobEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
}
if (configSize > 0) {
timelineClient.putEntities(jobEntityForConfigs);
timelineClient.putEntities(appEntityForConfigs);
}
} catch (IOException | YarnException e) {
LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
" for the job : " + jobId, e);
}
}
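// Publish a HistoryEvent to the timeline service v2. Job-level events become
// a MAPREDUCE_JOB entity, task events a MAPREDUCE_TASK entity and attempt
// events a MAPREDUCE_TASK_ATTEMPT entity; JOB_FINISHED additionally emits an
// ApplicationEntity carrying the job's final timeline metrics.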
private void processEventForNewTimelineService(HistoryEvent event,
JobId jobId, long timestamp) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity =
null;
String taskId = null;
String taskAttemptId = null;
boolean setCreatedTime = false;
switch (event.getEventType()) {
// Handle job events
case JOB_SUBMITTED:
setCreatedTime = true;
break;
case JOB_STATUS_CHANGED:
case JOB_INFO_CHANGED:
case JOB_INITED:
case JOB_PRIORITY_CHANGED:
case JOB_QUEUE_CHANGED:
case JOB_FAILED:
case JOB_KILLED:
case JOB_ERROR:
case JOB_FINISHED:
case AM_STARTED:
case NORMALIZED_RESOURCE:
break;
// Handle task events
case TASK_STARTED:
setCreatedTime = true;
taskId = ((TaskStartedEvent)event).getTaskId().toString();
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
break;
case MAP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
setCreatedTime = true;
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
break;
case CLEANUP_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FAILED:
case CLEANUP_ATTEMPT_FAILED:
case REDUCE_ATTEMPT_FAILED:
case SETUP_ATTEMPT_FAILED:
case MAP_ATTEMPT_KILLED:
case CLEANUP_ATTEMPT_KILLED:
case REDUCE_ATTEMPT_KILLED:
case SETUP_ATTEMPT_KILLED:
taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).
getAttemptId().toString();
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).
getAttemptId().toString();
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptFinishedEvent)event).
getAttemptId().toString();
break;
default:
LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
" and handled by timeline service.");
return;
}
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
appEntityWithJobMetrics = null;
if (taskId == null) {
// JobEntity
tEntity = createJobEntity(event, timestamp, jobId,
MAPREDUCE_JOB_ENTITY_TYPE, setCreatedTime);
if (event.getEventType() == EventType.JOB_FINISHED
&& event.getTimelineMetrics() != null) {
appEntityWithJobMetrics = createAppEntityWithJobMetrics(event, jobId);
}
} else {
if (taskAttemptId == null) {
// TaskEntity
tEntity = createTaskEntity(event, timestamp, taskId,
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
jobId, setCreatedTime);
} else {
// TaskAttemptEntity
tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
taskId, setCreatedTime);
}
}
try {
if (appEntityWithJobMetrics == null) {
timelineClient.putEntitiesAsync(tEntity);
} else {
timelineClient.putEntities(tEntity, appEntityWithJobMetrics);
}
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
+ " for the job : " + jobId, e);
return;
}
if (event.getEventType() == EventType.JOB_SUBMITTED) {
// Publish configs after main job submitted event has been posted.
publishConfigsOnJobSubmittedEvent((JobSubmittedEvent)event, jobId);
}
}
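// Derive map/reduce slot-seconds for the job summary from the
// SLOTS_MILLIS_MAPS and SLOTS_MILLIS_REDUCES counters, when present.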
private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) {
Counter slotMillisMapCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_MAPS);
if (slotMillisMapCounter != null) {
summary.setMapSlotSeconds(slotMillisMapCounter.getValue() / 1000);
}
Counter slotMillisReduceCounter = allCounters
.findCounter(JobCounter.SLOTS_MILLIS_REDUCES);
if (slotMillisReduceCounter != null) {
summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue() / 1000);
}
}
protected void closeEventWriter(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
if (mi == null) {
throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
}
if (!mi.isWriterActive()) {
throw new IOException(
"Inactive Writer: Likely received multiple JobFinished / " +
"JobUnsuccessful events for JobId: ["
+ jobId + "]");
}
// Close the Writer
try {
mi.closeWriter();
} catch (IOException e) {
LOG.error("Error closing writer for JobID: " + jobId);
throw e;
}
}
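// Write the job summary file and move the history (.jhist) and conf files
// from the staging directory to the intermediate done directory. Files are
// first written/copied with a "_tmp" suffix and then renamed into place so
// the JobHistoryServer never sees partially written files.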
protected void processDoneFiles(JobId jobId) throws IOException {
final MetaInfo mi = fileMap.get(jobId);
if (mi == null) {
throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
}
if (mi.getHistoryFile() == null) {
LOG.warn("No file for job-history with " + jobId + " found in cache!");
}
if (mi.getConfFile() == null) {
LOG.warn("No file for jobconf with " + jobId + " found in cache!");
}
// Writing out the summary file.
// TODO JH enhancement - reuse this file to store additional indexing info
// like ACLs, etc. JHServer can use HDFS append to build an index file
// with more info than is available via the filename.
Path qualifiedSummaryDoneFile = null;
FSDataOutputStream summaryFileOut = null;
try {
String doneSummaryFileName = getTempFileName(JobHistoryUtils
.getIntermediateSummaryFileName(jobId));
qualifiedSummaryDoneFile = doneDirFS.makeQualified(new Path(
doneDirPrefixPath, doneSummaryFileName));
summaryFileOut = doneDirFS.create(qualifiedSummaryDoneFile, true);
summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString());
summaryFileOut.close();
doneDirFS.setPermission(qualifiedSummaryDoneFile, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
} catch (IOException e) {
LOG.info("Unable to write out JobSummaryInfo to ["
+ qualifiedSummaryDoneFile + "]", e);
throw e;
}
try {
// Move historyFile to Done Folder.
Path qualifiedDoneFile = null;
if (mi.getHistoryFile() != null) {
Path historyFile = mi.getHistoryFile();
Path qualifiedLogFile = stagingDirFS.makeQualified(historyFile);
int jobNameLimit =
getConfig().getInt(JHAdminConfig.MR_HS_JOBNAME_LIMIT,
JHAdminConfig.DEFAULT_MR_HS_JOBNAME_LIMIT);
String doneJobHistoryFileName =
getTempFileName(FileNameIndexUtils.getDoneFileName(mi
.getJobIndexInfo(), jobNameLimit));
qualifiedDoneFile =
doneDirFS.makeQualified(new Path(doneDirPrefixPath,
doneJobHistoryFileName));
moveToDoneNow(qualifiedLogFile, qualifiedDoneFile);
}
// Move confFile to Done Folder
Path qualifiedConfDoneFile = null;
if (mi.getConfFile() != null) {
Path confFile = mi.getConfFile();
Path qualifiedConfFile = stagingDirFS.makeQualified(confFile);
String doneConfFileName =
getTempFileName(JobHistoryUtils
.getIntermediateConfFileName(jobId));
qualifiedConfDoneFile =
doneDirFS.makeQualified(new Path(doneDirPrefixPath,
doneConfFileName));
moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile);
}
moveTmpToDone(qualifiedSummaryDoneFile);
moveTmpToDone(qualifiedConfDoneFile);
moveTmpToDone(qualifiedDoneFile);
} catch (IOException e) {
LOG.error("Error closing writer for JobID: " + jobId);
throw e;
}
}
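// TimerTask that flushes a job's MetaInfo after flushTimeout elapses, so
// that completion events are not held unflushed indefinitely when the queue
// is short.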
private class FlushTimerTask extends TimerTask {
private MetaInfo metaInfo;
private IOException ioe = null;
private volatile boolean shouldRun = true;
FlushTimerTask(MetaInfo metaInfo) {
this.metaInfo = metaInfo;
}
@Override
public void run() {
LOG.debug("In flush timer task");
synchronized (lock) {
try {
if (!metaInfo.isTimerShutDown() && shouldRun)
metaInfo.flush();
} catch (IOException e) {
ioe = e;
}
}
}
public IOException getException() {
return ioe;
}
public void stop() {
shouldRun = false;
this.cancel();
}
}
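// Per-job bookkeeping: history and conf file paths, the event writer, the
// JobIndexInfo / JobSummary being accumulated, and the delayed-flush timer
// state.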
protected class MetaInfo {
private Path historyFile;
private Path confFile;
private EventWriter writer;
JobIndexInfo jobIndexInfo;
JobSummary jobSummary;
Timer flushTimer;
FlushTimerTask flushTimerTask;
private boolean isTimerShutDown = false;
private String forcedJobStateOnShutDown;
MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
String jobName, JobId jobId, String forcedJobStateOnShutDown,
String queueName) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
this.jobIndexInfo =
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null,
queueName);
this.jobSummary = new JobSummary();
this.flushTimer = new Timer("FlushTimer", true);
this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
}
Path getHistoryFile() {
return historyFile;
}
Path getConfFile() {
return confFile;
}
JobIndexInfo getJobIndexInfo() {
return jobIndexInfo;
}
JobSummary getJobSummary() {
return jobSummary;
}
boolean isWriterActive() {
return writer != null;
}
boolean isTimerShutDown() {
return isTimerShutDown;
}
String getForcedJobStateOnShutDown() {
return forcedJobStateOnShutDown;
}
@Override
public String toString() {
return "Job MetaInfo for "+ jobSummary.getJobId()
+ " history file " + historyFile;
}
void closeWriter() throws IOException {
LOG.debug("Closing Writer");
synchronized (lock) {
if (writer != null) {
writer.close();
}
writer = null;
}
}
void writeEvent(HistoryEvent event) throws IOException {
LOG.debug("Writing event");
synchronized (lock) {
if (writer != null) {
writer.write(event);
processEventForFlush(event);
maybeFlush(event);
}
}
}
void processEventForFlush(HistoryEvent historyEvent) throws IOException {
if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED,
EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED,
EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED,
EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED,
EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED,
EventType.JOB_KILLED).contains(historyEvent.getEventType())) {
numUnflushedCompletionEvents++;
if (!isTimerActive) {
resetFlushTimer();
if (!isTimerShutDown) {
flushTimerTask = new FlushTimerTask(this);
flushTimer.schedule(flushTimerTask, flushTimeout);
isTimerActive = true;
}
}
}
}
void resetFlushTimer() throws IOException {
if (flushTimerTask != null) {
IOException exception = flushTimerTask.getException();
flushTimerTask.stop();
if (exception != null) {
throw exception;
}
flushTimerTask = null;
}
isTimerActive = false;
}
void maybeFlush(HistoryEvent historyEvent) throws IOException {
if ((eventQueue.size() < minQueueSizeForBatchingFlushes
&& numUnflushedCompletionEvents > 0)
|| numUnflushedCompletionEvents >= maxUnflushedCompletionEvents
|| isJobCompletionEvent(historyEvent)) {
this.flush();
}
}
void flush() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Flushing " + toString());
}
synchronized (lock) {
if (numUnflushedCompletionEvents != 0) { // skipped timer cancel.
writer.flush();
numUnflushedCompletionEvents = 0;
resetFlushTimer();
}
}
}
void shutDownTimer() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Shutting down timer "+ toString());
}
synchronized (lock) {
isTimerShutDown = true;
flushTimer.cancel();
if (flushTimerTask != null && flushTimerTask.getException() != null) {
throw flushTimerTask.getException();
}
}
}
}
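// Rename a "_tmp" file in the done directory to its final name once it has
// been fully written.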
private void moveTmpToDone(Path tmpPath) throws IOException {
if (tmpPath != null) {
String tmpFileName = tmpPath.getName();
String fileName = getFileNameFromTmpFN(tmpFileName);
Path path = new Path(tmpPath.getParent(), fileName);
doneDirFS.rename(tmpPath, path);
LOG.info("Moved tmp to done: " + tmpPath + " to " + path);
}
}
// TODO If the FS objects are the same, this should be a rename instead of a
// copy.
private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
// check if path exists, in case of retries it may not exist
if (stagingDirFS.exists(fromPath)) {
LOG.info("Copying " + fromPath.toString() + " to " + toPath.toString());
// TODO temporarily removing the existing dst
if (doneDirFS.exists(toPath)) {
doneDirFS.delete(toPath, true);
}
boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath,
false, getConfig());
if (copied)
LOG.info("Copied to done location: " + toPath);
else
LOG.info("copy failed");
doneDirFS.setPermission(toPath, new FsPermission(
JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS));
}
}
boolean pathExists(FileSystem fileSys, Path path) throws IOException {
return fileSys.exists(path);
}
private String getTempFileName(String srcFile) {
return srcFile + "_tmp";
}
private String getFileNameFromTmpFN(String tmpFileName) {
// TODO: add error checking (assumes the "_tmp" suffix added by getTempFileName).
return tmpFileName.substring(0, tmpFileName.length()-4);
}
public void setForcejobCompletion(boolean forceJobCompletion) {
this.forceJobCompletion = forceJobCompletion;
LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+ forceJobCompletion);
}
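// Map the forced job state recorded at AM shutdown (a JobStateInternal name)
// to the externally visible JobState written to the unsuccessful completion
// event; unknown or empty values default to KILLED.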
private String createJobStateForJobUnsuccessfulCompletionEvent(
String forcedJobStateOnShutDown) {
if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown
.isEmpty()) {
return JobState.KILLED.toString();
} else if (forcedJobStateOnShutDown.equals(
JobStateInternal.ERROR.toString()) ||
forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) {
return JobState.FAILED.toString();
} else if (forcedJobStateOnShutDown.equals(JobStateInternal.SUCCEEDED
.toString())) {
return JobState.SUCCEEDED.toString();
}
return JobState.KILLED.toString();
}
@VisibleForTesting
boolean getFlushTimerStatus() {
return isTimerActive;
}
}