DRILL-8174: Convert Avro format to EVF2 (#2511)
* DRILL-8174: Convert Avro format to EVF2
* Addressed review comments
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/DynamicSchemaFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/DynamicSchemaFilter.java
index 513a102..6871765 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/DynamicSchemaFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/v3/schema/DynamicSchemaFilter.java
@@ -139,7 +139,7 @@
protected ProjResult fromSchema(ColumnMetadata schemaCol,
ColumnMetadata probeCol) {
SchemaUtils.verifyConsistency(schemaCol, probeCol, source, errorContext);
- if (schemaCol.isMap()) {
+ if (schemaCol.isMap() || schemaCol.isDict()) {
return new ProjResult(true, schemaCol, mapProjection(schemaCol));
} else {
return new ProjResult(true, schemaCol);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
index ca587ae..2075b74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroBatchReader.java
@@ -17,19 +17,27 @@
*/
package org.apache.drill.exec.store.avro;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.ColumnConverter;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -38,59 +46,52 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
-public class AvroBatchReader implements ManagedReader<FileScanFramework.FileSchemaNegotiator> {
+public class AvroBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(AvroBatchReader.class);
- private Path filePath;
- private long endPosition;
- private DataFileReader<GenericRecord> reader;
- private ResultSetLoader loader;
- private ColumnConverter converter;
+ private final Path filePath;
+ private final long endPosition;
+ private final DataFileReader<GenericRecord> reader;
+ private final RowSetLoader loader;
+ private final ColumnConverter converter;
+ private final CustomErrorContext errorContext;
// re-use container instance
private GenericRecord record;
- private final int maxRecords;
- public AvroBatchReader(int maxRecords) {
- this.maxRecords = maxRecords;
- }
+ public AvroBatchReader(AvroFormatConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+ errorContext = negotiator.parentErrorContext();
+ FileDescrip file = negotiator.file();
+ filePath = file.split().getPath();
- @Override
- public boolean open(FileScanFramework.FileSchemaNegotiator negotiator) {
- FileSplit split = negotiator.split();
- filePath = split.getPath();
-
- // Avro files are splittable, define reading start / end positions
- long startPosition = split.getStart();
- endPosition = startPosition + split.getLength();
-
+ // Avro files are splittable, define reading start / end positions.
+ long startPosition = file.split().getStart();
+ endPosition = startPosition + file.split().getLength();
logger.debug("Processing Avro file: {}, start position: {}, end position: {}",
- filePath, startPosition, endPosition);
+ filePath, startPosition, endPosition);
- reader = prepareReader(split, negotiator.fileSystem(),
- negotiator.userName(), negotiator.context().getFragmentContext().getQueryUserName());
-
+ reader = prepareReader(file.split(), file.fileSystem(),
+ negotiator.userName(), negotiator.context().getFragmentContext().getQueryUserName());
logger.debug("Avro file schema: {}", reader.getSchema());
+
TupleMetadata readerSchema = AvroSchemaUtil.convert(reader.getSchema());
logger.debug("Avro file converted schema: {}", readerSchema);
+
TupleMetadata providedSchema = negotiator.providedSchema();
TupleMetadata tableSchema = FixedReceiver.Builder.mergeSchemas(providedSchema, readerSchema);
logger.debug("Avro file table schema: {}", tableSchema);
- negotiator.tableSchema(tableSchema, true);
- loader = negotiator.build();
- AvroColumnConverterFactory factory = new AvroColumnConverterFactory(providedSchema);
- converter = factory.getRootConverter(providedSchema, readerSchema, loader.writer());
- return true;
+ negotiator.tableSchema(tableSchema, true);
+ ResultSetLoader setLoader = negotiator.build();
+ loader = setLoader.writer();
+
+ AvroColumnConverterFactory factory = new AvroColumnConverterFactory(providedSchema);
+ converter = factory.getRootConverter(providedSchema, readerSchema, loader);
}
@Override
public boolean next() {
- RowSetLoader rowWriter = loader.writer();
- while (!rowWriter.isFull()) {
- if (!nextLine(rowWriter)) {
+ while (!loader.isFull()) {
+ if (!nextLine(loader)) {
return false;
}
}
@@ -99,17 +100,7 @@
@Override
public void close() {
- if (reader == null) {
- return;
- }
-
- try {
- reader.close();
- } catch (IOException e) {
- logger.warn("Error closing Avro reader: {}", e.getMessage(), e);
- } finally {
- reader = null;
- }
+ AutoCloseables.closeSilently(reader);
}
@Override
@@ -122,60 +113,38 @@
} catch (IOException e) {
logger.trace("Unable to obtain Avro reader position: {}", e.getMessage(), e);
}
- return "AvroBatchReader[File=" + filePath
- + ", Position=" + currentPosition
- + "]";
+ return new PlanStringBuilder(this)
+ .unquotedField("File", filePath.toString())
+ .unquotedField("Position", String.valueOf(currentPosition))
+ .toString();
}
/**
- * Initialized Avro data reader based on given file system and file path.
- * Moves reader to the sync point from where to start reading the data.
- *
- * @param fileSplit file split
- * @param fs file system
- * @param opUserName name of the user whom to impersonate while reading the data
- * @param queryUserName name of the user who issues the query
- * @return Avro file reader
+ * Process one row of records.
+ * @param rowWriter
+ * @return true true if one row is processed, false the EOF is reached.
*/
- private DataFileReader<GenericRecord> prepareReader(FileSplit fileSplit, FileSystem fs, String opUserName, String queryUserName) {
- try {
- UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
- DataFileReader<GenericRecord> reader = ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericRecord>>) () ->
- new DataFileReader<>(new FsInput(fileSplit.getPath(), fs.getConf()), new GenericDatumReader<GenericRecord>()));
-
- // move to sync point from where to read the file
- reader.sync(fileSplit.getStart());
- return reader;
- } catch (IOException | InterruptedException e) {
- throw UserException.dataReadError(e)
- .message("Error preparing Avro reader")
- .addContext(String.format("Reader: %s", this))
- .build(logger);
- }
- }
-
private boolean nextLine(RowSetLoader rowWriter) {
- if (rowWriter.limitReached(maxRecords)) {
- return false;
- }
-
try {
if (!reader.hasNext() || reader.pastSync(endPosition)) {
return false;
}
record = reader.next(record);
} catch (IOException e) {
- throw UserException.dataReadError(e)
- .addContext(String.format("Reader %s", this))
+ throw UserException
+ .dataReadError(e)
+ .addContext(e.getMessage())
+ .addContext(errorContext)
.build(logger);
}
Schema schema = record.getSchema();
if (Schema.Type.RECORD != schema.getType()) {
- throw UserException.dataReadError()
+ throw UserException
+ .dataReadError()
.message("Root object must be record type. Found: %s", schema.getType())
- .addContext(String.format("Reader %s", this))
+ .addContext(errorContext)
.build(logger);
}
@@ -185,4 +154,33 @@
return true;
}
+
+ /**
+ * Initialized Avro data reader based on given file system and file path.
+ * Moves reader to the sync point from where to start reading the data.
+ *
+ * @param fileSplit A section of an input file.
+ * @param fs A fairly generic filesystem.
+ * @param opUserName name of the user whom to impersonate while reading the data.
+ * @param queryUserName name of the user who issues the query.
+ * @return Avro file reader
+ */
+ private DataFileReader<GenericRecord> prepareReader(FileSplit fileSplit, FileSystem fs, String opUserName, String queryUserName) {
+ try {
+ UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(opUserName, queryUserName);
+ DataFileReader<GenericRecord> reader = ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericRecord>>) () ->
+ new DataFileReader<>(new FsInput(fileSplit.getPath(), fs.getConf()), new GenericDatumReader<GenericRecord>()));
+
+ // Move to sync point from where to read the file.
+ reader.sync(fileSplit.getStart());
+ return reader;
+ } catch (IOException | InterruptedException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Error preparing Avro reader")
+ .addContext(e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 5235b00..0505a5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -20,10 +20,12 @@
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.conf.Configuration;
@@ -53,30 +55,30 @@
.extensions(formatConfig.getExtensions())
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}
@Override
- protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
- FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
- builder.setReaderFactory(new AvroReaderFactory(scan.getMaxRecords()));
- initScanBuilder(builder, scan);
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new AvroReaderFactory(formatConfig, scan));
}
- private static class AvroReaderFactory extends FileScanFramework.FileReaderFactory {
+ private static class AvroReaderFactory extends FileReaderFactory {
- private final int maxRecords;
- public AvroReaderFactory(int maxRecords) {
- this.maxRecords = maxRecords;
+ private final AvroFormatConfig config;
+ private final EasySubScan scan;
+
+ public AvroReaderFactory(AvroFormatConfig config, EasySubScan scan) {
+ this.config = config;
+ this.scan = scan;
}
@Override
- public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
- return new AvroBatchReader(maxRecords);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+ return new AvroBatchReader(config, scan, negotiator);
}
}
}