| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.shortcircuit; |
| |
| import java.io.BufferedOutputStream; |
| import java.io.Closeable; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.nio.MappedByteBuffer; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.commons.collections.map.LinkedMap; |
| import org.apache.commons.lang.mutable.MutableBoolean; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.hdfs.ExtendedBlockId; |
| import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; |
| import org.apache.hadoop.hdfs.net.DomainPeer; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; |
| import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; |
| import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; |
| import org.apache.hadoop.hdfs.util.IOUtilsClient; |
| import org.apache.hadoop.ipc.RetriableException; |
| import org.apache.hadoop.net.unix.DomainSocket; |
| import org.apache.hadoop.net.unix.DomainSocketWatcher; |
| import org.apache.hadoop.security.token.SecretManager.InvalidToken; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.util.Time; |
| import org.apache.hadoop.util.Waitable; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The ShortCircuitCache tracks things which the client needs to access |
| * HDFS block files via short-circuit. |
| * |
| * These things include: memory-mapped regions, file descriptors, and shared |
| * memory areas for communicating with the DataNode. |
| */ |
| @InterfaceAudience.Private |
| public class ShortCircuitCache implements Closeable { |
| public static final Logger LOG = LoggerFactory.getLogger( |
| ShortCircuitCache.class); |
| |
  /**
   * Expiry thread which makes sure that the file descriptors get closed
   * after a while.
   */
  private class CacheCleaner implements Runnable, Closeable {
    // Handle for the periodic task scheduled on cleanerExecutor; used by
    // close() to cancel the task when the cache shuts down.
    private ScheduledFuture<?> future;

    /**
     * Run the CacheCleaner thread.
     *
     * Whenever a thread requests a ShortCircuitReplica object, we will make
     * sure it gets one.  That ShortCircuitReplica object can then be re-used
     * when another thread requests a ShortCircuitReplica object for the same
     * block.  So in that sense, there is no maximum size to the cache.
     *
     * However, when a ShortCircuitReplica object is unreferenced by the
     * thread(s) that are using it, it becomes evictable.  There are two
     * separate eviction lists-- one for mmaped objects, and another for
     * non-mmaped objects.  We do this in order to avoid having the regular
     * files kick the mmaped files out of the cache too quickly.  Reusing
     * an already-existing mmap gives a huge performance boost, since the
     * page table entries don't have to be re-populated.  Both the mmap
     * and non-mmap evictable lists have maximum sizes and maximum lifespans.
     */
    @Override
    public void run() {
      ShortCircuitCache.this.lock.lock();
      try {
        if (ShortCircuitCache.this.closed) return;
        long curMs = Time.monotonicNow();

        LOG.debug("{}: cache cleaner running at {}", this, curMs);

        // First demote mmapped replicas that have outlived their mmap
        // lifespan into the regular (non-mmapped) eviction list.
        int numDemoted = demoteOldEvictableMmaped(curMs);
        int numPurged = 0;
        Long evictionTimeNs;
        // Then purge non-mmapped replicas that have been evictable longer
        // than maxNonMmappedEvictableLifespanMs.  The map is keyed by
        // insertion time in nanoseconds, so firstKey() is the eldest entry;
        // once the eldest is young enough we can stop.
        while (!evictable.isEmpty()) {
          Object eldestKey = evictable.firstKey();
          evictionTimeNs = (Long)eldestKey;
          long evictionTimeMs =
              TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
          if (evictionTimeMs + maxNonMmappedEvictableLifespanMs >= curMs) break;
          ShortCircuitReplica replica = (ShortCircuitReplica)evictable.get(
              eldestKey);
          if (LOG.isTraceEnabled()) {
            LOG.trace("CacheCleaner: purging " + replica + ": " +
                StringUtils.getStackTrace(Thread.currentThread()));
          }
          purge(replica);
          numPurged++;
        }

        LOG.debug("{}: finishing cache cleaner run started at {}. Demoted {} "
            + "mmapped replicas; purged {} replicas.",
            this, curMs, numDemoted, numPurged);
      } finally {
        ShortCircuitCache.this.lock.unlock();
      }
    }

    /**
     * Cancel the scheduled cleaner task.  Passing false means a run that is
     * already in progress is not interrupted.
     */
    @Override
    public void close() throws IOException {
      if (future != null) {
        future.cancel(false);
      }
    }

    /**
     * Record the ScheduledFuture for the periodic task so close() can
     * cancel it later.
     */
    public void setFuture(ScheduledFuture<?> future) {
      this.future = future;
    }

    /**
     * Get the rate at which this cleaner thread should be scheduled.
     *
     * We do this by taking the minimum expiration time and dividing by 4.
     *
     * @return the rate in milliseconds at which this thread should be
     *         scheduled.
     */
    public long getRateInMs() {
      long minLifespanMs =
          Math.min(maxNonMmappedEvictableLifespanMs,
              maxEvictableMmapedLifespanMs);
      long sampleTimeMs = minLifespanMs / 4;
      // Never return 0; scheduleAtFixedRate requires a positive period.
      return (sampleTimeMs < 1) ? 1 : sampleTimeMs;
    }
  }
| |
  /**
   * A task which asks the DataNode to release a short-circuit shared memory
   * slot.  If successful, this will tell the DataNode to stop monitoring
   * changes to the mlock status of the replica associated with the slot.
   * It will also allow us (the client) to re-use this slot for another
   * replica.  If we can't communicate with the DataNode for some reason,
   * we tear down the shared memory segment to avoid being in an inconsistent
   * state.
   */
  private class SlotReleaser implements Runnable {
    /**
     * The slot that we need to release.
     */
    private final Slot slot;

    SlotReleaser(Slot slot) {
      this.slot = slot;
    }

    @Override
    public void run() {
      LOG.trace("{}: about to release {}", ShortCircuitCache.this, slot);
      final DfsClientShm shm = (DfsClientShm)slot.getShm();
      final DomainSocket shmSock = shm.getPeer().getDomainSocket();
      final String path = shmSock.getPath();
      boolean success = false;
      // Open a fresh domain socket to the DataNode at the same path as the
      // shared memory segment's peer; both the socket and the output stream
      // are closed automatically by try-with-resources.
      try (DomainSocket sock = DomainSocket.connect(path);
           DataOutputStream out = new DataOutputStream(
               new BufferedOutputStream(sock.getOutputStream()))) {
        // Ask the DataNode to release the slot, then read back the
        // varint-prefixed protobuf response.
        new Sender(out).releaseShortCircuitFds(slot.getSlotId());
        DataInputStream in = new DataInputStream(sock.getInputStream());
        ReleaseShortCircuitAccessResponseProto resp =
            ReleaseShortCircuitAccessResponseProto.parseFrom(
                PBHelperClient.vintPrefixed(in));
        if (resp.getStatus() != Status.SUCCESS) {
          String error = resp.hasError() ? resp.getError() : "(unknown)";
          throw new IOException(resp.getStatus().toString() + ": " + error);
        }
        LOG.trace("{}: released {}", this, slot);
        success = true;
      } catch (IOException e) {
        LOG.error(ShortCircuitCache.this + ": failed to release " +
            "short-circuit shared memory slot " + slot + " by sending " +
            "ReleaseShortCircuitAccessRequestProto to " + path +
            ". Closing shared memory segment.", e);
      } finally {
        // On success the slot can be re-used for another replica; on
        // failure the whole segment is shut down to avoid inconsistency.
        if (success) {
          shmManager.freeSlot(slot);
        } else {
          shm.getEndpointShmManager().shutdown(shm);
        }
      }
    }
  }
| |
  /**
   * Callback used by fetchOrCreate to load a replica that is not yet in
   * the cache.
   */
  public interface ShortCircuitReplicaCreator {
    /**
     * Attempt to create a ShortCircuitReplica object.
     *
     * This callback will be made without holding any locks.
     *
     * @return a non-null ShortCircuitReplicaInfo object.
     */
    ShortCircuitReplicaInfo createShortCircuitReplicaInfo();
  }
| |
| /** |
| * Lock protecting the cache. |
| */ |
| private final ReentrantLock lock = new ReentrantLock(); |
| |
| /** |
| * The executor service that runs the cacheCleaner. |
| */ |
| private final ScheduledThreadPoolExecutor cleanerExecutor |
| = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder(). |
| setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner"). |
| build()); |
| |
  /**
   * The executor service that runs the SlotReleaser tasks.
   */
