PARQUET-2276: Bring back support for Hadoop 2.7.3 (#1084) (#1090)

* Bring back support for Hadoop 2.7.3

* Simplify the code

* Fix the naming

* Comments
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index bafb45a..fe7b4c5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.parquet.io.PositionOutputStream;
 import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.util.DynMethods;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +38,13 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);
 
+  private static final DynMethods.UnboundMethod hasCapabilitiesMethod =
+    new DynMethods
+      .Builder("hasCapabilities")
+      .impl(FSDataInputStream.class, "hasCapabilities", String.class)
+      .orNoop()
+      .build();
+
   /**
    * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
    * implementation for Parquet readers.
@@ -46,7 +54,39 @@
    */
   public static SeekableInputStream wrap(FSDataInputStream stream) {
     Objects.requireNonNull(stream, "Cannot wrap a null input stream");
-    if (isWrappedStreamByteBufferReadable(stream)) {
+
+    // Try to check using hasCapabilities(str)
+    Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream);
+
+    // If it is null, then fall back to the old method
+    if (hasCapabilitiesResult != null) {
+      if (hasCapabilitiesResult) {
+        return new H2SeekableInputStream(stream);
+      } else {
+        return new H1SeekableInputStream(stream);
+      }
+    }
+
+    return unwrapByteBufferReadableLegacy(stream);
+  }
+
+  /**
+   * Is the inner stream byte buffer readable?
+   * The test is 'the stream is not FSDataInputStream
+   * and implements ByteBufferReadable'
+   *
+   * This logic is only used for Hadoop <2.9.x, and <3.x.x
+   *
+   * @param stream stream to probe
+   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable
+   */
+  private static SeekableInputStream unwrapByteBufferReadableLegacy(FSDataInputStream stream) {
+    InputStream wrapped = stream.getWrappedStream();
+    if (wrapped instanceof FSDataInputStream) {
+      LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
+      return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped));
+    }
+    if (stream.getWrappedStream() instanceof ByteBufferReadable) {
       return new H2SeekableInputStream(stream);
     } else {
       return new H1SeekableInputStream(stream);
@@ -55,12 +95,12 @@
 
   /**
    * Is the inner stream byte buffer readable?
-   * The test is "the stream is not FSDataInputStream
+   * The test is 'the stream is not FSDataInputStream
    * and implements ByteBufferReadable'
    *
    * That is: all streams which implement ByteBufferReadable
-   * other than FSDataInputStream successfuly support read(ByteBuffer).
-   * This is true for all filesytem clients the hadoop codebase.
+   * other than FSDataInputStream successfully support read(ByteBuffer).
+   * This is true for all filesystem clients the hadoop codebase.
    *
    * In hadoop 3.3.0+, the StreamCapabilities probe can be used to
    * check this: only those streams which provide the read(ByteBuffer)
@@ -68,10 +108,18 @@
    * FSDataInputStream will pass the probe down to the underlying stream.
    *
    * @param stream stream to probe
-   * @return true if it is safe to a H2SeekableInputStream to access the data
+   * @return true if it is safe to a H2SeekableInputStream to access
+   * the data, null when it cannot be determined because of missing hasCapabilities
    */
-  private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
-    if (stream.hasCapability("in:readbytebuffer")) {
+  private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
+    if (hasCapabilitiesMethod.isNoop()) {
+      // When the method is not available, just return a null
+      return null;
+    }
+
+    boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer");
+
+    if (isByteBufferReadable) {
       // stream is issuing the guarantee that it implements the
       // API. Holds for all implementations in hadoop-*
       // since Hadoop 3.3.0 (HDFS-14111).
diff --git a/pom.xml b/pom.xml
index eb568e0..0fb6555 100644
--- a/pom.xml
+++ b/pom.xml
@@ -598,7 +598,7 @@
     <profile>
       <id>hadoop2</id>
       <properties>
-        <hadoop.version>2.9.2</hadoop.version>
+        <hadoop.version>2.7.3</hadoop.version>
       </properties>
     </profile>