| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.jobhistory; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.mapreduce.Counter; |
| import org.apache.hadoop.mapreduce.Counters; |
| import org.apache.hadoop.mapreduce.JobCounter; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.TaskType; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobState; |
| import org.apache.hadoop.mapreduce.v2.app.AppContext; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.yarn.YarnException; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.service.AbstractService; |
| |
| /** |
| * Job history events get routed to this class, which writes them directly |
| * to DFS under a staging directory; when the job completes, the files are |
| * moved to a done directory. The JobHistory implementation lives in this |
| * package so that it can access package-private classes. |
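| * |
| * <p>A minimal lifecycle sketch (illustrative only; in the MR AM this |
| * handler is wired up as a service and events are dispatched to it): |
| * <pre>{@code |
| * JobHistoryEventHandler jheh = new JobHistoryEventHandler(context, 1); |
| * jheh.init(conf); // resolve/create the staging and done directories |
| * jheh.start(); // spawn the event-handling thread |
| * jheh.handle(new JobHistoryEvent(jobId, historyEvent)); |
| * jheh.stop(); // drain the queue, close writers, move files to done |
| * }</pre> |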
| */ |
| public class JobHistoryEventHandler extends AbstractService |
| implements EventHandler<JobHistoryEvent> { |
| |
| private final AppContext context; |
| private final int startCount; |
| |
| private int eventCounter; |
| |
| // TODO Does the FS object need to be different? |
| private FileSystem stagingDirFS; // log Dir FileSystem |
| private FileSystem doneDirFS; // done Dir FileSystem |
| |
| |
| private Path stagingDirPath = null; |
| private Path doneDirPrefixPath = null; // folder for completed jobs |
| |
| private int maxUnflushedCompletionEvents; |
| private int postJobCompletionMultiplier; |
| private long flushTimeout; |
| private int minQueueSizeForBatchingFlushes; // TODO: Rename |
| |
| private int numUnflushedCompletionEvents = 0; |
| private boolean isTimerActive; |
| |
| |
| protected BlockingQueue<JobHistoryEvent> eventQueue = |
| new LinkedBlockingQueue<JobHistoryEvent>(); |
| protected Thread eventHandlingThread; |
| private volatile boolean stopped; |
| private final Object lock = new Object(); |
| |
| private static final Log LOG = LogFactory.getLog( |
| JobHistoryEventHandler.class); |
| |
| private static final Map<JobId, MetaInfo> fileMap = |
| Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>()); |
| |
| public JobHistoryEventHandler(AppContext context, int startCount) { |
| super("JobHistoryEventHandler"); |
| this.context = context; |
| this.startCount = startCount; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.hadoop.yarn.service.AbstractService#init(org. |
| * apache.hadoop.conf.Configuration) |
| * Initializes the FileSystem and Path objects for the staging and done |
| * directories, creating them if they do not already exist. |
| */ |
| @Override |
| public void init(Configuration conf) { |
| |
| String stagingDirStr = null; |
| String doneDirStr = null; |
| String userDoneDirStr = null; |
| try { |
| stagingDirStr = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf); |
| doneDirStr = |
| JobHistoryUtils.getConfiguredHistoryIntermediateDoneDirPrefix(conf); |
| userDoneDirStr = |
| JobHistoryUtils.getHistoryIntermediateDoneDirForUser(conf); |
| } catch (IOException e) { |
| LOG.error("Failed while getting the configured log directories", e); |
| throw new YarnException(e); |
| } |
| |
| //Check for the existence of the history staging dir. Maybe create it. |
| try { |
| stagingDirPath = |
| FileSystem.get(conf).makeQualified(new Path(stagingDirStr)); |
| stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf); |
| mkdir(stagingDirFS, stagingDirPath, new FsPermission( |
| JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS)); |
| } catch (IOException e) { |
| LOG.error("Failed while checking for/creating history staging path: [" |
| + stagingDirPath + "]", e); |
| throw new YarnException(e); |
| } |
| |
| //Check for the existence of intermediate done dir. |
| Path doneDirPath = null; |
| try { |
| doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr)); |
| doneDirFS = FileSystem.get(doneDirPath.toUri(), conf); |
| // This directory will be in a common location, or this may be a cluster |
| // meant for a single user. We create it based on the conf; ideally it |
| // should be created by the JobHistoryServer or as part of deployment. |
| if (!doneDirFS.exists(doneDirPath)) { |
| if (JobHistoryUtils.shouldCreateNonUserDirectory(conf)) { |
| LOG.info("Creating intermediate history logDir: [" |
| + doneDirPath |
| + "] based on conf. Should ideally be created by the JobHistoryServer: " |
| + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR); |
| mkdir( |
| doneDirFS, |
| doneDirPath, |
| new FsPermission( |
| JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS |
| .toShort())); |
| // TODO Temporary toShort() until new FsPermission(FsPermission) |
| // respects the sticky bit. |
| } else { |
| String message = "Not creating intermediate history logDir: [" |
| + doneDirPath |
| + "] based on conf: " |
| + MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR |
| + ". Either set to true or pre-create this directory with" + |
| " appropriate permissions"; |
| LOG.error(message); |
| throw new YarnException(message); |
| } |
| } |
| } catch (IOException e) { |
| LOG.error("Failed checking for the existence of history intermediate " + |
| "done directory: [" + doneDirPath + "]", e); |
| throw new YarnException(e); |
| } |
| |
| //Check/create user directory under intermediate done dir. |
| try { |
| doneDirPrefixPath = |
| FileSystem.get(conf).makeQualified(new Path(userDoneDirStr)); |
| mkdir(doneDirFS, doneDirPrefixPath, new FsPermission( |
| JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS)); |
| } catch (IOException e) { |
| LOG.error("Error creating user intermediate history done directory: [" |
| + doneDirPrefixPath + "]", e); |
| throw new YarnException(e); |
| } |
| |
| // Maximum number of unflushed completion-events that can stay in the queue |
| // before flush kicks in. |
| maxUnflushedCompletionEvents = |
| conf.getInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, |
| MRJobConfig.DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS); |
| // After the job completes we want fewer flushes so that the remaining |
| // events are written out faster; maxUnflushedCompletionEvents is scaled |
| // up by this multiplier once a job-completion event is seen. |
| postJobCompletionMultiplier = |
| conf.getInt( |
| MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, |
| MRJobConfig.DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER); |
| // Maximum time a completion event may stay unflushed before the timer |
| // forces a flush. |
| flushTimeout = |
| conf.getLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS, |
| MRJobConfig.DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS); |
| minQueueSizeForBatchingFlushes = |
| conf.getInt( |
| MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, |
| MRJobConfig.DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD); |
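| |
| // Illustrative tuning sketch (hypothetical values, not recommendations): |
| // Configuration tuned = new Configuration(); |
| // tuned.setInt( |
| // MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 400); |
| // tuned.setLong( |
| // MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS, 30000L); |
| // Larger values batch more events per flush, at the cost of losing more |
| // history if the AM dies before flushing. |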
| |
| super.init(conf); |
| } |
| |
| private void mkdir(FileSystem fs, Path path, FsPermission fsp) |
| throws IOException { |
| if (!fs.exists(path)) { |
| try { |
| fs.mkdirs(path, fsp); |
| FileStatus fsStatus = fs.getFileStatus(path); |
| LOG.info("Perms after creating " + fsStatus.getPermission().toShort() |
| + ", Expected: " + fsp.toShort()); |
| if (fsStatus.getPermission().toShort() != fsp.toShort()) { |
| LOG.info("Explicitly setting permissions to : " + fsp.toShort() |
| + ", " + fsp); |
| fs.setPermission(path, fsp); |
| } |
| } catch (FileAlreadyExistsException e) { |
| LOG.info("Directory: [" + path + "] already exists."); |
| } |
| } |
| } |
| |
| @Override |
| public void start() { |
| eventHandlingThread = new Thread(new Runnable() { |
| @Override |
| public void run() { |
| JobHistoryEvent event = null; |
| while (!stopped && !Thread.currentThread().isInterrupted()) { |
| |
| // Log the size of the history-event-queue every so often. |
| if (eventCounter != 0 && eventCounter % 1000 == 0) { |
| eventCounter = 0; |
| LOG.info("Size of the JobHistory event queue is " |
| + eventQueue.size()); |
| } else { |
| eventCounter++; |
| } |
| |
| try { |
| event = eventQueue.take(); |
| } catch (InterruptedException e) { |
| LOG.info("EventQueue take interrupted. Returning"); |
| return; |
| } |
| // If an event has been removed from the queue, handle it. |
| // Anything left in the queue at shutdown is handled via stop(). |
| // Clear the interrupt status before calling handleEvent and restore |
| // it afterwards if it had been set. Interrupts received from other |
| // threads during handleEvent cannot be dealt with: |
| // Shell.runCommand() ignores them. |
| synchronized (lock) { |
| boolean isInterrupted = Thread.interrupted(); |
| handleEvent(event); |
| if (isInterrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| }); |
| eventHandlingThread.start(); |
| super.start(); |
| } |
| |
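| /** |
| * Shutdown sequence: mark the handler stopped, interrupt the |
| * event-handling thread (outside of handleEvent, guarded by lock), join |
| * it, cancel all per-job flush timers, write out any events still in the |
| * queue, and finally close all open writers. |
| */ |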
| @Override |
| public void stop() { |
| LOG.info("Stopping JobHistoryEventHandler. " |
| + "Size of the outstanding event queue is " + eventQueue.size()); |
| stopped = true; |
| //do not interrupt while event handling is in progress |
| synchronized(lock) { |
| if (eventHandlingThread != null) { |
| eventHandlingThread.interrupt(); |
| } |
| } |
| |
| try { |
| if (eventHandlingThread != null) { |
| eventHandlingThread.join(); |
| } |
| } catch (InterruptedException ie) { |
| LOG.info("InterruptedException while stopping", ie); |
| } |
| |
| // Cancel all timers so that they don't fire while, or after, the |
| // metaInfo object is being wrapped up. |
| for (MetaInfo mi : fileMap.values()) { |
| try { |
| mi.shutDownTimer(); |
| } catch (IOException e) { |
| LOG.info("Exception while cancelling delayed flush timer. " |
| + "Likely caused by a failed flush: " + e.getMessage()); |
| } |
| } |
| |
| //write all the events remaining in queue |
| Iterator<JobHistoryEvent> it = eventQueue.iterator(); |
| while(it.hasNext()) { |
| JobHistoryEvent ev = it.next(); |
| LOG.info("In stop, writing event " + ev.getType()); |
| handleEvent(ev); |
| } |
| |
| //close all file handles |
| for (MetaInfo mi : fileMap.values()) { |
| try { |
| mi.closeWriter(); |
| } catch (IOException e) { |
| LOG.info("Exception while closing file: " + e.getMessage()); |
| } |
| } |
| LOG.info("Stopped JobHistoryEventHandler. Calling super.stop()"); |
| super.stop(); |
| } |
| |
| protected EventWriter createEventWriter(Path historyFilePath) |
| throws IOException { |
| FSDataOutputStream out = stagingDirFS.create(historyFilePath, true); |
| return new EventWriter(out); |
| } |
| |
| /** |
| * Create an event writer for the job represented by the given jobId and |
| * write the job configuration out to the staging directory. This should |
| * be the first history call made for a job. |
| * |
| * @param jobId the jobId. |
| * @throws IOException |
| */ |
| protected void setupEventWriter(JobId jobId) |
| throws IOException { |
| if (stagingDirPath == null) { |
| LOG.error("Log Directory is null, returning"); |
| throw new IOException("Missing Log Directory for History"); |
| } |
| |
| MetaInfo oldFi = fileMap.get(jobId); |
| Configuration conf = getConfig(); |
| |
| // TODO Ideally this should be written out to the job dir |
| // (.staging/jobid/files - RecoveryService will need to be patched) |
| Path historyFile = JobHistoryUtils.getStagingJobHistoryFile( |
| stagingDirPath, jobId, startCount); |
| String user = UserGroupInformation.getCurrentUser().getShortUserName(); |
| if (user == null) { |
| throw new IOException( |
| "User is null while setting up jobhistory eventwriter"); |
| } |
| |
| String jobName = context.getJob(jobId).getName(); |
| EventWriter writer = (oldFi == null) ? null : oldFi.writer; |
| |
| Path logDirConfPath = |
| JobHistoryUtils.getStagingConfFile(stagingDirPath, jobId, startCount); |
| if (writer == null) { |
| try { |
| writer = createEventWriter(historyFile); |
| LOG.info("Event Writer setup for JobId: " + jobId + ", File: " |
| + historyFile); |
| } catch (IOException ioe) { |
| LOG.info("Could not create log file: [" + historyFile + "] for job " |
| + "[" + jobName + "]"); |
| throw ioe; |
| } |
| |
| // Write out the conf only if the writer wasn't already set up. |
| if (conf != null) { |
| // TODO Ideally this should be written out to the job dir |
| // (.staging/jobid/files - RecoveryService will need to be patched) |
| FSDataOutputStream jobFileOut = null; |
| try { |
| if (logDirConfPath != null) { |
| jobFileOut = stagingDirFS.create(logDirConfPath, true); |
| conf.writeXml(jobFileOut); |
| jobFileOut.close(); |
| } |
| } catch (IOException e) { |
| LOG.info("Failed to write the job configuration file", e); |
| throw e; |
| } |
| } |
| } |
| |
| MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer, |
| user, jobName, jobId); |
| fi.getJobSummary().setJobId(jobId); |
| fileMap.put(jobId, fi); |
| } |
| |
| /** |
| * Close the event writer for this job id. |
| * @throws IOException |
| */ |
| public void closeWriter(JobId id) throws IOException { |
| try { |
| final MetaInfo mi = fileMap.get(id); |
| if (mi != null) { |
| mi.closeWriter(); |
| } |
| |
| } catch (IOException e) { |
| LOG.error("Error closing writer for JobID: " + id); |
| throw e; |
| } |
| } |
| |
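| /** |
| * Enqueues the event for the event-handling thread. On a job-completion |
| * event (JOB_FINISHED / JOB_FAILED / JOB_KILLED) the unflushed-event |
| * threshold is scaled up first so the remaining events are flushed in |
| * larger, cheaper batches. |
| */ |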
| @Override |
| public void handle(JobHistoryEvent event) { |
| try { |
| if (isJobCompletionEvent(event.getHistoryEvent())) { |
| // Once the job completes, batch more events per flush so the |
| // remaining events are written out faster. |
| maxUnflushedCompletionEvents = |
| maxUnflushedCompletionEvents * postJobCompletionMultiplier; |
| } |
| |
| eventQueue.put(event); |
| } catch (InterruptedException e) { |
| throw new YarnException(e); |
| } |
| } |
| |
| private boolean isJobCompletionEvent(HistoryEvent historyEvent) { |
| return EnumSet.of(EventType.JOB_FINISHED, EventType.JOB_FAILED, |
| EventType.JOB_KILLED).contains(historyEvent.getEventType()); |
| } |
| |
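| /** |
| * Synchronously processes a single event: on AM_STARTED the per-job |
| * writer is set up; every event except NormalizedResourceEvent is written |
| * out and folded into the job summary; JOB_SUBMITTED populates the |
| * job-index info; JOB_FINISHED / JOB_FAILED / JOB_KILLED finalize the |
| * index info and close the writer, moving the files to the done dir. |
| */ |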
| protected void handleEvent(JobHistoryEvent event) { |
| synchronized (lock) { |
| |
| // If this is the AM_STARTED event, set up the event writer. |
| if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) { |
| try { |
| setupEventWriter(event.getJobID()); |
| } catch (IOException ioe) { |
| LOG.error("Error in JobHistoryEventHandler while handling event: " |
| + event, ioe); |
| throw new YarnException(ioe); |
| } |
| } |
| |
| // For all events |
| // (1) Write it out |
| // (2) Process it for JobSummary |
| MetaInfo mi = fileMap.get(event.getJobID()); |
| try { |
| HistoryEvent historyEvent = event.getHistoryEvent(); |
| if (! (historyEvent instanceof NormalizedResourceEvent)) { |
| mi.writeEvent(historyEvent); |
| } |
| processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), |
| event.getJobID()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("In HistoryEventHandler " |
| + event.getHistoryEvent().getEventType()); |
| } |
| } catch (IOException e) { |
| LOG.error("Error writing History Event: " + event.getHistoryEvent(), |
| e); |
| throw new YarnException(e); |
| } |
| |
| if (event.getHistoryEvent().getEventType() == EventType.JOB_SUBMITTED) { |
| JobSubmittedEvent jobSubmittedEvent = |
| (JobSubmittedEvent) event.getHistoryEvent(); |
| mi.getJobIndexInfo().setSubmitTime(jobSubmittedEvent.getSubmitTime()); |
| mi.getJobIndexInfo().setQueueName(jobSubmittedEvent.getJobQueueName()); |
| } |
| |
| // If this is a JobFinishedEvent, close the writer and set up the job index. |
| if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) { |
| try { |
| JobFinishedEvent jFinishedEvent = |
| (JobFinishedEvent) event.getHistoryEvent(); |
| mi.getJobIndexInfo().setFinishTime(jFinishedEvent.getFinishTime()); |
| mi.getJobIndexInfo().setNumMaps(jFinishedEvent.getFinishedMaps()); |
| mi.getJobIndexInfo().setNumReduces( |
| jFinishedEvent.getFinishedReduces()); |
| mi.getJobIndexInfo().setJobStatus(JobState.SUCCEEDED.toString()); |
| closeEventWriter(event.getJobID()); |
| } catch (IOException e) { |
| throw new YarnException(e); |
| } |
| } |
| |
| if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED |
| || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) { |
| try { |
| JobUnsuccessfulCompletionEvent jucEvent = |
| (JobUnsuccessfulCompletionEvent) event |
| .getHistoryEvent(); |
| mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime()); |
| mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps()); |
| mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces()); |
| mi.getJobIndexInfo().setJobStatus(jucEvent.getStatus()); |
| closeEventWriter(event.getJobID()); |
| } catch (IOException e) { |
| throw new YarnException(e); |
| } |
| } |
| } |
| } |
| |
| public void processEventForJobSummary(HistoryEvent event, JobSummary summary, |
| JobId jobId) { |
| // context.getJob could be used for some of this info as well. |
| switch (event.getEventType()) { |
| case JOB_SUBMITTED: |
| JobSubmittedEvent jse = (JobSubmittedEvent) event; |
| summary.setUser(jse.getUserName()); |
| summary.setQueue(jse.getJobQueueName()); |
| summary.setJobSubmitTime(jse.getSubmitTime()); |
| break; |
| case NORMALIZED_RESOURCE: |
| NormalizedResourceEvent normalizedResourceEvent = |
| (NormalizedResourceEvent) event; |
| if (normalizedResourceEvent.getTaskType() == TaskType.MAP) { |
| summary.setResourcesPerMap(normalizedResourceEvent.getMemory()); |
| } else if (normalizedResourceEvent.getTaskType() == TaskType.REDUCE) { |
| summary.setResourcesPerReduce(normalizedResourceEvent.getMemory()); |
| } |
| break; |
| case JOB_INITED: |
| JobInitedEvent jie = (JobInitedEvent) event; |
| summary.setJobLaunchTime(jie.getLaunchTime()); |
| break; |
| case MAP_ATTEMPT_STARTED: |
| TaskAttemptStartedEvent mtase = (TaskAttemptStartedEvent) event; |
| if (summary.getFirstMapTaskLaunchTime() == 0) { |
| summary.setFirstMapTaskLaunchTime(mtase.getStartTime()); |
| } |
| break; |
| case REDUCE_ATTEMPT_STARTED: |
| TaskAttemptStartedEvent rtase = (TaskAttemptStartedEvent) event; |
| if (summary.getFirstReduceTaskLaunchTime() == 0) { |
| summary.setFirstReduceTaskLaunchTime(rtase.getStartTime()); |
| } |
| break; |
| case JOB_FINISHED: |
| JobFinishedEvent jfe = (JobFinishedEvent) event; |
| summary.setJobFinishTime(jfe.getFinishTime()); |
| summary.setNumFinishedMaps(jfe.getFinishedMaps()); |
| summary.setNumFailedMaps(jfe.getFailedMaps()); |
| summary.setNumFinishedReduces(jfe.getFinishedReduces()); |
| summary.setNumFailedReduces(jfe.getFailedReduces()); |
| if (summary.getJobStatus() == null) { |
| summary.setJobStatus( |
| org.apache.hadoop.mapreduce.JobStatus.State.SUCCEEDED.toString()); |
| } |
| // TODO JOB_FINISHED does not have state. Effectively job history does not |
| // have state about the finished job. |
| setSummarySlotSeconds(summary, jfe.getTotalCounters()); |
| break; |
| case JOB_FAILED: |
| case JOB_KILLED: |
| JobUnsuccessfulCompletionEvent juce = (JobUnsuccessfulCompletionEvent) event; |
| summary.setJobStatus(juce.getStatus()); |
| summary.setNumFinishedMaps(context.getJob(jobId).getTotalMaps()); |
| summary.setNumFinishedReduces(context.getJob(jobId).getTotalReduces()); |
| summary.setJobFinishTime(juce.getFinishTime()); |
| setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters()); |
| break; |
| } |
| } |
| |
| private void setSummarySlotSeconds(JobSummary summary, Counters allCounters) { |
| |
| Counter slotMillisMapCounter = allCounters |
| .findCounter(JobCounter.SLOTS_MILLIS_MAPS); |
| if (slotMillisMapCounter != null) { |
| summary.setMapSlotSeconds(slotMillisMapCounter.getValue()); |
| } |
| |
| Counter slotMillisReduceCounter = allCounters |
| .findCounter(JobCounter.SLOTS_MILLIS_REDUCES); |
| if (slotMillisReduceCounter != null) { |
| summary.setReduceSlotSeconds(slotMillisReduceCounter.getValue()); |
| } |
| } |
| |
| protected void closeEventWriter(JobId jobId) throws IOException { |
| |
| final MetaInfo mi = fileMap.get(jobId); |
| if (mi == null) { |
| throw new IOException("No MetaInfo found for JobId: [" + jobId + "]"); |
| } |
| |
| if (!mi.isWriterActive()) { |
| throw new IOException( |
| "Inactive Writer: Likely received multiple JobFinished / " + |
| "JobUnsuccessful events for JobId: [" |
| + jobId + "]"); |
| } |
| |
| // Close the Writer |
| try { |
| mi.closeWriter(); |
| } catch (IOException e) { |
| LOG.error("Error closing writer for JobID: " + jobId); |
| throw e; |
| } |
| |
| if (mi.getHistoryFile() == null) { |
| LOG.warn("No file for job-history with " + jobId + " found in cache!"); |
| } |
| if (mi.getConfFile() == null) { |
| LOG.warn("No file for jobconf with " + jobId + " found in cache!"); |
| } |
| |
| // Writing out the summary file. |
| // TODO JH enhancement - reuse this file to store additional indexing info |
| // like ACLs, etc. JHServer can use HDFS append to build an index file |
| // with more info than is available via the filename. |
| Path qualifiedSummaryDoneFile = null; |
| FSDataOutputStream summaryFileOut = null; |
| try { |
| String doneSummaryFileName = getTempFileName(JobHistoryUtils |
| .getIntermediateSummaryFileName(jobId)); |
| qualifiedSummaryDoneFile = doneDirFS.makeQualified(new Path( |
| doneDirPrefixPath, doneSummaryFileName)); |
| summaryFileOut = doneDirFS.create(qualifiedSummaryDoneFile, true); |
| summaryFileOut.writeUTF(mi.getJobSummary().getJobSummaryString()); |
| summaryFileOut.close(); |
| } catch (IOException e) { |
| LOG.error("Unable to write out JobSummaryInfo to [" |
| + qualifiedSummaryDoneFile + "]", e); |
| throw e; |
| } |
| |
| try { |
| |
| // Move historyFile to Done Folder. |
| Path qualifiedDoneFile = null; |
| if (mi.getHistoryFile() != null) { |
| Path historyFile = mi.getHistoryFile(); |
| Path qualifiedLogFile = stagingDirFS.makeQualified(historyFile); |
| String doneJobHistoryFileName = |
| getTempFileName(FileNameIndexUtils.getDoneFileName(mi |
| .getJobIndexInfo())); |
| qualifiedDoneFile = |
| doneDirFS.makeQualified(new Path(doneDirPrefixPath, |
| doneJobHistoryFileName)); |
| moveToDoneNow(qualifiedLogFile, qualifiedDoneFile); |
| } |
| |
| // Move confFile to Done Folder |
| Path qualifiedConfDoneFile = null; |
| if (mi.getConfFile() != null) { |
| Path confFile = mi.getConfFile(); |
| Path qualifiedConfFile = stagingDirFS.makeQualified(confFile); |
| String doneConfFileName = |
| getTempFileName(JobHistoryUtils |
| .getIntermediateConfFileName(jobId)); |
| qualifiedConfDoneFile = |
| doneDirFS.makeQualified(new Path(doneDirPrefixPath, |
| doneConfFileName)); |
| moveToDoneNow(qualifiedConfFile, qualifiedConfDoneFile); |
| } |
| |
| moveTmpToDone(qualifiedSummaryDoneFile); |
| moveTmpToDone(qualifiedConfDoneFile); |
| moveTmpToDone(qualifiedDoneFile); |
| |
| } catch (IOException e) { |
| LOG.error("Error moving history files to the done dir for JobID: " + jobId, e); |
| throw e; |
| } |
| } |
| |
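| /** |
| * Fallback flush: armed by MetaInfo.processEventForFlush() when a |
| * completion event arrives and no timer is active, so buffered events |
| * reach the DFS after at most flushTimeout ms even if the batching |
| * thresholds in MetaInfo.maybeFlush() are never hit. An IOException |
| * thrown on the timer thread is stashed and rethrown by the next |
| * resetFlushTimer() call on the handler thread. |
| */ |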
| private class FlushTimerTask extends TimerTask { |
| private MetaInfo metaInfo; |
| private IOException ioe = null; |
| private volatile boolean shouldRun = true; |
| |
| FlushTimerTask(MetaInfo metaInfo) { |
| this.metaInfo = metaInfo; |
| } |
| |
| @Override |
| public void run() { |
| synchronized (lock) { |
| try { |
| if (!metaInfo.isTimerShutDown() && shouldRun) { |
| metaInfo.flush(); |
| } |
| } catch (IOException e) { |
| ioe = e; |
| } |
| } |
| } |
| |
| public IOException getException() { |
| return ioe; |
| } |
| |
| public void stop() { |
| shouldRun = false; |
| this.cancel(); |
| } |
| } |
| |
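| /** |
| * Per-job bookkeeping: the staging history and conf file paths, the open |
| * EventWriter, the JobIndexInfo used to build the done-dir file name, the |
| * JobSummary being accumulated, and the flush-timer state. |
| */ |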
| private class MetaInfo { |
| private Path historyFile; |
| private Path confFile; |
| private EventWriter writer; |
| JobIndexInfo jobIndexInfo; |
| JobSummary jobSummary; |
| Timer flushTimer; |
| FlushTimerTask flushTimerTask; |
| private boolean isTimerShutDown = false; |
| |
| MetaInfo(Path historyFile, Path conf, EventWriter writer, String user, |
| String jobName, JobId jobId) { |
| this.historyFile = historyFile; |
| this.confFile = conf; |
| this.writer = writer; |
| this.jobIndexInfo = |
| new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null); |
| this.jobSummary = new JobSummary(); |
| this.flushTimer = new Timer("FlushTimer", true); |
| } |
| |
| Path getHistoryFile() { |
| return historyFile; |
| } |
| |
| Path getConfFile() { |
| return confFile; |
| } |
| |
| JobIndexInfo getJobIndexInfo() { |
| return jobIndexInfo; |
| } |
| |
| JobSummary getJobSummary() { |
| return jobSummary; |
| } |
| |
| boolean isWriterActive() { |
| return writer != null; |
| } |
| |
| boolean isTimerShutDown() { |
| return isTimerShutDown; |
| } |
| |
| void closeWriter() throws IOException { |
| synchronized (lock) { |
| if (writer != null) { |
| writer.close(); |
| } |
| writer = null; |
| } |
| } |
| |
| void writeEvent(HistoryEvent event) throws IOException { |
| synchronized (lock) { |
| if (writer != null) { |
| writer.write(event); |
| processEventForFlush(event); |
| maybeFlush(event); |
| } |
| } |
| } |
| |
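| // Counts task/attempt/job completion events and arms the fallback |
| // flush timer on the first one seen while no timer is active. |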
| void processEventForFlush(HistoryEvent historyEvent) throws IOException { |
| if (EnumSet.of(EventType.MAP_ATTEMPT_FINISHED, |
| EventType.MAP_ATTEMPT_FAILED, EventType.MAP_ATTEMPT_KILLED, |
| EventType.REDUCE_ATTEMPT_FINISHED, EventType.REDUCE_ATTEMPT_FAILED, |
| EventType.REDUCE_ATTEMPT_KILLED, EventType.TASK_FINISHED, |
| EventType.TASK_FAILED, EventType.JOB_FINISHED, EventType.JOB_FAILED, |
| EventType.JOB_KILLED).contains(historyEvent.getEventType())) { |
| numUnflushedCompletionEvents++; |
| if (!isTimerActive) { |
| resetFlushTimer(); |
| if (!isTimerShutDown) { |
| flushTimerTask = new FlushTimerTask(this); |
| flushTimer.schedule(flushTimerTask, flushTimeout); |
| } |
| } |
| } |
| } |
| |
| void resetFlushTimer() throws IOException { |
| if (flushTimerTask != null) { |
| IOException exception = flushTimerTask.getException(); |
| flushTimerTask.stop(); |
| if (exception != null) { |
| throw exception; |
| } |
| flushTimerTask = null; |
| } |
| isTimerActive = false; |
| } |
| |
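| /** |
| * Flush policy: flush promptly while the event queue is short (below |
| * minQueueSizeForBatchingFlushes), batch while the queue is backed up, |
| * and always flush once maxUnflushedCompletionEvents accumulate or a |
| * job-completion event arrives. |
| */ |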
| void maybeFlush(HistoryEvent historyEvent) throws IOException { |
| if ((eventQueue.size() < minQueueSizeForBatchingFlushes |
| && numUnflushedCompletionEvents > 0) |
| || numUnflushedCompletionEvents >= maxUnflushedCompletionEvents |
| || isJobCompletionEvent(historyEvent)) { |
| this.flush(); |
| } |
| } |
| |
| void flush() throws IOException { |
| synchronized (lock) { |
| // Nothing unflushed means a flush already happened (e.g. via the |
| // timer); skip the redundant flush and timer reset. |
| if (numUnflushedCompletionEvents != 0) { |
| writer.flush(); |
| numUnflushedCompletionEvents = 0; |
| resetFlushTimer(); |
| } |
| } |
| } |
| |
| void shutDownTimer() throws IOException { |
| synchronized (lock) { |
| isTimerShutDown = true; |
| flushTimer.cancel(); |
| if (flushTimerTask != null && flushTimerTask.getException() != null) { |
| throw flushTimerTask.getException(); |
| } |
| } |
| } |
| } |
| |
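| // Two-phase move: files are first written or copied into the done dir |
| // with a "_tmp" suffix (see getTempFileName) and only renamed to their |
| // final name here, so the JobHistoryServer never reads a partially |
| // written file. |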
| private void moveTmpToDone(Path tmpPath) throws IOException { |
| if (tmpPath != null) { |
| String tmpFileName = tmpPath.getName(); |
| String fileName = getFileNameFromTmpFN(tmpFileName); |
| Path path = new Path(tmpPath.getParent(), fileName); |
| doneDirFS.rename(tmpPath, path); |
| LOG.info("Moved tmp to done: " + tmpPath + " to " + path); |
| } |
| } |
| |
| // TODO If the FS objects are the same, this should be a rename instead of a |
| // copy. |
| private void moveToDoneNow(Path fromPath, Path toPath) throws IOException { |
| // check if path exists, in case of retries it may not exist |
| if (stagingDirFS.exists(fromPath)) { |
| LOG.info("Moving " + fromPath.toString() + " to " + toPath.toString()); |
| // TODO temporarily removing the existing dst |
| if (doneDirFS.exists(toPath)) { |
| doneDirFS.delete(toPath, true); |
| } |
| boolean copied = FileUtil.copy(stagingDirFS, fromPath, doneDirFS, toPath, |
| false, getConfig()); |
| |
| if (copied) { |
| LOG.info("Copied to done location: " + toPath); |
| } else { |
| LOG.error("Copy failed for " + fromPath + " to " + toPath); |
| } |
| doneDirFS.setPermission(toPath, new FsPermission( |
| JobHistoryUtils.HISTORY_INTERMEDIATE_FILE_PERMISSIONS)); |
| |
| stagingDirFS.delete(fromPath, false); |
| } |
| } |
| |
| boolean pathExists(FileSystem fileSys, Path path) throws IOException { |
| return fileSys.exists(path); |
| } |
| |
| private String getTempFileName(String srcFile) { |
| return srcFile + "_tmp"; |
| } |
| |
| private String getFileNameFromTmpFN(String tmpFileName) { |
| // TODO. Some error checking here. |
| // Strips the "_tmp" suffix (4 chars) appended by getTempFileName(). |
| return tmpFileName.substring(0, tmpFileName.length()-4); |
| } |
| } |