// blob: d8015656eea1b4f01647ad6c004425aef916a116 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.filecache;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.TrackerDistributedCacheManager.CacheStatus;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobLocalizer;
/**
* Helper class of {@link TrackerDistributedCacheManager} that represents
* the cached files of a single job.
*
* <b>This class is internal to Hadoop, and should not be treated as a public
* interface.</b>
*/
public class TaskDistributedCacheManager {
  /** Tracker-level manager this object delegates localization, release and sizing to. */
  private final TrackerDistributedCacheManager distributedCacheManager;
  /** Every cache entry of the job: regular files first, then archives. */
  private final List<CacheFile> cacheFiles = new ArrayList<CacheFile>();
  /** Local paths of entries flagged for the classpath; filled by {@link #setupCache}. */
  private final List<String> classPaths = new ArrayList<String>();
  /** Becomes true as soon as {@link #setupCache} is entered. */
  private boolean setupCalled = false;

  /**
   * Struct representing a single cached file.
   * There are four permutations (archive, file) and
   * (don't put in classpath, do put in classpath).
   */
  static class CacheFile {
    /** URI as in the configuration */
    final URI uri;

    /** Kind of cache entry. */
    enum FileType {
      REGULAR,
      ARCHIVE
    }

    /** Visibility of the entry; overwritten by the constructor argument. */
    boolean isPublic = true;
    /** Whether to decompress */
    final FileType type;
    /** Timestamp recorded for this entry in the configuration. */
    final long timestamp;
    /** Whether this is to be added to the classpath */
    final boolean shouldBeAddedToClassPath;
    /** Set to true by {@code setupCache} once a local path was obtained. */
    boolean localized = false;
    /** The owner of the localized file. Relevant only on the tasktrackers */
    final String owner;
    /**
     * Status handle used when releasing or sizing the entry.
     * NOTE(review): nothing in this class assigns it; presumably the tracker
     * manager calls {@link #setStatus} during localization -- confirm.
     */
    private CacheStatus status;

    /**
     * Creates a cache entry description.
     *
     * @param uri URI as in the configuration
     * @param type whether the entry is a regular file or an archive
     * @param isPublic visibility of the entry
     * @param timestamp timestamp recorded for the entry
     * @param classPath whether the localized entry goes on the classpath
     * @throws IOException if the localized-cache owner cannot be determined
     */
    CacheFile(URI uri, FileType type, boolean isPublic, long timestamp,
        boolean classPath) throws IOException {
      this.uri = uri;
      this.type = type;
      this.isPublic = isPublic;
      this.timestamp = timestamp;
      this.shouldBeAddedToClassPath = classPath;
      this.owner =
          TrackerDistributedCacheManager.getLocalizedCacheOwner(isPublic);
    }

    /**
     * Set the status for this cache file.
     * @param status the status object to remember
     */
    public void setStatus(CacheStatus status) {
      this.status = status;
    }

    /**
     * Get the status for this cache file.
     * @return the status object, or null if none has been set
     */
    public CacheStatus getStatus() {
      return status;
    }

