DRILL-8330: Convert ESRI Shape File Reader to EVF2 (#2670)
diff --git a/contrib/format-esri/pom.xml b/contrib/format-esri/pom.xml
index cccabbe..84c445a 100644
--- a/contrib/format-esri/pom.xml
+++ b/contrib/format-esri/pom.xml
@@ -39,7 +39,7 @@
<dependency>
<groupId>com.esri.geometry</groupId>
<artifactId>esri-geometry-api</artifactId>
- <version>2.2.3</version>
+ <version>2.2.4</version>
</dependency>
<dependency>
<groupId>org.jamel.dbf</groupId>
diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
index 5a7325e..49d5775 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpBatchReader.java
@@ -24,8 +24,9 @@
import com.esri.core.geometry.ogc.OGCGeometry;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -34,7 +35,6 @@
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
import org.jamel.dbf.DbfReader;
import org.jamel.dbf.structure.DbfField;
import org.slf4j.Logger;
@@ -48,7 +48,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class ShpBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class ShpBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(ShpBatchReader.class);
private static final String GID_FIELD_NAME = "gid";
@@ -56,36 +56,28 @@
private static final String SHAPE_TYPE_FIELD_NAME = "shapeType";
private static final String GEOM_FIELD_NAME = "geom";
private static final String SRID_PATTERN_TEXT = "AUTHORITY\\[\"\\w+\"\\s*,\\s*\"*(\\d+)\"*\\]\\]$";
-
- private FileSplit split;
- private ResultSetLoader loader;
- private Path hadoopShp;
- private Path hadoopDbf;
- private Path hadoopPrj;
+ private final FileDescrip file;
+ private final Path hadoopShp;
+ private final Path hadoopDbf;
+ private final Path hadoopPrj;
private InputStream fileReaderShp = null;
private InputStream fileReaderDbf = null;
private InputStream fileReaderPrj = null;
private GeometryCursor geomCursor = null;
private DbfReader dbfReader = null;
- private ScalarWriter gidWriter;
- private ScalarWriter sridWriter;
- private ScalarWriter shapeTypeWriter;
- private ScalarWriter geomWriter;
- private RowSetLoader rowWriter;
+ private final ScalarWriter gidWriter;
+ private final ScalarWriter sridWriter;
+ private final ScalarWriter shapeTypeWriter;
+ private final ScalarWriter geomWriter;
+ private final RowSetLoader rowWriter;
private int srid;
private SpatialReference spatialReference;
- private final int maxRecords;
- public ShpBatchReader(int maxRecords) {
- this.maxRecords = maxRecords;
- }
+ public ShpBatchReader(FileSchemaNegotiator negotiator) {
+ this.file = negotiator.file();
+ hadoopShp = file.split().getPath();
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
- hadoopShp = split.getPath();
-
- String filePath = split.getPath().toString();
+ String filePath = file.split().getPath().toString();
hadoopDbf = new Path(filePath.replace(".shp", ".dbf"));
hadoopPrj = new Path(filePath.replace(".shp", ".prj"));
@@ -97,15 +89,13 @@
.addNullable(GEOM_FIELD_NAME, TypeProtos.MinorType.VARBINARY);
negotiator.tableSchema(builder.buildSchema(), false);
- loader = negotiator.build();
+ ResultSetLoader loader = negotiator.build();
rowWriter = loader.writer();
gidWriter = rowWriter.scalar(GID_FIELD_NAME);
sridWriter = rowWriter.scalar(SRID_FIELD_NAME);
shapeTypeWriter = rowWriter.scalar(SHAPE_TYPE_FIELD_NAME);
geomWriter = rowWriter.scalar(GEOM_FIELD_NAME);
-
- return true;
}
@Override
@@ -125,7 +115,7 @@
private void openFile(FileSchemaNegotiator negotiator) {
try {
- fileReaderShp = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ fileReaderShp = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
byte[] shpBuf = new byte[fileReaderShp.available()];
fileReaderShp.read(shpBuf);
@@ -135,10 +125,10 @@
ShapefileReader shpReader = new ShapefileReader();
geomCursor = shpReader.getGeometryCursor(byteBuffer);
- fileReaderDbf = negotiator.fileSystem().openPossiblyCompressedStream(hadoopDbf);
+ fileReaderDbf = file.fileSystem().openPossiblyCompressedStream(hadoopDbf);
dbfReader = new DbfReader(fileReaderDbf);
- fileReaderPrj = negotiator.fileSystem().openPossiblyCompressedStream(hadoopPrj);
+ fileReaderPrj = file.fileSystem().openPossiblyCompressedStream(hadoopPrj);
byte[] prjBuf = new byte[fileReaderPrj.available()];
fileReaderPrj.read(prjBuf);
fileReaderPrj.close();
@@ -156,17 +146,13 @@
} catch (IOException e) {
throw UserException
.dataReadError(e)
- .message("Failed to open open input file: %s", split.getPath())
+ .message("Failed to open input file: %s", file.split().getPath())
.addContext("User name", negotiator.userName())
.build(logger);
}
}
private void processShapefileSet(RowSetLoader rowWriter, final int gid, final Geometry geom, final Object[] dbfRow) {
- if (rowWriter.limitReached(maxRecords)) {
- return;
- }
-
rowWriter.start();
gidWriter.setInt(gid);
diff --git a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
index 5023ff6..c8ffb1d 100644
--- a/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
+++ b/contrib/format-esri/src/main/java/org/apache/drill/exec/store/esri/ShpFormatPlugin.java
@@ -20,13 +20,11 @@
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.conf.Configuration;
@@ -37,15 +35,9 @@
public static class ShpReaderFactory extends FileReaderFactory {
- private final int maxRecords;
-
- public ShpReaderFactory(int maxRecords) {
- this.maxRecords = maxRecords;
- }
-
@Override
- public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
- return new ShpBatchReader(maxRecords);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) {
+ return new ShpBatchReader(negotiator);
}
}
@@ -53,18 +45,11 @@
super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
}
- @Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionSet options) {
- return new ShpBatchReader(scan.getMaxRecords());
- }
@Override
- protected FileScanFramework.FileScanBuilder frameworkBuilder(EasySubScan scan, OptionSet options) {
- FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
- builder.setReaderFactory(new ShpReaderFactory(scan.getMaxRecords()));
- initScanBuilder(builder, scan);
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new ShpReaderFactory());
}
private static EasyFormatConfig easyConfig(Configuration fsConf, ShpFormatConfig pluginConfig) {
@@ -77,7 +62,7 @@
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.defaultName(PLUGIN_NAME)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}