DRILL-8353: Format plugin for Delta Lake (#2702)
diff --git a/contrib/format-deltalake/README.md b/contrib/format-deltalake/README.md
new file mode 100644
index 0000000..e4cbaef
--- /dev/null
+++ b/contrib/format-deltalake/README.md
@@ -0,0 +1,36 @@
+# Delta Lake format plugin
+
+This format plugin enables Drill to query Delta Lake tables.
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project and filter pushdown optimizations.
+
+For the case of project pushdown, only columns specified in the query will be read, even when they are nested columns.
+
+### Filter pushdown
+
+For the case of filter pushdown, all expressions supported by Delta Lake API will be pushed down, so only data that
+matches the filter expression will be read. Additionally, filtering logic for parquet files is enabled
+to allow pruning of parquet files that do not match the filter expression.
+
+## Configuration
+
+The format plugin has the following configuration options:
+
+- `type` - format plugin type, should be `'delta'`
+
+### Format config example:
+
+```json
+{
+ "type": "file",
+ "formats": {
+ "delta": {
+ "type": "delta"
+ }
+ }
+}
+```
diff --git a/contrib/format-deltalake/pom.xml b/contrib/format-deltalake/pom.xml
new file mode 100644
index 0000000..5f0e31f
--- /dev/null
+++ b/contrib/format-deltalake/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>drill-contrib-parent</artifactId>
+ <groupId>org.apache.drill.contrib</groupId>
+ <version>2.0.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>drill-deltalake-format</artifactId>
+
+ <name>Drill : Contrib : Format : Delta Lake</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-storage</artifactId>
+ <version>2.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-standalone_2.13</artifactId>
+ <version>0.5.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-avro</artifactId>
+ <version>${parquet.version}</version>
+ </dependency>
+
+ <!-- Test dependency -->
+ <dependency>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>drill-java-exec</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.drill</groupId>
+ <artifactId>drill-common</artifactId>
+ <classifier>tests</classifier>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
new file mode 100644
index 0000000..2130bea
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.DeltaScan;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.types.ArrayType;
+import io.delta.standalone.types.BinaryType;
+import io.delta.standalone.types.BooleanType;
+import io.delta.standalone.types.ByteType;
+import io.delta.standalone.types.DataType;
+import io.delta.standalone.types.DateType;
+import io.delta.standalone.types.DecimalType;
+import io.delta.standalone.types.DoubleType;
+import io.delta.standalone.types.FloatType;
+import io.delta.standalone.types.IntegerType;
+import io.delta.standalone.types.LongType;
+import io.delta.standalone.types.ShortType;
+import io.delta.standalone.types.StringType;
+import io.delta.standalone.types.StructField;
+import io.delta.standalone.types.StructType;
+import io.delta.standalone.types.TimestampType;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.StatisticsProvider;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.metastore.store.FileSystemMetadataProviderManager;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.delta.plan.DrillExprToDeltaTranslator;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.metastore.metadata.LocationProvider;
+import org.apache.drill.metastore.metadata.Metadata;
+import org.apache.drill.metastore.statistics.ColumnStatistics;
+import org.apache.drill.metastore.statistics.TableStatisticsKind;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@JsonTypeName("delta-scan")
+public class DeltaGroupScan extends AbstractParquetGroupScan {
+
+ private final DeltaFormatPlugin formatPlugin;
+
+ private final String path;
+
+ private final TupleMetadata schema;
+
+ private final LogicalExpression condition;
+
+ private final DrillFileSystem fs;
+
+ private List<AddFile> addFiles;
+
+ private List<EndpointAffinity> endpointAffinities;
+
+ private final Map<Path, Map<String, String>> partitionHolder;
+
+ @JsonCreator
+ public DeltaGroupScan(
+ @JsonProperty("userName") String userName,
+ @JsonProperty("entries") List<ReadEntryWithPath> entries,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("schema") TupleMetadata schema,
+ @JsonProperty("path") String path,
+ @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
+ @JsonProperty("condition") LogicalExpression condition,
+ @JsonProperty("limit") Integer limit,
+ @JsonProperty("partitionHolder") Map<Path, Map<String, String>> partitionHolder,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
+ super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, condition);
+ this.formatPlugin = pluginRegistry.resolveFormat(storageConfig, formatConfig, DeltaFormatPlugin.class);
+ this.columns = columns;
+ this.path = path;
+ this.schema = schema;
+ this.condition = condition;
+ this.limit = limit;
+ this.fs = ImpersonationUtil.createFileSystem(
+ ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+
+ DeltaParquetTableMetadataProvider metadataProvider =
+ defaultTableMetadataProviderBuilder(new FileSystemMetadataProviderManager())
+ .withEntries(entries)
+ .withFormatPlugin(formatPlugin)
+ .withReaderConfig(readerConfig)
+ .withSchema(schema)
+ .build();
+
+ this.metadataProvider = metadataProvider;
+ this.entries = metadataProvider.getEntries();
+ this.partitionHolder = partitionHolder;
+ this.fileSet = metadataProvider.getFileSet();
+
+ init();
+ }
+
+ private DeltaGroupScan(DeltaGroupScanBuilder builder) throws IOException {
+ super(ImpersonationUtil.resolveUserName(builder.userName), builder.columns,
+ builder.entries, builder.readerConfig, builder.condition);
+ this.formatPlugin = builder.formatPlugin;
+ this.columns = builder.columns;
+ this.path = builder.path;
+ this.schema = builder.schema;
+ this.condition = builder.condition;
+ this.limit = builder.limit;
+ this.fs = ImpersonationUtil.createFileSystem(
+ ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+
+ DeltaParquetTableMetadataProvider metadataProvider =
+ defaultTableMetadataProviderBuilder(new FileSystemMetadataProviderManager())
+ .withEntries(entries)
+ .withFormatPlugin(formatPlugin)
+ .withReaderConfig(readerConfig)
+ .withSchema(schema)
+ .build();
+
+ this.metadataProvider = metadataProvider;
+ this.entries = metadataProvider.getEntries();
+ this.partitionHolder = builder.partitionValues;
+ this.fileSet = metadataProvider.getFileSet();
+
+ init();
+ }
+
+ /**
+ * Private constructor, used for cloning.
+ *
+ * @param that The DeltaGroupScan to clone
+ */
+ private DeltaGroupScan(DeltaGroupScan that) {
+ super(that);
+ this.columns = that.columns;
+ this.formatPlugin = that.formatPlugin;
+ this.path = that.path;
+ this.condition = that.condition;
+ this.schema = that.schema;
+ this.mappings = that.mappings;
+ this.fs = that.fs;
+ this.limit = that.limit;
+ this.addFiles = that.addFiles;
+ this.endpointAffinities = that.endpointAffinities;
+ this.partitionHolder = that.partitionHolder;
+ }
+
+ @Override
+ protected DeltaParquetTableMetadataProvider.Builder tableMetadataProviderBuilder(MetadataProviderManager source) {
+ return defaultTableMetadataProviderBuilder(source);
+ }
+
+ @Override
+ protected DeltaParquetTableMetadataProvider.Builder defaultTableMetadataProviderBuilder(MetadataProviderManager source) {
+ return new DeltaParquetTableMetadataProvider.Builder(source);
+ }
+
+ public static DeltaGroupScanBuilder builder() {
+ return new DeltaGroupScanBuilder();
+ }
+
+ @Override
+ public DeltaGroupScan clone(List<SchemaPath> columns) {
+ try {
+ return toBuilder().columns(columns).build();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ @Override
+ public DeltaGroupScan applyLimit(int maxRecords) {
+ DeltaGroupScan clone = new DeltaGroupScan(this);
+ clone.limit = maxRecords;
+ return clone;
+ }
+
+ @Override
+ public SubScan getSpecificScan(int minorFragmentId) {
+ List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
+ Map<Path, Map<String, String>> subPartitionHolder = new HashMap<>();
+ for (RowGroupReadEntry readEntry : readEntries) {
+ Map<String, String> values = partitionHolder.get(readEntry.getPath());
+ subPartitionHolder.put(readEntry.getPath(), values);
+ }
+ return new DeltaRowGroupScan(getUserName(), formatPlugin, readEntries, columns, subPartitionHolder,
+ readerConfig, filter, getTableMetadata().getSchema());
+ }
+
+ @Override
+ public DeltaGroupScan clone(FileSelection selection) throws IOException {
+ DeltaGroupScan newScan = new DeltaGroupScan(this);
+ newScan.modifyFileSelection(selection);
+ newScan.init();
+ return newScan;
+ }
+
+ @Override
+ protected RowGroupScanFilterer<?> getFilterer() {
+ return new DeltaParquetScanFilterer(this);
+ }
+
+ @Override
+ protected Collection<DrillbitEndpoint> getDrillbits() {
+ return formatPlugin.getContext().getBits();
+ }
+
+ @Override
+ protected AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException {
+ FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false);
+ return clone(newSelection);
+ }
+
+ @Override
+ protected boolean supportsFileImplicitColumns() {
+ // current group scan should populate directory partition values
+ return false;
+ }
+
+ @Override
+ protected List<String> getPartitionValues(LocationProvider locationProvider) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new DeltaGroupScan(this);
+ }
+
+ @Override
+ public boolean supportsLimitPushdown() {
+ return false;
+ }
+
+ @JsonProperty("schema")
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
+ @JsonProperty("storage")
+ public StoragePluginConfig getStorageConfig() {
+ return formatPlugin.getStorageConfig();
+ }
+
+ @JsonProperty("format")
+ public FormatPluginConfig getFormatConfig() {
+ return formatPlugin.getConfig();
+ }
+
+ @JsonProperty("path")
+ public String getPath() {
+ return path;
+ }
+
+ @JsonProperty("condition")
+ public LogicalExpression getCondition() {
+ return condition;
+ }
+
+ @JsonProperty("partitionHolder")
+ public Map<Path, Map<String, String>> getPartitionHolder() {
+ return partitionHolder;
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("path", path)
+ .field("entries", entries)
+ .field("schema", schema)
+ .field("columns", columns)
+ .field("addFiles", addFiles)
+ .field("limit", limit)
+ .field("numFiles", getEntries().size())
+ .toString();
+ }
+
+ public DeltaGroupScanBuilder toBuilder() {
+ return new DeltaGroupScanBuilder()
+ .userName(this.userName)
+ .formatPlugin(this.formatPlugin)
+ .schema(this.schema)
+ .path(this.path)
+ .condition(this.condition)
+ .columns(this.columns)
+ .limit(this.limit);
+ }
+
+ private static class DeltaParquetScanFilterer extends RowGroupScanFilterer<DeltaParquetScanFilterer> {
+
+ public DeltaParquetScanFilterer(DeltaGroupScan source) {
+ super(source);
+ }
+
+ @Override
+ protected AbstractParquetGroupScan getNewScan() {
+ return new DeltaGroupScan((DeltaGroupScan) source);
+ }
+
+ @Override
+ protected DeltaParquetScanFilterer self() {
+ return this;
+ }
+
+ @Override
+ protected <T extends Metadata> Map<SchemaPath, ColumnStatistics<?>> getImplicitColumnStatistics(
+ OptionManager optionManager, T metadata, Map<SchemaPath, ColumnStatistics<?>> columnsStatistics) {
+ if (metadata instanceof LocationProvider && optionManager != null) {
+ LocationProvider locationProvider = (LocationProvider) metadata;
+ columnsStatistics = new HashMap<>(columnsStatistics);
+ Map<String, String> partitions =
+ ((DeltaGroupScan) source).getPartitionHolder().get(locationProvider.getPath());
+ for (Map.Entry<String, String> partitionValue : partitions.entrySet()) {
+ TypeProtos.MinorType minorType =
+ tableSchema.column(partitionValue.getKey()).getType().getMinorType();
+ String value = partitionValue.getValue();
+ if (value != null) {
+ columnsStatistics.put(SchemaPath.getCompoundPath(partitionValue.getKey()),
+ StatisticsProvider.getConstantColumnStatistics(
+ castPartitionValue(value, minorType), minorType));
+ } else {
+ Long rowCount = TableStatisticsKind.ROW_COUNT.getValue(metadata);
+ columnsStatistics.put(SchemaPath.getCompoundPath(partitionValue.getKey()),
+ StatisticsProvider.getColumnStatistics(null, null, rowCount, minorType));
+ }
+ }
+ }
+
+ return columnsStatistics;
+ }
+
+ private Object castPartitionValue(String value, TypeProtos.MinorType type) {
+ switch (type) {
+ case BIT:
+ return Boolean.parseBoolean(value);
+ case TINYINT:
+ return Byte.parseByte(value);
+ case SMALLINT:
+ return Short.parseShort(value);
+ case INT:
+ return Integer.parseInt(value);
+ case BIGINT:
+ return Long.parseLong(value);
+ case FLOAT4:
+ return Float.parseFloat(value);
+ case FLOAT8:
+ return Double.parseDouble(value);
+ case DATE:
+ return DateUtility.parseLocalDate(value);
+ case TIME:
+ return DateUtility.parseLocalTime(value);
+ case TIMESTAMP:
+ return DateUtility.parseBest(value);
+ case VARCHAR:
+ return value;
+ case VARDECIMAL:
+ return new BigDecimal(value);
+ default:
+ throw new UnsupportedOperationException("Unsupported partition type: " + type);
+ }
+ }
+ }
+
+ public static class DeltaGroupScanBuilder {
+ private String userName;
+
+ private DeltaFormatPlugin formatPlugin;
+
+ private TupleMetadata schema;
+
+ private String path;
+
+ private LogicalExpression condition;
+
+ private List<SchemaPath> columns;
+
+ private int limit;
+
+ private List<ReadEntryWithPath> entries;
+
+ private ParquetReaderConfig readerConfig = ParquetReaderConfig.getDefaultInstance();
+
+ private Map<Path, Map<String, String>> partitionValues;
+
+ public DeltaGroupScanBuilder userName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder formatPlugin(DeltaFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder schema(TupleMetadata schema) {
+ this.schema = schema;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder path(String path) {
+ this.path = path;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder condition(LogicalExpression condition) {
+ this.condition = condition;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder columns(List<SchemaPath> columns) {
+ this.columns = columns;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder limit(int maxRecords) {
+ this.limit = maxRecords;
+ return this;
+ }
+
+ public DeltaGroupScanBuilder readerConfig(ParquetReaderConfig readerConfig) {
+ this.readerConfig = readerConfig;
+ return this;
+ }
+
+ public DeltaGroupScan build() throws IOException {
+ DeltaLog log = DeltaLog.forTable(formatPlugin.getFsConf(), path);
+ Snapshot snapshot = log.snapshot();
+ StructType structType = snapshot.getMetadata().getSchema();
+ schema = toSchema(structType);
+
+ DeltaScan scan = Optional.ofNullable(condition)
+ .map(c -> c.accept(new DrillExprToDeltaTranslator(structType), null))
+ .map(snapshot::scan)
+ .orElse(snapshot.scan());
+
+ try {
+ CloseableIterator<AddFile> files = scan.getFiles();
+ ArrayList<AddFile> addFiles = Lists.newArrayList(() -> files);
+ entries = addFiles.stream()
+ .map(addFile -> new ReadEntryWithPath(new Path(URI.create(path).getPath(), URI.create(addFile.getPath()).getPath())))
+ .collect(Collectors.toList());
+
+ partitionValues = addFiles.stream()
+ .collect(Collectors.toMap(
+ addFile -> new Path(URI.create(path).getPath(), URI.create(addFile.getPath()).getPath()),
+ addFile -> collectPartitionedValues(snapshot, addFile)));
+
+ files.close();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+
+ return new DeltaGroupScan(this);
+ }
+
+ private Map<String, String> collectPartitionedValues(Snapshot snapshot, AddFile addFile) {
+ Map<String, String> partitionValues = new LinkedHashMap<>();
+ snapshot.getMetadata().getPartitionColumns().stream()
+ .map(col -> Pair.of(col, addFile.getPartitionValues().get(col)))
+ .forEach(pair -> partitionValues.put(pair.getKey(), pair.getValue()));
+ return partitionValues;
+ }
+
+ private TupleMetadata toSchema(StructType structType) {
+ TupleBuilder tupleBuilder = new TupleBuilder();
+ for (StructField field : structType.getFields()) {
+ tupleBuilder.addColumn(toColumnMetadata(field));
+ }
+
+ return tupleBuilder.schema();
+ }
+
+ private ColumnMetadata toColumnMetadata(StructField field) {
+ DataType dataType = field.getDataType();
+ if (dataType instanceof ArrayType) {
+ DataType elementType = ((ArrayType) dataType).getElementType();
+ if (elementType instanceof ArrayType) {
+ return MetadataUtils.newRepeatedList(field.getName(),
+ toColumnMetadata(new StructField(field.getName(), ((ArrayType) elementType).getElementType(), false)));
+ } else if (elementType instanceof StructType) {
+ return MetadataUtils.newMapArray(field.getName(), toSchema((StructType) elementType));
+ }
+ return MetadataUtils.newScalar(field.getName(), toMinorType(elementType), TypeProtos.DataMode.REPEATED);
+ } else if (dataType instanceof StructType) {
+ return MetadataUtils.newMap(field.getName(), toSchema((StructType) dataType));
+ } else {
+ return MetadataUtils.newScalar(field.getName(), toMinorType(field.getDataType()),
+ field.isNullable() ? TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED);
+ }
+ }
+
+ private TypeProtos.MinorType toMinorType(DataType dataType) {
+ if (dataType instanceof BinaryType) {
+ return TypeProtos.MinorType.VARBINARY;
+ } else if (dataType instanceof BooleanType) {
+ return TypeProtos.MinorType.BIT;
+ } else if (dataType instanceof ByteType) {
+ return TypeProtos.MinorType.TINYINT;
+ } else if (dataType instanceof DateType) {
+ return TypeProtos.MinorType.DATE;
+ } else if (dataType instanceof DecimalType) {
+ return TypeProtos.MinorType.VARDECIMAL;
+ } else if (dataType instanceof DoubleType) {
+ return TypeProtos.MinorType.FLOAT8;
+ } else if (dataType instanceof FloatType) {
+ return TypeProtos.MinorType.FLOAT4;
+ } else if (dataType instanceof IntegerType) {
+ return TypeProtos.MinorType.INT;
+ } else if (dataType instanceof LongType) {
+ return TypeProtos.MinorType.BIGINT;
+ } else if (dataType instanceof ShortType) {
+ return TypeProtos.MinorType.SMALLINT;
+ } else if (dataType instanceof StringType) {
+ return TypeProtos.MinorType.VARCHAR;
+ } else if (dataType instanceof TimestampType) {
+ return TypeProtos.MinorType.TIMESTAMP;
+ } else {
+ throw new DrillRuntimeException("Unsupported data type: " + dataType);
+ }
+ }
+ }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java
new file mode 100644
index 0000000..aeec5b0
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.BaseParquetMetadataProvider;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * This is Metadata provider for Delta tables, which are read by Drill native Parquet reader
+ */
+public class DeltaParquetTableMetadataProvider extends BaseParquetMetadataProvider {
+
+ private final DeltaFormatPlugin deltaFormatPlugin;
+
+ private DeltaParquetTableMetadataProvider(Builder builder) throws IOException {
+ super(builder);
+
+ this.deltaFormatPlugin = builder.formatPlugin;
+
+ init((BaseParquetMetadataProvider) builder.metadataProviderManager().getTableMetadataProvider());
+ }
+
+ @Override
+ protected void initInternal() throws IOException {
+ Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
+ for (ReadEntryWithPath entry : entries) {
+ Path path = entry.getPath();
+ FileSystem fs = path.getFileSystem(deltaFormatPlugin.getFsConf());
+ fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)), fs);
+ }
+ parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, readerConfig);
+ }
+
+ public static class Builder extends BaseParquetMetadataProvider.Builder<Builder> {
+ private DeltaFormatPlugin formatPlugin;
+
+ public Builder(MetadataProviderManager source) {
+ super(source);
+ }
+
+ protected Builder withFormatPlugin(DeltaFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ return self();
+ }
+
+ @Override
+ protected Builder self() {
+ return this;
+ }
+
+ @Override
+ public DeltaParquetTableMetadataProvider build() throws IOException {
+ return new DeltaParquetTableMetadataProvider(this);
+ }
+ }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java
new file mode 100644
index 0000000..5645c3f
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("delta-row-group-scan")
+public class DeltaRowGroupScan extends AbstractParquetRowGroupScan {
+
+ public static final String OPERATOR_TYPE = "DELTA_ROW_GROUP_SCAN";
+
+ private final DeltaFormatPlugin formatPlugin;
+ private final DeltaFormatPluginConfig formatPluginConfig;
+ private final Map<Path, Map<String, String>> partitions;
+
+ @JsonCreator
+ public DeltaRowGroupScan(@JacksonInject StoragePluginRegistry registry,
+ @JsonProperty("userName") String userName,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("formatPluginConfig") FormatPluginConfig formatPluginConfig,
+ @JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> rowGroupReadEntries,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("partitions") Map<Path, Map<String, String>> partitions,
+ @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
+ @JsonProperty("filter") LogicalExpression filter,
+ @JsonProperty("schema") TupleMetadata schema) {
+ this(userName,
+ registry.resolveFormat(storageConfig, formatPluginConfig, DeltaFormatPlugin.class),
+ rowGroupReadEntries,
+ columns,
+ partitions,
+ readerConfig,
+ filter,
+ schema);
+ }
+
+ public DeltaRowGroupScan(String userName,
+ DeltaFormatPlugin formatPlugin,
+ List<RowGroupReadEntry> rowGroupReadEntries,
+ List<SchemaPath> columns,
+ Map<Path, Map<String, String>> partitions,
+ ParquetReaderConfig readerConfig,
+ LogicalExpression filter,
+ TupleMetadata schema) {
+ super(userName, rowGroupReadEntries, columns, readerConfig, filter,null, schema);
+ this.formatPlugin = formatPlugin;
+ this.formatPluginConfig = formatPlugin.getConfig();
+ this.partitions = partitions;
+ }
+
+ @JsonProperty
+ public DeltaFormatPluginConfig getFormatPluginConfig() {
+ return formatPluginConfig;
+ }
+
+ @JsonProperty
+ public Map<Path, Map<String, String>> getPartitions() {
+ return partitions;
+ }
+
+ @Override
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+ Preconditions.checkArgument(children.isEmpty());
+ return new DeltaRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, partitions,
+ readerConfig, filter, schema);
+ }
+
+ @Override
+ public String getOperatorType() {
+ return OPERATOR_TYPE;
+ }
+
+ @Override
+ public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
+ return new DeltaRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, partitions,
+ readerConfig, filter, schema);
+ }
+
+ @Override
+ public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) {
+ return formatPlugin.getFsConf();
+ }
+
+ @Override
+ public boolean supportsFileImplicitColumns() {
+ return true;
+ }
+
+ @Override
+ public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
+ return Collections.emptyList();
+ }
+
+ public Map<String, String> getPartitions(RowGroupReadEntry rowGroupReadEntry) {
+ return partitions.get(rowGroupReadEntry.getPath());
+ }
+
+ @Override
+ public boolean isImplicitColumn(SchemaPath path, String partitionColumnLabel) {
+ return partitions.values().stream()
+ .anyMatch(map -> map.containsKey(path.getAsUnescapedPath()));
+ }
+}
+
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java
new file mode 100644
index 0000000..502a74d
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+
+public class DeltaFormatMatcher extends FormatMatcher {
+
+ private final DeltaFormatPlugin formatPlugin;
+
+ public DeltaFormatMatcher(DeltaFormatPlugin formatPlugin) {
+ this.formatPlugin = formatPlugin;
+ }
+
+ @Override
+ public boolean supportDirectoryReads() {
+ return true;
+ }
+
+ @Override
+ public DrillTable isReadable(DrillFileSystem fs, FileSelection selection, FileSystemPlugin fsPlugin,
+ String storageEngineName, SchemaConfig schemaConfig) {
+ if (DeltaLog.forTable(fsPlugin.getFsConf(), selection.getSelectionRoot()).tableExists()) {
+ FormatSelection formatSelection = new FormatSelection(formatPlugin.getConfig(), selection);
+ return new PluginDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(), formatSelection, formatPlugin.getConvention());
+ }
+ return null;
+ }
+
+ @Override
+ public boolean isFileReadable(DrillFileSystem fs, FileStatus status) {
+ return false;
+ }
+
+ @Override
+ public FormatPlugin getFormatPlugin() {
+ return formatPlugin;
+ }
+
+ public int priority() {
+ return HIGH_PRIORITY;
+ }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java
new file mode 100644
index 0000000..37e4803
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.delta.plan.DeltaPluginImplementor;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DeltaFormatPlugin implements FormatPlugin {
+
+ private static final String DELTA_CONVENTION_PREFIX = "DELTA.";
+
+ /**
+ * Generator for format id values. Formats with the same name may be defined
+ * in multiple storage plugins, so using the unique id within the convention name
+ * to ensure the rule names will be unique for different plugin instances.
+ */
+ private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+ private final FileSystemConfig storageConfig;
+
+ private final DeltaFormatPluginConfig config;
+
+ private final Configuration fsConf;
+
+ private final DrillbitContext context;
+
+ private final String name;
+
+ private final DeltaFormatMatcher matcher;
+
+ private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+ public DeltaFormatPlugin(
+ String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ FileSystemConfig storageConfig,
+ DeltaFormatPluginConfig config) {
+ this.storageConfig = storageConfig;
+ this.config = config;
+ this.fsConf = fsConf;
+ this.context = context;
+ this.name = name;
+ this.matcher = new DeltaFormatMatcher(this);
+ this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+ }
+
+ private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+ Convention convention = new Convention.Impl(DELTA_CONVENTION_PREFIX + name, PluginRel.class);
+ return StoragePluginRulesSupplier.builder()
+ .rulesProvider(new PluginRulesProviderImpl(convention, DeltaPluginImplementor::new))
+ .supportsFilterPushdown(true)
+ .supportsProjectPushdown(true)
+ .supportsLimitPushdown(true)
+ .convention(convention)
+ .build();
+ }
+
+ @Override
+ public boolean supportsRead() {
+ return true;
+ }
+
+ @Override
+ public boolean supportsWrite() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsAutoPartitioning() {
+ return false;
+ }
+
+ @Override
+ public FormatMatcher getMatcher() {
+ return matcher;
+ }
+
+ @Override
+ public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+ switch (phase) {
+ case PHYSICAL:
+ case LOGICAL:
+ return storagePluginRulesSupplier.getOptimizerRules();
+ case LOGICAL_PRUNE_AND_JOIN:
+ case LOGICAL_PRUNE:
+ case PARTITION_PRUNING:
+ case JOIN_PLANNING:
+ default:
+ return Collections.emptySet();
+ }
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+ return getGroupScan(userName, selection, columns, (OptionManager) null);
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options) throws IOException {
+ return getGroupScan(userName, selection, columns, options, null);
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns, OptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+ ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+ .withConf(fsConf)
+ .withOptions(options)
+ .build();
+ return DeltaGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .readerConfig(readerConfig)
+ .path(selection.selectionRoot.toUri().getPath())
+ .columns(columns)
+ .limit(-1)
+ .build();
+ }
+
+ @Override
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+ List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+ SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+ TupleMetadata schema = schemaProvider != null
+ ? schemaProvider.read().getSchema()
+ : null;
+ return DeltaGroupScan.builder()
+ .userName(userName)
+ .formatPlugin(this)
+ .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build())
+ .schema(schema)
+ .path(selection.selectionRoot.toUri().getPath())
+ .columns(columns)
+ .limit(-1)
+ .build();
+ }
+
+ @Override
+ public boolean supportsStatistics() {
+ return false;
+ }
+
+ @Override
+ public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
+ public DeltaFormatPluginConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public FileSystemConfig getStorageConfig() {
+ return storageConfig;
+ }
+
+ @Override
+ public Configuration getFsConf() {
+ return fsConf;
+ }
+
+ @Override
+ public DrillbitContext getContext() {
+ return context;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ public Convention getConvention() {
+ return storagePluginRulesSupplier.convention();
+ }
+
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
new file mode 100644
index 0000000..9fd7bb7
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+@JsonTypeName(DeltaFormatPluginConfig.NAME)
+public class DeltaFormatPluginConfig implements FormatPluginConfig {
+
+ public static final String NAME = "delta";
+
+ @JsonCreator
+ public DeltaFormatPluginConfig() {
+ }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java
new file mode 100644
index 0000000..84be68c
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillConstExecutor;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class DeltaPluginImplementor extends AbstractPluginImplementor {
+
+ private DeltaGroupScan groupScan;
+
+ @Override
+ public void implement(StoragePluginTableScan scan) {
+ groupScan = (DeltaGroupScan) scan.getGroupScan();
+ }
+
+ @Override
+ public void implement(PluginFilterRel filter) throws IOException {
+ visitChild(filter.getInput());
+
+ RexNode condition = filter.getCondition();
+ LogicalExpression expression = DrillOptiq.toDrill(
+ new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+ filter.getInput(),
+ condition);
+
+ DrillConstExecutor executor = (DrillConstExecutor) filter.getCluster().getPlanner().getExecutor();
+ PlannerSettings plannerSettings = filter.getCluster().getPlanner().getContext().unwrap(PlannerSettings.class);
+ groupScan = Optional.ofNullable((DeltaGroupScan) groupScan.applyFilter(expression, executor.getUdfUtilities(),
+ plannerSettings.functionImplementationRegistry, plannerSettings.getOptions()))
+ .orElse(groupScan);
+ }
+
+ @Override
+ public void implement(PluginProjectRel project) throws IOException {
+ visitChild(project.getInput());
+
+ DrillParseContext context = new DrillParseContext(PrelUtil.getPlannerSettings(project.getCluster().getPlanner()));
+ RelNode input = project.getInput();
+
+ List<SchemaPath> projects = project.getProjects().stream()
+ .map(e -> (SchemaPath) DrillOptiq.toDrill(context, input, e))
+ .collect(Collectors.toList());
+ groupScan = groupScan.clone(projects);
+ }
+
+ @Override
+ public boolean canImplement(Filter filter) {
+ FilterFinder filterFinder = new FilterFinder();
+ filter.getInput().accept(filterFinder);
+ return filterFinder.getFilter() == null;
+ }
+
+ @Override
+ public void implement(PluginLimitRel limit) throws IOException {
+ visitChild(limit.getInput());
+ int maxRecords = getArtificialLimit(limit);
+ if (maxRecords >= 0) {
+ groupScan = groupScan.applyLimit(maxRecords);
+ }
+ }
+
+ @Override
+ public boolean canImplement(DrillLimitRelBase limit) {
+ if (hasPluginGroupScan(limit)) {
+ FirstLimitFinder finder = new FirstLimitFinder();
+ limit.getInput().accept(finder);
+ int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
+ int newLimit = getArtificialLimit(limit);
+ return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean artificialLimit() {
+ return true;
+ }
+
+ @Override
+ public boolean artificialFilter() {
+ return true;
+ }
+
+ @Override
+ protected Class<? extends StoragePlugin> supportedPlugin() {
+ return FileSystemPlugin.class;
+ }
+
+ @Override
+ public boolean splitProject(Project project) {
+ return true;
+ }
+
+ @Override
+ public boolean canImplement(Project project) {
+ return hasPluginGroupScan(project);
+ }
+
+ @Override
+ public GroupScan getPhysicalOperator() {
+ return groupScan;
+ }
+
+ @Override
+ protected boolean hasPluginGroupScan(RelNode node) {
+ return findGroupScan(node) instanceof DeltaGroupScan;
+ }
+
+ private int rexLiteralIntValue(RexLiteral offset) {
+ return ((BigDecimal) offset.getValue()).intValue();
+ }
+
+ private int getArtificialLimit(DrillLimitRelBase limit) {
+ return getArtificialLimit(limit.getFetch(), limit.getOffset());
+ }
+
+ private int getArtificialLimit(RexNode fetch, RexNode offset) {
+ int maxRows = -1;
+ if (fetch != null) {
+ maxRows = rexLiteralIntValue((RexLiteral) fetch);
+ if (offset != null) {
+ maxRows += rexLiteralIntValue((RexLiteral) offset);
+ }
+ }
+ return maxRows;
+ }
+
+ private static class FilterFinder extends RelShuttleImpl {
+ private RelNode filter;
+
+ @Override
+ public RelNode visit(LogicalFilter filter) {
+ this.filter = filter;
+ return filter;
+ }
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof Filter) {
+ this.filter = other;
+ return other;
+ } else if (other instanceof RelSubset) {
+ RelSubset relSubset = (RelSubset) other;
+ Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this);
+ }
+ return super.visit(other);
+ }
+
+ public RelNode getFilter() {
+ return filter;
+ }
+ }
+
+ private static class FirstLimitFinder extends RelShuttleImpl {
+ private RexNode fetch;
+
+ private RexNode offset;
+
+ @Override
+ public RelNode visit(RelNode other) {
+ if (other instanceof DrillLimitRelBase) {
+ DrillLimitRelBase limitRelBase = (DrillLimitRelBase) other;
+ fetch = limitRelBase.getFetch();
+ offset = limitRelBase.getOffset();
+ return other;
+ } else if (other instanceof RelSubset) {
+ RelSubset relSubset = (RelSubset) other;
+ Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this);
+ }
+ return super.visit(other);
+ }
+
+ public RexNode getFetch() {
+ return fetch;
+ }
+
+ public RexNode getOffset() {
+ return offset;
+ }
+ }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java
new file mode 100644
index 0000000..bd95bde
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import io.delta.standalone.expressions.And;
+import io.delta.standalone.expressions.EqualTo;
+import io.delta.standalone.expressions.Expression;
+import io.delta.standalone.expressions.GreaterThan;
+import io.delta.standalone.expressions.GreaterThanOrEqual;
+import io.delta.standalone.expressions.IsNotNull;
+import io.delta.standalone.expressions.IsNull;
+import io.delta.standalone.expressions.LessThan;
+import io.delta.standalone.expressions.LessThanOrEqual;
+import io.delta.standalone.expressions.Literal;
+import io.delta.standalone.expressions.Not;
+import io.delta.standalone.expressions.Or;
+import io.delta.standalone.expressions.Predicate;
+import io.delta.standalone.types.StructType;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+public class DrillExprToDeltaTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> {
+
+ private final StructType structType;
+
+ public DrillExprToDeltaTranslator(StructType structType) {
+ this.structType = structType;
+ }
+
+ @Override
+ public Expression visitFunctionCall(FunctionCall call, Void value) {
+ try {
+ return visitFunctionCall(call);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private Predicate visitFunctionCall(FunctionCall call) {
+ switch (call.getName()) {
+ case FunctionNames.AND: {
+ Expression left = call.arg(0).accept(this, null);
+ Expression right = call.arg(1).accept(this, null);
+ if (left != null && right != null) {
+ return new And(left, right);
+ }
+ return null;
+ }
+ case FunctionNames.OR: {
+ Expression left = call.arg(0).accept(this, null);
+ Expression right = call.arg(1).accept(this, null);
+ if (left != null && right != null) {
+ return new Or(left, right);
+ }
+ return null;
+ }
+ case FunctionNames.NOT: {
+ Expression expression = call.arg(0).accept(this, null);
+ if (expression != null) {
+ return new Not(expression);
+ }
+ return null;
+ }
+ case FunctionNames.IS_NULL: {
+ LogicalExpression arg = call.arg(0);
+ if (arg instanceof SchemaPath) {
+ String name = getPath((SchemaPath) arg);
+ return new IsNull(structType.column(name));
+ }
+ return null;
+ }
+ case FunctionNames.IS_NOT_NULL: {
+ LogicalExpression arg = call.arg(0);
+ if (arg instanceof SchemaPath) {
+ String name = getPath((SchemaPath) arg);
+ return new IsNotNull(structType.column(name));
+ }
+ return null;
+ }
+ case FunctionNames.LT: {
+ LogicalExpression nameRef = call.arg(0);
+ Expression expression = call.arg(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new LessThan(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.LE: {
+ LogicalExpression nameRef = call.arg(0);
+ Expression expression = call.arg(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new LessThanOrEqual(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.GT: {
+ LogicalExpression nameRef = call.args().get(0);
+ Expression expression = call.args().get(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new GreaterThan(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.GE: {
+ LogicalExpression nameRef = call.args().get(0);
+ Expression expression = call.args().get(0).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new GreaterThanOrEqual(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.EQ: {
+ LogicalExpression nameRef = call.args().get(0);
+ Expression expression = call.args().get(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new EqualTo(structType.column(name), expression);
+ }
+ return null;
+ }
+ case FunctionNames.NE: {
+ LogicalExpression nameRef = call.args().get(0);
+ Expression expression = call.args().get(1).accept(this, null);
+ if (nameRef instanceof SchemaPath) {
+ String name = getPath((SchemaPath) nameRef);
+ return new Not(new EqualTo(structType.column(name), expression));
+ }
+ return null;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) {
+ return Literal.of(fExpr.getFloat());
+ }
+
+ @Override
+ public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) {
+ return Literal.of(intExpr.getInt());
+ }
+
+ @Override
+ public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) {
+ return Literal.of(longExpr.getLong());
+ }
+
+ @Override
+ public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) {
+ return Literal.of(decExpr.getIntFromDecimal());
+ }
+
+ @Override
+ public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) {
+ return Literal.of(decExpr.getLongFromDecimal());
+ }
+
+ @Override
+ public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) {
+ return Literal.of(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) {
+ return Literal.of(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) {
+ return Literal.of(decExpr.getBigDecimal());
+ }
+
+ @Override
+ public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) {
+ return Literal.of(dateExpr.getDate());
+ }
+
+ @Override
+ public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) {
+ return Literal.of(timeExpr.getTime());
+ }
+
+ @Override
+ public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) {
+ return Literal.of(timestampExpr.getTimeStamp());
+ }
+
+ @Override
+ public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) {
+ return Literal.of(dExpr.getDouble());
+ }
+
+ @Override
+ public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) {
+ return Literal.of(e.getBoolean());
+ }
+
+ @Override
+ public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) {
+ return Literal.of(e.getString());
+ }
+
+ @Override
+ public Expression visitUnknown(LogicalExpression e, Void value) {
+ return null;
+ }
+
+ private static String getPath(SchemaPath schemaPath) {
+ StringBuilder sb = new StringBuilder();
+ PathSegment segment = schemaPath.getRootSegment();
+ sb.append(segment.getNameSegment().getPath());
+
+ while ((segment = segment.getChild()) != null) {
+ sb.append('.')
+ .append(segment.isNamed()
+ ? segment.getNameSegment().getPath()
+ : "element");
+ }
+ return sb.toString();
+ }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java
new file mode 100644
index 0000000..78cac37
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.delta.DeltaRowGroupScan;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("unused")
+public class DeltaScanBatchCreator extends AbstractParquetScanBatchCreator
+ implements BatchCreator<DeltaRowGroupScan> {
+
+ @Override
+ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DeltaRowGroupScan rowGroupScan,
+ List<RecordBatch> children) throws ExecutionSetupException {
+ Preconditions.checkArgument(children.isEmpty());
+ OperatorContext oContext = context.newOperatorContext(rowGroupScan);
+ return getBatch(context, rowGroupScan, oContext);
+ }
+
+ @Override
+ protected AbstractDrillFileSystemManager getDrillFileSystemCreator(
+ OperatorContext operatorContext, OptionManager optionManager) {
+ return new ParquetScanBatchCreator.ParquetDrillFileSystemManager(operatorContext,
+ optionManager.getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val);
+ }
+
+ @Override
+ protected Map<String, String> getImplicitValues(AbstractParquetRowGroupScan rowGroupScan,
+ ColumnExplorer columnExplorer, RowGroupReadEntry rowGroup, DrillFileSystem fs) {
+ return ((DeltaRowGroupScan) rowGroupScan).getPartitions(rowGroup);
+ }
+
+}
diff --git a/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json b/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..41d6f2f
--- /dev/null
+++ b/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,20 @@
+{
+ "storage":{
+ "dfs": {
+ "type": "file",
+ "formats": {
+ "delta": {
+ "type": "delta"
+ }
+ }
+ },
+ "s3": {
+ "type": "file",
+ "formats": {
+ "delta": {
+ "type": "delta"
+ }
+ }
+ }
+ }
+}
diff --git a/contrib/format-deltalake/src/main/resources/drill-module.conf b/contrib/format-deltalake/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..d4f6639
--- /dev/null
+++ b/contrib/format-deltalake/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This file tells Drill to consider this module when class path scanning.
+# This file can also include any supplementary configuration information.
+# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+ packages += "org.apache.drill.exec.store.delta"
+}
diff --git a/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java b/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
new file mode 100644
index 0000000..e74b7d3
--- /dev/null
+++ b/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertEquals;
+
+public class DeltaQueriesTest extends ClusterTest {
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+ FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+ Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+ formats.put("delta", new DeltaFormatPluginConfig());
+ FileSystemConfig newPluginConfig = new FileSystemConfig(
+ pluginConfig.getConnection(),
+ pluginConfig.getConfig(),
+ pluginConfig.getWorkspaces(),
+ formats,
+ PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+ newPluginConfig.setEnabled(pluginConfig.isEnabled());
+ pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+ dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-primitives"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-partition-values"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-nested-struct"));
+ }
+
+ @Test
+ public void testSerDe() throws Exception {
+ String plan = queryBuilder().sql("select * from dfs.`data-reader-partition-values`").explainJson();
+ long count = queryBuilder().physical(plan).run().recordCount();
+ assertEquals(3, count);
+ }
+
+ @Test
+ public void testAllPrimitives() throws Exception {
+ testBuilder()
+ .sqlQuery("select * from dfs.`data-reader-primitives`")
+ .ordered()
+ .baselineColumns("as_int", "as_long", "as_byte", "as_short", "as_boolean", "as_float",
+ "as_double", "as_string", "as_binary", "as_big_decimal")
+ .baselineValues(null, null, null, null, null, null, null, null, null, null)
+ .baselineValues(0, 0L, 0, 0, true, 0.0f, 0.0, "0", new byte[]{0, 0}, BigDecimal.valueOf(0))
+ .baselineValues(1, 1L, 1, 1, false, 1.0f, 1.0, "1", new byte[]{1, 1}, BigDecimal.valueOf(1))
+ .baselineValues(2, 2L, 2, 2, true, 2.0f, 2.0, "2", new byte[]{2, 2}, BigDecimal.valueOf(2))
+ .baselineValues(3, 3L, 3, 3, false, 3.0f, 3.0, "3", new byte[]{3, 3}, BigDecimal.valueOf(3))
+ .baselineValues(4, 4L, 4, 4, true, 4.0f, 4.0, "4", new byte[]{4, 4}, BigDecimal.valueOf(4))
+ .baselineValues(5, 5L, 5, 5, false, 5.0f, 5.0, "5", new byte[]{5, 5}, BigDecimal.valueOf(5))
+ .baselineValues(6, 6L, 6, 6, true, 6.0f, 6.0, "6", new byte[]{6, 6}, BigDecimal.valueOf(6))
+ .baselineValues(7, 7L, 7, 7, false, 7.0f, 7.0, "7", new byte[]{7, 7}, BigDecimal.valueOf(7))
+ .baselineValues(8, 8L, 8, 8, true, 8.0f, 8.0, "8", new byte[]{8, 8}, BigDecimal.valueOf(8))
+ .baselineValues(9, 9L, 9, 9, false, 9.0f, 9.0, "9", new byte[]{9, 9}, BigDecimal.valueOf(9))
+ .go();
+ }
+
+ @Test
+ public void testProjectingColumns() throws Exception {
+
+ String query = "select as_int, as_string from dfs.`data-reader-primitives`";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("columns=\\[`as_int`, `as_string`\\]")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("as_int", "as_string")
+ .baselineValues(null, null)
+ .baselineValues(0, "0")
+ .baselineValues(1, "1")
+ .baselineValues(2, "2")
+ .baselineValues(3, "3")
+ .baselineValues(4, "4")
+ .baselineValues(5, "5")
+ .baselineValues(6, "6")
+ .baselineValues(7, "7")
+ .baselineValues(8, "8")
+ .baselineValues(9, "9")
+ .go();
+ }
+
+ @Test
+ public void testProjectNestedColumn() throws Exception {
+ String query = "select t.a.ac.acb as acb, b from dfs.`data-reader-nested-struct` t";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("columns=\\[`a`.`ac`.`acb`, `b`\\]")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("acb", "b")
+ .baselineValues(0L, 0)
+ .baselineValues(1L, 1)
+ .baselineValues(2L, 2)
+ .baselineValues(3L, 3)
+ .baselineValues(4L, 4)
+ .baselineValues(5L, 5)
+ .baselineValues(6L, 6)
+ .baselineValues(7L, 7)
+ .baselineValues(8L, 8)
+ .baselineValues(9L, 9)
+ .go();
+ }
+
+ @Test
+ public void testPartitionPruning() throws Exception {
+ String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 1";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("numFiles\\=1")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .baselineColumns("as_int", "as_string")
+ .baselineValues("1", "1")
+ .go();
+ }
+
+ @Test
+ public void testEmptyResults() throws Exception {
+ String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 101";
+
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("numFiles\\=1")
+ .match();
+
+ testBuilder()
+ .sqlQuery(query)
+ .ordered()
+ .expectsEmptyResultSet()
+ .go();
+ }
+
+ @Test
+ public void testLimit() throws Exception {
+ String query = "select as_int, as_string from dfs.`data-reader-partition-values` limit 1";
+
+ // Note that both of the following two limits are expected because this format plugin supports an "artificial" limit.
+ queryBuilder()
+ .sql(query)
+ .planMatcher()
+ .include("Limit\\(fetch\\=\\[1\\]\\)")
+ .include("limit\\=1")
+ .match();
+
+ long count = queryBuilder().sql(query).run().recordCount();
+ assertEquals(1, count);
+ }
+}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc
new file mode 100644
index 0000000..0dacf51
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc
new file mode 100644
index 0000000..976c62f
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000..4046b14
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"commitInfo":{"timestamp":1603724040818,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"975ef365-8dec-4bbf-ab88-264c10987001","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aa\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ab\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ac\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aca\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"acb\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1603724040747}}
+{"add":{"path":"part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet","partitionValues":{},"size":1432,"modificationTime":1603724040000,"dataChange":true}}
+{"add":{"path":"part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet","partitionValues":{},"size":1439,"modificationTime":1603724040000,"dataChange":true}}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet
new file mode 100644
index 0000000..d1b8614
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet
new file mode 100644
index 0000000..b2114ea
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000..c0cc5a3
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json
@@ -0,0 +1,6 @@
+{"commitInfo":{"timestamp":1636147668568,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"as_int\",\"as_long\",\"as_byte\",\"as_short\",\"as_boolean\",\"as_float\",\"as_double\",\"as_string\",\"as_string_lit_null\",\"as_date\",\"as_timestamp\",\"as_big_decimal\"]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputBytes":"5832","numOutputRows":"3"}}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"as_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_byte\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_short\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_float\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_string_lit_null\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_big_decimal\",\"type\":\"decimal(1,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_list_of_records\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]},\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"as_nested_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aa\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ab\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ac\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aca\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"acb\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["as_int","as_long","as_byte","as_short","as_boolean","as_float","as_double","as_string","as_string_lit_null","as_date","as_timestamp","as_big_decimal"],"configuration":{},"createdTime":1636147666386}}
+{"add":{"path":"as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08%2011%253A11%253A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet","partitionValues":{"as_big_decimal":"0","as_int":"0","as_byte":"0","as_long":"0","as_date":"2021-09-08","as_string":"0","as_timestamp":"2021-09-08 11:11:11","as_float":"0.0","as_short":"0","as_boolean":"true","as_string_lit_null":"null","as_double":"0.0"},"size":1944,"modificationTime":1636147668000,"dataChange":true}}
+{"add":{"path":"as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet","partitionValues":{"as_big_decimal":null,"as_int":null,"as_byte":null,"as_long":null,"as_date":null,"as_string":null,"as_timestamp":null,"as_float":null,"as_short":null,"as_boolean":null,"as_string_lit_null":null,"as_double":null},"size":1944,"modificationTime":1636147668000,"dataChange":true}}
+{"add":{"path":"as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08%2011%253A11%253A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet","partitionValues":{"as_big_decimal":"1","as_int":"1","as_byte":"1","as_long":"1","as_date":"2021-09-08","as_string":"1","as_timestamp":"2021-09-08 11:11:11","as_float":"1.0","as_short":"1","as_boolean":"false","as_string_lit_null":"null","as_double":"1.0"},"size":1944,"modificationTime":1636147668000,"dataChange":true}}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/.part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/.part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc
new file mode 100644
index 0000000..0191061
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/.part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet
new file mode 100644
index 0000000..e4919ae
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/.part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/.part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet.crc
new file mode 100644
index 0000000..b79ff09
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/.part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet
new file mode 100644
index 0000000..b67fcb7
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/.part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/.part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet.crc
new file mode 100644
index 0000000..bf418f6
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/.part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet
new file mode 100644
index 0000000..4387e52
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc
new file mode 100644
index 0000000..11f8928
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc
new file mode 100644
index 0000000..852ffc4
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000..9c9a0d1
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"commitInfo":{"timestamp":1607520163636,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputBytes":"5050","numOutputRows":"11"}}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"as_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_byte\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_short\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_float\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_binary\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_big_decimal\",\"type\":\"decimal(1,0)\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1607520161353}}
+{"add":{"path":"part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet","partitionValues":{},"size":2482,"modificationTime":1607520163000,"dataChange":true}}
+{"add":{"path":"part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet","partitionValues":{},"size":2568,"modificationTime":1607520163000,"dataChange":true}}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet
new file mode 100644
index 0000000..b0442b0
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet
new file mode 100644
index 0000000..745394c
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet
Binary files differ
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 6fd6925..e728da9 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -56,6 +56,8 @@
<module>format-xml</module>
<module>format-image</module>
<module>format-pcapng</module>
+ <module>format-iceberg</module>
+ <module>format-deltalake</module>
<module>storage-phoenix</module>
<module>storage-googlesheets</module>
<module>storage-hive</module>
@@ -69,7 +71,6 @@
<module>storage-druid</module>
<module>storage-elasticsearch</module>
<module>storage-cassandra</module>
- <module>format-iceberg</module>
</modules>
</project>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index ce39807..f0fb953 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -484,6 +484,11 @@
<artifactId>drill-iceberg-format</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.drill.contrib</groupId>
+ <artifactId>drill-deltalake-format</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</profile>
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 38d79f0..7708be4 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -69,6 +69,7 @@
<include>org.apache.drill.contrib:drill-udfs:jar</include>
<include>org.apache.drill.contrib:drill-druid-storage:jar</include>
<include>org.apache.drill.contrib:drill-iceberg-format:jar</include>
+ <include>org.apache.drill.contrib:drill-deltalake-format:jar</include>
</includes>
<outputDirectory>jars</outputDirectory>
<useProjectArtifact>false</useProjectArtifact>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
index 5379760..93bb04b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
@@ -31,11 +31,17 @@
import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder;
import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
import org.apache.drill.exec.expr.holders.Float4Holder;
import org.apache.drill.exec.expr.holders.Float8Holder;
import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
import org.apache.drill.exec.expr.holders.TimeStampHolder;
import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.ValueHolderHelper;
import org.apache.drill.metastore.statistics.ColumnStatistics;
import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
@@ -52,10 +58,13 @@
private final Map<SchemaPath, ColumnStatistics<?>> columnStatMap;
private final long rowCount;
+ private final UdfUtilities udfUtilities;
- public StatisticsProvider(Map<SchemaPath, ColumnStatistics<?>> columnStatMap, long rowCount) {
+ public StatisticsProvider(Map<SchemaPath, ColumnStatistics<?>> columnStatMap, long rowCount,
+ UdfUtilities udfUtilities) {
this.columnStatMap = columnStatMap;
this.rowCount = rowCount;
+ this.udfUtilities = udfUtilities;
}
public long getRowCount() {
@@ -154,12 +163,13 @@
private ColumnStatistics<?> evalCastFunc(FunctionHolderExpression holderExpr, ColumnStatistics<T> input) {
try {
- DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
- DrillSimpleFunc interpreter = funcHolder.createInterpreter();
-
- ValueHolder minHolder;
- ValueHolder maxHolder;
+ T minValue = ComparisonPredicate.getMinValue(input);
+ T maxValue = ComparisonPredicate.getMaxValue(input);
+ if (minValue == null && maxValue == null) {
+ // no need to evaluate cast for null arguments
+ return input;
+ }
TypeProtos.MinorType srcType = holderExpr.args.get(0).getMajorType().getMinorType();
TypeProtos.MinorType destType = holderExpr.getMajorType().getMinorType();
@@ -171,26 +181,33 @@
return null; // cast func between srcType and destType is NOT allowed.
}
+ ValueHolder minHolder;
+ ValueHolder maxHolder;
+
switch (srcType) {
case INT :
- minHolder = ValueHolderHelper.getIntHolder((Integer) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getIntHolder((Integer) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getIntHolder((Integer) minValue);
+ maxHolder = ValueHolderHelper.getIntHolder((Integer) maxValue);
break;
case BIGINT:
- minHolder = ValueHolderHelper.getBigIntHolder((Long) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getBigIntHolder((Long) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getBigIntHolder((Long) minValue);
+ maxHolder = ValueHolderHelper.getBigIntHolder((Long) maxValue);
break;
case FLOAT4:
- minHolder = ValueHolderHelper.getFloat4Holder((Float) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getFloat4Holder((Float) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getFloat4Holder((Float) minValue);
+ maxHolder = ValueHolderHelper.getFloat4Holder((Float) maxValue);
break;
case FLOAT8:
- minHolder = ValueHolderHelper.getFloat8Holder((Double) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getFloat8Holder((Double) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getFloat8Holder((Double) minValue);
+ maxHolder = ValueHolderHelper.getFloat8Holder((Double) maxValue);
break;
case DATE:
- minHolder = ValueHolderHelper.getDateHolder((Long) ComparisonPredicate.getMinValue(input));
- maxHolder = ValueHolderHelper.getDateHolder((Long) ComparisonPredicate.getMaxValue(input));
+ minHolder = ValueHolderHelper.getDateHolder((Long) minValue);
+ maxHolder = ValueHolderHelper.getDateHolder((Long) maxValue);
+ break;
+ case VARCHAR:
+ minHolder = ValueHolderHelper.getVarCharHolder(udfUtilities.getManagedBuffer(), (String) minValue);
+ maxHolder = ValueHolderHelper.getVarCharHolder(udfUtilities.getManagedBuffer(), (String) maxValue);
break;
default:
return null;
@@ -199,10 +216,20 @@
ValueHolder[] args1 = {minHolder};
ValueHolder[] args2 = {maxHolder};
+ DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
+
+ DrillSimpleFunc interpreter = funcHolder.createInterpreter();
+
ValueHolder minFuncHolder = InterpreterEvaluator.evaluateFunction(interpreter, args1, holderExpr.getName());
ValueHolder maxFuncHolder = InterpreterEvaluator.evaluateFunction(interpreter, args2, holderExpr.getName());
switch (destType) {
+ case BIT:
+ return StatisticsProvider.getColumnStatistics(
+ ((BitHolder) minFuncHolder).value,
+ ((BitHolder) maxFuncHolder).value,
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
case INT:
return StatisticsProvider.getColumnStatistics(
((IntHolder) minFuncHolder).value,
@@ -227,12 +254,32 @@
((Float8Holder) maxFuncHolder).value,
ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
destType);
+ case DATE:
+ return StatisticsProvider.getColumnStatistics(
+ ((DateHolder) minFuncHolder).value,
+ ((DateHolder) maxFuncHolder).value,
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
+ case TIME:
+ return StatisticsProvider.getColumnStatistics(
+ ((TimeHolder) minFuncHolder).value,
+ ((TimeHolder) maxFuncHolder).value,
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
case TIMESTAMP:
return StatisticsProvider.getColumnStatistics(
((TimeStampHolder) minFuncHolder).value,
((TimeStampHolder) maxFuncHolder).value,
ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
destType);
+ case VARDECIMAL:
+ VarDecimalHolder minVarDecimalHolder = (VarDecimalHolder) minFuncHolder;
+ VarDecimalHolder maxVarDecimalHolder = (VarDecimalHolder) maxFuncHolder;
+ return StatisticsProvider.getColumnStatistics(
+ DecimalUtility.getBigDecimalFromDrillBuf(minVarDecimalHolder.buffer, minVarDecimalHolder.start, minVarDecimalHolder.scale, minVarDecimalHolder.precision),
+ DecimalUtility.getBigDecimalFromDrillBuf(maxVarDecimalHolder.buffer, maxVarDecimalHolder.start, maxVarDecimalHolder.scale, maxVarDecimalHolder.precision),
+ ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+ destType);
default:
return null;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index a11f834..2a701ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -41,6 +41,7 @@
import org.apache.drill.exec.metastore.analyze.FileMetadataInfoCollector;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -1076,25 +1077,21 @@
for (T metadata : metadataList) {
TupleMetadata schema = metadata.getSchema();
if (schema != null && !tableSchema.isEquivalent(schema)) {
+ schema = FixedReceiver.Builder.mergeSchemas(schema, tableSchema);
filterPredicate = getFilterPredicate(filterExpression, udfUtilities,
context, optionManager, true, true, schema);
}
Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = metadata.getColumnsStatistics();
// adds partition (dir) column statistics if it may be used during filter evaluation
- if (metadata instanceof LocationProvider && optionManager != null) {
- LocationProvider locationProvider = (LocationProvider) metadata;
- columnsStatistics = ParquetTableMetadataUtils.addImplicitColumnsStatistics(columnsStatistics,
- source.columns, source.getPartitionValues(locationProvider), optionManager,
- locationProvider.getPath(), source.supportsFileImplicitColumns());
- }
+ columnsStatistics = getImplicitColumnStatistics(optionManager, metadata, columnsStatistics);
if (source.getNonInterestingColumnsMetadata() != null) {
columnsStatistics.putAll(source.getNonInterestingColumnsMetadata().getColumnsStatistics());
}
RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate,
columnsStatistics, TableStatisticsKind.ROW_COUNT.getValue(metadata),
- schema, schemaPathsInExpr);
+ schema, schemaPathsInExpr, udfUtilities);
if (match == RowsMatch.NONE) {
continue; // No file comply to the filter => drop the file
}
@@ -1109,6 +1106,17 @@
return qualifiedMetadata;
}
+ protected <T extends Metadata> Map<SchemaPath, ColumnStatistics<?>> getImplicitColumnStatistics(
+ OptionManager optionManager, T metadata, Map<SchemaPath, ColumnStatistics<?>> columnsStatistics) {
+ if (metadata instanceof LocationProvider && optionManager != null) {
+ LocationProvider locationProvider = (LocationProvider) metadata;
+ columnsStatistics = ParquetTableMetadataUtils.addImplicitColumnsStatistics(columnsStatistics,
+ source.columns, source.getPartitionValues(locationProvider), optionManager,
+ locationProvider.getPath(), source.supportsFileImplicitColumns());
+ }
+ return columnsStatistics;
+ }
+
protected abstract B self();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
index 09045f5..cefd727 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
@@ -127,6 +127,10 @@
this.plannerSettings = plannerSettings;
}
+ public UdfUtilities getUdfUtilities() {
+ return udfUtilities;
+ }
+
@Override
@SuppressWarnings("deprecation")
public void reduce(RexBuilder rexBuilder, List<RexNode> constExps, List<RexNode> reducedValues) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 3a3800d..cf1fd96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -663,7 +663,7 @@
AbstractParquetGroupScan abstractParquetGroupScan = (AbstractParquetGroupScan) source;
Map<Path, FileMetadata> filesToFilter = new HashMap<>(prunedFiles);
- if (!abstractParquetGroupScan.rowGroups.isEmpty()) {
+ if (!abstractParquetGroupScan.getRowGroupsMetadata().isEmpty()) {
prunedFiles.forEach((path, fileMetadata) -> {
if (abstractParquetGroupScan.rowGroups.get(path).size() == 1) {
omittedFiles.put(path, fileMetadata);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
index 460fa6c..52a5e4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -111,6 +111,10 @@
@JsonProperty
public TupleMetadata getSchema() { return schema; }
+ public boolean isImplicitColumn(SchemaPath path, String partitionColumnLabel) {
+ return path.toString().matches(partitionColumnLabel + "\\d+");
+ }
+
public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
@JsonIgnore
public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index ffba7f2..f5c252c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -123,7 +123,7 @@
String partitionColumnLabel = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
for (SchemaPath path : schemaPathsInExpr) {
if (rowGroupScan.supportsFileImplicitColumns() &&
- path.toString().matches(partitionColumnLabel+"\\d+")) {
+ rowGroupScan.isImplicitColumn(path, partitionColumnLabel)) {
continue; // skip implicit columns like dir0, dir1
}
columnsInExpr.add(SchemaPath.getSimplePath(path.getRootSegmentPath()));
@@ -210,7 +210,7 @@
rowGroupSchema);
}
- matchResult = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount, rowGroupSchema, schemaPathsInExpr);
+ matchResult = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount, rowGroupSchema, schemaPathsInExpr, context);
// collect logging info
long timeToRead = pruneTimer.elapsed(TimeUnit.MICROSECONDS);
@@ -343,10 +343,7 @@
reader.getClass().getSimpleName());
readers.add(reader);
- List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
- Map<String, String> implicitValues =
- columnExplorer.populateColumns(rowGroup.getPath(), partitionValues,
- rowGroupScan.supportsFileImplicitColumns(), fs, rowGroup.getRowGroupIndex(), rowGroup.getStart(), rowGroup.getLength());
+ Map<String, String> implicitValues = getImplicitValues(rowGroupScan, columnExplorer, rowGroup, fs);
implicitColumns.add(implicitValues);
if (implicitValues.size() > mapWithMaxColumns.size()) {
mapWithMaxColumns = implicitValues;
@@ -354,6 +351,12 @@
return mapWithMaxColumns;
}
+ protected Map<String, String> getImplicitValues(AbstractParquetRowGroupScan rowGroupScan, ColumnExplorer columnExplorer, RowGroupReadEntry rowGroup, DrillFileSystem fs) {
+ List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
+ return columnExplorer.populateColumns(rowGroup.getPath(), partitionValues,
+ rowGroupScan.supportsFileImplicitColumns(), fs, rowGroup.getRowGroupIndex(), rowGroup.getStart(), rowGroup.getLength());
+ }
+
protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
private ParquetMetadata readFooter(Configuration conf, Path path, ParquetReaderConfig readerConfig) throws IOException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
index 8d35b1d..2c0942d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
@@ -98,7 +98,7 @@
FilterPredicate<?> parquetPredicate = FilterBuilder.buildFilterPredicate(
materializedFilter, constantBoundaries, udfUtilities, true);
- return matches(parquetPredicate, columnsStatistics, rowCount, schema, schemaPathsInExpr);
+ return matches(parquetPredicate, columnsStatistics, rowCount, schema, schemaPathsInExpr, udfUtilities);
}
@SuppressWarnings("unchecked")
@@ -106,12 +106,13 @@
Map<SchemaPath, ColumnStatistics<?>> columnsStatistics,
long rowCount,
TupleMetadata fileMetadata,
- Set<SchemaPath> schemaPathsInExpr) {
+ Set<SchemaPath> schemaPathsInExpr,
+ UdfUtilities udfUtilities) {
if (parquetPredicate == null) {
return RowsMatch.SOME;
}
@SuppressWarnings("rawtypes")
- StatisticsProvider<T> rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount);
+ StatisticsProvider<T> rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount, udfUtilities);
RowsMatch rowsMatch = parquetPredicate.matches(rangeExprEvaluator);
if (rowsMatch == RowsMatch.ALL && isMetaNotApplicable(schemaPathsInExpr, fileMetadata)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 8c91200..d6fade8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -51,12 +51,12 @@
/**
* Creates file system only if it was not created before, otherwise returns already created instance.
*/
- private class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
+ public static class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
private final boolean useAsyncPageReader;
private DrillFileSystem fs;
- ParquetDrillFileSystemManager(OperatorContext operatorContext, boolean useAsyncPageReader) {
+ public ParquetDrillFileSystemManager(OperatorContext operatorContext, boolean useAsyncPageReader) {
super(operatorContext);
this.useAsyncPageReader = useAsyncPageReader;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
index 06384e5..17572f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
@@ -143,6 +143,11 @@
return false;
}
+ @Override
+ public boolean artificialFilter() {
+ return false;
+ }
+
private UserException getUnsupported(String rel) {
return UserException.unsupportedError()
.message("Plugin implementor doesn't support push down for %s", rel)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
index f4589a6..7b54a6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
@@ -93,4 +93,12 @@
* to ensure returning the correct rows number.
*/
boolean artificialLimit();
+
+ /**
+ * If the plugin doesn't support native filter pushdown,
+ * but the reader can prune the set of rows to read.
+ * In this case filter operator on top of the scan should be preserved
+ * to ensure returning the correct subset of rows.
+ */
+ boolean artificialFilter();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
index e5e02e7..8a2cf7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
@@ -24,6 +24,8 @@
import org.apache.drill.exec.store.plan.PluginImplementor;
import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import java.util.Collections;
+
/**
* The rule that converts provided filter operator to plugin-specific implementation.
*/
@@ -36,11 +38,15 @@
@Override
public RelNode convert(RelNode rel) {
Filter filter = (Filter) rel;
- return new PluginFilterRel(
- getOutConvention(),
- rel.getCluster(),
- filter.getTraitSet().replace(getOutConvention()),
- convert(filter.getInput(), filter.getTraitSet().replace(getOutConvention())),
- filter.getCondition());
+ PluginFilterRel pluginFilterRel = new PluginFilterRel(
+ getOutConvention(),
+ rel.getCluster(),
+ filter.getTraitSet().replace(getOutConvention()),
+ convert(filter.getInput(), filter.getTraitSet().replace(getOutConvention())),
+ filter.getCondition());
+ if (getPluginImplementor().artificialFilter()) {
+ return filter.copy(filter.getTraitSet(), Collections.singletonList(pluginFilterRel));
+ }
+ return pluginFilterRel;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
index a71060b..9df71ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
@@ -31,7 +31,7 @@
public class PluginJoinRule extends PluginConverterRule {
public PluginJoinRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
- super(Join.class, in, out, "PluginProjectRule", pluginImplementor);
+ super(Join.class, in, out, "PluginJoinRule", pluginImplementor);
}
@Override