HDDS-7135. ofs file input stream should support StreamCapabilities interface (#3694)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index ae6b851..9ff481b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -21,12 +21,14 @@
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.TrashPolicy;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -1721,6 +1723,21 @@
key.getReplicationConfig().getReplicationType().name());
}
+ @Test
+ public void testUnbuffer() throws IOException {
+ String testKeyName = "testKey2";
+ Path path = new Path(bucketPath, testKeyName);
+ try (FSDataOutputStream stream = fs.create(path)) {
+ stream.write(1);
+ }
+
+ try (FSDataInputStream stream = fs.open(path)) {
+ assertTrue(stream.hasCapability(StreamCapabilities.UNBUFFER));
+ stream.unbuffer();
+ }
+
+ }
+
public void testNonPrivilegedUserMkdirCreateBucket() throws IOException {
// This test is only meaningful when ACL is enabled
Assume.assumeTrue("ACL is not enabled. Skipping this test as it requires " +
diff --git a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
index 99e2140..b1f0193 100644
--- a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneFileSystem.java
@@ -54,6 +54,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -194,8 +195,11 @@
statistics.incrementReadOps(1);
LOG.trace("open() path: {}", path);
final String key = pathToKey(path);
- return new FSDataInputStream(
- new OzoneFSInputStream(adapter.readFile(key), statistics));
+ return new FSDataInputStream(createFSInputStream(adapter.readFile(key)));
+ }
+
+ protected InputStream createFSInputStream(InputStream inputStream) {
+ return new OzoneFSInputStream(inputStream, statistics);
}
protected void incrementCounter(Statistic statistic) {
diff --git a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 6d48ead..cf88703 100644
--- a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
/**
@@ -92,4 +93,9 @@
return new RootedOzoneClientAdapterImpl(omHost, omPort, conf,
storageStatistics);
}
+
+ @Override
+ protected InputStream createFSInputStream(InputStream inputStream) {
+ return new CapableOzoneFSInputStream(inputStream, statistics);
+ }
}
diff --git a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 6d48ead..cf88703 100644
--- a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
/**
@@ -92,4 +93,9 @@
return new RootedOzoneClientAdapterImpl(omHost, omPort, conf,
storageStatistics);
}
+
+ @Override
+ protected InputStream createFSInputStream(InputStream inputStream) {
+ return new CapableOzoneFSInputStream(inputStream, statistics);
+ }
}