PARQUET-2276: Bring back support for Hadoop 2.7.3 (#1084) (#1090)
* Bring back support for Hadoop 2.7.3
* Simplify the code
* Fix the naming
* Comments
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index bafb45a..fe7b4c5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.util.DynMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +38,13 @@
private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);
+ private static final DynMethods.UnboundMethod hasCapabilitiesMethod =
+ new DynMethods
+ .Builder("hasCapabilities")
+ .impl(FSDataInputStream.class, "hasCapabilities", String.class)
+ .orNoop()
+ .build();
+
/**
* Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
* implementation for Parquet readers.
@@ -46,7 +54,39 @@
*/
public static SeekableInputStream wrap(FSDataInputStream stream) {
Objects.requireNonNull(stream, "Cannot wrap a null input stream");
- if (isWrappedStreamByteBufferReadable(stream)) {
+
+ // Try to check using hasCapabilities(str)
+ Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream);
+
+ // If it is null, then fall back to the old method
+ if (hasCapabilitiesResult != null) {
+ if (hasCapabilitiesResult) {
+ return new H2SeekableInputStream(stream);
+ } else {
+ return new H1SeekableInputStream(stream);
+ }
+ }
+
+ return unwrapByteBufferReadableLegacy(stream);
+ }
+
+ /**
+ * Is the inner stream byte buffer readable?
+ * The test is 'the stream is not FSDataInputStream
+ * and implements ByteBufferReadable'
+ *
+ * This logic is only used for Hadoop <2.9.x, and <3.x.x
+ *
+ * @param stream stream to probe
+ * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable
+ */
+ private static SeekableInputStream unwrapByteBufferReadableLegacy(FSDataInputStream stream) {
+ InputStream wrapped = stream.getWrappedStream();
+ if (wrapped instanceof FSDataInputStream) {
+ LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
+ return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped));
+ }
+ if (stream.getWrappedStream() instanceof ByteBufferReadable) {
return new H2SeekableInputStream(stream);
} else {
return new H1SeekableInputStream(stream);
@@ -55,12 +95,12 @@
/**
* Is the inner stream byte buffer readable?
- * The test is "the stream is not FSDataInputStream
+ * The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
- * other than FSDataInputStream successfuly support read(ByteBuffer).
- * This is true for all filesytem clients the hadoop codebase.
+ * other than FSDataInputStream successfully support read(ByteBuffer).
+ * This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
@@ -68,10 +108,18 @@
* FSDataInputStream will pass the probe down to the underlying stream.
*
* @param stream stream to probe
- * @return true if it is safe to a H2SeekableInputStream to access the data
+ * @return true if it is safe to a H2SeekableInputStream to access
+ * the data, null when it cannot be determined because of missing hasCapabilities
*/
- private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
- if (stream.hasCapability("in:readbytebuffer")) {
+ private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
+ if (hasCapabilitiesMethod.isNoop()) {
+ // When the method is not available, just return a null
+ return null;
+ }
+
+ boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer");
+
+ if (isByteBufferReadable) {
// stream is issuing the guarantee that it implements the
// API. Holds for all implementations in hadoop-*
// since Hadoop 3.3.0 (HDFS-14111).
diff --git a/pom.xml b/pom.xml
index eb568e0..0fb6555 100644
--- a/pom.xml
+++ b/pom.xml
@@ -598,7 +598,7 @@
<profile>
<id>hadoop2</id>
<properties>
- <hadoop.version>2.9.2</hadoop.version>
+ <hadoop.version>2.7.3</hadoop.version>
</properties>
</profile>