DRILL-8310: Convert Syslog Format to EVF V2 (#2653)
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
index 4573753..f5284ac 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
@@ -23,8 +23,9 @@
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -34,7 +35,6 @@
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
-import org.apache.hadoop.mapred.FileSplit;
import org.realityforge.jsyslog.message.StructuredDataParameter;
import org.realityforge.jsyslog.message.SyslogMessage;
import org.slf4j.Logger;
@@ -50,13 +50,11 @@
import java.util.List;
import java.util.Map;
-public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class SyslogBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(SyslogBatchReader.class);
private final String STRUCTURED_DATA_PREFIX = "structured_data_";
private final String STRUCTURED_DATA_MAP_NAME = "structured_data";
private final String RAW_COLUMN_NAME = "_raw";
-
- private final int maxRecords;
private final SyslogFormatConfig config;
private final EasySubScan subScan;
private final Map<String, MinorType> mappedColumns = new LinkedHashMap<>();
@@ -64,7 +62,7 @@
private int errorCount;
private CustomErrorContext errorContext;
private InputStream fsStream;
- private FileSplit split;
+ private final FileDescrip file;
private BufferedReader reader;
private RowSetLoader rowWriter;
private List<ScalarWriter> writerArray;
@@ -73,16 +71,12 @@
private TupleWriter structuredDataWriter;
- public SyslogBatchReader(int maxRecords, SyslogFormatConfig config, EasySubScan scan) {
- this.maxRecords = maxRecords;
+ public SyslogBatchReader(SyslogFormatConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
this.config = config;
this.subScan = scan;
populateMappedColumns();
- }
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
+ file = negotiator.file();
openFile(negotiator);
negotiator.tableSchema(buildSchema(), false);
errorContext = negotiator.parentErrorContext();
@@ -92,7 +86,6 @@
writerArray = populateRowWriters();
rawColumnWriter = rowWriter.scalar(RAW_COLUMN_NAME);
messageWriter = rowWriter.scalar("message");
- return true;
}
@Override
@@ -120,11 +113,11 @@
private void openFile(FileSchemaNegotiator negotiator) {
try {
- fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
} catch (IOException e) {
throw UserException
.dataReadError(e)
- .message("Unable to open Syslog File %s", split.getPath())
+ .message("Unable to open Syslog File %s", file.split().getPath())
.addContext(e.getMessage())
.addContext(errorContext)
.build(logger);
@@ -179,11 +172,6 @@
}
private boolean processNextLine() {
- // Check to see if the limit has been reached
- if (rowWriter.limitReached(maxRecords)) {
- return false;
- }
-
String line;
try {
line = reader.readLine();
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index c8af232..3e52f46 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -21,15 +21,13 @@
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
@@ -37,20 +35,17 @@
public static final String DEFAULT_NAME = "syslog";
private static class SyslogReaderFactory extends FileReaderFactory {
-
- private final int maxRecords;
private final SyslogFormatConfig formatConfig;
private final EasySubScan scan;
- public SyslogReaderFactory(int maxRecords, SyslogFormatConfig formatConfig, EasySubScan scan) {
- this.maxRecords = maxRecords;
+ public SyslogReaderFactory(SyslogFormatConfig formatConfig, EasySubScan scan) {
this.formatConfig = formatConfig;
this.scan = scan;
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new SyslogBatchReader(maxRecords, formatConfig, scan);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+ return new SyslogBatchReader(formatConfig, scan, negotiator);
}
}
@@ -70,23 +65,14 @@
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
- return new SyslogBatchReader(scan.getMaxRecords(), formatConfig, scan);
- }
-
- @Override
- protected FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
- FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new SyslogReaderFactory(scan.getMaxRecords(), formatConfig, scan));
-
- initScanBuilder(builder, scan);
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new SyslogReaderFactory(formatConfig, scan));
}
}