YARN-11176. Refactor TestAggregatedLogDeletionService. Contributed by Szilard Nemeth.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index daa2fc6..f855f91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -44,156 +44,172 @@
 import org.junit.Test;
 import org.junit.Assert;
 
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
 public class TestAggregatedLogDeletionService {
-  
+
+  private static final String T_FILE = "TFile";
+  private static final String USER_ME = "me";
+  private static final String DIR_HOST1 = "host1";
+  private static final String DIR_HOST2 = "host2";
+
+  private static final String ROOT = "mockfs://foo/";
+  private static final String REMOTE_ROOT_LOG_DIR = ROOT + "tmp/logs";
+  private static final String SUFFIX = "logs";
+  private static final String NEW_SUFFIX = LogAggregationUtils.getBucketSuffix() + SUFFIX;
+  private static final int TEN_DAYS_IN_SECONDS = 10 * 24 * 3600;
+
+  private static PathWithFileStatus createPathWithFileStatusForAppId(Path remoteRootLogDir,
+                                                                     ApplicationId appId,
+                                                                     String user, String suffix,
+                                                                     long modificationTime) {
+    Path path = LogAggregationUtils.getRemoteAppLogDir(
+            remoteRootLogDir, appId, user, suffix);
+    FileStatus fileStatus = createEmptyFileStatus(modificationTime, path);
+    return new PathWithFileStatus(path, fileStatus);
+  }
+
+  private static FileStatus createEmptyFileStatus(long modificationTime, Path path) {
+    return new FileStatus(0, true, 0, 0, modificationTime, path);
+  }
+
+  private static PathWithFileStatus createFileLogPathWithFileStatus(Path baseDir, String childDir,
+                                                                    long modificationTime) {
+    Path logPath = new Path(baseDir, childDir);
+    FileStatus fStatus = createFileStatusWithLengthForFile(10, modificationTime, logPath);
+    return new PathWithFileStatus(logPath, fStatus);
+  }
+
+  private static PathWithFileStatus createDirLogPathWithFileStatus(Path baseDir, String childDir,
+                                                                   long modificationTime) {
+    Path logPath = new Path(baseDir, childDir);
+    FileStatus fStatus = createFileStatusWithLengthForDir(10, modificationTime, logPath);
+    return new PathWithFileStatus(logPath, fStatus);
+  }
+
+  private static PathWithFileStatus createDirBucketDirLogPathWithFileStatus(Path remoteRootLogPath,
+                                                                            String user,
+                                                                            String suffix,
+                                                                            ApplicationId appId,
+                                                                            long modificationTime) {
+    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(remoteRootLogPath, user, suffix, appId);
+    FileStatus fStatus = new FileStatus(0, true, 0, 0, modificationTime, bucketDir);
+    return new PathWithFileStatus(bucketDir, fStatus);
+  }
+
+  private static FileStatus createFileStatusWithLengthForFile(long length,
+                                                              long modificationTime,
+                                                              Path logPath) {
+    return new FileStatus(length, false, 1, 1, modificationTime, logPath);
+  }
+
+  private static FileStatus createFileStatusWithLengthForDir(long length,
+                                                             long modificationTime,
+                                                             Path logPath) {
+    return new FileStatus(length, true, 1, 1, modificationTime, logPath);
+  }
+
   @Before
   public void closeFilesystems() throws IOException {
     // prevent the same mockfs instance from being reused due to FS cache
     FileSystem.closeAll();
   }
 
+  private Configuration setupConfiguration(int retainSeconds, int retainCheckIntervalSeconds) {
+    Configuration conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, retainSeconds);
+    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+            retainCheckIntervalSeconds);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, REMOTE_ROOT_LOG_DIR);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, SUFFIX);
+    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, T_FILE);
+    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, T_FILE),
+            LogAggregationTFileController.class.getName());
+    return conf;
+  }
+
   @Test
   public void testDeletion() throws Exception {
     long now = System.currentTimeMillis();
-    long toDeleteTime = now - (2000*1000);
-    long toKeepTime = now - (1500*1000);
-    String root = "mockfs://foo/";
-    String remoteRootLogDir = root+"tmp/logs";
-    String suffix = "logs";
-    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
-    final Configuration conf = new Configuration();
-    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
-    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
-         LogAggregationTFileController.class.getName());
+    long toDeleteTime = now - (2000 * 1000);
+    long toKeepTime = now - (1500 * 1000);
 
