blob: 9f8edb5df0d6427b268b7ac8b7607a8d7272973b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.filecache;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
/**
 * Manages internal configuration of the cache by the client for job
 * submission: records sizes, modification timestamps and public/private
 * visibilities of the distributed cache files and archives in the job
 * configuration, and obtains delegation tokens for them.
 */
@InterfaceAudience.Private
public class ClientDistributedCacheManager {

  /**
   * Determines timestamps of files to be cached, and stores those
   * in the configuration. Determines the visibilities of the distributed
   * cache files and archives. The visibility of a cache path is "public" if
   * the leaf component has READ permissions for others, and the parent
   * subdirs have EXECUTE permissions for others.
   *
   * This is an internal method!
   *
   * @param job Configuration of a job
   * @throws IOException if there is a problem with the underlying filesystem
   */
  public static void determineTimestampsAndCacheVisibilities(Configuration job)
      throws IOException {
    Map<URI, FileStatus> statCache = new HashMap<URI, FileStatus>();
    determineTimestampsAndCacheVisibilities(job, statCache);
  }

  /**
   * See ClientDistributedCacheManager#determineTimestampsAndCacheVisibilities(
   * Configuration).
   *
   * @param job Configuration of a job
   * @param statCache A map containing cached file status objects
   * @throws IOException if there is a problem with the underlying filesystem
   */
  public static void determineTimestampsAndCacheVisibilities(Configuration job,
      Map<URI, FileStatus> statCache) throws IOException {
    determineTimestamps(job, statCache);
    determineCacheVisibilities(job, statCache);
  }

  /**
   * Determines sizes and timestamps of files to be cached, and stores those
   * in the configuration. This is intended to be used internally by JobClient
   * after all cache files have been added.
   *
   * A null or empty list of archives (or files) is skipped, and the
   * corresponding configuration entries are left untouched.
   *
   * This is an internal method!
   *
   * @param job Configuration of a job.
   * @param statCache A map containing cached file status objects; statuses
   *        fetched here are added to it
   * @throws IOException if there is a problem with the underlying filesystem
   */
  public static void determineTimestamps(Configuration job,
      Map<URI, FileStatus> statCache) throws IOException {
    URI[] tarchives = DistributedCache.getCacheArchives(job);
    if (tarchives != null && tarchives.length > 0) {
      StringBuilder archiveFileSizes = new StringBuilder();
      StringBuilder archiveTimestamps = new StringBuilder();
      appendSizesAndTimestamps(job, tarchives, statCache,
          archiveFileSizes, archiveTimestamps);
      job.set(MRJobConfig.CACHE_ARCHIVES_SIZES, archiveFileSizes.toString());
      setArchiveTimestamps(job, archiveTimestamps.toString());
    }

    URI[] tfiles = DistributedCache.getCacheFiles(job);
    if (tfiles != null && tfiles.length > 0) {
      StringBuilder fileSizes = new StringBuilder();
      StringBuilder fileTimestamps = new StringBuilder();
      appendSizesAndTimestamps(job, tfiles, statCache,
          fileSizes, fileTimestamps);
      job.set(MRJobConfig.CACHE_FILES_SIZES, fileSizes.toString());
      setFileTimestamps(job, fileTimestamps.toString());
    }
  }

  /**
   * Appends the length and modification time of every URI, in order, to the
   * given builders as comma separated lists.
   */
  private static void appendSizesAndTimestamps(Configuration job, URI[] uris,
      Map<URI, FileStatus> statCache, StringBuilder sizes,
      StringBuilder timestamps) throws IOException {
    for (int i = 0; i < uris.length; i++) {
      FileStatus status = getFileStatus(job, uris[i], statCache);
      if (i > 0) {
        sizes.append(',');
        timestamps.append(',');
      }
      sizes.append(status.getLen());
      timestamps.append(status.getModificationTime());
    }
  }

