| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.nfs.nfs3; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.Map.Entry; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; |
| import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; |
| import org.apache.hadoop.nfs.nfs3.FileHandle; |
| import org.apache.hadoop.util.Daemon; |
| import org.apache.hadoop.util.Time; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Maps; |
| |
| /** |
| * A cache saves OpenFileCtx objects for different users. Each cache entry is |
| * used to maintain the writing context for a single file. |
| */ |
| class OpenFileCtxCache { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(OpenFileCtxCache.class); |
| // Insert and delete with openFileMap are synced |
| private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps |
| .newConcurrentMap(); |
| |
| private final int maxStreams; |
| private final long streamTimeout; |
| private final StreamMonitor streamMonitor; |
| |
| OpenFileCtxCache(NfsConfiguration config, long streamTimeout) { |
| maxStreams = config.getInt(NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_KEY, |
| NfsConfigKeys.DFS_NFS_MAX_OPEN_FILES_DEFAULT); |
| LOG.info("Maximum open streams is " + maxStreams); |
| this.streamTimeout = streamTimeout; |
| streamMonitor = new StreamMonitor(); |
| } |
| |
| /** |
| * The entry to be evicted is based on the following rules:<br> |
| * 1. if the OpenFileCtx has any pending task, it will not be chosen.<br> |
| * 2. if there is inactive OpenFileCtx, the first found one is to evict. <br> |
| * 3. For OpenFileCtx entries don't belong to group 1 or 2, the idlest one |
| * is select. If it's idle longer than OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT, it |
| * will be evicted. Otherwise, the whole eviction request is failed. |
| */ |
| @VisibleForTesting |
| Entry<FileHandle, OpenFileCtx> getEntryToEvict() { |
| Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() |
| .iterator(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("openFileMap size:" + size()); |
| } |
| |
| Entry<FileHandle, OpenFileCtx> idlest = null; |
| |
| while (it.hasNext()) { |
| Entry<FileHandle, OpenFileCtx> pairs = it.next(); |
| OpenFileCtx ctx = pairs.getValue(); |
| if (!ctx.getActiveState()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Got one inactive stream: " + ctx); |
| } |
| return pairs; |
| } |
| if (ctx.hasPendingWork()) { |
| // Always skip files with pending work. |
| continue; |
| } |
| if (idlest == null) { |
| idlest = pairs; |
| } else { |
| if (ctx.getLastAccessTime() < idlest.getValue().getLastAccessTime()) { |
| idlest = pairs; |
| } |
| } |
| } |
| |
| if (idlest == null) { |
| LOG.warn("No eviction candidate. All streams have pending work."); |
| return null; |
| } else { |
| long idleTime = Time.monotonicNow() |
| - idlest.getValue().getLastAccessTime(); |
| if (idleTime < NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("idlest stream's idle time:" + idleTime); |
| } |
| LOG.warn("All opened streams are busy, can't remove any from cache."); |
| return null; |
| } else { |
| return idlest; |
| } |
| } |
| } |
| |
| boolean put(FileHandle h, OpenFileCtx context) { |
| OpenFileCtx toEvict = null; |
| synchronized (this) { |
| Preconditions.checkState(size() <= this.maxStreams, |
| "stream cache size " + size() + " is larger than maximum" + this |
| .maxStreams); |
| if (size() == this.maxStreams) { |
| Entry<FileHandle, OpenFileCtx> pairs = getEntryToEvict(); |
| if (pairs ==null) { |
| return false; |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Evict stream ctx: " + pairs.getValue()); |
| } |
| toEvict = openFileMap.remove(pairs.getKey()); |
| Preconditions.checkState(toEvict == pairs.getValue(), |
| "The deleted entry is not the same as odlest found."); |
| } |
| } |
| openFileMap.put(h, context); |
| } |
| |
| // Cleanup the old stream outside the lock |
| if (toEvict != null) { |
| toEvict.cleanup(); |
| } |
| return true; |
| } |
| |
| @VisibleForTesting |
| void scan(long streamTimeout) { |
| ArrayList<OpenFileCtx> ctxToRemove = new ArrayList<OpenFileCtx>(); |
| Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() |
| .iterator(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("openFileMap size:" + size()); |
| } |
| |
| while (it.hasNext()) { |
| Entry<FileHandle, OpenFileCtx> pairs = it.next(); |
| FileHandle handle = pairs.getKey(); |
| OpenFileCtx ctx = pairs.getValue(); |
| if (!ctx.streamCleanup(handle, streamTimeout)) { |
| continue; |
| } |
| |
| // Check it again inside lock before removing |
| synchronized (this) { |
| OpenFileCtx ctx2 = openFileMap.get(handle); |
| if (ctx2 != null) { |
| if (ctx2.streamCleanup(handle, streamTimeout)) { |
| openFileMap.remove(handle); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("After remove stream " + handle.dumpFileHandle() |
| + ", the stream number:" + size()); |
| } |
| ctxToRemove.add(ctx2); |
| } |
| } |
| } |
| } |
| |
| // Invoke the cleanup outside the lock |
| for (OpenFileCtx ofc : ctxToRemove) { |
| ofc.cleanup(); |
| } |
| } |
| |
| OpenFileCtx get(FileHandle key) { |
| return openFileMap.get(key); |
| } |
| |
| int size() { |
| return openFileMap.size(); |
| } |
| |
| void start() { |
| streamMonitor.start(); |
| } |
| |
| // Evict all entries |
| void cleanAll() { |
| ArrayList<OpenFileCtx> cleanedContext = new ArrayList<OpenFileCtx>(); |
| synchronized (this) { |
| Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() |
| .iterator(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("openFileMap size:" + size()); |
| } |
| |
| while (it.hasNext()) { |
| Entry<FileHandle, OpenFileCtx> pairs = it.next(); |
| OpenFileCtx ctx = pairs.getValue(); |
| it.remove(); |
| cleanedContext.add(ctx); |
| } |
| } |
| |
| // Invoke the cleanup outside the lock |
| for (OpenFileCtx ofc : cleanedContext) { |
| ofc.cleanup(); |
| } |
| } |
| |
| void shutdown() { |
| // stop the dump thread |
| if (streamMonitor.isAlive()) { |
| streamMonitor.shouldRun(false); |
| streamMonitor.interrupt(); |
| try { |
| streamMonitor.join(3000); |
| } catch (InterruptedException ignored) { |
| } |
| } |
| |
| cleanAll(); |
| } |
| |
| /** |
| * StreamMonitor wakes up periodically to find and closes idle streams. |
| */ |
| class StreamMonitor extends Daemon { |
| private final static int rotation = 5 * 1000; // 5 seconds |
| private long lastWakeupTime = 0; |
| private boolean shouldRun = true; |
| |
| void shouldRun(boolean shouldRun) { |
| this.shouldRun = shouldRun; |
| } |
| |
| @Override |
| public void run() { |
| while (shouldRun) { |
| scan(streamTimeout); |
| |
| // Check if it can sleep |
| try { |
| long workedTime = Time.monotonicNow() - lastWakeupTime; |
| if (workedTime < rotation) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("StreamMonitor can still have a sleep:" |
| + ((rotation - workedTime) / 1000)); |
| } |
| Thread.sleep(rotation - workedTime); |
| } |
| lastWakeupTime = Time.monotonicNow(); |
| |
| } catch (InterruptedException e) { |
| LOG.info("StreamMonitor got interrupted"); |
| return; |
| } |
| } |
| } |
| } |
| } |