+    Configuration conf = setupConfiguration(1800, -1);
 
-    Path rootPath = new Path(root);
+    Path rootPath = new Path(ROOT);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
     
-    Path remoteRootLogPath = new Path(remoteRootLogDir);
-    
-    Path userDir = new Path(remoteRootLogPath, "me");
-    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, toKeepTime, userDir); 
-    
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
-        new FileStatus[]{userDirStatus});
+    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
+    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
+            toKeepTime);
 
-    ApplicationId appId1 =
-        ApplicationId.newInstance(now, 1);
-    Path suffixDir = new Path(userDir, newSuffix);
-    FileStatus suffixDirStatus = new FileStatus(0, true,
-        0, 0, toDeleteTime, suffixDir);
-    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
-        remoteRootLogPath, "me", suffix, appId1);
-    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
-        0, toDeleteTime, bucketDir);
-    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId1, "me", suffix);
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
-        toDeleteTime, app1Dir);
-    
-    ApplicationId appId2 =
-        ApplicationId.newInstance(now, 2);
-    Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId2, "me", suffix);
-    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
-        toDeleteTime, app2Dir);
-    
-    ApplicationId appId3 =
-        ApplicationId.newInstance(now, 3);
-    Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId3, "me", suffix);
-    FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
-        toDeleteTime, app3Dir);
-    
-    ApplicationId appId4 =
-        ApplicationId.newInstance(now, 4);
-    Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId4, "me", suffix);
-    FileStatus app4DirStatus =
-        new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
 
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[] {suffixDirStatus});
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[] {bucketDirStatus});
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[] {app1DirStatus, app2DirStatus,
-            app3DirStatus, app4DirStatus});
-    
-    when(mockFs.listStatus(app1Dir)).thenReturn(
-        new FileStatus[]{});
+    ApplicationId appId1 = ApplicationId.newInstance(now, 1);
+    ApplicationId appId2 = ApplicationId.newInstance(now, 2);
+    ApplicationId appId3 = ApplicationId.newInstance(now, 3);
+    ApplicationId appId4 = ApplicationId.newInstance(now, 4);
 
+    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
+            toDeleteTime);
+    PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(remoteRootLogPath, SUFFIX,
+            toDeleteTime);
 
-    Path app2Log1 = new Path(app2Dir, "host1");
-    FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
-    
-    Path app2Log2 = new Path(app2Dir, "host2");
-    FileStatus app2Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app2Log2);
-    
-    when(mockFs.listStatus(app2Dir)).thenReturn(
-        new FileStatus[]{app2Log1Status, app2Log2Status});
-    
-    Path app3Log1 = new Path(app3Dir, "host1");
-    FileStatus app3Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log1);
-    
-    Path app3Log2 = new Path(app3Dir, "host2");
-    FileStatus app3Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app3Log2);
-    
-    when(mockFs.delete(app3Dir, true)).thenThrow(new AccessControlException("Injected Error\nStack Trace :("));
-    
-    when(mockFs.listStatus(app3Dir)).thenReturn(
-        new FileStatus[]{app3Log1Status, app3Log2Status});
-    
-    Path app4Log1 = new Path(app4Dir, "host1");
-    FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
-    
-    Path app4Log2 = new Path(app4Dir, "host2");
-    FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
-        toKeepTime, app4Log2);
+    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+            USER_ME, SUFFIX, toDeleteTime);
+    PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
+            USER_ME, SUFFIX, toDeleteTime);
+    PathWithFileStatus app3 = createPathWithFileStatusForAppId(remoteRootLogPath, appId3,
+            USER_ME, SUFFIX, toDeleteTime);
+    PathWithFileStatus app4 = createPathWithFileStatusForAppId(remoteRootLogPath, appId4,
+            USER_ME, SUFFIX, toDeleteTime);
 
