blob: a42f9ebe320c78e44bb8cc99f4bf196e20055197 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.jobhistory;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Calendar;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
public class JobHistoryUtils {
/**
 * Permissions for the history staging dir while JobInProgress
 * (octal 0700: rwx------).
 */
public static final FsPermission HISTORY_STAGING_DIR_PERMISSIONS =
FsPermission.createImmutable( (short) 0700);
/**
 * Permissions for the user directory under the staging directory
 * (octal 0700: rwx------).
 */
public static final FsPermission HISTORY_STAGING_USER_DIR_PERMISSIONS =
FsPermission.createImmutable((short) 0700);
/**
 * Permissions for the history done dir and derivatives
 * (octal 0770: rwxrwx---).
 */
public static final FsPermission HISTORY_DONE_DIR_PERMISSION =
FsPermission.createImmutable((short) 0770);
/**
 * Permission constant named for files in the history done dir.
 */
public static final FsPermission HISTORY_DONE_FILE_PERMISSION =
FsPermission.createImmutable((short) 0770); // rwxrwx---
/**
 * Permissions for the intermediate done directory
 * (octal 01777: rwxrwxrwt, i.e. world-writable with the sticky bit set).
 */
public static final FsPermission HISTORY_INTERMEDIATE_DONE_DIR_PERMISSIONS =
FsPermission.createImmutable((short) 01777);
/**
 * Permissions for the user directory under the intermediate done directory
 * (octal 0770: rwxrwx---).
 */
public static final FsPermission HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS =
FsPermission.createImmutable((short) 0770);
/**
 * Permission constant named for files in the intermediate done directory.
 */
public static final FsPermission HISTORY_INTERMEDIATE_FILE_PERMISSIONS =
FsPermission.createImmutable((short) 0770); // rwxrwx---
/**
 * Suffix for configuration files.
 */
public static final String CONF_FILE_NAME_SUFFIX = "_conf.xml";
/**
 * Suffix for summary files.
 */
public static final String SUMMARY_FILE_NAME_SUFFIX = ".summary";
/**
 * Job History File extension.
 */
public static final String JOB_HISTORY_FILE_EXTENSION = ".jhist";
/**
 * Version number.
 * NOTE(review): what exactly is versioned is not visible in this file -
 * presumably the job history format/layout; confirm against callers.
 */
public static final int VERSION = 4;
/**
 * Number of leading characters of the formatted job serial number that are
 * kept by {@link #serialNumberDirectoryComponent(JobId, String)}.
 */
public static final int SERIAL_NUMBER_DIRECTORY_DIGITS = 6;
/**
 * Regular expression for a date directory triple: four digits, two digits
 * and two digits, separated by the (escaped) platform file separator.
 */
public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + File.separator + "\\d{2}" + "\\" + File.separator + "\\d{2}";
/** Compiled form of {@link #TIMESTAMP_DIR_REGEX}. */
public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
// Format counterpart of TIMESTAMP_DIR_REGEX: zero-padded 4/2/2 digit fields
// joined by the platform file separator.
private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
// Splits "host:port" style addresses on ':' and trims each piece.
private static final Splitter ADDR_SPLITTER = Splitter.on(':').trimResults();
// Concatenates its arguments with no separator between them.
private static final Joiner JOINER = Joiner.on("");
/**
 * Accepts paths whose name, as reported by {@code Path#getName()}, ends with
 * {@link #CONF_FILE_NAME_SUFFIX}.
 */
private static final PathFilter CONF_FILTER = new PathFilter() {
  @Override
  public boolean accept(Path path) {
    final String fileName = path.getName();
    return fileName.endsWith(CONF_FILE_NAME_SUFFIX);
  }
};
/**
 * Accepts paths whose name, as reported by {@code Path#getName()}, ends with
 * {@link #JOB_HISTORY_FILE_EXTENSION}.
 */
private static final PathFilter JOB_HISTORY_FILE_FILTER = new PathFilter() {
  @Override
  public boolean accept(Path path) {
    final String fileName = path.getName();
    return fileName.endsWith(JOB_HISTORY_FILE_EXTENSION);
  }
};
/**
 * Tests whether a path string names a job history file, i.e. whether it ends
 * with {@link #JOB_HISTORY_FILE_EXTENSION}.
 *
 * @param pathString the path to be checked.
 * @return true if the path ends with the job history file extension,
 *         false otherwise.
 */
public static boolean isValidJobHistoryFileName(String pathString) {
  final boolean hasHistoryExtension =
      pathString.endsWith(JOB_HISTORY_FILE_EXTENSION);
  return hasHistoryExtension;
}
/**
 * Extracts the job id encoded in the file-name part of a job history path.
 *
 * <p>The path is split on {@code Path.SEPARATOR}; the last resulting segment
 * is handed to {@code FileNameIndexUtils.getIndexInfo} and the job id found
 * there is converted with {@code TypeConverter.fromYarn}.
 *
 * @param pathString the path string.
 * @return the JobID parsed from the last path segment.
 * @throws IOException if the filename format is invalid.
 */
public static JobID getJobIDFromHistoryFilePath(String pathString) throws IOException {
  final String[] segments = pathString.split(Path.SEPARATOR);
  final int lastIndex = segments.length - 1;
  final JobIndexInfo indexInfo = FileNameIndexUtils.getIndexInfo(segments[lastIndex]);
  return TypeConverter.fromYarn(indexInfo.getJobId());
}
/**
 * Gets a PathFilter which would match configuration files.
 * @return the shared {@link PathFilter} instance ({@code CONF_FILTER}) that
 *         accepts paths whose name ends with {@link #CONF_FILE_NAME_SUFFIX}.
 */
public static PathFilter getConfFileFilter() {
return CONF_FILTER;
}
/**
 * Gets a PathFilter which would match job history file names.
 * @return the shared {@link PathFilter} instance
 *         ({@code JOB_HISTORY_FILE_FILTER}) that accepts paths whose name
 *         ends with {@link #JOB_HISTORY_FILE_EXTENSION}.
 */
public static PathFilter getHistoryFileFilter() {
return JOB_HISTORY_FILE_FILTER;
}
/**
 * Gets the configured directory prefix for In Progress history files.
 *
 * <p>This is the string form of the staging area directory that
 * {@code MRApps.getStagingAreaDir} reports for the current user's short name.
 *
 * @param conf the configuration object.
 * @return a string representation of the prefix.
 * @throws IOException declared because looking up the current user may fail.
 */
public static String
    getConfiguredHistoryStagingDirPrefix(Configuration conf)
    throws IOException {
  final String currentUser =
      UserGroupInformation.getCurrentUser().getShortUserName();
  return MRApps.getStagingAreaDir(conf, currentUser).toString();
}
/**
 * Gets the configured directory prefix for intermediate done history files.
 *
 * <p>An explicitly configured
 * {@code JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR} wins; otherwise the
 * prefix is derived from {@code MRJobConfig.MR_AM_STAGING_DIR} by appending
 * {@code /history/done_intermediate}.
 *
 * @param conf the configuration object.
 * @return a string representation of the prefix.
 */
public static String getConfiguredHistoryIntermediateDoneDirPrefix(
    Configuration conf) {
  final String configured =
      conf.get(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR);
  if (configured != null) {
    return configured;
  }
  // NOTE(review): presumably the staging dir always resolves to a value
  // (e.g. via default resources); otherwise the result would start with the
  // text "null" - confirm.
  return conf.get(MRJobConfig.MR_AM_STAGING_DIR)
      + "/history/done_intermediate";
}
/**
 * Gets the configured directory prefix for Done history files.
 *
 * <p>An explicitly configured {@code JHAdminConfig.MR_HISTORY_DONE_DIR} wins;
 * otherwise the prefix is derived from {@code MRJobConfig.MR_AM_STAGING_DIR}
 * by appending {@code /history/done}.
 *
 * @param conf the configuration object
 * @return the done history directory
 */
public static String getConfiguredHistoryServerDoneDirPrefix(
    Configuration conf) {
  final String configured = conf.get(JHAdminConfig.MR_HISTORY_DONE_DIR);
  if (configured != null) {
    return configured;
  }
  // NOTE(review): presumably the staging dir always resolves to a value;
  // otherwise the result would start with the text "null" - confirm.
  return conf.get(MRJobConfig.MR_AM_STAGING_DIR)
      + "/history/done";
}
/**
 * Gets the user directory for intermediate done history files.
 *
 * <p>The result is the intermediate done directory prefix followed by the
 * current user's short name. The two parts are joined with
 * {@code Path.SEPARATOR} rather than the platform file separator, because the
 * result denotes a Hadoop path and must not pick up a backslash on Windows.
 *
 * @param conf the configuration object
 * @return the intermediate done directory for jobhistory files.
 * @throws IOException declared because looking up the current user may fail.
 */
public static String getHistoryIntermediateDoneDirForUser(Configuration conf) throws IOException {
  return getConfiguredHistoryIntermediateDoneDirPrefix(conf) + Path.SEPARATOR
      + UserGroupInformation.getCurrentUser().getShortUserName();
}
/**
 * Reads the boolean setting
 * {@code MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR}.
 *
 * @param conf the configuration object.
 * @return the configured value, or true when the key is not set.
 */
public static boolean shouldCreateNonUserDirectory(Configuration conf) {
  // The default is true so that non secure single node clusters work
  // without any configuration change.
  final boolean createByDefault = true;
  return conf.getBoolean(
      MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, createByDefault);
}
/**
 * Get the job history file path for non Done history files.
 *
 * <p>Converts the job id to its string form via
 * {@code TypeConverter.fromYarn} and delegates to
 * {@link #getStagingJobHistoryFile(Path, String, int)}.
 *
 * @param dir the directory that contains the history file.
 * @param jobId the jobId.
 * @param attempt attempt number for this job.
 * @return the history file path under {@code dir}.
 */
public static Path getStagingJobHistoryFile(Path dir, JobId jobId, int attempt) {
  final String jobIdString = TypeConverter.fromYarn(jobId).toString();
  return getStagingJobHistoryFile(dir, jobIdString, attempt);
}
/**
 * Get the job history file path for non Done history files.
 *
 * <p>The file name has the form {@code <jobId>_<attempt>} followed by
 * {@link #JOB_HISTORY_FILE_EXTENSION}.
 *
 * @param dir the directory that contains the history file.
 * @param jobId the job id in string form.
 * @param attempt attempt number for this job.
 * @return the history file path under {@code dir}.
 */
public static Path getStagingJobHistoryFile(Path dir, String jobId, int attempt) {
  final String fileName = jobId + "_" + attempt + JOB_HISTORY_FILE_EXTENSION;
  return new Path(dir, fileName);
}
/**
 * Builds the configuration file name for a job: the string form of the
 * converted job id followed by {@link #CONF_FILE_NAME_SUFFIX}.
 *
 * @param jobId the jobId.
 * @return the conf file name.
 */
public static String getIntermediateConfFileName(JobId jobId) {
  final String idText = TypeConverter.fromYarn(jobId).toString();
  return idText + CONF_FILE_NAME_SUFFIX;
}
/**
 * Builds the summary file name for a job: the string form of the converted
 * job id followed by {@link #SUMMARY_FILE_NAME_SUFFIX}.
 *
 * @param jobId the jobId.
 * @return the summary file name.
 */
public static String getIntermediateSummaryFileName(JobId jobId) {
  final String idText = TypeConverter.fromYarn(jobId).toString();
  return idText + SUMMARY_FILE_NAME_SUFFIX;
}
/**
 * Gets the conf file path for jobs in progress.
 *
 * <p>The file name has the form {@code <jobId>_<attempt>} followed by
 * {@link #CONF_FILE_NAME_SUFFIX}.
 *
 * @param logDir the log directory prefix; may be null.
 * @param jobId the jobId.
 * @param attempt attempt number for this job.
 * @return the conf file path for jobs in progress, or null when
 *         {@code logDir} is null.
 */
public static Path getStagingConfFile(Path logDir, JobId jobId, int attempt) {
  if (logDir == null) {
    return null;
  }
  final String confFileName = TypeConverter.fromYarn(jobId).toString()
      + "_" + attempt + CONF_FILE_NAME_SUFFIX;
  return new Path(logDir, confFileName);
}
/**
 * Gets the serial number part of the path based on the jobId and
 * serialNumber format.
 *
 * <p>The job's serial number is formatted with {@code serialNumberFormat}
 * and the first {@link #SERIAL_NUMBER_DIRECTORY_DIGITS} characters of the
 * formatted text are returned.
 *
 * @param id the jobId.
 * @param serialNumberFormat a {@code String.format} pattern taking one
 *        integer argument.
 * @return the serial number part of the path based on the jobId and serial
 *         number format.
 */
public static String serialNumberDirectoryComponent(JobId id, String serialNumberFormat) {
  final Integer serial = Integer.valueOf(jobSerialNumber(id));
  final String formatted = String.format(serialNumberFormat, serial);
  return formatted.substring(0, SERIAL_NUMBER_DIRECTORY_DIGITS);
}
/**
 * Extracts the timestamp component from the path.
 *
 * <p>The first match of {@link #TIMESTAMP_DIR_PATTERN} in {@code path} is
 * returned as an interned string.
 *
 * @param path the path to search.
 * @return the timestamp component from the path, or null if the path
 *         contains no match.
 */
public static String getTimestampPartFromPath(String path) {
  final Matcher dateMatcher = TIMESTAMP_DIR_PATTERN.matcher(path);
  return dateMatcher.find() ? dateMatcher.group().intern() : null;
}
/**
 * Gets the history subdirectory based on the jobId, timestamp and serial
 * number format.
 *
 * <p>The result is the timestamp component, the platform file separator, the
 * serial number directory component and a trailing file separator.
 *
 * @param id the jobId.
 * @param timestampComponent the date part of the directory.
 * @param serialNumberFormat the format passed on to
 *        {@link #serialNumberDirectoryComponent(JobId, String)}.
 * @return the history sub directory based on the jobid, timestamp and serial
 *         number format
 */
public static String historyLogSubdirectory(JobId id, String timestampComponent, String serialNumberFormat) {
  final String serialDir = serialNumberDirectoryComponent(id, serialNumberFormat);
  final StringBuilder subdir = new StringBuilder();
  subdir.append(timestampComponent)
      .append(File.separator)
      .append(serialDir)
      .append(File.separator);
  return subdir.toString();
}
/**
 * Gets the timestamp component based on millisecond time.
 *
 * <p>The time is loaded into a default {@link Calendar} and rendered with
 * {@code TIMESTAMP_DIR_FORMAT} as year, month (1-based) and day of month.
 * In debug mode the second and third fields are taken from
 * {@code Calendar.HOUR} (plus one) and {@code Calendar.MINUTE} instead.
 * The returned string is interned.
 *
 * @param millisecondTime the time in milliseconds.
 * @param debugMode whether to use hour/minute in place of month/day.
 * @return the timestamp component based on millisecond time
 */
public static String timestampDirectoryComponent(long millisecondTime, boolean debugMode) {
  final Calendar cal = Calendar.getInstance();
  cal.setTimeInMillis(millisecondTime);

  final int middleField = debugMode ? Calendar.HOUR : Calendar.MONTH;
  final int lastField = debugMode ? Calendar.MINUTE : Calendar.DAY_OF_MONTH;

  // Months are 0-based in Calendar, but people will expect January to be
  // month #1, hence the + 1 on the middle field.
  final String formatted = String.format(
      TIMESTAMP_DIR_FORMAT,
      cal.get(Calendar.YEAR),
      cal.get(middleField) + 1,
      cal.get(lastField));
  return formatted.intern();
}
/**
 * Returns the glob tail covering the date directory levels: three
 * single-level wildcard segments, one each for year, month and day.
 *
 * @return the wildcard tail for the date directories.
 */
public static String doneSubdirsBeforeSerialTail() {
  // One wildcard segment per date level: YYYY/MM/DD
  return "/*/*/*";
}
/**
 * Computes a serial number used as part of directory naming for the given jobId.
 * The serial number is simply the value of {@code id.getId()}.
 * @param id the jobId.
 * @return the serial number used as part of directory naming for the given jobid
 */
public static int jobSerialNumber(JobId id) {
return id.getId();
}
/**
 * Convenience overload of
 * {@link #localGlobber(FileContext, Path, String, PathFilter, AtomicBoolean)}
 * that applies no path filter and requests no mismatch flag (both are
 * passed as null).
 */
public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail)
throws IOException {
return localGlobber(fc, root, tail, null);
}
/**
 * Convenience overload of
 * {@link #localGlobber(FileContext, Path, String, PathFilter, AtomicBoolean)}
 * that requests no mismatch flag (the {@code AtomicBoolean} argument is
 * passed as null).
 */
