| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.mapreduce.jobhistory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.regex.Pattern; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.fs.ChecksumException; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapred.JobTracker; |
| import org.apache.hadoop.mapreduce.JobID; |
| import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.StringUtils; |
| |
/**
 * JobHistory is the class that is responsible for creating and maintaining
 * job history information.
 *
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobHistory {

  // NOTE(review): conventionally this would be private static final.
  final Log LOG = LogFactory.getLog(JobHistory.class);

  // Block size used when creating per-job history files (see setupEventWriter).
  private long jobHistoryBlockSize;
  // Live jobs: JobID -> open history/conf files + event writer.
  private final Map<JobID, MetaInfo> fileMap =
    Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
  // Pool that moves completed jobs' files to the DONE folder; created in
  // startFileMoverThreads (i.e. only after initDone has run).
  private ThreadPoolExecutor executor = null;
  static final FsPermission HISTORY_DIR_PERMISSION =
    FsPermission.createImmutable((short) 0750); // rwxr-x---

  public static final FsPermission HISTORY_FILE_PERMISSION =
    FsPermission.createImmutable((short) 0740); // rwxr-----

  private JobTracker jobTracker;

  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //week
  private FileSystem logDirFs; // log Dir FS
  private FileSystem doneDirFs; // done Dir FS

  private Path logDir = null;
  private Path done = null; // folder for completed jobs

  public static final String OLD_SUFFIX = ".old";

  // Version string that will prefix all History Files
  public static final String HISTORY_VERSION = "1.0";

  private HistoryCleaner historyCleanerThread = null;

  // Completed jobs: JobID -> final (DONE-folder) history file location.
  // LinkedHashMap keeps insertion (i.e. timestamp) order, which
  // HistoryCleaner.doCleanup relies on to stop scanning early.
  private Map<JobID, MovedFileInfo> jobHistoryFileMap =
    Collections.<JobID,MovedFileInfo>synchronizedMap(
        new LinkedHashMap<JobID, MovedFileInfo>());

  // JobHistory filename regex
  public static final Pattern JOBHISTORY_FILENAME_REGEX =
    Pattern.compile("(" + JobID.JOBID_REGEX + ")_.+");
  // JobHistory conf-filename regex
  public static final Pattern CONF_FILENAME_REGEX =
    Pattern.compile("(" + JobID.JOBID_REGEX + ")_conf.xml(?:\\.[0-9]+\\.old)?");
| |
| private static class MovedFileInfo { |
| private final String historyFile; |
| private final long timestamp; |
| public MovedFileInfo(String historyFile, long timestamp) { |
| this.historyFile = historyFile; |
| this.timestamp = timestamp; |
| } |
| } |
| /** |
| * Initialize Job History Module |
| * @param jt Job Tracker handle |
| * @param conf Configuration |
| * @param hostname Host name of JT |
| * @param jobTrackerStartTime Start time of JT |
| * @throws IOException |
| */ |
| public void init(JobTracker jt, JobConf conf, String hostname, |
| long jobTrackerStartTime) throws IOException { |
| |
| // Get and create the log folder |
| final String logDirLoc = conf.get(JTConfig.JT_JOBHISTORY_LOCATION , |
| "file:///" + |
| new File(System.getProperty("hadoop.log.dir")).getAbsolutePath() |
| + File.separator + "history"); |
| |
| LOG.info("History log directory is " + logDirLoc); |
| |
| logDir = new Path(logDirLoc); |
| logDirFs = logDir.getFileSystem(conf); |
| |
| if (!logDirFs.exists(logDir)){ |
| if (!logDirFs.mkdirs(logDir, |
| new FsPermission(HISTORY_DIR_PERMISSION))) { |
| throw new IOException("Mkdirs failed to create " + |
| logDir.toString()); |
| } |
| } |
| conf.set(JTConfig.JT_JOBHISTORY_LOCATION, logDirLoc); |
| |
| jobHistoryBlockSize = |
| conf.getLong(JTConfig.JT_JOBHISTORY_BLOCK_SIZE, |
| 3 * 1024 * 1024); |
| |
| jobTracker = jt; |
| } |
| |
  /**
   * Initialize the done directory and start the history cleaner thread.
   * Must run after {@link #init}, which sets up logDir/logDirFs.
   *
   * @param conf source of the completed-location and max-age settings
   * @param fs not referenced here; kept for caller compatibility
   * @throws IOException if the DONE folder cannot be created
   */
  public void initDone(JobConf conf, FileSystem fs) throws IOException {
    //if completed job history location is set, use that
    String doneLocation =
      conf.get(JTConfig.JT_JOBHISTORY_COMPLETED_LOCATION);
    if (doneLocation != null) {
      Path donePath = new Path(doneLocation);
      doneDirFs = donePath.getFileSystem(conf);
      done = doneDirFs.makeQualified(donePath);
    } else {
      // Default: a "done" subfolder of the log dir on the same file system.
      done = logDirFs.makeQualified(new Path(logDir, "done"));
      doneDirFs = logDirFs;
    }

    //If not already present create the done folder with appropriate
    //permission
    if (!doneDirFs.exists(done)) {
      LOG.info("Creating DONE folder at "+ done);
      if (! doneDirFs.mkdirs(done,
                             new FsPermission(HISTORY_DIR_PERMISSION))) {
        throw new IOException("Mkdirs failed to create " + done.toString());
      }
    }
    LOG.info("Inited the done directory to " + done.toString());

    // Recover files left behind by a previous JT run, then start the pool
    // used to move files of newly completed jobs.
    moveOldFiles();
    startFileMoverThreads();

    // Start the History Cleaner Thread
    long maxAgeOfHistoryFiles = conf.getLong(
        JTConfig.JT_JOBHISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
    historyCleanerThread = new HistoryCleaner(maxAgeOfHistoryFiles);
    historyCleanerThread.start();
  }
| |
  /**
   * Move the completed job into the completed folder.
   * This assumes that the job history file is closed and
   * all operations on the job history file is complete.
   * This *should* be the last call to job history for a given job.
   *
   * @param id the job to retire; the actual move happens asynchronously
   *           on the file-mover pool (see moveToDone)
   * @throws IOException declared for caller compatibility
   */
  public void markCompleted(JobID id) throws IOException {
    moveToDone(id);
  }
| |
| /** Shut down JobHistory after stopping the History cleaner */ |
| public void shutDown() { |
| LOG.info("Interrupting History Cleaner"); |
| historyCleanerThread.interrupt(); |
| try { |
| historyCleanerThread.join(); |
| } catch (InterruptedException e) { |
| LOG.info("Error with shutting down history thread"); |
| } |
| } |
| |
  /**
   * Get the history location
   *
   * @return the directory holding in-progress job history files
   */
  public Path getJobHistoryLocation() {
    return logDir;
  }
| |
  /**
   * Get the history location for completed jobs
   *
   * @return the DONE folder; null until {@link #initDone} has run
   */
  public Path getCompletedJobHistoryLocation() {
    return done;
  }
| |
| /** |
| * Get the job history file path |
| */ |
| public static Path getJobHistoryFile(Path dir, JobID jobId, |
| String user) { |
| return new Path(dir, jobId.toString() + "_" + user); |
| } |
| |
| /** |
| * Get the JobID from the history file's name. See it's companion method |
| * {@link #getJobHistoryFile(Path, JobID, String)} for how history file's name |
| * is constructed from a given JobID and userName. |
| * |
| * @param jobHistoryFilePath |
| * @return jobID |
| */ |
| public static JobID getJobIDFromHistoryFilePath(Path jobHistoryFilePath) { |
| String[] jobDetails = jobHistoryFilePath.getName().split("_"); |
| String jobId = |
| jobDetails[0] + "_" + jobDetails[1] + "_" + jobDetails[2]; |
| return JobID.forName(jobId); |
| } |
| |
| /** |
| * Get the user name of the job-submitter from the history file's name. See |
| * it's companion method {@link #getJobHistoryFile(Path, JobID, String)} for |
| * how history file's name is constructed from a given JobID and username. |
| * |
| * @param jobHistoryFilePath |
| * @return the user-name |
| */ |
| public static String getUserFromHistoryFilePath(Path jobHistoryFilePath) { |
| String[] jobDetails = jobHistoryFilePath.getName().split("_"); |
| return jobDetails[3]; |
| } |
| |
| /** |
| * Given the job id, return the history file path from the cache |
| */ |
| public String getHistoryFilePath(JobID jobId) { |
| MovedFileInfo info = jobHistoryFileMap.get(jobId); |
| if (info == null) { |
| return null; |
| } |
| return info.historyFile; |
| } |
| /** |
| * Create an event writer for the Job represented by the jobID. |
| * This should be the first call to history for a job |
| * @param jobId |
| * @param jobConf |
| * @throws IOException |
| */ |
| public void setupEventWriter(JobID jobId, JobConf jobConf) |
| throws IOException { |
| Path logFile = getJobHistoryFile(logDir, jobId, getUserName(jobConf)); |
| |
| if (logDir == null) { |
| LOG.info("Log Directory is null, returning"); |
| throw new IOException("Missing Log Directory for History"); |
| } |
| |
| int defaultBufferSize = |
| logDirFs.getConf().getInt("io.file.buffer.size", 4096); |
| |
| LOG.info("SetupWriter, creating file " + logFile); |
| |
| FSDataOutputStream out = logDirFs.create(logFile, |
| new FsPermission(JobHistory.HISTORY_FILE_PERMISSION), |
| true, defaultBufferSize, |
| logDirFs.getDefaultReplication(), |
| jobHistoryBlockSize, null); |
| |
| EventWriter writer = new EventWriter(out); |
| |
| /* Storing the job conf on the log dir */ |
| |
| Path logDirConfPath = getConfFile(logDir, jobId); |
| LOG.info("LogDirConfPath is " + logDirConfPath); |
| |
| FSDataOutputStream jobFileOut = null; |
| try { |
| if (logDirConfPath != null) { |
| defaultBufferSize = |
| logDirFs.getConf().getInt("io.file.buffer.size", 4096); |
| if (!logDirFs.exists(logDirConfPath)) { |
| jobFileOut = logDirFs.create(logDirConfPath, |
| new FsPermission(JobHistory.HISTORY_FILE_PERMISSION), |
| true, defaultBufferSize, |
| logDirFs.getDefaultReplication(), |
| logDirFs.getDefaultBlockSize(), null); |
| jobConf.writeXml(jobFileOut); |
| jobFileOut.close(); |
| } |
| } |
| } catch (IOException e) { |
| LOG.info("Failed to close the job configuration file " |
| + StringUtils.stringifyException(e)); |
| } |
| |
| MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer); |
| fileMap.put(jobId, fi); |
| } |
| |
| /** Close the event writer for this id */ |
| public void closeWriter(JobID id) { |
| try { |
| final MetaInfo mi = fileMap.get(id); |
| if (mi != null) { |
| mi.closeWriter(); |
| } |
| } catch (IOException e) { |
| LOG.info("Error closing writer for JobID: " + id); |
| } |
| } |
| |
| /** |
| * Method to log the specified event |
| * @param event The event to log |
| * @param id The Job ID of the event |
| */ |
| public void logEvent(HistoryEvent event, JobID id) { |
| try { |
| final MetaInfo mi = fileMap.get(id); |
| if (mi != null) { |
| mi.writeEvent(event); |
| } |
| } catch (IOException e) { |
| LOG.error("Error Logging event, " + e.getMessage()); |
| } |
| } |
| |
| |
  /**
   * Copy a file from the log dir FS to the done dir FS (deleting the
   * source, per FileUtil.copy's deleteSource=true) and set the history
   * file permission on the destination. Silently returns when the source
   * is already gone, which happens on retries.
   */
  private void moveToDoneNow(Path fromPath, Path toPath) throws IOException {
    //check if path exists, in case of retries it may not exist
    if (logDirFs.exists(fromPath)) {
      LOG.info("Moving " + fromPath.toString() + " to " +
          toPath.toString());
      // copy(..., deleteSource=true, overwrite=false): move semantics
      // across possibly-different file systems.
      FileUtil.copy(logDirFs, fromPath, doneDirFs, toPath, true, false,
          jobTracker.getConf());
      doneDirFs.setPermission(toPath,
          new FsPermission(JobHistory.HISTORY_FILE_PERMISSION));
    }
  }
