HADOOP-18227. Add input stream IOStats for vectored IO api in S3A. (#4636)
part of HADOOP-18103.
Contributed By: Mukund Thakur
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index ca755f0..bb697ad 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -47,7 +47,7 @@
public static final String STREAM_READ_ABORTED = "stream_aborted";
/**
- * Bytes read from an input stream in read() calls.
+ * Bytes read from an input stream in read()/readVectored() calls.
* Does not include bytes read and then discarded in seek/close etc.
* These are the bytes returned to the caller.
* Value: {@value}.
@@ -111,6 +111,34 @@
"stream_read_operations";
/**
+ * Count of readVectored() operations in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_VECTORED_OPERATIONS =
+ "stream_read_vectored_operations";
+
+ /**
+ * Count of bytes discarded during readVectored() operation
+ * in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_VECTORED_READ_BYTES_DISCARDED =
+ "stream_read_vectored_read_bytes_discarded";
+
+ /**
+ * Count of incoming file ranges during readVectored() operation.
+ * Value: {@value}
+ */
+ public static final String STREAM_READ_VECTORED_INCOMING_RANGES =
+ "stream_read_vectored_incoming_ranges";
+ /**
+ * Count of combined file ranges during readVectored() operation.
+ * Value: {@value}
+ */
+ public static final String STREAM_READ_VECTORED_COMBINED_RANGES =
+ "stream_read_vectored_combined_ranges";
+
+ /**
* Count of incomplete read() operations in an input stream,
* that is, when the bytes returned were less than that requested.
* Value: {@value}.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 77bcc49..379b992 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -84,6 +84,10 @@
return allocate;
}
+ public WeakReferencedElasticByteBufferPool getPool() {
+ return pool;
+ }
+
@Override
public void setup() throws Exception {
super.setup();
@@ -382,6 +386,13 @@
return fileRanges;
}
+ protected List<FileRange> getConsecutiveRanges() {
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(100, 500));
+ fileRanges.add(FileRange.createFileRange(600, 500));
+ return fileRanges;
+ }
+
/**
* Validate that exceptions must be thrown during a vectored
* read operation with specific input ranges.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 178a807..c20c3a0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -963,7 +963,6 @@
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
-
LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
checkNotClosed();
if (stopVectoredIOOperations.getAndSet(false)) {
@@ -978,6 +977,7 @@
if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint");
+ streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
for (FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength());
unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
@@ -987,6 +987,7 @@
List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
1, minSeekForVectorReads(),
maxReadSizeForVectorReads());
+ streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size());
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
ranges.size(), combinedFileRanges.size());
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
@@ -1088,6 +1089,7 @@
}
drainBytes += readCount;
}
+ streamStatistics.readVectoredBytesDiscarded(drainBytes);
LOG.debug("{} bytes drained from stream ", drainBytes);
}
@@ -1168,6 +1170,8 @@
} else {
readByteArray(objectContent, buffer.array(), 0, length);
}
+ // update io stats.
+ incrementBytesRead(length);
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 67734b7..0a49f56 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -803,6 +803,10 @@
private final AtomicLong readOperations;
private final AtomicLong readFullyOperations;
private final AtomicLong seekOperations;
+ private final AtomicLong readVectoredOperations;
+ private final AtomicLong bytesDiscardedInVectoredIO;
+ private final AtomicLong readVectoredIncomingRanges;
+ private final AtomicLong readVectoredCombinedRanges;
/** Bytes read by the application and any when draining streams . */
private final AtomicLong totalBytesRead;
@@ -836,6 +840,10 @@
StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
StreamStatisticNames.STREAM_READ_UNBUFFERED,
+ StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+ StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY)
.withDurationTracking(ACTION_HTTP_GET_REQUEST,
@@ -872,6 +880,14 @@
StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE);
readOperations = st.getCounterReference(
StreamStatisticNames.STREAM_READ_OPERATIONS);
+ readVectoredOperations = st.getCounterReference(
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS);
+ bytesDiscardedInVectoredIO = st.getCounterReference(
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED);
+ readVectoredIncomingRanges = st.getCounterReference(
+ StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES);
+ readVectoredCombinedRanges = st.getCounterReference(
+ StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES);
readFullyOperations = st.getCounterReference(
StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS);
seekOperations = st.getCounterReference(
@@ -1017,6 +1033,19 @@
}
}
+ @Override
+ public void readVectoredOperationStarted(int numIncomingRanges,
+ int numCombinedRanges) {
+ readVectoredIncomingRanges.addAndGet(numIncomingRanges);
+ readVectoredCombinedRanges.addAndGet(numCombinedRanges);
+ readVectoredOperations.incrementAndGet();
+ }
+
+ @Override
+ public void readVectoredBytesDiscarded(int discarded) {
+ bytesDiscardedInVectoredIO.addAndGet(discarded);
+ }
+
/**
* {@code close()} merges the stream statistics into the filesystem's
* instrumentation instance.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 539af2b..41a8f25 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -96,6 +96,20 @@
*/
void readOperationCompleted(int requested, int actual);
+ /**
+ * A vectored read operation has started..
+ * @param numIncomingRanges number of input ranges.
+ * @param numCombinedRanges number of combined ranges.
+ */
+ void readVectoredOperationStarted(int numIncomingRanges,
+ int numCombinedRanges);
+
+ /**
+ * Number of bytes discarded during vectored read.
+ * @param discarded discarded bytes during vectored read.
+ */
+ void readVectoredBytesDiscarded(int discarded);
+
@Override
void close();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 5c0995e..cea8be7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -196,6 +196,17 @@
}
@Override
+ public void readVectoredOperationStarted(int numIncomingRanges,
+ int numCombinedRanges) {
+
+ }
+
+ @Override
+ public void readVectoredBytesDiscarded(int discarded) {
+
+ }
+
+ @Override
public void close() {
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 18a727d..84a90ba 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -19,28 +19,41 @@
package org.apache.hadoop.fs.contract.s3a;
import java.io.EOFException;
+import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.test.LambdaTestUtils;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.test.MoreAsserts.assertEqual;
public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractVectoredRead.class);
+
public ITestS3AContractVectoredRead(String bufferType) {
super(bufferType);
}
@@ -156,4 +169,162 @@
List<FileRange> fileRanges = getSampleSameRanges();
verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
}
+
+ /**
+ * As the minimum seek value is 4*1024, the first three ranges will be
+ * merged into and other two will remain as it is.
+ * */
+ @Test
+ public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
+ FileSystem fs = getTestFileSystemWithReadAheadDisabled();
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+ fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+ fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+ FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+ CompletableFuture<FSDataInputStream> builder =
+ fs.openFile(path(VECTORED_READ_FILE_NAME))
+ .withFileStatus(fileStatus)
+ .build();
+ try (FSDataInputStream in = builder.get()) {
+ in.readVectored(fileRanges, getAllocate());
+ validateVectoredReadResult(fileRanges, DATASET);
+ returnBuffersToPoolPostRead(fileRanges, getPool());
+
+ // audit the io statistics for this stream
+ IOStatistics st = in.getIOStatistics();
+ LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
+
+ // the vectored io operation must be tracked
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ 1);
+
+ // the vectored io operation is being called with 5 input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+ 5);
+
+ // 5 input ranges got combined in 3 as some of them are close.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+ 3);
+
+ // number of bytes discarded will be based on the above input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+ 5944);
+
+ verifyStatisticCounterValue(st,
+ StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+ 3);
+
+ // read bytes should match the sum of requested length for each input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_BYTES,
+ 1424);
+
+ }
+
+ CompletableFuture<FSDataInputStream> builder1 =
+ fs.openFile(path(VECTORED_READ_FILE_NAME))
+ .withFileStatus(fileStatus)
+ .build();
+
+ try (FSDataInputStream in = builder1.get()) {
+ for (FileRange range : fileRanges) {
+ byte[] temp = new byte[range.getLength()];
+ in.readFully((int) range.getOffset(), temp, 0, range.getLength());
+ }
+
+ // audit the statistics for this stream
+ IOStatistics st = in.getIOStatistics();
+ LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ 0);
+
+ // all other counter values consistent.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+ 0);
+ verifyStatisticCounterValue(st,
+ StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+ 5);
+
+ // read bytes should match the sum of requested length for each input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_BYTES,
+ 1424);
+ }
+ }
+
+ @Test
+ public void testMultiVectoredReadStatsCollection() throws Exception {
+ FileSystem fs = getTestFileSystemWithReadAheadDisabled();
+ List<FileRange> ranges1 = getConsecutiveRanges();
+ List<FileRange> ranges2 = getConsecutiveRanges();
+ FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+ CompletableFuture<FSDataInputStream> builder =
+ fs.openFile(path(VECTORED_READ_FILE_NAME))
+ .withFileStatus(fileStatus)
+ .build();
+ try (FSDataInputStream in = builder.get()) {
+ in.readVectored(ranges1, getAllocate());
+ in.readVectored(ranges2, getAllocate());
+ validateVectoredReadResult(ranges1, DATASET);
+ validateVectoredReadResult(ranges2, DATASET);
+ returnBuffersToPoolPostRead(ranges1, getPool());
+ returnBuffersToPoolPostRead(ranges2, getPool());
+
+ // audit the io statistics for this stream
+ IOStatistics st = in.getIOStatistics();
+
+ // 2 vectored io calls are made above.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ 2);
+
+ // 2 vectored io operation is being called with 2 input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+ 4);
+
+ // 2 ranges are getting merged in 1 during both vectored io operation.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+ 2);
+
+ // number of bytes discarded will be 0 as the ranges are consecutive.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+ 0);
+ // only 2 http get request will be made because ranges in both range list will be merged
+ // to 1 because they are consecutive.
+ verifyStatisticCounterValue(st,
+ StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+ 2);
+ // read bytes should match the sum of requested length for each input ranges.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_BYTES,
+ 2000);
+ }
+ }
+
+ private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
+ Configuration conf = getFileSystem().getConf();
+ // also resetting the min seek and max size values is important
+ // as this same test suite has test which overrides these params.
+ S3ATestUtils.removeBaseAndBucketOverrides(conf,
+ Constants.READAHEAD_RANGE,
+ Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+ Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+ S3ATestUtils.disableFilesystemCaching(conf);
+ conf.setInt(Constants.READAHEAD_RANGE, 0);
+ return S3ATestUtils.createTestFileSystem(conf);
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 48cb52c..6162ed1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.DataInputBuffer;
@@ -69,6 +70,7 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
@@ -1457,4 +1459,33 @@
+ " in " + secrets);
}
}
+
+
+ /**
+ * Get the input stream statistics of an input stream.
+ * Raises an exception if the inner stream is not an S3A input stream
+ * @param in wrapper
+ * @return the statistics for the inner stream
+ */
+ public static S3AInputStreamStatistics getInputStreamStatistics(
+ FSDataInputStream in) {
+ return getS3AInputStream(in).getS3AStreamStatistics();
+ }
+
+ /**
+ * Get the inner stream of an input stream.
+ * Raises an exception if the inner stream is not an S3A input stream
+ * @param in wrapper
+ * @return the inner stream
+ * @throws AssertionError if the inner stream is of the wrong type
+ */
+ public static S3AInputStream getS3AInputStream(
+ FSDataInputStream in) {
+ InputStream inner = in.getWrappedStream();
+ if (inner instanceof S3AInputStream) {
+ return (S3AInputStream) inner;
+ } else {
+ throw new AssertionError("Not an S3AInputStream: " + inner);
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index d73a938..b8195cb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -56,6 +56,8 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMinimum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index d95b46b..514c6cf 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -19,19 +19,14 @@
package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
-import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3ATestConstants;
import org.apache.hadoop.fs.s3a.Statistic;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.InputStream;
-
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic;
@@ -155,34 +150,6 @@
}
/**
- * Get the input stream statistics of an input stream.
- * Raises an exception if the inner stream is not an S3A input stream
- * @param in wrapper
- * @return the statistics for the inner stream
- */
- protected S3AInputStreamStatistics getInputStreamStatistics(
- FSDataInputStream in) {
- return getS3AInputStream(in).getS3AStreamStatistics();
- }
-
- /**
- * Get the inner stream of an input stream.
- * Raises an exception if the inner stream is not an S3A input stream
- * @param in wrapper
- * @return the inner stream
- * @throws AssertionError if the inner stream is of the wrong type
- */
- protected S3AInputStream getS3AInputStream(
- FSDataInputStream in) {
- InputStream inner = in.getWrappedStream();
- if (inner instanceof S3AInputStream) {
- return (S3AInputStream) inner;
- } else {
- throw new AssertionError("Not an S3AInputStream: " + inner);
- }
- }
-
- /**
* Get the gauge value of a statistic from the
* IOStatistics of the filesystem. Raises an assertion if
* there is no such gauge.