Merge pull request #3146 from govind-menon/STORM-3521
STORM-3521: Ensures any unrecognized Storm CLI flags/arguments are passed to the main Java class
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index eaa6384..578be2b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,6 +21,7 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@@ -449,7 +450,15 @@
topoConfBlob.removeReference(pna);
}
- for (LocalResource lr : getLocalResources(pna)) {
+ List<LocalResource> localResources;
+ try {
+ localResources = getLocalResources(pna);
+ } catch (FileNotFoundException e) {
+ LOG.warn("Local resources for {} no longer available", pna, e);
+ return;
+ }
+
+ for (LocalResource lr : localResources) {
try {
removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
} catch (Exception e) {
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
index 87bd970..f984def 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocalizedResource.java
@@ -143,6 +143,9 @@
}
static void completelyRemoveUnusedUser(Path localBaseDir, String user) throws IOException {
+ Path localUserDir = getLocalUserDir(localBaseDir, user);
+ LOG.info("completelyRemoveUnusedUser {} for directory {}", user, localUserDir);
+
Path userFileCacheDir = getLocalUserFileCacheDir(localBaseDir, user);
// baseDir/supervisor/usercache/user1/filecache/files
Files.deleteIfExists(getCacheDirForFiles(userFileCacheDir));
@@ -151,7 +154,7 @@
// baseDir/supervisor/usercache/user1/filecache
Files.deleteIfExists(userFileCacheDir);
// baseDir/supervisor/usercache/user1
- Files.deleteIfExists(getLocalUserDir(localBaseDir, user));
+ Files.deleteIfExists(localUserDir);
}
static List<String> getLocalizedArchiveKeys(Path localBaseDir, String user) throws IOException {
@@ -254,9 +257,12 @@
if (!Files.exists(parent)) {
//There is a race here that we can still lose
try {
- Files.createDirectory(parent);
+ Files.createDirectories(parent);
} catch (FileAlreadyExistsException e) {
//Ignored
+ } catch (IOException e) {
+ LOG.error("Failed to create parent directory {}", parent, e);
+ throw e;
}
}
return path;
@@ -397,7 +403,7 @@
}
}
} catch (NoSuchFileException e) {
- LOG.warn("Nothing to cleanup with badeDir {} even though we expected there to be something there", baseDir);
+ LOG.warn("Nothing to cleanup with baseDir {} even though we expected there to be something there", baseDir);
}
}
diff --git a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
index a46f99e..f245d90 100644
--- a/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
+++ b/storm-server/src/test/java/org/apache/storm/scheduler/resource/strategies/scheduling/TestConstraintSolverStrategy.java
@@ -215,6 +215,8 @@
// Add 1 topology with large number of executors and constraints. Too many can cause a java.lang.StackOverflowError
Config config = createCSSClusterConfig(10, 10, 0, null);
config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_STATE_SEARCH, 50000);
+ config.put(Config.TOPOLOGY_RAS_CONSTRAINT_MAX_TIME_SECS, 120);
+ config.put(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY, 120);
List<List<String>> constraints = new LinkedList<>();
addContraints("spout-0", "spout-0", constraints);