-    when(mockFs.listStatus(app4Dir)).thenReturn(
-        new FileStatus[]{app4Log1Status, app4Log2Status});
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {
+        app1.fileStatus, app2.fileStatus, app3.fileStatus, app4.fileStatus});
+    
+    PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
+        toDeleteTime);
+    PathWithFileStatus app2Log2 = createFileLogPathWithFileStatus(app2.path, DIR_HOST2, toKeepTime);
+    PathWithFileStatus app3Log1 = createFileLogPathWithFileStatus(app3.path, DIR_HOST1,
+        toDeleteTime);
+    PathWithFileStatus app3Log2 = createFileLogPathWithFileStatus(app3.path, DIR_HOST2,
+        toDeleteTime);
+    PathWithFileStatus app4Log1 = createFileLogPathWithFileStatus(app4.path, DIR_HOST1,
+        toDeleteTime);
+    PathWithFileStatus app4Log2 = createFileLogPathWithFileStatus(app4.path, DIR_HOST2, toKeepTime);
 
-    final List<ApplicationId> finishedApplications =
-        Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
-    final List<ApplicationId> runningApplications =
-        Collections.unmodifiableList(Arrays.asList(appId4));
+    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{});
+    when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{app2Log1.fileStatus,
+        app2Log2.fileStatus});
+    when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log1.fileStatus,
+        app3Log2.fileStatus});
+    when(mockFs.listStatus(app4.path)).thenReturn(new FileStatus[]{app4Log1.fileStatus,
+        app4Log2.fileStatus});
+    when(mockFs.delete(app3.path, true)).thenThrow(
+            new AccessControlException("Injected Error\nStack Trace :("));
+
+    final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
+            Arrays.asList(appId1, appId2, appId3));
+    final List<ApplicationId> runningApplications = Collections.singletonList(appId4);
 
     AggregatedLogDeletionService deletionService =
-        new AggregatedLogDeletionService() {
-          @Override
-          protected ApplicationClientProtocol createRMClient()
-              throws IOException {
-            try {
-              return createMockRMClient(finishedApplications,
-                runningApplications);
-            } catch (Exception e) {
-              throw new IOException(e);
-            }
-          }
-          @Override
-          protected void stopRMClient() {
-            // DO NOTHING
-          }
-        };
+            new AggregatedLogDeletionServiceForTest(runningApplications, finishedApplications);
     deletionService.init(conf);
     deletionService.start();
 
-    verify(mockFs, timeout(2000)).delete(app1Dir, true);
-    verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
-    verify(mockFs, timeout(2000)).delete(app3Dir, true);
-    verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
-    verify(mockFs, timeout(2000)).delete(app4Log1, true);
-    verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
+    int timeout = 2000;
+    verify(mockFs, timeout(timeout)).delete(app1.path, true);
+    verify(mockFs, timeout(timeout).times(0)).delete(app2.path, true);
+    verify(mockFs, timeout(timeout)).delete(app3.path, true);
+    verify(mockFs, timeout(timeout).times(0)).delete(app4.path, true);
+    verify(mockFs, timeout(timeout)).delete(app4Log1.path, true);
+    verify(mockFs, timeout(timeout).times(0)).delete(app4Log2.path, true);
 
     deletionService.stop();
   }