| private final ScheduledThreadPoolExecutor releaserExecutor |
| = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder(). |
| setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser"). |
| build()); |
| |
| /** |
| * A map containing all ShortCircuitReplicaInfo objects, organized by Key. |
| * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken |
| * exception. |
| */ |
| private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> |
| replicaInfoMap = new HashMap<>(); |
| |
| /** |
| * The CacheCleaner. We don't create this and schedule it until it becomes |
| * necessary. |
| */ |
| private CacheCleaner cacheCleaner; |
| |
| /** |
| * LinkedMap of evictable elements. |
| * |
| * Maps (unique) insertion time in nanoseconds to the element. |
| */ |
| private final LinkedMap evictable = new LinkedMap(); |
| |
  /**
   * Maximum total size of the cache, including both mmapped and
   * non-mmapped elements.
   */
| private int maxTotalSize; |
| |
| /** |
| * Non-mmaped elements older than this will be closed. |
| */ |
| private long maxNonMmappedEvictableLifespanMs; |
| |
| /** |
| * LinkedMap of mmaped evictable elements. |
| * |
| * Maps (unique) insertion time in nanoseconds to the element. |
| */ |
| private final LinkedMap evictableMmapped = new LinkedMap(); |
| |
| /** |
| * Maximum number of mmaped evictable elements. |
| */ |
| private int maxEvictableMmapedSize; |
| |
| /** |
| * Mmaped elements older than this will be closed. |
| */ |
| private final long maxEvictableMmapedLifespanMs; |
| |
| /** |
| * The minimum number of milliseconds we'll wait after an unsuccessful |
| * mmap attempt before trying again. |
| */ |
| private final long mmapRetryTimeoutMs; |
| |
| /** |
| * How long we will keep replicas in the cache before declaring them |
| * to be stale. |
| */ |
| private final long staleThresholdMs; |
| |
| /** |
| * True if the ShortCircuitCache is closed. |
| */ |
| private boolean closed = false; |
| |
| /** |
| * Number of existing mmaps associated with this cache. |
| */ |
| private int outstandingMmapCount = 0; |
| |
| /** |
| * Manages short-circuit shared memory segments for the client. |
| */ |
| private final DfsClientShmManager shmManager; |
| |
  /**
   * Create a ShortCircuitCache from a short-circuit configuration object.
   *
   * @param conf The configuration supplying cache sizes, expiry times,
   *             the mmap retry timeout, the stale threshold, and the
   *             shared-memory watcher interrupt-check interval.
   * @return A new ShortCircuitCache.
   */
  public static ShortCircuitCache fromConf(ShortCircuitConf conf) {
    return new ShortCircuitCache(
        conf.getShortCircuitStreamsCacheSize(),
        conf.getShortCircuitStreamsCacheExpiryMs(),
        conf.getShortCircuitMmapCacheSize(),
        conf.getShortCircuitMmapCacheExpiryMs(),
        conf.getShortCircuitMmapCacheRetryTimeout(),
        conf.getShortCircuitCacheStaleThresholdMs(),
        conf.getShortCircuitSharedMemoryWatcherInterruptCheckMs());
  }
