/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.localizer;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.blobstore.ClientBlobStore;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.storm.thrift.transport.TTransportException;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.NimbusLeaderNotFoundException;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WrappedKeyNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Downloads and caches blobs locally.
 */
public class AsyncLocalizer implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);

    // Shared, already-completed future; stored as the "pending" entry for topologies that are recovered rather than downloaded.
    private static final CompletableFuture<Void> ALL_DONE_FUTURE = CompletableFuture.completedFuture(null);
    // Pause, in milliseconds, between attempts after a blob download fails.
    private static final int ATTEMPTS_INTERVAL_TIME = 100;

    // Metrics; all are registered with the StormMetricsRegistry in the constructor.
    private final Timer singleBlobLocalizationDuration;
    private final Timer blobCacheUpdateDuration;
    private final Timer blobLocalizationDuration;
    private final Meter numBlobUpdateVersionChanged;

    // Tracked user resources: user name -> (blob key -> resource), kept separately for plain files and for archives.
    // ConcurrentHashMap is explicitly used everywhere in this class because it uses locks to guarantee atomicity for compute and
    // computeIfAbsent, whereas ConcurrentMap allows the function passed in to be retried and would require that function to have
    // no side effects.
    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> userFiles = new ConcurrentHashMap<>();
    protected final ConcurrentHashMap<String, ConcurrentHashMap<String, LocalizedResource>> userArchives = new ConcurrentHashMap<>();
    private final boolean isLocalMode;
    // topology id -> future for the localization of that topology's additional (non-base) blobs
    private final ConcurrentHashMap<String, CompletableFuture<Void>> blobPending;
    private final Map<String, Object> conf;
    private final AdvancedFSOps fsOps;
    private final boolean symlinksDisabled;
    // blob key (topology jar/code/conf key) -> locally cached topology blob
    private final ConcurrentHashMap<String, LocallyCachedBlob> topologyBlobs = new ConcurrentHashMap<>();
    // topology id -> future for the download of that topology's jar, code and conf blobs
    private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded = new ConcurrentHashMap<>();
    private final Path localBaseDir;
    // Number of failed download attempts that are tolerated before a blob download gives up.
    private final int blobDownloadRetries;
    // Runs the per-blob download tasks.
    private final ScheduledExecutorService downloadExecService;
    // Runs the periodic update/cleanup tasks and the per-topology DownloadBlobs tasks.
    private final ScheduledExecutorService taskExecService;
    // Period, in milliseconds, between two cache cleanup runs.
    private final long cacheCleanupPeriod;
    private final StormMetricsRegistry metricsRegistry;
    // Target size of the local cache in bytes; used by cleanup().
    @VisibleForTesting
    protected long cacheTargetSize;

    /**
     * Create a localizer rooted at an explicit base directory.
     *
     * <p>Registers the metrics, reads the cache/download settings from the configuration (falling back to the defaults noted
     * below), creates the two executor services and finally re-registers any resources that are already present on disk.
     *
     * @param conf            the daemon configuration
     * @param ops             the file system operations to use
     * @param baseDir         the directory under which localized resources are stored
     * @param metricsRegistry the registry used to register the timers and meters
     * @throws IOException declared for callers; not thrown directly by the statements in this constructor
     */
    @VisibleForTesting
    AsyncLocalizer(Map<String, Object> conf, AdvancedFSOps ops, String baseDir, StormMetricsRegistry metricsRegistry) throws IOException {
        this.conf = conf;
        this.singleBlobLocalizationDuration = metricsRegistry.registerTimer("supervisor:single-blob-localization-duration");
        this.blobCacheUpdateDuration = metricsRegistry.registerTimer("supervisor:blob-cache-update-duration");
        this.blobLocalizationDuration = metricsRegistry.registerTimer("supervisor:blob-localization-duration");
        this.numBlobUpdateVersionChanged = metricsRegistry.registerMeter("supervisor:num-blob-update-version-changed");
        this.metricsRegistry = metricsRegistry;
        isLocalMode = ConfigUtils.isLocalMode(conf);
        fsOps = ops;
        localBaseDir = Paths.get(baseDir);
        // default cache size 10GB, converted to Bytes
        cacheTargetSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_TARGET_SIZE_MB),
                                              10 * 1024).longValue() << 20;
        // default 30 seconds. (we cache the size so it is cheap to do)
        cacheCleanupPeriod = ObjectReader.getInt(conf.get(
            DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();

        blobDownloadRetries = ObjectReader.getInt(conf.get(
            DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);

        int downloadThreadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
        downloadExecService = Executors.newScheduledThreadPool(downloadThreadPoolSize,
                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Download Executor - %d").build());
        taskExecService = Executors.newScheduledThreadPool(3,
                new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Task Executor - %d").build());

        symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
        blobPending = new ConcurrentHashMap<>();

        // Run last: this calls instance methods, so every final field must already be assigned.
        reconstructLocalizedResources();
    }

    /**
     * Create a localizer whose file system operations and base directory are both derived from the configuration.
     *
     * @param conf            the daemon configuration
     * @param metricsRegistry the registry used to register the timers and meters
     * @throws IOException if the delegated constructor fails
     */
    public AsyncLocalizer(Map<String, Object> conf, StormMetricsRegistry metricsRegistry) throws IOException {
        this(conf, AdvancedFSOps.make(conf), ConfigUtils.supervisorLocalDir(conf), metricsRegistry);
    }

    /**
     * Look up the cached blob for a topology's jar, creating and registering it if it is not tracked yet.
     *
     * @param topologyId the id of the topology
     * @param owner      the owner of the topology; only used when the blob has to be created
     * @return the tracked blob for the topology jar
     * @throws RuntimeException wrapping the IOException if the blob cannot be created
     */
    @VisibleForTesting
    LocallyCachedBlob getTopoJar(final String topologyId, String owner) {
        final String jarKey = ConfigUtils.masterStormJarKey(topologyId);
        return topologyBlobs.computeIfAbsent(jarKey, (ignored) -> {
            try {
                return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
                    LocallyCachedTopologyBlob.TopologyBlobType.TOPO_JAR, owner, metricsRegistry);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
    }

    /**
     * Look up the cached blob for a topology's serialized code, creating and registering it if it is not tracked yet.
     *
     * @param topologyId the id of the topology
     * @param owner      the owner of the topology; only used when the blob has to be created
     * @return the tracked blob for the topology code
     * @throws RuntimeException wrapping the IOException if the blob cannot be created
     */
    @VisibleForTesting
    LocallyCachedBlob getTopoCode(final String topologyId, String owner) {
        final String codeKey = ConfigUtils.masterStormCodeKey(topologyId);
        return topologyBlobs.computeIfAbsent(codeKey, (ignored) -> {
            try {
                return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
                    LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CODE, owner, metricsRegistry);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
    }

    /**
     * Look up the cached blob for a topology's configuration, creating and registering it if it is not tracked yet.
     *
     * @param topologyId the id of the topology
     * @param owner      the owner of the topology; only used when the blob has to be created
     * @return the tracked blob for the topology conf
     * @throws RuntimeException wrapping the IOException if the blob cannot be created
     */
    @VisibleForTesting
    LocallyCachedBlob getTopoConf(final String topologyId, String owner) {
        final String confKey = ConfigUtils.masterStormConfKey(topologyId);
        return topologyBlobs.computeIfAbsent(confKey, (ignored) -> {
            try {
                return new LocallyCachedTopologyBlob(topologyId, isLocalMode, conf, fsOps,
                    LocallyCachedTopologyBlob.TopologyBlobType.TOPO_CONF, owner, metricsRegistry);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
    }

    /**
     * Get the tracked archive resource for a user and blob key, registering a new one if none is tracked yet.
     *
     * @param user the owner of the resource; must not be null
     * @param key  the blob key
     * @return the tracked archive resource
     */
    private LocalizedResource getUserArchive(String user, String key) {
        assert user != null : "All user archives require a user present";
        final ConcurrentMap<String, LocalizedResource> archivesOfUser =
            userArchives.computeIfAbsent(user, (ignored) -> new ConcurrentHashMap<>());
        return archivesOfUser.computeIfAbsent(key,
            (ignored) -> new LocalizedResource(key, localBaseDir, true, fsOps, conf, user, metricsRegistry));
    }

    /**
     * Get the tracked (non-archive) file resource for a user and blob key, registering a new one if none is tracked yet.
     *
     * @param user the owner of the resource; must not be null
     * @param key  the blob key
     * @return the tracked file resource
     */
    private LocalizedResource getUserFile(String user, String key) {
        // The message names files here; it previously repeated the archive wording from getUserArchive.
        assert user != null : "All user files require a user present";
        ConcurrentMap<String, LocalizedResource> keyToResource = userFiles.computeIfAbsent(user, (u) -> new ConcurrentHashMap<>());
        return keyToResource.computeIfAbsent(key,
            (k) -> new LocalizedResource(key, localBaseDir, false, fsOps, conf, user, metricsRegistry));
    }

    /**
     * Ask for every blob a topology needs to be made available locally.
     *
     * <p>The topology jar, code and conf are requested first. Once they are available, either a new download of the remaining
     * blobs is started for the topology, or, if one is already registered, this assignment is only added as a reference to
     * those blobs and the registered future is returned.
     *
     * @param assignment the assignment that needs the blobs
     * @param port       the port the assignment is a part of
     * @param cb         a callback for when the blobs change.  This is only for blobs that are tied to the lifetime of the worker.
     * @return a Future that indicates when they are all downloaded.
     *
     * @throws IOException if there was an error while trying doing it.
     */
    public CompletableFuture<Void> requestDownloadTopologyBlobs(final LocalAssignment assignment, final int port,
                                                                final BlobChangingCallback cb) throws IOException {
        final PortAndAssignment pna = new TimePortAndAssignment(new PortAndAssignmentImpl(port, assignment), blobLocalizationDuration);
        final String topologyId = pna.getToplogyId();

        final CompletableFuture<Void> baseBlobs = requestDownloadBaseTopologyBlobs(pna, cb);
        return baseBlobs.thenComposeAsync((ignored) -> blobPending.compute(topologyId, (tid, existing) -> {
            if (existing == null) {
                // Nothing registered for this topology yet: start the download of its remaining blobs.
                final CompletableFuture<Void> started = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), taskExecService);
                LOG.debug("Reserved blobs {} {}", topologyId, started);
                return started;
            }

            // A download is already registered: just record this assignment as a user of the blobs.
            try {
                addReferencesToBlobs(pna, cb);
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                pna.complete();
            }
            LOG.debug("Reserved blobs {} {}", topologyId, existing);
            return existing;
        }));
    }

    /**
     * Register the assignment as a reference on the topology's jar, code and conf blobs and make sure their download has
     * been started.
     *
     * @param pna the port and assignment that needs the blobs
     * @param cb  the callback handed to each blob together with the reference
     * @return the future of the topology's base blob download; an already registered future is reused
     */
    @VisibleForTesting
    CompletableFuture<Void> requestDownloadBaseTopologyBlobs(PortAndAssignment pna, BlobChangingCallback cb) {
        final String topologyId = pna.getToplogyId();
        final String owner = pna.getAssignment().get_owner();

        final LocallyCachedBlob jarBlob = getTopoJar(topologyId, owner);
        jarBlob.addReference(pna, cb);

        final LocallyCachedBlob codeBlob = getTopoCode(topologyId, owner);
        codeBlob.addReference(pna, cb);

        final LocallyCachedBlob confBlob = getTopoConf(topologyId, owner);
        confBlob.addReference(pna, cb);

        // Only the first request for a topology starts the download; later ones share its future.
        return topologyBasicDownloaded.computeIfAbsent(topologyId,
            (ignored) -> downloadOrUpdate(jarBlob, codeBlob, confBlob));
    }

    /**
     * Varargs convenience form of {@link #downloadOrUpdate(Collection)}.
     *
     * @param blobs the blobs to download or update
     * @return a future that completes once all of the blobs have been processed
     */
    private CompletableFuture<Void> downloadOrUpdate(LocallyCachedBlob... blobs) {
        final List<LocallyCachedBlob> blobList = Arrays.asList(blobs);
        return downloadOrUpdate(blobList);
    }

    /**
     * Asynchronously bring each of the given blobs up to date with the blob store.
     *
     * <p>One task per blob is submitted to the download executor. A task fetches the blob when its local version differs
     * from the remote version or when it is not fully downloaded yet. A failed attempt is retried after
     * {@code ATTEMPTS_INTERVAL_TIME} milliseconds; once the number of failures exceeds {@code blobDownloadRetries} the task
     * fails with a RuntimeException.
     *
     * @param blobs the blobs to download or update
     * @return a future that completes once the tasks for all blobs have completed
     */
    private CompletableFuture<Void> downloadOrUpdate(Collection<? extends LocallyCachedBlob> blobs) {
        CompletableFuture<Void>[] all = new CompletableFuture[blobs.size()];
        int i = 0;
        for (final LocallyCachedBlob blob : blobs) {
            all[i] = CompletableFuture.runAsync(() -> {
                LOG.debug("STARTING download of {}", blob);
                // One blob store client per task, closed when the task ends.
                try (ClientBlobStore blobStore = getClientBlobStore()) {
                    boolean done = false;
                    long failures = 0;
                    while (!done) {
                        try {
                            // The version check, fetch and commit of a single blob are done while holding the blob's monitor.
                            synchronized (blob) {
                                long localVersion = blob.getLocalVersion();
                                long remoteVersion = blob.getRemoteVersion(blobStore);
                                if (localVersion != remoteVersion || !blob.isFullyDownloaded()) {
                                    if (blob.isFullyDownloaded()) {
                                        // Only count a version change for a blob that was already downloaded,
                                        // not for a first-time download.
                                        numBlobUpdateVersionChanged.mark();
                                    }
                                    Timer.Context t = singleBlobLocalizationDuration.time();
                                    try {
                                        long newVersion = blob.fetchUnzipToTemp(blobStore);
                                        blob.informReferencesAndCommitNewVersion(newVersion);
                                        // The timer is only stopped when the fetch and commit succeeded.
                                        t.stop();
                                    } finally {
                                        blob.cleanupOrphanedData();
                                    }
                                }
                            }
                            done = true;
                        } catch (Exception e) {
                            failures++;
                            if (failures > blobDownloadRetries) {
                                throw new RuntimeException("Could not download...", e);
                            }
                            LOG.warn("Failed to download blob {} will try again in {} ms", blob, ATTEMPTS_INTERVAL_TIME, e);
                            Utils.sleep(ATTEMPTS_INTERVAL_TIME);
                        }
                    }
                }
                LOG.debug("FINISHED download of {}", blob);
            }, downloadExecService);
            i++;
        }
        return CompletableFuture.allOf(all);
    }

    /**
     * Check every tracked blob for a newer version and download whatever changed.
     *
     * <p>Topology blobs are always checked; user archives and user files are only checked when symlinks are not disabled.
     * The method waits for all of the checks to finish and logs, rather than propagates, any failure. It is scheduled
     * periodically by {@link #start()}.
     */
    @VisibleForTesting
    void updateBlobs() {
        try (Timer.Context t = blobCacheUpdateDuration.time()) {
            final List<CompletableFuture<?>> pending = new ArrayList<>();
            pending.add(downloadOrUpdate(topologyBlobs.values()));
            if (!symlinksDisabled) {
                userArchives.values().forEach((archives) -> pending.add(downloadOrUpdate(archives.values())));
                userFiles.values().forEach((files) -> pending.add(downloadOrUpdate(files.values())));
            } else {
                LOG.warn("symlinks are disabled so blobs cannot be downloaded.");
            }

            for (CompletableFuture<?> update : pending) {
                try {
                    update.get();
                } catch (Exception e) {
                    // Pick the message that matches the root cause; every failure is retried on the next run.
                    final String message;
                    if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
                        message = "Network error while updating blobs, will retry again later";
                    } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
                        message = "Nimbus unavailable to update blobs, will retry again later";
                    } else {
                        message = "Could not update blob, will retry again later";
                    }
                    LOG.error(message, e);
                }
            }
        }
    }

    /**
     * Schedule the background work: a blob update every 30 seconds and a cache cleanup every {@code cacheCleanupPeriod}
     * milliseconds, both on the task executor.
     */
    public void start() {
        final Runnable blobUpdater = this::updateBlobs;
        taskExecService.scheduleWithFixedDelay(blobUpdater, 30, 30, TimeUnit.SECONDS);

        LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
        final Runnable cacheCleaner = this::cleanup;
        taskExecService.scheduleAtFixedRate(cacheCleaner, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Shut down both executor services. This only initiates the shutdown; it does not wait for running tasks to finish.
     *
     * @throws InterruptedException declared for callers; the shutdown calls made here do not block
     */
    @Override
    public void close() throws InterruptedException {
        downloadExecService.shutdown();
        taskExecService.shutdown();
    }

    /**
     * Collect the additional blobs a topology needs, as read from its locally stored conf and code.
     *
     * <p>The result contains the entries of the topology's blobstore map followed by one entry per dependency jar and
     * per dependency artifact declared by the topology.
     *
     * @param pna identifies the topology
     * @return the resources to localize; possibly empty, never null
     * @throws IOException if the topology conf or code cannot be read
     */
    private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
        final String topologyId = pna.getToplogyId();
        final List<LocalResource> resources = new ArrayList<>();

        final Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId);
        @SuppressWarnings("unchecked")
        final Map<String, Map<String, Object>> blobstoreMap =
            (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
        if (blobstoreMap != null) {
            final List<LocalResource> fromBlobstoreMap = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
            if (fromBlobstoreMap != null) {
                resources.addAll(fromBlobstoreMap);
            }
        }

        final StormTopology topology = ConfigUtils.readSupervisorTopology(conf, topologyId, fsOps);
        if (topology.is_set_dependency_jars()) {
            for (String jar : topology.get_dependency_jars()) {
                resources.add(new LocalResource(jar, false, true));
            }
        }
        if (topology.is_set_dependency_artifacts()) {
            for (String artifact : topology.get_dependency_artifacts()) {
                resources.add(new LocalResource(artifact, false, true));
            }
        }
        return resources;
    }

    /**
     * Register the assignment as a reference on every additional blob its topology needs, downloading the blobs if required.
     * Does nothing when the topology needs no additional blobs.
     *
     * @param pna the port and assignment to register
     * @param cb  the callback for blobs that request one
     * @throws IOException            if the topology's resources cannot be read or a download fails
     * @throws KeyNotFoundException   if a blob does not exist
     * @throws AuthorizationException if access to a blob is denied
     */
    @VisibleForTesting
    void addReferencesToBlobs(PortAndAssignment pna, BlobChangingCallback cb)
        throws IOException, KeyNotFoundException, AuthorizationException {
        final List<LocalResource> needed = getLocalResources(pna);
        if (needed.isEmpty()) {
            return;
        }
        getBlobs(needed, pna, cb);
    }

    /**
     * Do everything needed to recover the state in the AsyncLocalizer for a running topology.
     *
     * <p>The assignment is added as a reference to the topology's jar, code and conf blobs, the topology is marked as
     * having no pending download (unless one is already registered), and the assignment is added as a reference to the
     * topology's additional blobs. A missing blob or a denied access during that last step is logged and otherwise ignored.
     *
     * @param currentAssignment the assignment for the topology.
     * @param port              the port the assignment is on.
     * @param cb                a callback for when the blobs are updated.  This will only be for blobs that indicate that if they change
     *                          the worker should be restarted.
     * @throws IOException on any error trying to recover the state.
     */
    public void recoverRunningTopology(final LocalAssignment currentAssignment, final int port,
                                       final BlobChangingCallback cb) throws IOException {
        final PortAndAssignment pna = new PortAndAssignmentImpl(port, currentAssignment);
        final String topologyId = pna.getToplogyId();

        LocallyCachedBlob topoJar = getTopoJar(topologyId, pna.getAssignment().get_owner());
        topoJar.addReference(pna, cb);

        LocallyCachedBlob topoCode = getTopoCode(topologyId, pna.getAssignment().get_owner());
        topoCode.addReference(pna, cb);

        LocallyCachedBlob topoConf = getTopoConf(topologyId, pna.getAssignment().get_owner());
        topoConf.addReference(pna, cb);

        CompletableFuture<Void> localResource = blobPending.computeIfAbsent(topologyId, (tid) -> ALL_DONE_FUTURE);

        try {
            addReferencesToBlobs(pna, cb);
        } catch (KeyNotFoundException | AuthorizationException e) {
            // Best effort: keep recovering, but log the cause so the failure can be diagnosed.
            LOG.error("Could not recover all blob references for {}", pna, e);
        }

        LOG.debug("Recovered blobs {} {}", topologyId, localResource);
    }

    /**
     * Drop the references this assignment/port holds, so that it no longer blocks the resources from being cleaned up.
     *
     * <p>References are removed from the topology's jar, code and conf blobs (where tracked) and then from every additional
     * blob the topology needs.
     *
     * @param assignment the assignment the resources are for
     * @param port       the port the topology is running on
     * @throws IOException on any error
     */
    public void releaseSlotFor(LocalAssignment assignment, int port) throws IOException {
        final PortAndAssignment pna = new PortAndAssignmentImpl(port, assignment);
        final String topologyId = assignment.get_topology_id();
        LOG.debug("Releasing slot for {} {}", topologyId, port);

        final List<String> baseBlobKeys = Arrays.asList(
            ConfigUtils.masterStormJarKey(topologyId),
            ConfigUtils.masterStormCodeKey(topologyId),
            ConfigUtils.masterStormConfKey(topologyId));
        for (String blobKey : baseBlobKeys) {
            final LocallyCachedBlob baseBlob = topologyBlobs.get(blobKey);
            if (baseBlob != null) {
                baseBlob.removeReference(pna);
            }
        }

        for (LocalResource resource : getLocalResources(pna)) {
            try {
                removeBlobReference(resource.getBlobName(), pna, resource.shouldUncompress());
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    /**
     * Resolve the local directory of a user below the base directory, e.g. {@code baseDir/supervisor/usercache/user1/}.
     *
     * @param userName the name of the user
     * @return the user's directory as a File
     */
    @VisibleForTesting
    File getLocalUserDir(String userName) {
        final Path userDir = LocalizedResource.getLocalUserDir(localBaseDir, userName);
        return userDir.toFile();
    }

    /**
     * Resolve the file cache directory of a user below the base directory, e.g.
     * {@code baseDir/supervisor/usercache/user1/filecache}.
     *
     * @param userName the name of the user
     * @return the user's file cache directory as a File
     */
    @VisibleForTesting
    File getLocalUserFileCacheDir(String userName) {
        final Path fileCacheDir = LocalizedResource.getLocalUserFileCacheDir(localBaseDir, userName);
        return fileCacheDir.toFile();
    }

    /**
     * Start tracking every archive that is already localized on disk for the given user.
     *
     * @param user the user whose archives are recovered
     * @throws IOException if the localized archive keys cannot be listed
     */
    private void recoverLocalizedArchivesForUser(String user) throws IOException {
        for (String archiveKey : LocalizedResource.getLocalizedArchiveKeys(localBaseDir, user)) {
            // The lookup registers the resource as a side effect; the returned resource itself is not needed here.
            getUserArchive(user, archiveKey);
        }
    }

    /**
     * Start tracking every file that is already localized on disk for the given user.
     *
     * @param user the user whose files are recovered
     * @throws IOException if the localized file keys cannot be listed
     */
    private void recoverLocalizedFilesForUser(String user) throws IOException {
        for (String fileKey : LocalizedResource.getLocalizedFileKeys(localBaseDir, user)) {
            // The lookup registers the resource as a side effect; the returned resource itself is not needed here.
            getUserFile(user, fileKey);
        }
    }

    /**
     * Look for resources that were localized earlier and are still on disk, and start tracking them again.
     * Any failure is logged and otherwise ignored.
     */
    private void reconstructLocalizedResources() {
        try {
            LOG.info("Reconstruct localized resources");
            final Collection<String> knownUsers = LocalizedResource.getLocalizedUsers(localBaseDir);
            if (knownUsers == null || knownUsers.isEmpty()) {
                LOG.debug("No left over resources found for any user");
                return;
            }
            for (String user : knownUsers) {
                LOG.debug("reconstructing resources owned by {}", user);
                recoverLocalizedFilesForUser(user);
                recoverLocalizedArchivesForUser(user);
            }
        } catch (Exception e) {
            LOG.error("ERROR reconstructing localized resources", e);
        }
    }

    /**
     * Remove the reference an assignment holds on a user blob. An unknown user or an unknown key is not an error; it is
     * only logged as a warning.
     *
     * @param key        the blob key
     * @param pna        the port and assignment whose reference is removed; also supplies the owner and topology id
     * @param uncompress true to look the blob up among the user's archives, false to look it up among the user's files
     * @throws AuthorizationException declared for callers; not thrown by the statements in this method
     * @throws KeyNotFoundException   declared for callers; not thrown by the statements in this method
     */
    void removeBlobReference(String key, PortAndAssignment pna,
                             boolean uncompress) throws AuthorizationException, KeyNotFoundException {
        String user = pna.getOwner();
        String topo = pna.getToplogyId();
        ConcurrentMap<String, LocalizedResource> lrsrcSet = uncompress ? userArchives.get(user) : userFiles.get(user);
        if (lrsrcSet == null) {
            LOG.warn("trying to remove blob for non-existent resource set for user: {} key: {} topo: {}", user, key, topo);
            return;
        }

        LocalizedResource lrsrc = lrsrcSet.get(key);
        if (lrsrc == null) {
            LOG.warn("trying to remove non-existent blob, key: {} for user: {} topo: {}", key, user, topo);
            return;
        }

        LOG.debug("removing blob reference to: {} for topo: {}", key, topo);
        lrsrc.removeReference(pna);
    }

    /**
     * Obtain a blob store client for the supervisor from the configuration. Callers in this class close the returned
     * client when they are done with it.
     *
     * @return a client for the blob store
     */
    protected ClientBlobStore getClientBlobStore() {
        final ClientBlobStore supervisorStore = ServerUtils.getClientBlobStoreForSupervisor(conf);
        return supervisorStore;
    }

    /**
     * This function either returns the blobs in the existing cache or if they don't exist in the cache, it downloads them in parallel (up
     * to SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT) and will block until all of them have been downloaded.
     *
     * <p>The assignment is added as a reference to every requested resource before its download is started.
     *
     * @param localResources the resources to localize
     * @param pna            the port and assignment that needs the resources; also supplies the owner
     * @param cb             the callback, registered only for resources that ask for one
     * @return the localized resources, in the order of {@code localResources}
     * @throws AuthorizationException if access to a blob is denied
     * @throws KeyNotFoundException   if a blob does not exist, or if symlinks are disabled
     * @throws IOException            on any other download failure, if a download is rejected by the executor, or if the
     *                                calling thread is interrupted while waiting (its interrupt status is preserved)
     */
    List<LocalizedResource> getBlobs(List<LocalResource> localResources, PortAndAssignment pna, BlobChangingCallback cb)
        throws AuthorizationException, KeyNotFoundException, IOException {
        if ((boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false)) {
            throw new WrappedKeyNotFoundException("symlinks are disabled so blobs cannot be downloaded.");
        }
        String user = pna.getOwner();
        List<LocalizedResource> results = new ArrayList<>();
        List<CompletableFuture<?>> futures = new ArrayList<>();

        try {
            for (LocalResource localResource : localResources) {
                String key = localResource.getBlobName();
                boolean uncompress = localResource.shouldUncompress();
                LocalizedResource lrsrc = uncompress ? getUserArchive(user, key) : getUserFile(user, key);

                // go off to blobstore and get it
                // assume dir passed in exists and has correct permission
                LOG.debug("fetching blob: {}", key);
                lrsrc.addReference(pna, localResource.needsCallback() ? cb : null);
                futures.add(downloadOrUpdate(lrsrc));
                results.add(lrsrc);
            }

            // All downloads have been started; now wait for each of them.
            for (CompletableFuture<?> futureRsrc : futures) {
                futureRsrc.get();
            }
        } catch (ExecutionException e) {
            // Surface the two blob store exceptions callers handle specifically; anything else becomes an IOException.
            Utils.unwrapAndThrow(AuthorizationException.class, e);
            Utils.unwrapAndThrow(KeyNotFoundException.class, e);
            throw new IOException("Error getting blobs", e);
        } catch (RejectedExecutionException re) {
            throw new IOException("RejectedExecutionException: ", re);
        } catch (InterruptedException ie) {
            // Restore the interrupt status so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted Exception", ie);
        }
        return results;
    }

    /**
     * Invoke the consumer for every directory directly below the supervisor's storm dist root, passing the directory
     * and its name as the topology id. Does nothing if the root does not exist or is not a directory.
     *
     * @param consumer called once per topology directory
     * @throws IOException if the root cannot be listed or the consumer fails
     */
    private void forEachTopologyDistDir(ConsumePathAndId consumer) throws IOException {
        final Path distRoot = Paths.get(ConfigUtils.supervisorStormDistRoot(conf));
        if (!Files.exists(distRoot) || !Files.isDirectory(distRoot)) {
            return;
        }
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(distRoot)) {
            for (Path entry : entries) {
                if (!Files.isDirectory(entry)) {
                    continue;
                }
                consumer.accept(entry, entry.getFileName().toString());
            }
        }
    }

    /**
     * Clean up the local cache. Scheduled periodically by {@code start()}.
     *
     * <p>In order: all tracked user archives, user files and topology blobs are handed to a retention set created with
     * {@code cacheTargetSize}, which then performs its cleanup; bookkeeping futures and on-disk dist directories of
     * topologies that no longer have a tracked topology blob are removed; and finally users that have neither files nor
     * archives left are removed completely.
     *
     * <p>Exceptions are logged and swallowed. An {@link Error} is logged and then terminates the process.
     */
    @VisibleForTesting
    void cleanup() {
        try {
            LocalizedResourceRetentionSet toClean = new LocalizedResourceRetentionSet(cacheTargetSize);
            // need one large set of all and then clean via LRU
            for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userArchives.entrySet()) {
                toClean.addResources(t.getValue());
                LOG.debug("Resources to be cleaned after adding {} archives : {}", t.getKey(), toClean);
            }

            for (Map.Entry<String, ConcurrentHashMap<String, LocalizedResource>> t : userFiles.entrySet()) {
                toClean.addResources(t.getValue());
                LOG.debug("Resources to be cleaned after adding {} files : {}", t.getKey(), toClean);
            }

            toClean.addResources(topologyBlobs);
            // NOTE(review): presumably this also removes the cleaned entries from the maps passed in above - confirm in
            // LocalizedResourceRetentionSet. The steps below rely on topologyBlobs reflecting what survived.
            try (ClientBlobStore store = getClientBlobStore()) {
                toClean.cleanup(store);
            }

            // Topologies that still have at least one tracked topology blob must be kept.
            HashSet<String> safeTopologyIds = new HashSet<>();
            for (String blobKey : topologyBlobs.keySet()) {
                safeTopologyIds.add(ConfigUtils.getIdFromBlobKey(blobKey));
            }

            //Deleting this early does not hurt anything
            topologyBasicDownloaded.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));
            blobPending.keySet().removeIf(topoId -> !safeTopologyIds.contains(topoId));

            // Delete the dist directory of every topology that is no longer considered safe.
            try {
                forEachTopologyDistDir((p, topologyId) -> {
                    if (!safeTopologyIds.contains(topologyId)) {
                        fsOps.deleteIfExists(p.toFile());
                    }
                });
            } catch (Exception e) {
                LOG.error("Could not read topology directories for cleanup", e);
            }

            LOG.debug("Resource cleanup: {}", toClean);
            // Work on a copy of the user names because users are removed from the maps inside the loop.
            Set<String> allUsers = new HashSet<>(userArchives.keySet());
            allUsers.addAll(userFiles.keySet());
            for (String user : allUsers) {
                ConcurrentMap<String, LocalizedResource> filesForUser = userFiles.get(user);
                ConcurrentMap<String, LocalizedResource> archivesForUser = userArchives.get(user);
                if ((filesForUser == null || filesForUser.size() == 0)
                        && (archivesForUser == null || archivesForUser.size() == 0)) {

                    LOG.debug("removing empty set: {}", user);
                    try {
                        // Only forget the user once the on-disk removal succeeded.
                        LocalizedResource.completelyRemoveUnusedUser(localBaseDir, user);
                        userFiles.remove(user);
                        userArchives.remove(user);
                    } catch (IOException e) {
                        LOG.error("Error trying to delete cached user files", e);
                    }
                }
            }
        } catch (Exception ex) {
            LOG.error("AsyncLocalizer cleanup failure", ex);
        } catch (Error error) {
            LOG.error("AsyncLocalizer cleanup failure", error);
            // An Error is treated as fatal: exit with code 20.
            Utils.exitProcess(20, "AsyncLocalizer cleanup failure");
        }
    }

    /**
     * Callback that receives a topology's dist directory together with the topology id. Unlike the standard
     * {@code BiConsumer}, it is allowed to throw an IOException.
     */
    @FunctionalInterface
    private interface ConsumePathAndId {
        /**
         * Handle one topology directory.
         *
         * @param path       the topology's directory
         * @param topologyId the id of the topology, taken from the directory name
         * @throws IOException if handling the directory fails
         */
        void accept(Path path, String topologyId) throws IOException;
    }

    /**
     * Task that localizes the additional blobs of one topology for one assignment and links them into the topology's
     * dist directory.
     */
    private class DownloadBlobs implements Supplier<Void> {
        private final PortAndAssignment pna;
        private final BlobChangingCallback cb;

        DownloadBlobs(PortAndAssignment pna, BlobChangingCallback cb) {
            this.pna = pna;
            this.cb = cb;
        }

        /**
         * Download the blobs, set up the permissions of the owner's file cache directory and, unless symlinks are
         * disabled, create one symlink per blob in the topology's dist directory. The link is named after the blob's
         * "localname" entry in the blobstore map when there is one, and after the blob key otherwise.
         *
         * @return always null
         * @throws RuntimeException wrapping any exception raised along the way
         */
        @Override
        public Void get() {
            try {
                final String topologyId = pna.getToplogyId();
                final String owner = pna.getOwner();
                final String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, topologyId);
                final Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(conf, topologyId);

                @SuppressWarnings("unchecked")
                final Map<String, Map<String, Object>> blobstoreMap =
                    (Map<String, Map<String, Object>>) topoConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);

                final List<LocalResource> wanted = getLocalResources(pna);
                if (!wanted.isEmpty()) {
                    final File ownerCacheDir = getLocalUserFileCacheDir(owner);
                    if (!fsOps.fileExists(ownerCacheDir)) {
                        fsOps.forceMkdir(ownerCacheDir);
                    }
                    final List<LocalizedResource> localized = getBlobs(wanted, pna, cb);
                    fsOps.setupBlobPermissions(ownerCacheDir, owner);
                    if (!symlinksDisabled) {
                        for (LocalizedResource resource : localized) {
                            final String blobKey = resource.getKey();
                            //The sym link we are pointing to
                            final File linkTarget = resource.getCurrentSymlinkPath().toFile();

                            // Without a blobstore map everything comes from dependencies and is linked under its key.
                            final Map<String, Object> blobInfo = blobstoreMap == null ? null : blobstoreMap.get(blobKey);
                            String linkName = blobKey;
                            if (blobInfo != null && blobInfo.containsKey("localname")) {
                                linkName = (String) blobInfo.get("localname");
                            }
                            fsOps.createSymlink(new File(stormRoot, linkName), linkTarget);
                        }
                    }
                }

                pna.complete();
                return null;
            } catch (Exception e) {
                LOG.warn("Caught Exception While Downloading (rethrowing)... ", e);
                throw new RuntimeException(e);
            }
        }
    }
}