@@ -201,357 +217,216 @@
   @Test
   public void testRefreshLogRetentionSettings() throws Exception {
     long now = System.currentTimeMillis();
-    //time before 2000 sec
     long before2000Secs = now - (2000 * 1000);
-    //time before 50 sec
     long before50Secs = now - (50 * 1000);
-    String root = "mockfs://foo/";
-    String remoteRootLogDir = root + "tmp/logs";
-    String suffix = "logs";
-    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
-    final Configuration conf = new Configuration();
-    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-        "1");
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
-    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
-         LogAggregationTFileController.class.getName());
+    int checkIntervalSeconds = 2;
+    int checkIntervalMilliSeconds = checkIntervalSeconds * 1000;
 
+    Configuration conf = setupConfiguration(1800, 1);
 
-    Path rootPath = new Path(root);
+    Path rootPath = new Path(ROOT);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem) rootFs).getRawFileSystem();
 
-    Path remoteRootLogPath = new Path(remoteRootLogDir);
+    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
 
-    Path userDir = new Path(remoteRootLogPath, "me");
-    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, before50Secs,
-        userDir);
+    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
 
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
-        new FileStatus[] { userDirStatus });
+    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME,
+            before50Secs);
+    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX,
+            before50Secs);
+    PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
+            USER_ME, SUFFIX, appId1, before50Secs);
 
-    Path suffixDir = new Path(userDir, newSuffix);
-    FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
-        suffixDir);
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[] {userDir.fileStatus});
 
-    ApplicationId appId1 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     //Set time last modified of app1Dir directory and its files to before2000Secs 
-    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId1, "me", suffix);
-    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
-        remoteRootLogPath, "me", suffix, appId1);
-    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
-        0, before50Secs, bucketDir);
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
-        app1Dir);
-    
-    ApplicationId appId2 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 2);
-    //Set time last modified of app1Dir directory and its files to before50Secs 
-    Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId2, "me", suffix);
-    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
-        app2Dir);
+    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+            USER_ME, SUFFIX, before2000Secs);
 
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[] {suffixStatus });
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[] {bucketDirStatus });
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[] {app1DirStatus, app2DirStatus });
+    //Set time last modified of app1Dir directory and its files to before50Secs
+    PathWithFileStatus app2 = createPathWithFileStatusForAppId(remoteRootLogPath, appId2,
+            USER_ME, SUFFIX, before50Secs);
 
-    Path app1Log1 = new Path(app1Dir, "host1");
-    FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
-        app1Log1);
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{app1.fileStatus,
+            app2.fileStatus});
 
-    when(mockFs.listStatus(app1Dir)).thenReturn(
-        new FileStatus[] { app1Log1Status });
+    PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1,
+            before2000Secs);
+    PathWithFileStatus app2Log1 = createFileLogPathWithFileStatus(app2.path, DIR_HOST1,
+            before50Secs);
 
-    Path app2Log1 = new Path(app2Dir, "host1");
-    FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, before50Secs,
-        app2Log1);
-
-    when(mockFs.listStatus(app2Dir)).thenReturn(
-        new FileStatus[] { app2Log1Status });
+    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[] {app1Log1.fileStatus});
+    when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[] {app2Log1.fileStatus});
 
     final List<ApplicationId> finishedApplications =
         Collections.unmodifiableList(Arrays.asList(appId1, appId2));
 
-    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionService() {
-      @Override
-      protected Configuration createConf() {
-        return conf;
-      }
-      @Override
-      protected ApplicationClientProtocol createRMClient()
-          throws IOException {
-        try {
-          return createMockRMClient(finishedApplications, null);
-        } catch (Exception e) {
-          throw new IOException(e);
-        }
-      }
-      @Override
-      protected void stopRMClient() {
-        // DO NOTHING
-      }
-    };
+    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
+            finishedApplications, conf);
     
     deletionSvc.init(conf);
     deletionSvc.start();
     
     //app1Dir would be deleted since its done above log retention period
