DRILL-7874: Ensure DrillFSDataInputStream.read populates byte array of the requested length
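Root cause: the general InputStream.read(byte[], int, int) contract only guarantees that at least one byte is read per call, so an underlying stream (compressed streams in particular) may legitimately fill the requested range only partially. Callers that assume a single read() fills the whole buffer then see truncated data. Guava's ByteStreams.read() loops until either len bytes have been read or the stream ends. A minimal sketch of the difference, using the plain (non-shaded) Guava import and a hypothetical demo class that is not part of this patch:

import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class PartialReadDemo {
  public static void main(String[] args) throws IOException {
    // A stream that returns at most one byte per call, which the
    // InputStream contract explicitly allows.
    InputStream oneBytePerCall = new ByteArrayInputStream("abcdef".getBytes()) {
      @Override
      public int read(byte[] b, int off, int len) {
        return super.read(b, off, Math.min(len, 1));
      }
    };
    byte[] buf = new byte[6];
    // A bare read() may stop early: prints 1.
    System.out.println(oneBytePerCall.read(buf, 0, buf.length));
    // ByteStreams.read() keeps reading until len bytes or EOF: prints 5,
    // the number of bytes remaining in the stream.
    System.out.println(ByteStreams.read(oneBytePerCall, buf, 1, buf.length - 1));
  }
}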
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
index 46afd17..7c3c913 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.dfs;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.shaded.guava.com.google.common.io.ByteStreams;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
@@ -40,12 +41,12 @@
private final OpenFileTracker openFileTracker;
private final OperatorStats operatorStats;
- public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException {
+ public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) {
this(in, operatorStats, null);
}
public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats,
- OpenFileTracker openFileTracker) throws IOException {
+ OpenFileTracker openFileTracker) {
super(new WrappedInputStream(in, operatorStats));
underlyingIs = in;
this.openFileTracker = openFileTracker;
@@ -213,7 +214,7 @@
public int read(byte[] b, int off, int len) throws IOException {
operatorStats.startWait();
try {
- return is.read(b, off, len);
+ return readBytes(b, off, len);
} finally {
operatorStats.stopWait();
}
@@ -223,12 +224,28 @@
public int read(byte[] b) throws IOException {
operatorStats.startWait();
try {
- return is.read(b);
+ return readBytes(b, 0, b.length);
} finally {
operatorStats.stopWait();
}
}
+ /**
+ * Reads up to {@code len} bytes of data from the input stream into an array of bytes.
+ * This method guarantees that, regardless of the underlying stream implementation,
+ * the byte array is populated either with {@code len} bytes or, when fewer than
+ * {@code len} bytes remain in the stream, with all of the remaining bytes.
+ */
+ private int readBytes(byte[] b, int off, int len) throws IOException {
+ int read = ByteStreams.read(is, b, off, len);
+ if (read == 0 && len > 0) {
+ // Unlike InputStream.read(), ByteStreams.read() returns 0 rather than -1
+ // when the stream is already at EOF, so translate it to -1 here
+ return -1;
+ }
+ return read;
+ }
+
@Override
public long skip(long n) throws IOException {
return is.skip(n);
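Note on the EOF translation in readBytes() above: ByteStreams.read() returns the total number of bytes read, which is 0 when the stream is already exhausted, whereas the InputStream contract requires read() to return -1 at end of stream. A small illustration (hypothetical snippet, assuming plain Guava on the classpath):

InputStream empty = new ByteArrayInputStream(new byte[0]);
byte[] buf = new byte[8];
int n = ByteStreams.read(empty, buf, 0, buf.length); // n == 0, not -1
// readBytes() maps this back to the InputStream convention:
// zero bytes for a non-empty request means EOF, hence return -1.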
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 4f72699..0da534b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.dfs;
-import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -27,8 +26,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import org.apache.commons.io.IOUtils;
-import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.util.AssertionUtil;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -788,51 +785,18 @@
/**
* Returns an InputStream from a Hadoop path. If the data is compressed, this method will return a compressed
- * InputStream depending on the codec. Note that if the results of this method are sent to a third party parser
- * that works with bytes or individual characters directly, you should use the openDecompressedInputStream method.
+ * InputStream depending on the codec.
* @param path Input file path
* @return InputStream of opened file path
* @throws IOException If the file is unreachable, unavailable or otherwise unreadable
*/
public InputStream openPossiblyCompressedStream(Path path) throws IOException {
- CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
+ CompressionCodec codec = getCodec(path); // infers from file ext.
+ InputStream inputStream = open(path);
if (codec != null) {
- return codec.createInputStream(open(path));
- } else {
- return open(path);
+ inputStream = codec.createInputStream(inputStream);
}
- }
-
- /**
- * Returns a normal, decompressed InputStream. Some parsers, particularly those
- * that read raw bytes, generate errors when passed Hadoop ZipCompressed InputStreams.
- * This utility function wraps some of these functions so that a format plugin can be guaranteed
- * readable bytes.
- * @param path The file being read
- * @return Decompressed InputStream of the input file
- * @throws IOException If the file is unreadable or uses an unknown compression codec
- */
- public InputStream openDecompressedInputStream(Path path) throws IOException {
- CompressionCodec codec = getCodec(path);
- if (codec == null) {
- return open(path);
- } else {
- InputStream compressedStream = codec.createInputStream(open(path));
- byte[] bytes = IOUtils.toByteArray(compressedStream);
- AutoCloseables.closeSilently(compressedStream);
- return new ByteArrayInputStream(bytes);
- }
- }
-
- /**
- * There are parsers which require an uncompressed input stream to read the data
- * properly. This method helps identify whether the file being read is in fact compressed.
- * @param path The file being read
- * @return True if the file is compressed, false if not.
- */
- public boolean isCompressed(Path path) {
- CompressionCodec codec = codecFactory.getCodec(path);
- return codec != null;
+ return inputStream;
}
/**
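With the consolidation above, openPossiblyCompressedStream() becomes the single entry point: it opens the raw stream once and wraps it with a decompressing codec stream only when a codec is inferred from the file extension. Since read() on the returned stream now reliably fills the buffer, the buffer-everything workaround of the removed openDecompressedInputStream() is no longer needed. A caller sketch (hypothetical path and variable names, assuming fs is an initialized DrillFileSystem):

try (InputStream in = fs.openPossiblyCompressedStream(new Path("/data/users.csv.gz"))) {
  byte[] buf = new byte[4096];
  int n;
  while ((n = in.read(buf)) != -1) {
    // process n bytes; read() fills buf unless fewer bytes remain
  }
}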
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index befa7d9..a1d188c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -467,7 +467,7 @@
originalLength = getIntFileOrder(byteOrder, header, offset + PacketConstants.ORIGINAL_LENGTH_OFFSET);
packetLength = getIntFileOrder(byteOrder, header, offset + PacketConstants.ACTUAL_LENGTH_OFFSET);
Preconditions.checkState(originalLength <= maxLength,
- "Packet too long (%d bytes)", originalLength);
+ "Packet too long (%s bytes)", originalLength);
}
private long getTimestampMicro(final byte[] header, final boolean byteOrder, final int offset) {
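The %d -> %s change in Packet.java is required because Guava's Preconditions methods use their own lightweight template syntax that recognizes only %s; printf-style specifiers such as %d are not expanded, and unmatched arguments are appended in square brackets. An illustration with a hypothetical value:

// Before: the placeholder is left verbatim and the value is appended.
Preconditions.checkState(false, "Packet too long (%d bytes)", 70000);
// -> IllegalStateException: Packet too long (%d bytes) [70000]

// After: %s is substituted as intended.
Preconditions.checkState(false, "Packet too long (%s bytes)", 70000);
// -> IllegalStateException: Packet too long (70000 bytes)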