public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail,
PathFilter filter) throws IOException {
return localGlobber(fc, root, tail, filter, null);
}
/**
 * Lists file statuses under {@code root} following a restricted glob
 * {@code tail}. The tail is consumed one leading segment per call:
 * <ul>
 *   <li>an empty tail lists {@code root} itself, applying {@code filter};</li>
 *   <li>a tail starting with a slash followed by an asterisk descends into
 *       every sub-directory of {@code root} and continues with the rest of
 *       the tail (those two characters are stripped);</li>
 *   <li>a tail starting with a slash and a literal name descends into that
 *       child of {@code root}; if nothing follows the name, the child is
 *       listed with {@code filter} applied.</li>
 * </ul>
 *
 * @param fc the file context used for listing.
 * @param root the directory to start from.
 * @param tail the remaining glob; must be empty or start with a slash.
 * @param filter optional filter applied to the final listing; null accepts
 *        everything.
 * @param hasFlatFiles optional second return value: set to true when a
 *        non-directory entry is seen while expanding the first wildcard
 *        segment reached. It is not passed on to the levels below that
 *        wildcard. May be null.
 * @return the matching file statuses; empty when a wildcard level has no
 *         sub-directories.
 * @throws IOException if listing fails or the tail is malformed.
 */
public static List<FileStatus> localGlobber(FileContext fc, Path root, String tail,
    PathFilter filter, AtomicBoolean hasFlatFiles) throws IOException {
  if (tail.equals("")) {
    return listFilteredStatus(fc, root, filter);
  }

  if (tail.startsWith("/*")) {
    Path[] subdirs = filteredStat2Paths(
        remoteIterToList(fc.listStatus(root)), true, hasFlatFiles);
    String newTail = tail.substring(2);
    // Accumulate directly into the result; the mismatch flag is deliberately
    // not propagated below the wildcard level.
    List<FileStatus> result = new LinkedList<FileStatus>();
    for (Path subdir : subdirs) {
      result.addAll(localGlobber(fc, subdir, newTail, filter, null));
    }
    return result;
  }

  if (tail.startsWith("/")) {
    int split = tail.indexOf('/', 1);
    if (split < 0) {
      // Last literal segment: list it.
      return listFilteredStatus(fc, new Path(root, tail.substring(1)), filter);
    }
    String thisSegment = tail.substring(1, split);
    String newTail = tail.substring(split);
    return localGlobber(fc, new Path(root, thisSegment), newTail, filter,
        hasFlatFiles);
  }

  throw new IOException("localGlobber: bad tail");
}
/**
 * Lists the entries of {@code root} and keeps those whose path is accepted
 * by {@code filter}.
 *
 * @param fc the file context used for listing.
 * @param root the directory to list.
 * @param filter the filter to apply; when null the full listing is returned.
 * @return the (possibly filtered) list of file statuses.
 * @throws IOException if listing fails.
 */