| |
  /**
   * Create the pool that moves completed jobs' files to DONE (used by
   * moveToDone). NOTE(review): with an unbounded LinkedBlockingQueue a
   * ThreadPoolExecutor never grows past its core size, so despite max=3
   * this pool runs a single mover thread — confirm that is intended.
   */
  private void startFileMoverThreads() {
    executor = new ThreadPoolExecutor(1, 3, 1,
        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
  }
| |
| /** |
| * Get the job conf file for the given jobId |
| * |
| * @param logDir |
| * @param jobId |
| * @return the jobconf.xml path |
| */ |
| public static Path getConfFile(Path logDir, JobID jobId) { |
| Path jobFilePath = null; |
| if (logDir != null) { |
| jobFilePath = new Path(logDir + File.separator + |
| jobId.toString() + "_conf.xml"); |
| } |
| return jobFilePath; |
| } |
| |
| /** |
| * Generates a suffix for old/stale jobhistory files |
| * Pattern : . + identifier + .old |
| */ |
| public static String getOldFileSuffix(String identifier) { |
| return "." + identifier + JobHistory.OLD_SUFFIX; |
| } |
| |
  /**
   * Recovery step: move every file left in the log dir by the previous JT
   * run into DONE, suffixed with this JT's identifier so recovered files
   * cannot clobber same-named files from the new run. Per-file policy:
   * checksum-corrupt files are deleted; any other move failure is logged
   * and skipped so one bad file cannot block startup.
   */
  private void moveOldFiles() throws IOException {
    //move the log files remaining from last run to the DONE folder
    //suffix the file name based on Job tracker identifier so that history
    //files with same job id don't get over written in case of recovery.
    FileStatus[] files = logDirFs.listStatus(logDir);
    String fileSuffix = getOldFileSuffix(jobTracker.getTrackerIdentifier());
    for (FileStatus fileStatus : files) {
      Path fromPath = fileStatus.getPath();
      if (fromPath.equals(done)) { //DONE can be a subfolder of log dir
        continue;
      }
      LOG.info("Moving log file from last run: " + fromPath);
      Path toPath = new Path(done, fromPath.getName() + fileSuffix);
      try {
        moveToDoneNow(fromPath, toPath);
      } catch (ChecksumException e) {
        // If there is an exception moving the file to done because of
        // a checksum exception, just delete it
        LOG.warn("Unable to move " + fromPath +", deleting it");
        try {
          boolean b = logDirFs.delete(fromPath, false);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Deletion of corrupt file " + fromPath + " returned " + b);
          }
        } catch (IOException ioe) {
          // Cannot delete either? Just log and carry on
          LOG.warn("Unable to delete " + fromPath + "Exception: " +
              ioe.getMessage());
        }
      } catch (IOException e) {
        // Exceptions other than checksum, just log and continue
        LOG.warn("Error moving file " + fromPath + " to done folder." +
            "Ignoring.");
      }
    }
  }
