Merge pull request #3146 from govind-menon/STORM-3521

STORM-3521: Ensures unrecognized Storm CLI flags/arguments are passed to main Java class if any
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index eaa6384..578be2b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,6 +21,7 @@
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -449,7 +450,15 @@
             topoConfBlob.removeReference(pna);
         }
 
-        for (LocalResource lr : getLocalResources(pna)) {
+        List<LocalResource> localResources;
+        try {
+            localResources = getLocalResources(pna);
+        } catch (FileNotFoundException e) {
+            LOG.warn("Local resources for {} no longer available", pna, e);
+            return;
+        }
+
+        for (LocalResource lr : localResources) {
             try {
                 removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
             } catch (Exception e) {
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index 87bd970..f984def 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -143,6 +143,9 @@
     }
 
     static void completelyRemoveUnusedUser(Path localBaseDir, String user) throws IOException {
+        Path localUserDir = getLocalUserDir(localBaseDir, user);
+        LOG.info("completelyRemoveUnusedUser {} for directory {}", user, localUserDir);
+
         Path userFileCacheDir = getLocalUserFileCacheDir(localBaseDir, user);
         // baseDir/supervisor/usercache/user1/filecache/files
         Files.deleteIfExists(getCacheDirForFiles(userFileCacheDir));
@@ -151,7 +154,7 @@
         // baseDir/supervisor/usercache/user1/filecache
         Files.deleteIfExists(userFileCacheDir);
         // baseDir/supervisor/usercache/user1
-        Files.deleteIfExists(getLocalUserDir(localBaseDir, user));
+        Files.deleteIfExists(localUserDir);
     }
 
     static List<String> getLocalizedArchiveKeys(Path localBaseDir, String user) throws IOException {
@@ -254,9 +257,12 @@
                 if (!Files.exists(parent)) {
                     //There is a race here that we can still lose
                     try {
-                        Files.createDirectory(parent);
+                        Files.createDirectories(parent);
                     } catch (FileAlreadyExistsException e) {
                         //Ignored
+                    } catch (IOException e) {
+                        LOG.error("Failed to create parent directory {}", parent, e);
+                        throw e;
                     }
                 }
                 return path;
@@ -397,7 +403,7 @@
                 }
             }
         } catch (NoSuchFileException e) {
-            LOG.warn("Nothing to cleanup with badeDir {} even though we expected there to be something there", baseDir);
+            LOG.warn("Nothing to cleanup with baseDir {} even though we expected there to be something there", baseDir);
         }
     }
 
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index a46f99e..f245d90 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -215,6 +215,8 @@
         // Add 1 topology with large number of executors and constraints. Too many can cause a java.lang.StackOverflowError
         Config config = createCSSClusterConfig(10, 10, 0, null);
         config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 50000);
+        config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS, 120);
+        config.put(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY, 120);
 
         List<List<String>> constraints = new LinkedList<>();
         addContraints("spout-0", "spout-0", constraints);