  /**
   * For each archive or cache file - get the corresponding delegation token.
   * The archive paths are passed first, followed by the file paths.
   *
   * @param job Configuration of a job
   * @param credentials the credentials handed to the token cache
   * @throws IOException if obtaining the tokens fails
   */
  public static void getDelegationTokens(Configuration job,
      Credentials credentials) throws IOException {
    URI[] tarchives = DistributedCache.getCacheArchives(job);
    URI[] tfiles = DistributedCache.getCacheFiles(job);

    int archiveCount = (tarchives != null) ? tarchives.length : 0;
    int fileCount = (tfiles != null) ? tfiles.length : 0;
    Path[] ps = new Path[archiveCount + fileCount];

    for (int i = 0; i < archiveCount; i++) {
      ps[i] = new Path(tarchives[i].toString());
    }
    // Files are placed after the archives in the combined array.
    for (int j = 0; j < fileCount; j++) {
      ps[archiveCount + j] = new Path(tfiles[j].toString());
    }

    TokenCache.obtainTokensForNamenodes(credentials, ps, job);
  }

  /**
   * Determines the visibilities of the distributed cache files and
   * archives. The visibility of a cache path is "public" if the leaf component
   * has READ permissions for others, and the parent subdirs have
   * EXECUTE permissions for others.
   *
   * A null or empty list of archives (or files) is skipped, and the
   * corresponding configuration entry is left untouched.
   *
   * @param job Configuration of a job
   * @param statCache A map containing cached file status objects; statuses
   *        fetched here are added to it
   * @throws IOException if there is a problem with the underlying filesystem
   */
  public static void determineCacheVisibilities(Configuration job,
      Map<URI, FileStatus> statCache) throws IOException {
    URI[] tarchives = DistributedCache.getCacheArchives(job);
    if (tarchives != null && tarchives.length > 0) {
      setArchiveVisibilities(job, joinVisibilities(job, tarchives, statCache));
    }

    URI[] tfiles = DistributedCache.getCacheFiles(job);
    if (tfiles != null && tfiles.length > 0) {
      setFileVisibilities(job, joinVisibilities(job, tfiles, statCache));
    }
  }

  /**
   * Builds a comma separated list of booleans (true - public), one per URI
   * and in the same order as the given array.
   */
  private static String joinVisibilities(Configuration job, URI[] uris,
      Map<URI, FileStatus> statCache) throws IOException {
    StringBuilder visibilities = new StringBuilder();
    for (int i = 0; i < uris.length; i++) {
      if (i > 0) {
        visibilities.append(',');
      }
      visibilities.append(isPublic(job, uris[i], statCache));
    }
    return visibilities.toString();
  }

  /**
   * Stores the public/private visibility of the archives to be localized.
   *
   * @param conf Configuration in which the visibilities are stored
   * @param booleans comma separated list of booleans (true - public)
   * The order should be the same as the order in which the archives are added.
   */
  static void setArchiveVisibilities(Configuration conf, String booleans) {
    conf.set(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES, booleans);
  }

  /**
   * Stores the public/private visibility of the files to be localized.
   *
   * @param conf Configuration in which the visibilities are stored
   * @param booleans comma separated list of booleans (true - public)
   * The order should be the same as the order in which the files are added.
   */
  static void setFileVisibilities(Configuration conf, String booleans) {
    conf.set(MRJobConfig.CACHE_FILE_VISIBILITIES, booleans);
  }

  /**
   * Stores the timestamps of the archives to be localized.
   *
   * @param conf Configuration in which the timestamps are stored
   * @param timestamps comma separated list of timestamps of archives.
   * The order should be the same as the order in which the archives are added.
   */
  static void setArchiveTimestamps(Configuration conf, String timestamps) {
    conf.set(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS, timestamps);
  }

