blob: 1fa1a44864ad4b67746ff1ad0ba87b093310bd07 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.filecache;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.RunJar;
/**
* Manages a single machine's instance of a cross-job
* cache. This class would typically be instantiated
* by a TaskTracker (or something that emulates it,
* like LocalJobRunner).
*
* <b>This class is internal to Hadoop, and should not be treated as a public
* interface.</b>
*/
public class TrackerDistributedCacheManager {
// cacheID to cacheStatus mapping
private TreeMap<String, CacheStatus> cachedArchives =
new TreeMap<String, CacheStatus>();
private TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
private static final Log LOG =
LogFactory.getLog(TrackerDistributedCacheManager.class);
private final LocalFileSystem localFs;
public TrackerDistributedCacheManager(Configuration conf) throws IOException {
this.localFs = FileSystem.getLocal(conf);
}
/**
* Get the locally cached file or archive; it could either be
* previously cached (and valid) or copy it from the {@link FileSystem} now.
*
* @param cache the cache to be localized, this should be specified as
* new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
* @param conf The Configuration file which contains the filesystem
* @param baseDir The base cache Dir where you wnat to localize the
* files/archives
* @param fileStatus The file status on the dfs.
* @param isArchive if the cache is an archive or a file. In case it is an
* archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
* be unzipped/unjarred/untarred automatically
* and the directory where the archive is unzipped/unjarred/untarred is
* returned as the Path.
* In case of a file, the path to the file is returned
* @param confFileStamp this is the hdfs file modification timestamp to verify
* that the file to be cached hasn't changed since the job started
* @param currentWorkDir this is the directory where you would want to create
* symlinks for the locally cached files/archives
* @param honorSymLinkConf if this is false, then the symlinks are not
* created even if conf says so (this is required for an optimization in task
* launches
* NOTE: This is effectively always on since r696957, since there is no code
* path that does not use this.
* @return the path to directory where the archives are unjarred in case of
* archives, the path to the file where the file is copied locally
* @throws IOException
*/
Path getLocalCache(URI cache, Configuration conf,
Path baseDir, FileStatus fileStatus,
boolean isArchive, long confFileStamp,
Path currentWorkDir, boolean honorSymLinkConf)
throws IOException {
String cacheId = makeRelative(cache, conf);
CacheStatus lcacheStatus;
Path localizedPath;
synchronized (cachedArchives) {
lcacheStatus = cachedArchives.get(cacheId);
if (lcacheStatus == null) {
// was never localized
lcacheStatus = new CacheStatus(baseDir,
new Path(baseDir, new Path(cacheId)));
cachedArchives.put(cacheId, lcacheStatus);
}
synchronized (lcacheStatus) {
localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus,
fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
lcacheStatus.refcount++;
}
}
// try deleting stuff if you can
long size = 0;
synchronized (baseDirSize) {
Long get = baseDirSize.get(baseDir);
if ( get != null ) {
size = get.longValue();
}
}
// setting the cache size to a default of 10GB
long allowedSize = conf.getLong(TTConfig.TT_LOCAL_CACHE_SIZE, DEFAULT_CACHE_SIZE);
if (allowedSize < size) {
// try some cache deletions
deleteCache(conf);
}
return localizedPath;
}
/**
* This is the opposite of getlocalcache. When you are done with
* using the cache, you need to release the cache
* @param cache The cache URI to be released
* @param conf configuration which contains the filesystem the cache
* is contained in.
* @throws IOException
*/
void releaseCache(URI cache, Configuration conf)
throws IOException {
String cacheId = makeRelative(cache, conf);
synchronized (cachedArchives) {
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
if (lcacheStatus == null)
return;
synchronized (lcacheStatus) {
lcacheStatus.refcount--;
}
}
}
// To delete the caches which have a refcount of zero
private void deleteCache(Configuration conf) throws IOException {
// try deleting cache Status with refcount of zero
synchronized (cachedArchives) {
for (Iterator<String> it = cachedArchives.keySet().iterator();
it.hasNext();) {
String cacheId = it.next();
CacheStatus lcacheStatus = cachedArchives.get(cacheId);
synchronized (lcacheStatus) {
if (lcacheStatus.refcount == 0) {
// delete this cache entry
FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
synchronized (baseDirSize) {
Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
if ( dirSize != null ) {
dirSize -= lcacheStatus.size;
baseDirSize.put(lcacheStatus.baseDir, dirSize);
}
}
it.remove();
}
}
}
}
}
/*
* Returns the relative path of the dir this cache will be localized in
* relative path that this cache will be localized in. For
* hdfs://hostname:port/absolute_path -- the relative path is
* hostname/absolute path -- if it is just /absolute_path -- then the
* relative path is hostname of DFS this mapred cluster is running
* on/absolute_path
*/
String makeRelative(URI cache, Configuration conf)
throws IOException {
String host = cache.getHost();
if (host == null) {
host = cache.getScheme();
}
if (host == null) {
URI defaultUri = FileSystem.get(conf).getUri();
host = defaultUri.getHost();
if (host == null) {
host = defaultUri.getScheme();
}
}
String path = host + cache.getPath();
path = path.replace(":/","/"); // remove windows device colon
return path;
}
/**
* Returns mtime of a given cache file on hdfs.
*
* @param conf configuration
* @param cache cache file
* @return mtime of a given cache file on hdfs
* @throws IOException
*/
static long getTimestamp(Configuration conf, URI cache)
throws IOException {
FileSystem fileSystem = FileSystem.get(cache, conf);
Path filePath = new Path(cache.getPath());
return fileSystem.getFileStatus(filePath).getModificationTime();
}
private Path cacheFilePath(Path p) {
return new Path(p, p.getName());
}
// the method which actually copies the caches locally and unjars/unzips them
// and does chmod for the files
private Path localizeCache(Configuration conf,
URI cache, long confFileStamp,
CacheStatus cacheStatus,
FileStatus fileStatus,
boolean isArchive,
Path currentWorkDir,
boolean honorSymLinkConf)
throws IOException {
boolean doSymlink = honorSymLinkConf && DistributedCache.getSymlink(conf);
if(cache.getFragment() == null) {
doSymlink = false;
}
FileSystem fs = FileSystem.get(cache, conf);
String link =
currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
File flink = new File(link);
if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
cacheStatus, fileStatus)) {
LOG.info(String.format("Using existing cache of %s->%s",
cache.toString(), cacheStatus.localLoadPath));
if (isArchive) {
if (doSymlink){
if (!flink.exists())
FileUtil.symLink(cacheStatus.localLoadPath.toString(),
link);
}
return cacheStatus.localLoadPath;
}
else {
if (doSymlink){
if (!flink.exists())
FileUtil.symLink(
cacheFilePath(cacheStatus.localLoadPath).toString(), link);
}
return cacheFilePath(cacheStatus.localLoadPath);
}
} else {
// remove the old archive
// if the old archive cannot be removed since it is being used by another
// job
// return null
if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
+ " is in use and cannot be refreshed");
FileSystem localFs = FileSystem.getLocal(conf);
localFs.delete(cacheStatus.localLoadPath, true);
synchronized (baseDirSize) {
Long dirSize = baseDirSize.get(cacheStatus.baseDir);
if ( dirSize != null ) {
dirSize -= cacheStatus.size;
baseDirSize.put(cacheStatus.baseDir, dirSize);
}
}
Path parchive = new Path(cacheStatus.localLoadPath,
new Path(cacheStatus.localLoadPath.getName()));
if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
throw new IOException("Mkdirs failed to create directory " +
cacheStatus.localLoadPath.toString());
}
String cacheId = cache.getPath();
fs.copyToLocalFile(new Path(cacheId), parchive);
if (isArchive) {
String tmpArchive = parchive.toString().toLowerCase();
File srcFile = new File(parchive.toString());
File destDir = new File(parchive.getParent().toString());
LOG.info(String.format("Extracting %s to %s",
srcFile.toString(), destDir.toString()));
if (tmpArchive.endsWith(".jar")) {
RunJar.unJar(srcFile, destDir);
} else if (tmpArchive.endsWith(".zip")) {
FileUtil.unZip(srcFile, destDir);
} else if (isTarFile(tmpArchive)) {
FileUtil.unTar(srcFile, destDir);
} else {
LOG.warn(String.format(
"Cache file %s specified as archive, but not valid extension.",
srcFile.toString()));
// else will not do anyhting
// and copy the file into the dir as it is
}
}
long cacheSize =
FileUtil.getDU(new File(parchive.getParent().toString()));
cacheStatus.size = cacheSize;
synchronized (baseDirSize) {
Long dirSize = baseDirSize.get(cacheStatus.baseDir);
if( dirSize == null ) {
dirSize = Long.valueOf(cacheSize);
} else {
dirSize += cacheSize;
}
baseDirSize.put(cacheStatus.baseDir, dirSize);
}
// do chmod here
try {
//Setting recursive permission to grant everyone read and execute
FileUtil.chmod(cacheStatus.baseDir.toString(), "ugo+rx",true);
} catch(InterruptedException e) {
LOG.warn("Exception in chmod" + e.toString());
}
// update cacheStatus to reflect the newly cached file
cacheStatus.currentStatus = true;
cacheStatus.mtime = getTimestamp(conf, cache);
LOG.info(String.format("Cached %s as %s",
cache.toString(), cacheStatus.localLoadPath));
}
if (isArchive){
if (doSymlink){
if (!flink.exists())
FileUtil.symLink(cacheStatus.localLoadPath.toString(),
link);
}
return cacheStatus.localLoadPath;
}
else {
if (doSymlink){
if (!flink.exists())
FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(),
link);
}
return cacheFilePath(cacheStatus.localLoadPath);
}
}
private static boolean isTarFile(String filename) {
return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
filename.endsWith(".tar"));
}
// Checks if the cache has already been localized and is fresh
private boolean ifExistsAndFresh(Configuration conf, FileSystem fs,
URI cache, long confFileStamp,
CacheStatus lcacheStatus,
FileStatus fileStatus)
throws IOException {
// check for existence of the cache
if (lcacheStatus.currentStatus == false) {
return false;
} else {
long dfsFileStamp;
if (fileStatus != null) {
dfsFileStamp = fileStatus.getModificationTime();
} else {
dfsFileStamp = getTimestamp(conf, cache);
}
// ensure that the file on hdfs hasn't been modified since the job started
if (dfsFileStamp != confFileStamp) {
LOG.fatal("File: " + cache + " has changed on HDFS since job started");
throw new IOException("File: " + cache +
" has changed on HDFS since job started");
}
if (dfsFileStamp != lcacheStatus.mtime) {
// needs refreshing
return false;
}
}
return true;
}
/**
* This method create symlinks for all files in a given dir in another
* directory.
*
* Should not be used outside of DistributedCache code.
*
* @param conf the configuration
* @param jobCacheDir the target directory for creating symlinks
* @param workDir the directory in which the symlinks are created
* @throws IOException
*/
public static void createAllSymlink(Configuration conf, File jobCacheDir,
File workDir)
throws IOException{
if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
workDir == null || (!workDir.isDirectory())) {
return;
}
boolean createSymlink = DistributedCache.getSymlink(conf);
if (createSymlink){
File[] list = jobCacheDir.listFiles();
for (int i=0; i < list.length; i++){
String target = list[i].getAbsolutePath();
String link = new File(workDir, list[i].getName()).toString();
LOG.info(String.format("Creating symlink: %s <- %s", target, link));
int ret = FileUtil.symLink(target, link);
if (ret != 0) {
LOG.warn(String.format("Failed to create symlink: %s <- %s", target,
link));
}
}
}
}
private static class CacheStatus {
// false, not loaded yet, true is loaded
boolean currentStatus;
// the local load path of this cache
Path localLoadPath;
//the base dir where the cache lies
Path baseDir;
//the size of this cache
long size;
// number of instances using this cache
int refcount;
// the cache-file modification time
long mtime;
public CacheStatus(Path baseDir, Path localLoadPath) {
super();
this.currentStatus = false;
this.localLoadPath = localLoadPath;
this.refcount = 0;
this.mtime = -1;
this.baseDir = baseDir;
this.size = 0;
}
}
/**
* Clear the entire contents of the cache and delete the backing files. This
* should only be used when the server is reinitializing, because the users
* are going to lose their files.
*/
public void purgeCache() {
synchronized (cachedArchives) {
for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
try {
localFs.delete(f.getValue().localLoadPath, true);
} catch (IOException ie) {
LOG.debug("Error cleaning up cache", ie);
}
}
cachedArchives.clear();
}
}
public TaskDistributedCacheManager newTaskDistributedCacheManager(
Configuration taskConf) throws IOException {
return new TaskDistributedCacheManager(this, taskConf);
}
/**
* Determines timestamps of files to be cached, and stores those
* in the configuration. This is intended to be used internally by JobClient
* after all cache files have been added.
*
* This is an internal method!
*
* @param job Configuration of a job.
* @throws IOException
*/
public static void determineTimestamps(Configuration job) throws IOException {
URI[] tarchives = DistributedCache.getCacheArchives(job);
if (tarchives != null) {
StringBuffer archiveTimestamps =
new StringBuffer(String.valueOf(
getTimestamp(job, tarchives[0])));
for (int i = 1; i < tarchives.length; i++) {
archiveTimestamps.append(",");
archiveTimestamps.append(String.valueOf(
getTimestamp(job, tarchives[i])));
}
setArchiveTimestamps(job, archiveTimestamps.toString());
}
URI[] tfiles = DistributedCache.getCacheFiles(job);
if (tfiles != null) {
StringBuffer fileTimestamps = new StringBuffer(String.valueOf(
getTimestamp(job, tfiles[0])));
for (int i = 1; i < tfiles.length; i++) {
fileTimestamps.append(",");
fileTimestamps.append(String.valueOf(
getTimestamp(job, tfiles[i])));
}
setFileTimestamps(job, fileTimestamps.toString());
}
}
/**
* This method checks if there is a conflict in the fragment names
* of the uris. Also makes sure that each uri has a fragment. It
* is only to be called if you want to create symlinks for
* the various archives and files. May be used by user code.
* @param uriFiles The uri array of urifiles
* @param uriArchives the uri array of uri archives
*/
public static boolean checkURIs(URI[] uriFiles, URI[] uriArchives){
if ((uriFiles == null) && (uriArchives == null)){
return true;
}
if (uriFiles != null){
for (int i = 0; i < uriFiles.length; i++){
String frag1 = uriFiles[i].getFragment();
if (frag1 == null)
return false;
for (int j=i+1; j < uriFiles.length; j++){
String frag2 = uriFiles[j].getFragment();
if (frag2 == null)
return false;
if (frag1.equalsIgnoreCase(frag2))
return false;
}
if (uriArchives != null){
for (int j = 0; j < uriArchives.length; j++){
String frag2 = uriArchives[j].getFragment();
if (frag2 == null){
return false;
}
if (frag1.equalsIgnoreCase(frag2))
return false;
for (int k=j+1; k < uriArchives.length; k++){
String frag3 = uriArchives[k].getFragment();
if (frag3 == null)
return false;
if (frag2.equalsIgnoreCase(frag3))
return false;
}
}
}
}
}
return true;
}
/**
* This is to check the timestamp of the archives to be localized.
*
* @param conf Configuration which stores the timestamp's
* @param timestamps comma separated list of timestamps of archives.
* The order should be the same as the order in which the archives are added.
*/
static void setArchiveTimestamps(Configuration conf, String timestamps) {
conf.set(JobContext.CACHE_ARCHIVES_TIMESTAMPS, timestamps);
}
/**
* This is to check the timestamp of the files to be localized.
*
* @param conf Configuration which stores the timestamp's
* @param timestamps comma separated list of timestamps of files.
* The order should be the same as the order in which the files are added.
*/
static void setFileTimestamps(Configuration conf, String timestamps) {
conf.set(JobContext.CACHE_FILE_TIMESTAMPS, timestamps);
}
/**
* Set the conf to contain the location for localized archives.
*
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local archives
*/
static void setLocalArchives(Configuration conf, String str) {
conf.set(JobContext.CACHE_LOCALARCHIVES, str);
}
/**
* Set the conf to contain the location for localized files.
*
* @param conf The conf to modify to contain the localized caches
* @param str a comma separated list of local files
*/
static void setLocalFiles(Configuration conf, String str) {
conf.set(JobContext.CACHE_LOCALFILES, str);
}
}