DRILL-5674: Support ZIP compression
1. Added ZipCodec implementation which can read / write a single file (see the usage sketch after this list).
2. Revisited Drill format plugins to ensure the 'openPossiblyCompressedStream' method is used in those that support compression.
3. Added unit tests.
4. General refactoring.
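
Usage sketch (for reviewers, not part of the patch): the snippet below illustrates how the registered ZipCodec can be resolved by the '.zip' extension via Hadoop's CompressionCodecFactory and used to read the first archive entry, which is essentially what DrillFileSystem#openPossiblyCompressedStream does for readers in this patch. The class name and the local file path are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public class ZipCodecReadExample {
      public static void main(String[] args) throws Exception {
        // Register the Drill codec in addition to Hadoop's built-in ones,
        // mirroring FileSystemPlugin#addCodecs from this patch.
        Configuration conf = new Configuration();
        conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY,
            "org.apache.drill.exec.store.dfs.ZipCodec");

        // The codec is matched by the '.zip' extension (ZipCodec#getDefaultExtension).
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(new Path("/tmp/data.csvh.zip"));

        // Decompress the first (and only expected) entry of the archive.
        try (InputStream in = codec.createInputStream(new FileInputStream("/tmp/data.csvh.zip"));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
          String line;
          while ((line = reader.readLine()) != null) {
            System.out.println(line);
          }
        }
      }
    }
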
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
index 8ff62ed..7284409 100644
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVFormatPlugin.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.ltsv;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
@@ -34,17 +33,14 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import java.io.IOException;
import java.util.List;
public class LTSVFormatPlugin extends EasyFormatPlugin<LTSVFormatPluginConfig> {
- private static final boolean IS_COMPRESSIBLE = false;
+ private static final boolean IS_COMPRESSIBLE = true;
private static final String DEFAULT_NAME = "ltsv";
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LTSVFormatPlugin.class);
-
public LTSVFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
this(name, context, fsConf, storageConfig, new LTSVFormatPluginConfig());
}
@@ -54,7 +50,7 @@
}
@Override
- public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+ public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns, String userName) {
return new LTSVRecordReader(context, fileWork.getPath(), dfs, columns);
}
@@ -75,7 +71,7 @@
}
@Override
- public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException {
+ public RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) {
throw new UnsupportedOperationException("Drill doesn't currently support writing to LTSV files.");
}
@@ -85,13 +81,12 @@
}
@Override
- public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+ public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
throw new UnsupportedOperationException("unimplemented");
}
@Override
- public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+ public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
throw new UnsupportedOperationException("unimplemented");
}
-
}
diff --git a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
index cb23850..619ceb1 100644
--- a/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
+++ b/contrib/format-ltsv/src/main/java/org/apache/drill/exec/store/ltsv/LTSVRecordReader.java
@@ -19,7 +19,6 @@
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -30,11 +29,13 @@
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
@@ -46,13 +47,13 @@
public class LTSVRecordReader extends AbstractRecordReader {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LTSVRecordReader.class);
+ private static final Logger logger = LoggerFactory.getLogger(LTSVRecordReader.class);
private static final int MAX_RECORDS_PER_BATCH = 8096;
private final String inputPath;
- private final FSDataInputStream fsStream;
+ private final InputStream fsStream;
private final BufferedReader reader;
@@ -64,14 +65,14 @@
List<SchemaPath> columns) throws OutOfMemoryException {
this.inputPath = path.toUri().getPath();
try {
- this.fsStream = fileSystem.open(path);
- this.reader = new BufferedReader(new InputStreamReader(fsStream.getWrappedStream(), StandardCharsets.UTF_8));
+ this.fsStream = fileSystem.openPossiblyCompressedStream(path);
+ this.reader = new BufferedReader(new InputStreamReader(fsStream, StandardCharsets.UTF_8));
this.buffer = fragmentContext.getManagedBuffer();
setColumns(columns);
-
} catch (IOException e) {
- String msg = String.format("Failed to open input file: %s", inputPath);
- throw UserException.dataReadError(e).message(msg).build(logger);
+ throw UserException.dataReadError(e)
+ .message(String.format("Failed to open input file: %s", inputPath))
+ .build(logger);
}
}
@@ -79,16 +80,14 @@
protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) {
Set<SchemaPath> transformed = new LinkedHashSet<>();
if (!isStarQuery()) {
- for (SchemaPath column : projected) {
- transformed.add(column);
- }
+ transformed.addAll(projected);
} else {
transformed.add(SchemaPath.STAR_COLUMN);
}
return transformed;
}
- public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+ public void setup(final OperatorContext context, final OutputMutator output) {
this.writer = new VectorContainerWriter(output);
}
@@ -100,7 +99,7 @@
try {
BaseWriter.MapWriter map = this.writer.rootAsMap();
- String line = null;
+ String line;
while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
// Skip empty lines
@@ -145,7 +144,9 @@
} catch (final Exception e) {
String msg = String.format("Failure while reading messages from %s. Record reader was at record: %d", inputPath, recordCount + 1);
- throw UserException.dataReadError(e).message(msg).build(logger);
+ throw UserException.dataReadError(e)
+ .message(msg)
+ .build(logger);
}
}
@@ -161,5 +162,4 @@
public void close() throws Exception {
AutoCloseables.close(reader, fsStream);
}
-
}
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index 6f81ac6..d21035b 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -17,13 +17,12 @@
*/
package org.apache.drill.exec.store.syslog;
-import java.io.IOException;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
@@ -31,14 +30,13 @@
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasyWriter;
import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
-import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
-
-
-import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import java.util.List;
+
public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
public static final String DEFAULT_NAME = "syslog";
@@ -59,7 +57,7 @@
@Override
public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
- List<SchemaPath> columns, String userName) throws ExecutionSetupException {
+ List<SchemaPath> columns, String userName) {
return new SyslogRecordReader(context, dfs, fileWork, columns, userName, formatConfig);
}
@@ -90,12 +88,12 @@
}
@Override
- public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+ public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
throw new UnsupportedOperationException("unimplemented");
}
@Override
- public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+ public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) {
throw new UnsupportedOperationException("unimplemented");
}
}
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
index 4b2831c..0f39887 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
@@ -19,7 +19,6 @@
package org.apache.drill.exec.store.syslog;
import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -34,18 +33,20 @@
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.realityforge.jsyslog.message.StructuredDataParameter;
import org.realityforge.jsyslog.message.SyslogMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
-import java.util.Iterator;
public class SyslogRecordReader extends AbstractRecordReader {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SyslogRecordReader.class);
+ private static final Logger logger = LoggerFactory.getLogger(SyslogRecordReader.class);
private static final int MAX_RECORDS_PER_BATCH = 4096;
private final DrillFileSystem fileSystem;
@@ -54,16 +55,13 @@
private BufferedReader reader;
private DrillBuf buffer;
private VectorContainerWriter writer;
- private SyslogFormatConfig config;
- private int maxErrors;
- private boolean flattenStructuredData;
+ private final int maxErrors;
+ private final boolean flattenStructuredData;
private int errorCount;
private int lineCount;
- private List<SchemaPath> projectedColumns;
+ private final List<SchemaPath> projectedColumns;
private String line;
- private SimpleDateFormat df;
-
public SyslogRecordReader(FragmentContext context,
DrillFileSystem fileSystem,
FileWork fileWork,
@@ -74,9 +72,7 @@
this.fileSystem = fileSystem;
this.fileWork = fileWork;
this.userName = userName;
- this.config = config;
this.maxErrors = config.getMaxErrors();
- this.df = getValidDateObject("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
this.errorCount = 0;
this.buffer = context.getManagedBuffer().reallocIfNeeded(4096);
this.projectedColumns = columns;
@@ -86,7 +82,7 @@
}
@Override
- public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+ public void setup(final OperatorContext context, final OutputMutator output) {
openFile();
this.writer = new VectorContainerWriter(output);
}
@@ -94,7 +90,7 @@
private void openFile() {
InputStream in;
try {
- in = fileSystem.open(fileWork.getPath());
+ in = fileSystem.openPossiblyCompressedStream(fileWork.getPath());
} catch (Exception e) {
throw UserException
.dataReadError(e)
@@ -115,7 +111,7 @@
try {
BaseWriter.MapWriter map = this.writer.rootAsMap();
- String line = null;
+ String line;
while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
lineCount++;
@@ -288,7 +284,7 @@
return;
}
try {
- byte[] bytes = value.getBytes("UTF-8");
+ byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
int stringLength = bytes.length;
this.buffer = buffer.reallocIfNeeded(stringLength);
this.buffer.setBytes(0, bytes, 0, stringLength);
@@ -304,18 +300,10 @@
//Helper function to flatten structured data
private void mapFlattenedStructuredData(Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
- Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = data.entrySet().iterator();
- while (entries.hasNext()) {
- Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
-
- List<StructuredDataParameter> dataParameters = entry.getValue();
- String fieldName;
- String fieldValue;
-
- for (StructuredDataParameter parameter : dataParameters) {
- fieldName = "structured_data_" + parameter.getName();
- fieldValue = parameter.getValue();
-
+ for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) {
+ for (StructuredDataParameter parameter : entry.getValue()) {
+ String fieldName = "structured_data_" + parameter.getName();
+ String fieldValue = parameter.getValue();
mapStringField(fieldName, fieldValue, map);
}
}
@@ -323,28 +311,19 @@
//Gets field from the Structured Data Construct
private String getFieldFromStructuredData(String fieldName, SyslogMessage parsedMessage) {
- String result = null;
- Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData();
- Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = parsedMessage.getStructuredData().entrySet().iterator();
- while (entries.hasNext()) {
- Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
- List<StructuredDataParameter> dataParameters = entry.getValue();
-
- for (StructuredDataParameter d : dataParameters) {
+ for (Map.Entry<String, List<StructuredDataParameter>> entry : parsedMessage.getStructuredData().entrySet()) {
+ for (StructuredDataParameter d : entry.getValue()) {
if (d.getName().equals(fieldName)) {
return d.getValue();
}
}
}
- return result;
+ return null;
}
//Helper function to map arrays
private void mapComplexField(String mapName, Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
- Iterator<Map.Entry<String, List<StructuredDataParameter>>> entries = data.entrySet().iterator();
- while (entries.hasNext()) {
- Map.Entry<String, List<StructuredDataParameter>> entry = entries.next();
-
+ for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) {
List<StructuredDataParameter> dataParameters = entry.getValue();
String fieldName;
String fieldValue;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index c3470d8..b72ecee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -544,7 +544,7 @@
if (scan instanceof EnumerableTableScan) {
final Object selection = DrillRelOptUtil.getDrillTable(scan).getSelection();
if (selection instanceof FormatSelection
- && ((FormatSelection)selection).supportDirPruning()) {
+ && ((FormatSelection)selection).supportsDirPruning()) {
return true; // Do directory-based pruning in Calcite logical
} else {
return false; // Do not do directory-based pruning in Calcite logical
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 6fa1793..78cb4e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -22,34 +22,30 @@
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.Range;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Range;
-
public class BasicFormatMatcher extends FormatMatcher {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
protected final FormatPlugin plugin;
- protected final boolean compressible;
- protected final CompressionCodecFactory codecFactory;
+ private final boolean compressible;
+ private final CompressionCodecFactory codecFactory;
private final List<Pattern> patterns;
private final MagicStringMatcher matcher;
public BasicFormatMatcher(FormatPlugin plugin, List<Pattern> patterns, List<MagicString> magicStrings) {
- super();
- this.patterns = ImmutableList.copyOf(patterns);
+ this.patterns = new ArrayList<>(patterns);
this.matcher = new MagicStringMatcher(magicStrings);
this.plugin = plugin;
this.compressible = false;
@@ -57,12 +53,10 @@
}
public BasicFormatMatcher(FormatPlugin plugin, Configuration fsConf, List<String> extensions, boolean compressible) {
- List<Pattern> patterns = Lists.newArrayList();
- for (String extension : extensions) {
- patterns.add(Pattern.compile(".*\\." + extension));
- }
- this.patterns = patterns;
- this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
+ this.patterns = extensions.stream()
+ .map(extension -> Pattern.compile(".*\\." + extension))
+ .collect(Collectors.toList());
+ this.matcher = new MagicStringMatcher(new ArrayList<>());
this.plugin = plugin;
this.compressible = compressible;
this.codecFactory = new CompressionCodecFactory(fsConf);
@@ -84,8 +78,12 @@
return null;
}
- /*
- * Function returns true if the file extension matches the pattern
+ /**
+ * Checks whether the file extension matches one of the plugin's patterns or its magic string matches.
+ *
+ * @param fs file system
+ * @param status file status
+ * @return true if the file is readable by this plugin, false otherwise
*/
@Override
public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException {
@@ -109,10 +107,7 @@
}
}
- if (matcher.matches(fs, status)) {
- return true;
- }
- return false;
+ return matcher.matches(fs, status);
}
@Override
@@ -121,16 +116,14 @@
return plugin;
}
+ private static class MagicStringMatcher {
- private class MagicStringMatcher {
+ private final List<RangeMagics> ranges;
- private List<RangeMagics> ranges;
-
- public MagicStringMatcher(List<MagicString> magicStrings) {
- ranges = Lists.newArrayList();
- for(MagicString ms : magicStrings) {
- ranges.add(new RangeMagics(ms));
- }
+ MagicStringMatcher(List<MagicString> magicStrings) {
+ this.ranges = magicStrings.stream()
+ .map(RangeMagics::new)
+ .collect(Collectors.toList());
}
public boolean matches(DrillFileSystem fs, FileStatus status) throws IOException{
@@ -168,15 +161,15 @@
return false;
}
- private class RangeMagics{
- Range<Long> range;
- byte[][] magics;
+ private static class RangeMagics {
- public RangeMagics(MagicString ms) {
- this.range = Range.closedOpen( ms.getOffset(), (long) ms.getBytes().length);
+ private final Range<Long> range;
+ private final byte[][] magics;
+
+ RangeMagics(MagicString ms) {
+ this.range = Range.closedOpen(ms.getOffset(), (long) ms.getBytes().length);
this.magics = new byte[][]{ms.getBytes()};
}
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 902abda..a1fccb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -17,6 +17,16 @@
*/
package org.apache.drill.exec.store.dfs;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -26,20 +36,12 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.util.DrillFileSystemUtil;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
/**
* Jackson serializable description of a file selection.
*/
public class FileSelection {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSelection.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(FileSelection.class);
private static final String WILD_CARD = "*";
private List<FileStatus> statuses;
@@ -73,10 +75,10 @@
}
private StatusType dirStatus;
- // whether this selection previously had a wildcard
- private boolean hadWildcard = false;
- // whether all partitions were previously pruned for this selection
- private boolean wasAllPartitionsPruned = false;
+ // whether this selection previously had a wildcard, false by default
+ private boolean hadWildcard;
+ // whether all partitions were previously pruned for this selection, false by default
+ private boolean wasAllPartitionsPruned;
/**
* Creates a {@link FileSelection selection} out of given file statuses/files and selection root.
@@ -108,7 +110,7 @@
* Copy constructor for convenience.
*/
protected FileSelection(FileSelection selection) {
- Preconditions.checkNotNull(selection, "selection cannot be null");
+ Preconditions.checkNotNull(selection, "File selection cannot be null");
this.statuses = selection.statuses;
this.files = selection.files;
this.selectionRoot = selection.selectionRoot;
@@ -117,6 +119,7 @@
this.metaContext = selection.metaContext;
this.hadWildcard = selection.hadWildcard;
this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
+ this.emptyDirectory = selection.emptyDirectory;
}
public Path getSelectionRoot() {
@@ -127,8 +130,8 @@
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
if (statuses == null) {
- List<FileStatus> newStatuses = Lists.newArrayList();
- for (Path pathStr : files) {
+ List<FileStatus> newStatuses = new ArrayList<>();
+ for (Path pathStr : Objects.requireNonNull(files, "Files can not be null if statuses are null")) {
newStatuses.add(fs.getFileStatus(pathStr));
}
statuses = newStatuses;
@@ -144,11 +147,9 @@
public List<Path> getFiles() {
if (files == null) {
- List<Path> newFiles = Lists.newArrayList();
- for (FileStatus status:statuses) {
- newFiles.add(status.getPath());
- }
- files = newFiles;
+ files = Objects.requireNonNull(statuses, "Statuses can not be null if files are null").stream()
+ .map(FileStatus::getPath)
+ .collect(Collectors.toList());
}
return files;
}
@@ -386,9 +387,8 @@
Preconditions.checkArgument(!combinedPath.isEmpty(), "Empty path (" + combinedPath + "( in file selection path.");
if (!combinedPath.startsWith(parent)) {
- StringBuilder msg = new StringBuilder();
- msg.append("Invalid path : ").append(subpath).append(" takes you outside the workspace.");
- throw new IllegalArgumentException(msg.toString());
+ throw new IllegalArgumentException(
+ String.format("Invalid path [%s] takes you outside the workspace.", subpath));
}
}
@@ -396,7 +396,7 @@
return statuses;
}
- public boolean supportDirPrunig() {
+ public boolean supportsDirPruning() {
if (isExpandedFully() || isExpandedPartial()) {
if (!wasAllPartitionsPruned) {
return true;
@@ -440,26 +440,15 @@
this.emptyDirectory = true;
}
-
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("root=").append(this.selectionRoot);
-
+ sb.append("root=").append(selectionRoot);
sb.append("files=[");
- boolean isFirst = true;
- for (Path file : this.files) {
- if (isFirst) {
- isFirst = false;
- sb.append(file);
- } else {
- sb.append(",");
- sb.append(file);
- }
- }
+ sb.append(getFiles().stream()
+ .map(Path::toString)
+ .collect(Collectors.joining(", ")));
sb.append("]");
-
return sb.toString();
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index cf1333d..8a97701 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -33,9 +34,9 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.AbstractStoragePlugin;
@@ -43,11 +44,14 @@
import org.apache.drill.exec.store.LocalSyncableFileSystem;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet.Builder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Storage engine associated with a Hadoop FileSystem Implementation. Examples include HDFS, MapRFS, QuantacastFileSystem,
@@ -57,7 +61,14 @@
*/
public class FileSystemPlugin extends AbstractStoragePlugin {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
+ private static final Logger logger = LoggerFactory.getLogger(FileSystemPlugin.class);
+
+ /**
+ * The org.apache.hadoop.io.compress library supports codecs such as Gzip and Bzip2 out of the box.
+ * This list stores only codecs that are missing from the Hadoop library.
+ */
+ private static final List<String> ADDITIONAL_CODECS = Collections.singletonList(
+ ZipCodec.class.getCanonicalName());
private final FileSystemSchemaFactory schemaFactory;
private final FormatCreator formatCreator;
@@ -80,6 +91,8 @@
fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
+ addCodecs(fsConf);
+
if (isS3Connection(fsConf)) {
handleS3Credentials(fsConf);
}
@@ -111,6 +124,24 @@
}
}
+ /**
+ * Merges codecs from the configuration with {@link #ADDITIONAL_CODECS}
+ * and updates the configuration property.
+ * Drill built-in codecs are added at the beginning of the codecs string
+ * so that codecs from the configuration can override the Drill ones.
+ *
+ * @param conf Hadoop configuration
+ */
+ private void addCodecs(Configuration conf) {
+ String confCodecs = conf.get(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY);
+ String builtInCodecs = String.join(",", ADDITIONAL_CODECS);
+ String newCodecs = Strings.isNullOrEmpty(confCodecs)
+ ? builtInCodecs
+ : builtInCodecs + ", " + confCodecs;
+ logger.trace("Codecs: {}", newCodecs);
+ conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, newCodecs);
+ }
+
private boolean isS3Connection(Configuration conf) {
URI uri = FileSystem.getDefaultUri(conf);
return uri.getScheme().equals("s3a");
@@ -131,7 +162,7 @@
for (String key : credentialKeys) {
char[] credentialChars = conf.getPassword(key);
if (credentialChars == null) {
- logger.warn(String.format("Property '%s' is absent.", key));
+ logger.warn("Property '{}' is absent.", key);
} else {
conf.set(key, String.valueOf(credentialChars));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
index 7d7bcfa..d2a5545 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -17,18 +17,15 @@
*/
package org.apache.drill.exec.store.dfs;
-import java.util.List;
-
-import org.apache.drill.common.logical.FormatPluginConfig;
-
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.hadoop.fs.Path;
+import java.util.List;
public class FormatSelection {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatSelection.class);
private FormatPluginConfig format;
private FileSelection selection;
@@ -62,7 +59,7 @@
}
@JsonIgnore
- public boolean supportDirPruning() {
- return selection.supportDirPrunig();
+ public boolean supportsDirPruning() {
+ return selection.supportsDirPruning();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java
new file mode 100644
index 0000000..c613076
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ZipCodec.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.DefaultCodec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * ZIP codec implementation which can read or create a single entry.
+ * <p/>
+ * Note: Do not rename this class. The class name must be 'ZipCodec' so that it can be mapped by
+ * {@link org.apache.hadoop.io.compress.CompressionCodecFactory} to the 'zip' extension.
+ */
+public class ZipCodec extends DefaultCodec {
+
+ private static final String EXTENSION = ".zip";
+
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
+ return new ZipCompressionOutputStream(new ResetableZipOutputStream(out));
+ }
+
+ @Override
+ public CompressionInputStream createInputStream(InputStream in) throws IOException {
+ return new ZipCompressionInputStream(new ZipInputStream(in));
+ }
+
+ @Override
+ public String getDefaultExtension() {
+ return EXTENSION;
+ }
+
+ /**
+ * Reads only the first entry from {@link ZipInputStream};
+ * other entries, if present, are ignored.
+ */
+ private static class ZipCompressionInputStream extends CompressionInputStream {
+
+ ZipCompressionInputStream(ZipInputStream in) throws IOException {
+ super(in);
+ // positions stream at the beginning of the first entry data
+ in.getNextEntry();
+ }
+
+ @Override
+ public int read() throws IOException {
+ return in.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return in.read(b, off, len);
+ }
+
+ @Override
+ public void resetState() throws IOException {
+ in.reset();
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ ((ZipInputStream) in).closeEntry();
+ } finally {
+ super.close();
+ }
+ }
+ }
+
+ /**
+ * Extends {@link ZipOutputStream} to allow resetting the compressor stream,
+ * as required by the {@link CompressionOutputStream} implementation.
+ */
+ private static class ResetableZipOutputStream extends ZipOutputStream {
+
+ ResetableZipOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ void resetState() {
+ def.reset();
+ }
+ }
+
+ /**
+ * Writes the given data into a ZIP archive, placing all data in a single entry with a default name.
+ */
+ private static class ZipCompressionOutputStream extends CompressionOutputStream {
+
+ private static final String DEFAULT_ENTRY_NAME = "entry.out";
+
+ ZipCompressionOutputStream(ResetableZipOutputStream out) throws IOException {
+ super(out);
+ ZipEntry zipEntry = new ZipEntry(DEFAULT_ENTRY_NAME);
+ out.putNextEntry(zipEntry);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void finish() throws IOException {
+ ((ResetableZipOutputStream) out).closeEntry();
+ }
+
+ @Override
+ public void resetState() {
+ ((ResetableZipOutputStream) out).resetState();
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
index 6d8b533..0ed71db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogBatchReader.java
@@ -178,7 +178,7 @@
private void openFile(FileSchemaNegotiator negotiator) {
InputStream in;
try {
- in = negotiator.fileSystem().open(split.getPath());
+ in = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
} catch (Exception e) {
throw UserException
.dataReadError(e)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 985fb20..9129fc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -21,6 +21,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -59,13 +61,11 @@
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MagicString;
import org.apache.drill.exec.store.dfs.MetadataContext;
-import org.apache.drill.exec.store.mock.MockStorageEngine;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.store.parquet.metadata.ParquetTableMetadataDirs;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -73,19 +73,21 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ParquetFormatPlugin implements FormatPlugin {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+ private static final Logger logger = LoggerFactory.getLogger(ParquetFormatPlugin.class);
public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
private static final String DEFAULT_NAME = "parquet";
- private static final List<Pattern> PATTERNS = Lists.newArrayList(
+ private static final List<Pattern> PATTERNS = Arrays.asList(
Pattern.compile(".*\\.parquet$"),
Pattern.compile(".*/" + ParquetFileWriter.PARQUET_METADATA_FILE));
- private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
+ private static final List<MagicString> MAGIC_STRINGS = Collections.singletonList(new MagicString(0, ParquetFileWriter.MAGIC));
private final DrillbitContext context;
private final Configuration fsConf;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index bc86916..c9caa24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -28,15 +28,13 @@
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
import org.apache.drill.exec.store.pcap.schema.Schema;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapred.FileSplit;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-
-import org.apache.hadoop.fs.Path;
+import java.io.InputStream;
import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
@@ -48,15 +46,9 @@
private FileSplit split;
- private PcapReaderConfig readerConfig;
-
private PacketDecoder decoder;
- private ResultSetLoader loader;
-
- private FSDataInputStream fsStream;
-
- private Schema pcapSchema;
+ private InputStream fsStream;
private RowSetLoader rowWriter;
@@ -135,7 +127,6 @@
}
public PcapBatchReader(PcapReaderConfig readerConfig) {
- this.readerConfig = readerConfig;
}
@Override
@@ -143,10 +134,10 @@
split = negotiator.split();
openFile(negotiator);
SchemaBuilder builder = new SchemaBuilder();
- pcapSchema = new Schema();
+ Schema pcapSchema = new Schema();
TupleMetadata schema = pcapSchema.buildSchema(builder);
negotiator.setTableSchema(schema, false);
- loader = negotiator.build();
+ ResultSetLoader loader = negotiator.build();
// Creates writers for all fields (Since schema is known)
rowWriter = loader.writer();
@@ -208,14 +199,13 @@
.build(logger);
}
fsStream = null;
- this.buffer = null;
- this.decoder = null;
+ buffer = null;
+ decoder = null;
}
private void openFile(FileSchemaNegotiator negotiator) {
try {
- String filePath = split.getPath().toString();
- fsStream = negotiator.fileSystem().open(new Path(filePath));
+ fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
decoder = new PacketDecoder(fsStream);
buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()];
validBytes = fsStream.read(buffer);
@@ -237,9 +227,6 @@
getNextPacket(rowWriter);
}
- if (packet == null) {
- return false;
- }
int old = offset;
offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes);
if (offset > validBytes) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
index 1678196..b8e0175 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatPlugin.java
@@ -25,7 +25,6 @@
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.server.DrillbitContext;
@@ -34,9 +33,11 @@
import org.apache.drill.exec.store.pcap.PcapBatchReader.PcapReaderConfig;
public class PcapFormatPlugin extends EasyFormatPlugin<PcapFormatConfig> {
+
public static final String PLUGIN_NAME = "pcap";
private static class PcapReaderFactory extends FileReaderFactory {
+
private final PcapReaderConfig readerConfig;
public PcapReaderFactory(PcapReaderConfig config) {
@@ -48,6 +49,7 @@
return new PcapBatchReader(readerConfig);
}
}
+
public PcapFormatPlugin(String name, DrillbitContext context,
Configuration fsConf, StoragePluginConfig storageConfig,
PcapFormatConfig formatConfig) {
@@ -58,7 +60,7 @@
EasyFormatConfig config = new EasyFormatConfig();
config.readable = true;
config.writable = false;
- config.blockSplittable = true;
+ config.blockSplittable = false;
config.compressible = true;
config.supportsProjectPushdown = true;
config.extensions = pluginConfig.getExtensions();
@@ -70,13 +72,12 @@
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
- EasySubScan scan, OptionManager options) throws ExecutionSetupException {
+ public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options) {
return new PcapBatchReader(new PcapReaderConfig(this));
}
@Override
- protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
+ protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
FileScanBuilder builder = new FileScanBuilder();
builder.setReaderFactory(new PcapReaderFactory(new PcapReaderConfig(this)));
initScanBuilder(builder, scan);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
index de9a558..41be760 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
@@ -47,7 +47,7 @@
public PcapngFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig formatPluginConfig) {
super(name, context, fsConf, config, formatPluginConfig, true,
- false, true, false,
+ false, false, true,
formatPluginConfig.getExtensions(), DEFAULT_NAME);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
index 0ad234d..152e2e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
@@ -31,18 +31,18 @@
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.pcapng.schema.Column;
import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl;
import org.apache.drill.exec.store.pcapng.schema.DummyImpl;
import org.apache.drill.exec.store.pcapng.schema.Schema;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -59,14 +59,14 @@
private final Path pathToFile;
private OutputMutator output;
private List<ProjectedColumnInfo> projectedCols;
- private FileSystem fs;
- private FSDataInputStream in;
+ private DrillFileSystem fs;
+ private InputStream in;
private List<SchemaPath> columns;
private Iterator<IPcapngType> it;
public PcapngRecordReader(final Path pathToFile,
- final FileSystem fileSystem,
+ final DrillFileSystem fileSystem,
final List<SchemaPath> columns) {
this.fs = fileSystem;
this.pathToFile = fs.makeQualified(pathToFile);
@@ -79,7 +79,7 @@
try {
this.output = output;
- this.in = fs.open(pathToFile);
+ this.in = fs.openPossiblyCompressedStream(pathToFile);
PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
decoder.decode();
this.it = decoder.getSectionList().iterator();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
index dafeaa3..493a3b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/package-info.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
/**
- * For comments on realization of this format plugin look at :
+ * For comments on implementation of this format plugin see:
*
* @see <a href="https://issues.apache.org/jira/browse/DRILL-6179"> Jira</a>
*/
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java
new file mode 100644
index 0000000..fd1bf9e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestCompressedFiles.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import static org.junit.Assert.assertNotNull;
+
+@Category(UnlikelyTest.class)
+public class TestCompressedFiles extends ClusterTest {
+
+ private static FileSystem fs;
+ private static CompressionCodecFactory factory;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ startCluster(builder);
+
+ fs = ExecTest.getLocalFileSystem();
+ Configuration conf = fs.getConf();
+ conf.set(CommonConfigurationKeys.IO_COMPRESSION_CODECS_KEY, ZipCodec.class.getCanonicalName());
+ factory = new CompressionCodecFactory(conf);
+ }
+
+ @Test
+ public void testGzip() throws Exception {
+ String fileName = "gz_data.csvh.gz";
+ writeData(fileName, "gzip", "id,name\n1,Fred\n2,Wilma");
+
+ testBuilder()
+ .sqlQuery("select * from dfs.`root`.`%s`", fileName)
+ .unOrdered()
+ .baselineColumns("id", "name")
+ .baselineValues("1", "Fred")
+ .baselineValues("2", "Wilma")
+ .go();
+ }
+
+ @Test
+ public void testBzip2() throws Exception {
+ String fileName = "bzip2_data.csvh.bz2";
+ writeData(fileName, "bzip2", "id,name\n3,Bamm-Bamm\n4,Barney");
+
+ testBuilder()
+ .sqlQuery("select * from dfs.`root`.`%s`", fileName)
+ .unOrdered()
+ .baselineColumns("id", "name")
+ .baselineValues("3", "Bamm-Bamm")
+ .baselineValues("4", "Barney")
+ .go();
+ }
+
+ @Test
+ public void testZip() throws Exception {
+ String fileName = "zip_data.csvh.zip";
+ writeData(fileName, "zip", "id,name\n5,Dino\n6,Pebbles");
+
+ testBuilder()
+ .sqlQuery("select * from dfs.`root`.`%s`", fileName)
+ .unOrdered()
+ .baselineColumns("id", "name")
+ .baselineValues("5", "Dino")
+ .baselineValues("6", "Pebbles")
+ .go();
+ }
+
+ private void writeData(String fileName, String codecName, String data) throws IOException {
+ CompressionCodec codec = factory.getCodecByName(codecName);
+ assertNotNull(codecName + " is not found", codec);
+ Path outFile = new Path(dirTestWatcher.getRootDir().getAbsolutePath(), fileName);
+ try (InputStream inputStream = new ByteArrayInputStream(data.getBytes());
+ OutputStream outputStream = codec.createOutputStream(fs.create(outFile))) {
+ IOUtils.copyBytes(inputStream, outputStream, fs.getConf(), false);
+ }
+ }
+}