-    verify(mockFs, timeout(10000)).delete(app1Dir, true);
-    //app2Dir is not expected to be deleted since its below the threshold
-    verify(mockFs, timeout(3000).times(0)).delete(app2Dir, true);
+    verify(mockFs, timeout(10000)).delete(app1.path, true);
+    //app2Dir is not expected to be deleted since it is below the threshold
+    verify(mockFs, timeout(3000).times(0)).delete(app2.path, true);
 
-    //Now,lets change the confs
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "50");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-        "2");
+    //Now, let's change the confs
+    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, 50);
+    conf.setInt(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
+            checkIntervalSeconds);
     //We have not called refreshLogSettings,hence don't expect to see the changed conf values
-    Assert.assertTrue(2000l != deletionSvc.getCheckIntervalMsecs());
+    assertTrue(checkIntervalMilliSeconds != deletionSvc.getCheckIntervalMsecs());
     
     //refresh the log settings
     deletionSvc.refreshLogRetentionSettings();
 
     //Check interval time should reflect the new value
-    Assert.assertTrue(2000l == deletionSvc.getCheckIntervalMsecs());
+    Assert.assertEquals(checkIntervalMilliSeconds, deletionSvc.getCheckIntervalMsecs());
     //app2Dir should be deleted since it falls above the threshold
-    verify(mockFs, timeout(10000)).delete(app2Dir, true);
+    verify(mockFs, timeout(10000)).delete(app2.path, true);
     deletionSvc.stop();
   }
   
   @Test
   public void testCheckInterval() throws Exception {
-    long RETENTION_SECS = 10 * 24 * 3600;
     long now = System.currentTimeMillis();
-    long toDeleteTime = now - RETENTION_SECS*1000;
+    long toDeleteTime = now - TEN_DAYS_IN_SECONDS * 1000;
 
-    String root = "mockfs://foo/";
-    String remoteRootLogDir = root+"tmp/logs";
-    String suffix = "logs";
-    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
-    Configuration conf = new Configuration();
-    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
-    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
-         LogAggregationTFileController.class.getName());
-
+    Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
 
     // prevent us from picking up the same mockfs instance from another test
     FileSystem.closeAll();
-    Path rootPath = new Path(root);
+    Path rootPath = new Path(ROOT);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
 
-    Path remoteRootLogPath = new Path(remoteRootLogDir);
+    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
 
-    Path userDir = new Path(remoteRootLogPath, "me");
-    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, now, userDir);
+    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, now);
+    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, now);
 
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
-        new FileStatus[]{userDirStatus});
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
 
-    ApplicationId appId1 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    Path suffixDir = new Path(userDir, newSuffix);
-    FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
-        suffixDir);
-    Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
-        remoteRootLogPath, "me", suffix, appId1);
-    Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogPath, appId1, "me", suffix);
-    FileStatus bucketDirStatus = new FileStatus(0, true, 0,
-        0, now, bucketDir);
+    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    PathWithFileStatus bucketDir = createDirBucketDirLogPathWithFileStatus(remoteRootLogPath,
+            USER_ME, SUFFIX, appId1, now);
 
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
+    PathWithFileStatus app1 = createPathWithFileStatusForAppId(remoteRootLogPath, appId1,
+            USER_ME, SUFFIX, now);
+    PathWithFileStatus app1Log1 = createFileLogPathWithFileStatus(app1.path, DIR_HOST1, now);
 
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[] {suffixDirStatus});
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[] {bucketDirStatus});
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[] {app1DirStatus});
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus});
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus});
+    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
 
-    Path app1Log1 = new Path(app1Dir, "host1");
-    FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
+    final List<ApplicationId> finishedApplications = Collections.singletonList(appId1);
 
