| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.localizer; |
| |
| import com.codahale.metrics.Histogram; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.file.Files; |
| import java.nio.file.LinkOption; |
| import java.nio.file.Path; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| import org.apache.storm.blobstore.ClientBlobStore; |
| import org.apache.storm.blobstore.InputStreamWithMeta; |
| import org.apache.storm.generated.AuthorizationException; |
| import org.apache.storm.generated.KeyNotFoundException; |
| import org.apache.storm.metric.StormMetricsRegistry; |
| import org.apache.storm.utils.Time; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Represents a blob that is cached locally on disk by the supervisor. |
| */ |
| public abstract class LocallyCachedBlob { |
| public static final long NOT_DOWNLOADED_VERSION = -1; |
| private static final Logger LOG = LoggerFactory.getLogger(LocallyCachedBlob.class); |
| // A callback that does nothing. |
| private static final BlobChangingCallback NOOP_CB = (assignment, port, resource, go) -> { |
| }; |
| private final ConcurrentHashMap<PortAndAssignment, BlobChangingCallback> references = new ConcurrentHashMap<>(); |
| private final String blobDescription; |
| private final String blobKey; |
| private AtomicLong lastUsed = new AtomicLong(Time.currentTimeMillis()); |
| |
| private final Histogram fetchingRate; |
| |
/**
 * Build the common state shared by every locally cached blob.
 *
 * @param blobDescription a human readable description of the blob. It should contain at least the blob key, and ideally also
 *     whether it is an archive, which user or topology it belongs to, or whether it is a storm.jar etc.
 * @param blobKey the key of the blob, as later returned by {@link #getKey()}.
 * @param metricsRegistry the registry in which the blob fetching rate histogram is registered.
 */
protected LocallyCachedBlob(String blobDescription, String blobKey, StormMetricsRegistry metricsRegistry) {
    this.blobKey = blobKey;
    this.blobDescription = blobDescription;
    this.fetchingRate = metricsRegistry.registerHistogram("supervisor:blob-fetching-rate-MB/s");
}
| |
| /** |
| * Helper function to download blob from blob store. |
| * @param store Blob store to fetch blobs from |
| * @param key Key to retrieve blobs |
| * @param pathSupplier A function that supplies the download destination of a blob. It guarantees the validity |
| * of path or throws {@link IOException} |
| * @param outStreamSupplier A function that supplies the {@link OutputStream} object |
| * @return The metadata of the download session, including blob's version and download destination |
| * @throws KeyNotFoundException Thrown if key to retrieve blob is invalid |
| * @throws AuthorizationException Thrown if the retrieval is not under security authorization |
| * @throws IOException Thrown if any IO error occurs |
| */ |
| protected DownloadMeta fetch(ClientBlobStore store, String key, |
| IOFunction<Long, Path> pathSupplier, |
| IOFunction<File, OutputStream> outStreamSupplier) |
| throws KeyNotFoundException, AuthorizationException, IOException { |
| |
| try (InputStreamWithMeta in = store.getBlob(key)) { |
| long newVersion = in.getVersion(); |
| long currentVersion = getLocalVersion(); |
| if (newVersion == currentVersion) { |
| LOG.warn("The version did not change, but going to download again {} {}", currentVersion, key); |
| } |
| |
| //Make sure the parent directory is there and ready to go |
| Path downloadPath = pathSupplier.apply(newVersion); |
| LOG.debug("Downloading {} to {}", key, downloadPath); |
| |
| long duration; |
| long totalRead = 0; |
| try (OutputStream out = outStreamSupplier.apply(downloadPath.toFile())) { |
| long startTime = Time.nanoTime(); |
| |
| byte[] buffer = new byte[4096]; |
| int read; |
| while ((read = in.read(buffer)) >= 0) { |
| out.write(buffer, 0, read); |
| totalRead += read; |
| } |
| |
| duration = Time.nanoTime() - startTime; |
| } |
| |
| long expectedSize = in.getFileLength(); |
| if (totalRead != expectedSize) { |
| throw new IOException("We expected to download " + expectedSize + " bytes but found we got " + totalRead); |
| } else { |
| double downloadRate = ((double) totalRead * 1e3) / duration; |
| fetchingRate.update(Math.round(downloadRate)); |
| } |
| return new DownloadMeta(downloadPath, newVersion); |
| } |
| } |
| |
| /** |
| * Get the version of the blob cached locally. If the version is unknown or it has not been downloaded NOT_DOWNLOADED_VERSION should be |
| * returned. PRECONDITION: this can only be called with a lock on this instance held. |
| */ |
| public abstract long getLocalVersion(); |
| |
| /** |
| * Get the version of the blob in the blob store. PRECONDITION: this can only be called with a lock on this instance held. |
| */ |
| public abstract long getRemoteVersion(ClientBlobStore store) throws KeyNotFoundException, AuthorizationException; |
| |
| /** |
| * Download the latest version to a temp location. This may also include unzipping some or all of the data to a temp location. |
| * PRECONDITION: this can only be called with a lock on this instance held. |
| * |
| * @param store the store to us to download the data. |
| * @return the version that was downloaded. |
| */ |
| public abstract long fetchUnzipToTemp(ClientBlobStore store) throws IOException, KeyNotFoundException, AuthorizationException; |
| |
| /** |
| * Commit the new version and make it available for the end user. |
| * PRECONDITION: uncompressToTempLocationIfNeeded will have been called. |
| * PRECONDITION: this can only be called with a lock on this instance held. |
| * @param version the version of the blob to commit. |
| */ |
| protected abstract void commitNewVersion(long version) throws IOException; |
| |
| /** |
| * Clean up any temporary files. This will be called after updating a blob, either successfully or if an error has occured. |
| * The goal is to find any files that may be left over and remove them so space is not leaked. |
| * PRECONDITION: this can only be called with a lock on this instance held. |
| */ |
| public abstract void cleanupOrphanedData() throws IOException; |
| |
| /** |
| * Completely remove anything that is cached locally for this blob and all tracking files also stored for it. |
| * This will be called after the blob was determined to no longer be needed in the cache. |
| * PRECONDITION: this can only be called with a lock on this instance held. |
| */ |
| public abstract void completelyRemove() throws IOException; |
| |
| /** |
| * Get the amount of disk space that is used by this blob. If the blob is uncompressed it should be the sum of the space used by all |
| * of the uncompressed files. In general this will not be called with any locks held so it is a good idea to cache it and updated it |
| * when committing a new version. |
| */ |
| public abstract long getSizeOnDisk(); |
| |
| /** |
| * Get the size of p in bytes. |
| * @param p the path to read. |
| * @return the size of p in bytes. |
| */ |
| protected static long getSizeOnDisk(Path p) throws IOException { |
| if (!Files.exists(p)) { |
| return 0; |
| } else if (Files.isRegularFile(p)) { |
| return Files.size(p); |
| } else { |
| //We will not follow sym links |
| return Files.walk(p) |
| .filter((subp) -> Files.isRegularFile(subp, LinkOption.NOFOLLOW_LINKS)) |
| .mapToLong((subp) -> { |
| try { |
| return Files.size(subp); |
| } catch (IOException e) { |
| LOG.warn("Could not get the size of {}", subp); |
| } |
| return 0; |
| }).sum(); |
| } |
| } |
| |
| /** |
| * Updates the last updated time. This should be called when references are added or removed. |
| */ |
| protected void touch() { |
| lastUsed.set(Time.currentTimeMillis()); |
| LOG.debug("Setting {} ts to {}", blobKey, lastUsed.get()); |
| } |
| |
| /** |
| * Get the last time that this used for LRU calculations. |
| */ |
| public long getLastUsed() { |
| return lastUsed.get(); |
| } |
| |
| /** |
| * Return true if this blob is actively being used, else false (meaning it can be deleted, but might not be). |
| */ |
| public boolean isUsed() { |
| return !references.isEmpty(); |
| } |
| |
| /** |
| * Mark that a given port and assignment are using this. |
| * @param pna the slot and assignment that are using this blob. |
| * @param cb an optional callback indicating that they want to know/synchronize when a blob is updated. |
| */ |
| public void addReference(final PortAndAssignment pna, BlobChangingCallback cb) { |
| LOG.debug("Adding reference {}", pna); |
| if (cb == null) { |
| cb = NOOP_CB; |
| } |
| if (references.put(pna, cb) != null) { |
| LOG.warn("{} already has a reservation for {}", pna, blobDescription); |
| } |
| } |
| |
| /** |
| * Removes a reservation for this blob from a given slot and assignemnt. |
| * @param pna the slot + assignment that no longer needs this blob. |
| */ |
| public void removeReference(final PortAndAssignment pna) { |
| LOG.debug("Removing reference {}", pna); |
| if (references.remove(pna) == null) { |
| LOG.warn("{} had no reservation for {}", pna, blobDescription); |
| } |
| touch(); |
| } |
| |
| /** |
| * Inform all of the callbacks that a change is going to happen and then wait for them to all get back that it is OK to make that |
| * change. Commit the new version once all callbacks are ready. Finally inform all callbacks that the commit is complete. |
| */ |
| public synchronized void informReferencesAndCommitNewVersion(long newVersion) throws IOException { |
| CompletableFuture<Void> doneUpdating = informAllOfChangeAndWaitForConsensus(); |
| commitNewVersion(newVersion); |
| doneUpdating.complete(null); |
| } |
| |
| /** |
| * Inform all of the callbacks that a change is going to happen and then wait for |
| * them to all get back that it is OK to make that change. |
| * |
| * @return A future to complete when the change is committed |
| */ |
| private CompletableFuture<Void> informAllOfChangeAndWaitForConsensus() { |
| HashMap<PortAndAssignment, BlobChangingCallback> refsCopy = new HashMap<>(references); |
| CountDownLatch cdl = new CountDownLatch(refsCopy.size()); |
| CompletableFuture<Void> doneUpdating = new CompletableFuture<>(); |
| for (Map.Entry<PortAndAssignment, BlobChangingCallback> entry : refsCopy.entrySet()) { |
| GoodToGo gtg = new GoodToGo(cdl, doneUpdating); |
| try { |
| PortAndAssignment pna = entry.getKey(); |
| BlobChangingCallback cb = entry.getValue(); |
| cb.blobChanging(pna.getAssignment(), pna.getPort(), this, gtg); |
| } finally { |
| gtg.countDownIfLatchWasNotGotten(); |
| } |
| } |
| try { |
| cdl.await(3, TimeUnit.MINUTES); |
| } catch (InterruptedException e) { |
| //Interrupted is thrown when we are shutting down. |
| // So just ignore it for now... |
| } |
| return doneUpdating; |
| } |
| |
| /** |
| * Get the key for this blob. |
| */ |
| public String getKey() { |
| return blobKey; |
| } |
| |
| |
| public Collection<PortAndAssignment> getDependencies() { |
| return references.keySet(); |
| } |
| |
| public abstract boolean isFullyDownloaded(); |
| |
| static class DownloadMeta { |
| private final Path downloadPath; |
| private final long version; |
| |
| DownloadMeta(Path downloadPath, long version) { |
| this.downloadPath = downloadPath; |
| this.version = version; |
| } |
| |
| public Path getDownloadPath() { |
| return downloadPath; |
| } |
| |
| public long getVersion() { |
| return version; |
| } |
| } |
| |
| } |