private static List<FileStatus> listFilteredStatus(FileContext fc, Path root,
    PathFilter filter) throws IOException {
  final List<FileStatus> allEntries = remoteIterToList(fc.listStatus(root));
  if (filter == null) {
    return allEntries;
  }

  final List<FileStatus> accepted = new LinkedList<FileStatus>();
  final Iterator<FileStatus> entries = allEntries.iterator();
  while (entries.hasNext()) {
    final FileStatus entry = entries.next();
    if (filter.accept(entry.getPath())) {
      accepted.add(entry);
    }
  }
  return accepted;
}
/**
 * Drains a remote iterator into a new list.
 *
 * @param rIter the iterator to drain; may be null.
 * @return a new list holding every element of the iterator in iteration
 *         order; empty when {@code rIter} is null.
 * @throws IOException if the iterator fails while advancing.
 */
private static List<FileStatus> remoteIterToList(
    RemoteIterator<FileStatus> rIter) throws IOException {
  final List<FileStatus> collected = new LinkedList<FileStatus>();
  if (rIter != null) {
    while (rIter.hasNext()) {
      collected.add(rIter.next());
    }
  }
  return collected;
}
/**
 * Extracts the paths of those statuses whose directory-ness equals
 * {@code dirs}, preserving their order. The input list is not modified.
 *
 * @param stats the statuses to examine.
 * @param dirs true to keep directories, false to keep non-directories.
 * @param hasMismatches optional second return value: set to true when at
 *        least one status did not match {@code dirs}; it is never reset to
 *        false here. May be null when the caller does not need it.
 * @return the paths of the matching statuses.
 */
