[STORM-3701] clean-up topo directory with the check against latest topo blob cache (#3336)

diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index f59943d..054dc62 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -467,14 +467,14 @@
         // Will need further investigation if the race condition happens again
         List<LocalResource> localResources;
         try {
-            // Precondition1: Base blob stormconf.ser and stormcode.ser have been localized
-            // Precondition2: Both these two blob files are fully downloaded and proper permission been set
+            // Precondition1: Base blob stormconf.ser and stormcode.ser are available
+            // Precondition2: Both files have proper permission
             localResources = getLocalResources(pna);
         } catch (IOException e) {
             LOG.info("Port and assignment info: {}", pna);
             if (e instanceof FileNotFoundException) {
                 localResourceFileNotFoundWhenReleasingSlot.mark();
-                LOG.warn("Local base blobs have not been downloaded yet. ", e);
+                LOG.warn("Local base blobs are not available. ", e);
                 return;
             } else {
                 LOG.error("Unable to read local file. ", e);
@@ -613,6 +613,7 @@
     @VisibleForTesting
     void cleanup() {
         try {
+            LOG.info("Starting cleanup");
             LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
             // need one large set of all and then clean via LRU
             for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
@@ -641,7 +642,12 @@
 
             try {
                 forEachTopologyDistDir((p, topologyId) -> {
-                    if (!safeTopologyIds.contains(topologyId)) {
+                    String topoJarKey = ConfigUtils.masterStormJarKey(topologyId);
+                    String topoCodeKey = ConfigUtils.masterStormCodeKey(topologyId);
+                    String topoConfKey = ConfigUtils.masterStormConfKey(topologyId);
+                    if (!topologyBlobs.containsKey(topoJarKey)
+                        && !topologyBlobs.containsKey(topoCodeKey)
+                        && !topologyBlobs.containsKey(topoConfKey)) {
                         fsOps.deleteIfExists(p.toFile());
                     }
                 });
@@ -673,6 +679,8 @@
         } catch (Error error) {
             LOG.error("AsyncLocalizer cleanup failure", error);
             Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
+        } finally {
+            LOG.info("Finish cleanup");
         }
     }