| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.runtime.blob; |
| |
| import org.apache.flink.annotation.VisibleForTesting; |
| import org.apache.flink.api.common.JobID; |
| import org.apache.flink.api.java.tuple.Tuple2; |
| import org.apache.flink.configuration.BlobServerOptions; |
| import org.apache.flink.configuration.Configuration; |
| import org.apache.flink.configuration.JobManagerOptions; |
| import org.apache.flink.configuration.SecurityOptions; |
| import org.apache.flink.runtime.net.SSLUtils; |
| import org.apache.flink.util.ExceptionUtils; |
| import org.apache.flink.util.FileUtils; |
| import org.apache.flink.util.NetUtils; |
| import org.apache.flink.util.ShutdownHookUtil; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.Nullable; |
| import javax.net.ServerSocketFactory; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.ServerSocket; |
| import java.security.MessageDigest; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import static org.apache.flink.runtime.blob.BlobKey.BlobType.PERMANENT_BLOB; |
| import static org.apache.flink.runtime.blob.BlobKey.BlobType.TRANSIENT_BLOB; |
| import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE; |
| import static org.apache.flink.util.Preconditions.checkArgument; |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * This class implements the BLOB server. The BLOB server is responsible for listening for incoming |
| * requests and spawning threads to handle these requests. Furthermore, it takes care of creating |
| * the directory structure to store the BLOBs or temporarily cache them. |
| */ |
| public class BlobServer extends Thread |
| implements BlobService, BlobWriter, PermanentBlobService, TransientBlobService { |
| |
    /** The log object used for debugging. */
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);

    /** Counter to generate unique names for temporary files (see {@link #createTemporaryFilename()}). */
    private final AtomicLong tempFileCounter = new AtomicLong(0);

    /** The server socket listening for incoming connections. */
    private final ServerSocket serverSocket;

    /** Blob Server configuration. */
    private final Configuration blobServiceConfiguration;

    /** Indicates whether a shutdown of server component has been requested. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Root directory for local file storage. */
    private final File storageDir;

    /** Blob store for distributed file storage, e.g. in HA. */
    private final BlobStore blobStore;

    /** Set of currently running threads. Guarded by {@code synchronized (activeConnections)}. */
    private final Set<BlobServerConnection> activeConnections = new HashSet<>();

    /** The maximum number of concurrent connections. */
    private final int maxConnections;

    /** Lock guarding concurrent file accesses (reads take the read lock, moves/deletes the write lock). */
    private final ReadWriteLock readWriteLock;

    /** Shutdown hook thread to ensure deletion of the local storage directory. */
    private final Thread shutdownHook;

    // --------------------------------------------------------------------------------------------

    /**
     * Map to store the TTL of each element stored in the local storage, i.e. via one of the {@link
     * #getFile} methods. Entries must only be added/removed while holding the read or write lock.
     */
    private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes =
            new ConcurrentHashMap<>();

    /** Time interval (ms) to run the cleanup task; also used as the default TTL. */
    private final long cleanupInterval;

    /** Timer task to execute the cleanup at regular intervals. */
    private final Timer cleanupTimer;
