STORM-3523 revert handling of FNF.  Prevent AsyncLocalizer deadlock. (#3153)


diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 578be2b..fdd130b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,7 +21,6 @@
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -95,7 +94,8 @@
     private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded = new ConcurrentHashMap<>();
     private final Path localBaseDir;
     private final int blobDownloadRetries;
-    private final ScheduledExecutorService execService;
+    private final ScheduledExecutorService downloadExecService;
+    private final ScheduledExecutorService taskExecService;
     private final long cacheCleanupPeriod;
     private final StormMetricsRegistry metricsRegistry;
     // cleanup
@@ -120,13 +120,14 @@
         cacheCleanupPeriod = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
 
-        // if we needed we could make config for update thread pool size
-        int threadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
         blobDownloadRetries = ObjectReader.getInt(conf.get(
             DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
 
-        execService = Executors.newScheduledThreadPool(threadPoolSize,
-                                                       new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build());
+        int downloadThreadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+        downloadExecService = Executors.newScheduledThreadPool(downloadThreadPoolSize,
+                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Download Executor - %d").build());
+        taskExecService = Executors.newScheduledThreadPool(3,
+                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Task Executor - %d").build());
         reconstructLocalizedResources();
 
         symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
@@ -213,7 +214,7 @@
             blobPending.compute(topologyId, (tid, old) -> {
                 CompletableFuture<Void> ret = old;
                 if (ret == null) {
-                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService);
+                    ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), taskExecService);
                 } else {
                     try {
                         addReferencesToBlobs(pna, cb);
@@ -291,7 +292,7 @@
                     }
                 }
                 LOG.debug("FINISHED download of {}", blob);
-            }, execService);
+            }, downloadExecService);
             i++;
         }
         return CompletableFuture.allOf(all);
@@ -337,14 +338,15 @@
      * Start any background threads needed.  This includes updating blobs and cleaning up unused blobs over the configured size limit.
      */
     public void start() {
-        execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
+        taskExecService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
         LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
-        execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
+        taskExecService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public void close() throws InterruptedException {
-        execService.shutdown();
+        downloadExecService.shutdown();
+        taskExecService.shutdown();
     }
 
     private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
@@ -450,15 +452,7 @@
             topoConfBlob.removeReference(pna);
         }
 
-        List<LocalResource> localResources;
-        try {
-            localResources = getLocalResources(pna);
-        } catch (FileNotFoundException e) {
-            LOG.warn("Local resources for {} no longer available", pna, e);
-            return;
-        }
-
-        for (LocalResource lr : localResources) {
+        for (LocalResource lr : getLocalResources(pna)) {
             try {
                 removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
             } catch (Exception e) {