blob: 5df900b9f64137bfd66dd7b5863b098c865868b2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.webapp.View.ViewContext;
import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
/**
* Base class to implement Log Aggregation File Controller.
*/
@Private
@Unstable
public abstract class LogAggregationFileController {
private static final Logger LOG = LoggerFactory.getLogger(
LogAggregationFileController.class);
/*
* Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
* Group to which NMOwner belongs> App dirs will be created as 770,
* owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
* access / modify the files.
* <NMGroup> should obviously be a limited access group.
*/
/**
* Permissions for the top level directory under which app directories will be
* created.
*/
protected static final FsPermission TLDIR_PERMISSIONS = FsPermission
.createImmutable((short) 01777);
/**
* Permissions for the Application directory.
*/
protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0770);
/**
* Umask for the log file.
*/
protected static final FsPermission APP_LOG_FILE_UMASK = FsPermission
.createImmutable((short) (0640 ^ 0777));
// This is temporary solution. The configuration will be deleted once
// we find a more scalable method to only write a single log file per LRS.
private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
= YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
private static final int
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
// This is temporary solution. The configuration will be deleted once we have
// the FileSystem API to check whether append operation is supported or not.
public static final String LOG_AGGREGATION_FS_SUPPORT_APPEND
= YarnConfiguration.YARN_PREFIX+ "log-aggregation.fs-support-append";
protected Configuration conf;
protected Path remoteRootLogDir;
protected String remoteRootLogDirSuffix;
protected int retentionSize;
protected String fileControllerName;
public LogAggregationFileController() {}
/**
* Initialize the log file controller.
* @param conf the Configuration
* @param controllerName the log controller class name
*/
public void initialize(Configuration conf, String controllerName) {
this.conf = conf;
int configuredRentionSize =
conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
if (configuredRentionSize <= 0) {
this.retentionSize =
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
} else {
this.retentionSize = configuredRentionSize;
}
this.fileControllerName = controllerName;
initInternal(conf);
}
/**
* Derived classes initialize themselves using this method.
* @param conf the Configuration
*/
protected abstract void initInternal(Configuration conf);
/**
* Get the remote root log directory.
* @return the remote root log directory path
*/
public Path getRemoteRootLogDir() {
return this.remoteRootLogDir;
}
/**
* Get the log aggregation directory suffix.
* @return the log aggregation directory suffix
*/
public String getRemoteRootLogDirSuffix() {
return this.remoteRootLogDirSuffix;
}
/**
* Initialize the writer.
* @param context the {@link LogAggregationFileControllerContext}
* @throws IOException if fails to initialize the writer
*/
public abstract void initializeWriter(
LogAggregationFileControllerContext context) throws IOException;
/**
* Close the writer.
*/
public abstract void closeWriter();
/**
* Write the log content.
* @param logKey the log key
* @param logValue the log content
* @throws IOException if fails to write the logs
*/
public abstract void write(LogKey logKey, LogValue logValue)
throws IOException;
/**
* Operations needed after write the log content.
* @param record the {@link LogAggregationFileControllerContext}
* @throws Exception if anything fails
*/
public abstract void postWrite(LogAggregationFileControllerContext record)
throws Exception;
protected void closePrintStream(OutputStream out) {
if (out != System.out) {
IOUtils.cleanupWithLogger(LOG, out);
}
}
/**
* Output container log.
* @param logRequest {@link ContainerLogsRequest}
* @param os the output stream
* @return true if we can read the aggregated logs successfully
* @throws IOException if we can not access the log file.
*/
public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest,
OutputStream os) throws IOException;
/**
* Return a list of {@link ContainerLogMeta} for an application
* from Remote FileSystem.
*
* @param logRequest {@link ContainerLogsRequest}
* @return a list of {@link ContainerLogMeta}
* @throws IOException if there is no available log file
*/
public abstract List<ContainerLogMeta> readAggregatedLogsMeta(
ContainerLogsRequest logRequest) throws IOException;
/**
* Render Aggregated Logs block.
* @param html the html
* @param context the ViewContext
*/
public abstract void renderAggregatedLogsBlock(Block html,
ViewContext context);
/**
* Returns the owner of the application.
*
* @param aggregatedLogPath the aggregatedLog path
* @return the application owner
* @throws IOException if we can not get the application owner
*/
public abstract String getApplicationOwner(Path aggregatedLogPath)
throws IOException;
/**
* Returns ACLs for the application. An empty map is returned if no ACLs are
* found.
*
* @param aggregatedLogPath the aggregatedLog path.
* @return a map of the Application ACLs.
* @throws IOException if we can not get the application acls
*/
public abstract Map<ApplicationAccessType, String> getApplicationAcls(
Path aggregatedLogPath) throws IOException;
/**
* Verify and create the remote log directory.
*/
public void verifyAndCreateRemoteLogDir() {
boolean logPermError = true;
// Checking the existence of the TLD
FileSystem remoteFS = null;
try {
remoteFS = getFileSystem(conf);
} catch (IOException e) {
throw new YarnRuntimeException(
"Unable to get Remote FileSystem instance", e);
}
boolean remoteExists = true;
Path remoteRootLogDir = getRemoteRootLogDir();
try {
FsPermission perms =
remoteFS.getFileStatus(remoteRootLogDir).getPermission();
if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+ "] already exist, but with incorrect permissions. "
+ "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
+ "]." + " The cluster may have problems with multiple users.");
logPermError = false;
} else {
logPermError = true;
}
} catch (FileNotFoundException e) {
remoteExists = false;
} catch (IOException e) {
throw new YarnRuntimeException(
"Failed to check permissions for dir ["
+ remoteRootLogDir + "]", e);
}
if (!remoteExists) {
LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
+ "] does not exist. Attempting to create it.");
try {
Path qualified =
remoteRootLogDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
String primaryGroupName = null;
try {
primaryGroupName = loginUser.getPrimaryGroupName();
} catch (IOException e) {
LOG.warn("No primary group found. The remote root log directory" +
" will be created with the HDFS superuser being its group " +
"owner. JobHistoryServer may be unable to read the directory.");
}
// set owner on the remote directory only if the primary group exists
if (primaryGroupName != null) {
remoteFS.setOwner(qualified,
loginUser.getShortUserName(), primaryGroupName);
}
} catch (IOException e) {
throw new YarnRuntimeException("Failed to create remoteLogDir ["
+ remoteRootLogDir + "]", e);
}
}
}
/**
* Create remote Application directory for log aggregation.
* @param user the user
* @param appId the application ID
* @param userUgi the UGI
*/
public void createAppDir(final String user, final ApplicationId appId,
UserGroupInformation userUgi) {
final Path remoteRootLogDir = getRemoteRootLogDir();
final String remoteRootLogDirSuffix = getRemoteRootLogDirSuffix();
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
try {
// TODO: Reuse FS for user?
FileSystem remoteFS = getFileSystem(conf);
// Only creating directories if they are missing to avoid
// unnecessary load on the filesystem from all of the nodes
Path appDir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
appDir = appDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
remoteRootLogDir, user, remoteRootLogDirSuffix);
suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
Path userDir = LogAggregationUtils.getRemoteLogUserDir(
remoteRootLogDir, user);
userDir = userDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
}
createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
}
createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
}
} catch (IOException e) {
LOG.error("Failed to setup application log directory for "
+ appId, e);
throw e;
}
return null;
}
});
} catch (Exception e) {
throw new YarnRuntimeException(e);
}
}
@VisibleForTesting
protected FileSystem getFileSystem(Configuration conf) throws IOException {
return getRemoteRootLogDir().getFileSystem(conf);
}
protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
throws IOException {
FsPermission dirPerm = new FsPermission(fsPerm);
fs.mkdirs(path, dirPerm);
FsPermission umask = FsPermission.getUMask(fs.getConf());
if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
fs.setPermission(path, new FsPermission(fsPerm));
}
}
protected boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
throws IOException {
boolean exists = true;
try {
FileStatus appDirStatus = fs.getFileStatus(path);
if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
fs.setPermission(path, APP_DIR_PERMISSIONS);
}
} catch (FileNotFoundException fnfe) {
exists = false;
}
return exists;
}
/**
* Get the remote aggregated log path.
* @param appId the ApplicationId
* @param user the Application Owner
* @param nodeId the NodeManager Id
* @return the remote aggregated log path
*/
public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
NodeId nodeId) {
return LogAggregationUtils.getRemoteNodeLogFileForApp(
getRemoteRootLogDir(), appId, user, nodeId,
getRemoteRootLogDirSuffix());
}
/**
* Get the remote application directory for log aggregation.
* @param appId the Application ID
* @param appOwner the Application Owner
* @return the remote application directory
* @throws IOException if can not find the remote application directory
*/
public Path getRemoteAppLogDir(ApplicationId appId, String appOwner)
throws IOException {
return LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner,
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
}
protected void cleanOldLogs(Path remoteNodeLogFileForApp,
final NodeId nodeId, UserGroupInformation userUgi) {
try {
final FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
Path appDir = remoteNodeLogFileForApp.getParent().makeQualified(
remoteFS.getUri(), remoteFS.getWorkingDirectory());
Set<FileStatus> status =
new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
Iterable<FileStatus> mask =
Iterables.filter(status, new Predicate<FileStatus>() {
@Override
public boolean apply(FileStatus next) {
return next.getPath().getName()
.contains(LogAggregationUtils.getNodeString(nodeId))
&& !next.getPath().getName().endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX);
}
});
status = Sets.newHashSet(mask);
// Normally, we just need to delete one oldest log
// before we upload a new log.
// If we can not delete the older logs in this cycle,
// we will delete them in next cycle.
if (status.size() >= this.retentionSize) {
// sort by the lastModificationTime ascending
List<FileStatus> statusList = new ArrayList<FileStatus>(status);
Collections.sort(statusList, new Comparator<FileStatus>() {
public int compare(FileStatus s1, FileStatus s2) {
return s1.getModificationTime() < s2.getModificationTime() ? -1
: s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
}
});
for (int i = 0; i <= statusList.size() - this.retentionSize; i++) {
final FileStatus remove = statusList.get(i);
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
remoteFS.delete(remove.getPath(), false);
return null;
}
});
} catch (Exception e) {
LOG.error("Failed to delete " + remove.getPath(), e);
}
}
}
} catch (Exception e) {
LOG.error("Failed to clean old logs", e);
}
}
/**
* Create the aggregated log suffix. The LogAggregationFileController
* should call this to get the suffix and append the suffix to the end
* of each log. This would keep the aggregated log format consistent.
*
* @param fileName the File Name
* @return the aggregated log suffix String
*/
protected String aggregatedLogSuffix(String fileName) {
StringBuilder sb = new StringBuilder();
String endOfFile = "End of LogType:" + fileName;
sb.append("\n" + endOfFile + "\n");
sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+ "\n\n");
return sb.toString();
}
}