private static Path[] filteredStat2Paths(List<FileStatus> stats, boolean dirs,
    AtomicBoolean hasMismatches) {
  List<Path> matches = new LinkedList<Path>();
  boolean sawMismatch = false;
  // Single pass with the list's iterator: indexed access on a linked list
  // would make this quadratic.
  for (FileStatus stat : stats) {
    if (stat.isDirectory() == dirs) {
      matches.add(stat.getPath());
    } else {
      sawMismatch = true;
    }
  }
  if (sawMismatch && hasMismatches != null) {
    hasMismatches.set(true);
  }
  return matches.toArray(new Path[matches.size()]);
}
/**
 * Constructs the history server URL for the job that corresponds to an
 * application.
 *
 * <p>The port is taken from the configured history web-app address and the
 * host from the configured history server address. When that host resolves
 * to a wildcard or loopback address, the local host's IP address is used
 * instead. When the host cannot be resolved, the configured host name is
 * used as is.
 *
 * @param conf the configuration object.
 * @param appId the application whose job URL is wanted.
 * @return a string of the form {@code host:port/jobhistory/job/<jobId>}.
 * @throws UnknownHostException if the local host address cannot be
 *         determined.
 * @throws java.util.NoSuchElementException if the configured web-app address
 *         has no {@code :port} part.
 */