| |
  /**
   * Asynchronously move a completed job's history and conf files to the
   * DONE folder on the file-mover pool, record the final history location
   * in jobHistoryFileMap, retire the job with the JobTracker, and purge
   * the job from fileMap. No-op (beyond a log line) when the job has no
   * cached MetaInfo. Requires startFileMoverThreads to have run (executor
   * is created in initDone).
   */
  private void moveToDone(final JobID id) {
    final List<Path> paths = new ArrayList<Path>();
    final MetaInfo metaInfo = fileMap.get(id);
    if (metaInfo == null) {
      LOG.info("No file for job-history with " + id + " found in cache!");
      return;
    }

    // Collect whichever of the two files actually exist in the cache.
    final Path historyFile = metaInfo.getHistoryFile();
    if (historyFile == null) {
      LOG.info("No file for job-history with " + id + " found in cache!");
    } else {
      paths.add(historyFile);
    }

    final Path confPath = metaInfo.getConfFile();
    if (confPath == null) {
      LOG.info("No file for jobconf with " + id + " found in cache!");
    } else {
      paths.add(confPath);
    }

    executor.execute(new Runnable() {

      public void run() {
        //move the files to DONE folder
        try {
          for (Path path : paths) {
            moveToDoneNow(path, new Path(done, path.getName()));
          }
        } catch (Throwable e) {
          LOG.error("Unable to move history file to DONE folder.", e);
        }
        // Record the DONE-folder location (even if the move above failed,
        // matching the original best-effort behavior), then retire the job.
        String historyFileDonePath = null;
        if (historyFile != null) {
          historyFileDonePath = new Path(done,
              historyFile.getName()).toString();
        }
        jobHistoryFileMap.put(id, new MovedFileInfo(historyFileDonePath,
            System.currentTimeMillis()));
        jobTracker.retireJob(org.apache.hadoop.mapred.JobID.downgrade(id),
            historyFileDonePath);

        //purge the job from the cache
        fileMap.remove(id);
      }

    });
  }
