DRILL-7834: Add Utility Functions for Compressed Files
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index 9380e41..18a6211 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.dfs;
+import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@@ -26,6 +27,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.io.IOUtils;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.util.AssertionUtil;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -783,6 +785,14 @@
underlyingFs.removeXAttr(path, name);
}
+ /**
+ * Returns an InputStream from a Hadoop path. If the data is compressed, this method will return a compressed
+ * InputStream depending on the codec. Note that if the results of this method are sent to a third party parser
+ * that works with bytes or individual characters directly, you should use the openDecompressedInputStream method.
+ * @param path Input file path
+ * @return InputStream of opened file path
+ * @throws IOException If the file is unreachable, unavailable or otherwise unreadable
+ */
public InputStream openPossiblyCompressedStream(Path path) throws IOException {
CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
if (codec != null) {
@@ -791,6 +801,49 @@
return open(path);
}
}
+
+ /**
+ * Returns a normal, decompressed InputStream. Some parsers, particularly those
+ * that read raw bytes, generate errors when passed Hadoop ZipCompressed InputStreams.
+ * This utility function wraps some of these functions so that a format plugin can be guaranteed
+ * readable bytes.
+ * @param path The file being read
+ * @return Decompressed InputStream of the input file
+ * @throws IOException If the file is unreadable or uses an unknown compression codec
+ */
+ public InputStream openDecompressedInputStream(Path path) throws IOException {
+ CompressionCodec codec = getCodec(path);
+ if (codec == null) {
+ return open(path);
+ } else {
+ InputStream compressedStream = codec.createInputStream(open(path));
+ byte[] bytes = IOUtils.toByteArray(compressedStream);
+ return new ByteArrayInputStream(bytes);
+ }
+ }
+
+ /**
+ * There are parsers which require an uncompressed input stream to read the data
+ * properly. This method helps identify whether the file being read is in fact compressed.
+ * @param path The file being read
+ * @return True if the file is compressed, false if not.
+ */
+ public boolean isCompressed(Path path) {
+ CompressionCodec codec = codecFactory.getCodec(path);
+ return codec != null;
+ }
+
+ /**
+ * Returns the {@link org.apache.hadoop.io.compress.CompressionCodec} for a given file. This
+ * can be used to determine the type of compression (if any) which was used. Returns null if the
+ * file is not compressed.
+ * @param path The file of unknown compression
+ * @return CompressionCodec used by the file. Null if the file is not compressed.
+ */
+ public CompressionCodec getCodec(Path path) {
+ return codecFactory.getCodec(path);
+ }
+
@Override
public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream) {
openedFiles.put(fsDataInputStream, new DebugStackTrace(path, Thread.currentThread().getStackTrace()));