PARQUET-2134: Fix type checking in HadoopStreams.wrap (#951)

HadoopStreams.wrap produces a wrong H2SeekableInputStream if the
passed-in FSDataInputStream wraps another FSDataInputStream.

Since [HDFS-14111](https://issues.apache.org/jira/browse/HDFS-14111) all
input streams in the hadoop codebase which implement `ByteBufferReadable`
return true on the StreamCapabilities probe
`stream.hasCapability("in:readbytebuffer")`;
those which don't are forbidden to do so.

This means that on Hadoop 3.3.0+ the preferred way to probe for the API
is to ask the stream.

The StreamCapabilities probe was added in Hadoop 2.9. Along with
making all use of `ByteBufferReadable` non-reflective, this makes
the checks fairly straightforward.

Tests verify that if a stream implements `ByteBufferReadable` then
it will be bonded to H2SeekableInputStream, even if multiply wrapped
by FSDataInputStreams, and that if it doesn't, it won't.

Co-authored-by: Steve Loughran <stevel@cloudera.com>

Co-authored-by: Steve Loughran <stevel@cloudera.com>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index 2994ca8..4bbbb8e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -84,7 +84,7 @@
   }
 
   public static void readFully(Reader reader, ByteBuffer buf) throws IOException {
-    // unfortunately the Hadoop APIs seem to not have a 'readFully' equivalent for the byteBuffer read
+    // unfortunately the Hadoop 2 APIs do not have a 'readFully' equivalent for the byteBuffer read
     // calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we
     // have to loop to ensure we read them.
     while (buf.hasRemaining()) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index 40f12fe..bafb45a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -19,16 +19,15 @@
 
 package org.apache.parquet.hadoop.util;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.io.InputStream;
 import java.util.Objects;
 
 /**
@@ -38,9 +37,6 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);
 
-  private static final Class<?> byteBufferReadableClass = getReadableClass();
-  static final Constructor<SeekableInputStream> h2SeekableConstructor = getH2SeekableConstructor();
-
   /**
    * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
    * implementation for Parquet readers.
@@ -50,51 +46,45 @@
    */
   public static SeekableInputStream wrap(FSDataInputStream stream) {
     Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-    if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
-        byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
-      try {
-        return h2SeekableConstructor.newInstance(stream);
-      } catch (InstantiationException | IllegalAccessException e) {
-        LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
-        return new H1SeekableInputStream(stream);
-      } catch (InvocationTargetException e) {
-        throw new ParquetDecodingException(
-            "Could not instantiate H2SeekableInputStream", e.getTargetException());
-      }
+    if (isWrappedStreamByteBufferReadable(stream)) {
+      return new H2SeekableInputStream(stream);
     } else {
       return new H1SeekableInputStream(stream);
     }
   }
 
-  private static Class<?> getReadableClass() {
-    try {
-      return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return null;
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is "the stream is not an FSDataInputStream
+   * and implements ByteBufferReadable".
+   *
+   * That is: all streams which implement ByteBufferReadable
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients in the hadoop codebase.
+   *
+   * In hadoop 3.3.0+, the StreamCapabilities probe can be used to
+   * check this: only those streams which provide the read(ByteBuffer)
+   * semantics MAY return true for the probe "in:readbytebuffer";
+   * FSDataInputStream will pass the probe down to the underlying stream.
+   *
+   * @param stream stream to probe
+   * @return true if it is safe to use an H2SeekableInputStream to access the data
+   */
+  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
+    if (stream.hasCapability("in:readbytebuffer")) {
+      // stream is issuing the guarantee that it implements the
+      // API. Holds for all implementations in hadoop-*
+      // since Hadoop 3.3.0 (HDFS-14111).
+      return true;
     }
+    InputStream wrapped = stream.getWrappedStream();
+    if (wrapped instanceof FSDataInputStream) {
+      LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
+      return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped));
+    }
+    return wrapped instanceof ByteBufferReadable;
   }
 
-  @SuppressWarnings("unchecked")
-  private static Class<SeekableInputStream> getH2SeekableClass() {
-    try {
-      return (Class<SeekableInputStream>) Class.forName(
-          "org.apache.parquet.hadoop.util.H2SeekableInputStream");
-    } catch (ClassNotFoundException | NoClassDefFoundError e) {
-      return null;
-    }
-  }
-
-  private static Constructor<SeekableInputStream> getH2SeekableConstructor() {
-    Class<SeekableInputStream> h2SeekableClass = getH2SeekableClass();
-    if (h2SeekableClass != null) {
-      try {
-        return h2SeekableClass.getConstructor(FSDataInputStream.class);
-      } catch (NoSuchMethodException e) {
-        return null;
-      }
-    }
-    return null;
-  }
 
   /**
    * Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 1b1e373..b514feb 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -19,8 +19,10 @@
 
 package org.apache.parquet.hadoop.util;
 
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.parquet.hadoop.TestUtils;
+import org.apache.parquet.io.SeekableInputStream;
 import org.junit.Assert;
 import org.junit.Test;
 import java.io.EOFException;
@@ -28,6 +30,7 @@
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
 
+import static org.apache.parquet.hadoop.util.HadoopStreams.wrap;
 import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
 
 public class TestHadoop2ByteBufferReads {
@@ -396,4 +399,48 @@
     Assert.assertEquals("Buffer contents should match",
         ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
   }
+
+  @Test
+  public void testCreateStreamNoByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+      new MockHadoopInputStream()));
+    Assert.assertTrue("Wrong wrapper: " + s,
+      s instanceof H1SeekableInputStream);
+  }
+
+  @Test
+  public void testDoubleWrapNoByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+      new FSDataInputStream(new MockHadoopInputStream())));
+    Assert.assertTrue("Wrong wrapper: " + s,
+      s instanceof H1SeekableInputStream);
+  }
+
+  @Test
+  public void testCreateStreamWithByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+      new MockByteBufferInputStream()));
+    Assert.assertTrue("Wrong wrapper: " + s,
+      s instanceof H2SeekableInputStream);
+  }
+
+  @Test
+  public void testDoubleWrapByteBufferReadable() {
+    final SeekableInputStream s = wrap(new FSDataInputStream(
+      new FSDataInputStream(new MockByteBufferInputStream())));
+    Assert.assertTrue("Wrong wrapper: " + s,
+      s instanceof H2SeekableInputStream);
+  }
+
+  /**
+   * Input stream which claims to implement ByteBufferReadable.
+   */
+  private static final class MockByteBufferInputStream
+    extends MockHadoopInputStream implements ByteBufferReadable {
+
+    @Override
+    public int read(final ByteBuffer buf) {
+      return 0;
+    }
+  }
 }