HADOOP-18291. S3A prefetch - Implement thread-safe LRU cache for SingleFilePerBlockCache (#5754)
Contributed by Viraj Jasani
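This change bounds the prefetch disk cache with an LRU policy. SingleFilePerBlockCache now threads its entries onto a doubly linked list guarded by a shared ReentrantReadWriteLock: every get/put moves the touched entry to the head, and once the list grows past a configurable maximum the tail entry (least recently used) is unlinked and its backing cache file deleted. The maximum is threaded through CachingBlockManager and surfaced in S3A as fs.s3a.prefetch.max.blocks.count (default 4); the write-lock timeout constants move into a new PrefetchConstants class shared by close() and eviction, and a parameterized integration test exercises eviction across seeks.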
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
index e43b176d..4461c11 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
@@ -110,6 +110,7 @@ public abstract class CachingBlockManager extends BlockManager {
* @param prefetchingStatistics statistics for this stream.
* @param conf the configuration.
* @param localDirAllocator the local dir allocator instance.
+ * @param maxBlocksCount maximum number of blocks to keep in the cache at any time.
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/
public CachingBlockManager(
@@ -118,7 +119,8 @@ public CachingBlockManager(
int bufferPoolSize,
PrefetchingStatistics prefetchingStatistics,
Configuration conf,
- LocalDirAllocator localDirAllocator) {
+ LocalDirAllocator localDirAllocator,
+ int maxBlocksCount) {
super(blockData);
Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -129,16 +131,16 @@ public CachingBlockManager(
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
+ this.conf = requireNonNull(conf);
if (this.getBlockData().getFileSize() > 0) {
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
this.prefetchingStatistics);
- this.cache = this.createCache();
+ this.cache = this.createCache(maxBlocksCount);
}
this.ops = new BlockOperations();
this.ops.setDebug(false);
- this.conf = requireNonNull(conf);
this.localDirAllocator = localDirAllocator;
}
@@ -557,8 +559,8 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
}
}
- protected BlockCache createCache() {
- return new SingleFilePerBlockCache(prefetchingStatistics);
+ protected BlockCache createCache(int maxBlocksCount) {
+ return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount);
}
protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java
new file mode 100644
index 0000000..785023f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.impl.prefetch;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Constants used by prefetch implementations.
+ */
+public final class PrefetchConstants {
+
+ private PrefetchConstants() {
+ }
+
+ /**
+ * Timeout used by close and LRU eviction while acquiring the prefetch block write lock.
+ * Value = {@value}
+ */
+ static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;
+
+ /**
+ * Unit of the prefetch block write lock timeout. Value = SECONDS.
+ */
+ static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
+
+}
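The constants above bound how long close() and LRU eviction wait for an entry's write lock, so cleanup is skipped rather than blocking forever behind an in-flight reader. A minimal sketch of that timed-acquisition pattern (the class and method names here are hypothetical; the real code routes this through Entry.takeLock, which also handles interruption):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class TimedCleanupSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // Returns false when the lock cannot be acquired in time; the caller
  // logs an error and leaves the entry alone, mirroring deleteCacheFiles().
  boolean tryCleanup() throws InterruptedException {
    // 5 / SECONDS stand in for PREFETCH_WRITE_LOCK_TIMEOUT and its unit.
    if (!lock.writeLock().tryLock(5, TimeUnit.SECONDS)) {
      return false;
    }
    try {
      // delete the cache file, update statistics, ...
      return true;
    } finally {
      lock.writeLock().unlock();
    }
  }
}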
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
index e043fbd..a84a79e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
@@ -47,6 +47,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Preconditions;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
@@ -61,7 +62,32 @@ public class SingleFilePerBlockCache implements BlockCache {
/**
* Blocks stored in this cache.
*/
- private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();
+ private final Map<Integer, Entry> blocks;
+
+ /**
+ * Maximum number of blocks to keep in the cache; used as the threshold for LRU eviction.
+ */
+ private final int maxBlocksCount;
+
+ /**
+ * Lock guarding updates to the LRU linked list.
+ */
+ private final ReentrantReadWriteLock blocksLock;
+
+ /**
+ * Head of the linked list.
+ */
+ private Entry head;
+
+ /**
+ * Tail of the linked list.
+ */
+ private Entry tail;
+
+ /**
+ * Number of entries currently in the linked list.
+ */
+ private int entryListSize;
/**
* Number of times a block was read from this cache.
@@ -74,16 +100,6 @@ public class SingleFilePerBlockCache implements BlockCache {
private final PrefetchingStatistics prefetchingStatistics;
/**
- * Timeout to be used by close, while acquiring prefetch block write lock.
- */
- private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;
-
- /**
- * Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
- */
- private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
-
- /**
* File attributes attached to any intermediate temporary file created during index creation.
*/
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
@@ -103,6 +119,8 @@ private enum LockType {
READ,
WRITE
}
+ private Entry previous;
+ private Entry next;
Entry(int blockNumber, Path path, int size, long checksum) {
this.blockNumber = blockNumber;
@@ -110,6 +128,8 @@ private enum LockType {
this.size = size;
this.checksum = checksum;
this.lock = new ReentrantReadWriteLock();
+ this.previous = null;
+ this.next = null;
}
@Override
@@ -166,16 +186,37 @@ private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
}
return false;
}
+
+ private Entry getPrevious() {
+ return previous;
+ }
+
+ private void setPrevious(Entry previous) {
+ this.previous = previous;
+ }
+
+ private Entry getNext() {
+ return next;
+ }
+
+ private void setNext(Entry next) {
+ this.next = next;
+ }
}
/**
* Constructs an instance of a {@code SingleFilePerBlockCache}.
*
* @param prefetchingStatistics statistics for this stream.
+ * @param maxBlocksCount maximum number of blocks to keep in the cache at any time.
*/
- public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+ public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, int maxBlocksCount) {
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.closed = new AtomicBoolean(false);
+ this.maxBlocksCount = maxBlocksCount;
+ Preconditions.checkArgument(maxBlocksCount > 0, "maxBlocksCount should be more than 0");
+ blocks = new ConcurrentHashMap<>();
+ blocksLock = new ReentrantReadWriteLock();
}
/**
@@ -247,10 +288,61 @@ private Entry getEntry(int blockNumber) {
throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
}
numGets++;
+ addToLinkedListHead(entry);
return entry;
}
/**
+ * Helper method to add the given entry to the head of the linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToLinkedListHead(Entry entry) {
+ blocksLock.writeLock().lock();
+ try {
+ addToHeadOfLinkedList(entry);
+ } finally {
+ blocksLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Add the given entry to the head of the linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToHeadOfLinkedList(Entry entry) {
+ if (head == null) {
+ head = entry;
+ tail = entry;
+ }
+ LOG.debug(
+ "Block num {} to be added to the head. Current head block num: {} and tail block num: {}",
+ entry.blockNumber, head.blockNumber, tail.blockNumber);
+ if (entry != head) {
+ Entry prev = entry.getPrevious();
+ Entry nxt = entry.getNext();
+ // no-op if the block is already evicted
+ if (!blocks.containsKey(entry.blockNumber)) {
+ return;
+ }
+ if (prev != null) {
+ prev.setNext(nxt);
+ }
+ if (nxt != null) {
+ nxt.setPrevious(prev);
+ }
+ entry.setPrevious(null);
+ entry.setNext(head);
+ head.setPrevious(entry);
+ head = entry;
+ if (prev != null && prev.getNext() == null) {
+ tail = prev;
+ }
+ }
+ }
+
+ /**
* Puts the given block in this cache.
*
* @param blockNumber the block number, used as a key for blocks map.
@@ -278,6 +370,7 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
} finally {
entry.releaseLock(Entry.LockType.READ);
}
+ addToLinkedListHead(entry);
return;
}
@@ -299,9 +392,65 @@ public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
// Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
// entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
// If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
- // the input stream can lead to the removal of the cache file even before blocks is added with
- // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+ // the input stream can lead to the removal of the cache file even before blocks is added
+ // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
prefetchingStatistics.blockAddedToFileCache();
+ addToLinkedListAndEvictIfRequired(entry);
+ }
+
+ /**
+ * Add the given entry to the head of the linked list and if the LRU cache size
+ * exceeds the max limit, evict tail of the LRU linked list.
+ *
+ * @param entry Block entry to add.
+ */
+ private void addToLinkedListAndEvictIfRequired(Entry entry) {
+ blocksLock.writeLock().lock();
+ try {
+ addToHeadOfLinkedList(entry);
+ entryListSize++;
+ if (entryListSize > maxBlocksCount && !closed.get()) {
+ Entry elementToPurge = tail;
+ tail = tail.getPrevious();
+ if (tail == null) {
+ tail = head;
+ }
+ tail.setNext(null);
+ elementToPurge.setPrevious(null);
+ deleteBlockFileAndEvictCache(elementToPurge);
+ }
+ } finally {
+ blocksLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Delete cache file as part of the block cache LRU eviction.
+ *
+ * @param elementToPurge Block entry to evict.
+ */
+ private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+ boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
+ PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+ PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ if (!lockAcquired) {
+ LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ + " be acquired within {} {}", elementToPurge.path,
+ PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+ PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ } else {
+ try {
+ if (Files.deleteIfExists(elementToPurge.path)) {
+ entryListSize--;
+ prefetchingStatistics.blockRemovedFromFileCache();
+ blocks.remove(elementToPurge.blockNumber);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
+ } finally {
+ elementToPurge.releaseLock(Entry.LockType.WRITE);
+ }
+ }
}
private static final Set<? extends OpenOption> CREATE_OPTIONS =
@@ -337,30 +486,38 @@ protected Path getCacheFilePath(final Configuration conf,
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
LOG.debug(getStats());
- int numFilesDeleted = 0;
+ deleteCacheFiles();
+ }
+ }
- for (Entry entry : blocks.values()) {
- boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
- PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
- if (!lockAcquired) {
- LOG.error("Cache file {} deletion would not be attempted as write lock could not"
- + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
- PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
- continue;
- }
- try {
- Files.deleteIfExists(entry.path);
+ /**
+ * Delete cache files as part of the close call.
+ */
+ private void deleteCacheFiles() {
+ int numFilesDeleted = 0;
+ for (Entry entry : blocks.values()) {
+ boolean lockAcquired =
+ entry.takeLock(Entry.LockType.WRITE, PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+ PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ if (!lockAcquired) {
+ LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+ + " be acquired within {} {}", entry.path,
+ PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+ PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+ continue;
+ }
+ try {
+ if (Files.deleteIfExists(entry.path)) {
prefetchingStatistics.blockRemovedFromFileCache();
numFilesDeleted++;
- } catch (IOException e) {
- LOG.warn("Failed to delete cache file {}", entry.path, e);
- } finally {
- entry.releaseLock(Entry.LockType.WRITE);
}
+ } catch (IOException e) {
+ LOG.warn("Failed to delete cache file {}", entry.path, e);
+ } finally {
+ entry.releaseLock(Entry.LockType.WRITE);
}
-
- LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
}
+ LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
}
@Override
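The core of the new eviction logic is the intrusive doubly linked list maintained inside addToHeadOfLinkedList() and addToLinkedListAndEvictIfRequired(). A self-contained sketch of the same move-to-head / evict-tail bookkeeping under one write lock (names are hypothetical, and the per-entry file deletion done under the victim's own timed write lock is reduced to a comment):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LruSketch<K> {
  private final class Node { K key; Node prev, next; Node(K k) { key = k; } }

  private final Map<K, Node> index = new HashMap<>();
  private final ReentrantReadWriteLock listLock = new ReentrantReadWriteLock();
  private final int maxCount;
  private Node head, tail;
  private int size;

  LruSketch(int maxCount) { this.maxCount = maxCount; }

  // get() and put() both funnel through this: a touched entry moves to the head.
  void touch(K key) {
    listLock.writeLock().lock();
    try {
      Node n = index.computeIfAbsent(key, k -> { size++; return new Node(k); });
      unlink(n);
      n.next = head;
      if (head != null) { head.prev = n; }
      head = n;
      if (tail == null) { tail = n; }
      if (size > maxCount) { evictTail(); }
    } finally {
      listLock.writeLock().unlock();
    }
  }

  private void unlink(Node n) {
    if (n.prev != null) { n.prev.next = n.next; }
    if (n.next != null) { n.next.prev = n.prev; }
    if (tail == n) { tail = n.prev; }
    if (head == n) { head = n.next; }
    n.prev = n.next = null;
  }

  private void evictTail() {
    Node victim = tail;        // least recently used
    unlink(victim);
    index.remove(victim.key);  // the real cache also deletes the block's file here
    size--;
  }
}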
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java
index 3b60c1c..b32ce20 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java
@@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
public void testArgChecks() throws Exception {
// Should not throw.
BlockCache cache =
- new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
+ new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);
ByteBuffer buffer = ByteBuffer.allocate(16);
@@ -55,7 +55,7 @@ public void testArgChecks() throws Exception {
intercept(NullPointerException.class, null,
- () -> new SingleFilePerBlockCache(null));
+ () -> new SingleFilePerBlockCache(null, 2));
}
@@ -63,7 +63,7 @@ public void testArgChecks() throws Exception {
@Test
public void testPutAndGet() throws Exception {
BlockCache cache =
- new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
+ new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);
ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
for (byte i = 0; i < BUFFER_SIZE; i++) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index ce65cf6..6c1d637 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1276,4 +1276,16 @@ private Constants() {
public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED =
"fs.s3a.capability.multipart.uploads.enabled";
+ /**
+ * Prefetch max blocks count config.
+ * Value = {@value}
+ */
+ public static final String PREFETCH_MAX_BLOCKS_COUNT = "fs.s3a.prefetch.max.blocks.count";
+
+ /**
+ * Default value for max blocks count config.
+ * Value = {@value}
+ */
+ public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 4;
+
}
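For reference, the new option can be set like any other S3A key; a hedged example that caps the per-stream cache at 8 blocks (assumes prefetching itself is enabled via the existing fs.s3a.prefetch.enabled key):

import org.apache.hadoop.conf.Configuration;

public class PrefetchCacheConfigExample {
  public static Configuration create() {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.prefetch.enabled", true);   // prefetching must be on
    conf.setInt("fs.s3a.prefetch.max.blocks.count", 8); // default is 4
    return conf;
  }
}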
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java
index c166943..a029220 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java
@@ -33,6 +33,9 @@
import org.apache.hadoop.fs.impl.prefetch.Validate;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+
/**
* Provides access to S3 file one block at a time.
*/
@@ -67,7 +70,13 @@ public S3ACachingBlockManager(
Configuration conf,
LocalDirAllocator localDirAllocator) {
- super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator);
+ super(futurePool,
+ blockData,
+ bufferPoolSize,
+ streamStatistics,
+ conf,
+ localDirAllocator,
+ conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
Validate.checkNotNull(reader, "reader");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
new file mode 100644
index 0000000..bbe0188
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Test the prefetching input stream with LRU cache eviction on S3ACachingInputStream.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {
+
+ private final String maxBlocks;
+
+ @Parameterized.Parameters(name = "max-blocks-{0}")
+ public static Collection<Object[]> params() {
+ return Arrays.asList(new Object[][]{
+ {"1"},
+ {"2"},
+ {"3"},
+ {"4"}
+ });
+ }
+
+ public ITestS3APrefetchingLruEviction(final String maxBlocks) {
+ super(true);
+ this.maxBlocks = maxBlocks;
+ }
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class);
+
+ private static final int S_1K = 1024;
+ // Path for file which should have length > block size so S3ACachingInputStream is used
+ private Path largeFile;
+ private FileSystem largeFileFS;
+ private int blockSize;
+
+ private static final int TIMEOUT_MILLIS = 5000;
+ private static final int INTERVAL_MILLIS = 500;
+
+ @Override
+ public Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+ S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT);
+ conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+ conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
+ return conf;
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ super.teardown();
+ cleanupWithLogger(LOG, largeFileFS);
+ largeFileFS = null;
+ }
+
+ private void openFS() throws Exception {
+ Configuration conf = getConfiguration();
+ String largeFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+ largeFile = new Path(largeFileUri);
+ blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+ largeFileFS = new S3AFileSystem();
+ largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
+ }
+
+ @Test
+ public void testSeeksWithLruEviction() throws Throwable {
+ IOStatistics ioStats;
+ openFS();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(5,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("testSeeksWithLruEviction-%d")
+ .build());
+ CountDownLatch countDownLatch = new CountDownLatch(7);
+
+ try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+ ioStats = in.getIOStatistics();
+ // this test adds multiple blocks to the prefetch cache and lets
+ // LRU eviction kick in as further block reads add more entries.
+
+ // Don't read block 0 completely
+ executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
+ in,
+ 0,
+ blockSize - S_1K * 10));
+
+ // Seek to block 1 and don't read completely
+ executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
+ in,
+ blockSize,
+ 2 * S_1K));
+
+ // Seek to block 2 and don't read completely
+ executorService.submit(() -> readFullyWithSeek(countDownLatch,
+ in,
+ blockSize * 2L,
+ 2 * S_1K));
+
+ // Seek to block 3 and don't read completely
+ executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
+ in,
+ blockSize * 3L,
+ 2 * S_1K));
+
+ // Seek to block 4 and don't read completely
+ executorService.submit(() -> readFullyWithSeek(countDownLatch,
+ in,
+ blockSize * 4L,
+ 2 * S_1K));
+
+ // Seek to block 5 and don't read completely
+ executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
+ in,
+ blockSize * 5L,
+ 2 * S_1K));
+
+ // backward seek, can't use block 0 as it is evicted
+ executorService.submit(() -> readFullyWithSeek(countDownLatch,
+ in,
+ S_1K * 5,
+ 2 * S_1K));
+
+ countDownLatch.await();
+
+ // expect maxBlocks entries to remain; the rest have been evicted by LRU
+ LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
+ LOG.info("IO stats: {}", ioStats);
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
+ Integer.parseInt(maxBlocks));
+ });
+ // let LRU evictions settle down, if any
+ Thread.sleep(TIMEOUT_MILLIS);
+ } finally {
+ executorService.shutdownNow();
+ executorService.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
+ LOG.info("IO stats: {}", ioStats);
+ verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
+ });
+ }
+
+ /**
+ * Read the bytes from the given position in the stream to a new buffer using the positioned
+ * readable.
+ *
+ * @param countDownLatch count down latch to mark the operation completed.
+ * @param in input stream.
+ * @param position position in the given input stream to seek from.
+ * @param len the number of bytes to read.
+ * @return true if the read operation is successful.
+ */
+ private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
+ long position, int len) {
+ byte[] buffer = new byte[blockSize];
+ // read only the first len bytes into a block-sized buffer
+ try {
+ in.readFully(position, buffer, 0, len);
+ countDownLatch.countDown();
+ return true;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ /**
+ * Read the bytes from the given position in the stream to a new buffer using seek followed by
+ * input stream read.
+ *
+ * @param countDownLatch count down latch to mark the operation completed.
+ * @param in input stream.
+ * @param position position in the given input stream to seek from.
+ * @param len the number of bytes to read.
+ * @return true if the read operation is successful.
+ */
+ private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in,
+ long position, int len) {
+ byte[] buffer = new byte[blockSize];
+ // read only the first len bytes into a block-sized buffer
+ try {
+ in.seek(position);
+ in.readFully(buffer, 0, len);
+ countDownLatch.countDown();
+ return true;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java
index cf6aa7b..6cf2ab2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java
@@ -44,6 +44,7 @@
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache;
import org.apache.hadoop.fs.impl.prefetch.Validate;
+import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
@@ -314,7 +315,8 @@ public static class FakeS3FilePerBlockCache extends SingleFilePerBlockCache {
private final int writeDelay;
public FakeS3FilePerBlockCache(int readDelay, int writeDelay) {
- super(new EmptyS3AStatisticsContext().newInputStreamStatistics());
+ super(new EmptyS3AStatisticsContext().newInputStreamStatistics(),
+ Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT);
this.files = new ConcurrentHashMap<>();
this.readDelay = readDelay;
this.writeDelay = writeDelay;
@@ -387,7 +389,7 @@ public int read(ByteBuffer buffer, long offset, int size)
}
@Override
- protected BlockCache createCache() {
+ protected BlockCache createCache(int maxBlocksCount) {
final int readDelayMs = 50;
final int writeDelayMs = 200;
return new FakeS3FilePerBlockCache(readDelayMs, writeDelayMs);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java
index cbfa643..8ec94d4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java
@@ -37,7 +37,9 @@
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertEquals;
@@ -173,6 +175,10 @@ protected void cachePut(int blockNumber,
super.cachePut(blockNumber, buffer);
}
}
+
+ public Configuration getConf() {
+ return CONF;
+ }
}
// @Ignore
@@ -285,8 +291,11 @@ streamStatistics, conf, new LocalDirAllocator(
blockManager.requestCaching(data);
}
- waitForCaching(blockManager, blockData.getNumBlocks());
- assertEquals(blockData.getNumBlocks(), blockManager.numCached());
+ waitForCaching(blockManager, Math.min(blockData.getNumBlocks(),
+ conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)));
+ assertEquals(Math.min(blockData.getNumBlocks(),
+ conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)),
+ blockManager.numCached());
assertEquals(0, this.totalErrors(blockManager));
}
@@ -330,8 +339,11 @@ public void testCachingOfGetHelper(boolean forceCachingFailure)
}
blockManager.requestCaching(data);
- waitForCaching(blockManager, expectedNumSuccesses);
- assertEquals(expectedNumSuccesses, blockManager.numCached());
+ waitForCaching(blockManager, Math.min(expectedNumSuccesses, blockManager.getConf()
+ .getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)));
+ assertEquals(Math.min(expectedNumSuccesses, blockManager.getConf()
+ .getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)),
+ blockManager.numCached());
if (forceCachingFailure) {
assertEquals(expectedNumErrors, this.totalErrors(blockManager));