| |
    /**
     * Instantiates a new BLOB server and binds it to a free network port.
     *
     * <p>Also sets up the local storage directory, the transient-blob cleanup timer, a JVM
     * shutdown hook, and the (optionally SSL-enabled) server socket. The listener thread itself
     * is not started here; callers start it separately.
     *
     * @param config Configuration to be used to instantiate the BlobServer
     * @param blobStore BlobStore to store blobs persistently
     * @throws IOException thrown if the BLOB server cannot bind to a free network port or if the
     *     (local or distributed) file storage cannot be created or is not usable
     */
    public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
        this.blobServiceConfiguration = checkNotNull(config);
        this.blobStore = checkNotNull(blobStore);
        this.readWriteLock = new ReentrantReadWriteLock();

        // configure and create the storage directory
        this.storageDir = BlobUtils.initLocalStorageDirectory(config);
        LOG.info("Created BLOB server storage directory {}", storageDir);

        // configure the maximum number of concurrent connections; invalid values fall back to the
        // option's default instead of failing
        final int maxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
        if (maxConnections >= 1) {
            this.maxConnections = maxConnections;
        } else {
            LOG.warn(
                    "Invalid value for maximum connections in BLOB server: {}. Using default value of {}",
                    maxConnections,
                    BlobServerOptions.FETCH_CONCURRENT.defaultValue());
            this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
        }

        // configure the backlog of connections; invalid values fall back to the default as well
        int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
        if (backlog < 1) {
            LOG.warn(
                    "Invalid value for BLOB connection backlog: {}. Using default value of {}",
                    backlog,
                    BlobServerOptions.FETCH_BACKLOG.defaultValue());
            backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
        }

        // Initializing the clean up task (daemon timer so it does not block JVM shutdown)
        this.cleanupTimer = new Timer(true);

        // cleanup interval is configured in seconds but used in milliseconds
        this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
        this.cleanupTimer.schedule(
                new TransientBlobCleanupTask(
                        blobExpiryTimes, readWriteLock.writeLock(), storageDir, LOG),
                cleanupInterval,
                cleanupInterval);

        // ensure the storage directory is removed even on abrupt JVM termination
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);

        // ----------------------- start the server -------------------

        final String serverPortRange = config.getString(BlobServerOptions.PORT);
        final Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);

        // use an SSL socket factory only if internal SSL is enabled AND blob SSL is enabled
        final ServerSocketFactory socketFactory;
        if (SecurityOptions.isInternalSSLEnabled(config)
                && config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
            try {
                socketFactory = SSLUtils.createSSLServerSocketFactory(config);
            } catch (Exception e) {
                throw new IOException("Failed to initialize SSL for the blob server", e);
            }
        } else {
            socketFactory = ServerSocketFactory.getDefault();
        }

        final int finalBacklog = backlog;
        // bind to the configured host, or to the wildcard address if none is configured
        final String bindHost =
                config.getOptional(JobManagerOptions.BIND_HOST)
                        .orElseGet(NetUtils::getWildcardIPAddress);

        // try each port from the configured range until one binds successfully
        this.serverSocket =
                NetUtils.createSocketFromPorts(
                        ports,
                        (port) ->
                                socketFactory.createServerSocket(
                                        port, finalBacklog, InetAddress.getByName(bindHost)));

        if (serverSocket == null) {
            throw new IOException(
                    "Unable to open BLOB Server in specified port range: " + serverPortRange);
        }

        // start the server thread
        setName("BLOB Server listener at " + getPort());
        setDaemon(true);

        if (LOG.isInfoEnabled()) {
            LOG.info(
                    "Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}",
                    serverSocket.getInetAddress().getHostAddress(),
                    getPort(),
                    maxConnections,
                    backlog);
        }
    }