    /**
     * Converts the scheme used by DistributedCache to serialize what files to
     * cache in the configuration into CacheFile objects that represent those
     * files.
     *
     * @param uris cache URIs, or null when nothing of this type is configured
     * @param timestamps one timestamp per URI
     * @param cacheVisibilities one visibility flag per URI (extra trailing
     *        entries are ignored)
     * @param paths classpath entries, matched against each URI's path; may be
     *        null
     * @param type the type assigned to every produced entry
     * @return a new list, empty when {@code uris} is null
     * @throws IllegalArgumentException if {@code timestamps} is null or its
     *         length differs from {@code uris}, or if
     *         {@code cacheVisibilities} is null or shorter than {@code uris}
     * @throws IOException if constructing a CacheFile fails
     */
    private static List<CacheFile> makeCacheFiles(URI[] uris,
        long[] timestamps, boolean cacheVisibilities[], Path[] paths,
        FileType type) throws IOException {
      List<CacheFile> ret = new ArrayList<CacheFile>();
      if (uris == null) {
        return ret;
      }
      // Validate the parallel arrays up front so that a broken configuration
      // is reported clearly instead of as an NPE / index error mid-loop.
      if (timestamps == null || uris.length != timestamps.length) {
        throw new IllegalArgumentException("Mismatched uris and timestamps.");
      }
      if (cacheVisibilities == null
          || cacheVisibilities.length < uris.length) {
        throw new IllegalArgumentException("Mismatched uris and visibilities.");
      }
      // Index the classpath entries by their path component for lookup.
      Map<String, Path> classPaths = new HashMap<String, Path>();
      if (paths != null) {
        for (Path p : paths) {
          classPaths.put(p.toUri().getPath(), p);
        }
      }
      for (int i = 0; i < uris.length; ++i) {
        URI u = uris[i];
        boolean isClassPath = classPaths.containsKey(u.getPath());
        ret.add(new CacheFile(u, type, cacheVisibilities[i],
            timestamps[i], isClassPath));
      }
      return ret;
    }

    /** @return whether this entry has been marked as localized */
    boolean getLocalized() {
      return localized;
    }

    /** @param val new value of the localized flag */
    void setLocalized(boolean val) {
      localized = val;
    }
  }

  /**
   * Builds the per-job view from the cache files and cache archives recorded
   * in the given configuration.
   *
   * @param distributedCacheManager the tracker-level manager to delegate to
   * @param taskConf configuration holding the distributed cache settings
   * @throws IOException if a cache entry cannot be constructed
   */
  TaskDistributedCacheManager(
      TrackerDistributedCacheManager distributedCacheManager,
      Configuration taskConf) throws IOException {
    this.distributedCacheManager = distributedCacheManager;

    this.cacheFiles.addAll(
        CacheFile.makeCacheFiles(DistributedCache.getCacheFiles(taskConf),
            DistributedCache.getFileTimestamps(taskConf),
            TrackerDistributedCacheManager.getFileVisibilities(taskConf),
            DistributedCache.getFileClassPaths(taskConf),
            CacheFile.FileType.REGULAR));

    this.cacheFiles.addAll(
        CacheFile.makeCacheFiles(DistributedCache.getCacheArchives(taskConf),
            DistributedCache.getArchiveTimestamps(taskConf),
            TrackerDistributedCacheManager.getArchiveVisibilities(taskConf),
            DistributedCache.getArchiveClassPaths(taskConf),
            CacheFile.FileType.ARCHIVE));
  }

  /**
   * Retrieve public distributed cache files into the local cache and updates
   * the task configuration (which has been passed in via the constructor).
   * The private distributed cache is just looked at and the paths where the
   * files/archives should go to is decided here. The actual localization is
   * done by {@link JobLocalizer}.
   *
   * It is the caller's responsibility to re-write the task configuration XML
   * file, if necessary.
   *
   * @param taskConf configuration to update with the local paths
   * @param publicCacheSubdir subdirectory used for public entries
   * @param privateCacheSubdir subdirectory used for private entries
   * @throws IOException if a file status lookup or localization fails
   */
  public void setupCache(Configuration taskConf, String publicCacheSubdir,
      String privateCacheSubdir) throws IOException {
    setupCalled = true;
    List<Path> localArchives = new ArrayList<Path>();
    List<Path> localFiles = new ArrayList<Path>();

    for (CacheFile cacheFile : cacheFiles) {
      URI uri = cacheFile.uri;
      FileSystem fileSystem = FileSystem.get(uri, taskConf);
      FileStatus fileStatus = fileSystem.getFileStatus(new Path(uri.getPath()));
      boolean isArchive = cacheFile.type == CacheFile.FileType.ARCHIVE;
      // Public and private entries differ only in the target subdirectory.
      String cacheSubdir =
          cacheFile.isPublic ? publicCacheSubdir : privateCacheSubdir;
      Path p = distributedCacheManager.getLocalCache(uri, taskConf,
          cacheSubdir, fileStatus, isArchive,
          cacheFile.timestamp, cacheFile.isPublic, cacheFile);
      cacheFile.setLocalized(true);

      if (isArchive) {
        localArchives.add(p);
      } else {
        localFiles.add(p);
      }
      if (cacheFile.shouldBeAddedToClassPath) {
        classPaths.add(p.toString());
      }
    }

    // Update the configuration object with localized data.
    if (!localArchives.isEmpty()) {
      DistributedCache.addLocalArchives(taskConf,
          stringifyPathList(localArchives));
    }
    if (!localFiles.isEmpty()) {
      DistributedCache.addLocalFiles(taskConf, stringifyPathList(localFiles));
    }
  }

