DRILL-5674: Support ZIP compression

1. Added a ZipCodec implementation which can read / write a single file.
2. Revisited Drill format plugins to ensure the 'openPossiblyCompressedStream' method is used in those that support compression (see the sketch after this list).
3. Added unit tests.
4. General refactoring.
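
For reference, the following minimal sketch (not part of the patch) shows how a codec registered via 'io.compression.codecs' is resolved by file extension and used to wrap raw streams; this is the same mechanism 'DrillFileSystem.openPossiblyCompressedStream' relies on. The class name, temporary path and sample data below are illustrative assumptions, not code from this change.

import org.apache.drill.exec.store.dfs.ZipCodec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class ZipCodecResolutionSketch { // hypothetical class, for illustration only

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Register the Drill ZIP codec in addition to Hadoop's built-in ones.
    conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, ZipCodec.class.getCanonicalName());

    FileSystem fs = FileSystem.getLocal(conf);
    Path file = new Path("/tmp/data.csvh.zip"); // hypothetical path

    // The factory maps the '.zip' extension to ZipCodec (null if no codec matches).
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(file);

    // Write a single-entry ZIP archive through the codec.
    try (InputStream in = new ByteArrayInputStream("id,name\n1,Fred".getBytes(StandardCharsets.UTF_8));
         OutputStream out = codec.createOutputStream(fs.create(file))) {
      IOUtils.copyBytes(in, out, conf, false);
    }

    // Read it back as a plain decompressed stream, the way a format plugin sees it
    // after openPossiblyCompressedStream.
    try (InputStream in = codec.createInputStream(fs.open(file))) {
      IOUtils.copyBytes(in, System.out, conf, false);
    }
  }
}
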
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
index 8ff62ed..7284409 100644
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.ltsv;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -34,17 +33,14 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import java.io.IOException;
 import java.util.List;
 
 public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> {
 
-  private static final boolean IS_COMPRESSIBLE = false;
+  private static final boolean IS_COMPRESSIBLE = true;
 
   private static final String DEFAULT_NAME = "ltsv";
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LTSVFormatPlugin.class);
-
   public LTSVFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
     this(name, context, fsConf, storageConfig, new LTSVFormatPluginConfig());
   }
@@ -54,7 +50,7 @@
   }
 
   @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) {
     return new LTSVRecordReader(context, fileWork.getPath(), dfs, columns);
   }
 
@@ -75,7 +71,7 @@
   }
 
   @Override
-  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) {
     throw new UnsupportedOperationException("Drill doesn't currently support writing to LTSV files.");
   }
 
@@ -85,13 +81,12 @@
   }
 
   @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
     throw new UnsupportedOperationException("unimplemented");
   }
 
   @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
     throw new UnsupportedOperationException("unimplemented");
   }
-
 }
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
index cb23850..619ceb1 100644
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
@@ -19,7 +19,6 @@
 
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -30,11 +29,13 @@
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
@@ -46,13 +47,13 @@
 
 public class LTSVRecordReader extends AbstractRecordReader {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LTSVRecordReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(LTSVRecordReader.class);
 
   private static final int MAX_RECORDS_PER_BATCH = 8096;
 
   private final String inputPath;
 
-  private final FSDataInputStream fsStream;
+  private final InputStream fsStream;
 
   private final BufferedReader reader;
 
@@ -64,14 +65,14 @@
                           List<SchemaPath> columns) throws OutOfMemoryException {
     this.inputPath = path.toUri().getPath();
     try {
-      this.fsStream = fileSystem.open(path);
-      this.reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), StandardCharsets.UTF_8));
+      this.fsStream = fileSystem.openPossiblyCompressedStream(path);
+      this.reader = new BufferedReader(new InputStreamReader(fsStream, StandardCharsets.UTF_8));
       this.buffer = fragmentContext.getManagedBuffer();
       setColumns(columns);
-
     } catch (IOException e) {
-      String msg = String.format("Failed to open input file: %s", inputPath);
-      throw UserException.dataReadError(e).message(msg).build(logger);
+      throw UserException.dataReadError(e)
+        .message(String.format("Failed to open input file: %s", inputPath))
+        .build(logger);
     }
   }
 
@@ -79,16 +80,14 @@
   protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) {
     Set<SchemaPath> transformed = new LinkedHashSet<>();
     if (!isStarQuery()) {
-      for (SchemaPath column : projected) {
-        transformed.add(column);
-      }
+      transformed.addAll(projected);
     } else {
       transformed.add(SchemaPath.STAR_COLUMN);
     }
     return transformed;
   }
 
-  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+  public void setup(final OperatorContext context, final OutputMutator output) {
     this.writer = new VectorContainerWriter(output);
   }
 
@@ -100,7 +99,7 @@
 
     try {
       BaseWriter.MapWriter map = this.writer.rootAsMap();
-      String line = null;
+      String line;
 
       while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
         // Skip empty lines
@@ -145,7 +144,9 @@
 
     } catch (final Exception e) {
       String msg = String.format("Failure while reading messages from %s. Record reader was at record: %d", inputPath, recordCount + 1);
-      throw UserException.dataReadError(e).message(msg).build(logger);
+      throw UserException.dataReadError(e)
+        .message(msg)
+        .build(logger);
     }
   }
 
@@ -161,5 +162,4 @@
   public void close() throws Exception {
     AutoCloseables.close(reader, fsStream);
   }
-
 }
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index 6f81ac6..d21035b 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -17,13 +17,12 @@
  */
 
 package org.apache.drill.exec.store.syslog;
-import java.io.IOException;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
@@ -31,14 +30,13 @@
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-
-
-import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.util.List;
+
 public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
 
   public static final String DEFAULT_NAME = "syslog";
@@ -59,7 +57,7 @@
 
   @Override
   public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-                                      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+                                      List<SchemaPath> columns, String userName) {
     return new SyslogRecordReader(context, dfs, fileWork, columns, userName, formatConfig);
   }
 
@@ -90,12 +88,12 @@
   }
 
   @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
     throw new UnsupportedOperationException("unimplemented");
   }
 
   @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
     throw new UnsupportedOperationException("unimplemented");
   }
 }
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
index 4b2831c..0f39887 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
@@ -19,7 +19,6 @@
 package org.apache.drill.exec.store.syslog;
 
 import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -34,18 +33,20 @@
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 import org.realityforge.jsyslog.message.StructuredDataParameter;
 import org.realityforge.jsyslog.message.SyslogMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Map;
-import java.util.Iterator;
 
 public class SyslogRecordReader extends AbstractRecordReader {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SyslogRecordReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(SyslogRecordReader.class);
   private static final int MAX_RECORDS_PER_BATCH = 4096;
 
   private final DrillFileSystem fileSystem;
@@ -54,16 +55,13 @@
   private BufferedReader reader;
   private DrillBuf buffer;
   private VectorContainerWriter writer;
-  private SyslogFormatConfig config;
-  private int maxErrors;
-  private boolean flattenStructuredData;
+  private final int maxErrors;
+  private final boolean flattenStructuredData;
   private int errorCount;
   private int lineCount;
-  private List<SchemaPath> projectedColumns;
+  private final List<SchemaPath> projectedColumns;
   private String line;
 
-  private SimpleDateFormat df;
-
   public SyslogRecordReader(FragmentContext context,
                             DrillFileSystem fileSystem,
                             FileWork fileWork,
@@ -74,9 +72,7 @@
     this.fileSystem = fileSystem;
     this.fileWork = fileWork;
     this.userName = userName;
-    this.config = config;
     this.maxErrors = config.getMaxErrors();
-    this.df = getValidDateObject("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
     this.errorCount = 0;
     this.buffer = context.getManagedBuffer().reallocIfNeeded(4096);
     this.projectedColumns = columns;
@@ -86,7 +82,7 @@
   }
 
   @Override
-  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+  public void setup(final OperatorContext context, final OutputMutator output) {
     openFile();
     this.writer = new VectorContainerWriter(output);
   }
@@ -94,7 +90,7 @@
   private void openFile() {
     InputStream in;
     try {
-      in = fileSystem.open(fileWork.getPath());
+      in = fileSystem.openPossiblyCompressedStream(fileWork.getPath());
     } catch (Exception e) {
       throw UserException
               .dataReadError(e)
@@ -115,7 +111,7 @@
 
     try {
       BaseWriter.MapWriter map = this.writer.rootAsMap();
-      String line = null;
+      String line;
 
       while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
         lineCount++;
@@ -288,7 +284,7 @@
       return;
     }
     try {
-      byte[] bytes = value.getBytes("UTF-8");
+      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
       int stringLength = bytes.length;
       this.buffer = buffer.reallocIfNeeded(stringLength);
       this.buffer.setBytes(0, bytes, 0, stringLength);
@@ -304,18 +300,10 @@
 
   //Helper function to flatten structured data
   private void mapFlattenedStructuredData(Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
-    Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = data.entrySet().iterator();
-    while (entries.hasNext()) {
-      Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
-
-      List<StructuredDataParameter> dataParameters = entry.getValue();
-      String fieldName;
-      String fieldValue;
-
-      for (StructuredDataParameter parameter : dataParameters) {
-        fieldName = "structured_data_" + parameter.getName();
-        fieldValue = parameter.getValue();
-
+    for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) {
+      for (StructuredDataParameter parameter : entry.getValue()) {
+        String fieldName = "structured_data_" + parameter.getName();
+        String fieldValue = parameter.getValue();
         mapStringField(fieldName, fieldValue, map);
       }
     }
@@ -323,28 +311,19 @@
 
   //Gets field from the Structured Data Construct
   private String getFieldFromStructuredData(String fieldName, SyslogMessage parsedMessage) {
-    String result = null;
-    Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData();
-    Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = parsedMessage.getStructuredData().entrySet().iterator();
-    while (entries.hasNext()) {
-      Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
-      List<StructuredDataParameter> dataParameters = entry.getValue();
-
-      for (StructuredDataParameter d : dataParameters) {
+    for (Map.Entry<String, List<StructuredDataParameter>> entry : parsedMessage.getStructuredData().entrySet()) {
+      for (StructuredDataParameter d : entry.getValue()) {
         if (d.getName().equals(fieldName)) {
           return d.getValue();
         }
       }
     }
-    return result;
+    return null;
   }
 
   //Helper function to map arrays
   private void mapComplexField(String mapName, Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
-    Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = data.entrySet().iterator();
-    while (entries.hasNext()) {
-      Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
-
+    for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) {
       List<StructuredDataParameter> dataParameters = entry.getValue();
       String fieldName;
       String fieldValue;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index c3470d8..b72ecee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -544,7 +544,7 @@
     if (scan instanceof EnumerableTableScan) {
       final Object selection = DrillRelOptUtil.getDrillTable(scan).getSelection();
       if (selection instanceof FormatSelection
-          && ((FormatSelection)selection).supportDirPruning()) {
+          && ((FormatSelection)selection).supportsDirPruning()) {
         return true;  // Do directory-based pruning in Calcite logical
       } else {
         return false; // Do not do directory-based pruning in Calcite logical
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 6fa1793..78cb4e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -22,34 +22,30 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.Range;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Range;
-
 public class BasicFormatMatcher extends FormatMatcher {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
 
   protected final FormatPlugin plugin;
-  protected final boolean compressible;
-  protected final CompressionCodecFactory codecFactory;
 
+  private final boolean compressible;
+  private final CompressionCodecFactory codecFactory;
   private final List<Pattern> patterns;
   private final MagicStringMatcher matcher;
 
   public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) {
-    super();
-    this.patterns = ImmutableList.copyOf(patterns);
+    this.patterns = new ArrayList<>(patterns);
     this.matcher = new MagicStringMatcher(magicStrings);
     this.plugin = plugin;
     this.compressible = false;
@@ -57,12 +53,10 @@
   }
 
   public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) {
-    List<Pattern> patterns = Lists.newArrayList();
-    for (String extension : extensions) {
-      patterns.add(Pattern.compile(".*\\." + extension));
-    }
-    this.patterns = patterns;
-    this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
+    this.patterns = extensions.stream()
+      .map(extension -> Pattern.compile(".*\\." + extension))
+      .collect(Collectors.toList());
+    this.matcher = new MagicStringMatcher(new ArrayList<>());
     this.plugin = plugin;
     this.compressible = compressible;
     this.codecFactory = new CompressionCodecFactory(fsConf);
@@ -84,8 +78,12 @@
     return null;
   }
 
-  /*
-   * Function returns true if the file extension matches the pattern
+  /**
+   * Function returns true if the file extension matches the pattern.
+   *
+   * @param fs     file system
+   * @param status file status
+   * @return true if file is readable, false otherwise
    */
   @Override
   public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException {
@@ -109,10 +107,7 @@
       }
     }
 
-    if (matcher.matches(fs, status)) {
-      return true;
-    }
-    return false;
+    return matcher.matches(fs, status);
   }
 
   @Override
@@ -121,16 +116,14 @@
     return plugin;
   }
 
+  private static class MagicStringMatcher {
 
-  private class MagicStringMatcher {
+    private final List<RangeMagics> ranges;
 
-    private List<RangeMagics> ranges;
-
-    public MagicStringMatcher(List<MagicString> magicStrings) {
-      ranges = Lists.newArrayList();
-      for(MagicString ms : magicStrings) {
-        ranges.add(new RangeMagics(ms));
-      }
+    MagicStringMatcher(List<MagicString> magicStrings) {
+      this.ranges = magicStrings.stream()
+        .map(RangeMagics::new)
+        .collect(Collectors.toList());
     }
 
     public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{
@@ -168,15 +161,15 @@
       return false;
     }
 
-    private class RangeMagics{
-      Range<Long> range;
-      byte[][] magics;
+    private static class RangeMagics {
 
-      public RangeMagics(MagicString ms) {
-        this.range = Range.closedOpen( ms.getOffset(), (long) ms.getBytes().length);
+      private final Range<Long> range;
+      private final byte[][] magics;
+
+      RangeMagics(MagicString ms) {
+        this.range = Range.closedOpen(ms.getOffset(), (long) ms.getBytes().length);
         this.magics = new byte[][]{ms.getBytes()};
       }
     }
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 902abda..a1fccb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -17,6 +17,16 @@
  */
 package org.apache.drill.exec.store.dfs;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -26,20 +36,12 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.util.DrillFileSystemUtil;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
 /**
  * Jackson serializable description of a file selection.
  */
 public class FileSelection {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSelection.class);
+
+  private static final Logger logger = LoggerFactory.getLogger(FileSelection.class);
   private static final String WILD_CARD = "*";
 
   private List<FileStatus> statuses;
@@ -73,10 +75,10 @@
   }
 
   private StatusType dirStatus;
-  // whether this selection previously had a wildcard
-  private boolean hadWildcard = false;
-  // whether all partitions were previously pruned for this selection
-  private boolean wasAllPartitionsPruned = false;
+  // whether this selection previously had a wildcard, false by default
+  private boolean hadWildcard;
+  // whether all partitions were previously pruned for this selection, false by default
+  private boolean wasAllPartitionsPruned;
 
   /**
    * Creates a {@link FileSelection selection} out of given file statuses/files and selection root.
@@ -108,7 +110,7 @@
    * Copy constructor for convenience.
    */
   protected FileSelection(FileSelection selection) {
-    Preconditions.checkNotNull(selection, "selection cannot be null");
+    Preconditions.checkNotNull(selection, "File selection cannot be null");
     this.statuses = selection.statuses;
     this.files = selection.files;
     this.selectionRoot = selection.selectionRoot;
@@ -117,6 +119,7 @@
     this.metaContext = selection.metaContext;
     this.hadWildcard = selection.hadWildcard;
     this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
+    this.emptyDirectory = selection.emptyDirectory;
   }
 
   public Path getSelectionRoot() {
@@ -127,8 +130,8 @@
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     if (statuses == null)  {
-      List<FileStatus> newStatuses = Lists.newArrayList();
-      for (Path pathStr : files) {
+      List<FileStatus> newStatuses = new ArrayList<>();
+      for (Path pathStr : Objects.requireNonNull(files, "Files can not be null if statuses are null")) {
         newStatuses.add(fs.getFileStatus(pathStr));
       }
       statuses = newStatuses;
@@ -144,11 +147,9 @@
 
   public List<Path> getFiles() {
     if (files == null) {
-      List<Path> newFiles = Lists.newArrayList();
-      for (FileStatus status:statuses) {
-        newFiles.add(status.getPath());
-      }
-      files = newFiles;
+      files = Objects.requireNonNull(statuses, "Statuses can not be null if files are null").stream()
+        .map(FileStatus::getPath)
+        .collect(Collectors.toList());
     }
     return files;
   }
@@ -386,9 +387,8 @@
     Preconditions.checkArgument(!combinedPath.isEmpty(), "Empty path (" + combinedPath + "( in file selection path.");
 
     if (!combinedPath.startsWith(parent)) {
-      StringBuilder msg = new StringBuilder();
-      msg.append("Invalid path : ").append(subpath).append(" takes you outside the workspace.");
-      throw new IllegalArgumentException(msg.toString());
+      throw new IllegalArgumentException(
+        String.format("Invalid path [%s] takes you outside the workspace.", subpath));
     }
   }
 
@@ -396,7 +396,7 @@
     return statuses;
   }
 
-  public boolean supportDirPrunig() {
+  public boolean supportsDirPruning() {
     if (isExpandedFully() || isExpandedPartial()) {
       if (!wasAllPartitionsPruned) {
         return true;
@@ -440,26 +440,15 @@
     this.emptyDirectory = true;
   }
 
-
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("root=").append(this.selectionRoot);
-
+    sb.append("root=").append(selectionRoot);
     sb.append("files=[");
-    boolean isFirst = true;
-    for (Path file : this.files) {
-      if (isFirst) {
-        isFirst = false;
-        sb.append(file);
-      } else {
-        sb.append(",");
-        sb.append(file);
-      }
-    }
+    sb.append(getFiles().stream()
+      .map(Path::toString)
+      .collect(Collectors.joining(", ")));
     sb.append("]");
-
     return sb.toString();
   }
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index cf1333d..8a97701 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -33,9 +34,9 @@
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.metastore.MetadataProviderManager;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -43,11 +44,14 @@
 import org.apache.drill.exec.store.LocalSyncableFileSystem;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem,
@@ -57,7 +61,14 @@
  */
 public class FileSystemPlugin extends AbstractStoragePlugin {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+  private static final Logger logger = LoggerFactory.getLogger(FileSystemPlugin.class);
+
+  /**
+   * The org.apache.hadoop.io.compress library supports codecs such as Gzip and Bzip2 out of the box.
+   * This list stores only the codecs that are missing from the Hadoop library.
+   */
+  private static final List<String> ADDITIONAL_CODECS = Collections.singletonList(
+    ZipCodec.class.getCanonicalName());
 
   private final FileSystemSchemaFactory schemaFactory;
   private final FormatCreator formatCreator;
@@ -80,6 +91,8 @@
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
 
+      addCodecs(fsConf);
+
       if (isS3Connection(fsConf)) {
         handleS3Credentials(fsConf);
       }
@@ -111,6 +124,24 @@
     }
   }
 
+  /**
+   * Merges codecs from the configuration with {@link #ADDITIONAL_CODECS}
+   * and updates the configuration property.
+   * Drill built-in codecs are added at the beginning of the codecs string
+   * so that codecs from the configuration can override the Drill ones.
+   *
+   * @param conf Hadoop configuration
+   */
+  private void addCodecs(Configuration conf) {
+    String confCodecs = conf.get(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY);
+    String builtInCodecs = String.join(",", ADDITIONAL_CODECS);
+    String newCodecs = Strings.isNullOrEmpty(confCodecs)
+      ? builtInCodecs
+      : builtInCodecs + ", " + confCodecs;
+    logger.trace("Codecs: {}", newCodecs);
+    conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, newCodecs);
+  }
+
   private boolean isS3Connection(Configuration conf) {
     URI uri = FileSystem.getDefaultUri(conf);
     return uri.getScheme().equals("s3a");
@@ -131,7 +162,7 @@
     for (String key : credentialKeys) {
       char[] credentialChars = conf.getPassword(key);
       if (credentialChars == null) {
-        logger.warn(String.format("Property '%s' is absent.", key));
+        logger.warn("Property '{}' is absent.", key);
       } else {
         conf.set(key, String.valueOf(credentialChars));
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
index 7d7bcfa..d2a5545 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -17,18 +17,15 @@
  */
 package org.apache.drill.exec.store.dfs;
 
-import java.util.List;
-
-import org.apache.drill.common.logical.FormatPluginConfig;
-
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.hadoop.fs.Path;
 
+import java.util.List;
 
 public class FormatSelection {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatSelection.class);
 
   private FormatPluginConfig format;
   private FileSelection selection;
@@ -62,7 +59,7 @@
   }
 
   @JsonIgnore
-  public boolean supportDirPruning() {
-    return selection.supportDirPrunig();
+  public boolean supportsDirPruning() {
+    return selection.supportsDirPruning();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java
new file mode 100644
index 0000000..c613076
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * ZIP codec implementation which can read or create a single entry.
+ * <p/>
+ * Note: Do not rename this class. It must be named 'ZipCodec' so that it can be mapped by
+ * {@link org.apache.hadoop.io.compress.CompressionCodecFactory} to the 'zip' extension.
+ */
+public class ZipCodec extends DefaultCodec {
+
+  private static final String EXTENSION = ".zip";
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+    return new ZipCompressionOutputStream(new ResetableZipOutputStream(out));
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream in) throws IOException {
+    return new ZipCompressionInputStream(new ZipInputStream(in));
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return EXTENSION;
+  }
+
+  /**
+   * Reads only the first entry from a {@link ZipInputStream};
+   * other entries, if present, are ignored.
+   */
+  private static class ZipCompressionInputStream extends CompressionInputStream {
+
+    ZipCompressionInputStream(ZipInputStream in) throws IOException {
+      super(in);
+      // positions the stream at the beginning of the first entry's data
+      in.getNextEntry();
+    }
+
+    @Override
+    public int read() throws IOException {
+      return in.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return in.read(b, off, len);
+    }
+
+    @Override
+    public void resetState() throws IOException {
+      in.reset();
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        ((ZipInputStream) in).closeEntry();
+      } finally {
+        super.close();
+      }
+    }
+  }
+
+  /**
+   * Extends {@link ZipOutputStream} to allow resetting the compressor stream,
+   * as required by the {@link CompressionOutputStream} implementation.
+   */
+  private static class ResetableZipOutputStream extends ZipOutputStream {
+
+    ResetableZipOutputStream(OutputStream out) {
+      super(out);
+    }
+
+    void resetState() {
+      def.reset();
+    }
+  }
+
+  /**
+   * Writes the given data into a ZIP archive, placing all data in a single entry with a default name.
+   */
+  private static class ZipCompressionOutputStream extends CompressionOutputStream {
+
+    private static final String DEFAULT_ENTRY_NAME = "entry.out";
+
+    ZipCompressionOutputStream(ResetableZipOutputStream out) throws IOException {
+      super(out);
+      ZipEntry zipEntry = new ZipEntry(DEFAULT_ENTRY_NAME);
+      out.putNextEntry(zipEntry);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+    }
+
+    @Override
+    public void finish() throws IOException {
+      ((ResetableZipOutputStream) out).closeEntry();
+    }
+
+    @Override
+    public void resetState() {
+      ((ResetableZipOutputStream) out).resetState();
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
index 6d8b533..0ed71db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
@@ -178,7 +178,7 @@
   private void openFile(FileSchemaNegotiator negotiator) {
     InputStream in;
     try {
-      in = negotiator.fileSystem().open(split.getPath());
+      in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
     } catch (Exception e) {
       throw UserException
           .dataReadError(e)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 985fb20..9129fc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -21,6 +21,8 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,13 +61,11 @@
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MagicString;
 import org.apache.drill.exec.store.dfs.MetadataContext;
-import org.apache.drill.exec.store.mock.MockStorageEngine;
 import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.ParquetTableMetadataDirs;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -73,19 +73,21 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ParquetFormatPlugin implements FormatPlugin {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+  private static final Logger logger = LoggerFactory.getLogger(ParquetFormatPlugin.class);
 
   public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
   private static final String DEFAULT_NAME = "parquet";
 
-  private static final List<Pattern> PATTERNS = Lists.newArrayList(
+  private static final List<Pattern> PATTERNS = Arrays.asList(
       Pattern.compile(".*\\.parquet$"),
       Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE));
-  private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
+  private static final List<MagicString> MAGIC_STRINGS = Collections.singletonList(new MagicString(0, ParquetFileWriter.MAGIC));
 
   private final DrillbitContext context;
   private final Configuration fsConf;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index bc86916..c9caa24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -28,15 +28,13 @@
 import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
 import org.apache.drill.exec.store.pcap.schema.Schema;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.mapred.FileSplit;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
+import java.io.InputStream;
 
 import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
 
@@ -48,15 +46,9 @@
 
   private FileSplit split;
 
-  private PcapReaderConfig readerConfig;
-
   private PacketDecoder decoder;
 
-  private ResultSetLoader loader;
-
-  private FSDataInputStream fsStream;
-
-  private Schema pcapSchema;
+  private InputStream fsStream;
 
   private RowSetLoader rowWriter;
 
@@ -135,7 +127,6 @@
   }
 
   public PcapBatchReader(PcapReaderConfig readerConfig) {
-    this.readerConfig = readerConfig;
   }
 
   @Override
@@ -143,10 +134,10 @@
     split = negotiator.split();
     openFile(negotiator);
     SchemaBuilder builder = new SchemaBuilder();
-    pcapSchema = new Schema();
+    Schema pcapSchema = new Schema();
     TupleMetadata schema = pcapSchema.buildSchema(builder);
     negotiator.setTableSchema(schema, false);
-    loader = negotiator.build();
+    ResultSetLoader loader = negotiator.build();
 
     // Creates writers for all fields (Since schema is known)
     rowWriter = loader.writer();
@@ -208,14 +199,13 @@
         .build(logger);
     }
     fsStream = null;
-    this.buffer = null;
-    this.decoder = null;
+    buffer = null;
+    decoder = null;
   }
 
   private void openFile(FileSchemaNegotiator negotiator) {
     try {
-      String filePath = split.getPath().toString();
-      fsStream = negotiator.fileSystem().open(new Path(filePath));
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
       decoder = new PacketDecoder(fsStream);
       buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()];
       validBytes = fsStream.read(buffer);
@@ -237,9 +227,6 @@
       getNextPacket(rowWriter);
     }
 
-    if (packet == null) {
-      return false;
-    }
     int old = offset;
     offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes);
     if (offset > validBytes) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
index 1678196..b8e0175 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
@@ -25,7 +25,6 @@
 import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -34,9 +33,11 @@
 import org.apache.drill.exec.store.pcap.PcapBatchReader.PcapReaderConfig;
 
 public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
+
   public static final String PLUGIN_NAME = "pcap";
 
   private static class PcapReaderFactory extends FileReaderFactory {
+
     private final PcapReaderConfig readerConfig;
 
     public PcapReaderFactory(PcapReaderConfig config) {
@@ -48,6 +49,7 @@
       return new PcapBatchReader(readerConfig);
     }
   }
+
   public PcapFormatPlugin(String name, DrillbitContext context,
                            Configuration fsConf, StoragePluginConfig storageConfig,
                            PcapFormatConfig formatConfig) {
@@ -58,7 +60,7 @@
     EasyFormatConfig config = new EasyFormatConfig();
     config.readable = true;
     config.writable = false;
-    config.blockSplittable = true;
+    config.blockSplittable = false;
     config.compressible = true;
     config.supportsProjectPushdown = true;
     config.extensions = pluginConfig.getExtensions();
@@ -70,13 +72,12 @@
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
-    EasySubScan scan, OptionManager options) throws ExecutionSetupException {
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
     return new PcapBatchReader(new PcapReaderConfig(this));
   }
 
   @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
+  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
     FileScanBuilder builder = new FileScanBuilder();
     builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this)));
     initScanBuilder(builder, scan);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
index de9a558..41be760 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
@@ -47,7 +47,7 @@
 
   public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig formatPluginConfig) {
     super(name, context, fsConf, config, formatPluginConfig, true,
-        false, true, false,
+        false, false, true,
         formatPluginConfig.getExtensions(), DEFAULT_NAME);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
index 0ad234d..152e2e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
@@ -31,18 +31,18 @@
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.pcapng.schema.Column;
 import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl;
 import org.apache.drill.exec.store.pcapng.schema.DummyImpl;
 import org.apache.drill.exec.store.pcapng.schema.Schema;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -59,14 +59,14 @@
   private final Path pathToFile;
   private OutputMutator output;
   private List<ProjectedColumnInfo> projectedCols;
-  private FileSystem fs;
-  private FSDataInputStream in;
+  private DrillFileSystem fs;
+  private InputStream in;
   private List<SchemaPath> columns;
 
   private Iterator<IPcapngType> it;
 
   public PcapngRecordReader(final Path pathToFile,
-                            final FileSystem fileSystem,
+                            final DrillFileSystem fileSystem,
                             final List<SchemaPath> columns) {
     this.fs = fileSystem;
     this.pathToFile = fs.makeQualified(pathToFile);
@@ -79,7 +79,7 @@
     try {
 
       this.output = output;
-      this.in = fs.open(pathToFile);
+      this.in = fs.openPossiblyCompressedStream(pathToFile);
       PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
       decoder.decode();
       this.it = decoder.getSectionList().iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
index dafeaa3..493a3b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 /**
- * For comments on realization of this format plugin look at :
+ * For comments on implementation of this format plugin see:
  *
  * @see <a href="https://issues.apache.org/jira/browse/DRILL-6179"> Jira</a>
  */
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java
new file mode 100644
index 0000000..fd1bf9e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.junit.Assert.assertNotNull;
+
+@Category(UnlikelyTest.class)
+public class TestCompressedFiles extends ClusterTest {
+
+  private static FileSystem fs;
+  private static CompressionCodecFactory factory;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+
+    fs = ExecTest.getLocalFileSystem();
+    Configuration conf = fs.getConf();
+    conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, ZipCodec.class.getCanonicalName());
+    factory = new CompressionCodecFactory(conf);
+  }
+
+  @Test
+  public void testGzip() throws Exception {
+    String fileName = "gz_data.csvh.gz";
+    writeData(fileName, "gzip", "id,name\n1,Fred\n2,Wilma");
+
+    testBuilder()
+      .sqlQuery("select * from dfs.`root`.`%s`", fileName)
+      .unOrdered()
+      .baselineColumns("id", "name")
+      .baselineValues("1", "Fred")
+      .baselineValues("2", "Wilma")
+      .go();
+  }
+
+  @Test
+  public void testBzip2() throws Exception {
+    String fileName = "bzip2_data.csvh.bz2";
+    writeData(fileName, "bzip2", "id,name\n3,Bamm-Bamm\n4,Barney");
+
+    testBuilder()
+      .sqlQuery("select * from dfs.`root`.`%s`", fileName)
+      .unOrdered()
+      .baselineColumns("id", "name")
+      .baselineValues("3", "Bamm-Bamm")
+      .baselineValues("4", "Barney")
+      .go();
+  }
+
+  @Test
+  public void testZip() throws Exception {
+    String fileName = "zip_data.csvh.zip";
+    writeData(fileName, "zip", "id,name\n5,Dino\n6,Pebbles");
+
+    testBuilder()
+      .sqlQuery("select * from dfs.`root`.`%s`", fileName)
+      .unOrdered()
+      .baselineColumns("id", "name")
+      .baselineValues("5", "Dino")
+      .baselineValues("6", "Pebbles")
+      .go();
+  }
+
+  private void writeData(String fileName, String codecName, String data) throws IOException {
+    CompressionCodec codec = factory.getCodecByName(codecName);
+    assertNotNull(codecName + " is not found", codec);
+    Path outFile = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), fileName);
+    try (InputStream inputStream = new ByteArrayInputStream(data.getBytes());
+         OutputStream outputStream = codec.createOutputStream(fs.create(outFile))) {
+      IOUtils.copyBytes(inputStream, outputStream, fs.getConf(), false);
+    }
+  }
+}