MAPREDUCE-2494. Order distributed cache deletions by LRU.
Contributed by Robert Joseph Evans


git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1127386 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 92791a0..06ce82b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,8 +130,11 @@
     HADOOP-7259. Contrib modules should include the build.properties from
     the enclosing hadoop directory. (omalley)
 
+    MAPREDUCE-2494. Order distributed cache deletions by LRU. (Robert Joseph
+    Evans via cdouglas)
+
   OPTIMIZATIONS
-    
+
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and
     JobInProgress.getCounters() aquire locks in a shorter time period.
     (Joydeep Sen Sarma via schen)
diff --git a/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java b/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
index 56bdf10..6a4d403 100644
--- a/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
+++ b/src/java/org/apache/hadoop/mapreduce/filecache/TrackerDistributedCacheManager.java
@@ -21,12 +21,12 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -57,14 +57,19 @@
 @InterfaceAudience.Private
 public class TrackerDistributedCacheManager {
   // cacheID to cacheStatus mapping
-  private TreeMap<String, CacheStatus> cachedArchives = 
-    new TreeMap<String, CacheStatus>();
+  private LinkedHashMap<String, CacheStatus> cachedArchives =
+    new LinkedHashMap<String, CacheStatus>();
 
   // default total cache size (10GB)
   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
   private static final long DEFAULT_CACHE_SUBDIR_LIMIT = 10000;
+  // This default of 75% was chosen arbitrarily; it should be tuned
+  // based on real-world use cases.
+  private static final float DEFAULT_CACHE_KEEP_AROUND_PCT = 0.75f;
   private long allowedCacheSize;
   private long allowedCacheSubdirs;
+  private long allowedCacheSizeCleanupGoal;
+  private long allowedCacheSubdirsCleanupGoal;
 
   private static final Log LOG =
     LogFactory.getLog(TrackerDistributedCacheManager.class);
@@ -96,6 +101,12 @@
       // setting the cache number of subdirectories limit to a default of 10000
     this.allowedCacheSubdirs = conf.getLong(
           TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, DEFAULT_CACHE_SUBDIR_LIMIT);
+    double cleanupPct = conf.getFloat(TTConfig.TT_LOCAL_CACHE_KEEP_AROUND_PCT,
+        DEFAULT_CACHE_KEEP_AROUND_PCT);
+    this.allowedCacheSizeCleanupGoal =
+      (long)(this.allowedCacheSize * cleanupPct);
+    this.allowedCacheSubdirsCleanupGoal =
+      (long)(this.allowedCacheSubdirs * cleanupPct);
     this.cleanupThread = new CleanupThread(conf);
   }
 
@@ -160,12 +171,12 @@
         Path localPath = lDirAllocator.getLocalPathForWrite(cachePath,
           fileStatus.getLen(), trackerConf);
         lcacheStatus = new CacheStatus(new Path(localPath.toString().replace(
-          cachePath, "")), localPath, new Path(subDir), uniqueString);
+          cachePath, "")), localPath, new Path(subDir), uniqueString, key);
         cachedArchives.put(key, lcacheStatus);
       }
 
       //mark the cache for use. 
-      lcacheStatus.refcount++;
+      lcacheStatus.incRefCount();
     }
     
     boolean initSuccessful = false;
@@ -190,9 +201,7 @@
       return localizedPath;
     } finally {
       if (!initSuccessful) {
-        synchronized (cachedArchives) {
-          lcacheStatus.refcount--;
-        }
+        lcacheStatus.decRefCount();
       }
     }
   }
@@ -219,7 +228,7 @@
       }
       
       // decrement ref count 
-      lcacheStatus.refcount--;
+      lcacheStatus.decRefCount();
     }
   }
 
@@ -234,7 +243,7 @@
       if (lcacheStatus == null) {
         throw new IOException("Cannot find localized cache: " + cache);
       }
-      return lcacheStatus.refcount;
+      return lcacheStatus.getRefCount();
     }
   }
   
@@ -533,11 +542,11 @@
     }
   }
 