-    when(mockFs.listStatus(app1Dir)).thenReturn(
-        new FileStatus[]{app1Log1Status});
-
-    final List<ApplicationId> finishedApplications =
-        Collections.unmodifiableList(Arrays.asList(appId1));
-
-    AggregatedLogDeletionService deletionSvc =
-        new AggregatedLogDeletionService() {
-      @Override
-      protected ApplicationClientProtocol createRMClient()
-          throws IOException {
-        try {
-          return createMockRMClient(finishedApplications, null);
-        } catch (Exception e) {
-          throw new IOException(e);
-        }
-      }
-      @Override
-      protected void stopRMClient() {
-        // DO NOTHING
-      }
-    };
+    AggregatedLogDeletionService deletionSvc = new AggregatedLogDeletionServiceForTest(null,
+            finishedApplications);
     deletionSvc.init(conf);
     deletionSvc.start();
  
     verify(mockFs, timeout(10000).atLeast(4)).listStatus(any(Path.class));
-    verify(mockFs, never()).delete(app1Dir, true);
+    verify(mockFs, never()).delete(app1.path, true);
 
-    // modify the timestamp of the logs and verify it's picked up quickly
-    bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
-    app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
-    app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[] {suffixDirStatus});
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[] {bucketDirStatus });
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[] {app1DirStatus });
-    when(mockFs.listStatus(app1Dir)).thenReturn(
-        new FileStatus[]{app1Log1Status});
+    // modify the timestamp of the logs and verify if it's picked up quickly
+    app1.changeModificationTime(toDeleteTime);
+    app1Log1.changeModificationTime(toDeleteTime);
+    bucketDir.changeModificationTime(toDeleteTime);
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[] {suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[] {bucketDir.fileStatus });
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[] {app1.fileStatus });
+    when(mockFs.listStatus(app1.path)).thenReturn(new FileStatus[]{app1Log1.fileStatus});
 
-    verify(mockFs, timeout(10000)).delete(app1Dir, true);
+    verify(mockFs, timeout(10000)).delete(app1.path, true);
 
     deletionSvc.stop();
   }
 
   @Test
   public void testRobustLogDeletion() throws Exception {
-    final long RETENTION_SECS = 10 * 24 * 3600;
-
-    String root = "mockfs://foo/";
-    String remoteRootLogDir = root+"tmp/logs";
-    String suffix = "logs";
-    String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
-    Configuration conf = new Configuration();
-    conf.setClass("fs.mockfs.impl", MockFileSystem.class,
-        FileSystem.class);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "864000");
-    conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS,
-        "1");
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
-    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
-    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
-    conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
-         LogAggregationTFileController.class.getName());
+    Configuration conf = setupConfiguration(TEN_DAYS_IN_SECONDS, 1);
 
     // prevent us from picking up the same mockfs instance from another test
     FileSystem.closeAll();
-    Path rootPath = new Path(root);
+    Path rootPath = new Path(ROOT);
     FileSystem rootFs = rootPath.getFileSystem(conf);
     FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
 
-    Path remoteRootLogPath = new Path(remoteRootLogDir);
+    Path remoteRootLogPath = new Path(REMOTE_ROOT_LOG_DIR);
 
-    Path userDir = new Path(remoteRootLogPath, "me");
-    Path suffixDir = new Path(userDir, newSuffix);
-    FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
-    FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
-    Path bucketDir = new Path(suffixDir, String.valueOf(0));
-    FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
+    PathWithFileStatus userDir = createDirLogPathWithFileStatus(remoteRootLogPath, USER_ME, 0);
+    PathWithFileStatus suffixDir = createDirLogPathWithFileStatus(userDir.path, NEW_SUFFIX, 0);
+    PathWithFileStatus bucketDir = createDirLogPathWithFileStatus(suffixDir.path, "0", 0);
 
-    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
-        new FileStatus[]{userDirStatus});
-    when(mockFs.listStatus(userDir)).thenReturn(
-        new FileStatus[]{suffixStatus});
-    when(mockFs.listStatus(suffixDir)).thenReturn(
-        new FileStatus[]{bucketDirStatus});
+    when(mockFs.listStatus(remoteRootLogPath)).thenReturn(new FileStatus[]{userDir.fileStatus});
+    when(mockFs.listStatus(userDir.path)).thenReturn(new FileStatus[]{suffixDir.fileStatus});
+    when(mockFs.listStatus(suffixDir.path)).thenReturn(new FileStatus[]{bucketDir.fileStatus});
 