  /*
   * This method is called from unit tests.
   * Returns the internal list itself, not a copy.
   */
  List<CacheFile> getCacheFiles() {
    return cacheFiles;
  }

  /**
   * Joins the string forms of the given paths with commas.
   *
   * @param p the paths to join
   * @return the comma-separated paths, or null if {@code p} is null or empty
   */
  private static String stringifyPathList(List<Path> p){
    if (p == null || p.isEmpty()) {
      return null;
    }
    StringBuilder str = new StringBuilder(p.get(0).toString());
    for (int i = 1; i < p.size(); i++){
      str.append(",");
      str.append(p.get(i).toString());
    }
    return str.toString();
  }

  /**
   * Retrieves class paths (as local references) to add.
   * Should be called after {@link #setupCache}.
   *
   * @return the internal list of local classpath entries (not a copy)
   * @throws IllegalStateException if {@link #setupCache} has not been called
   */
  public List<String> getClassPaths() throws IOException {
    if (!setupCalled) {
      throw new IllegalStateException(
          "getClassPaths() should be called after setupCache()");
    }
    return classPaths;
  }

  /**
   * Releases the cached files/archives, so that space
   * can be reclaimed by the {@link TrackerDistributedCacheManager}.
   * Only entries that were marked localized and carry a status are released.
   */
  public void release() throws IOException {
    for (CacheFile c : cacheFiles) {
      if (c.getLocalized() && c.status != null) {
        distributedCacheManager.releaseCache(c.status);
      }
    }
  }

  /**
   * Reports sizes for the private archives that carry a status.
   * The i-th such archive, in {@code cacheFiles} order, receives
   * {@code sizes[i]}; all other entries are skipped and consume no slot.
   *
   * @param sizes one size per private archive with a status; a shorter array
   *        results in an {@link ArrayIndexOutOfBoundsException}
   */
  public void setSizes(long[] sizes) throws IOException {
    int i = 0;
    for (CacheFile c: cacheFiles) {
      if (!c.isPublic && c.type == CacheFile.FileType.ARCHIVE &&
          c.status != null) {
        distributedCacheManager.setSize(c.status, sizes[i++]);
      }
    }
  }

  /**
   * Creates a class loader that includes the designated
   * files and archives.
   *
   * @param parent the parent of the new class loader
   * @return a {@link URLClassLoader} over the current classpath entries
   * @throws MalformedURLException if a classpath entry cannot be turned
   *         into a URL
   */
  public ClassLoader makeClassLoader(final ClassLoader parent)
      throws MalformedURLException {
    final URL[] urls = new URL[classPaths.size()];
    for (int i = 0; i < classPaths.size(); ++i) {
      urls[i] = new File(classPaths.get(i)).toURI().toURL();
    }
    // Create the loader in a privileged block, as the original code does.
    return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
      @Override
      public ClassLoader run() {
        return new URLClassLoader(urls, parent);
      }
    });
  }
}