blob: 23aa5b3aa8fe18f506b97dde41bf5b7f2399464e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.URISyntaxException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum;
import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
/**
* The callable class that handles the actual upload to the shared cache.
*/
/**
 * The callable class that handles the actual upload to the shared cache.
 *
 * <p>An upload verifies that the user may share the resource, computes the
 * checksum of the localized copy, copies it into the checksum-derived cache
 * directory under a temporary name, makes it read-only, renames it to its
 * final name, and then notifies the shared cache manager (SCM).
 */
class SharedCacheUploader implements Callable<Boolean> {
  // rwxr-xr-x
  static final FsPermission DIRECTORY_PERMISSION =
      new FsPermission((short)00755);
  // r-xr-xr-x
  static final FsPermission FILE_PERMISSION =
      new FsPermission((short)00555);

  private static final Logger LOG =
      LoggerFactory.getLogger(SharedCacheUploader.class);

  private final LocalResource resource;
  private final Path localPath;
  private final String user;
  private final Configuration conf;
  private final SCMUploaderProtocol scmClient;
  private final FileSystem fs;
  private final FileSystem localFs;
  private final String sharedCacheRootDir;
  private final int nestedLevel;
  private final SharedCacheChecksum checksum;
  private final RecordFactory recordFactory;

  /**
   * Creates an uploader that uses {@code FileSystem.get(conf)} as the shared
   * cache filesystem and the filesystem of {@code localPath} as the local
   * filesystem.
   *
   * @throws IOException if either filesystem cannot be obtained
   */
  public SharedCacheUploader(LocalResource resource, Path localPath,
      String user, Configuration conf, SCMUploaderProtocol scmClient)
      throws IOException {
    this(resource, localPath, user, conf, scmClient,
        FileSystem.get(conf), localPath.getFileSystem(conf));
  }

  /**
   * @param resource the local resource that contains the original remote path
   * @param localPath the path in the local filesystem where the resource is
   * localized
   * @param user the user compared against the remote file's owner in
   * {@link #verifyAccess()}
   * @param conf the configuration from which the shared cache root, cache
   * depth, checksum implementation and replication factor are read
   * @param scmClient the client used to notify the shared cache manager
   * @param fs the filesystem of the shared cache
   * @param localFs the local filesystem
   */
  public SharedCacheUploader(LocalResource resource, Path localPath,
      String user, Configuration conf, SCMUploaderProtocol scmClient,
      FileSystem fs, FileSystem localFs) {
    this.resource = resource;
    this.localPath = localPath;
    this.user = user;
    this.conf = conf;
    this.scmClient = scmClient;
    this.fs = fs;
    this.sharedCacheRootDir =
        conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
            YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
    this.nestedLevel = SharedCacheUtil.getCacheDepth(conf);
    this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
    this.localFs = localFs;
    this.recordFactory = RecordFactoryProvider.getRecordFactory(null);
  }

  /**
   * Uploads the file under the shared cache, and notifies the shared cache
   * manager.
   *
   * @return {@code true} if the file was uploaded and accepted by the shared
   *         cache manager; {@code false} if:
   *         <ul>
   *           <li>the user may not share it,</li>
   *           <li>the copy reported failure,</li>
   *           <li>the rename to the final name failed (e.g. because the file
   *               already exists), or</li>
   *           <li>the shared cache manager rejected it.</li>
   *         </ul>
   * @throws IOException if a filesystem or SCM operation fails
   */
  @Override
  public Boolean call() throws Exception {
    Path tempPath = null;
    // Set once the temporary file has been renamed to its final name. After
    // that point there is no temporary file left to clean up.
    boolean renamed = false;
    try {
      if (!verifyAccess()) {
        LOG.warn("User {} is not authorized to upload file {}", user,
            localPath.getName());
        return false;
      }
      // first determine the actual local path that will be used for upload
      Path actualPath = getActualPath();
      // compute the checksum
      String checksumVal = computeChecksum(actualPath);
      // create the directory (if it doesn't exist)
      Path directoryPath =
          new Path(SharedCacheUtil.getCacheEntryPath(nestedLevel,
              sharedCacheRootDir, checksumVal));
      // let's not check if the directory already exists: in the vast majority
      // of the cases, the directory does not exist; as long as mkdirs does not
      // error out if it exists, we should be fine
      fs.mkdirs(directoryPath, DIRECTORY_PERMISSION);
      // create the temporary file
      tempPath = new Path(directoryPath, getTemporaryFileName(actualPath));
      if (!uploadFile(actualPath, tempPath)) {
        LOG.warn("Could not copy the file to the shared cache at {}", tempPath);
        return false;
      }
      // set the permission so that it is readable but not writable
      fs.setPermission(tempPath, FILE_PERMISSION);
      // rename it to the final filename
      Path finalPath = new Path(directoryPath, actualPath.getName());
      renamed = fs.rename(tempPath, finalPath);
      if (!renamed) {
        LOG.warn("The file already exists under {}. Ignoring this attempt.",
            finalPath);
        return false;
      }
      // notify the SCM
      if (!notifySharedCacheManager(checksumVal, actualPath.getName())) {
        // The shared cache manager rejected the upload (most likely because
        // the same content was uploaded under a different name). Clean up
        // this file and exit.
        fs.delete(finalPath, false);
        return false;
      }
      // set the replication factor
      short replication = (short) conf.getInt(
          YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR,
          YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR);
      fs.setReplication(finalPath, replication);
      LOG.info("File {} was uploaded to the shared cache at {}",
          actualPath.getName(), finalPath);
      return true;
    } catch (IOException e) {
      LOG.warn("Exception while uploading the file {}", localPath.getName(), e);
      throw e;
    } finally {
      // Best-effort removal of the temporary file on every exit that did not
      // rename it away: an exception of any type, a failed copy, or a failed
      // rename. deleteTempFile ignores a null path.
      if (!renamed) {
        deleteTempFile(tempPath);
      }
    }
  }

