HADOOP-18392. Propagate vectored s3a input stream stats to file system stats. (#4704)

part of HADOOP-18103.

Contributed By: Mukund Thakur
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 379b992..c76f183 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -24,11 +24,10 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.function.IntFunction;
 
 import org.assertj.core.api.Assertions;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -43,13 +42,14 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.impl.FutureIOSupport;
 import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
-import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 
 @RunWith(Parameterized.class)
 public abstract class AbstractContractVectoredReadTest extends AbstractFSContractTestBase {
@@ -281,16 +281,11 @@
       in.readVectored(fileRanges, allocate);
       for (FileRange res : fileRanges) {
         CompletableFuture<ByteBuffer> data = res.getData();
-        try {
-          ByteBuffer buffer = data.get();
-          // Shouldn't reach here.
-          Assert.fail("EOFException must be thrown while reading EOF");
-        } catch (ExecutionException ex) {
-          // ignore as expected.
-        } catch (Exception ex) {
-          LOG.error("Exception while running vectored read ", ex);
-          Assert.fail("Exception while running vectored read " + ex);
-        }
+        interceptFuture(EOFException.class,
+                "",
+                ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+                TimeUnit.SECONDS,
+                data);
       }
     }
   }
@@ -410,7 +405,7 @@
             fs.openFile(path(VECTORED_READ_FILE_NAME))
                     .build();
     try (FSDataInputStream in = builder.get()) {
-      LambdaTestUtils.intercept(clazz,
+      intercept(clazz,
           () -> in.readVectored(fileRanges, allocate));
     }
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index dfe9fdf..2dc88ee 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -308,6 +308,23 @@
       StreamStatisticNames.STREAM_READ_OPERATIONS,
       "Count of read() operations in an input stream",
       TYPE_COUNTER),
+  STREAM_READ_VECTORED_OPERATIONS(
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          "Count of readVectored() operations in an input stream.",
+          TYPE_COUNTER),
+  STREAM_READ_VECTORED_READ_BYTES_DISCARDED(
+          StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+          "Count of bytes discarded during readVectored() operation." +
+                  " in an input stream",
+          TYPE_COUNTER),
+  STREAM_READ_VECTORED_INCOMING_RANGES(
+          StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+          "Count of incoming file ranges during readVectored() operation.",
+          TYPE_COUNTER),
+  STREAM_READ_VECTORED_COMBINED_RANGES(
+          StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+          "Count of combined file ranges during readVectored() operation.",
+          TYPE_COUNTER),
   STREAM_READ_REMOTE_STREAM_ABORTED(
       StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
       "Duration of aborting a remote stream during stream IO",
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 84a90ba..4c357e2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -176,146 +176,172 @@
    * */
   @Test
   public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
-    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
-    List<FileRange> fileRanges = new ArrayList<>();
-    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
-    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
-    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
-    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
-    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
 
-    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
-    CompletableFuture<FSDataInputStream> builder =
-            fs.openFile(path(VECTORED_READ_FILE_NAME))
-                    .withFileStatus(fileStatus)
-                    .build();
-    try (FSDataInputStream in = builder.get()) {
-      in.readVectored(fileRanges, getAllocate());
-      validateVectoredReadResult(fileRanges, DATASET);
-      returnBuffersToPoolPostRead(fileRanges, getPool());
+    try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
+      List<FileRange> fileRanges = new ArrayList<>();
+      fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+      fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+      fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+      fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+      fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
 
-      // audit the io statistics for this stream
-      IOStatistics st = in.getIOStatistics();
-      LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
+      FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+      CompletableFuture<FSDataInputStream> builder =
+              fs.openFile(path(VECTORED_READ_FILE_NAME))
+                      .withFileStatus(fileStatus)
+                      .build();
+      try (FSDataInputStream in = builder.get()) {
+        in.readVectored(fileRanges, getAllocate());
+        validateVectoredReadResult(fileRanges, DATASET);
+        returnBuffersToPoolPostRead(fileRanges, getPool());
 
-      // the vectored io operation must be tracked
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
-              1);
+        // audit the io statistics for this stream
+        IOStatistics st = in.getIOStatistics();
+        LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
 
-      // the vectored io operation is being called with 5 input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
-              5);
+        // the vectored io operation must be tracked
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+                1);
 
-      // 5 input ranges got combined in 3 as some of them are close.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
-              3);
+        // the vectored io operation is being called with 5 input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+                5);
 
-      // number of bytes discarded will be based on the above input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-              5944);
+        // 5 input ranges got combined in 3 as some of them are close.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+                3);
 
-      verifyStatisticCounterValue(st,
-              StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
-              3);
+        // number of bytes discarded will be based on the above input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+                5944);
 
-      // read bytes should match the sum of requested length for each input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_BYTES,
-              1424);
+        verifyStatisticCounterValue(st,
+                StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+                3);
 
-    }
+        // read bytes should match the sum of requested length for each input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_BYTES,
+                1424);
 
