HADOOP-18392. Propagate vectored s3a input stream stats to file system stats. (#4704)
part of HADOOP-18103.
Contributed By: Mukund Thakur
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 379b992..c76f183 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -24,11 +24,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.function.IntFunction;
import org.assertj.core.api.Assertions;
-import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -43,13 +42,14 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.impl.FutureIOSupport;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
-import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@RunWith(Parameterized.class)
public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@@ -281,16 +281,11 @@
in.readVectored(fileRanges, allocate);
for (FileRange res : fileRanges) {
CompletableFuture<ByteBuffer> data = res.getData();
- try {
- ByteBuffer buffer = data.get();
- // Shouldn't reach here.
- Assert.fail("EOFException must be thrown while reading EOF");
- } catch (ExecutionException ex) {
- // ignore as expected.
- } catch (Exception ex) {
- LOG.error("Exception while running vectored read ", ex);
- Assert.fail("Exception while running vectored read " + ex);
- }
+ interceptFuture(EOFException.class,
+ "",
+ ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+ TimeUnit.SECONDS,
+ data);
}
}
}
@@ -410,7 +405,7 @@
fs.openFile(path(VECTORED_READ_FILE_NAME))
.build();
try (FSDataInputStream in = builder.get()) {
- LambdaTestUtils.intercept(clazz,
+ intercept(clazz,
() -> in.readVectored(fileRanges, allocate));
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index dfe9fdf..2dc88ee 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -308,6 +308,23 @@
StreamStatisticNames.STREAM_READ_OPERATIONS,
"Count of read() operations in an input stream",
TYPE_COUNTER),
+ STREAM_READ_VECTORED_OPERATIONS(
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ "Count of readVectored() operations in an input stream.",
+ TYPE_COUNTER),
+ STREAM_READ_VECTORED_READ_BYTES_DISCARDED(
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+ "Count of bytes discarded during readVectored() operation" +
+ " in an input stream.",
+ TYPE_COUNTER),
+ STREAM_READ_VECTORED_INCOMING_RANGES(
+ StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+ "Count of incoming file ranges during readVectored() operation.",
+ TYPE_COUNTER),
+ STREAM_READ_VECTORED_COMBINED_RANGES(
+ StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+ "Count of combined file ranges during readVectored() operation.",
+ TYPE_COUNTER),
STREAM_READ_REMOTE_STREAM_ABORTED(
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
"Duration of aborting a remote stream during stream IO",
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 84a90ba..4c357e2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -176,146 +176,172 @@
* */
@Test
public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
- FileSystem fs = getTestFileSystemWithReadAheadDisabled();
- List<FileRange> fileRanges = new ArrayList<>();
- fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
- fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
- fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
- fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
- fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
- FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
- CompletableFuture<FSDataInputStream> builder =
- fs.openFile(path(VECTORED_READ_FILE_NAME))
- .withFileStatus(fileStatus)
- .build();
- try (FSDataInputStream in = builder.get()) {
- in.readVectored(fileRanges, getAllocate());
- validateVectoredReadResult(fileRanges, DATASET);
- returnBuffersToPoolPostRead(fileRanges, getPool());
+ try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+ fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
- // audit the io statistics for this stream
- IOStatistics st = in.getIOStatistics();
- LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
+ FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+ CompletableFuture<FSDataInputStream> builder =
+ fs.openFile(path(VECTORED_READ_FILE_NAME))
+ .withFileStatus(fileStatus)
+ .build();
+ try (FSDataInputStream in = builder.get()) {
+ in.readVectored(fileRanges, getAllocate());
+ validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, getPool());
- // the vectored io operation must be tracked
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
- 1);
+ // audit the io statistics for this stream
+ IOStatistics st = in.getIOStatistics();
+ LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
- // the vectored io operation is being called with 5 input ranges.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
- 5);
+ // the vectored io operation must be tracked
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ 1);
- // 5 input ranges got combined in 3 as some of them are close.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
- 3);
+ // the vectored io operation is being called with 5 input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+ 5);
- // number of bytes discarded will be based on the above input ranges.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
- 5944);
+ // 5 input ranges got combined in 3 as some of them are close.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+ 3);
- verifyStatisticCounterValue(st,
- StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
- 3);
+ // number of bytes discarded will be based on the above input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+ 5944);
- // read bytes should match the sum of requested length for each input ranges.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_BYTES,
- 1424);
+ verifyStatisticCounterValue(st,
+ StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+ 3);
- }
+ // read bytes should match the sum of requested length for each input range.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_BYTES,
+ 1424);
- CompletableFuture<FSDataInputStream> builder1 =
- fs.openFile(path(VECTORED_READ_FILE_NAME))
- .withFileStatus(fileStatus)
- .build();
-
- try (FSDataInputStream in = builder1.get()) {
- for (FileRange range : fileRanges) {
- byte[] temp = new byte[range.getLength()];
- in.readFully((int) range.getOffset(), temp, 0, range.getLength());
}
- // audit the statistics for this stream
- IOStatistics st = in.getIOStatistics();
- LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+ CompletableFuture<FSDataInputStream> builder1 =
+ fs.openFile(path(VECTORED_READ_FILE_NAME))
+ .withFileStatus(fileStatus)
+ .build();
- verifyStatisticCounterValue(st,
+ try (FSDataInputStream in = builder1.get()) {
+ for (FileRange range : fileRanges) {
+ byte[] temp = new byte[range.getLength()];
+ in.readFully((int) range.getOffset(), temp, 0, range.getLength());
+ }
+
+ // audit the statistics for this stream
+ IOStatistics st = in.getIOStatistics();
+ LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ 0);
+
+ // all other counter values consistent.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+ 0);
+ verifyStatisticCounterValue(st,
+ StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+ 5);
+
+ // read bytes should match the sum of requested length for each input range.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_BYTES,
+ 1424);
+ }
+ // validate stats are getting merged at fs instance level.
+ IOStatistics fsStats = fs.getIOStatistics();
+ // only 1 vectored io call is made in this fs instance.
+ verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
- 0);
-
- // all other counter values consistent.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
- 0);
- verifyStatisticCounterValue(st,
+ 1);
+ // 8 get requests were made in this fs instance.
+ verifyStatisticCounterValue(fsStats,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
- 5);
+ 8);
- // read bytes should match the sum of requested length for each input ranges.
- verifyStatisticCounterValue(st,
+ verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_BYTES,
- 1424);
+ 2848);
}
}
@Test
public void testMultiVectoredReadStatsCollection() throws Exception {
- FileSystem fs = getTestFileSystemWithReadAheadDisabled();
- List<FileRange> ranges1 = getConsecutiveRanges();
- List<FileRange> ranges2 = getConsecutiveRanges();
- FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
- CompletableFuture<FSDataInputStream> builder =
- fs.openFile(path(VECTORED_READ_FILE_NAME))
- .withFileStatus(fileStatus)
- .build();
- try (FSDataInputStream in = builder.get()) {
- in.readVectored(ranges1, getAllocate());
- in.readVectored(ranges2, getAllocate());
- validateVectoredReadResult(ranges1, DATASET);
- validateVectoredReadResult(ranges2, DATASET);
- returnBuffersToPoolPostRead(ranges1, getPool());
- returnBuffersToPoolPostRead(ranges2, getPool());
+ try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
+ List<FileRange> ranges1 = getConsecutiveRanges();
+ List<FileRange> ranges2 = getConsecutiveRanges();
+ FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+ CompletableFuture<FSDataInputStream> builder =
+ fs.openFile(path(VECTORED_READ_FILE_NAME))
+ .withFileStatus(fileStatus)
+ .build();
+ try (FSDataInputStream in = builder.get()) {
+ in.readVectored(ranges1, getAllocate());
+ in.readVectored(ranges2, getAllocate());
+ validateVectoredReadResult(ranges1, DATASET);
+ validateVectoredReadResult(ranges2, DATASET);
+ returnBuffersToPoolPostRead(ranges1, getPool());
+ returnBuffersToPoolPostRead(ranges2, getPool());
- // audit the io statistics for this stream
- IOStatistics st = in.getIOStatistics();
+ // audit the io statistics for this stream
+ IOStatistics st = in.getIOStatistics();
- // 2 vectored io calls are made above.
- verifyStatisticCounterValue(st,
+ // 2 vectored io calls are made above.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ 2);
+
+ // 2 vectored io operations are called, each with 2 input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+ 4);
+
+ // 2 ranges are getting merged into 1 during both vectored io operations.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+ 2);
+
+ // number of bytes discarded will be 0 as the ranges are consecutive.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+ 0);
+ // only 2 http get request will be made because ranges in both range list will be merged
+ // to 1 because they are consecutive.
+ verifyStatisticCounterValue(st,
+ StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+ 2);
+ // read bytes should match the sum of requested length for each input range.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_BYTES,
+ 2000);
+ }
+ IOStatistics fsStats = fs.getIOStatistics();
+ // 2 vectored io calls are made in this fs instance.
+ verifyStatisticCounterValue(fsStats,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
2);
-
- // 2 vectored io operation is being called with 2 input ranges.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
- 4);
-
- // 2 ranges are getting merged in 1 during both vectored io operation.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
- 2);
-
- // number of bytes discarded will be 0 as the ranges are consecutive.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
- 0);
- // only 2 http get request will be made because ranges in both range list will be merged
- // to 1 because they are consecutive.
- verifyStatisticCounterValue(st,
+ // 2 get requests were made in this fs instance.
+ verifyStatisticCounterValue(fsStats,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
2);
- // read bytes should match the sum of requested length for each input ranges.
- verifyStatisticCounterValue(st,
- StreamStatisticNames.STREAM_READ_BYTES,
- 2000);
}
}
- private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
+ private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
Configuration conf = getFileSystem().getConf();
// also resetting the min seek and max size values is important
// as this same test suite has test which overrides these params.