/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_MAX;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
/**
 * Test the prefetching input stream; validates that the underlying
 * S3ACachingInputStream and S3AInMemoryInputStream work as expected.
 */
public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {

  public ITestS3APrefetchingInputStream() {
    super(true);
  }

  private static final Logger LOG =
      LoggerFactory.getLogger(ITestS3APrefetchingInputStream.class);

  // Size constants, in bytes
  private static final int S_500 = 512;
  private static final int S_1K = S_500 * 2;
  private static final int S_1M = S_1K * S_1K;

  private int numBlocks;

  // Size should be > block size so S3ACachingInputStream is used
  private long largeFileSize;

  // Size should be < block size so S3AInMemoryInputStream is used
  private static final int SMALL_FILE_SIZE = S_1K * 9;

  // Timeout and polling interval for the LambdaTestUtils.eventually()
  // probes of asynchronous prefetch state
  private static final int TIMEOUT_MILLIS = 5000;
  private static final int INTERVAL_MILLIS = 500;

  private static final int BLOCK_SIZE = S_1K * 10;
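
  /**
   * Enable prefetching and set a 10 KiB prefetch block size, clearing any
   * base/bucket overrides of those options first so the test values
   * take effect.
   */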
  @Override
  public Configuration createConfiguration() {
    Configuration conf = super.createConfiguration();
    S3ATestUtils.removeBaseAndBucketOverrides(conf,
        PREFETCH_ENABLED_KEY,
        PREFETCH_BLOCK_SIZE_KEY);
    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
    conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
    return conf;
  }
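
  /**
   * Write a 72 KiB dataset to the method path: larger than the prefetch
   * block size, so reads of it use S3ACachingInputStream. Records the
   * file length and the number of prefetch blocks it spans.
   */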
  private void createLargeFile() throws Exception {
    byte[] data = ContractTestUtils.dataset(S_1K * 72, 'x', 26);
    Path largeFile = methodPath();
    FileSystem largeFileFS = getFileSystem();
    ContractTestUtils.writeDataset(getFileSystem(), largeFile, data, data.length, 16, true);
    FileStatus fileStatus = largeFileFS.getFileStatus(largeFile);
    largeFileSize = fileStatus.getLen();
    numBlocks = calculateNumBlocks(largeFileSize, BLOCK_SIZE);
  }
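
  /**
   * Number of blocks needed to hold a file: length divided by block size,
   * rounded up. For the 72 KiB large file and 10 KiB blocks this is
   * 73728 / 10240 = 7.2, rounded up to 8 blocks.
   */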
  private static int calculateNumBlocks(long largeFileSize, int blockSize) {
    if (largeFileSize == 0) {
      return 0;
    } else {
      return ((int) (largeFileSize / blockSize)) + (largeFileSize % blockSize > 0 ? 1 : 0);
    }
  }
  @Test
  public void testReadLargeFileFully() throws Throwable {
    describe("read a large file fully, uses S3ACachingInputStream");
    skipIfClientSideEncryption();
    IOStatistics ioStats;
    createLargeFile();

    try (FSDataInputStream in = getFileSystem().open(methodPath())) {
      ioStats = in.getIOStatistics();
      byte[] buffer = new byte[S_1M * 10];
      long bytesRead = 0;

      while (bytesRead < largeFileSize) {
        int bytesToRead = (int) Math.min(buffer.length, largeFileSize - bytesRead);
        in.readFully(buffer, 0, bytesToRead);
        bytesRead += bytesToRead;
        // Blocks are fully read, no blocks should be cached
        verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
            0);
      }

      // Assert that the first block is read synchronously and the
      // following blocks are prefetched
      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
          numBlocks - 1);
      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
      verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
    }
    // Verify that once the stream is closed, all memory is freed
    verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
    assertThatStatisticMaximum(ioStats,
        ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
  }
  @Test
  public void testReadLargeFileFullyLazySeek() throws Throwable {
    describe("read a large file using readFully(position,buffer,offset,length),"
        + " uses S3ACachingInputStream");
    skipIfClientSideEncryption();
    IOStatistics ioStats;
    createLargeFile();

    try (FSDataInputStream in = getFileSystem().open(methodPath())) {
      ioStats = in.getIOStatistics();
      byte[] buffer = new byte[S_1M * 10];
      long bytesRead = 0;

      while (bytesRead < largeFileSize) {
        int bytesToRead = (int) Math.min(buffer.length, largeFileSize - bytesRead);
        in.readFully(bytesRead, buffer, 0, bytesToRead);
        bytesRead += bytesToRead;
        // Blocks are fully read, no blocks should be cached
        verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
            0);
      }

      // Assert that the first block is read synchronously and the
      // following blocks are prefetched
      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS,
          numBlocks - 1);
      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, numBlocks);
      verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, numBlocks);
    }
    // Verify that once the stream is closed, all memory is freed
    verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
  }
  @Test
  public void testRandomReadLargeFile() throws Throwable {
    describe("random read on a large file, uses S3ACachingInputStream");
    skipIfClientSideEncryption();
    IOStatistics ioStats;
    createLargeFile();

    try (FSDataInputStream in = getFileSystem().open(methodPath())) {
      ioStats = in.getIOStatistics();
      byte[] buffer = new byte[BLOCK_SIZE];

      // Don't read block 0 completely so it gets cached on read after seek
      in.read(buffer, 0, BLOCK_SIZE - S_500 * 10);

      // Seek to block 2 and read all of it
      in.seek(BLOCK_SIZE * 2);
      in.read(buffer, 0, BLOCK_SIZE);

      // Seek to block 4 but don't read: noop.
      in.seek(BLOCK_SIZE * 4);

      // Backwards seek, will use cached block 0
      in.seek(S_500 * 5);
      in.read();

      // Expected to get block 0 (partially read), 1 (prefetch), 2 (fully read), 3 (prefetch)
      // Blocks 0, 1, 3 were not fully read, so they remain in the file cache
      LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
        LOG.info("IO stats: {}", ioStats);
        verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 4);
        verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 4);
        verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 2);
        verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 3);
      });
    }
    // After close, the file cache is emptied and the buffer pool released
    LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
      LOG.info("IO stats: {}", ioStats);
      verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
      verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
    });
  }
  @Test
  public void testRandomReadSmallFile() throws Throwable {
    describe("random read on a small file, uses S3AInMemoryInputStream");
    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
    Path smallFile = path("randomReadSmallFile");
    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

    try (FSDataInputStream in = getFileSystem().open(smallFile)) {
      IOStatistics ioStats = in.getIOStatistics();

      byte[] buffer = new byte[SMALL_FILE_SIZE];
      in.read(buffer, 0, S_1K * 2);
      in.seek(S_1K * 7);
      in.read(buffer, 0, S_1K * 2);

      // The whole file is read in a single GET, with no prefetching
      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
      verifyStatisticCounterValue(ioStats, STREAM_READ_OPENED, 1);
      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCH_OPERATIONS, 0);
      // The buffer pool is not used
      verifyStatisticGaugeValue(ioStats, STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
      // No prefetch operations, so the executor-acquired statistic stays
      // at its unset value of -1
      assertThatStatisticMaximum(ioStats,
          ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isEqualTo(-1);
    }
  }
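
  /**
   * Verify that getPos(), getIOStatistics() and the wrapped stream's
   * statistics remain accessible after close(), returning the values
   * recorded when the stream was closed, and that seekToNewSource()
   * reports unsupported.
   */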
  @Test
  public void testStatusProbesAfterClosingStream() throws Throwable {
    describe("When the underlying input stream is closed, the prefetch input stream"
        + " should still support some status probes");
    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
    Path smallFile = methodPath();
    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

    FSDataInputStream in = getFileSystem().open(smallFile);

    byte[] buffer = new byte[SMALL_FILE_SIZE];
    in.read(buffer, 0, S_1K * 2);
    in.seek(S_1K * 7);
    in.read(buffer, 0, S_1K * 2);

    long pos = in.getPos();
    IOStatistics ioStats = in.getIOStatistics();
    S3AInputStreamStatistics inputStreamStatistics =
        ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

    assertNotNull("Prefetching input IO stats should not be null", ioStats);
    assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
    assertNotEquals("Position retrieved from prefetching input stream should not be zero", 0,
        pos);

    in.close();

    // status probes after closing the input stream
    long newPos = in.getPos();
    IOStatistics newIoStats = in.getIOStatistics();
    S3AInputStreamStatistics newInputStreamStatistics =
        ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

    assertNotNull("Prefetching input IO stats should not be null", newIoStats);
    assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
    assertNotEquals("Position retrieved from prefetching input stream should not be zero", 0,
        newPos);

    // compare the status probes taken after closing the stream with those
    // taken before it was closed
    assertEquals("Position retrieved through stream before and after closing should match", pos,
        newPos);
    assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
        newIoStats);
    assertEquals("Stream stats retrieved through stream before and after closing should match",
        inputStreamStatistics, newInputStreamStatistics);

    assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
  }

}