| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.hadoop.fs.impl.prefetch; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.time.Duration; |
| import java.time.Instant; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Supplier; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.fs.statistics.DurationTracker; |
| |
| import static java.util.Objects.requireNonNull; |
| |
| import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNegative; |
| import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; |
| |
| /** |
| * Provides read access to the underlying file one block at a time. |
| * Improve read performance by prefetching and locall caching blocks. |
| */ |
| public abstract class CachingBlockManager extends BlockManager { |
| private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class); |
| private static final int TIMEOUT_MINUTES = 60; |
| |
| /** |
| * Asynchronous tasks are performed in this pool. |
| */ |
| private final ExecutorServiceFuturePool futurePool; |
| |
| /** |
| * Pool of shared ByteBuffer instances. |
| */ |
| private BufferPool bufferPool; |
| |
| /** |
| * Size of the in-memory cache in terms of number of blocks. |
| * Total memory consumption is up to bufferPoolSize * blockSize. |
| */ |
| private final int bufferPoolSize; |
| |
| /** |
| * Local block cache. |
| */ |
| private BlockCache cache; |
| |
| /** |
| * Error counts. For testing purposes. |
| */ |
| private final AtomicInteger numCachingErrors; |
| private final AtomicInteger numReadErrors; |
| |
| /** |
| * Operations performed by this block manager. |
| */ |
| private final BlockOperations ops; |
| |
| private boolean closed; |
| |
| /** |
| * If a single caching operation takes more than this time (in seconds), |
| * we disable caching to prevent further perf degradation due to caching. |
| */ |
| private static final int SLOW_CACHING_THRESHOLD = 5; |
| |
| /** |
| * Once set to true, any further caching requests will be ignored. |
| */ |
| private final AtomicBoolean cachingDisabled; |
| |
| private final PrefetchingStatistics prefetchingStatistics; |
| |
| /** |
| * Constructs an instance of a {@code CachingBlockManager}. |
| * |
| * @param futurePool asynchronous tasks are performed in this pool. |
| * @param blockData information about each block of the underlying file. |
| * @param bufferPoolSize size of the in-memory cache in terms of number of blocks. |
| * @param prefetchingStatistics statistics for this stream. |
| * |
| * @throws IllegalArgumentException if bufferPoolSize is zero or negative. |
| */ |
| public CachingBlockManager( |
| ExecutorServiceFuturePool futurePool, |
| BlockData blockData, |
| int bufferPoolSize, |
| PrefetchingStatistics prefetchingStatistics) { |
| super(blockData); |
| |
| Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize"); |
| |
| this.futurePool = requireNonNull(futurePool); |
| this.bufferPoolSize = bufferPoolSize; |
| this.numCachingErrors = new AtomicInteger(); |
| this.numReadErrors = new AtomicInteger(); |
| this.cachingDisabled = new AtomicBoolean(); |
| this.prefetchingStatistics = requireNonNull(prefetchingStatistics); |
| |
| if (this.getBlockData().getFileSize() > 0) { |
| this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(), |
| this.prefetchingStatistics); |
| this.cache = this.createCache(); |
| } |
| |
| this.ops = new BlockOperations(); |
| this.ops.setDebug(false); |
| } |
| |
| /** |
| * Gets the block having the given {@code blockNumber}. |
| * |
| * @throws IllegalArgumentException if blockNumber is negative. |
| */ |
| @Override |
| public BufferData get(int blockNumber) throws IOException { |
| checkNotNegative(blockNumber, "blockNumber"); |
| |
| BufferData data; |
| final int maxRetryDelayMs = bufferPoolSize * 120 * 1000; |
| final int statusUpdateDelayMs = 120 * 1000; |
| Retryer retryer = new Retryer(10, maxRetryDelayMs, statusUpdateDelayMs); |
| boolean done; |
| |
| do { |
| if (closed) { |
| throw new IOException("this stream is already closed"); |
| } |
| |
| data = bufferPool.acquire(blockNumber); |
| done = getInternal(data); |
| |
| if (retryer.updateStatus()) { |
| LOG.warn("waiting to get block: {}", blockNumber); |
| LOG.info("state = {}", this.toString()); |
| } |
| } |
| while (!done && retryer.continueRetry()); |
| |
| if (done) { |
| return data; |
| } else { |
| String message = String.format("Wait failed for get(%d)", blockNumber); |
| throw new IllegalStateException(message); |
| } |
| } |
| |
| private boolean getInternal(BufferData data) throws IOException { |
| Validate.checkNotNull(data, "data"); |
| |
| // Opportunistic check without locking. |
| if (data.stateEqualsOneOf( |
| BufferData.State.PREFETCHING, |
| BufferData.State.CACHING, |
| BufferData.State.DONE)) { |
| return false; |
| } |
| |
| synchronized (data) { |
| // Reconfirm state after locking. |
| if (data.stateEqualsOneOf( |
| BufferData.State.PREFETCHING, |
| BufferData.State.CACHING, |
| BufferData.State.DONE)) { |
| return false; |
| } |
| |
| int blockNumber = data.getBlockNumber(); |
| if (data.getState() == BufferData.State.READY) { |
| BlockOperations.Operation op = ops.getPrefetched(blockNumber); |
| ops.end(op); |
| return true; |
| } |
| |
| data.throwIfStateIncorrect(BufferData.State.BLANK); |
| read(data); |
| return true; |
| } |
| } |
| |
| /** |
| * Releases resources allocated to the given block. |
| * |
| * @throws IllegalArgumentException if data is null. |
| */ |
| @Override |
| public void release(BufferData data) { |
| if (closed) { |
| return; |
| } |
| |
| Validate.checkNotNull(data, "data"); |
| |
| BlockOperations.Operation op = ops.release(data.getBlockNumber()); |
| bufferPool.release(data); |
| ops.end(op); |
| } |
| |
| @Override |
| public synchronized void close() { |
| if (closed) { |
| return; |
| } |
| |
| closed = true; |
| |
| final BlockOperations.Operation op = ops.close(); |
| |
| // Cancel any prefetches in progress. |
| cancelPrefetches(); |
| |
| cleanupWithLogger(LOG, cache); |
| |
| ops.end(op); |
| LOG.info(ops.getSummary(false)); |
| |
| bufferPool.close(); |
| bufferPool = null; |
| } |
| |
| /** |
| * Requests optional prefetching of the given block. |
| * The block is prefetched only if we can acquire a free buffer. |
| * |
| * @throws IllegalArgumentException if blockNumber is negative. |
| */ |
| @Override |
| public void requestPrefetch(int blockNumber) { |
| checkNotNegative(blockNumber, "blockNumber"); |
| |
| if (closed) { |
| return; |
| } |
| |
| // We initiate a prefetch only if we can acquire a buffer from the shared pool. |
| BufferData data = bufferPool.tryAcquire(blockNumber); |
| if (data == null) { |
| return; |
| } |
| |
| // Opportunistic check without locking. |
| if (!data.stateEqualsOneOf(BufferData.State.BLANK)) { |
| // The block is ready or being prefetched/cached. |
| return; |
| } |
| |
| synchronized (data) { |
| // Reconfirm state after locking. |
| if (!data.stateEqualsOneOf(BufferData.State.BLANK)) { |
| // The block is ready or being prefetched/cached. |
| return; |
| } |
| |
| BlockOperations.Operation op = ops.requestPrefetch(blockNumber); |
| PrefetchTask prefetchTask = new PrefetchTask(data, this, Instant.now()); |
| Future<Void> prefetchFuture = futurePool.executeFunction(prefetchTask); |
| data.setPrefetch(prefetchFuture); |
| ops.end(op); |
| } |
| } |
| |
| /** |
| * Requests cancellation of any previously issued prefetch requests. |
| */ |
| @Override |
| public void cancelPrefetches() { |
| BlockOperations.Operation op = ops.cancelPrefetches(); |
| |
| for (BufferData data : bufferPool.getAll()) { |
| // We add blocks being prefetched to the local cache so that the prefetch is not wasted. |
| if (data.stateEqualsOneOf(BufferData.State.PREFETCHING, BufferData.State.READY)) { |
| requestCaching(data); |
| } |
| } |
| |
| ops.end(op); |
| } |
| |
| private void read(BufferData data) throws IOException { |
| synchronized (data) { |
| readBlock(data, false, BufferData.State.BLANK); |
| } |
| } |
| |
| private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException { |
| synchronized (data) { |
| prefetchingStatistics.executorAcquired( |
| Duration.between(taskQueuedStartTime, Instant.now())); |
| readBlock( |
| data, |
| true, |
| BufferData.State.PREFETCHING, |
| BufferData.State.CACHING); |
| } |
| } |
| |
| private void readBlock(BufferData data, boolean isPrefetch, BufferData.State... expectedState) |
| throws IOException { |
| |
| if (closed) { |
| return; |
| } |
| |
| BlockOperations.Operation op = null; |
| DurationTracker tracker = null; |
| |
| synchronized (data) { |
| try { |
| if (data.stateEqualsOneOf(BufferData.State.DONE, BufferData.State.READY)) { |
| // DONE : Block was released, likely due to caching being disabled on slow perf. |
| // READY : Block was already fetched by another thread. No need to re-read. |
| return; |
| } |
| |
| data.throwIfStateIncorrect(expectedState); |
| int blockNumber = data.getBlockNumber(); |
| |
| // Prefer reading from cache over reading from network. |
| if (cache.containsBlock(blockNumber)) { |
| op = ops.getCached(blockNumber); |
| cache.get(blockNumber, data.getBuffer()); |
| data.setReady(expectedState); |
| return; |
| } |
| |
| if (isPrefetch) { |
| tracker = prefetchingStatistics.prefetchOperationStarted(); |
| op = ops.prefetch(data.getBlockNumber()); |
| } else { |
| op = ops.getRead(data.getBlockNumber()); |
| } |
| |
| long offset = getBlockData().getStartOffset(data.getBlockNumber()); |
| int size = getBlockData().getSize(data.getBlockNumber()); |
| ByteBuffer buffer = data.getBuffer(); |
| buffer.clear(); |
| read(buffer, offset, size); |
| buffer.flip(); |
| data.setReady(expectedState); |
| } catch (Exception e) { |
| String message = String.format("error during readBlock(%s)", data.getBlockNumber()); |
| LOG.error(message, e); |
| |
| if (isPrefetch && tracker != null) { |
| tracker.failed(); |
| } |
| |
| numReadErrors.incrementAndGet(); |
| data.setDone(); |
| throw e; |
| } finally { |
| if (op != null) { |
| ops.end(op); |
| } |
| |
| if (isPrefetch) { |
| prefetchingStatistics.prefetchOperationCompleted(); |
| if (tracker != null) { |
| tracker.close(); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Read task that is submitted to the future pool. |
| */ |
| private static class PrefetchTask implements Supplier<Void> { |
| private final BufferData data; |
| private final CachingBlockManager blockManager; |
| private final Instant taskQueuedStartTime; |
| |
| PrefetchTask(BufferData data, CachingBlockManager blockManager, Instant taskQueuedStartTime) { |
| this.data = data; |
| this.blockManager = blockManager; |
| this.taskQueuedStartTime = taskQueuedStartTime; |
| } |
| |
| @Override |
| public Void get() { |
| try { |
| blockManager.prefetch(data, taskQueuedStartTime); |
| } catch (Exception e) { |
| LOG.error("error during prefetch", e); |
| } |
| return null; |
| } |
| } |
| |
| private static final BufferData.State[] EXPECTED_STATE_AT_CACHING = |
| new BufferData.State[] { |
| BufferData.State.PREFETCHING, BufferData.State.READY |
| }; |
| |
| /** |
| * Requests that the given block should be copied to the local cache. |
| * The block must not be accessed by the caller after calling this method |
| * because it will released asynchronously relative to the caller. |
| * |
| * @throws IllegalArgumentException if data is null. |
| */ |
| @Override |
| public void requestCaching(BufferData data) { |
| if (closed) { |
| return; |
| } |
| |
| if (cachingDisabled.get()) { |
| data.setDone(); |
| return; |
| } |
| |
| Validate.checkNotNull(data, "data"); |
| |
| // Opportunistic check without locking. |
| if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) { |
| return; |
| } |
| |
| synchronized (data) { |
| // Reconfirm state after locking. |
| if (!data.stateEqualsOneOf(EXPECTED_STATE_AT_CACHING)) { |
| return; |
| } |
| |
| if (cache.containsBlock(data.getBlockNumber())) { |
| data.setDone(); |
| return; |
| } |
| |
| BufferData.State state = data.getState(); |
| |
| BlockOperations.Operation op = ops.requestCaching(data.getBlockNumber()); |
| Future<Void> blockFuture; |
| if (state == BufferData.State.PREFETCHING) { |
| blockFuture = data.getActionFuture(); |
| } else { |
| CompletableFuture<Void> cf = new CompletableFuture<>(); |
| cf.complete(null); |
| blockFuture = cf; |
| } |
| |
| CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now()); |
| Future<Void> actionFuture = futurePool.executeFunction(task); |
| data.setCaching(actionFuture); |
| ops.end(op); |
| } |
| } |
| |
| private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture, |
| Instant taskQueuedStartTime) { |
| prefetchingStatistics.executorAcquired( |
| Duration.between(taskQueuedStartTime, Instant.now())); |
| |
| if (closed) { |
| return; |
| } |
| |
| if (cachingDisabled.get()) { |
| data.setDone(); |
| return; |
| } |
| |
| try { |
| blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES); |
| if (data.stateEqualsOneOf(BufferData.State.DONE)) { |
| // There was an error during prefetch. |
| return; |
| } |
| } catch (Exception e) { |
| LOG.error("error waiting on blockFuture: {}", data, e); |
| data.setDone(); |
| return; |
| } |
| |
| if (cachingDisabled.get()) { |
| data.setDone(); |
| return; |
| } |
| |
| BlockOperations.Operation op = null; |
| |
| synchronized (data) { |
| try { |
| if (data.stateEqualsOneOf(BufferData.State.DONE)) { |
| return; |
| } |
| |
| if (cache.containsBlock(data.getBlockNumber())) { |
| data.setDone(); |
| return; |
| } |
| |
| op = ops.addToCache(data.getBlockNumber()); |
| ByteBuffer buffer = data.getBuffer().duplicate(); |
| buffer.rewind(); |
| cachePut(data.getBlockNumber(), buffer); |
| data.setDone(); |
| } catch (Exception e) { |
| numCachingErrors.incrementAndGet(); |
| String message = String.format("error adding block to cache after wait: %s", data); |
| LOG.error(message, e); |
| data.setDone(); |
| } |
| |
| if (op != null) { |
| BlockOperations.End endOp = (BlockOperations.End) ops.end(op); |
| if (endOp.duration() > SLOW_CACHING_THRESHOLD) { |
| if (!cachingDisabled.getAndSet(true)) { |
| String message = String.format( |
| "Caching disabled because of slow operation (%.1f sec)", endOp.duration()); |
| LOG.warn(message); |
| } |
| } |
| } |
| } |
| } |
| |
| protected BlockCache createCache() { |
| return new SingleFilePerBlockCache(prefetchingStatistics); |
| } |
| |
| protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException { |
| if (closed) { |
| return; |
| } |
| |
| cache.put(blockNumber, buffer); |
| } |
| |
| private static class CachePutTask implements Supplier<Void> { |
| private final BufferData data; |
| |
| // Block being asynchronously fetched. |
| private final Future<Void> blockFuture; |
| |
| // Block manager that manages this block. |
| private final CachingBlockManager blockManager; |
| |
| private final Instant taskQueuedStartTime; |
| |
| CachePutTask( |
| BufferData data, |
| Future<Void> blockFuture, |
| CachingBlockManager blockManager, |
| Instant taskQueuedStartTime) { |
| this.data = data; |
| this.blockFuture = blockFuture; |
| this.blockManager = blockManager; |
| this.taskQueuedStartTime = taskQueuedStartTime; |
| } |
| |
| @Override |
| public Void get() { |
| blockManager.addToCacheAndRelease(data, blockFuture, taskQueuedStartTime); |
| return null; |
| } |
| } |
| |
| /** |
| * Number of ByteBuffers available to be acquired. |
| * |
| * @return the number of available buffers. |
| */ |
| public int numAvailable() { |
| return bufferPool.numAvailable(); |
| } |
| |
| /** |
| * Number of caching operations completed. |
| * |
| * @return the number of cached buffers. |
| */ |
| public int numCached() { |
| return cache.size(); |
| } |
| |
| /** |
| * Number of errors encountered when caching. |
| * |
| * @return the number of errors encountered when caching. |
| */ |
| public int numCachingErrors() { |
| return numCachingErrors.get(); |
| } |
| |
| /** |
| * Number of errors encountered when reading. |
| * |
| * @return the number of errors encountered when reading. |
| */ |
| public int numReadErrors() { |
| return numReadErrors.get(); |
| } |
| |
| BufferData getData(int blockNumber) { |
| return bufferPool.tryAcquire(blockNumber); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append("cache("); |
| sb.append(cache.toString()); |
| sb.append("); "); |
| |
| sb.append("pool: "); |
| sb.append(bufferPool.toString()); |
| |
| return sb.toString(); |
| } |
| } |