| |
  /**
   * Create the cache.
   *
   * The shared memory manager is only created when shmInterruptCheckMs is
   * positive and the native DomainSocketWatcher code loaded successfully;
   * otherwise shmManager remains null and shared-memory features are
   * disabled.  A failure to construct the manager is logged, not thrown.
   *
   * @param maxTotalSize Maximum combined size of the evictable maps.
   * @param maxNonMmappedEvictableLifespanMs How long non-mmapped evictable
   *          replicas may live before being purged, in milliseconds.
   * @param maxEvictableMmapedSize Maximum number of mmapped evictable
   *          replicas.
   * @param maxEvictableMmapedLifespanMs How long mmapped evictable replicas
   *          may live before being demoted, in milliseconds.
   * @param mmapRetryTimeoutMs Minimum wait after a failed mmap attempt
   *          before retrying, in milliseconds.
   * @param staleThresholdMs How long replicas may stay in the cache before
   *          being declared stale, in milliseconds.
   * @param shmInterruptCheckMs Interrupt-check interval for the shared
   *          memory manager; non-positive disables shared memory.
   */
  public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
      int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
      long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {
    Preconditions.checkArgument(maxTotalSize >= 0);
    this.maxTotalSize = maxTotalSize;
    Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
    this.maxNonMmappedEvictableLifespanMs = maxNonMmappedEvictableLifespanMs;
    Preconditions.checkArgument(maxEvictableMmapedSize >= 0);
    this.maxEvictableMmapedSize = maxEvictableMmapedSize;
    Preconditions.checkArgument(maxEvictableMmapedLifespanMs >= 0);
    this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
    this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
    this.staleThresholdMs = staleThresholdMs;
    DfsClientShmManager shmManager = null;
    if ((shmInterruptCheckMs > 0) &&
        (DomainSocketWatcher.getLoadingFailureReason() == null)) {
      try {
        shmManager = new DfsClientShmManager(shmInterruptCheckMs);
      } catch (IOException e) {
        LOG.error("failed to create ShortCircuitShmManager", e);
      }
    }
    this.shmManager = shmManager;
  }
| |
  /**
   * @return How long, in milliseconds, replicas may stay in the cache
   *         before being declared stale.
   */
  public long getStaleThresholdMs() {
    return staleThresholdMs;
  }
| |
  /**
   * Set the maximum total cache size.  Exposed for testing only.
   *
   * @param maxTotalSize The new maximum combined size of the evictable maps.
   */
  @VisibleForTesting
  public void setMaxTotalSize(int maxTotalSize) {
    this.maxTotalSize = maxTotalSize;
  }