| |
| // -------------------------------------------------------------------------------------------- |
| // Path Accessors |
| // -------------------------------------------------------------------------------------------- |
| |
| public File getStorageDir() { |
| return storageDir; |
| } |
| |
| /** |
| * Returns a file handle to the file associated with the given blob key on the blob server. |
| * |
| * <p><strong>This is only called from {@link BlobServerConnection} or unit tests.</strong> |
| * |
| * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) |
| * @param key identifying the file |
| * @return file handle to the file |
| * @throws IOException if creating the directory fails |
| */ |
| @VisibleForTesting |
| public File getStorageLocation(@Nullable JobID jobId, BlobKey key) throws IOException { |
| return BlobUtils.getStorageLocation(storageDir, jobId, key); |
| } |
| |
| /** |
| * Returns a temporary file inside the BLOB server's incoming directory. |
| * |
| * @return a temporary file inside the BLOB server's incoming directory |
| * @throws IOException if creating the directory fails |
| */ |
| File createTemporaryFilename() throws IOException { |
| return new File( |
| BlobUtils.getIncomingDirectory(storageDir), |
| String.format("temp-%08d", tempFileCounter.getAndIncrement())); |
| } |
| |
| /** Returns the lock used to guard file accesses. */ |
| ReadWriteLock getReadWriteLock() { |
| return readWriteLock; |
| } |
| |
    @Override
    public void run() {
        // Accept loop: spawns one BlobServerConnection thread per accepted socket, bounded by
        // maxConnections. Connections unregister themselves via unregisterConnection(), which
        // notifies the monitor this loop waits on.
        try {
            while (!this.shutdownRequested.get()) {
                BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this);
                try {
                    synchronized (activeConnections) {
                        // block (with 2s re-check) until a connection slot frees up
                        while (activeConnections.size() >= maxConnections) {
                            activeConnections.wait(2000);
                        }
                        activeConnections.add(conn);
                    }

                    conn.start();
                    // ownership handed to the connection thread; null out so the finally
                    // block below does not close/remove a successfully started connection
                    conn = null;
                } finally {
                    if (conn != null) {
                        // start() failed (or an exception occurred before it) - clean up
                        conn.close();
                        synchronized (activeConnections) {
                            activeConnections.remove(conn);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            // accept() throwing during shutdown is expected (socket closed); only treat it as a
            // failure if no shutdown was requested
            if (!this.shutdownRequested.get()) {
                LOG.error("BLOB server stopped working. Shutting down", t);

                try {
                    close();
                } catch (Throwable closeThrowable) {
                    LOG.error("Could not properly close the BlobServer.", closeThrowable);
                }
            }
        }
    }
| |
    /**
     * Shuts down the BLOB server.
     *
     * <p>Cancels the cleanup timer, closes the server socket (which unblocks the accept loop),
     * waits for the listener thread to die, closes all active connections, deletes the local
     * storage directory and removes the shutdown hook. Idempotent: only the first call performs
     * the shutdown (guarded by {@link #shutdownRequested}).
     */
    @Override
    public void close() throws IOException {
        cleanupTimer.cancel();

        if (shutdownRequested.compareAndSet(false, true)) {
            Exception exception = null;

            try {
                // closing the socket makes the blocked accept() in run() throw
                this.serverSocket.close();
            } catch (IOException ioe) {
                exception = ioe;
            }

            // wake the thread up, in case it is waiting on some operation
            interrupt();

            try {
                join();
            } catch (InterruptedException ie) {
                // restore the interrupt flag for the caller
                Thread.currentThread().interrupt();

                LOG.debug("Error while waiting for this thread to die.", ie);
            }

            synchronized (activeConnections) {
                if (!activeConnections.isEmpty()) {
                    for (BlobServerConnection conn : activeConnections) {
                        LOG.debug("Shutting down connection {}.", conn.getName());
                        conn.close();
                    }
                    activeConnections.clear();
                }
            }

            // Clean up the storage directory
            try {
                FileUtils.deleteDirectory(storageDir);
            } catch (IOException e) {
                // remember the first exception; suppress later ones
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }

            // Remove shutdown hook to prevent resource leaks
            ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);

            if (LOG.isInfoEnabled()) {
                LOG.info(
                        "Stopped BLOB server at {}:{}",
                        serverSocket.getInetAddress().getHostAddress(),
                        getPort());
            }

            // rethrow the first IOException captured above, if any
            ExceptionUtils.tryRethrowIOException(exception);
        }
    }
| |
| protected BlobClient createClient() throws IOException { |
| return new BlobClient( |
| new InetSocketAddress(serverSocket.getInetAddress(), getPort()), |
| blobServiceConfiguration); |
| } |
| |
| /** |
| * Retrieves the local path of a (job-unrelated) file associated with a job and a blob key. |
| * |
| * <p>The blob server looks the blob key up in its local storage. If the file exists, it is |
| * returned. If the file does not exist, it is retrieved from the HA blob store (if available) |
| * or a {@link FileNotFoundException} is thrown. |
| * |
| * @param key blob key associated with the requested file |
| * @return file referring to the local storage location of the BLOB |
| * @throws IOException Thrown if the file retrieval failed. |
| */ |
| @Override |
| public File getFile(TransientBlobKey key) throws IOException { |
| return getFileInternal(null, key); |
| } |
| |
| /** |
| * Retrieves the local path of a file associated with a job and a blob key. |
| * |
| * <p>The blob server looks the blob key up in its local storage. If the file exists, it is |
| * returned. If the file does not exist, it is retrieved from the HA blob store (if available) |
| * or a {@link FileNotFoundException} is thrown. |
| * |
| * @param jobId ID of the job this blob belongs to |
| * @param key blob key associated with the requested file |
| * @return file referring to the local storage location of the BLOB |
| * @throws IOException Thrown if the file retrieval failed. |
| */ |
| @Override |
| public File getFile(JobID jobId, TransientBlobKey key) throws IOException { |
| checkNotNull(jobId); |
| return getFileInternal(jobId, key); |
| } |
| |
| /** |
| * Returns the path to a local copy of the file associated with the provided job ID and blob |
| * key. |
| * |
| * <p>We will first attempt to serve the BLOB from the local storage. If the BLOB is not in |
| * there, we will try to download it from the HA store. |
| * |
| * @param jobId ID of the job this blob belongs to |
| * @param key blob key associated with the requested file |
| * @return The path to the file. |
| * @throws java.io.FileNotFoundException if the BLOB does not exist; |
| * @throws IOException if any other error occurs when retrieving the file |
| */ |
| @Override |
| public File getFile(JobID jobId, PermanentBlobKey key) throws IOException { |
| checkNotNull(jobId); |
| return getFileInternal(jobId, key); |
| } |
| |
| /** |
| * Retrieves the local path of a file associated with a job and a blob key. |
| * |
| * <p>The blob server looks the blob key up in its local storage. If the file exists, it is |
| * returned. If the file does not exist, it is retrieved from the HA blob store (if available) |
| * or a {@link FileNotFoundException} is thrown. |
| * |
| * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) |
| * @param blobKey blob key associated with the requested file |
| * @return file referring to the local storage location of the BLOB |
| * @throws IOException Thrown if the file retrieval failed. |
| */ |
| private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException { |
| checkArgument(blobKey != null, "BLOB key cannot be null."); |
| |
| final File localFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); |
| readWriteLock.readLock().lock(); |
| |
| try { |
| getFileInternal(jobId, blobKey, localFile); |
| return localFile; |
| } finally { |
| readWriteLock.readLock().unlock(); |
| } |
| } |
| |
    /**
     * Helper to retrieve the local path of a file associated with a job and a blob key.
     *
     * <p>The blob server looks the blob key up in its local storage. If the file exists, it is
     * returned. If the file does not exist, it is retrieved from the HA blob store (if available)
     * or a {@link FileNotFoundException} is thrown.
     *
     * <p><strong>Assumes the read lock has already been acquired.</strong> Note that for the HA
     * download path this method temporarily releases the read lock to acquire the write lock, and
     * re-acquires the read lock before returning, so the caller's lock/unlock pairing stays valid.
     *
     * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated)
     * @param blobKey blob key associated with the requested file
     * @param localFile (local) file where the blob is/should be stored
     * @throws IOException Thrown if the file retrieval failed.
     */
    void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, File localFile)
            throws IOException {
        // assume readWriteLock.readLock() was already locked (cannot really check that)

        if (localFile.exists()) {
            // update TTL for transient BLOBs:
            if (blobKey instanceof TransientBlobKey) {
                // regarding concurrent operations, it is not really important which timestamp makes
                // it into the map as they are close to each other anyway, also we can simply
                // overwrite old values as long as we are in the read (or write) lock
                blobExpiryTimes.put(
                        Tuple2.of(jobId, (TransientBlobKey) blobKey),
                        System.currentTimeMillis() + cleanupInterval);
            }
            return;
        } else if (blobKey instanceof PermanentBlobKey) {
            // Try the HA blob store
            // first we have to release the read lock in order to acquire the write lock
            readWriteLock.readLock().unlock();

            // use a temporary file (thread-safe without locking)
            File incomingFile = null;
            try {
                incomingFile = createTemporaryFilename();
                blobStore.get(jobId, blobKey, incomingFile);

                // the move into the local store must happen under the write lock so that
                // concurrent readers never see a partially written file
                readWriteLock.writeLock().lock();
                try {
                    BlobUtils.moveTempFileToStore(
                            incomingFile, jobId, blobKey, localFile, LOG, null);
                } finally {
                    readWriteLock.writeLock().unlock();
                }

                return;
            } finally {
                // delete incomingFile from a failed download (on success the file was moved,
                // so delete() fails but exists() is false and no warning is logged)
                if (incomingFile != null && !incomingFile.delete() && incomingFile.exists()) {
                    LOG.warn(
                            "Could not delete the staging file {} for blob key {} and job {}.",
                            incomingFile,
                            blobKey,
                            jobId);
                }

                // re-acquire lock so that it can be unlocked again outside
                readWriteLock.readLock().lock();
            }
        }

        // neither present locally nor a permanent blob retrievable from the HA store
        throw new FileNotFoundException(
                "Local file "
                        + localFile
                        + " does not exist "
                        + "and failed to copy from blob store.");
    }
