| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.v2.hs; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.regex.Pattern; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FileContext; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.Options; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.mapred.JobACLsManager; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.TypeConverter; |
| import org.apache.hadoop.mapreduce.jobhistory.JobSummary; |
| import org.apache.hadoop.mapreduce.v2.api.records.JobId; |
| import org.apache.hadoop.mapreduce.v2.app.job.Job; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; |
| import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; |
| import org.apache.hadoop.yarn.Clock; |
| import org.apache.hadoop.yarn.YarnException; |
| import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.event.EventHandler; |
| import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; |
| import org.apache.hadoop.yarn.service.AbstractService; |
| |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| |
| /* |
| * Loads and manages the Job history cache. |
| */ |
| public class JobHistory extends AbstractService implements HistoryContext { |
| |
| private static final int DEFAULT_JOBLIST_CACHE_SIZE = 20000; |
| private static final int DEFAULT_LOADEDJOB_CACHE_SIZE = 5; |
| private static final int DEFAULT_DATESTRING_CACHE_SIZE = 200000; |
  private static final long DEFAULT_MOVE_THREAD_INTERVAL = 3 * 60 * 1000L; //3 minutes
| private static final int DEFAULT_MOVE_THREAD_COUNT = 3; |
| |
| static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //1 week |
  static final long DEFAULT_RUN_INTERVAL = 24 * 60 * 60 * 1000L; //1 day
| |
| private static final Log LOG = LogFactory.getLog(JobHistory.class); |
| |
| private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); |
| public static final Pattern CONF_FILENAME_REGEX = |
| Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?"); |
| public static final String OLD_SUFFIX = ".old"; |
| |
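  // Glob tail used to locate the timestamped subdirectories under the done
  // directory when scanning for existing history files at startup.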
  private static final String DONE_BEFORE_SERIAL_TAIL =
      JobHistoryUtils.doneSubdirsBeforeSerialTail();
| |
| /** |
| * Maps between a serial number (generated based on jobId) and the timestamp |
| * component(s) to which it belongs. |
| * Facilitates jobId based searches. |
| * If a jobId is not found in this list - it will not be found. |
| */ |
| private final SortedMap<String, Set<String>> idToDateString = |
| new ConcurrentSkipListMap<String, Set<String>>(); |
| |
  //Maintains minimal details for recent jobs (parsed from the history file
  //name). Keyed and sorted by JobId; the lowest JobId is evicted first.
  private final SortedMap<JobId, MetaInfo> jobListCache =
      new ConcurrentSkipListMap<JobId, MetaInfo>();
| |
| |
  // Re-use existing MetaInfo objects if they exist for the specific JobId.
  // (synchronization on MetaInfo)
  // Check for existence of the object when using iterators.
  private final SortedMap<JobId, MetaInfo> intermediateListCache =
      new ConcurrentSkipListMap<JobId, JobHistory.MetaInfo>();
| |
| //Maintains a list of known done subdirectories. Not currently used. |
| private final Set<Path> existingDoneSubdirs = new HashSet<Path>(); |
| |
| private final SortedMap<JobId, Job> loadedJobCache = |
| new ConcurrentSkipListMap<JobId, Job>(); |
| |
| /** |
| * Maintains a mapping between intermediate user directories and the last |
| * known modification time. |
| */ |
| private Map<String, Long> userDirModificationTimeMap = |
| new HashMap<String, Long>(); |
| |
| //The number of jobs to maintain in the job list cache. |
| private int jobListCacheSize; |
| |
| private JobACLsManager aclsMgr; |
| |
  //The maximum number of jobs to maintain in the loaded job cache.
  private int loadedJobCacheSize;
| |
  //The maximum number of entries in the idToDateString cache.
  private int dateStringCacheSize;
| |
| //Time interval for the move thread. |
| private long moveThreadInterval; |
| |
| //Number of move threads. |
| private int numMoveThreads; |
| |
| private Configuration conf; |
| |
| private boolean debugMode; |
| private int serialNumberLowDigits; |
| private String serialNumberFormat; |
| |
| |
| private Path doneDirPrefixPath = null; // folder for completed jobs |
| private FileContext doneDirFc; // done Dir FileContext |
| |
| private Path intermediateDoneDirPath = null; //Intermediate Done Dir Path |
| private FileContext intermediateDoneDirFc; //Intermediate Done Dir FileContext |
| |
| private Thread moveIntermediateToDoneThread = null; |
| private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null; |
| private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null; |
| |
| /** |
| * Writes out files to the path |
| * .....${DONE_DIR}/VERSION_STRING/YYYY/MM/DD/HH/SERIAL_NUM/jh{index_entries}.jhist |
| */ |
| |
| @Override |
| public void init(Configuration conf) throws YarnException { |
| LOG.info("JobHistory Init"); |
| this.conf = conf; |
| this.appID = RecordFactoryProvider.getRecordFactory(conf) |
| .newRecordInstance(ApplicationId.class); |
| this.appAttemptID = RecordFactoryProvider.getRecordFactory(conf) |
| .newRecordInstance(ApplicationAttemptId.class); |
| |
| debugMode = conf.getBoolean(JHAdminConfig.MR_HISTORY_DEBUG_MODE, false); |
| serialNumberLowDigits = debugMode ? 1 : 3; |
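    // With JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS == 6 (assumed
    // default), this yields a "%09d" format in normal mode and "%07d" in
    // debug mode.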
| serialNumberFormat = ("%0" |
| + (JobHistoryUtils.SERIAL_NUMBER_DIRECTORY_DIGITS |
| + serialNumberLowDigits) + "d"); |
| |
    String doneDirPrefix =
        JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
| try { |
| doneDirPrefixPath = FileContext.getFileContext(conf).makeQualified( |
| new Path(doneDirPrefix)); |
| doneDirFc = FileContext.getFileContext(doneDirPrefixPath.toUri(), conf); |
| mkdir(doneDirFc, doneDirPrefixPath, new FsPermission( |
| JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION)); |
| } catch (IOException e) { |
| throw new YarnException("Error creating done directory: [" + |
| doneDirPrefixPath + "]", e); |
| } |
| |
    String intermediateDoneDirPrefix = JobHistoryUtils
        .getConfiguredHistoryIntermediateDoneDirPrefix(conf);
| try { |
| intermediateDoneDirPath = FileContext.getFileContext(conf) |
| .makeQualified(new Path(intermediateDoneDirPrefix)); |
| intermediateDoneDirFc = FileContext.getFileContext( |
| intermediateDoneDirPath.toUri(), conf); |
| mkdir(intermediateDoneDirFc, intermediateDoneDirPath, new FsPermission( |
| JobHistoryUtils.HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS.toShort())); |
| } catch (IOException e) { |
| LOG.info("error creating done directory on dfs " + e); |
| throw new YarnException("Error creating intermediate done directory: [" |
| + intermediateDoneDirPath + "]", e); |
| } |
| |
| this.aclsMgr = new JobACLsManager(conf); |
| |
| jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, |
| DEFAULT_JOBLIST_CACHE_SIZE); |
| loadedJobCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_LOADED_JOB_CACHE_SIZE, |
| DEFAULT_LOADEDJOB_CACHE_SIZE); |
| dateStringCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, |
| DEFAULT_DATESTRING_CACHE_SIZE); |
| moveThreadInterval = |
| conf.getLong(JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS, |
| DEFAULT_MOVE_THREAD_INTERVAL); |
| numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, |
| DEFAULT_MOVE_THREAD_COUNT); |
| try { |
| initExisting(); |
| } catch (IOException e) { |
| throw new YarnException("Failed to intialize existing directories", e); |
| } |
| super.init(conf); |
| } |
| |
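  /**
   * Creates the given directory (and any missing parents) if it does not
   * already exist, then verifies that the resulting permissions match the
   * requested ones, setting them explicitly if they do not (e.g. when the
   * filesystem has applied a umask).
   */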
| private void mkdir(FileContext fc, Path path, FsPermission fsp) |
| throws IOException { |
| if (!fc.util().exists(path)) { |
| try { |
| fc.mkdir(path, fsp, true); |
| |
| FileStatus fsStatus = fc.getFileStatus(path); |
| LOG.info("Perms after creating " + fsStatus.getPermission().toShort() |
| + ", Expected: " + fsp.toShort()); |
| if (fsStatus.getPermission().toShort() != fsp.toShort()) { |
| LOG.info("Explicitly setting permissions to : " + fsp.toShort() |
| + ", " + fsp); |
| fc.setPermission(path, fsp); |
| } |
| } catch (FileAlreadyExistsException e) { |
| LOG.info("Directory: [" + path + "] already exists."); |
| } |
| } |
| } |
| |
| @Override |
| public void start() { |
    //Start moveIntermediateToDoneThread
| moveIntermediateToDoneRunnable = |
| new MoveIntermediateToDoneRunnable(moveThreadInterval, numMoveThreads); |
| moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable); |
| moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner"); |
| moveIntermediateToDoneThread.start(); |
| |
| //Start historyCleaner |
| boolean startCleanerService = conf.getBoolean( |
| JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true); |
| if (startCleanerService) { |
| long maxAgeOfHistoryFiles = conf.getLong( |
| JHAdminConfig.MR_HISTORY_MAX_AGE_MS, DEFAULT_HISTORY_MAX_AGE); |
| cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1, |
| new ThreadFactoryBuilder().setNameFormat("LogCleaner").build() |
| ); |
| long runInterval = conf.getLong( |
| JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, DEFAULT_RUN_INTERVAL); |
| cleanerScheduledExecutor |
| .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles), |
          30 * 1000L, runInterval, TimeUnit.MILLISECONDS);
| } |
| super.start(); |
| } |
| |
| @Override |
| public void stop() { |
| LOG.info("Stopping JobHistory"); |
| if (moveIntermediateToDoneThread != null) { |
| LOG.info("Stopping move thread"); |
| moveIntermediateToDoneRunnable.stop(); |
| moveIntermediateToDoneThread.interrupt(); |
| try { |
| LOG.info("Joining on move thread"); |
| moveIntermediateToDoneThread.join(); |
| } catch (InterruptedException e) { |
| LOG.info("Interrupted while stopping move thread"); |
| } |
| } |
| |
| if (cleanerScheduledExecutor != null) { |
| LOG.info("Stopping History Cleaner"); |
| cleanerScheduledExecutor.shutdown(); |
      boolean interrupted = false;
      long currentTime = System.currentTimeMillis();
      //Wait up to one second for running cleaner tasks to finish.
      while (!cleanerScheduledExecutor.isTerminated()
          && System.currentTimeMillis() < currentTime + 1000L && !interrupted) {
        try {
          Thread.sleep(20);
        } catch (InterruptedException e) {
          interrupted = true;
        }
      }
      if (!cleanerScheduledExecutor.isTerminated()) {
        LOG.warn("HistoryCleanerService shutdown may not have succeeded");
      }
| } |
| super.stop(); |
| } |
| |
| public JobHistory() { |
| super(JobHistory.class.getName()); |
| } |
| |
| /** |
| * Populates index data structures. |
| * Should only be called at initialization times. |
| */ |
| @SuppressWarnings("unchecked") |
| private void initExisting() throws IOException { |
| LOG.info("Initializing Existing Jobs..."); |
| List<FileStatus> timestampedDirList = findTimestampedDirectories(); |
| Collections.sort(timestampedDirList); |
| for (FileStatus fs : timestampedDirList) { |
| //TODO Could verify the correct format for these directories. |
| addDirectoryToSerialNumberIndex(fs.getPath()); |
| addDirectoryToJobListCache(fs.getPath()); |
| } |
| } |
| |
  private void removeDirectoryFromSerialNumberIndex(Path serialDirPath) {
    String serialPart = serialDirPath.getName();
    String timeStampPart =
        JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString());
    if (timeStampPart == null) {
      LOG.warn("Could not find timestamp portion from path: " +
          serialDirPath.toString() + ". Continuing with next");
      return;
    }
    if (serialPart == null) {
      LOG.warn("Could not find serial portion from path: " +
          serialDirPath.toString() + ". Continuing with next");
      return;
    }
    // Fetch once instead of containsKey + get; the map is concurrent and the
    // entry may be evicted between the two calls.
    Set<String> set = idToDateString.get(serialPart);
    if (set != null) {
      set.remove(timeStampPart);
      if (set.isEmpty()) {
        idToDateString.remove(serialPart);
      }
    }
  }
| |
| private void addDirectoryToSerialNumberIndex(Path serialDirPath) { |
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + serialDirPath + " to serial index");
| } |
| String serialPart = serialDirPath.getName(); |
| String timestampPart = |
| JobHistoryUtils.getTimestampPartFromPath(serialDirPath.toString()); |
| if (timestampPart == null) { |
| LOG.warn("Could not find timestamp portion from path: " + |
| serialDirPath.toString() +". Continuing with next"); |
| return; |
| } |
    if (serialPart == null) {
      LOG.warn("Could not find serial portion from path: " +
          serialDirPath.toString() + ". Continuing with next");
      return;
    }
| addToSerialNumberIndex(serialPart, timestampPart); |
| } |
| |
  private void addToSerialNumberIndex(String serialPart, String timestampPart) {
    if (!idToDateString.containsKey(serialPart)) {
      idToDateString.put(serialPart, new HashSet<String>());
      if (idToDateString.size() > dateStringCacheSize) {
        idToDateString.remove(idToDateString.firstKey());
      }
    }
    // Add the timestamp whether or not the serial number was already present.
    Set<String> datePartSet = idToDateString.get(serialPart);
    if (datePartSet != null) {
      datePartSet.add(timestampPart);
    }
  }
| |
| private void addDirectoryToJobListCache(Path path) throws IOException { |
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + path + " to job list cache.");
| } |
| List<FileStatus> historyFileList = scanDirectoryForHistoryFiles(path, |
| doneDirFc); |
| for (FileStatus fs : historyFileList) { |
      if (LOG.isDebugEnabled()) {
        LOG.debug("Adding in history for " + fs.getPath());
| } |
| JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() |
| .getName()); |
| String confFileName = JobHistoryUtils |
| .getIntermediateConfFileName(jobIndexInfo.getJobId()); |
| String summaryFileName = JobHistoryUtils |
| .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); |
| MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() |
| .getParent(), confFileName), new Path(fs.getPath().getParent(), |
| summaryFileName), jobIndexInfo); |
| addToJobListCache(jobIndexInfo.getJobId(), metaInfo); |
| } |
| } |
| |
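  /**
   * Lists the given directory and returns the FileStatus of every regular
   * file accepted by the supplied PathFilter.
   */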
| private static List<FileStatus> scanDirectory(Path path, FileContext fc, |
| PathFilter pathFilter) throws IOException { |
| path = fc.makeQualified(path); |
| List<FileStatus> jhStatusList = new ArrayList<FileStatus>(); |
| RemoteIterator<FileStatus> fileStatusIter = fc.listStatus(path); |
| while (fileStatusIter.hasNext()) { |
| FileStatus fileStatus = fileStatusIter.next(); |
| Path filePath = fileStatus.getPath(); |
| if (fileStatus.isFile() && pathFilter.accept(filePath)) { |
| jhStatusList.add(fileStatus); |
| } |
| } |
| return jhStatusList; |
| } |
| |
| private static List<FileStatus> scanDirectoryForHistoryFiles(Path path, |
| FileContext fc) throws IOException { |
| return scanDirectory(path, fc, JobHistoryUtils.getHistoryFileFilter()); |
| } |
| |
| /** |
| * Finds all history directories with a timestamp component by scanning |
| * the filesystem. |
| * Used when the JobHistory server is started. |
| * @return |
| */ |
| private List<FileStatus> findTimestampedDirectories() throws IOException { |
| List<FileStatus> fsList = JobHistoryUtils.localGlobber(doneDirFc, |
| doneDirPrefixPath, DONE_BEFORE_SERIAL_TAIL); |
| return fsList; |
| } |
| |
| /** |
| * Adds an entry to the job list cache. Maintains the size. |
| */ |
| private void addToJobListCache(JobId jobId, MetaInfo metaInfo) { |
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + jobId + " to job list cache with "
          + metaInfo.getJobIndexInfo());
| } |
| jobListCache.put(jobId, metaInfo); |
| if (jobListCache.size() > jobListCacheSize) { |
| jobListCache.remove(jobListCache.firstKey()); |
| } |
| } |
| |
| /** |
| * Adds an entry to the loaded job cache. Maintains the size. |
| */ |
| private void addToLoadedJobCache(Job job) { |
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding " + job.getID() + " to loaded job cache");
| } |
| loadedJobCache.put(job.getID(), job); |
    if (loadedJobCache.size() > loadedJobCacheSize) {
| loadedJobCache.remove(loadedJobCache.firstKey()); |
| } |
| } |
| |
| |
| /** |
| * Scans the intermediate directory to find user directories. Scans these |
| * for history files if the modification time for the directory has changed. |
| * @throws IOException |
| */ |
| private void scanIntermediateDirectory() throws IOException { |
| List<FileStatus> userDirList = |
| JobHistoryUtils.localGlobber(intermediateDoneDirFc, intermediateDoneDirPath, ""); |
| |
| for (FileStatus userDir : userDirList) { |
| String name = userDir.getPath().getName(); |
| long newModificationTime = userDir.getModificationTime(); |
| boolean shouldScan = false; |
| synchronized (userDirModificationTimeMap) { |
| if (!userDirModificationTimeMap.containsKey(name) || newModificationTime |
| > userDirModificationTimeMap.get(name)) { |
| shouldScan = true; |
| userDirModificationTimeMap.put(name, newModificationTime); |
| } |
| } |
| if (shouldScan) { |
| scanIntermediateDirectory(userDir.getPath()); |
| } |
| } |
| } |
| |
| /** |
| * Scans the specified path and populates the intermediate cache. |
| * @param absPath |
| * @throws IOException |
| */ |
| private void scanIntermediateDirectory(final Path absPath) |
| throws IOException { |
| List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(absPath, |
| intermediateDoneDirFc); |
| for (FileStatus fs : fileStatusList) { |
| JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() |
| .getName()); |
| String confFileName = JobHistoryUtils |
| .getIntermediateConfFileName(jobIndexInfo.getJobId()); |
| String summaryFileName = JobHistoryUtils |
| .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); |
| MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() |
| .getParent(), confFileName), new Path(fs.getPath().getParent(), |
| summaryFileName), jobIndexInfo); |
| if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) { |
| intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo); |
| } |
| } |
| } |
| |
| /** |
| * Searches the job history file FileStatus list for the specified JobId. |
| * |
| * @param fileStatusList fileStatus list of Job History Files. |
| * @param jobId The JobId to find. |
| * @param checkForDoneFile whether to check for the existance of a done file. |
| * @return A MetaInfo object for the jobId, null if not found. |
| * @throws IOException |
| */ |
| private MetaInfo getJobMetaInfo(List<FileStatus> fileStatusList, JobId jobId) |
| throws IOException { |
| for (FileStatus fs : fileStatusList) { |
| JobIndexInfo jobIndexInfo = |
| FileNameIndexUtils.getIndexInfo(fs.getPath().getName()); |
| if (jobIndexInfo.getJobId().equals(jobId)) { |
| String confFileName = JobHistoryUtils |
| .getIntermediateConfFileName(jobIndexInfo.getJobId()); |
| String summaryFileName = JobHistoryUtils |
| .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); |
| MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() |
| .getParent(), confFileName), new Path(fs.getPath().getParent(), |
| summaryFileName), jobIndexInfo); |
| return metaInfo; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Scans old directories known by the idToDateString map for the specified |
| * jobId. |
| * If the number of directories is higher than the supported size of the |
| * idToDateString cache, the jobId will not be found. |
| * @param jobId the jobId. |
| * @return |
| * @throws IOException |
| */ |
| private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException { |
| int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId); |
| String boxedSerialNumber = String.valueOf(jobSerialNumber); |
| Set<String> dateStringSet = idToDateString.get(boxedSerialNumber); |
| if (dateStringSet == null) { |
| return null; |
| } |
| for (String timestampPart : dateStringSet) { |
| Path logDir = canonicalHistoryLogPath(jobId, timestampPart); |
| List<FileStatus> fileStatusList = scanDirectoryForHistoryFiles(logDir, |
| doneDirFc); |
| MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId); |
| if (metaInfo != null) { |
| return metaInfo; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Checks for the existence of the job history file in the intermediate |
| * directory. |
| * @param jobId |
| * @return |
| * @throws IOException |
| */ |
| private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException { |
| scanIntermediateDirectory(); |
| return intermediateListCache.get(jobId); |
| } |
| |
| @Override |
| public String getApplicationName() { |
| return "Job History Server"; |
| } |
| |
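  /**
   * Periodically scans the intermediate done directory and hands each
   * discovered job's files off to a thread pool that moves them to the done
   * directory via moveToDone(MetaInfo).
   */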
| private class MoveIntermediateToDoneRunnable implements Runnable { |
| |
    private long sleepTime;
    private ThreadPoolExecutor moveToDoneExecutor = null;
    // volatile: stop() is called from the service shutdown thread while
    // run() polls this flag from the scanner thread.
    private volatile boolean running = false;
| |
| public void stop() { |
| running = false; |
| } |
| |
| MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) { |
| this.sleepTime = sleepTime; |
| ThreadFactory tf = new ThreadFactoryBuilder() |
| .setNameFormat("MoveIntermediateToDone Thread #%d") |
| .build(); |
| moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, |
| TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf); |
| running = true; |
| } |
| |
| @Override |
| public void run() { |
| Thread.currentThread().setName("IntermediateHistoryScanner"); |
| try { |
| while (running) { |
| LOG.info("Starting scan to move intermediate done files"); |
| scanIntermediateDirectory(); |
| for (final MetaInfo metaInfo : intermediateListCache.values()) { |
| moveToDoneExecutor.execute(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| moveToDone(metaInfo); |
| } catch (IOException e) { |
| LOG.info("Failed to process metaInfo for job: " + |
| metaInfo.jobIndexInfo.getJobId(), e); |
| } |
| } |
| }); |
| |
| } |
          synchronized (this) { // TODO Is this really required?
| try { |
| this.wait(sleepTime); |
| } catch (InterruptedException e) { |
| LOG.info("IntermediateHistoryScannerThread interrupted"); |
| } |
| } |
| } |
      } catch (IOException e) {
        LOG.warn("Unable to get a list of intermediate files to be moved from: "
            + intermediateDoneDirPath, e);
| } |
| } |
| } |
| |
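  /**
   * Parses a job's history file into a CompletedJob and adds it to the
   * loaded job cache. Synchronizes on the MetaInfo so that loading does not
   * race with moveToDone() renaming the underlying files.
   */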
| private Job loadJob(MetaInfo metaInfo) { |
| synchronized(metaInfo) { |
| try { |
| Job job = new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), |
| metaInfo.getHistoryFile(), true, metaInfo.getJobIndexInfo().getUser(), |
| metaInfo.getConfFile(), this.aclsMgr); |
| addToLoadedJobCache(job); |
| return job; |
| } catch (IOException e) { |
| throw new YarnException("Could not find/load job: " + |
| metaInfo.getJobIndexInfo().getJobId(), e); |
| } |
| } |
| } |
| |
| private Map<JobId, Job> getAllJobsInternal() { |
| //TODO This should ideally be using getAllJobsMetaInfo |
| // or get rid of that method once Job has APIs for user, finishTime etc. |
| SortedMap<JobId, Job> result = new TreeMap<JobId, Job>(); |
| try { |
| scanIntermediateDirectory(); |
| } catch (IOException e) { |
| LOG.warn("Failed to scan intermediate directory", e); |
| throw new YarnException(e); |
| } |
| for (JobId jobId : intermediateListCache.keySet()) { |
| MetaInfo mi = intermediateListCache.get(jobId); |
| if (mi != null) { |
| result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi |
| .getJobIndexInfo().getJobId())); |
| } |
| } |
| for (JobId jobId : jobListCache.keySet()) { |
| MetaInfo mi = jobListCache.get(jobId); |
| if (mi != null) { |
| result.put(jobId, new PartialJob(mi.getJobIndexInfo(), mi |
| .getJobIndexInfo().getJobId())); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Helper method for test cases. |
| */ |
| MetaInfo getJobMetaInfo(JobId jobId) throws IOException { |
    //MetaInfo available in cache.
    MetaInfo metaInfo = jobListCache.get(jobId);
    if (metaInfo != null) {
      return metaInfo;
    }
| |
| //MetaInfo not available. Check intermediate directory for meta info. |
| metaInfo = scanIntermediateForJob(jobId); |
| if (metaInfo != null) { |
| return metaInfo; |
| } |
| |
    //Intermediate directory does not contain job. Search through older ones.
    return scanOldDirsForJob(jobId);
  }
| |
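  /**
   * Looks a job up in the following order: the loaded job cache, the job
   * list cache, the intermediate directory, and finally the older done
   * directories via the serial number index.
   * @return the Job, or null if it cannot be found.
   */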
| private Job findJob(JobId jobId) throws IOException { |
    //Job already loaded.
    Job job = loadedJobCache.get(jobId);
    if (job != null) {
      return job;
    }

    //MetaInfo available in cache.
    MetaInfo metaInfo = jobListCache.get(jobId);
    if (metaInfo != null) {
      return loadJob(metaInfo);
    }
| |
| //MetaInfo not available. Check intermediate directory for meta info. |
| metaInfo = scanIntermediateForJob(jobId); |
| if (metaInfo != null) { |
| return loadJob(metaInfo); |
| } |
| |
| //Intermediate directory does not contain job. Search through older ones. |
| metaInfo = scanOldDirsForJob(jobId); |
| if (metaInfo != null) { |
| return loadJob(metaInfo); |
| } |
| return null; |
| } |
| |
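  /**
   * Moves a job's files from the intermediate directory to the timestamped
   * done directory: logs and deletes the summary file, creates the target
   * serial-number subdirectory if necessary, renames the history and conf
   * files, and finally promotes the job from the intermediate cache to the
   * job list cache.
   */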
| private void moveToDone(MetaInfo metaInfo) throws IOException { |
| long completeTime = metaInfo.getJobIndexInfo().getFinishTime(); |
| if (completeTime == 0) completeTime = System.currentTimeMillis(); |
| JobId jobId = metaInfo.getJobIndexInfo().getJobId(); |
| |
| List<Path> paths = new ArrayList<Path>(); |
| Path historyFile = metaInfo.getHistoryFile(); |
| if (historyFile == null) { |
| LOG.info("No file for job-history with " + jobId + " found in cache!"); |
| } else { |
| paths.add(historyFile); |
| } |
| |
| Path confFile = metaInfo.getConfFile(); |
| if (confFile == null) { |
| LOG.info("No file for jobConf with " + jobId + " found in cache!"); |
| } else { |
| paths.add(confFile); |
| } |
| |
| //TODO Check all mi getters and setters for the conf path |
| Path summaryFile = metaInfo.getSummaryFile(); |
| if (summaryFile == null) { |
| LOG.info("No summary file for job: " + jobId); |
| } else { |
| try { |
| String jobSummaryString = getJobSummary(intermediateDoneDirFc, summaryFile); |
| SUMMARY_LOG.info(jobSummaryString); |
| LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); |
| intermediateDoneDirFc.delete(summaryFile, false); |
| metaInfo.setSummaryFile(null); |
| } catch (IOException e) { |
| LOG.warn("Failed to process summary file: [" + summaryFile + "]"); |
| throw e; |
| } |
| } |
| |
| Path targetDir = canonicalHistoryLogPath(jobId, completeTime); |
| addDirectoryToSerialNumberIndex(targetDir); |
| try { |
| maybeMakeSubdirectory(targetDir); |
| } catch (IOException e) { |
| LOG.warn("Failed creating subdirectory: " + targetDir + |
| " while attempting to move files for jobId: " + jobId); |
| throw e; |
| } |
| synchronized (metaInfo) { |
| if (historyFile != null) { |
| Path toPath = doneDirFc.makeQualified(new Path(targetDir, |
| historyFile.getName())); |
| try { |
| moveToDoneNow(historyFile, toPath); |
| } catch (IOException e) { |
| LOG.warn("Failed to move file: " + historyFile + " for jobId: " |
| + jobId); |
| throw e; |
| } |
| metaInfo.setHistoryFile(toPath); |
| } |
| if (confFile != null) { |
| Path toPath = doneDirFc.makeQualified(new Path(targetDir, |
| confFile.getName())); |
| try { |
| moveToDoneNow(confFile, toPath); |
| } catch (IOException e) { |
| LOG.warn("Failed to move file: " + historyFile + " for jobId: " |
| + jobId); |
| throw e; |
| } |
| metaInfo.setConfFile(toPath); |
| } |
| } |
| addToJobListCache(jobId, metaInfo); |
| intermediateListCache.remove(jobId); |
| } |
| |
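  /**
   * Renames src to target. Note this assumes the intermediate and done
   * directories are on the same filesystem, since a FileContext rename
   * cannot cross filesystems (otherwise the commented-out copy/delete
   * below would be needed).
   */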
| private void moveToDoneNow(final Path src, final Path target) |
| throws IOException { |
| LOG.info("Moving " + src.toString() + " to " + target.toString()); |
| intermediateDoneDirFc.rename(src, target, Options.Rename.NONE); |
| // fc.util().copy(src, target); |
| //fc.delete(src, false); |
| //intermediateDoneDirFc.setPermission(target, new FsPermission( |
| //JobHistoryUtils.HISTORY_DONE_FILE_PERMISSION)); |
| } |
| |
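  /**
   * Reads the job summary line from the given file, which is expected to
   * contain a single UTF string (the readUTF() counterpart of how the
   * summary file is written).
   */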
  String getJobSummary(FileContext fc, Path path) throws IOException {
    Path qPath = fc.makeQualified(path);
    FSDataInputStream in = fc.open(qPath);
    try {
      return in.readUTF();
    } finally {
      in.close();
    }
  }
| |
| private void maybeMakeSubdirectory(Path path) throws IOException { |
| boolean existsInExistingCache = false; |
| synchronized(existingDoneSubdirs) { |
| if (existingDoneSubdirs.contains(path)) existsInExistingCache = true; |
| } |
| try { |
| doneDirFc.getFileStatus(path); |
      if (!existsInExistingCache) {
        synchronized (existingDoneSubdirs) {
          existingDoneSubdirs.add(path);
        }
        if (debugMode) {
          LOG.info("JobHistory.maybeMakeSubdirectory -- " + path
              + " existed on the filesystem but was missing from the"
              + " existing done-subdirs cache. Re-adding it.");
        }
      }
| } catch (FileNotFoundException fnfE) { |
| try { |
| FsPermission fsp = |
| new FsPermission(JobHistoryUtils.HISTORY_DONE_DIR_PERMISSION); |
| doneDirFc.mkdir(path, fsp, true); |
| FileStatus fsStatus = doneDirFc.getFileStatus(path); |
| LOG.info("Perms after creating " + fsStatus.getPermission().toShort() |
| + ", Expected: " + fsp.toShort()); |
| if (fsStatus.getPermission().toShort() != fsp.toShort()) { |
| LOG.info("Explicitly setting permissions to : " + fsp.toShort() |
| + ", " + fsp); |
| doneDirFc.setPermission(path, fsp); |
| } |
| synchronized(existingDoneSubdirs) { |
| existingDoneSubdirs.add(path); |
| } |
| } catch (FileAlreadyExistsException faeE) { //Nothing to do. |
| } |
| } |
| } |
| |
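  /**
   * Resolves the done-dir subdirectory for a job, e.g. (assuming the
   * default layout) ${DONE_DIR}/YYYY/MM/DD/SERIAL_NUM for the given
   * timestamp component and the job's serial number.
   */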
| private Path canonicalHistoryLogPath(JobId id, String timestampComponent) { |
| return new Path(doneDirPrefixPath, |
| JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat)); |
| } |
| |
| private Path canonicalHistoryLogPath(JobId id, long millisecondTime) { |
| String timestampComponent = |
| JobHistoryUtils.timestampDirectoryComponent(millisecondTime, debugMode); |
| return new Path(doneDirPrefixPath, |
| JobHistoryUtils.historyLogSubdirectory(id, timestampComponent, serialNumberFormat)); |
| } |
| |
| |
| @Override |
| public synchronized Job getJob(JobId jobId) { |
    if (LOG.isDebugEnabled()) {
      LOG.debug("Looking for Job " + jobId);
| } |
| Job job = null; |
| try { |
| job = findJob(jobId); |
| //This could return a null job. |
| } catch (IOException e) { |
| throw new YarnException(e); |
| } |
| return job; |
| } |
| |
| @Override |
| public Map<JobId, Job> getAllJobs(ApplicationId appID) { |
    if (LOG.isDebugEnabled()) {
      LOG.debug("Called getAllJobs(AppId): " + appID);
| } |
| // currently there is 1 to 1 mapping between app and job id |
| org.apache.hadoop.mapreduce.JobID oldJobID = TypeConverter.fromYarn(appID); |
| Map<JobId, Job> jobs = new HashMap<JobId, Job>(); |
| JobId jobID = TypeConverter.toYarn(oldJobID); |
| jobs.put(jobID, getJob(jobID)); |
| return jobs; |
| // return getAllJobs(); |
| } |
| |
  /* (non-Javadoc)
   * @see org.apache.hadoop.mapreduce.v2.hs.HistoryContext#getAllJobs()
   * 
   * Returns a recent list of jobs. This may not be the complete set.
   * If a previous jobId is known - it can be queried via the getJob(JobId)
   * method.
   * Size of this list is determined by the size of the job list cache.
   * This can be fixed when pagination is implemented - return the first set of
   * jobs via the cache, go to DFS only when an attempt is made to navigate
   * past the cached list.
   * This does involve a DFS operation of scanning the intermediate directory.
   */
| public Map<JobId, Job> getAllJobs() { |
| LOG.debug("Called getAllJobs()"); |
| return getAllJobsInternal(); |
| } |
| |
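  /**
   * Holds the history, conf and summary file paths for a single job, along
   * with the JobIndexInfo parsed from the history file name.
   */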
| static class MetaInfo { |
| private Path historyFile; |
| private Path confFile; |
| private Path summaryFile; |
| JobIndexInfo jobIndexInfo; |
| |
| MetaInfo(Path historyFile, Path confFile, Path summaryFile, |
| JobIndexInfo jobIndexInfo) { |
| this.historyFile = historyFile; |
| this.confFile = confFile; |
| this.summaryFile = summaryFile; |
| this.jobIndexInfo = jobIndexInfo; |
| } |
| |
| Path getHistoryFile() { return historyFile; } |
| Path getConfFile() { return confFile; } |
| Path getSummaryFile() { return summaryFile; } |
| JobIndexInfo getJobIndexInfo() { return jobIndexInfo; } |
| |
| void setHistoryFile(Path historyFile) { this.historyFile = historyFile; } |
    void setConfFile(Path confFile) { this.confFile = confFile; }
| void setSummaryFile(Path summaryFile) { this.summaryFile = summaryFile; } |
| } |
| |
| |
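  /**
   * Scans the done directories in timestamp order and deletes history and
   * conf files whose effective timestamp is older than maxAgeMillis,
   * removing emptied serial directories from the index as it goes.
   */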
| public class HistoryCleaner implements Runnable { |
| private long currentTime; |
| |
| long maxAgeMillis; |
| long filesDeleted = 0; |
| long dirsDeleted = 0; |
| |
| public HistoryCleaner(long maxAge) { |
| this.maxAgeMillis = maxAge; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public void run() { |
| LOG.info("History Cleaner started"); |
| currentTime = System.currentTimeMillis(); |
| boolean halted = false; |
| //TODO Delete YYYY/MM/DD directories. |
| try { |
| List<FileStatus> serialDirList = findTimestampedDirectories(); |
| //Sort in ascending order. Relies on YYYY/MM/DD/Serial |
| Collections.sort(serialDirList); |
| for (FileStatus serialDir : serialDirList) { |
| List<FileStatus> historyFileList = |
| scanDirectoryForHistoryFiles(serialDir.getPath(), doneDirFc); |
| for (FileStatus historyFile : historyFileList) { |
| JobIndexInfo jobIndexInfo = |
| FileNameIndexUtils.getIndexInfo(historyFile.getPath().getName()); |
| long effectiveTimestamp = |
| getEffectiveTimestamp(jobIndexInfo.getFinishTime(), historyFile); |
| if (shouldDelete(effectiveTimestamp)) { |
| String confFileName = |
| JobHistoryUtils.getIntermediateConfFileName(jobIndexInfo.getJobId()); |
| MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), |
| new Path(historyFile.getPath().getParent(), confFileName), |
| null, jobIndexInfo); |
| delete(metaInfo); |
| } else { |
| halted = true; |
| break; |
| } |
| } |
| if (!halted) { |
| deleteDir(serialDir.getPath()); |
| removeDirectoryFromSerialNumberIndex(serialDir.getPath()); |
| synchronized (existingDoneSubdirs) { |
| existingDoneSubdirs.remove(serialDir.getPath()); |
| } |
| |
| } else { |
| break; //Don't scan any more directories. |
| } |
| } |
| } catch (IOException e) { |
| LOG.warn("Error in History cleaner run", e); |
| } |
| LOG.info("History Cleaner complete"); |
| LOG.info("FilesDeleted: " + filesDeleted); |
| LOG.info("Directories Deleted: " + dirsDeleted); |
| } |
| |
| private boolean shouldDelete(long ts) { |
| return ((ts + maxAgeMillis) <= currentTime); |
| } |
| |
| private long getEffectiveTimestamp(long finishTime, FileStatus fileStatus) { |
| if (finishTime == 0) { |
| return fileStatus.getModificationTime(); |
| } |
| return finishTime; |
| } |
| |
| private void delete(MetaInfo metaInfo) throws IOException { |
| deleteFile(metaInfo.getHistoryFile()); |
| deleteFile(metaInfo.getConfFile()); |
| jobListCache.remove(metaInfo.getJobIndexInfo().getJobId()); |
| loadedJobCache.remove(metaInfo.getJobIndexInfo().getJobId()); |
| } |
| |
| private void deleteFile(final Path path) throws IOException { |
| doneDirFc.delete(doneDirFc.makeQualified(path), false); |
| filesDeleted++; |
| } |
| |
| private void deleteDir(Path path) throws IOException { |
| doneDirFc.delete(doneDirFc.makeQualified(path), true); |
| dirsDeleted++; |
| } |
| } |
| |
| |
| |
| //TODO AppContext - Not Required |
| private ApplicationAttemptId appAttemptID; |
| @Override |
| public ApplicationAttemptId getApplicationAttemptId() { |
| //TODO fixme - bogus appAttemptID for now |
| return appAttemptID; |
| } |
| |
| //TODO AppContext - Not Required |
| private ApplicationId appID; |
| @Override |
| public ApplicationId getApplicationID() { |
| //TODO fixme - bogus appID for now |
| return appID; |
| } |
| |
| //TODO AppContext - Not Required |
| @Override |
| public EventHandler getEventHandler() { |
| // TODO Auto-generated method stub |
| return null; |
| } |
| |
| //TODO AppContext - Not Required |
| private String userName; |
| @Override |
| public CharSequence getUser() { |
    if (userName == null) {
| userName = conf.get(MRJobConfig.USER_NAME, "history-user"); |
| } |
| return userName; |
| } |
| |
| //TODO AppContext - Not Required |
| @Override |
| public Clock getClock() { |
| return null; |
| } |
| } |