| |
| /** |
| * Increment the reference count of a replica, and remove it from any free |
| * list it may be in. |
| * |
| * You must hold the cache lock while calling this function. |
| * |
| * @param replica The replica we're removing. |
| */ |
| private void ref(ShortCircuitReplica replica) { |
| lock.lock(); |
| try { |
| Preconditions.checkArgument(replica.refCount > 0, |
| "can't ref %s because its refCount reached %d", replica, |
| replica.refCount); |
| Long evictableTimeNs = replica.getEvictableTimeNs(); |
| replica.refCount++; |
| if (evictableTimeNs != null) { |
| String removedFrom = removeEvictable(replica); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(this + ": " + removedFrom + |
| " no longer contains " + replica + ". refCount " + |
| (replica.refCount - 1) + " -> " + replica.refCount + |
| StringUtils.getStackTrace(Thread.currentThread())); |
| |
| } |
| } else if (LOG.isTraceEnabled()) { |
| LOG.trace(this + ": replica refCount " + |
| (replica.refCount - 1) + " -> " + replica.refCount + |
| StringUtils.getStackTrace(Thread.currentThread())); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Unreference a replica. |
| * |
| * You must hold the cache lock while calling this function. |
| * |
| * @param replica The replica being unreferenced. |
| */ |
| void unref(ShortCircuitReplica replica) { |
| lock.lock(); |
| try { |
| // If the replica is stale or unusable, but we haven't purged it yet, |
| // let's do that. It would be a shame to evict a non-stale replica so |
| // that we could put a stale or unusable one into the cache. |
| if (!replica.purged) { |
| String purgeReason = null; |
| if (!replica.getDataStream().getChannel().isOpen()) { |
| purgeReason = "purging replica because its data channel is closed."; |
| } else if (!replica.getMetaStream().getChannel().isOpen()) { |
| purgeReason = "purging replica because its meta channel is closed."; |
| } else if (replica.isStale()) { |
| purgeReason = "purging replica because it is stale."; |
| } |
| if (purgeReason != null) { |
| LOG.debug("{}: {}", this, purgeReason); |
| purge(replica); |
| } |
| } |
| String addedString = ""; |
| boolean shouldTrimEvictionMaps = false; |
| int newRefCount = --replica.refCount; |
| if (newRefCount == 0) { |
| // Close replica, since there are no remaining references to it. |
| Preconditions.checkArgument(replica.purged, |
| "Replica %s reached a refCount of 0 without being purged", replica); |
| replica.close(); |
| } else if (newRefCount == 1) { |
| Preconditions.checkState(null == replica.getEvictableTimeNs(), |
| "Replica %s had a refCount higher than 1, " + |
| "but was still evictable (evictableTimeNs = %d)", |
| replica, replica.getEvictableTimeNs()); |
| if (!replica.purged) { |
| // Add the replica to the end of an eviction list. |
| // Eviction lists are sorted by time. |
| if (replica.hasMmap()) { |
| insertEvictable(System.nanoTime(), replica, evictableMmapped); |
| addedString = "added to evictableMmapped, "; |
| } else { |
| insertEvictable(System.nanoTime(), replica, evictable); |
| addedString = "added to evictable, "; |
| } |
| shouldTrimEvictionMaps = true; |
| } |
| } else { |
| Preconditions.checkArgument(replica.refCount >= 0, |
| "replica's refCount went negative (refCount = %d" + |
| " for %s)", replica.refCount, replica); |
| } |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(this + ": unref replica " + replica + |
| ": " + addedString + " refCount " + |
| (newRefCount + 1) + " -> " + newRefCount + |
| StringUtils.getStackTrace(Thread.currentThread())); |
| } |
| if (shouldTrimEvictionMaps) { |
| trimEvictionMaps(); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
  /**
   * Demote old evictable mmaps into the regular eviction map.
   *
   * You must hold the cache lock while calling this function.
   *
   * @param now Current time in monotonic milliseconds.
   * @return    Number of replicas demoted.
   */
  private int demoteOldEvictableMmaped(long now) {
    int numDemoted = 0;
    boolean needMoreSpace = false;
    Long evictionTimeNs;

    // evictableMmapped is keyed by insertion time in nanoseconds, so
    // firstKey() is always the eldest entry.
    while (!evictableMmapped.isEmpty()) {
      Object eldestKey = evictableMmapped.firstKey();
      evictionTimeNs = (Long)eldestKey;
      long evictionTimeMs =
          TimeUnit.MILLISECONDS.convert(evictionTimeNs, TimeUnit.NANOSECONDS);
      if (evictionTimeMs + maxEvictableMmapedLifespanMs >= now) {
        // The eldest entry has not expired yet; keep demoting only if the
        // mmapped list is over its maximum size.
        if (evictableMmapped.size() < maxEvictableMmapedSize) {
          break;
        }
        needMoreSpace = true;
      }
      ShortCircuitReplica replica = (ShortCircuitReplica)evictableMmapped.get(
          eldestKey);
      if (LOG.isTraceEnabled()) {
        String rationale = needMoreSpace ? "because we need more space" :
            "because it's too old";
        LOG.trace("demoteOldEvictable: demoting " + replica + ": " +
            rationale + ": " +
            StringUtils.getStackTrace(Thread.currentThread()));
      }
      // Drop the mmap but keep the replica cached: move it into the
      // non-mmapped eviction list under its original insertion time.
      removeEvictable(replica, evictableMmapped);
      munmap(replica);
      insertEvictable(evictionTimeNs, replica, evictable);
      numDemoted++;
    }
    return numDemoted;
  }
| |
  /**
   * Trim the eviction lists.
   *
   * First demotes expired mmapped replicas, then purges the eldest entries
   * until the combined size of both eviction maps is within maxTotalSize.
   * Non-mmapped replicas are purged before mmapped ones, since re-creating
   * an mmap is the more expensive operation.
   *
   * You must hold the cache lock while calling this function.
   */
  private void trimEvictionMaps() {
    long now = Time.monotonicNow();
    demoteOldEvictableMmaped(now);

    while (evictable.size() + evictableMmapped.size() > maxTotalSize) {
      ShortCircuitReplica replica;
      if (evictable.isEmpty()) {
        replica = (ShortCircuitReplica) evictableMmapped
            .get(evictableMmapped.firstKey());
      } else {
        replica = (ShortCircuitReplica) evictable.get(evictable.firstKey());
      }

      if (LOG.isTraceEnabled()) {
        LOG.trace(this + ": trimEvictionMaps is purging " + replica +
            StringUtils.getStackTrace(Thread.currentThread()));
      }
      // purge() removes the replica from its eviction map, so the loop
      // makes progress on every iteration.
      purge(replica);
    }
  }
| |
  /**
   * Munmap a replica, updating outstandingMmapCount.
   *
   * You must hold the cache lock while calling this function, since it
   * modifies outstandingMmapCount.
   *
   * @param replica The replica to munmap.
   */
  private void munmap(ShortCircuitReplica replica) {
    replica.munmap();
    outstandingMmapCount--;
  }
| |
| /** |
| * Remove a replica from an evictable map. |
| * |
| * @param replica The replica to remove. |
| * @return The map it was removed from. |
| */ |
| private String removeEvictable(ShortCircuitReplica replica) { |
| if (replica.hasMmap()) { |
| removeEvictable(replica, evictableMmapped); |
| return "evictableMmapped"; |
| } else { |
| removeEvictable(replica, evictable); |
| return "evictable"; |
| } |
| } |
| |
  /**
   * Remove a replica from an evictable map.
   *
   * The replica must currently be in the given map, keyed by its evictable
   * time; its evictable time is cleared once removed.
   *
   * @param replica The replica to remove.
   * @param map     The map to remove it from.
   */
  private void removeEvictable(ShortCircuitReplica replica,
      LinkedMap map) {
    Long evictableTimeNs = replica.getEvictableTimeNs();
    Preconditions.checkNotNull(evictableTimeNs);
    ShortCircuitReplica removed = (ShortCircuitReplica)map.remove(
        evictableTimeNs);
    // Sanity check: the key must have mapped to this exact replica.
    Preconditions.checkState(removed == replica,
        "failed to make %s unevictable", replica);
    replica.setEvictableTimeNs(null);
  }
| |
  /**
   * Insert a replica into an evictable map.
   *
   * If an element already exists with this eviction time, we add a nanosecond
   * to it until we find an unused key.
   *
   * @param evictionTimeNs The eviction time in absolute nanoseconds.
   * @param replica        The replica to insert.
   * @param map            The map to insert it into.
   */
  private void insertEvictable(Long evictionTimeNs,
      ShortCircuitReplica replica, LinkedMap map) {
    // Nudge the key forward until it is unique within the map.
    while (map.containsKey(evictionTimeNs)) {
      evictionTimeNs++;
    }
    // The replica must not already be in any eviction map.
    Preconditions.checkState(null == replica.getEvictableTimeNs());
    replica.setEvictableTimeNs(evictionTimeNs);
    map.put(evictionTimeNs, replica);
  }
| |
| /** |
| * Purge a replica from the cache. |
| * |
| * This doesn't necessarily close the replica, since there may be |
| * outstanding references to it. However, it does mean the cache won't |
| * hand it out to anyone after this. |
| * |
| * You must hold the cache lock while calling this function. |
| * |
| * @param replica The replica being removed. |
| */ |
| private void purge(ShortCircuitReplica replica) { |
| boolean removedFromInfoMap = false; |
| String evictionMapName = null; |
| Preconditions.checkArgument(!replica.purged); |
| replica.purged = true; |
| Waitable<ShortCircuitReplicaInfo> val = replicaInfoMap.get(replica.key); |
| if (val != null) { |
| ShortCircuitReplicaInfo info = val.getVal(); |
| if ((info != null) && (info.getReplica() == replica)) { |
| replicaInfoMap.remove(replica.key); |
| removedFromInfoMap = true; |
| } |
| } |
| Long evictableTimeNs = replica.getEvictableTimeNs(); |
| if (evictableTimeNs != null) { |
| evictionMapName = removeEvictable(replica); |
| } |
| if (LOG.isTraceEnabled()) { |
| StringBuilder builder = new StringBuilder(); |
| builder.append(this).append(": ").append(": purged "). |
| append(replica).append(" from the cache."); |
| if (removedFromInfoMap) { |
| builder.append(" Removed from the replicaInfoMap."); |
| } |
| if (evictionMapName != null) { |
| builder.append(" Removed from ").append(evictionMapName); |
| } |
| LOG.trace(builder.toString()); |
| } |
| unref(replica); |
| } |
| |
| /** |
| * Fetch or create a replica. |
| * |
| * You must hold the cache lock while calling this function. |
| * |
| * @param key Key to use for lookup. |
| * @param creator Replica creator callback. Will be called without |
| * the cache lock being held. |
| * |
| * @return Null if no replica could be found or created. |
| * The replica, otherwise. |
| */ |
| public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key, |
| ShortCircuitReplicaCreator creator) { |
| Waitable<ShortCircuitReplicaInfo> newWaitable = null; |
| lock.lock(); |
| try { |
| ShortCircuitReplicaInfo info = null; |
| do { |
| if (closed) { |
| LOG.trace("{}: can't fethchOrCreate {} because the cache is closed.", |
| this, key); |
| return null; |
| } |
| Waitable<ShortCircuitReplicaInfo> waitable = replicaInfoMap.get(key); |
| if (waitable != null) { |
| try { |
| info = fetch(key, waitable); |
| } catch (RetriableException e) { |
| LOG.debug("{}: retrying {}", this, e.getMessage()); |
| } |
| } |
| } while (false); |
| if (info != null) return info; |
| // We need to load the replica ourselves. |
| newWaitable = new Waitable<>(lock.newCondition()); |
| replicaInfoMap.put(key, newWaitable); |
| } finally { |
| lock.unlock(); |
| } |
| return create(key, creator, newWaitable); |
| } |
| |
  /**
   * Fetch an existing ReplicaInfo object.
   *
   * @param key      The key that we're using.
   * @param waitable The waitable object to wait on.
   * @return         The existing ReplicaInfo object, or null if there is
   *                 none.
   *
   * @throws RetriableException   If the caller needs to retry.
   */
  private ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
      Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
    // Another thread is already in the process of loading this
    // ShortCircuitReplica.  So we simply wait for it to complete.
    ShortCircuitReplicaInfo info;
    try {
      LOG.trace("{}: found waitable for {}", this, key);
      info = waitable.await();
    } catch (InterruptedException e) {
      LOG.info(this + ": interrupted while waiting for " + key);
      // Restore the interrupt flag and let the caller retry.
      Thread.currentThread().interrupt();
      throw new RetriableException("interrupted");
    }
    // An InvalidToken result is returned as-is so the caller can surface
    // the token error rather than retrying.
    if (info.getInvalidTokenException() != null) {
      LOG.info(this + ": could not get " + key + " due to InvalidToken " +
            "exception.", info.getInvalidTokenException());
      return info;
    }
    ShortCircuitReplica replica = info.getReplica();
    if (replica == null) {
      LOG.warn(this + ": failed to get " + key);
      return info;
    }
    if (replica.purged) {
      // Ignore replicas that have already been purged from the cache.
      throw new RetriableException("Ignoring purged replica " +
          replica + ".  Retrying.");
    }
    // Check if the replica is stale before using it.
    // If it is, purge it and retry.
    if (replica.isStale()) {
      LOG.info(this + ": got stale replica " + replica + ".  Removing " +
          "this replica from the replicaInfoMap and retrying.");
      // Remove the cache's reference to the replica.  This may or may not
      // trigger a close.
      purge(replica);
      throw new RetriableException("ignoring stale replica " + replica);
    }
    // Take a reference on behalf of the caller before handing it out.
    ref(replica);
    return info;
  }
| |
  /**
   * Load a new replica via the creator callback and publish the result to
   * any threads waiting on newWaitable.
   *
   * The creator is invoked without the cache lock held; the result (always
   * non-null, possibly containing no replica) is then recorded under the
   * lock and provided to the waitable so blocked fetchers wake up.
   *
   * @param key         Key the replica is being loaded for.
   * @param creator     Callback that performs the actual load.
   * @param newWaitable The waitable previously published in replicaInfoMap
   *                    for this key.
   * @return The resulting ShortCircuitReplicaInfo (never null).
   */
  private ShortCircuitReplicaInfo create(ExtendedBlockId key,
      ShortCircuitReplicaCreator creator,
      Waitable<ShortCircuitReplicaInfo> newWaitable) {
    // Handle loading a new replica.
    ShortCircuitReplicaInfo info = null;
    try {
      LOG.trace("{}: loading {}", this, key);
      info = creator.createShortCircuitReplicaInfo();
    } catch (RuntimeException e) {
      LOG.warn(this + ": failed to load " + key, e);
    }
    // Normalize a failed load to an empty (replica-less) info object.
    if (info == null) info = new ShortCircuitReplicaInfo();
    lock.lock();
    try {
      if (info.getReplica() != null) {
        // On success, make sure the cache cleaner thread is running.
        LOG.trace("{}: successfully loaded {}", this, info.getReplica());
        startCacheCleanerThreadIfNeeded();
        // Note: new ShortCircuitReplicas start with a refCount of 2,
        // indicating that both this cache and whoever requested the
        // creation of the replica hold a reference.  So we don't need
        // to increment the reference count here.
      } else {
        // On failure, remove the waitable from the replicaInfoMap.
        // Only remove it if it is still ours; another thread may have
        // replaced it in the meantime.
        Waitable<ShortCircuitReplicaInfo> waitableInMap = replicaInfoMap.get(key);
        if (waitableInMap == newWaitable) replicaInfoMap.remove(key);
        if (info.getInvalidTokenException() != null) {
          LOG.info(this + ": could not load " + key + " due to InvalidToken " +
              "exception.", info.getInvalidTokenException());
        } else {
          LOG.warn(this + ": failed to load " + key);
        }
      }
      // Wake up all threads blocked in fetch() on this waitable.
      newWaitable.provide(info);
    } finally {
      lock.unlock();
    }
    return info;
  }
| |
| private void startCacheCleanerThreadIfNeeded() { |
| if (cacheCleaner == null) { |
| cacheCleaner = new CacheCleaner(); |
| long rateMs = cacheCleaner.getRateInMs(); |
| ScheduledFuture<?> future = |
| cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs, |
| TimeUnit.MILLISECONDS); |
| cacheCleaner.setFuture(future); |
| LOG.debug("{}: starting cache cleaner thread which will run every {} ms", |
| this, rateMs); |
| } |
| } |
| |
  /**
   * Get or create a client mmap for the given replica.
   *
   * replica.mmapData acts as a small state machine (always read/written
   * under the cache lock):
   *   MappedByteBuffer -- the mmap is already loaded and can be shared;
   *   Long             -- monotonic time of the last failed mmap attempt;
   *   Condition        -- another thread is currently loading the mmap;
   *   null             -- no mmap and no load in progress.
   *
   * @param replica  The replica to mmap.
   * @param anchored Whether the resulting ClientMmap is anchored.
   * @return A ClientMmap, or null if the mmap could not be created (or a
   *         recent attempt failed and the retry timeout has not elapsed).
   */
  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
      boolean anchored) {
    Condition newCond;
    lock.lock();
    try {
      while (replica.mmapData != null) {
        if (replica.mmapData instanceof MappedByteBuffer) {
          // Already mmapped; take a reference and share the buffer.
          ref(replica);
          MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
          return new ClientMmap(replica, mmap, anchored);
        } else if (replica.mmapData instanceof Long) {
          // A previous mmap attempt failed; back off until
          // mmapRetryTimeoutMs has elapsed since that attempt.
          long lastAttemptTimeMs = (Long)replica.mmapData;
          long delta = Time.monotonicNow() - lastAttemptTimeMs;
          if (delta < mmapRetryTimeoutMs) {
            LOG.trace("{}: can't create client mmap for {} because we failed to"
                + " create one just {}ms ago.", this, replica, delta);
            return null;
          }
          LOG.trace("{}: retrying client mmap for {}, {} ms after the previous "
              + "failure.", this, replica, delta);
        } else if (replica.mmapData instanceof Condition) {
          // Another thread is loading; wait for it to signal, then re-check.
          Condition cond = (Condition)replica.mmapData;
          cond.awaitUninterruptibly();
        } else {
          Preconditions.checkState(false, "invalid mmapData type %s",
              replica.mmapData.getClass().getName());
        }
      }
      // Claim the load: install our condition so other threads wait on us.
      newCond = lock.newCondition();
      replica.mmapData = newCond;
    } finally {
      lock.unlock();
    }
    // Perform the actual mmap outside the lock.
    MappedByteBuffer map = replica.loadMmapInternal();
    lock.lock();
    try {
      if (map == null) {
        // Record the failure time so retries are rate-limited, and wake
        // any threads waiting on our condition.
        replica.mmapData = Time.monotonicNow();
        newCond.signalAll();
        return null;
      } else {
        outstandingMmapCount++;
        replica.mmapData = map;
        ref(replica);
        newCond.signalAll();
        return new ClientMmap(replica, map, anchored);
      }
    } finally {
      lock.unlock();
    }
  }