  /**
   * Returns the local path whose contents should be uploaded.
   *
   * <p>If {@code localPath} is a directory, this returns
   * {@code localPath/<localPath name>}. Otherwise it returns
   * {@code localPath} itself.
   *
   * @throws IOException if the local file status cannot be read
   */
  @VisibleForTesting
  Path getActualPath() throws IOException {
    Path path = localPath;
    FileStatus status = localFs.getFileStatus(path);
    if (status != null && status.isDirectory()) {
      // for certain types of resources that get unpacked, the original file may
      // be found under the directory with the same name (see
      // FSDownload.unpack); check if the path is a directory and if so look
      // under it
      path = new Path(path, path.getName());
    }
    return path;
  }

  /**
   * Deletes {@code tempPath} from the shared cache filesystem, if non-null.
   * Failures are logged at debug level and otherwise ignored.
   */
  private void deleteTempFile(Path tempPath) {
    try {
      if (tempPath != null) {
        fs.delete(tempPath, false);
      }
    } catch (IOException ioe) {
      LOG.debug("Exception received while deleting temp files", ioe);
    }
  }

  /**
   * Checks whether the (original) remote file may be shared.
   *
   * <p>Public resources are always allowed. Otherwise, the following checks
   * apply in order:
   * <ol>
   *   <li>If the remote file's modification time differs from the resource's
   *       timestamp, the file is rejected.</li>
   *   <li>If the file is owned by {@code user}, it is accepted.</li>
   *   <li>Otherwise, the decision is delegated to
   *       {@link #fileIsPublic(Path, FileSystem, FileStatus)}.</li>
   * </ol>
   *
   * @throws IOException if the resource URL is not a valid path or the remote
   *         file status cannot be read
   */
  @VisibleForTesting
  boolean verifyAccess() throws IOException {
    // if it is in the public cache, it's trivially OK
    if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
      return true;
    }

    final Path remotePath;
    try {
      remotePath = resource.getResource().toPath();
    } catch (URISyntaxException e) {
      throw new IOException("Invalid resource", e);
    }

    // get the file status of the HDFS file
    FileSystem remoteFs = remotePath.getFileSystem(conf);
    FileStatus status = remoteFs.getFileStatus(remotePath);
    // check to see if the file has been modified in any way
    if (status.getModificationTime() != resource.getTimestamp()) {
      LOG.warn("The remote file {} has changed since it's localized; will not"
          + " consider it for upload", remotePath);
      return false;
    }

    // check for the user ownership
    if (status.getOwner().equals(user)) {
      return true; // the user owns the file
    }
    // check if the file is publicly readable otherwise
    return fileIsPublic(remotePath, remoteFs, status);
  }

  /** Delegates to {@link FSDownload#isPublic} (no status cache). */
  @VisibleForTesting
  boolean fileIsPublic(final Path remotePath, FileSystem remoteFs,
      FileStatus status) throws IOException {
    return FSDownload.isPublic(remoteFs, remotePath, status, null);
  }

  /**
   * Uploads the file to the shared cache under a temporary name, and returns
   * the result of {@link FileUtil#copy}.
   *
   * <p>The local source is not deleted.
   */
  @VisibleForTesting
  boolean uploadFile(Path sourcePath, Path tempPath) throws IOException {
    return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf);
  }

  /**
   * Computes the configured checksum of the local file at {@code path}.
   *
   * @throws IOException if the file cannot be opened or read
   */
  @VisibleForTesting
  String computeChecksum(Path path) throws IOException {
    InputStream is = localFs.open(path);
    try {
      return checksum.computeChecksum(is);
    } finally {
      try {
        is.close();
      } catch (IOException ignored) {
        // Deliberately ignored. The stream was only read from, so a close
        // failure does not invalidate the checksum already computed.
      }
    }
  }

  /**
   * Returns {@code <file name>-<random long>}.
   *
   * <p>The random suffix makes name clashes between concurrent uploads of the
   * same file unlikely.
   */
  private String getTemporaryFileName(Path path) {
    return path.getName() + "-" + ThreadLocalRandom.current().nextLong();
  }

  /**
   * Notifies the shared cache manager that {@code fileName} was uploaded for
   * the resource key {@code checksumVal}.
   *
   * @return whether the shared cache manager accepted the upload
   * @throws IOException wrapping a {@link YarnException}, or the cause (if
   *         any) of an {@link UndeclaredThrowableException}, thrown by the
   *         SCM client
   */
  @VisibleForTesting
  boolean notifySharedCacheManager(String checksumVal, String fileName)
      throws IOException {
    try {
      SCMUploaderNotifyRequest request =
          recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
      request.setResourceKey(checksumVal);
      request.setFilename(fileName);
      return scmClient.notify(request).getAccepted();
    } catch (YarnException e) {
      throw new IOException(e);
    } catch (UndeclaredThrowableException e) {
      // retrieve the cause of the exception and throw it as an IOException
      throw new IOException(e.getCause() == null ? e : e.getCause());
    }
  }
}