| |
| @Override |
| public TransientBlobKey putTransient(byte[] value) throws IOException { |
| return (TransientBlobKey) putBuffer(null, value, TRANSIENT_BLOB); |
| } |
| |
| @Override |
| public TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException { |
| checkNotNull(jobId); |
| return (TransientBlobKey) putBuffer(jobId, value, TRANSIENT_BLOB); |
| } |
| |
| @Override |
| public TransientBlobKey putTransient(InputStream inputStream) throws IOException { |
| return (TransientBlobKey) putInputStream(null, inputStream, TRANSIENT_BLOB); |
| } |
| |
| @Override |
| public TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException { |
| checkNotNull(jobId); |
| return (TransientBlobKey) putInputStream(jobId, inputStream, TRANSIENT_BLOB); |
| } |
| |
| @Override |
| public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException { |
| checkNotNull(jobId); |
| return (PermanentBlobKey) putBuffer(jobId, value, PERMANENT_BLOB); |
| } |
| |
| @Override |
| public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException { |
| checkNotNull(jobId); |
| return (PermanentBlobKey) putInputStream(jobId, inputStream, PERMANENT_BLOB); |
| } |
| |
    /**
     * Uploads the data of the given byte array for the given job to the BLOB server.
     *
     * <p>The data is first written to a staging file in the incoming directory (with its content
     * digest computed on the fly) and then atomically moved into the store via {@link
     * #moveTempFileToStore}. The staging file is cleaned up on both success and failure.
     *
     * @param jobId the ID of the job the BLOB belongs to
     * @param value the buffer to upload
     * @param blobType whether to make the data permanent or transient
     * @return the computed BLOB key identifying the BLOB on the server
     * @throws IOException thrown if an I/O error occurs while writing it to a local file, or
     *     uploading it to the HA store
     */
    private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType)
            throws IOException {

        if (LOG.isDebugEnabled()) {
            LOG.debug("Received PUT call for BLOB of job {}.", jobId);
        }

        File incomingFile = createTemporaryFilename();
        MessageDigest md = BlobUtils.createMessageDigest();
        BlobKey blobKey = null;
        try (FileOutputStream fos = new FileOutputStream(incomingFile)) {
            md.update(value);
            fos.write(value);
        } catch (IOException ioe) {
            // writing the staging file failed - remove it before rethrowing
            if (!incomingFile.delete() && incomingFile.exists()) {
                LOG.warn("Could not delete the staging file {} for job {}.", incomingFile, jobId);
            }
            throw ioe;
        }

        try {
            // persist file
            blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

            return blobKey;
        } finally {
            // on success the staging file was moved away (delete() fails, exists() is false);
            // on failure this removes the leftover staging file
            if (!incomingFile.delete() && incomingFile.exists()) {
                LOG.warn(
                        "Could not delete the staging file {} for blob key {} and job {}.",
                        incomingFile,
                        blobKey,
                        jobId);
            }
        }
    }