public static String getHistoryUrl(Configuration conf, ApplicationId appId)
    throws UnknownHostException {
  // The web-app address supplies only the port; its host part is a bind host.
  String addr = conf.get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);
  Iterator<String> it = ADDR_SPLITTER.split(addr).iterator();
  it.next(); // ignore the bind host
  String port = it.next();

  // Use hs address to figure out the host for webapp
  addr = conf.get(JHAdminConfig.MR_HISTORY_ADDRESS,
      JHAdminConfig.DEFAULT_MR_HISTORY_ADDRESS);
  String host = ADDR_SPLITTER.split(addr).iterator().next();
  String hsAddress = JOINER.join(host, ":", port);
  InetSocketAddress address = NetUtils.createSocketAddr(
      hsAddress, JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_PORT,
      JHAdminConfig.DEFAULT_MR_HISTORY_WEBAPP_ADDRESS);

  // getAddress() is null for an unresolved socket address; fall back to the
  // configured host name in that case instead of dereferencing null.
  InetAddress resolved = address.getAddress();
  StringBuilder sb = new StringBuilder();
  if (resolved != null
      && (resolved.isAnyLocalAddress() || resolved.isLoopbackAddress())) {
    sb.append(InetAddress.getLocalHost().getHostAddress());
  } else {
    sb.append(address.getHostName());
  }
  sb.append(":").append(address.getPort());
  sb.append("/jobhistory/job/");
  JobID jobId = TypeConverter.fromYarn(appId);
  sb.append(jobId.toString());
  return sb.toString();
}
}