STORM-3523: Revert handling of FileNotFoundException (FNF). Prevent AsyncLocalizer deadlock. (#3153)
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 578be2b..fdd130b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,7 +21,6 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@@ -95,7 +94,8 @@
private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded = new ConcurrentHashMap<>();
private final Path localBaseDir;
private final int blobDownloadRetries;
- private final ScheduledExecutorService execService;
+ private final ScheduledExecutorService downloadExecService;
+ private final ScheduledExecutorService taskExecService;
private final long cacheCleanupPeriod;
private final StormMetricsRegistry metricsRegistry;
// cleanup
@@ -120,13 +120,14 @@
cacheCleanupPeriod = ObjectReader.getInt(conf.get(
DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
- // if we needed we could make config for update thread pool size
- int threadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
blobDownloadRetries = ObjectReader.getInt(conf.get(
DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
- execService = Executors.newScheduledThreadPool(threadPoolSize,
- new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build());
+ int downloadThreadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+ downloadExecService = Executors.newScheduledThreadPool(downloadThreadPoolSize,
+ new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Download Executor - %d").build());
+ taskExecService = Executors.newScheduledThreadPool(3,
+ new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Task Executor - %d").build());
reconstructLocalizedResources();
symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
@@ -213,7 +214,7 @@
blobPending.compute(topologyId, (tid, old) -> {
CompletableFuture<Void> ret = old;
if (ret == null) {
- ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService);
+ ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), taskExecService);
} else {
try {
addReferencesToBlobs(pna, cb);
@@ -291,7 +292,7 @@
}
}
LOG.debug("FINISHED download of {}", blob);
- }, execService);
+ }, downloadExecService);
i++;
}
return CompletableFuture.allOf(all);
@@ -337,14 +338,15 @@
* Start any background threads needed. This includes updating blobs and cleaning up unused blobs over the configured size limit.
*/
public void start() {
- execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
+ taskExecService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
- execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
+ taskExecService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
}
@Override
public void close() throws InterruptedException {
- execService.shutdown();
+ downloadExecService.shutdown();
+ taskExecService.shutdown();
}
private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
@@ -450,15 +452,7 @@
topoConfBlob.removeReference(pna);
}
- List<LocalResource> localResources;
- try {
- localResources = getLocalResources(pna);
- } catch (FileNotFoundException e) {
- LOG.warn("Local resources for {} no longer available", pna, e);
- return;
- }
-
- for (LocalResource lr : localResources) {
+ for (LocalResource lr : getLocalResources(pna)) {
try {
removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
} catch (Exception e) {