| |
    /**
     * Uploads the data from the given input stream for the given job to the BLOB server.
     *
     * <p>The stream is drained into a staging file (computing the content digest on the fly) and
     * then atomically moved into the store via {@link #moveTempFileToStore}. The staging file is
     * cleaned up on both success and failure.
     *
     * @param jobId the ID of the job the BLOB belongs to
     * @param inputStream the input stream to read the data from
     * @param blobType whether to make the data permanent or transient
     * @return the computed BLOB key identifying the BLOB on the server
     * @throws IOException thrown if an I/O error occurs while reading the data from the input
     *     stream, writing it to a local file, or uploading it to the HA store
     */
    private BlobKey putInputStream(
            @Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType)
            throws IOException {

        if (LOG.isDebugEnabled()) {
            LOG.debug("Received PUT call for BLOB of job {}.", jobId);
        }

        File incomingFile = createTemporaryFilename();
        BlobKey blobKey = null;
        try {
            MessageDigest md = writeStreamToFileAndCreateDigest(inputStream, incomingFile);

            // persist file
            blobKey = moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);

            return blobKey;
        } finally {
            // on success the staging file was moved away (delete() fails, exists() is false);
            // on failure this removes the leftover staging file
            if (!incomingFile.delete() && incomingFile.exists()) {
                LOG.warn(
                        "Could not delete the staging file {} for blob key {} and job {}.",
                        incomingFile,
                        blobKey,
                        jobId);
            }
        }
    }