| |
| private String getUserName(JobConf jobConf) { |
| String user = jobConf.getUser(); |
| if (user == null) { |
| user = ""; |
| } |
| return user; |
| } |
| |
| private static class MetaInfo { |
| private Path historyFile; |
| private Path confFile; |
| private EventWriter writer; |
| |
| MetaInfo(Path historyFile, Path conf, EventWriter writer) { |
| this.historyFile = historyFile; |
| this.confFile = conf; |
| this.writer = writer; |
| } |
| |
| Path getHistoryFile() { return historyFile; } |
| Path getConfFile() { return confFile; } |
| |
| synchronized void closeWriter() throws IOException { |
| if (writer != null) { |
| writer.close(); |
| } |
| writer = null; |
| } |
| |
| synchronized void writeEvent(HistoryEvent event) throws IOException { |
| if (writer != null) { |
| writer.write(event); |
| } |
| } |
| } |
| |
  /**
   * Delete history files older than a specified time duration.
   * Daemon thread; wakes up at most once a day (more often when maxAge is
   * under a day) and exits when interrupted (see shutDown).
   */
  class HistoryCleaner extends Thread {
    static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
    private long cleanupFrequency;   // sleep between passes, ms
    private long maxAgeOfHistoryFiles; // age threshold for deletion, ms

    public HistoryCleaner(long maxAge) {
      setName("Thread for cleaning up History files");
      setDaemon(true);
      this.maxAgeOfHistoryFiles = maxAge;
      // Never sleep longer than a day so small max-ages are honored.
      cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
      LOG.info("Job History Cleaner Thread started." +
          " MaxAge is " +
          maxAge + " ms(" + ((float)maxAge)/(ONE_DAY_IN_MS) + " days)," +
          " Cleanup Frequency is " +
          + cleanupFrequency + " ms (" +
          ((float)cleanupFrequency)/ONE_DAY_IN_MS + " days)");
    }

    @Override
    public void run(){

      // Loop until interrupted; any other failure is logged and the loop
      // continues so one bad pass cannot kill the cleaner.
      while (true) {
        try {
          doCleanup();
          Thread.sleep(cleanupFrequency);
        }
        catch (InterruptedException e) {
          LOG.info("History Cleaner thread exiting");
          return;
        }
        catch (Throwable t) {
          LOG.warn("History cleaner thread threw an exception", t);
        }
      }
    }

    /**
     * One cleanup pass: delete DONE-folder files older than maxAge, then
     * purge matching entries from jobHistoryFileMap. IOExceptions are
     * logged and swallowed (best-effort).
     */
    private void doCleanup() {
      long now = System.currentTimeMillis();
      try {
        FileStatus[] historyFiles = doneDirFs.listStatus(done);
        if (historyFiles != null) {
          for (FileStatus f : historyFiles) {
            if (now - f.getModificationTime() > maxAgeOfHistoryFiles) {
              doneDirFs.delete(f.getPath(), true);
              LOG.info("Deleting old history file : " + f.getPath());
            }
          }
        }
        //walking over the map to purge entries from jobHistoryFileMap
        // (must hold the map's monitor while iterating a synchronizedMap)
        synchronized (jobHistoryFileMap) {
          Iterator<Entry<JobID, MovedFileInfo>> it =
            jobHistoryFileMap.entrySet().iterator();
          while (it.hasNext()) {
            MovedFileInfo info = it.next().getValue();
            if (now - info.timestamp > maxAgeOfHistoryFiles) {
              it.remove();
            } else {
              //since entries are in sorted timestamp order, no more entries
              //are required to be checked
              break;
            }
          }
        }
      } catch (IOException ie) {
        LOG.info("Error cleaning up history directory" +
            StringUtils.stringifyException(ie));
      }
    }
  }
| } |