  /**
   * Stores the timestamps of the files to be localized.
   *
   * @param conf Configuration in which the timestamps are stored
   * @param timestamps comma separated list of timestamps of files.
   * The order should be the same as the order in which the files are added.
   */
  static void setFileTimestamps(Configuration conf, String timestamps) {
    conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, timestamps);
  }

  /**
   * Gets the file status for the given URI. If the URI is in the cache,
   * returns it. Otherwise, fetches it and adds it to the cache.
   *
   * @param job Configuration used to look up the file system for the URI
   * @param uri the URI whose status is wanted
   * @param statCache A map containing cached file status objects
   * @return the cached or freshly fetched file status
   * @throws IOException if there is a problem with the underlying filesystem
   */
  private static FileStatus getFileStatus(Configuration job, URI uri,
      Map<URI, FileStatus> statCache) throws IOException {
    FileSystem fileSystem = FileSystem.get(uri, job);
    return getFileStatus(fileSystem, uri, statCache);
  }

  /**
   * Returns a boolean to denote whether a cache file is visible to all(public)
   * or not.
   *
   * @param conf the configuration
   * @param uri the URI to test
   * @param statCache A map containing cached file status objects
   * @return true if the path in the uri is visible to all, false otherwise
   * @throws IOException thrown if a file system operation fails
   */
  static boolean isPublic(Configuration conf, URI uri,
      Map<URI, FileStatus> statCache) throws IOException {
    FileSystem fs = FileSystem.get(uri, conf);
    Path current = fs.makeQualified(new Path(uri.getPath()));

    // If we're looking at a wildcarded path, we only need to check that the
    // ancestors allow execution. Otherwise, look for read permissions in
    // addition to the ancestors' permissions.
    boolean isPublic = true;
    if (!current.getName().equals(DistributedCache.WILDCARD)) {
      isPublic = checkPermissionOfOther(fs, current, FsAction.READ, statCache);
    }

    return isPublic
        && ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
  }

  /**
   * Returns true if all ancestors of the specified path have the 'execute'
   * permission set for all users (i.e. that other users can traverse
   * the directory hierarchy to the given path). The given path itself is
   * checked as well, followed by each of its parents up to the root.
   *
   * @param fs the file system the path belongs to
   * @param path the first directory to check; null yields true
   * @param statCache A map containing cached file status objects
   * @return true if every checked directory grants EXECUTE to others
   * @throws IOException thrown if a file system operation fails
   */
  static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path,
      Map<URI, FileStatus> statCache) throws IOException {
    for (Path current = path; current != null; current = current.getParent()) {
      // the subdirs in the path should have execute permissions for others
      if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Checks for a given path whether the Other permissions on it
   * imply the permission in the passed FsAction.
   *
   * @param fs the file system the path belongs to
   * @param path the path whose permissions are inspected
   * @param action the action that "other" must be allowed to perform
   * @param statCache A map containing cached file status objects
   * @return true if the path is not encrypted and its permissions for others
   *         imply the given action, false otherwise
   * @throws IOException thrown if a file system operation fails
   */
  private static boolean checkPermissionOfOther(FileSystem fs, Path path,
      FsAction action, Map<URI, FileStatus> statCache) throws IOException {
    FileStatus status = getFileStatus(fs, path.toUri(), statCache);
    FsPermission perms = status.getPermission();

    // Encrypted files are always treated as private. This stance has two
    // important side effects. The first is that the encrypted files will be
    // downloaded as the job owner instead of the YARN user, which is required
    // for the KMS ACLs to work as expected. Second, it prevents a file with
    // world readable permissions that is stored in an encryption zone from
    // being localized as a publicly shared file with world readable
    // permissions.
    return !perms.getEncryptedBit() && perms.getOtherAction().implies(action);
  }

  /**
   * Gets the file status for the given URI from the given file system,
   * consulting the cache first. If the last path component is the
   * distributed cache wildcard, the status of the parent directory is used
   * instead, and it is cached under the parent's URI.
   *
   * @param fs the file system to query on a cache miss
   * @param uri the URI whose status is wanted
   * @param statCache A map containing cached file status objects
   * @return the cached or freshly fetched file status
   * @throws IOException if there is a problem with the underlying filesystem
   */
  private static FileStatus getFileStatus(FileSystem fs, URI uri,
      Map<URI, FileStatus> statCache) throws IOException {
    Path path = new Path(uri);

    if (path.getName().equals(DistributedCache.WILDCARD)) {
      path = path.getParent();
      uri = path.toUri();
    }

    FileStatus stat = statCache.get(uri);
    if (stat == null) {
      stat = fs.getFileStatus(path);
      statCache.put(uri, stat);
    }
    return stat;
  }
}