| |
| private static MessageDigest writeStreamToFileAndCreateDigest( |
| InputStream inputStream, File file) throws IOException { |
| try (FileOutputStream fos = new FileOutputStream(file)) { |
| MessageDigest md = BlobUtils.createMessageDigest(); |
| // read stream |
| byte[] buf = new byte[BUFFER_SIZE]; |
| while (true) { |
| final int bytesRead = inputStream.read(buf); |
| if (bytesRead == -1) { |
| // done |
| break; |
| } |
| fos.write(buf, 0, bytesRead); |
| md.update(buf, 0, bytesRead); |
| } |
| return md; |
| } |
| } |
| |
| /** |
| * Moves the temporary <tt>incomingFile</tt> to its permanent location where it is available for |
| * use. |
| * |
| * @param incomingFile temporary file created during transfer |
| * @param jobId ID of the job this blob belongs to or <tt>null</tt> if job-unrelated |
| * @param digest BLOB content digest, i.e. hash |
| * @param blobType whether this file is a permanent or transient BLOB |
| * @return unique BLOB key that identifies the BLOB on the server |
| * @throws IOException thrown if an I/O error occurs while moving the file or uploading it to |
| * the HA store |
| */ |
| BlobKey moveTempFileToStore( |
| File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType) |
| throws IOException { |
| |
| int retries = 10; |
| |
| int attempt = 0; |
| while (true) { |
| // add unique component independent of the BLOB content |
| BlobKey blobKey = BlobKey.createKey(blobType, digest); |
| File storageFile = BlobUtils.getStorageLocation(storageDir, jobId, blobKey); |
| |
| // try again until the key is unique (put the existence check into the lock!) |
| readWriteLock.writeLock().lock(); |
| try { |
| if (!storageFile.exists()) { |
| BlobUtils.moveTempFileToStore( |
| incomingFile, |
| jobId, |
| blobKey, |
| storageFile, |
| LOG, |
| blobKey instanceof PermanentBlobKey ? blobStore : null); |
| // add TTL for transient BLOBs: |
| if (blobKey instanceof TransientBlobKey) { |
| // must be inside read or write lock to add a TTL |
| blobExpiryTimes.put( |
| Tuple2.of(jobId, (TransientBlobKey) blobKey), |
| System.currentTimeMillis() + cleanupInterval); |
| } |
| return blobKey; |
| } |
| } finally { |
| readWriteLock.writeLock().unlock(); |
| } |
| |
| ++attempt; |
| if (attempt >= retries) { |
| String message = |
| "Failed to find a unique key for BLOB of job " |
| + jobId |
| + " (last tried " |
| + storageFile.getAbsolutePath() |
| + "."; |
| LOG.error(message + " No retries left."); |
| throw new IOException(message); |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug( |
| "Trying to find a unique key for BLOB of job {} (retry {}, last tried {})", |
| jobId, |
| attempt, |
| storageFile.getAbsolutePath()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Deletes the (job-unrelated) file associated with the blob key in the local storage of the |
| * blob server. |
| * |
| * @param key blob key associated with the file to be deleted |
| * @return <tt>true</tt> if the given blob is successfully deleted or non-existing; |
| * <tt>false</tt> otherwise |
| */ |
| @Override |
| public boolean deleteFromCache(TransientBlobKey key) { |
| return deleteInternal(null, key); |
| } |
| |
| /** |
| * Deletes the file associated with the blob key in the local storage of the blob server. |
| * |
| * @param jobId ID of the job this blob belongs to |
| * @param key blob key associated with the file to be deleted |
| * @return <tt>true</tt> if the given blob is successfully deleted or non-existing; |
| * <tt>false</tt> otherwise |
| */ |
| @Override |
| public boolean deleteFromCache(JobID jobId, TransientBlobKey key) { |
| checkNotNull(jobId); |
| return deleteInternal(jobId, key); |
| } |
| |
| /** |
| * Deletes the file associated with the blob key in the local storage of the blob server. |
| * |
| * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) |
| * @param key blob key associated with the file to be deleted |
| * @return <tt>true</tt> if the given blob is successfully deleted or non-existing; |
| * <tt>false</tt> otherwise |
| */ |
| boolean deleteInternal(@Nullable JobID jobId, TransientBlobKey key) { |
| final File localFile = |
| new File( |
| BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key)); |
| |
| readWriteLock.writeLock().lock(); |
| |
| try { |
| if (!localFile.delete() && localFile.exists()) { |
| LOG.warn( |
| "Failed to locally delete BLOB " |
| + key |
| + " at " |
| + localFile.getAbsolutePath()); |
| return false; |
| } |
| // this needs to happen inside the write lock in case of concurrent getFile() calls |
| blobExpiryTimes.remove(Tuple2.of(jobId, key)); |
| return true; |
| } finally { |
| readWriteLock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Deletes the file associated with the blob key in the local storage of the blob server. |
| * |
| * @param jobId ID of the job this blob belongs to (or <tt>null</tt> if job-unrelated) |
| * @param key blob key associated with the file to be deleted |
| * @return <tt>true</tt> if the given blob is successfully deleted or non-existing; |
| * <tt>false</tt> otherwise |
| */ |
| private boolean deleteInternal(JobID jobId, PermanentBlobKey key) { |
| final File localFile = |
| new File( |
| BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key)); |
| |
| readWriteLock.writeLock().lock(); |
| |
| try { |
| boolean deleteLocally = true; |
| if (!localFile.delete() && localFile.exists()) { |
| LOG.warn( |
| "Failed to locally delete BLOB " |
| + key |
| + " at " |
| + localFile.getAbsolutePath()); |
| deleteLocally = false; |
| } |
| // this needs to happen inside the write lock in case of concurrent getFile() calls |
| boolean deleteHA = blobStore.delete(jobId, key); |
| return deleteLocally && deleteHA; |
| } finally { |
| readWriteLock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Delete the uploaded data with the given {@link JobID} and {@link PermanentBlobKey}. |
| * |
| * @param jobId ID of the job this blob belongs to |
| * @param key the key of this blob |
| */ |
| @Override |
| public boolean deletePermanent(JobID jobId, PermanentBlobKey key) { |
| return deleteInternal(jobId, key); |
| } |
| |
    /**
     * Removes all BLOBs from local and HA store belonging to the given job ID.
     *
     * @param jobId ID of the job this blob belongs to
     * @param cleanupBlobStoreFiles True if the corresponding blob store files shall be cleaned up
     *     as well. Otherwise false.
     * @return <tt>true</tt> if the job directory is successfully deleted or non-existing;
     *     <tt>false</tt> otherwise
     */
    public boolean cleanupJob(JobID jobId, boolean cleanupBlobStoreFiles) {
        checkNotNull(jobId);

        final File jobDir =
                new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));

        readWriteLock.writeLock().lock();

        try {
            // delete locally
            boolean deletedLocally = false;
            try {
                FileUtils.deleteDirectory(jobDir);

                // NOTE: Instead of going through blobExpiryTimes, keep lingering entries - they
                //       will be cleaned up by the timer task which tolerates non-existing files
                //       If inserted again with the same IDs (via put()), the TTL will be updated
                //       again.

                deletedLocally = true;
            } catch (IOException e) {
                LOG.warn(
                        "Failed to locally delete BLOB storage directory at "
                                + jobDir.getAbsolutePath(),
                        e);
            }

            // delete in HA blob store files; a skipped HA cleanup counts as success
            final boolean deletedHA = !cleanupBlobStoreFiles || blobStore.deleteAll(jobId);

            return deletedLocally && deletedHA;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }
| |
| @Override |
| public PermanentBlobService getPermanentBlobService() { |
| return this; |
| } |
| |
| @Override |
| public TransientBlobService getTransientBlobService() { |
| return this; |
| } |
| |
    /**
     * Returns the configured minimum size (bytes) above which BLOBs are offloaded, read from
     * {@link BlobServerOptions#OFFLOAD_MINSIZE}.
     *
     * <p>(The previous Javadoc incorrectly described this as returning the configuration itself.)
     *
     * @return minimum offloading size in bytes
     */
    @Override
    public final int getMinOffloadingSize() {
        return blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
    }
| |
| /** |
| * Returns the port on which the server is listening. |
| * |
| * @return port on which the server is listening |
| */ |
| @Override |
| public int getPort() { |
| return this.serverSocket.getLocalPort(); |
| } |
| |
| /** |
| * Returns the blob expiry times - for testing purposes only! |
| * |
| * @return blob expiry times (internal state!) |
| */ |
| @VisibleForTesting |
| ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> getBlobExpiryTimes() { |
| return blobExpiryTimes; |
| } |
| |
| /** |
| * Tests whether the BLOB server has been requested to shut down. |
| * |
| * @return True, if the server has been requested to shut down, false otherwise. |
| */ |
| public boolean isShutdown() { |
| return this.shutdownRequested.get(); |
| } |
| |
| /** Access to the server socket, for testing. */ |
| ServerSocket getServerSocket() { |
| return this.serverSocket; |
| } |
| |
| void unregisterConnection(BlobServerConnection conn) { |
| synchronized (activeConnections) { |
| activeConnections.remove(conn); |
| activeConnections.notifyAll(); |
| } |
| } |
| |
| /** |
| * Returns all the current active connections in the BlobServer. |
| * |
| * @return the list of all the active in current BlobServer |
| */ |
| List<BlobServerConnection> getCurrentActiveConnections() { |
| synchronized (activeConnections) { |
| return new ArrayList<>(activeConnections); |
| } |
| } |
| } |