-    CompletableFuture<FSDataInputStream> builder1 =
-            fs.openFile(path(VECTORED_READ_FILE_NAME))
-                    .withFileStatus(fileStatus)
-                    .build();
-
-    try (FSDataInputStream in = builder1.get()) {
-      for (FileRange range : fileRanges) {
-        byte[] temp = new byte[range.getLength()];
-        in.readFully((int) range.getOffset(), temp, 0, range.getLength());
       }
 
-      // audit the statistics for this stream
-      IOStatistics st = in.getIOStatistics();
-      LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+      CompletableFuture<FSDataInputStream> builder1 =
+              fs.openFile(path(VECTORED_READ_FILE_NAME))
+                      .withFileStatus(fileStatus)
+                      .build();
 
-      verifyStatisticCounterValue(st,
+      try (FSDataInputStream in = builder1.get()) {
+        for (FileRange range : fileRanges) {
+          byte[] temp = new byte[range.getLength()];
+          in.readFully((int) range.getOffset(), temp, 0, range.getLength());
+        }
+
+        // audit the statistics for this stream
+        IOStatistics st = in.getIOStatistics();
+        LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+                0);
+
+        // all other counter values consistent.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+                0);
+        verifyStatisticCounterValue(st,
+                StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+                5);
+
+        // read bytes should match the sum of requested length for each input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_BYTES,
+                1424);
+      }
+      // validate stats are getting merged at fs instance level.
+      IOStatistics fsStats = fs.getIOStatistics();
+      // only 1 vectored io call is made in this fs instance.
+      verifyStatisticCounterValue(fsStats,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
-              0);
-
-      // all other counter values consistent.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-              0);
-      verifyStatisticCounterValue(st,
+              1);
+      // 8 get requests were made in this fs instance.
+      verifyStatisticCounterValue(fsStats,
               StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
-              5);
+              8);
 
-      // read bytes should match the sum of requested length for each input ranges.
-      verifyStatisticCounterValue(st,
+      verifyStatisticCounterValue(fsStats,
               StreamStatisticNames.STREAM_READ_BYTES,
-              1424);
+              2848);
     }
   }
 
   @Test
   public void testMultiVectoredReadStatsCollection() throws Exception {
-    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
-    List<FileRange> ranges1 = getConsecutiveRanges();
-    List<FileRange> ranges2 = getConsecutiveRanges();
-    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
-    CompletableFuture<FSDataInputStream> builder =
-            fs.openFile(path(VECTORED_READ_FILE_NAME))
-                    .withFileStatus(fileStatus)
-                    .build();
-    try (FSDataInputStream in = builder.get()) {
-      in.readVectored(ranges1, getAllocate());
-      in.readVectored(ranges2, getAllocate());
-      validateVectoredReadResult(ranges1, DATASET);
-      validateVectoredReadResult(ranges2, DATASET);
-      returnBuffersToPoolPostRead(ranges1, getPool());
-      returnBuffersToPoolPostRead(ranges2, getPool());
+    try (S3AFileSystem fs = getTestFileSystemWithReadAheadDisabled()) {
+      List<FileRange> ranges1 = getConsecutiveRanges();
+      List<FileRange> ranges2 = getConsecutiveRanges();
+      FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+      CompletableFuture<FSDataInputStream> builder =
+              fs.openFile(path(VECTORED_READ_FILE_NAME))
+                      .withFileStatus(fileStatus)
+                      .build();
+      try (FSDataInputStream in = builder.get()) {
+        in.readVectored(ranges1, getAllocate());
+        in.readVectored(ranges2, getAllocate());
+        validateVectoredReadResult(ranges1, DATASET);
+        validateVectoredReadResult(ranges2, DATASET);
+        returnBuffersToPoolPostRead(ranges1, getPool());
+        returnBuffersToPoolPostRead(ranges2, getPool());
 
-      // audit the io statistics for this stream
-      IOStatistics st = in.getIOStatistics();
+        // audit the io statistics for this stream
+        IOStatistics st = in.getIOStatistics();
 
-      // 2 vectored io calls are made above.
-      verifyStatisticCounterValue(st,
+        // 2 vectored io calls are made above.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+                2);
+
+        // 2 vectored io operation is being called with 2 input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+                4);
+
+        // 2 ranges are getting merged in 1 during both vectored io operation.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+                2);
+
+        // number of bytes discarded will be 0 as the ranges are consecutive.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+                0);
+        // only 2 http get request will be made because ranges in both range list will be merged
+        // to 1 because they are consecutive.
+        verifyStatisticCounterValue(st,
+                StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+                2);
+        // read bytes should match the sum of requested length for each input ranges.
+        verifyStatisticCounterValue(st,
+                StreamStatisticNames.STREAM_READ_BYTES,
+                2000);
+      }
+      IOStatistics fsStats = fs.getIOStatistics();
+      // 2 vectored io calls are made in this fs instance.
+      verifyStatisticCounterValue(fsStats,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
               2);
-
-      // 2 vectored io operation is being called with 2 input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
-              4);
-
-      // 2 ranges are getting merged in 1 during both vectored io operation.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
-              2);
-
-      // number of bytes discarded will be 0 as the ranges are consecutive.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-              0);
-      // only 2 http get request will be made because ranges in both range list will be merged
-      // to 1 because they are consecutive.
-      verifyStatisticCounterValue(st,
+      // 2 get requests were made in this fs instance.
+      verifyStatisticCounterValue(fsStats,
               StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
               2);
-      // read bytes should match the sum of requested length for each input ranges.
-      verifyStatisticCounterValue(st,
-              StreamStatisticNames.STREAM_READ_BYTES,
-              2000);
     }
   }
 
-  private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
+  private S3AFileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
     Configuration conf = getFileSystem().getConf();
     // also resetting the min seek and max size values is important
     // as this same test suite has test which overrides these params.