DRILL-8350: Convert PCAP Format Plugin to EVF2 (#2698)
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
index fd7fef9..a09c3e3 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java
@@ -18,8 +18,9 @@
package org.apache.drill.exec.store.pcap;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -30,7 +31,6 @@
import org.apache.drill.exec.store.pcap.schema.Schema;
import org.apache.drill.exec.store.pcap.plugin.PcapFormatConfig;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
-import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,13 +42,10 @@
import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
-public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> {
-
+public class PcapBatchReader implements ManagedReader {
protected static final int BUFFER_SIZE = 500_000;
-
private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class);
-
- private FileSplit split;
+ private final FileDescrip file;
private PacketDecoder decoder;
private InputStream fsStream;
private RowSetLoader rowWriter;
@@ -97,22 +94,17 @@
private ScalarWriter remoteDataVolumeWriter;
private ScalarWriter hostDataWriter;
private ScalarWriter remoteDataWriter;
- private final int maxRecords;
private Map<Long, TcpSession> sessionQueue;
- public PcapBatchReader(PcapFormatConfig readerConfig, int maxRecords) {
+ public PcapBatchReader(PcapFormatConfig readerConfig, FileSchemaNegotiator negotiator) {
this.readerConfig = readerConfig;
if (readerConfig.getSessionizeTCPStreams()) {
sessionQueue = new HashMap<>();
}
- this.maxRecords = maxRecords;
- }
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
- openFile(negotiator);
+ file = negotiator.file();
+ openFile();
SchemaBuilder builder = new SchemaBuilder();
Schema pcapSchema = new Schema(readerConfig.getSessionizeTCPStreams());
TupleMetadata schema = pcapSchema.buildSchema(builder);
@@ -122,8 +114,6 @@
// Creates writers for all fields (Since schema is known)
rowWriter = loader.writer();
populateColumnWriters(rowWriter);
-
- return true;
}
@Override
@@ -159,16 +149,16 @@
decoder = null;
}
- private void openFile(FileSchemaNegotiator negotiator) {
+ private void openFile() {
try {
- fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
decoder = new PacketDecoder(fsStream);
buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()];
validBytes = fsStream.read(buffer);
} catch (IOException io) {
throw UserException
.dataReadError(io)
- .addContext("File name:", split.getPath().toString())
+ .addContext("File name:", file.split().getPath().toString())
.build(logger);
}
}
@@ -234,12 +224,6 @@
}
private boolean parseNextPacket(RowSetLoader rowWriter) {
-
- // Push down limit
- if (rowWriter.limitReached(maxRecords)) {
- return false;
- }
-
// Decode the packet
Packet packet = new Packet();
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
index 87b7c95..fe24907 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcap/plugin/BasePcapFormatPlugin.java
@@ -19,19 +19,17 @@
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedScanFramework;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.drill.exec.store.pcap.PcapBatchReader;
import org.apache.drill.exec.store.pcap.decoder.PacketDecoder;
import org.apache.drill.exec.store.pcapng.PcapngBatchReader;
@@ -64,7 +62,7 @@
.compressible(true)
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.supportsProjectPushdown(true)
.defaultName(PcapFormatConfig.NAME)
@@ -87,8 +85,9 @@
* @return PCAP or PCAPNG batch reader
*/
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- if (fileFramework().isPresent()) { // todo: can be simplified with java9 ifPresentOrElse
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+    if (negotiator.file().fileSystem() != null) {
+      // TODO: can be simplified with Java 9 ifPresentOrElse
Path path = scan.getWorkUnits().stream()
.findFirst()
.orElseThrow(() -> UserException.
@@ -96,7 +95,7 @@
.addContext("There are no files for scanning")
.build(logger))
.getPath();
- fileFormat = getFileFormat(fileFramework().get().fileSystem(), path);
+ fileFormat = getFileFormat(negotiator.file().fileSystem(), path);
if (config.getExtensions().stream()
.noneMatch(f -> f.equals(fileFormat.name().toLowerCase()))) {
logger.error("File format {} is not within plugin extensions: {}. Trying to use default PCAP format plugin to " +
@@ -106,32 +105,23 @@
logger.error("It is not possible to detect file format, because the File Framework is not initialized. " +
"Trying to use default PCAP format plugin to read the file");
}
- return createReader(scan, config);
+ return createReader(scan, config, negotiator);
}
}
- @Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
- return createReader(scan, formatConfig);
- }
-
- private static ManagedReader<? extends FileSchemaNegotiator> createReader(EasySubScan scan, PcapFormatConfig config) {
+ private static ManagedReader createReader(EasySubScan scan, PcapFormatConfig config,
+ FileSchemaNegotiator negotiator) {
switch(fileFormat) {
- case PCAPNG: return new PcapngBatchReader(config, scan);
+ case PCAPNG: return new PcapngBatchReader(config, scan, negotiator);
case PCAP:
case UNKNOWN:
- default: return new PcapBatchReader(config, scan.getMaxRecords());
+ default: return new PcapBatchReader(config, negotiator);
}
}
-
@Override
- protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
- FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new PcapReaderFactory(formatConfig, scan));
-
- initScanBuilder(builder, scan);
- builder.nullType(Types.optional(MinorType.VARCHAR));
- return builder;
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
+ builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ builder.readerFactory(new PcapReaderFactory(formatConfig, scan));
}
/**
diff --git a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
index e8367c5..2e8d0aa 100644
--- a/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
+++ b/contrib/format-pcapng/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
@@ -32,8 +32,9 @@
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -51,13 +52,12 @@
import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
-
+public class PcapngBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(PcapngBatchReader.class);
-
private final PcapFormatConfig config;
private final EasySubScan scan;
- private final int maxRecords;
+ private final FileDescrip file;
+
private CustomErrorContext errorContext;
private List<SchemaPath> columns;
private List<ColumnDefn> projectedColumns;
@@ -67,20 +67,17 @@
private InputStream in;
private Path path;
- public PcapngBatchReader(final PcapFormatConfig config, final EasySubScan scan) {
+ public PcapngBatchReader(final PcapFormatConfig config, final EasySubScan scan,
+ FileSchemaNegotiator negotiator) {
this.config = config;
this.scan = scan;
- this.maxRecords = scan.getMaxRecords();
this.columns = scan.getColumns();
- }
-
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
+ this.file = negotiator.file();
try {
// init InputStream for pcap file
errorContext = negotiator.parentErrorContext();
- DrillFileSystem dfs = negotiator.fileSystem();
- path = dfs.makeQualified(negotiator.split().getPath());
+ DrillFileSystem dfs = file.fileSystem();
+ path = dfs.makeQualified(file.split().getPath());
in = dfs.openPossiblyCompressedStream(path);
// decode the pcap file
PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
@@ -106,7 +103,6 @@
loader = resultSetLoader.writer();
// bind the writer for columns
bindColumns(loader);
- return true;
}
/**
@@ -135,9 +131,6 @@
continue;
}
processBlock();
- if (loader.limitReached(maxRecords)) {
- return false;
- }
}
return true;
}
@@ -273,4 +266,4 @@
writer.setString(value);
}
}
-}
\ No newline at end of file
+}