blob: e3a6631aa460c4e7cdf12b20b72315cf6b671e5e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.filecache;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.classification.InterfaceAudience;
/**
 * Helper class of {@link TrackerDistributedCacheManager} that represents
 * the cached files of a single task. This class is used
 * by TaskRunner/LocalJobRunner to parse out the job configuration
 * and set up the local caches.
 *
 * <p>Typical life cycle: construct (parses the configuration), call
 * {@link #setup}, then {@link #getClassPaths()} and/or
 * {@link #makeClassLoader(ClassLoader)}, and finally {@link #release()}.
 */
@InterfaceAudience.Private
public class TaskDistributedCacheManager {
  private final TrackerDistributedCacheManager distributedCacheManager;
  private final Configuration taskConf;
  /** Every file and archive named in the task configuration. */
  private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
  /** Local paths of the entries flagged for the classpath; filled by setup(). */
  private final List<String> classPaths = new ArrayList<String>();

  private boolean setupCalled = false;

  /**
   * Struct representing a single cached file.
   * There are four permutations (archive, file) and
   * (don't put in classpath, do put in classpath).
   */
  static class CacheFile {
    /** URI as in the configuration */
    final URI uri;

    enum FileType {
      REGULAR,
      ARCHIVE
    }

    /** Selects the public or the private cache sub-directory in setup(). */
    boolean isPublic = true;
    /** Whether to decompress */
    final FileType type;
    /** Timestamp parsed from the configuration for this entry. */
    final long timestamp;
    /** Whether this is to be added to the classpath */
    final boolean shouldBeAddedToClassPath;
    /** Set once setup() has obtained a local copy; consulted by release(). */
    boolean localized = false;

    private CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
        boolean classPath) {
      this.uri = uri;
      this.type = type;
      this.isPublic = isPublic;
      this.timestamp = timestamp;
      this.shouldBeAddedToClassPath = classPath;
    }

    /**
     * Converts the scheme used by DistributedCache to serialize what files to
     * cache in the configuration into CacheFile objects that represent those
     * files.
     *
     * @param uris the configured cache URIs; {@code null} yields an empty list
     * @param timestamps one decimal timestamp string per URI
     * @param cacheVisibilities one boolean string per URI ("true" = public)
     * @param paths entries that belong on the classpath, may be {@code null};
     *        matched against the URIs by their path component
     * @param type the type assigned to every returned entry
     * @return one CacheFile per URI, in the order of {@code uris}
     * @throws IllegalArgumentException if the number of timestamps differs
     *         from the number of URIs, if there are fewer visibilities than
     *         URIs, or if a timestamp is not a valid long
     */
    private static List<CacheFile> makeCacheFiles(URI[] uris,
        String[] timestamps, String[] cacheVisibilities, Path[] paths,
        FileType type) {
      List<CacheFile> ret = new ArrayList<CacheFile>();
      if (uris == null) {
        return ret;
      }

      // Validate the parallel arrays up front so that a broken configuration
      // is reported clearly instead of failing with a NullPointerException or
      // ArrayIndexOutOfBoundsException in the middle of the loop below.
      int timestampCount = (timestamps == null) ? 0 : timestamps.length;
      if (uris.length != timestampCount) {
        throw new IllegalArgumentException("Mismatched uris and timestamps.");
      }
      int visibilityCount =
          (cacheVisibilities == null) ? 0 : cacheVisibilities.length;
      if (visibilityCount < uris.length) {
        throw new IllegalArgumentException(
            "Mismatched uris and visibilities.");
      }

      // Index the classpath entries by path component for constant-time lookup.
      Map<String, Path> classPaths = new HashMap<String, Path>();
      if (paths != null) {
        for (Path p : paths) {
          classPaths.put(p.toUri().getPath(), p);
        }
      }

      for (int i = 0; i < uris.length; ++i) {
        URI u = uris[i];
        boolean isClassPath = classPaths.containsKey(u.getPath());
        long t = Long.parseLong(timestamps[i]);
        boolean isPublic = Boolean.parseBoolean(cacheVisibilities[i]);
        ret.add(new CacheFile(u, type, isPublic, t, isClassPath));
      }
      return ret;
    }

    boolean getLocalized() {
      return localized;
    }

