PARQUET-2134: Fix type checking in HadoopStreams.wrap (#951)
HadoopStreams.wrap produces a wrong H2SeekableInputStream if the
passed-in FSDataInputStream wraps another FSDataInputStream.
Since [HDFS-14111](https://issues.apache.org/jira/browse/HDFS-14111) all
input streams in the hadoop codebase which implement `ByteBufferReadable`
return true on the StreamCapabilities probe
`stream.hasCapability("in:readbytebuffer")`;
those which don't are forbidden to do so.
This means that on Hadoop 3.3.0+ the preferred way to probe for the API
is to ask the stream.
The StreamCapabilities probe was added in Hadoop 2.9. Along with
making all use of `ByteBufferReadable` non-reflective, this makes
the checks fairly straightforward.
Tests verify that if a stream implements `ByteBufferReadable` then
it will be bonded to H2SeekableInputStream, even if multiply wrapped
by FSDataInputStreams, and that if it doesn't, it won't.
Co-authored-by: Steve Loughran <stevel@cloudera.com>
Co-authored-by: Steve Loughran <stevel@cloudera.com>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
index 2994ca8..4bbbb8e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -84,7 +84,7 @@
}
public static void readFully(Reader reader, ByteBuffer buf) throws IOException {
- // unfortunately the Hadoop APIs seem to not have a 'readFully' equivalent for the byteBuffer read
+ // unfortunately the Hadoop 2 APIs do not have a 'readFully' equivalent for the byteBuffer read
// calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we
// have to loop to ensure we read them.
while (buf.hasRemaining()) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
index 40f12fe..bafb45a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -19,16 +19,15 @@
package org.apache.parquet.hadoop.util;
+import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
+import java.io.InputStream;
import java.util.Objects;
/**
@@ -38,9 +37,6 @@
private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);
- private static final Class<?> byteBufferReadableClass = getReadableClass();
- static final Constructor<SeekableInputStream> h2SeekableConstructor = getH2SeekableConstructor();
-
/**
* Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
* implementation for Parquet readers.
@@ -50,51 +46,45 @@
*/
public static SeekableInputStream wrap(FSDataInputStream stream) {
Objects.requireNonNull(stream, "Cannot wrap a null input stream");
- if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
- byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
- try {
- return h2SeekableConstructor.newInstance(stream);
- } catch (InstantiationException | IllegalAccessException e) {
- LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
- return new H1SeekableInputStream(stream);
- } catch (InvocationTargetException e) {
- throw new ParquetDecodingException(
- "Could not instantiate H2SeekableInputStream", e.getTargetException());
- }
+ if (isWrappedStreamByteBufferReadable(stream)) {
+ return new H2SeekableInputStream(stream);
} else {
return new H1SeekableInputStream(stream);
}
}
- private static Class<?> getReadableClass() {
- try {
- return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- return null;
+ /**
+ * Is the inner stream byte buffer readable?
+ * The test is "the stream is not FSDataInputStream
+ * and implements ByteBufferReadable"
+ *
+ * That is: all streams which implement ByteBufferReadable
+ * other than FSDataInputStream successfully support read(ByteBuffer).
+ * This is true for all filesystem clients in the hadoop codebase.
+ *
+ * In hadoop 3.3.0+, the StreamCapabilities probe can be used to
+ * check this: only those streams which provide the read(ByteBuffer)
+ * semantics MAY return true for the probe "in:readbytebuffer";
+ * FSDataInputStream will pass the probe down to the underlying stream.
+ *
+ * @param stream stream to probe
+ * @return true if it is safe to use a H2SeekableInputStream to access the data
+ */
+ private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
+ if (stream.hasCapability("in:readbytebuffer")) {
+ // stream is issuing the guarantee that it implements the
+ // API. Holds for all implementations in hadoop-*
+ // since Hadoop 3.3.0 (HDFS-14111).
+ return true;
}
+ InputStream wrapped = stream.getWrappedStream();
+ if (wrapped instanceof FSDataInputStream) {
+ LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
+ return isWrappedStreamByteBufferReadable(((FSDataInputStream) wrapped));
+ }
+ return wrapped instanceof ByteBufferReadable;
}
- @SuppressWarnings("unchecked")
- private static Class<SeekableInputStream> getH2SeekableClass() {
- try {
- return (Class<SeekableInputStream>) Class.forName(
- "org.apache.parquet.hadoop.util.H2SeekableInputStream");
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- return null;
- }
- }
-
- private static Constructor<SeekableInputStream> getH2SeekableConstructor() {
- Class<SeekableInputStream> h2SeekableClass = getH2SeekableClass();
- if (h2SeekableClass != null) {
- try {
- return h2SeekableClass.getConstructor(FSDataInputStream.class);
- } catch (NoSuchMethodException e) {
- return null;
- }
- }
- return null;
- }
/**
* Wraps a {@link FSDataOutputStream} in a {@link PositionOutputStream}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
index 1b1e373..b514feb 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -19,8 +19,10 @@
package org.apache.parquet.hadoop.util;
+import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.hadoop.TestUtils;
+import org.apache.parquet.io.SeekableInputStream;
import org.junit.Assert;
import org.junit.Test;
import java.io.EOFException;
@@ -28,6 +30,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
+import static org.apache.parquet.hadoop.util.HadoopStreams.wrap;
import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
public class TestHadoop2ByteBufferReads {
@@ -396,4 +399,48 @@
Assert.assertEquals("Buffer contents should match",
ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
}
+
+ @Test
+ public void testCreateStreamNoByteBufferReadable() {
+ final SeekableInputStream s = wrap(new FSDataInputStream(
+ new MockHadoopInputStream()));
+ Assert.assertTrue("Wrong wrapper: " + s,
+ s instanceof H1SeekableInputStream);
+ }
+
+ @Test
+ public void testDoubleWrapNoByteBufferReadable() {
+ final SeekableInputStream s = wrap(new FSDataInputStream(
+ new FSDataInputStream(new MockHadoopInputStream())));
+ Assert.assertTrue("Wrong wrapper: " + s,
+ s instanceof H1SeekableInputStream);
+ }
+
+ @Test
+ public void testCreateStreamWithByteBufferReadable() {
+ final SeekableInputStream s = wrap(new FSDataInputStream(
+ new MockByteBufferInputStream()));
+ Assert.assertTrue("Wrong wrapper: " + s,
+ s instanceof H2SeekableInputStream);
+ }
+
+ @Test
+ public void testDoubleWrapByteBufferReadable() {
+ final SeekableInputStream s = wrap(new FSDataInputStream(
+ new FSDataInputStream(new MockByteBufferInputStream())));
+ Assert.assertTrue("Wrong wrapper: " + s,
+ s instanceof H2SeekableInputStream);
+ }
+
+ /**
+ * Input stream which claims to implement ByteBufferReadable.
+ */
+ private static final class MockByteBufferInputStream
+ extends MockHadoopInputStream implements ByteBufferReadable {
+
+ @Override
+ public int read(final ByteBuffer buf) {
+ return 0;
+ }
+ }
}