| |
| /** |
| * Close the cache and free all associated resources. |
| */ |
| @Override |
| public void close() { |
| try { |
| lock.lock(); |
| if (closed) return; |
| closed = true; |
| LOG.info(this + ": closing"); |
| maxNonMmappedEvictableLifespanMs = 0; |
| maxEvictableMmapedSize = 0; |
| // Close and join cacheCleaner thread. |
| IOUtilsClient.cleanup(LOG, cacheCleaner); |
| // Purge all replicas. |
| while (true) { |
| Object eldestKey; |
| try { |
| eldestKey = evictable.firstKey(); |
| } catch (NoSuchElementException e) { |
| break; |
| } |
| purge((ShortCircuitReplica)evictable.get(eldestKey)); |
| } |
| while (true) { |
| Object eldestKey; |
| try { |
| eldestKey = evictableMmapped.firstKey(); |
| } catch (NoSuchElementException e) { |
| break; |
| } |
| purge((ShortCircuitReplica)evictableMmapped.get(eldestKey)); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| |
| releaserExecutor.shutdown(); |
| cleanerExecutor.shutdown(); |
| // wait for existing tasks to terminate |
| try { |
| if (!releaserExecutor.awaitTermination(30, TimeUnit.SECONDS)) { |
| LOG.error("Forcing SlotReleaserThreadPool to shutdown!"); |
| releaserExecutor.shutdownNow(); |
| } |
| } catch (InterruptedException e) { |
| releaserExecutor.shutdownNow(); |
| Thread.currentThread().interrupt(); |
| LOG.error("Interrupted while waiting for SlotReleaserThreadPool " |
| + "to terminate", e); |
| } |
| |
| // wait for existing tasks to terminate |
| try { |
| if (!cleanerExecutor.awaitTermination(30, TimeUnit.SECONDS)) { |
| LOG.error("Forcing CleanerThreadPool to shutdown!"); |
| cleanerExecutor.shutdownNow(); |
| } |
| } catch (InterruptedException e) { |
| cleanerExecutor.shutdownNow(); |
| Thread.currentThread().interrupt(); |
| LOG.error("Interrupted while waiting for CleanerThreadPool " |
| + "to terminate", e); |
| } |
| IOUtilsClient.cleanup(LOG, shmManager); |
| } |
| |
  @VisibleForTesting // ONLY for testing
  public interface CacheVisitor {
    /**
     * Visit a consistent snapshot of the cache state, taken and passed
     * while the cache lock is held (see accept()).
     *
     * @param numOutstandingMmaps Number of existing mmaps.
     * @param replicas            Successfully loaded replicas, by block id.
     * @param failedLoads         Failed loads, by block id; the exception
     *                            may be null when the failure was not a
     *                            token problem.
     * @param evictable           The non-mmapped eviction map.
     * @param evictableMmapped    The mmapped eviction map.
     */
    void visit(int numOutstandingMmaps,
        Map<ExtendedBlockId, ShortCircuitReplica> replicas,
        Map<ExtendedBlockId, InvalidToken> failedLoads,
        LinkedMap evictable,
        LinkedMap evictableMmapped);
  }
| |
| @VisibleForTesting // ONLY for testing |
| public void accept(CacheVisitor visitor) { |
| lock.lock(); |
| try { |
| Map<ExtendedBlockId, ShortCircuitReplica> replicas = new HashMap<>(); |
| Map<ExtendedBlockId, InvalidToken> failedLoads = new HashMap<>(); |
| for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry : |
| replicaInfoMap.entrySet()) { |
| Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue(); |
| if (waitable.hasVal()) { |
| if (waitable.getVal().getReplica() != null) { |
| replicas.put(entry.getKey(), waitable.getVal().getReplica()); |
| } else { |
| // The exception may be null here, indicating a failed load that |
| // isn't the result of an invalid block token. |
| failedLoads.put(entry.getKey(), |
| waitable.getVal().getInvalidTokenException()); |
| } |
| } |
| } |
| LOG.debug("visiting {} with outstandingMmapCount={}, replicas={}, " |
| + "failedLoads={}, evictable={}, evictableMmapped={}", |
| visitor.getClass().getName(), outstandingMmapCount, replicas, |
| failedLoads, evictable, evictableMmapped); |
| visitor.visit(outstandingMmapCount, replicas, failedLoads, |
| evictable, evictableMmapped); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "ShortCircuitCache(0x" + |
| Integer.toHexString(System.identityHashCode(this)) + ")"; |
| } |
| |
| /** |
| * Allocate a new shared memory slot. |
| * |
| * @param datanode The datanode to allocate a shm slot with. |
| * @param peer A peer connected to the datanode. |
| * @param usedPeer Will be set to true if we use up the provided peer. |
| * @param blockId The block id and block pool id of the block we're |
| * allocating this slot for. |
| * @param clientName The name of the DFSClient allocating the shared |
| * memory. |
| * @return Null if short-circuit shared memory is disabled; |
| * a short-circuit memory slot otherwise. |
| * @throws IOException An exception if there was an error talking to |
| * the datanode. |
| */ |
| public Slot allocShmSlot(DatanodeInfo datanode, |
| DomainPeer peer, MutableBoolean usedPeer, |
| ExtendedBlockId blockId, String clientName) throws IOException { |
| if (shmManager != null) { |
| return shmManager.allocSlot(datanode, peer, usedPeer, |
| blockId, clientName); |
| } else { |
| return null; |
| } |
| } |
| |
  /**
   * Free a slot immediately.
   *
   * ONLY use this if the DataNode is not yet aware of the slot.
   *
   * @param slot The slot to free.
   */
  public void freeSlot(Slot slot) {
    // Shared memory must be enabled for slots to exist at all.
    Preconditions.checkState(shmManager != null);
    slot.makeInvalid();
    shmManager.freeSlot(slot);
  }
| |
  /**
   * Schedule a shared memory slot to be released.
   *
   * The release is performed asynchronously by a SlotReleaser task on the
   * releaser executor, which contacts the DataNode over a domain socket.
   *
   * @param slot The slot to release.
   */
  public void scheduleSlotReleaser(Slot slot) {
    // Shared memory must be enabled for slots to exist at all.
    Preconditions.checkState(shmManager != null);
    releaserExecutor.execute(new SlotReleaser(slot));
  }
| |
  /**
   * @return The shared memory manager, or null if shared memory is
   *         disabled.  Exposed for testing only.
   */
  @VisibleForTesting
  public DfsClientShmManager getDfsClientShmManager() {
    return shmManager;
  }
| |
  /**
   * Can be used in testing to verify whether a read went through SCR, after
   * the read is done and before the stream is closed.
   *
   * @return Number of entries currently in the replicaInfoMap.
   */
  @VisibleForTesting
  public int getReplicaInfoMapSize() {
    return replicaInfoMap.size();
  }
| } |