    void setLocalized(boolean val) {
      localized = val;
    }
  }

  /**
   * Parses the cache files and cache archives listed in the task
   * configuration. Nothing is localized until {@link #setup} is called.
   *
   * @param distributedCacheManager manager used to localize and release files
   * @param taskConf the task configuration; it is retained and later updated
   *        by {@link #setup}
   * @throws IOException declared for callers; see the DistributedCache
   *         accessors used here
   * @throws IllegalArgumentException if the URI, timestamp and visibility
   *         lists in the configuration do not line up
   */
  TaskDistributedCacheManager(
      TrackerDistributedCacheManager distributedCacheManager,
      Configuration taskConf) throws IOException {
    this.distributedCacheManager = distributedCacheManager;
    this.taskConf = taskConf;

    this.cacheFiles.addAll(
        CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
            DistributedCache.getFileTimestamps(taskConf),
            TrackerDistributedCacheManager.getFileVisibilities(taskConf),
            DistributedCache.getFileClassPaths(taskConf),
            CacheFile.FileType.REGULAR));
    this.cacheFiles.addAll(
        CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
            DistributedCache.getArchiveTimestamps(taskConf),
            TrackerDistributedCacheManager.getArchiveVisibilities(taskConf),
            DistributedCache.getArchiveClassPaths(taskConf),
            CacheFile.FileType.ARCHIVE));
  }

  /**
   * Retrieve files into the local cache and updates the task configuration
   * (which has been passed in via the constructor).
   *
   * It is the caller's responsibility to re-write the task configuration XML
   * file, if necessary.
   *
   * @param lDirAlloc not used by this method
   * @param workDir the task's working directory
   * @param privateCacheSubdir cache sub-directory for non-public entries
   * @param publicCacheSubDir cache sub-directory for public entries
   * @throws IOException if a file's status cannot be read or it cannot be
   *         localized; entries localized before the failure stay marked as
   *         localized so that {@link #release()} still releases them
   */
  public void setup(LocalDirAllocator lDirAlloc, File workDir,
      String privateCacheSubdir, String publicCacheSubDir) throws IOException {
    setupCalled = true;

    if (cacheFiles.isEmpty()) {
      return;
    }

    List<Path> localArchives = new ArrayList<Path>();
    List<Path> localFiles = new ArrayList<Path>();
    Path workdirPath = new Path(workDir.getAbsolutePath());

    for (CacheFile cacheFile : cacheFiles) {
      URI uri = cacheFile.uri;
      FileSystem fileSystem = FileSystem.get(uri, taskConf);
      FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
      boolean isArchive = cacheFile.type == CacheFile.FileType.ARCHIVE;
      String cacheSubdir =
          cacheFile.isPublic ? publicCacheSubDir : privateCacheSubdir;

      Path p = distributedCacheManager.getLocalCache(uri, taskConf,
          cacheSubdir, fileStatus,
          isArchive,
          cacheFile.timestamp, workdirPath, false, cacheFile.isPublic);
      // Only flag the entry after localization succeeded, so that release()
      // never releases something this task did not acquire.
      cacheFile.setLocalized(true);

      if (isArchive) {
        localArchives.add(p);
      } else {
        localFiles.add(p);
      }
      if (cacheFile.shouldBeAddedToClassPath) {
        classPaths.add(p.toString());
      }
    }

    // Update the configuration object with localized data.
    if (!localArchives.isEmpty()) {
      TrackerDistributedCacheManager.setLocalArchives(taskConf,
          stringifyPathList(localArchives));
    }
    if (!localFiles.isEmpty()) {
      TrackerDistributedCacheManager.setLocalFiles(taskConf,
          stringifyPathList(localFiles));
    }
  }

  /*
   * This method is called from unit tests.
   */
  List<CacheFile> getCacheFiles() {
    return cacheFiles;
  }

  /**
   * Joins the given paths into a single comma-separated string.
   *
   * @param p the paths to join
   * @return the joined string, or {@code null} if the list is null or empty
   */
  private static String stringifyPathList(List<Path> p) {
    if (p == null || p.isEmpty()) {
      return null;
    }
    StringBuilder str = new StringBuilder(p.get(0).toString());
    for (int i = 1; i < p.size(); i++) {
      str.append(",");
      str.append(p.get(i).toString());
    }
    return str.toString();
  }

  /**
   * Retrieves class paths (as local references) to add.
   * Should be called after setup().
   *
   * @return the local paths of all entries flagged for the classpath; this is
   *         the manager's own list, not a copy
   * @throws IllegalStateException if setup() has not been called yet
   */
  public List<String> getClassPaths() throws IOException {
    if (!setupCalled) {
      throw new IllegalStateException(
          "getClassPaths() should be called after setup()");
    }
    return classPaths;
  }

  /**
   * Releases the cached files/archives, so that space
   * can be reclaimed by the {@link TrackerDistributedCacheManager}.
   *
   * <p>Only entries that were localized by setup() are released. A failure
   * to release one entry does not prevent the remaining entries from being
   * released.
   *
   * @throws IOException the first failure encountered, rethrown after all
   *         entries have been attempted; later failures are not reported
   */
  public void release() throws IOException {
    IOException firstFailure = null;
    for (CacheFile c : cacheFiles) {
      if (!c.getLocalized()) {
        continue;
      }
      try {
        distributedCacheManager.releaseCache(c.uri, taskConf, c.timestamp);
      } catch (IOException e) {
        // Keep going: aborting here would leave every remaining entry
        // unreleased.
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /**
   * Creates a class loader that includes the designated
   * files and archives.
   *
   * @param parent the parent of the returned class loader
   * @return a URLClassLoader over the local classpath entries collected by
   *         setup()
   * @throws MalformedURLException if a local path cannot be converted to a URL
   */
  public ClassLoader makeClassLoader(final ClassLoader parent)
      throws MalformedURLException {
    final URL[] urls = new URL[classPaths.size()];
    for (int i = 0; i < classPaths.size(); ++i) {
      urls[i] = new File(classPaths.get(i)).toURI().toURL();
    }
    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
      @Override
      public ClassLoader run() {
        return new URLClassLoader(urls, parent);
      }
    });
  }
}