[STORM-3701] clean-up topo directory with the check against latest topo blob cache (#3336)
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index f59943d..054dc62 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -467,14 +467,14 @@
// Will need further investigation if the race condition happens again
List<LocalResource> localResources;
try {
- // Precondition1: Base blob stormconf.ser and stormcode.ser have been localized
- // Precondition2: Both these two blob files are fully downloaded and proper permission been set
+ // Precondition1: Base blob stormconf.ser and stormcode.ser are available
+ // Precondition2: Both files have proper permission
localResources = getLocalResources(pna);
} catch (IOException e) {
LOG.info("Port and assignment info: {}", pna);
if (e instanceof FileNotFoundException) {
localResourceFileNotFoundWhenReleasingSlot.mark();
- LOG.warn("Local base blobs have not been downloaded yet. ", e);
+ LOG.warn("Local base blobs are not available. ", e);
return;
} else {
LOG.error("Unable to read local file. ", e);
@@ -613,6 +613,7 @@
@VisibleForTesting
void cleanup() {
try {
+ LOG.info("Starting cleanup");
LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
// need one large set of all and then clean via LRU
for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
@@ -641,7 +642,12 @@
try {
forEachTopologyDistDir((p, topologyId) -> {
- if (!safeTopologyIds.contains(topologyId)) {
+ String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
+ String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+ String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
+ if (!topologyBlobs.containsKey(topoJarKey)
+ && !topologyBlobs.containsKey(topoCodeKey)
+ && !topologyBlobs.containsKey(topoConfKey)) {
fsOps.deleteIfExists(p.toFile());
}
});
@@ -673,6 +679,8 @@
} catch (Error error) {
LOG.error("AsyncLocalizer cleanup failure", error);
Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
+ } finally {
+ LOG.info("Finish cleanup");
}
}