-  static class CacheStatus {
+  class CacheStatus {
     //
     // This field should be accessed under global cachedArchives lock.
     //
-    int refcount;           // number of instances using this cache.
+    private int refcount;           // number of instances using this cache.
 
     //
     // The following three fields should be accessed under
@@ -559,9 +568,11 @@
     final Path localizedLoadPath;
     //the base dir where the cache lies
     final Path localizedBaseDir;
+    //The key of this in the cachedArchives.
+    private final String key;
 
     public CacheStatus(Path baseDir, Path localLoadPath, Path subDir,
-        String uniqueString) {
+        String uniqueString, String key) {
       super();
       this.localizedLoadPath = localLoadPath;
       this.refcount = 0;
@@ -570,6 +581,32 @@
       this.size = 0;
       this.subDir = subDir;
       this.uniqueString = uniqueString;
+      this.key = key;
+    }
+
+    public synchronized void incRefCount() { // records one more user of this cache
+      refcount += 1;
+    }
+
+    public void decRefCount() { // lock order: cachedArchives, then this
+      synchronized (cachedArchives) {
+        synchronized (this) {
+          refcount -= 1;
+          if(refcount <= 0) {
+            // Unused now: re-insert so it moves to the tail of cachedArchives.
+            cachedArchives.remove(key);
+            cachedArchives.put(key, this);
+          }
+        }
+      }
+    }
+
+    public synchronized int getRefCount() { // same monitor as inc/decRefCount
+      return refcount;
+    }
+
+    public synchronized boolean isUsed() { // true while refcount is positive
+      return refcount > 0;
     }
     
     Path getBaseDir(){
@@ -699,25 +736,20 @@
     }
   }
 
+  private static class CacheDir { // plain counters; needs no outer instance
+    long size;    // cache size tally (compared against the size limit)
+    long subdirs; // subdirectory tally (compared against the subdir limit)
+  }
+
   /**
    * This class holds properties of each base directories and is responsible
    * for clean up unused cache files in base directories.
    */
   protected class BaseDirManager {
-    private class CacheDir {
-      long size;
-      long subdirs;
-    }
+
     private TreeMap<Path, CacheDir> properties =
       new TreeMap<Path, CacheDir>();
 
-    private long getDirSize(Path p) {
-      return properties.get(p).size;
-    }
-    private long getDirSubdirs(Path p) {
-      return properties.get(p).subdirs;
-    }
-
     /**
      * Check each base directory to see if the size or number of subdirectories
      * are exceed the limit. If the limit is exceeded, start deleting caches
@@ -725,23 +757,33 @@
      */
     public void checkAndCleanup() throws IOException {
       Collection<CacheStatus> toBeDeletedCache = new LinkedList<CacheStatus>();
-      Set<Path> toBeCleanedBaseDir = new HashSet<Path>();
+      HashMap<Path, CacheDir> toBeCleanedBaseDir =
+        new HashMap<Path, CacheDir>();
       synchronized (properties) {
-        for (Path baseDir : properties.keySet()) {
-          if (allowedCacheSize < getDirSize(baseDir) ||
-              allowedCacheSubdirs < getDirSubdirs(baseDir)) {
-            toBeCleanedBaseDir.add(baseDir);
+        for (Map.Entry<Path, CacheDir> baseDir : properties.entrySet()) {
+          CacheDir baseDirCounts = baseDir.getValue();
+          if (allowedCacheSize < baseDirCounts.size ||
+              allowedCacheSubdirs < baseDirCounts.subdirs) {
+            CacheDir tcc = new CacheDir();
+            tcc.size = baseDirCounts.size - allowedCacheSizeCleanupGoal;
+            tcc.subdirs = baseDirCounts.subdirs - allowedCacheSubdirsCleanupGoal;
+            toBeCleanedBaseDir.put(baseDir.getKey(), tcc);
           }
         }
       }
       synchronized (cachedArchives) {
-        for (Iterator<String> it = cachedArchives.keySet().iterator();
-        it.hasNext();) {
-          String cacheId = it.next();
+        for (Iterator<Map.Entry<String, CacheStatus>> it =
+             cachedArchives.entrySet().iterator(); it.hasNext();) {
+          Map.Entry<String, CacheStatus> entry = it.next();
+          String cacheId = entry.getKey();
           CacheStatus cacheStatus = cachedArchives.get(cacheId);
-          if (toBeCleanedBaseDir.contains(cacheStatus.getBaseDir())) {
+          CacheDir leftToClean = toBeCleanedBaseDir.get(cacheStatus.getBaseDir());
+          if (leftToClean != null
+              && (leftToClean.size > 0 || leftToClean.subdirs > 0)) {
             // if reference count is zero mark the cache for deletion
-            if (cacheStatus.refcount == 0) {
+            if (!cacheStatus.isUsed()) {
+              leftToClean.size -= cacheStatus.size;
+              leftToClean.subdirs--;
               // delete this cache entry from the global list 
               // and mark the localized file for deletion
               toBeDeletedCache.add(cacheStatus);
@@ -760,6 +802,7 @@
         }
       }
     }
+
     /**
      * Decrement the size and sub directory count of the cache from baseDirSize
      * and baseDirNumberSubDir. Have to synchronize cacheStatus before calling
@@ -792,6 +835,7 @@
      */
     public void addCacheUpdate(CacheStatus cacheStatus) {
       long cacheSize = cacheStatus.size;
+      LOG.info("Adding in cache "+cacheStatus.localizedLoadPath+" at "+cacheStatus.localizedBaseDir+" size:"+cacheStatus.size);
       synchronized (properties) {
         CacheDir cacheDir = properties.get(cacheStatus.getBaseDir());
         if (cacheDir != null) {
diff --git a/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java b/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
index 9d723a0..d9bd67d 100644
--- a/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
+++ b/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
@@ -99,4 +99,11 @@
     "mapreduce.tasktracker.userlogcleanup.sleeptime";
   public static final String TT_DISTRIBUTED_CACHE_CHECK_PERIOD =
     "mapreduce.tasktracker.distributedcache.checkperiod";
+  /**
+   * Fraction (e.g. 0.75 for 75%) of the local distributed cache limits that
+   * should be kept after each cleanup pass.
+   */
+  public static final String TT_LOCAL_CACHE_KEEP_AROUND_PCT =
+    "mapreduce.tasktracker.cache.local.keep.pct";
+
 }
diff --git a/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java b/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
index 50df192..dcfa7f5 100644
--- a/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
+++ b/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java
@@ -41,7 +41,6 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -545,6 +544,106 @@
     return fileStatus.getModificationTime();
   }
   
+  public static final long CACHE_DELETE_PERIOD_MS = 100l;
+
+  /** Tests that unreferenced caches are deleted in LRU (release) order. */
+  public void testLRUDeleteCache() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    // This test needs MRConfig.LOCAL_DIR to be single directory
+    // instead of four, because it assumes that both
+    // firstcachefile and secondcachefile will be localized on same directory
+    // so that second localization triggers deleteCache.
+    // If MRConfig.LOCAL_DIR is four directories, second localization might not
+    // trigger deleteCache, if it is localized in different directory.
+    Configuration conf2 = new Configuration(conf);
+    conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
+    // Make it larger than expected
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, 21 * 1024L);
+    conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, 3);
+    //The goal is to get down to 15.75K and 2 dirs
+    conf2.setFloat(TTConfig.TT_LOCAL_CACHE_KEEP_AROUND_PCT, 0.75f);
+    conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, CACHE_DELETE_PERIOD_MS);
+    refreshConf(conf2);
+    TrackerDistributedCacheManager manager =
+        new TrackerDistributedCacheManager(conf2, taskController);
+    manager.startCleanupThread();
+    FileSystem localfs = FileSystem.getLocal(conf2);
+    String userName = getJobOwnerName();
+    conf2.set(MRJobConfig.USER_NAME, userName);
+
+    // Here we are testing the LRU.  In this case we will add in 4 cache entries;
+    // 2 of them are 8k each and 2 of them are very small.  We want to verify
+    // that they are deleted in LRU order.
+    // So what we will do is add in the two large files first, 1 then 2, and
+    // then one of the small ones, 3.  We will then release them in opposite
+    // order 3, 2, 1.
+    //
+    // Finally we will add in the last small file.  This last file should push
+    // us over the 3 entry limit to trigger a cleanup.  So LRU order is 3, 2, 1,
+    // and we will only delete 2 entries, so that should leave 1 untouched
+    // but 3 and 2 deleted.
+
+    Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile");
+    Path fourthCacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile");
+    // Adding two more small files, so it triggers the number of sub directory
+    // limit but does not trigger the file size limit.
+    createTempFile(thirdCacheFile, 1);
+    createTempFile(fourthCacheFile, 1);
+
+    Path firstLocalCache = manager.getLocalCache(firstCacheFile.toUri(), conf2,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(firstCacheFile), false,
+        getFileStamp(firstCacheFile), new Path(TEST_ROOT_DIR), false, false);
+
+    Path secondLocalCache = manager.getLocalCache(secondCacheFile.toUri(), conf2,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(secondCacheFile), false,
+        getFileStamp(secondCacheFile), new Path(TEST_ROOT_DIR), false, false);
+
+    Path thirdLocalCache = manager.getLocalCache(thirdCacheFile.toUri(), conf2,
+        TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(thirdCacheFile), false,
+        getFileStamp(thirdCacheFile), new Path(TEST_ROOT_DIR), false, false);
+
+    manager.releaseCache(thirdCacheFile.toUri(), conf2,
+        getFileStamp(thirdCacheFile),
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+
+    manager.releaseCache(secondCacheFile.toUri(), conf2,
+        getFileStamp(secondCacheFile),
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+
+    manager.releaseCache(firstCacheFile.toUri(), conf2,
+        getFileStamp(firstCacheFile),
+        TrackerDistributedCacheManager.getLocalizedCacheOwner(false));
+
+
+    // Getting the fourth cache makes the number of sub directories 4, which
+    // is greater than 3, so released caches will be deleted in LRU order.
+    Path fourthLocalCache = manager.getLocalCache(fourthCacheFile.toUri(),
+        conf2, TaskTracker.getPrivateDistributedCacheDir(userName),
+        fs.getFileStatus(fourthCacheFile), false,
+        getFileStamp(fourthCacheFile), new Path(TEST_ROOT_DIR), false, false);
+
+    checkCacheDeletion(localfs, secondLocalCache, "DistributedCache failed " +
+    "deleting second cache LRU order");
+
+    checkCacheDeletion(localfs, thirdLocalCache,
+    "DistributedCache failed deleting third" +
+    " cache LRU order.");
+
+    checkCacheNOTDeletion(localfs, firstLocalCache, "DistributedCache failed " +
+    "Deleted first cache LRU order.");
+
+    checkCacheNOTDeletion(localfs, fourthLocalCache, "DistributedCache failed " +
+    "Deleted fourth cache LRU order.");
+    // Clean up the files created in this test
+    new File(thirdCacheFile.toString()).delete();
+    new File(fourthCacheFile.toString()).delete();
+    manager.stopCleanupThread();
+  }
 
   /** test delete cache */
   public void testDeleteCache() throws Exception {
@@ -561,7 +660,7 @@
     conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString());
     conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT);
     conf2.setLong(TTConfig.TT_LOCAL_CACHE_SUBDIRS_LIMIT, LOCAL_CACHE_SUBDIR);
-    conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, 200); // 200 ms
+    conf2.setLong(TTConfig.TT_DISTRIBUTED_CACHE_CHECK_PERIOD, CACHE_DELETE_PERIOD_MS);
     refreshConf(conf2);
     TrackerDistributedCacheManager manager = 
         new TrackerDistributedCacheManager(conf2, taskController);
@@ -620,6 +719,15 @@
   }
 
   /**
+   * Do a simple check to see if the file has NOT been deleted.
+   */
+  private void checkCacheNOTDeletion(FileSystem fs, Path cache, String msg)
+    throws Exception {
+    TimeUnit.MILLISECONDS.sleep(3 * CACHE_DELETE_PERIOD_MS); // wait 3 check periods first
+    assertTrue(msg, fs.exists(cache));
+  }
+
+  /**
    * Periodically checks if a file is there, return if the file is no longer
    * there. Fails the test if a files is there for 30 seconds.
    */
@@ -632,7 +740,7 @@
         cacheExists = false;
         break;
       }
-      TimeUnit.MILLISECONDS.sleep(100L);
+      TimeUnit.MILLISECONDS.sleep(CACHE_DELETE_PERIOD_MS);
     }
     // If the cache is still there after 5 minutes, test fails.
     assertFalse(msg, cacheExists);