-    ApplicationId appId1 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    Path app1Dir = new Path(bucketDir, appId1.toString());
-    FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
-    ApplicationId appId2 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 2);
-    Path app2Dir = new Path(bucketDir, "application_a");
-    FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
-    ApplicationId appId3 =
-        ApplicationId.newInstance(System.currentTimeMillis(), 3);
-    Path app3Dir = new Path(bucketDir, appId3.toString());
-    FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
+    ApplicationId appId1 = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationId appId2 = ApplicationId.newInstance(System.currentTimeMillis(), 2);
+    ApplicationId appId3 = ApplicationId.newInstance(System.currentTimeMillis(), 3);
 
-    when(mockFs.listStatus(bucketDir)).thenReturn(
-        new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
-    when(mockFs.listStatus(app2Dir)).thenReturn(
-            new FileStatus[]{});
+    PathWithFileStatus app1 = createDirLogPathWithFileStatus(bucketDir.path, appId1.toString(), 0);
+    PathWithFileStatus app2 = createDirLogPathWithFileStatus(bucketDir.path, "application_a", 0);
+    PathWithFileStatus app3 = createDirLogPathWithFileStatus(bucketDir.path, appId3.toString(), 0);
+    PathWithFileStatus app3Log3 = createDirLogPathWithFileStatus(app3.path, DIR_HOST1, 0);
 
-    when(mockFs.listStatus(app1Dir)).thenThrow(
-        new RuntimeException("Should Be Caught and Logged"));
-    Path app3Log3 = new Path(app3Dir, "host1");
-    FileStatus app3Log3Status = new FileStatus(10, false, 1, 1, 0, app3Log3);
-    when(mockFs.listStatus(app3Dir)).thenReturn(
-        new FileStatus[]{app3Log3Status});
+    when(mockFs.listStatus(bucketDir.path)).thenReturn(new FileStatus[]{
+        app1.fileStatus,app2.fileStatus, app3.fileStatus});
+    when(mockFs.listStatus(app1.path)).thenThrow(
+        new RuntimeException("Should be caught and logged"));
+    when(mockFs.listStatus(app2.path)).thenReturn(new FileStatus[]{});
+    when(mockFs.listStatus(app3.path)).thenReturn(new FileStatus[]{app3Log3.fileStatus});
 
-    final List<ApplicationId> finishedApplications =
-        Collections.unmodifiableList(Arrays.asList(appId1, appId3));
+    final List<ApplicationId> finishedApplications = Collections.unmodifiableList(
+            Arrays.asList(appId1, appId3));
 
-    ApplicationClientProtocol rmClient =
-        createMockRMClient(finishedApplications, null);
+    ApplicationClientProtocol rmClient = createMockRMClient(finishedApplications, null);
     AggregatedLogDeletionService.LogDeletionTask deletionTask =
-        new AggregatedLogDeletionService.LogDeletionTask(conf,
-            RETENTION_SECS,
-            rmClient);
+        new AggregatedLogDeletionService.LogDeletionTask(conf, TEN_DAYS_IN_SECONDS, rmClient);
     deletionTask.run();
-    verify(mockFs).delete(app3Dir, true);
+    verify(mockFs).delete(app3.path, true);
   }
 
   static class MockFileSystem extends FilterFileSystem {
     MockFileSystem() {
       super(mock(FileSystem.class));
     }
+
     public void initialize(URI name, Configuration conf) throws IOException {}
   }
 
   private static ApplicationClientProtocol createMockRMClient(
-      List<ApplicationId> finishedApplicaitons,
+      List<ApplicationId> finishedApplications,
       List<ApplicationId> runningApplications) throws Exception {
-    final ApplicationClientProtocol mockProtocol =
-        mock(ApplicationClientProtocol.class);
-    if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) {
-      for (ApplicationId appId : finishedApplicaitons) {
-        GetApplicationReportRequest request =
-            GetApplicationReportRequest.newInstance(appId);
-        GetApplicationReportResponse response =
-            createApplicationReportWithFinishedApplication();
-        when(mockProtocol.getApplicationReport(request))
-          .thenReturn(response);
+    final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class);
+    if (finishedApplications != null && !finishedApplications.isEmpty()) {
+      for (ApplicationId appId : finishedApplications) {
+        GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
+        GetApplicationReportResponse response = createApplicationReportWithFinishedApplication();
+        when(mockProtocol.getApplicationReport(request)).thenReturn(response);
       }
     }
     if (runningApplications != null && !runningApplications.isEmpty()) {
       for (ApplicationId appId : runningApplications) {
-        GetApplicationReportRequest request =
-            GetApplicationReportRequest.newInstance(appId);
-        GetApplicationReportResponse response =
-            createApplicationReportWithRunningApplication();
-        when(mockProtocol.getApplicationReport(request))
-          .thenReturn(response);
+        GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);
+        GetApplicationReportResponse response = createApplicationReportWithRunningApplication();
+        when(mockProtocol.getApplicationReport(request)).thenReturn(response);
       }
     }
     return mockProtocol;
   }
 
