MAPREDUCE-2494. Order distributed cache deletions by LRU.
Contributed by Robert Joseph Evans
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1127386 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 92791a0..06ce82b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,8 +130,11 @@
HADOOP-7259. Contrib modules should include the build.properties from
the enclosing hadoop directory. (omalley)
+ MAPREDUCE-2494. Order distributed cache deletions by LRU. (Robert Joseph
+ Evans via cdouglas)
+
OPTIMIZATIONS
-
+
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
JobInProgress.getCounters() aquire locks in a shorter time period.
(Joydeep Sen Sarma via schen)
diff --git a/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java b/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
index 56bdf10..6a4d403 100644
--- a/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
+++ b/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
@@ -21,12 +21,12 @@
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -57,14 +57,19 @@
@InterfaceAudience.Private
public class TrackerDistributedCacheManager {
// cacheID to cacheStatus mapping
- private TreeMap<String, CacheStatus> cachedArchives =
- new TreeMap<String, CacheStatus>();
+ private LinkedHashMap<String, CacheStatus> cachedArchives =
+ new LinkedHashMap<String, CacheStatus>();
// default total cache size (10GB)
private static final long DEFAULT_CACHE_SIZE = 10737418240L;
private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
+ // This default of 75 % was just pulled out of the air, it should be set
+ // based off of real world use cases.
+ private static final float DEFAULT_CACHE_KEEP_AROUND_PCT = 0.75f;
private long allowedCacheSize;
private long allowedCacheSubdirs;
+ private long allowedCacheSizeCleanupGoal;
+ private long allowedCacheSubdirsCleanupGoal;
private static final Log LOG =
LogFactory.getLog(TrackerDistributedCacheManager.class);
@@ -96,6 +101,12 @@
// setting the cache number of subdirectories limit to a default of 10000
this.allowedCacheSubdirs = conf.getLong(
TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, DEFAULT_CACHE_SUBDIR_LIMIT);
+ double cleanupPct = conf.getFloat(TTConfig.TT_LOCAL_CACHE_KEEP_AROUND_PCT,
+ DEFAULT_CACHE_KEEP_AROUND_PCT);
+ this.allowedCacheSizeCleanupGoal =
+ (long)(this.allowedCacheSize * cleanupPct);
+ this.allowedCacheSubdirsCleanupGoal =
+ (long)(this.allowedCacheSubdirs * cleanupPct);
this.cleanupThread = new CleanupThread(conf);
}
@@ -160,12 +171,12 @@
Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
fileStatus.getLen(), trackerConf);
lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
- cachePath, "")), localPath, new Path(subDir), uniqueString);
+ cachePath, "")), localPath, new Path(subDir), uniqueString, key);
cachedArchives.put(key, lcacheStatus);
}
//mark the cache for use.
- lcacheStatus.refcount++;
+ lcacheStatus.incRefCount();
}
boolean initSuccessful = false;
@@ -190,9 +201,7 @@
return localizedPath;
} finally {
if (!initSuccessful) {
- synchronized (cachedArchives) {
- lcacheStatus.refcount--;
- }
+ lcacheStatus.decRefCount();
}
}
}
@@ -219,7 +228,7 @@
}
// decrement ref count
- lcacheStatus.refcount--;
+ lcacheStatus.decRefCount();
}
}
@@ -234,7 +243,7 @@
if (lcacheStatus == null) {
throw new IOException("Cannot find localized cache: " + cache);
}
- return lcacheStatus.refcount;
+ return lcacheStatus.getRefCount();
}
}
@@ -533,11 +542,11 @@
}
}
- static class CacheStatus {
+ class CacheStatus {
//
// This field should be accessed under global cachedArchives lock.
//
- int refcount; // number of instances using this cache.
+ private int refcount; // number of instances using this cache.
//
// The following three fields should be accessed under
@@ -559,9 +568,11 @@
final Path localizedLoadPath;
//the base dir where the cache lies
final Path localizedBaseDir;
+ //The key of this in the cachedArchives.
+ private final String key;
public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
- String uniqueString) {
+ String uniqueString, String key) {
super();
this.localizedLoadPath = localLoadPath;
this.refcount = 0;
@@ -570,6 +581,32 @@
this.size = 0;
this.subDir = subDir;
this.uniqueString = uniqueString;
+ this.key = key;
+ }
+
+ public synchronized void incRefCount() {
+ refcount += 1;
+ }
+
+ public void decRefCount() {
+ synchronized (cachedArchives) {
+ synchronized (this) {
+ refcount -= 1;
+ if(refcount <= 0) {
+ String key = this.key;
+ cachedArchives.remove(key);
+ cachedArchives.put(key, this);
+ }
+ }
+ }
+ }
+
+ public int getRefCount() {
+ return refcount;
+ }
+
+ public synchronized boolean isUsed() {
+ return refcount > 0;
}
Path getBaseDir(){
@@ -699,25 +736,20 @@
}
}
+ private class CacheDir {
+ long size;
+ long subdirs;
+ }
+
/**
* This class holds properties of each base directories and is responsible
* for clean up unused cache files in base directories.
*/
protected class BaseDirManager {
- private class CacheDir {
- long size;
- long subdirs;
- }
+
private TreeMap<Path, CacheDir> properties =
new TreeMap<Path, CacheDir>();
- private long getDirSize(Path p) {
- return properties.get(p).size;
- }
- private long getDirSubdirs(Path p) {
- return properties.get(p).subdirs;
- }
-
/**
* Check each base directory to see if the size or number of subdirectories
* are exceed the limit. If the limit is exceeded, start deleting caches
@@ -725,23 +757,33 @@
*/
public void checkAndCleanup() throws IOException {
Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
- Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+ HashMap<Path, CacheDir> toBeCleanedBaseDir =
+ new HashMap<Path, CacheDir>();
synchronized (properties) {
- for (Path baseDir : properties.keySet()) {
- if (allowedCacheSize < getDirSize(baseDir) ||
- allowedCacheSubdirs < getDirSubdirs(baseDir)) {
- toBeCleanedBaseDir.add(baseDir);
+ for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
+ CacheDir baseDirCounts = baseDir.getValue();
+ if (allowedCacheSize < baseDirCounts.size ||
+ allowedCacheSubdirs < baseDirCounts.subdirs) {
+ CacheDir tcc = new CacheDir();
+ tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
+ tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
+ toBeCleanedBaseDir.put(baseDir.getKey(), tcc);
}
}
}
synchronized (cachedArchives) {
- for (Iterator<String> it = cachedArchives.keySet().iterator();
- it.hasNext();) {
- String cacheId = it.next();
+ for (Iterator<Map.Entry<String, CacheStatus>> it =
+ cachedArchives.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<String, CacheStatus> entry = it.next();
+ String cacheId = entry.getKey();
CacheStatus cacheStatus = cachedArchives.get(cacheId);
- if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+ CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
+ if (leftToClean != null
+ && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
// if reference count is zero mark the cache for deletion
- if (cacheStatus.refcount == 0) {
+ if (!cacheStatus.isUsed()) {
+ leftToClean.size -= cacheStatus.size;
+ leftToClean.subdirs--;
// delete this cache entry from the global list
// and mark the localized file for deletion
toBeDeletedCache.add(cacheStatus);
@@ -760,6 +802,7 @@
}
}
}
+
/**
* Decrement the size and sub directory count of the cache from baseDirSize
* and baseDirNumberSubDir. Have to synchronize cacheStatus before calling
@@ -792,6 +835,7 @@
*/
public void addCacheUpdate(CacheStatus cacheStatus) {
long cacheSize = cacheStatus.size;
+ LOG.info("Adding in cache "+cacheStatus.localizedLoadPath+" at "+cacheStatus.localizedBaseDir+" size:"+cacheStatus.size);
synchronized (properties) {
CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
if (cacheDir != null) {
diff --git a/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java b/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
index 9d723a0..d9bd67d 100644
--- a/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
+++ b/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
@@ -99,4 +99,11 @@
"mapreduce.tasktracker.userlogcleanup.sleeptime";
public static final String TT_DISTRIBUTED_CACHE_CHECK_PERIOD =
"mapreduce.tasktracker.distributedcache.checkperiod";
+ /**
+ * Fraction (for example 0.75) of the local distributed cache size and
+ * subdirectory limits that cleanup tries to keep around.
+ */
+ public static final String TT_LOCAL_CACHE_KEEP_AROUND_PCT =
+ "mapreduce.tasktracker.cache.local.keep.pct";
+
}
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java b/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
index 50df192..dcfa7f5 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
@@ -41,7 +41,6 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -545,6 +544,106 @@
return fileStatus.getModificationTime();
}
+ public static final long CACHE_DELETE_PERIOD_MS = 100l;
+
+ /** test delete cache */
+ public void testLRUDeleteCache() throws Exception {
+ if (!canRun()) {
+ return;
+ }
+ // This test needs MRConfig.LOCAL_DIR to be single directory
+ // instead of four, because it assumes that both
+ // firstcachefile and secondcachefile will be localized on same directory
+ // so that second localization triggers deleteCache.
+ // If MRConfig.LOCAL_DIR is four directories, second localization might not
+ // trigger deleteCache, if it is localized in different directory.
+ Configuration conf2 = new Configuration(conf);
+ conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
+ //Make it larger than expected
+ conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, 21 * 1024l);
+ conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, 3);
+ //The goal is to get down to 15.75K and 2 dirs
+ conf2.setFloat(TTConfig.TT_LOCAL_CACHE_KEEP_AROUND_PCT, 0.75f);
+ conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, CACHE_DELETE_PERIOD_MS);
+ refreshConf(conf2);
+ TrackerDistributedCacheManager manager =
+ new TrackerDistributedCacheManager(conf2, taskController);
+ manager.startCleanupThread();
+ FileSystem localfs = FileSystem.getLocal(conf2);
+ String userName = getJobOwnerName();
+ conf2.set(MRJobConfig.USER_NAME, userName);
+
+ //Here we are testing the LRU. In this case we will add in 4 cache entries
+ // 2 of them are 8k each and 2 of them are very small. We want to verify
+ // that they are deleted in LRU order.
+ // So what we will do is add in the two large files first, 1 then 2, and
+ // then one of the small ones 3. We will then release them in opposite
+ // order 3, 2, 1.
+ //
+ // Finally we will add in the last small file. This last file should push
+ // us over the 3 entry limit to trigger a cleanup. So LRU order is 3, 2, 1,
+ // and we will only delete 2 entries, so that should leave 1 untouched
+ // but 3 and 2 deleted.
+
+ Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+ Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+ // Adding two more small files, so it triggers the number of sub directory
+ // limit but does not trigger the file size limit.
+ createTempFile(thirdCacheFile, 1);
+ createTempFile(fourthCacheFile, 1);
+
+ Path firstLocalCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(firstCacheFile), false,
+ getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
+
+ Path secondLocalCache = manager.getLocalCache(secondCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(secondCacheFile), false,
+ getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false);
+
+ Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(thirdCacheFile), false,
+ getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
+
+ manager.releaseCache(thirdCacheFile.toUri(), conf2,
+ getFileStamp(thirdCacheFile),
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+
+ manager.releaseCache(secondCacheFile.toUri(), conf2,
+ getFileStamp(secondCacheFile),
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+
+ manager.releaseCache(firstCacheFile.toUri(), conf2,
+ getFileStamp(firstCacheFile),
+ TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+
+
+ // Getting the fourth cache will make the number of sub directories become
+ // 4 which is greater than 3. So the released cache will be deleted.
+ manager.getLocalCache(fourthCacheFile.toUri(), conf2,
+ TaskTracker.getPrivateDistributedCacheDir(userName),
+ fs.getFileStatus(fourthCacheFile), false,
+ getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false);
+
+ checkCacheDeletion(localfs, secondLocalCache, "DistributedCache failed " +
+ "deleting second cache LRU order");
+
+ checkCacheDeletion(localfs, thirdLocalCache,
+ "DistributedCache failed deleting third" +
+ " cache LRU order.");
+
+ checkCacheNOTDeletion(localfs, firstLocalCache, "DistributedCache failed " +
+ "Deleted first cache LRU order.");
+
+ checkCacheNOTDeletion(localfs, fourthCacheFile, "DistributedCache failed " +
+ "Deleted fourth cache LRU order.");
+ // Clean up the files created in this test
+ new File(thirdCacheFile.toString()).delete();
+ new File(fourthCacheFile.toString()).delete();
+ manager.stopCleanupThread();
+ }
/** test delete cache */
public void testDeleteCache() throws Exception {
@@ -561,7 +660,7 @@
conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
- conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
+ conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, CACHE_DELETE_PERIOD_MS);
refreshConf(conf2);
TrackerDistributedCacheManager manager =
new TrackerDistributedCacheManager(conf2, taskController);
@@ -620,6 +719,15 @@
}
/**
+ * Do a simple check to see if the file has NOT been deleted.
+ */
+ private void checkCacheNOTDeletion(FileSystem fs, Path cache, String msg)
+ throws Exception {
+ TimeUnit.MILLISECONDS.sleep(3 * CACHE_DELETE_PERIOD_MS);
+ assertTrue(msg, fs.exists(cache));
+ }
+
+ /**
* Periodically checks if a file is there, return if the file is no longer
* there. Fails the test if a files is there for 30 seconds.
*/
@@ -632,7 +740,7 @@
cacheExists = false;
break;
}
- TimeUnit.MILLISECONDS.sleep(100L);
+ TimeUnit.MILLISECONDS.sleep(CACHE_DELETE_PERIOD_MS);
}
// If the cache is still there after 5 minutes, test fails.
assertFalse(msg, cacheExists);