-  private static GetApplicationReportResponse
-      createApplicationReportWithRunningApplication() {
+  private static GetApplicationReportResponse createApplicationReportWithRunningApplication() {
     ApplicationReport report = mock(ApplicationReport.class);
     when(report.getYarnApplicationState()).thenReturn(
       YarnApplicationState.RUNNING);
@@ -561,14 +436,65 @@
     return response;
   }
 
-  private static GetApplicationReportResponse
-      createApplicationReportWithFinishedApplication() {
+  private static GetApplicationReportResponse createApplicationReportWithFinishedApplication() {
     ApplicationReport report = mock(ApplicationReport.class);
-    when(report.getYarnApplicationState()).thenReturn(
-      YarnApplicationState.FINISHED);
-    GetApplicationReportResponse response =
-        mock(GetApplicationReportResponse.class);
+    when(report.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED);
+    GetApplicationReportResponse response = mock(GetApplicationReportResponse.class);
     when(response.getApplicationReport()).thenReturn(report);
     return response;
   }
+
+  private static class PathWithFileStatus {
+    private final Path path;
+    private FileStatus fileStatus;
+
+    PathWithFileStatus(Path path, FileStatus fileStatus) {
+      this.path = path;
+      this.fileStatus = fileStatus;
+    }
+
+    public void changeModificationTime(long modTime) {
+      fileStatus = new FileStatus(fileStatus.getLen(), fileStatus.isDirectory(),
+              fileStatus.getReplication(),
+              fileStatus.getBlockSize(), modTime, fileStatus.getPath());
+    }
+  }
+
+  private static class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionService {
+    private final List<ApplicationId> finishedApplications;
+    private final List<ApplicationId> runningApplications;
+    private final Configuration conf;
+
+    AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
+                                               List<ApplicationId> finishedApplications) {
+      this(runningApplications, finishedApplications, null);
+    }
+
+    AggregatedLogDeletionServiceForTest(List<ApplicationId> runningApplications,
+                                               List<ApplicationId> finishedApplications,
+                                               Configuration conf) {
+      this.runningApplications = runningApplications;
+      this.finishedApplications = finishedApplications;
+      this.conf = conf;
+    }
+
+    @Override
+    protected ApplicationClientProtocol createRMClient() throws IOException {
+      try {
+        return createMockRMClient(finishedApplications, runningApplications);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    protected Configuration createConf() {
+      return conf;
+    }
+
+    @Override
+    protected void stopRMClient() {
+      // DO NOTHING
+    }
+  }
 }