DRILL-8353: Format plugin for Delta Lake (#2702)

diff --git a/contrib/format-deltalake/README.md b/contrib/format-deltalake/README.md
new file mode 100644
index 0000000..e4cbaef
--- /dev/null
+++ b/contrib/format-deltalake/README.md
@@ -0,0 +1,36 @@
+# Delta Lake format plugin
+
+This format plugin enables Drill to query Delta Lake tables.
+
+## Supported optimizations and features
+
+### Project pushdown
+
+This format plugin supports project and filter pushdown optimizations.
+
+For the case of project pushdown, only columns specified in the query will be read, even when they are nested columns.
+
+### Filter pushdown
+
+For the case of filter pushdown, all expressions supported by Delta Lake API will be pushed down, so only data that
+matches the filter expression will be read. Additionally, filtering logic for parquet files is enabled
+to allow pruning of parquet files that do not match the filter expression.
+
+## Configuration
+
+The format plugin has the following configuration options:
+
+- `type` - format plugin type, should be `'delta'`
+
+### Format config example:
+
+```json
+{
+  "type": "file",
+  "formats": {
+    "delta": {
+      "type": "delta"
+    }
+  }
+}
+```
diff --git a/contrib/format-deltalake/pom.xml b/contrib/format-deltalake/pom.xml
new file mode 100644
index 0000000..5f0e31f
--- /dev/null
+++ b/contrib/format-deltalake/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>drill-contrib-parent</artifactId>
+    <groupId>org.apache.drill.contrib</groupId>
+    <version>2.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>drill-deltalake-format</artifactId>
+
+  <name>Drill : Contrib : Format : Delta Lake</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-storage</artifactId>
+      <version>2.1.0</version>
+    </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-standalone_2.13</artifactId>
+      <version>0.5.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>${parquet.version}</version>
+    </dependency>
+
+    <!-- Test dependency -->
+    <dependency>
+      <groupId>org.apache.drill.exec</groupId>
+      <artifactId>drill-java-exec</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.drill</groupId>
+      <artifactId>drill-common</artifactId>
+      <classifier>tests</classifier>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
new file mode 100644
index 0000000..2130bea
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaGroupScan.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import io.delta.standalone.DeltaLog;
+import io.delta.standalone.DeltaScan;
+import io.delta.standalone.Snapshot;
+import io.delta.standalone.actions.AddFile;
+import io.delta.standalone.data.CloseableIterator;
+import io.delta.standalone.types.ArrayType;
+import io.delta.standalone.types.BinaryType;
+import io.delta.standalone.types.BooleanType;
+import io.delta.standalone.types.ByteType;
+import io.delta.standalone.types.DataType;
+import io.delta.standalone.types.DateType;
+import io.delta.standalone.types.DecimalType;
+import io.delta.standalone.types.DoubleType;
+import io.delta.standalone.types.FloatType;
+import io.delta.standalone.types.IntegerType;
+import io.delta.standalone.types.LongType;
+import io.delta.standalone.types.ShortType;
+import io.delta.standalone.types.StringType;
+import io.delta.standalone.types.StructField;
+import io.delta.standalone.types.StructType;
+import io.delta.standalone.types.TimestampType;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.expr.StatisticsProvider;
+import org.apache.drill.exec.expr.fn.impl.DateUtility;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.metastore.store.FileSystemMetadataProviderManager;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.TupleBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.delta.plan.DrillExprToDeltaTranslator;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.metastore.metadata.LocationProvider;
+import org.apache.drill.metastore.metadata.Metadata;
+import org.apache.drill.metastore.statistics.ColumnStatistics;
+import org.apache.drill.metastore.statistics.TableStatisticsKind;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+@JsonTypeName("delta-scan")
+public class DeltaGroupScan extends AbstractParquetGroupScan {
+
+  private final DeltaFormatPlugin formatPlugin;
+
+  private final String path;
+
+  private final TupleMetadata schema;
+
+  private final LogicalExpression condition;
+
+  private final DrillFileSystem fs;
+
+  private List<AddFile> addFiles;
+
+  private List<EndpointAffinity> endpointAffinities;
+
+  private final Map<Path, Map<String, String>> partitionHolder;
+
+  @JsonCreator
+  public DeltaGroupScan(
+    @JsonProperty("userName") String userName,
+    @JsonProperty("entries") List<ReadEntryWithPath> entries,
+    @JsonProperty("storage") StoragePluginConfig storageConfig,
+    @JsonProperty("format") FormatPluginConfig formatConfig,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("schema") TupleMetadata schema,
+    @JsonProperty("path") String path,
+    @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
+    @JsonProperty("condition") LogicalExpression condition,
+    @JsonProperty("limit") Integer limit,
+    @JsonProperty("partitionHolder") Map<Path, Map<String, String>> partitionHolder,
+    @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException {
+    super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, condition);
+    this.formatPlugin = pluginRegistry.resolveFormat(storageConfig, formatConfig, DeltaFormatPlugin.class);
+    this.columns = columns;
+    this.path = path;
+    this.schema = schema;
+    this.condition = condition;
+    this.limit = limit;
+    this.fs = ImpersonationUtil.createFileSystem(
+      ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+
+    DeltaParquetTableMetadataProvider metadataProvider =
+      defaultTableMetadataProviderBuilder(new FileSystemMetadataProviderManager())
+        .withEntries(entries)
+        .withFormatPlugin(formatPlugin)
+        .withReaderConfig(readerConfig)
+        .withSchema(schema)
+        .build();
+
+    this.metadataProvider = metadataProvider;
+    this.entries = metadataProvider.getEntries();
+    this.partitionHolder = partitionHolder;
+    this.fileSet = metadataProvider.getFileSet();
+
+    init();
+  }
+
+  private DeltaGroupScan(DeltaGroupScanBuilder builder) throws IOException {
+    super(ImpersonationUtil.resolveUserName(builder.userName), builder.columns,
+      builder.entries, builder.readerConfig, builder.condition);
+    this.formatPlugin = builder.formatPlugin;
+    this.columns = builder.columns;
+    this.path = builder.path;
+    this.schema = builder.schema;
+    this.condition = builder.condition;
+    this.limit = builder.limit;
+    this.fs = ImpersonationUtil.createFileSystem(
+      ImpersonationUtil.resolveUserName(userName), formatPlugin.getFsConf());
+
+    DeltaParquetTableMetadataProvider metadataProvider =
+      defaultTableMetadataProviderBuilder(new FileSystemMetadataProviderManager())
+        .withEntries(entries)
+        .withFormatPlugin(formatPlugin)
+        .withReaderConfig(readerConfig)
+        .withSchema(schema)
+        .build();
+
+    this.metadataProvider = metadataProvider;
+    this.entries = metadataProvider.getEntries();
+    this.partitionHolder = builder.partitionValues;
+    this.fileSet = metadataProvider.getFileSet();
+
+    init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   *
+   * @param that The DeltaGroupScan to clone
+   */
+  private DeltaGroupScan(DeltaGroupScan that) {
+    super(that);
+    this.columns = that.columns;
+    this.formatPlugin = that.formatPlugin;
+    this.path = that.path;
+    this.condition = that.condition;
+    this.schema = that.schema;
+    this.mappings = that.mappings;
+    this.fs = that.fs;
+    this.limit = that.limit;
+    this.addFiles = that.addFiles;
+    this.endpointAffinities = that.endpointAffinities;
+    this.partitionHolder = that.partitionHolder;
+  }
+
+  @Override
+  protected DeltaParquetTableMetadataProvider.Builder tableMetadataProviderBuilder(MetadataProviderManager source) {
+    return defaultTableMetadataProviderBuilder(source);
+  }
+
+  @Override
+  protected DeltaParquetTableMetadataProvider.Builder defaultTableMetadataProviderBuilder(MetadataProviderManager source) {
+    return new DeltaParquetTableMetadataProvider.Builder(source);
+  }
+
+  public static DeltaGroupScanBuilder builder() {
+    return new DeltaGroupScanBuilder();
+  }
+
+  @Override
+  public DeltaGroupScan clone(List<SchemaPath> columns) {
+    try {
+      return toBuilder().columns(columns).build();
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  @Override
+  public DeltaGroupScan applyLimit(int maxRecords) {
+    DeltaGroupScan clone = new DeltaGroupScan(this);
+    clone.limit = maxRecords;
+    return clone;
+  }
+
+  @Override
+  public SubScan getSpecificScan(int minorFragmentId) {
+    List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
+    Map<Path, Map<String, String>> subPartitionHolder = new HashMap<>();
+    for (RowGroupReadEntry readEntry : readEntries) {
+      Map<String, String> values = partitionHolder.get(readEntry.getPath());
+      subPartitionHolder.put(readEntry.getPath(), values);
+    }
+    return new DeltaRowGroupScan(getUserName(), formatPlugin, readEntries, columns, subPartitionHolder,
+      readerConfig, filter, getTableMetadata().getSchema());
+  }
+
+  @Override
+  public DeltaGroupScan clone(FileSelection selection) throws IOException {
+    DeltaGroupScan newScan = new DeltaGroupScan(this);
+    newScan.modifyFileSelection(selection);
+    newScan.init();
+    return newScan;
+  }
+
+  @Override
+  protected RowGroupScanFilterer<?> getFilterer() {
+    return new DeltaParquetScanFilterer(this);
+  }
+
+  @Override
+  protected Collection<DrillbitEndpoint> getDrillbits() {
+    return formatPlugin.getContext().getBits();
+  }
+
+  @Override
+  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException {
+    FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false);
+    return clone(newSelection);
+  }
+
+  @Override
+  protected boolean supportsFileImplicitColumns() {
+    // current group scan should populate directory partition values
+    return false;
+  }
+
+  @Override
+  protected List<String> getPartitionValues(LocationProvider locationProvider) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DeltaGroupScan(this);
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return false;
+  }
+
+  @JsonProperty("schema")
+  public TupleMetadata getSchema() {
+    return schema;
+  }
+
+  @JsonProperty("storage")
+  public StoragePluginConfig getStorageConfig() {
+    return formatPlugin.getStorageConfig();
+  }
+
+  @JsonProperty("format")
+  public FormatPluginConfig getFormatConfig() {
+    return formatPlugin.getConfig();
+  }
+
+  @JsonProperty("path")
+  public String getPath() {
+    return path;
+  }
+
+  @JsonProperty("condition")
+  public LogicalExpression getCondition() {
+    return condition;
+  }
+
+  @JsonProperty("partitionHolder")
+  public Map<Path, Map<String, String>> getPartitionHolder() {
+    return partitionHolder;
+  }
+
+  @Override
+  public String toString() {
+    return new PlanStringBuilder(this)
+      .field("path", path)
+      .field("entries", entries)
+      .field("schema", schema)
+      .field("columns", columns)
+      .field("addFiles", addFiles)
+      .field("limit", limit)
+      .field("numFiles", getEntries().size())
+      .toString();
+  }
+
+  public DeltaGroupScanBuilder toBuilder() {
+    return new DeltaGroupScanBuilder()
+      .userName(this.userName)
+      .formatPlugin(this.formatPlugin)
+      .schema(this.schema)
+      .path(this.path)
+      .condition(this.condition)
+      .columns(this.columns)
+      .limit(this.limit);
+  }
+
+  private static class DeltaParquetScanFilterer extends RowGroupScanFilterer<DeltaParquetScanFilterer> {
+
+    public DeltaParquetScanFilterer(DeltaGroupScan source) {
+      super(source);
+    }
+
+    @Override
+    protected AbstractParquetGroupScan getNewScan() {
+      return new DeltaGroupScan((DeltaGroupScan) source);
+    }
+
+    @Override
+    protected DeltaParquetScanFilterer self() {
+      return this;
+    }
+
+    @Override
+    protected  <T extends Metadata> Map<SchemaPath, ColumnStatistics<?>> getImplicitColumnStatistics(
+      OptionManager optionManager, T metadata, Map<SchemaPath, ColumnStatistics<?>> columnsStatistics) {
+      if (metadata instanceof LocationProvider && optionManager != null) {
+        LocationProvider locationProvider = (LocationProvider) metadata;
+        columnsStatistics = new HashMap<>(columnsStatistics);
+        Map<String, String> partitions =
+          ((DeltaGroupScan) source).getPartitionHolder().get(locationProvider.getPath());
+        for (Map.Entry<String, String> partitionValue : partitions.entrySet()) {
+          TypeProtos.MinorType minorType =
+            tableSchema.column(partitionValue.getKey()).getType().getMinorType();
+          String value = partitionValue.getValue();
+          if (value != null) {
+            columnsStatistics.put(SchemaPath.getCompoundPath(partitionValue.getKey()),
+              StatisticsProvider.getConstantColumnStatistics(
+                castPartitionValue(value, minorType), minorType));
+          } else {
+            Long rowCount = TableStatisticsKind.ROW_COUNT.getValue(metadata);
+            columnsStatistics.put(SchemaPath.getCompoundPath(partitionValue.getKey()),
+              StatisticsProvider.getColumnStatistics(null, null, rowCount, minorType));
+          }
+        }
+      }
+
+      return columnsStatistics;
+    }
+
+    private Object castPartitionValue(String value, TypeProtos.MinorType type) {
+      switch (type) {
+      case BIT:
+        return Boolean.parseBoolean(value);
+      case TINYINT:
+        return Byte.parseByte(value);
+      case SMALLINT:
+        return Short.parseShort(value);
+      case INT:
+        return Integer.parseInt(value);
+      case BIGINT:
+        return Long.parseLong(value);
+      case FLOAT4:
+        return Float.parseFloat(value);
+      case FLOAT8:
+        return Double.parseDouble(value);
+      case DATE:
+        return DateUtility.parseLocalDate(value);
+      case TIME:
+        return DateUtility.parseLocalTime(value);
+      case TIMESTAMP:
+        return DateUtility.parseBest(value);
+      case VARCHAR:
+        return value;
+      case VARDECIMAL:
+        return new BigDecimal(value);
+      default:
+        throw new UnsupportedOperationException("Unsupported partition type: " + type);
+      }
+    }
+  }
+
+  public static class DeltaGroupScanBuilder {
+    private String userName;
+
+    private DeltaFormatPlugin formatPlugin;
+
+    private TupleMetadata schema;
+
+    private String path;
+
+    private LogicalExpression condition;
+
+    private List<SchemaPath> columns;
+
+    private int limit;
+
+    private List<ReadEntryWithPath> entries;
+
+    private ParquetReaderConfig readerConfig = ParquetReaderConfig.getDefaultInstance();
+
+    private Map<Path, Map<String, String>> partitionValues;
+
+    public DeltaGroupScanBuilder userName(String userName) {
+      this.userName = userName;
+      return this;
+    }
+
+    public DeltaGroupScanBuilder formatPlugin(DeltaFormatPlugin formatPlugin) {
+      this.formatPlugin = formatPlugin;
+      return this;
+    }
+
+    public DeltaGroupScanBuilder schema(TupleMetadata schema) {
+      this.schema = schema;
+      return this;
+    }
+
+    public DeltaGroupScanBuilder path(String path) {
+      this.path = path;
+      return this;
+    }
+
+    public DeltaGroupScanBuilder condition(LogicalExpression condition) {
+      this.condition = condition;
+      return this;
+    }
+
+    public DeltaGroupScanBuilder columns(List<SchemaPath> columns) {
+      this.columns = columns;
+      return this;
+    }
+
+    public DeltaGroupScanBuilder limit(int maxRecords) {
+      this.limit = maxRecords;
+      return this;
+    }
+
+    public DeltaGroupScanBuilder readerConfig(ParquetReaderConfig readerConfig) {
+      this.readerConfig = readerConfig;
+      return this;
+    }
+
+    public DeltaGroupScan build() throws IOException {
+      DeltaLog log = DeltaLog.forTable(formatPlugin.getFsConf(), path);
+      Snapshot snapshot = log.snapshot();
+      StructType structType = snapshot.getMetadata().getSchema();
+      schema = toSchema(structType);
+
+      DeltaScan scan = Optional.ofNullable(condition)
+        .map(c -> c.accept(new DrillExprToDeltaTranslator(structType), null))
+        .map(snapshot::scan)
+        .orElse(snapshot.scan());
+
+      try {
+        CloseableIterator<AddFile> files = scan.getFiles();
+        ArrayList<AddFile> addFiles = Lists.newArrayList(() -> files);
+        entries = addFiles.stream()
+          .map(addFile -> new ReadEntryWithPath(new Path(URI.create(path).getPath(), URI.create(addFile.getPath()).getPath())))
+          .collect(Collectors.toList());
+
+        partitionValues = addFiles.stream()
+          .collect(Collectors.toMap(
+            addFile -> new Path(URI.create(path).getPath(), URI.create(addFile.getPath()).getPath()),
+            addFile -> collectPartitionedValues(snapshot, addFile)));
+
+        files.close();
+      } catch (IOException e) {
+        throw new DrillRuntimeException(e);
+      }
+
+      return new DeltaGroupScan(this);
+    }
+
+    private Map<String, String> collectPartitionedValues(Snapshot snapshot, AddFile addFile) {
+      Map<String, String> partitionValues = new LinkedHashMap<>();
+      snapshot.getMetadata().getPartitionColumns().stream()
+        .map(col -> Pair.of(col, addFile.getPartitionValues().get(col)))
+        .forEach(pair -> partitionValues.put(pair.getKey(), pair.getValue()));
+      return partitionValues;
+    }
+
+    private TupleMetadata toSchema(StructType structType) {
+      TupleBuilder tupleBuilder = new TupleBuilder();
+      for (StructField field : structType.getFields()) {
+        tupleBuilder.addColumn(toColumnMetadata(field));
+      }
+
+      return tupleBuilder.schema();
+    }
+
+    private ColumnMetadata toColumnMetadata(StructField field) {
+      DataType dataType = field.getDataType();
+      if (dataType instanceof ArrayType) {
+        DataType elementType = ((ArrayType) dataType).getElementType();
+        if (elementType instanceof ArrayType) {
+          return MetadataUtils.newRepeatedList(field.getName(),
+            toColumnMetadata(new StructField(field.getName(), ((ArrayType) elementType).getElementType(), false)));
+        } else if (elementType instanceof StructType) {
+          return MetadataUtils.newMapArray(field.getName(), toSchema((StructType) elementType));
+        }
+        return MetadataUtils.newScalar(field.getName(), toMinorType(elementType), TypeProtos.DataMode.REPEATED);
+      } else if (dataType instanceof StructType) {
+        return MetadataUtils.newMap(field.getName(), toSchema((StructType) dataType));
+      } else {
+        return MetadataUtils.newScalar(field.getName(), toMinorType(field.getDataType()),
+          field.isNullable() ? TypeProtos.DataMode.OPTIONAL : TypeProtos.DataMode.REQUIRED);
+      }
+    }
+
+    private TypeProtos.MinorType toMinorType(DataType dataType) {
+      if (dataType instanceof BinaryType) {
+        return TypeProtos.MinorType.VARBINARY;
+      } else if (dataType instanceof BooleanType) {
+        return TypeProtos.MinorType.BIT;
+      } else if (dataType instanceof ByteType) {
+        return TypeProtos.MinorType.TINYINT;
+      } else if (dataType instanceof DateType) {
+        return TypeProtos.MinorType.DATE;
+      } else if (dataType instanceof DecimalType) {
+        return TypeProtos.MinorType.VARDECIMAL;
+      } else if (dataType instanceof DoubleType) {
+        return TypeProtos.MinorType.FLOAT8;
+      } else if (dataType instanceof FloatType) {
+        return TypeProtos.MinorType.FLOAT4;
+      } else if (dataType instanceof IntegerType) {
+        return TypeProtos.MinorType.INT;
+      } else if (dataType instanceof LongType) {
+        return TypeProtos.MinorType.BIGINT;
+      } else if (dataType instanceof ShortType) {
+        return TypeProtos.MinorType.SMALLINT;
+      } else if (dataType instanceof StringType) {
+        return TypeProtos.MinorType.VARCHAR;
+      } else if (dataType instanceof TimestampType) {
+        return TypeProtos.MinorType.TIMESTAMP;
+      } else {
+        throw new DrillRuntimeException("Unsupported data type: " + dataType);
+      }
+    }
+  }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java
new file mode 100644
index 0000000..aeec5b0
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaParquetTableMetadataProvider.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
+import org.apache.drill.exec.store.parquet.BaseParquetMetadataProvider;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * This is Metadata provider for Delta tables, which are read by Drill native Parquet reader
+ */
+public class DeltaParquetTableMetadataProvider extends BaseParquetMetadataProvider {
+
+  private final DeltaFormatPlugin deltaFormatPlugin;
+
+  private DeltaParquetTableMetadataProvider(Builder builder) throws IOException {
+    super(builder);
+
+    this.deltaFormatPlugin = builder.formatPlugin;
+
+    init((BaseParquetMetadataProvider) builder.metadataProviderManager().getTableMetadataProvider());
+  }
+
+  @Override
+  protected void initInternal() throws IOException {
+    Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
+    for (ReadEntryWithPath entry : entries) {
+      Path path = entry.getPath();
+      FileSystem fs = path.getFileSystem(deltaFormatPlugin.getFsConf());
+      fileStatusConfMap.put(fs.getFileStatus(Path.getPathWithoutSchemeAndAuthority(path)), fs);
+    }
+    parquetTableMetadata = Metadata.getParquetTableMetadata(fileStatusConfMap, readerConfig);
+  }
+
+  public static class Builder extends BaseParquetMetadataProvider.Builder<Builder> {
+    private DeltaFormatPlugin formatPlugin;
+
+    public Builder(MetadataProviderManager source) {
+      super(source);
+    }
+
+    protected Builder withFormatPlugin(DeltaFormatPlugin formatPlugin) {
+      this.formatPlugin = formatPlugin;
+      return self();
+    }
+
+    @Override
+    protected Builder self() {
+      return this;
+    }
+
+    @Override
+    public DeltaParquetTableMetadataProvider build() throws IOException {
+      return new DeltaParquetTableMetadataProvider(this);
+    }
+  }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java
new file mode 100644
index 0000000..5645c3f
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/DeltaRowGroupScan.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPlugin;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@JsonTypeName("delta-row-group-scan")
+public class DeltaRowGroupScan extends AbstractParquetRowGroupScan {
+
+  public static final String OPERATOR_TYPE = "DELTA_ROW_GROUP_SCAN";
+
+  private final DeltaFormatPlugin formatPlugin;
+  private final DeltaFormatPluginConfig formatPluginConfig;
+  private final Map<Path, Map<String, String>> partitions;
+
+  @JsonCreator
+  public DeltaRowGroupScan(@JacksonInject StoragePluginRegistry registry,
+    @JsonProperty("userName") String userName,
+    @JsonProperty("storage") StoragePluginConfig storageConfig,
+    @JsonProperty("formatPluginConfig") FormatPluginConfig formatPluginConfig,
+    @JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> rowGroupReadEntries,
+    @JsonProperty("columns") List<SchemaPath> columns,
+    @JsonProperty("partitions") Map<Path, Map<String, String>> partitions,
+    @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
+    @JsonProperty("filter") LogicalExpression filter,
+    @JsonProperty("schema") TupleMetadata schema) {
+    this(userName,
+      registry.resolveFormat(storageConfig, formatPluginConfig, DeltaFormatPlugin.class),
+      rowGroupReadEntries,
+      columns,
+      partitions,
+      readerConfig,
+      filter,
+      schema);
+  }
+
+  public DeltaRowGroupScan(String userName,
+    DeltaFormatPlugin formatPlugin,
+    List<RowGroupReadEntry> rowGroupReadEntries,
+    List<SchemaPath> columns,
+    Map<Path, Map<String, String>> partitions,
+    ParquetReaderConfig readerConfig,
+    LogicalExpression filter,
+    TupleMetadata schema) {
+    super(userName, rowGroupReadEntries, columns, readerConfig, filter,null, schema);
+    this.formatPlugin = formatPlugin;
+    this.formatPluginConfig = formatPlugin.getConfig();
+    this.partitions = partitions;
+  }
+
+  @JsonProperty
+  public DeltaFormatPluginConfig getFormatPluginConfig() {
+    return formatPluginConfig;
+  }
+
+  @JsonProperty
+  public Map<Path, Map<String, String>> getPartitions() {
+    return partitions;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    return new DeltaRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, partitions,
+      readerConfig, filter, schema);
+  }
+
+  @Override
+  public String getOperatorType() {
+    return OPERATOR_TYPE;
+  }
+
+  @Override
+  public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
+    return new DeltaRowGroupScan(getUserName(), formatPlugin, rowGroupReadEntries, columns, partitions,
+       readerConfig, filter, schema);
+  }
+
+  @Override
+  public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) {
+    return formatPlugin.getFsConf();
+  }
+
+  @Override
+  public boolean supportsFileImplicitColumns() {
+    return true;
+  }
+
+  @Override
+  public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
+    return Collections.emptyList();
+  }
+
+  public Map<String, String> getPartitions(RowGroupReadEntry rowGroupReadEntry) {
+    return partitions.get(rowGroupReadEntry.getPath());
+  }
+
+  @Override
+  public boolean isImplicitColumn(SchemaPath path, String partitionColumnLabel) {
+    return partitions.values().stream()
+      .anyMatch(map -> map.containsKey(path.getAsUnescapedPath()));
+  }
+}
+
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java
new file mode 100644
index 0000000..502a74d
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatMatcher.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import io.delta.standalone.DeltaLog;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.plan.rel.PluginDrillTable;
+import org.apache.hadoop.fs.FileStatus;
+
+public class DeltaFormatMatcher extends FormatMatcher {
+
+  private final DeltaFormatPlugin formatPlugin;
+
+  public DeltaFormatMatcher(DeltaFormatPlugin formatPlugin) {
+    this.formatPlugin = formatPlugin;
+  }
+
+  @Override
+  public boolean supportDirectoryReads() {
+    return true;
+  }
+
+  @Override
+  public DrillTable isReadable(DrillFileSystem fs, FileSelection selection, FileSystemPlugin fsPlugin,
+    String storageEngineName, SchemaConfig schemaConfig) {
+    if (DeltaLog.forTable(fsPlugin.getFsConf(), selection.getSelectionRoot()).tableExists()) {
+      FormatSelection formatSelection = new FormatSelection(formatPlugin.getConfig(), selection);
+      return new PluginDrillTable(fsPlugin, storageEngineName, schemaConfig.getUserName(), formatSelection, formatPlugin.getConvention());
+    }
+    return null;
+  }
+
+  @Override
+  public boolean isFileReadable(DrillFileSystem fs, FileStatus status) {
+    return false;
+  }
+
+  @Override
+  public FormatPlugin getFormatPlugin() {
+    return formatPlugin;
+  }
+
+  public int priority() {
+    return HIGH_PRIORITY;
+  }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java
new file mode 100644
index 0000000..37e4803
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPlugin.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.metastore.MetadataProviderManager;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.AbstractWriter;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PlannerPhase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.PluginRulesProviderImpl;
+import org.apache.drill.exec.store.StoragePluginRulesSupplier;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.store.dfs.FormatMatcher;
+import org.apache.drill.exec.store.dfs.FormatPlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.delta.plan.DeltaPluginImplementor;
+import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
+import org.apache.drill.exec.store.plan.rel.PluginRel;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DeltaFormatPlugin implements FormatPlugin {
+
+  private static final String DELTA_CONVENTION_PREFIX = "DELTA.";
+
+  /**
+   * Generator for format id values. Formats with the same name may be defined
+   * in multiple storage plugins, so using the unique id within the convention name
+   * to ensure the rule names will be unique for different plugin instances.
+   */
+  private static final AtomicInteger NEXT_ID = new AtomicInteger(0);
+
+  private final FileSystemConfig storageConfig;
+
+  private final DeltaFormatPluginConfig config;
+
+  private final Configuration fsConf;
+
+  private final DrillbitContext context;
+
+  private final String name;
+
+  private final DeltaFormatMatcher matcher;
+
+  private final StoragePluginRulesSupplier storagePluginRulesSupplier;
+
+  public DeltaFormatPlugin(
+    String name,
+    DrillbitContext context,
+    Configuration fsConf,
+    FileSystemConfig storageConfig,
+    DeltaFormatPluginConfig config) {
+    this.storageConfig = storageConfig;
+    this.config = config;
+    this.fsConf = fsConf;
+    this.context = context;
+    this.name = name;
+    this.matcher = new DeltaFormatMatcher(this);
+    this.storagePluginRulesSupplier = storagePluginRulesSupplier(name + NEXT_ID.getAndIncrement());
+  }
+
+  private static StoragePluginRulesSupplier storagePluginRulesSupplier(String name) {
+    Convention convention = new Convention.Impl(DELTA_CONVENTION_PREFIX + name, PluginRel.class);
+    return StoragePluginRulesSupplier.builder()
+      .rulesProvider(new PluginRulesProviderImpl(convention, DeltaPluginImplementor::new))
+      .supportsFilterPushdown(true)
+      .supportsProjectPushdown(true)
+      .supportsLimitPushdown(true)
+      .convention(convention)
+      .build();
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return false;
+  }
+
+  @Override
+  public boolean supportsAutoPartitioning() {
+    return false;
+  }
+
+  @Override
+  public FormatMatcher getMatcher() {
+    return matcher;
+  }
+
+  @Override
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<? extends RelOptRule> getOptimizerRules(PlannerPhase phase) {
+    switch (phase) {
+      case PHYSICAL:
+      case LOGICAL:
+        return storagePluginRulesSupplier.getOptimizerRules();
+      case LOGICAL_PRUNE_AND_JOIN:
+      case LOGICAL_PRUNE:
+      case PARTITION_PRUNING:
+      case JOIN_PLANNING:
+      default:
+        return Collections.emptySet();
+    }
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) throws IOException {
+    return getGroupScan(userName, selection, columns, (OptionManager) null);
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns, OptionManager options) throws IOException {
+    return getGroupScan(userName, selection, columns, options, null);
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+    List<SchemaPath> columns, OptionManager options, MetadataProviderManager metadataProviderManager) throws IOException {
+    ParquetReaderConfig readerConfig = ParquetReaderConfig.builder()
+      .withConf(fsConf)
+      .withOptions(options)
+      .build();
+    return DeltaGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .readerConfig(readerConfig)
+      .path(selection.selectionRoot.toUri().getPath())
+      .columns(columns)
+      .limit(-1)
+      .build();
+  }
+
+  @Override
+  public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
+    List<SchemaPath> columns, MetadataProviderManager metadataProviderManager) throws IOException {
+    SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
+    TupleMetadata schema = schemaProvider != null
+      ? schemaProvider.read().getSchema()
+      : null;
+    return DeltaGroupScan.builder()
+      .userName(userName)
+      .formatPlugin(this)
+      .readerConfig(ParquetReaderConfig.builder().withConf(fsConf).build())
+      .schema(schema)
+      .path(selection.selectionRoot.toUri().getPath())
+      .columns(columns)
+      .limit(-1)
+      .build();
+  }
+
+  @Override
+  public boolean supportsStatistics() {
+    return false;
+  }
+
+  @Override
+  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+
+  @Override
+  public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+
+  @Override
+  public DeltaFormatPluginConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public FileSystemConfig getStorageConfig() {
+    return storageConfig;
+  }
+
+  @Override
+  public Configuration getFsConf() {
+    return fsConf;
+  }
+
+  @Override
+  public DrillbitContext getContext() {
+    return context;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  public Convention getConvention() {
+    return storagePluginRulesSupplier.convention();
+  }
+
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
new file mode 100644
index 0000000..9fd7bb7
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/format/DeltaFormatPluginConfig.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.format;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.FormatPluginConfig;
+
+@JsonTypeName(DeltaFormatPluginConfig.NAME)
+public class DeltaFormatPluginConfig implements FormatPluginConfig {
+
+  public static final String NAME = "delta";
+
+  @JsonCreator
+  public DeltaFormatPluginConfig() {
+  }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java
new file mode 100644
index 0000000..84be68c
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DeltaPluginImplementor.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelShuttleImpl;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.Util;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.logical.DrillConstExecutor;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.store.StoragePlugin;
+import org.apache.drill.exec.store.delta.DeltaGroupScan;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.plan.AbstractPluginImplementor;
+import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
+import org.apache.drill.exec.store.plan.rel.PluginLimitRel;
+import org.apache.drill.exec.store.plan.rel.PluginProjectRel;
+import org.apache.drill.exec.store.plan.rel.StoragePluginTableScan;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class DeltaPluginImplementor extends AbstractPluginImplementor {
+
+  private DeltaGroupScan groupScan;
+
+  @Override
+  public void implement(StoragePluginTableScan scan) {
+    groupScan = (DeltaGroupScan) scan.getGroupScan();
+  }
+
+  @Override
+  public void implement(PluginFilterRel filter) throws IOException {
+    visitChild(filter.getInput());
+
+    RexNode condition = filter.getCondition();
+    LogicalExpression expression = DrillOptiq.toDrill(
+      new DrillParseContext(PrelUtil.getPlannerSettings(filter.getCluster().getPlanner())),
+      filter.getInput(),
+      condition);
+
+    DrillConstExecutor executor = (DrillConstExecutor) filter.getCluster().getPlanner().getExecutor();
+    PlannerSettings plannerSettings = filter.getCluster().getPlanner().getContext().unwrap(PlannerSettings.class);
+    groupScan = Optional.ofNullable((DeltaGroupScan) groupScan.applyFilter(expression, executor.getUdfUtilities(),
+      plannerSettings.functionImplementationRegistry, plannerSettings.getOptions()))
+      .orElse(groupScan);
+  }
+
+  @Override
+  public void implement(PluginProjectRel project) throws IOException {
+    visitChild(project.getInput());
+
+    DrillParseContext context = new DrillParseContext(PrelUtil.getPlannerSettings(project.getCluster().getPlanner()));
+    RelNode input = project.getInput();
+
+    List<SchemaPath> projects = project.getProjects().stream()
+      .map(e -> (SchemaPath) DrillOptiq.toDrill(context, input, e))
+      .collect(Collectors.toList());
+    groupScan = groupScan.clone(projects);
+  }
+
+  @Override
+  public boolean canImplement(Filter filter) {
+    FilterFinder filterFinder = new FilterFinder();
+    filter.getInput().accept(filterFinder);
+    return filterFinder.getFilter() == null;
+  }
+
+  @Override
+  public void implement(PluginLimitRel limit) throws IOException {
+    visitChild(limit.getInput());
+    int maxRecords = getArtificialLimit(limit);
+    if (maxRecords >= 0) {
+      groupScan = groupScan.applyLimit(maxRecords);
+    }
+  }
+
+  @Override
+  public boolean canImplement(DrillLimitRelBase limit) {
+    if (hasPluginGroupScan(limit)) {
+      FirstLimitFinder finder = new FirstLimitFinder();
+      limit.getInput().accept(finder);
+      int oldLimit = getArtificialLimit(finder.getFetch(), finder.getOffset());
+      int newLimit = getArtificialLimit(limit);
+      return newLimit >= 0 && (oldLimit < 0 || newLimit < oldLimit);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean artificialLimit() {
+    return true;
+  }
+
+  @Override
+  public boolean artificialFilter() {
+    return true;
+  }
+
+  @Override
+  protected Class<? extends StoragePlugin> supportedPlugin() {
+    return FileSystemPlugin.class;
+  }
+
+  @Override
+  public boolean splitProject(Project project) {
+    return true;
+  }
+
+  @Override
+  public boolean canImplement(Project project) {
+    return hasPluginGroupScan(project);
+  }
+
+  @Override
+  public GroupScan getPhysicalOperator() {
+    return groupScan;
+  }
+
+  @Override
+  protected boolean hasPluginGroupScan(RelNode node) {
+    return findGroupScan(node) instanceof DeltaGroupScan;
+  }
+
+  private int rexLiteralIntValue(RexLiteral offset) {
+    return ((BigDecimal) offset.getValue()).intValue();
+  }
+
+  private int getArtificialLimit(DrillLimitRelBase limit) {
+    return getArtificialLimit(limit.getFetch(), limit.getOffset());
+  }
+
+  private int getArtificialLimit(RexNode fetch, RexNode offset) {
+    int maxRows = -1;
+    if (fetch != null) {
+      maxRows = rexLiteralIntValue((RexLiteral) fetch);
+      if (offset != null) {
+        maxRows += rexLiteralIntValue((RexLiteral) offset);
+      }
+    }
+    return maxRows;
+  }
+
+  private static class FilterFinder extends RelShuttleImpl {
+    private RelNode filter;
+
+    @Override
+    public RelNode visit(LogicalFilter filter) {
+      this.filter = filter;
+      return filter;
+    }
+
+    @Override
+    public RelNode visit(RelNode other) {
+      if (other instanceof Filter) {
+        this.filter = other;
+        return other;
+      } else if (other instanceof RelSubset) {
+        RelSubset relSubset = (RelSubset) other;
+        Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this);
+      }
+      return super.visit(other);
+    }
+
+    public RelNode getFilter() {
+      return filter;
+    }
+  }
+
+  private static class FirstLimitFinder extends RelShuttleImpl {
+    private RexNode fetch;
+
+    private RexNode offset;
+
+    @Override
+    public RelNode visit(RelNode other) {
+      if (other instanceof DrillLimitRelBase) {
+        DrillLimitRelBase limitRelBase = (DrillLimitRelBase) other;
+        fetch = limitRelBase.getFetch();
+        offset = limitRelBase.getOffset();
+        return other;
+      } else if (other instanceof RelSubset) {
+        RelSubset relSubset = (RelSubset) other;
+        Util.first(relSubset.getBest(), relSubset.getOriginal()).accept(this);
+      }
+      return super.visit(other);
+    }
+
+    public RexNode getFetch() {
+      return fetch;
+    }
+
+    public RexNode getOffset() {
+      return offset;
+    }
+  }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java
new file mode 100644
index 0000000..bd95bde
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/plan/DrillExprToDeltaTranslator.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.plan;
+
+import io.delta.standalone.expressions.And;
+import io.delta.standalone.expressions.EqualTo;
+import io.delta.standalone.expressions.Expression;
+import io.delta.standalone.expressions.GreaterThan;
+import io.delta.standalone.expressions.GreaterThanOrEqual;
+import io.delta.standalone.expressions.IsNotNull;
+import io.delta.standalone.expressions.IsNull;
+import io.delta.standalone.expressions.LessThan;
+import io.delta.standalone.expressions.LessThanOrEqual;
+import io.delta.standalone.expressions.Literal;
+import io.delta.standalone.expressions.Not;
+import io.delta.standalone.expressions.Or;
+import io.delta.standalone.expressions.Predicate;
+import io.delta.standalone.types.StructType;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+public class DrillExprToDeltaTranslator extends AbstractExprVisitor<Expression, Void, RuntimeException> {
+
+  private final StructType structType;
+
+  public DrillExprToDeltaTranslator(StructType structType) {
+    this.structType = structType;
+  }
+
+  @Override
+  public Expression visitFunctionCall(FunctionCall call, Void value) {
+    try {
+      return visitFunctionCall(call);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  private Predicate visitFunctionCall(FunctionCall call) {
+    switch (call.getName()) {
+      case FunctionNames.AND: {
+        Expression left = call.arg(0).accept(this, null);
+        Expression right = call.arg(1).accept(this, null);
+        if (left != null && right != null) {
+          return new And(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.OR: {
+        Expression left = call.arg(0).accept(this, null);
+        Expression right = call.arg(1).accept(this, null);
+        if (left != null && right != null) {
+          return new Or(left, right);
+        }
+        return null;
+      }
+      case FunctionNames.NOT: {
+        Expression expression = call.arg(0).accept(this, null);
+        if (expression != null) {
+          return new Not(expression);
+        }
+        return null;
+      }
+      case FunctionNames.IS_NULL: {
+        LogicalExpression arg = call.arg(0);
+        if (arg instanceof SchemaPath) {
+          String name = getPath((SchemaPath) arg);
+          return new IsNull(structType.column(name));
+        }
+        return null;
+      }
+      case FunctionNames.IS_NOT_NULL: {
+        LogicalExpression arg = call.arg(0);
+        if (arg instanceof SchemaPath) {
+          String name = getPath((SchemaPath) arg);
+          return new IsNotNull(structType.column(name));
+        }
+        return null;
+      }
+      case FunctionNames.LT: {
+        LogicalExpression nameRef = call.arg(0);
+        Expression expression = call.arg(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new LessThan(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.LE: {
+        LogicalExpression nameRef = call.arg(0);
+        Expression expression = call.arg(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new LessThanOrEqual(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.GT: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new GreaterThan(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.GE: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(0).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new GreaterThanOrEqual(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.EQ: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new EqualTo(structType.column(name), expression);
+        }
+        return null;
+      }
+      case FunctionNames.NE: {
+        LogicalExpression nameRef = call.args().get(0);
+        Expression expression = call.args().get(1).accept(this, null);
+        if (nameRef instanceof SchemaPath) {
+          String name = getPath((SchemaPath) nameRef);
+          return new Not(new EqualTo(structType.column(name), expression));
+        }
+        return null;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public Expression visitFloatConstant(ValueExpressions.FloatExpression fExpr, Void value) {
+    return Literal.of(fExpr.getFloat());
+  }
+
+  @Override
+  public Expression visitIntConstant(ValueExpressions.IntExpression intExpr, Void value) {
+    return Literal.of(intExpr.getInt());
+  }
+
+  @Override
+  public Expression visitLongConstant(ValueExpressions.LongExpression longExpr, Void value) {
+    return Literal.of(longExpr.getLong());
+  }
+
+  @Override
+  public Expression visitDecimal9Constant(ValueExpressions.Decimal9Expression decExpr, Void value) {
+    return Literal.of(decExpr.getIntFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal18Constant(ValueExpressions.Decimal18Expression decExpr, Void value) {
+    return Literal.of(decExpr.getLongFromDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal28Constant(ValueExpressions.Decimal28Expression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDecimal38Constant(ValueExpressions.Decimal38Expression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitVarDecimalConstant(ValueExpressions.VarDecimalExpression decExpr, Void value) {
+    return Literal.of(decExpr.getBigDecimal());
+  }
+
+  @Override
+  public Expression visitDateConstant(ValueExpressions.DateExpression dateExpr, Void value) {
+    return Literal.of(dateExpr.getDate());
+  }
+
+  @Override
+  public Expression visitTimeConstant(ValueExpressions.TimeExpression timeExpr, Void value) {
+    return Literal.of(timeExpr.getTime());
+  }
+
+  @Override
+  public Expression visitTimeStampConstant(ValueExpressions.TimeStampExpression timestampExpr, Void value) {
+    return Literal.of(timestampExpr.getTimeStamp());
+  }
+
+  @Override
+  public Expression visitDoubleConstant(ValueExpressions.DoubleExpression dExpr, Void value) {
+    return Literal.of(dExpr.getDouble());
+  }
+
+  @Override
+  public Expression visitBooleanConstant(ValueExpressions.BooleanExpression e, Void value) {
+    return Literal.of(e.getBoolean());
+  }
+
+  @Override
+  public Expression visitQuotedStringConstant(ValueExpressions.QuotedString e, Void value) {
+    return Literal.of(e.getString());
+  }
+
+  @Override
+  public Expression visitUnknown(LogicalExpression e, Void value) {
+    return null;
+  }
+
+  private static String getPath(SchemaPath schemaPath) {
+    StringBuilder sb = new StringBuilder();
+    PathSegment segment = schemaPath.getRootSegment();
+    sb.append(segment.getNameSegment().getPath());
+
+    while ((segment = segment.getChild()) != null) {
+      sb.append('.')
+        .append(segment.isNamed()
+          ? segment.getNameSegment().getPath()
+          : "element");
+    }
+    return sb.toString();
+  }
+}
diff --git a/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java
new file mode 100644
index 0000000..78cac37
--- /dev/null
+++ b/contrib/format-deltalake/src/main/java/org/apache/drill/exec/store/delta/read/DeltaScanBatchCreator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta.read;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.store.delta.DeltaRowGroupScan;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.AbstractParquetRowGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.ParquetScanBatchCreator;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+
+@SuppressWarnings("unused")
+public class DeltaScanBatchCreator extends AbstractParquetScanBatchCreator
+  implements BatchCreator<DeltaRowGroupScan> {
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DeltaRowGroupScan rowGroupScan,
+    List<RecordBatch> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    OperatorContext oContext = context.newOperatorContext(rowGroupScan);
+    return getBatch(context, rowGroupScan, oContext);
+  }
+
+  @Override
+  protected AbstractDrillFileSystemManager getDrillFileSystemCreator(
+    OperatorContext operatorContext, OptionManager optionManager) {
+    return new ParquetScanBatchCreator.ParquetDrillFileSystemManager(operatorContext,
+      optionManager.getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val);
+  }
+
+  @Override
+  protected Map<String, String> getImplicitValues(AbstractParquetRowGroupScan rowGroupScan,
+    ColumnExplorer columnExplorer, RowGroupReadEntry rowGroup, DrillFileSystem fs) {
+    return ((DeltaRowGroupScan) rowGroupScan).getPartitions(rowGroup);
+  }
+
+}
diff --git a/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json b/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json
new file mode 100644
index 0000000..41d6f2f
--- /dev/null
+++ b/contrib/format-deltalake/src/main/resources/bootstrap-format-plugins.json
@@ -0,0 +1,20 @@
+{
+  "storage":{
+    "dfs": {
+      "type": "file",
+      "formats": {
+        "delta": {
+          "type": "delta"
+        }
+      }
+    },
+    "s3": {
+      "type": "file",
+      "formats": {
+        "delta": {
+          "type": "delta"
+        }
+      }
+    }
+  }
+}
diff --git a/contrib/format-deltalake/src/main/resources/drill-module.conf b/contrib/format-deltalake/src/main/resources/drill-module.conf
new file mode 100644
index 0000000..d4f6639
--- /dev/null
+++ b/contrib/format-deltalake/src/main/resources/drill-module.conf
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#  This file tells Drill to consider this module when class path scanning.
+#  This file can also include any supplementary configuration information.
+#  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.delta"
+}
diff --git a/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java b/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
new file mode 100644
index 0000000..e74b7d3
--- /dev/null
+++ b/contrib/format-deltalake/src/test/java/org/apache/drill/exec/store/delta/DeltaQueriesTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.delta;
+
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.delta.format.DeltaFormatPluginConfig;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.drill.exec.util.StoragePluginTestUtils.DFS_PLUGIN_NAME;
+import static org.junit.Assert.assertEquals;
+
+public class DeltaQueriesTest extends ClusterTest {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    StoragePluginRegistry pluginRegistry = cluster.drillbit().getContext().getStorage();
+    FileSystemConfig pluginConfig = (FileSystemConfig) pluginRegistry.getPlugin(DFS_PLUGIN_NAME).getConfig();
+    Map<String, FormatPluginConfig> formats = new HashMap<>(pluginConfig.getFormats());
+    formats.put("delta", new DeltaFormatPluginConfig());
+    FileSystemConfig newPluginConfig = new FileSystemConfig(
+      pluginConfig.getConnection(),
+      pluginConfig.getConfig(),
+      pluginConfig.getWorkspaces(),
+      formats,
+      PlainCredentialsProvider.EMPTY_CREDENTIALS_PROVIDER);
+    newPluginConfig.setEnabled(pluginConfig.isEnabled());
+    pluginRegistry.put(DFS_PLUGIN_NAME, newPluginConfig);
+
+    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-primitives"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-partition-values"));
+    dirTestWatcher.copyResourceToRoot(Paths.get("data-reader-nested-struct"));
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String plan = queryBuilder().sql("select * from dfs.`data-reader-partition-values`").explainJson();
+    long count = queryBuilder().physical(plan).run().recordCount();
+    assertEquals(3, count);
+  }
+
+  @Test
+  public void testAllPrimitives() throws Exception {
+    testBuilder()
+      .sqlQuery("select * from dfs.`data-reader-primitives`")
+      .ordered()
+      .baselineColumns("as_int", "as_long", "as_byte", "as_short", "as_boolean", "as_float",
+        "as_double", "as_string", "as_binary", "as_big_decimal")
+      .baselineValues(null, null, null, null, null, null, null, null, null, null)
+      .baselineValues(0, 0L, 0, 0, true, 0.0f, 0.0, "0", new byte[]{0, 0}, BigDecimal.valueOf(0))
+      .baselineValues(1, 1L, 1, 1, false, 1.0f, 1.0, "1", new byte[]{1, 1}, BigDecimal.valueOf(1))
+      .baselineValues(2, 2L, 2, 2, true, 2.0f, 2.0, "2", new byte[]{2, 2}, BigDecimal.valueOf(2))
+      .baselineValues(3, 3L, 3, 3, false, 3.0f, 3.0, "3", new byte[]{3, 3}, BigDecimal.valueOf(3))
+      .baselineValues(4, 4L, 4, 4, true, 4.0f, 4.0, "4", new byte[]{4, 4}, BigDecimal.valueOf(4))
+      .baselineValues(5, 5L, 5, 5, false, 5.0f, 5.0, "5", new byte[]{5, 5}, BigDecimal.valueOf(5))
+      .baselineValues(6, 6L, 6, 6, true, 6.0f, 6.0, "6", new byte[]{6, 6}, BigDecimal.valueOf(6))
+      .baselineValues(7, 7L, 7, 7, false, 7.0f, 7.0, "7", new byte[]{7, 7}, BigDecimal.valueOf(7))
+      .baselineValues(8, 8L, 8, 8, true, 8.0f, 8.0, "8", new byte[]{8, 8}, BigDecimal.valueOf(8))
+      .baselineValues(9, 9L, 9, 9, false, 9.0f, 9.0, "9", new byte[]{9, 9}, BigDecimal.valueOf(9))
+      .go();
+  }
+
+  @Test
+  public void testProjectingColumns() throws Exception {
+
+    String query = "select as_int, as_string from dfs.`data-reader-primitives`";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("columns=\\[`as_int`, `as_string`\\]")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("as_int", "as_string")
+      .baselineValues(null, null)
+      .baselineValues(0, "0")
+      .baselineValues(1, "1")
+      .baselineValues(2, "2")
+      .baselineValues(3, "3")
+      .baselineValues(4, "4")
+      .baselineValues(5, "5")
+      .baselineValues(6, "6")
+      .baselineValues(7, "7")
+      .baselineValues(8, "8")
+      .baselineValues(9, "9")
+      .go();
+  }
+
+  @Test
+  public void testProjectNestedColumn() throws Exception {
+    String query = "select t.a.ac.acb as acb, b from dfs.`data-reader-nested-struct` t";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("columns=\\[`a`.`ac`.`acb`, `b`\\]")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query)
+      .unOrdered()
+      .baselineColumns("acb", "b")
+      .baselineValues(0L, 0)
+      .baselineValues(1L, 1)
+      .baselineValues(2L, 2)
+      .baselineValues(3L, 3)
+      .baselineValues(4L, 4)
+      .baselineValues(5L, 5)
+      .baselineValues(6L, 6)
+      .baselineValues(7L, 7)
+      .baselineValues(8L, 8)
+      .baselineValues(9L, 9)
+      .go();
+  }
+
+  @Test
+  public void testPartitionPruning() throws Exception {
+    String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 1";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("numFiles\\=1")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query)
+      .ordered()
+      .baselineColumns("as_int", "as_string")
+      .baselineValues("1", "1")
+      .go();
+  }
+
+  @Test
+  public void testEmptyResults() throws Exception {
+    String query = "select as_int, as_string from dfs.`data-reader-partition-values` where as_long = 101";
+
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("numFiles\\=1")
+      .match();
+
+    testBuilder()
+      .sqlQuery(query)
+      .ordered()
+      .expectsEmptyResultSet()
+      .go();
+  }
+
+  @Test
+  public void testLimit() throws Exception {
+    String query = "select as_int, as_string from dfs.`data-reader-partition-values` limit 1";
+
+    // Note that both of the following two limits are expected because this format plugin supports an "artificial" limit.
+    queryBuilder()
+      .sql(query)
+      .planMatcher()
+      .include("Limit\\(fetch\\=\\[1\\]\\)")
+      .include("limit\\=1")
+      .match();
+
+    long count = queryBuilder().sql(query).run().recordCount();
+    assertEquals(1, count);
+  }
+}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc
new file mode 100644
index 0000000..0dacf51
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc
new file mode 100644
index 0000000..976c62f
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/.part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000..4046b14
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"commitInfo":{"timestamp":1603724040818,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"975ef365-8dec-4bbf-ab88-264c10987001","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"a\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aa\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ab\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ac\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aca\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"acb\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"b\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1603724040747}}
+{"add":{"path":"part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet","partitionValues":{},"size":1432,"modificationTime":1603724040000,"dataChange":true}}
+{"add":{"path":"part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet","partitionValues":{},"size":1439,"modificationTime":1603724040000,"dataChange":true}}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet
new file mode 100644
index 0000000..d1b8614
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00000-f2547b28-9219-4628-8462-cc9c56edfebb-c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet
new file mode 100644
index 0000000..b2114ea
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-nested-struct/part-00001-0f755735-3b5b-449a-8f93-92a40d9f065d-c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000..c0cc5a3
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/_delta_log/00000000000000000000.json
@@ -0,0 +1,6 @@
+{"commitInfo":{"timestamp":1636147668568,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"as_int\",\"as_long\",\"as_byte\",\"as_short\",\"as_boolean\",\"as_float\",\"as_double\",\"as_string\",\"as_string_lit_null\",\"as_date\",\"as_timestamp\",\"as_big_decimal\"]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputBytes":"5832","numOutputRows":"3"}}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"as_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_byte\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_short\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_float\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_string_lit_null\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_big_decimal\",\"type\":\"decimal(1,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_list_of_records\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"struct\",\"fields\":[{\"name\":\"val\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]},\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"as_nested_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aa\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ab\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ac\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"aca\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"acb\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["as_int","as_long","as_byte","as_short","as_boolean","as_float","as_double","as_string","as_string_lit_null","as_date","as_timestamp","as_big_decimal"],"configuration":{},"createdTime":1636147666386}}
+{"add":{"path":"as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08%2011%253A11%253A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet","partitionValues":{"as_big_decimal":"0","as_int":"0","as_byte":"0","as_long":"0","as_date":"2021-09-08","as_string":"0","as_timestamp":"2021-09-08 11:11:11","as_float":"0.0","as_short":"0","as_boolean":"true","as_string_lit_null":"null","as_double":"0.0"},"size":1944,"modificationTime":1636147668000,"dataChange":true}}
+{"add":{"path":"as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet","partitionValues":{"as_big_decimal":null,"as_int":null,"as_byte":null,"as_long":null,"as_date":null,"as_string":null,"as_timestamp":null,"as_float":null,"as_short":null,"as_boolean":null,"as_string_lit_null":null,"as_double":null},"size":1944,"modificationTime":1636147668000,"dataChange":true}}
+{"add":{"path":"as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08%2011%253A11%253A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet","partitionValues":{"as_big_decimal":"1","as_int":"1","as_byte":"1","as_long":"1","as_date":"2021-09-08","as_string":"1","as_timestamp":"2021-09-08 11:11:11","as_float":"1.0","as_short":"1","as_boolean":"false","as_string_lit_null":"null","as_double":"1.0"},"size":1944,"modificationTime":1636147668000,"dataChange":true}}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/.part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/.part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc
new file mode 100644
index 0000000..0191061
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/.part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet
new file mode 100644
index 0000000..e4919ae
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=0/as_long=0/as_byte=0/as_short=0/as_boolean=true/as_float=0.0/as_double=0.0/as_string=0/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=0/part-00000-b9dc86ae-0134-4363-bd87-19cfb3403e9a.c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/.part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/.part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet.crc
new file mode 100644
index 0000000..b79ff09
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/.part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet
new file mode 100644
index 0000000..b67fcb7
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=1/as_long=1/as_byte=1/as_short=1/as_boolean=false/as_float=1.0/as_double=1.0/as_string=1/as_string_lit_null=null/as_date=2021-09-08/as_timestamp=2021-09-08 11%3A11%3A11/as_big_decimal=1/part-00001-cb007d48-a9f5-40e7-adbe-60920680770f.c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/.part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/.part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet.crc
new file mode 100644
index 0000000..bf418f6
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/.part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet
new file mode 100644
index 0000000..4387e52
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-partition-values/as_int=__HIVE_DEFAULT_PARTITION__/as_long=__HIVE_DEFAULT_PARTITION__/as_byte=__HIVE_DEFAULT_PARTITION__/as_short=__HIVE_DEFAULT_PARTITION__/as_boolean=__HIVE_DEFAULT_PARTITION__/as_float=__HIVE_DEFAULT_PARTITION__/as_double=__HIVE_DEFAULT_PARTITION__/as_string=__HIVE_DEFAULT_PARTITION__/as_string_lit_null=__HIVE_DEFAULT_PARTITION__/as_date=__HIVE_DEFAULT_PARTITION__/as_timestamp=__HIVE_DEFAULT_PARTITION__/as_big_decimal=__HIVE_DEFAULT_PARTITION__/part-00001-9ee474eb-385b-43cf-9acb-0fbed63e011c.c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc
new file mode 100644
index 0000000..11f8928
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc
new file mode 100644
index 0000000..852ffc4
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/.part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet.crc
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json b/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json
new file mode 100644
index 0000000..9c9a0d1
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/_delta_log/00000000000000000000.json
@@ -0,0 +1,5 @@
+{"commitInfo":{"timestamp":1607520163636,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputBytes":"5050","numOutputRows":"11"}}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"as_int\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_byte\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_short\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_float\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_binary\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"as_big_decimal\",\"type\":\"decimal(1,0)\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1607520161353}}
+{"add":{"path":"part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet","partitionValues":{},"size":2482,"modificationTime":1607520163000,"dataChange":true}}
+{"add":{"path":"part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet","partitionValues":{},"size":2568,"modificationTime":1607520163000,"dataChange":true}}
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet
new file mode 100644
index 0000000..b0442b0
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00000-4f2f0b9f-50b3-4e7b-96a1-e2bb0f246b06-c000.snappy.parquet
Binary files differ
diff --git a/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet
new file mode 100644
index 0000000..745394c
--- /dev/null
+++ b/contrib/format-deltalake/src/test/resources/data-reader-primitives/part-00001-09e47b80-36c2-4475-a810-fbd8e7994971-c000.snappy.parquet
Binary files differ
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 6fd6925..e728da9 100644
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -56,6 +56,8 @@
     <module>format-xml</module>
     <module>format-image</module>
     <module>format-pcapng</module>
+    <module>format-iceberg</module>
+    <module>format-deltalake</module>
     <module>storage-phoenix</module>
     <module>storage-googlesheets</module>
     <module>storage-hive</module>
@@ -69,7 +71,6 @@
     <module>storage-druid</module>
     <module>storage-elasticsearch</module>
     <module>storage-cassandra</module>
-    <module>format-iceberg</module>
   </modules>
 
 </project>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index ce39807..f0fb953 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -484,6 +484,11 @@
           <artifactId>drill-iceberg-format</artifactId>
           <version>${project.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.drill.contrib</groupId>
+          <artifactId>drill-deltalake-format</artifactId>
+          <version>${project.version}</version>
+        </dependency>
       </dependencies>
     </profile>
 
diff --git a/distribution/src/assemble/component.xml b/distribution/src/assemble/component.xml
index 38d79f0..7708be4 100644
--- a/distribution/src/assemble/component.xml
+++ b/distribution/src/assemble/component.xml
@@ -69,6 +69,7 @@
         <include>org.apache.drill.contrib:drill-udfs:jar</include>
         <include>org.apache.drill.contrib:drill-druid-storage:jar</include>
         <include>org.apache.drill.contrib:drill-iceberg-format:jar</include>
+        <include>org.apache.drill.contrib:drill-deltalake-format:jar</include>
       </includes>
       <outputDirectory>jars</outputDirectory>
       <useProjectArtifact>false</useProjectArtifact>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
index 5379760..93bb04b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/StatisticsProvider.java
@@ -31,11 +31,17 @@
 import org.apache.drill.exec.expr.fn.DrillSimpleFuncHolder;
 import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
 import org.apache.drill.exec.expr.holders.Float4Holder;
 import org.apache.drill.exec.expr.holders.Float8Holder;
 import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
 import org.apache.drill.exec.expr.holders.TimeStampHolder;
 import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.ValueHolderHelper;
 import org.apache.drill.metastore.statistics.ColumnStatistics;
 import org.apache.drill.metastore.statistics.ColumnStatisticsKind;
@@ -52,10 +58,13 @@
 
   private final Map<SchemaPath, ColumnStatistics<?>> columnStatMap;
   private final long rowCount;
+  private final UdfUtilities udfUtilities;
 
-  public StatisticsProvider(Map<SchemaPath, ColumnStatistics<?>> columnStatMap, long rowCount) {
+  public StatisticsProvider(Map<SchemaPath, ColumnStatistics<?>> columnStatMap, long rowCount,
+    UdfUtilities udfUtilities) {
     this.columnStatMap = columnStatMap;
     this.rowCount = rowCount;
+    this.udfUtilities = udfUtilities;
   }
 
   public long getRowCount() {
@@ -154,12 +163,13 @@
 
   private ColumnStatistics<?> evalCastFunc(FunctionHolderExpression holderExpr, ColumnStatistics<T> input) {
     try {
-      DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
 
-      DrillSimpleFunc interpreter = funcHolder.createInterpreter();
-
-      ValueHolder minHolder;
-      ValueHolder maxHolder;
+      T minValue = ComparisonPredicate.getMinValue(input);
+      T maxValue = ComparisonPredicate.getMaxValue(input);
+      if (minValue == null && maxValue == null) {
+        // no need to evaluate cast for null arguments
+        return input;
+      }
 
       TypeProtos.MinorType srcType = holderExpr.args.get(0).getMajorType().getMinorType();
       TypeProtos.MinorType destType = holderExpr.getMajorType().getMinorType();
@@ -171,26 +181,33 @@
         return null; // cast func between srcType and destType is NOT allowed.
       }
 
+      ValueHolder minHolder;
+      ValueHolder maxHolder;
+
       switch (srcType) {
         case INT :
-          minHolder = ValueHolderHelper.getIntHolder((Integer) ComparisonPredicate.getMinValue(input));
-          maxHolder = ValueHolderHelper.getIntHolder((Integer) ComparisonPredicate.getMaxValue(input));
+          minHolder = ValueHolderHelper.getIntHolder((Integer) minValue);
+          maxHolder = ValueHolderHelper.getIntHolder((Integer) maxValue);
           break;
         case BIGINT:
-          minHolder = ValueHolderHelper.getBigIntHolder((Long) ComparisonPredicate.getMinValue(input));
-          maxHolder = ValueHolderHelper.getBigIntHolder((Long) ComparisonPredicate.getMaxValue(input));
+          minHolder = ValueHolderHelper.getBigIntHolder((Long) minValue);
+          maxHolder = ValueHolderHelper.getBigIntHolder((Long) maxValue);
           break;
         case FLOAT4:
-          minHolder = ValueHolderHelper.getFloat4Holder((Float) ComparisonPredicate.getMinValue(input));
-          maxHolder = ValueHolderHelper.getFloat4Holder((Float) ComparisonPredicate.getMaxValue(input));
+          minHolder = ValueHolderHelper.getFloat4Holder((Float) minValue);
+          maxHolder = ValueHolderHelper.getFloat4Holder((Float) maxValue);
           break;
         case FLOAT8:
-          minHolder = ValueHolderHelper.getFloat8Holder((Double) ComparisonPredicate.getMinValue(input));
-          maxHolder = ValueHolderHelper.getFloat8Holder((Double) ComparisonPredicate.getMaxValue(input));
+          minHolder = ValueHolderHelper.getFloat8Holder((Double) minValue);
+          maxHolder = ValueHolderHelper.getFloat8Holder((Double) maxValue);
           break;
         case DATE:
-          minHolder = ValueHolderHelper.getDateHolder((Long) ComparisonPredicate.getMinValue(input));
-          maxHolder = ValueHolderHelper.getDateHolder((Long) ComparisonPredicate.getMaxValue(input));
+          minHolder = ValueHolderHelper.getDateHolder((Long) minValue);
+          maxHolder = ValueHolderHelper.getDateHolder((Long) maxValue);
+          break;
+        case VARCHAR:
+          minHolder = ValueHolderHelper.getVarCharHolder(udfUtilities.getManagedBuffer(), (String) minValue);
+          maxHolder = ValueHolderHelper.getVarCharHolder(udfUtilities.getManagedBuffer(), (String) maxValue);
           break;
         default:
           return null;
@@ -199,10 +216,20 @@
       ValueHolder[] args1 = {minHolder};
       ValueHolder[] args2 = {maxHolder};
 
+      DrillSimpleFuncHolder funcHolder = (DrillSimpleFuncHolder) holderExpr.getHolder();
+
+      DrillSimpleFunc interpreter = funcHolder.createInterpreter();
+
       ValueHolder minFuncHolder = InterpreterEvaluator.evaluateFunction(interpreter, args1, holderExpr.getName());
       ValueHolder maxFuncHolder = InterpreterEvaluator.evaluateFunction(interpreter, args2, holderExpr.getName());
 
       switch (destType) {
+        case BIT:
+          return StatisticsProvider.getColumnStatistics(
+            ((BitHolder) minFuncHolder).value,
+            ((BitHolder) maxFuncHolder).value,
+            ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+            destType);
         case INT:
           return StatisticsProvider.getColumnStatistics(
               ((IntHolder) minFuncHolder).value,
@@ -227,12 +254,32 @@
               ((Float8Holder) maxFuncHolder).value,
               ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
               destType);
+        case DATE:
+          return StatisticsProvider.getColumnStatistics(
+            ((DateHolder) minFuncHolder).value,
+            ((DateHolder) maxFuncHolder).value,
+            ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+            destType);
+        case TIME:
+          return StatisticsProvider.getColumnStatistics(
+            ((TimeHolder) minFuncHolder).value,
+            ((TimeHolder) maxFuncHolder).value,
+            ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+            destType);
         case TIMESTAMP:
           return StatisticsProvider.getColumnStatistics(
               ((TimeStampHolder) minFuncHolder).value,
               ((TimeStampHolder) maxFuncHolder).value,
               ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
               destType);
+        case VARDECIMAL:
+          VarDecimalHolder minVarDecimalHolder = (VarDecimalHolder) minFuncHolder;
+          VarDecimalHolder maxVarDecimalHolder = (VarDecimalHolder) maxFuncHolder;
+          return StatisticsProvider.getColumnStatistics(
+            DecimalUtility.getBigDecimalFromDrillBuf(minVarDecimalHolder.buffer, minVarDecimalHolder.start, minVarDecimalHolder.scale, minVarDecimalHolder.precision),
+            DecimalUtility.getBigDecimalFromDrillBuf(maxVarDecimalHolder.buffer, maxVarDecimalHolder.start, maxVarDecimalHolder.scale, maxVarDecimalHolder.precision),
+            ColumnStatisticsKind.NULLS_COUNT.getFrom(input),
+            destType);
         default:
           return null;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
index a11f834..2a701ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScanWithMetadata.java
@@ -41,6 +41,7 @@
 import org.apache.drill.exec.metastore.analyze.FileMetadataInfoCollector;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.ops.UdfUtilities;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -1076,25 +1077,21 @@
       for (T metadata : metadataList) {
         TupleMetadata schema = metadata.getSchema();
         if (schema != null && !tableSchema.isEquivalent(schema)) {
+          schema = FixedReceiver.Builder.mergeSchemas(schema, tableSchema);
           filterPredicate = getFilterPredicate(filterExpression, udfUtilities,
               context, optionManager, true, true, schema);
         }
         Map<SchemaPath, ColumnStatistics<?>> columnsStatistics = metadata.getColumnsStatistics();
 
         // adds partition (dir) column statistics if it may be used during filter evaluation
-        if (metadata instanceof LocationProvider && optionManager != null) {
-          LocationProvider locationProvider = (LocationProvider) metadata;
-          columnsStatistics = ParquetTableMetadataUtils.addImplicitColumnsStatistics(columnsStatistics,
-              source.columns, source.getPartitionValues(locationProvider), optionManager,
-              locationProvider.getPath(), source.supportsFileImplicitColumns());
-        }
+        columnsStatistics = getImplicitColumnStatistics(optionManager, metadata, columnsStatistics);
 
         if (source.getNonInterestingColumnsMetadata() != null) {
           columnsStatistics.putAll(source.getNonInterestingColumnsMetadata().getColumnsStatistics());
         }
         RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate,
             columnsStatistics, TableStatisticsKind.ROW_COUNT.getValue(metadata),
-            schema, schemaPathsInExpr);
+            schema, schemaPathsInExpr, udfUtilities);
         if (match == RowsMatch.NONE) {
           continue; // No file comply to the filter => drop the file
         }
@@ -1109,6 +1106,17 @@
       return qualifiedMetadata;
     }
 
+    protected  <T extends Metadata> Map<SchemaPath, ColumnStatistics<?>> getImplicitColumnStatistics(
+      OptionManager optionManager, T metadata, Map<SchemaPath, ColumnStatistics<?>> columnsStatistics) {
+      if (metadata instanceof LocationProvider && optionManager != null) {
+        LocationProvider locationProvider = (LocationProvider) metadata;
+        columnsStatistics = ParquetTableMetadataUtils.addImplicitColumnsStatistics(columnsStatistics,
+            source.columns, source.getPartitionValues(locationProvider), optionManager,
+            locationProvider.getPath(), source.supportsFileImplicitColumns());
+      }
+      return columnsStatistics;
+    }
+
     protected abstract B self();
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
index 09045f5..cefd727 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillConstExecutor.java
@@ -127,6 +127,10 @@
     this.plannerSettings = plannerSettings;
   }
 
+  public UdfUtilities getUdfUtilities() {
+    return udfUtilities;
+  }
+
   @Override
   @SuppressWarnings("deprecation")
   public void reduce(RexBuilder rexBuilder, List<RexNode> constExps, List<RexNode> reducedValues) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 3a3800d..cf1fd96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -663,7 +663,7 @@
       AbstractParquetGroupScan abstractParquetGroupScan = (AbstractParquetGroupScan) source;
 
       Map<Path, FileMetadata> filesToFilter = new HashMap<>(prunedFiles);
-      if (!abstractParquetGroupScan.rowGroups.isEmpty()) {
+      if (!abstractParquetGroupScan.getRowGroupsMetadata().isEmpty()) {
         prunedFiles.forEach((path, fileMetadata) -> {
           if (abstractParquetGroupScan.rowGroups.get(path).size() == 1) {
             omittedFiles.put(path, fileMetadata);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
index 460fa6c..52a5e4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetRowGroupScan.java
@@ -111,6 +111,10 @@
   @JsonProperty
   public TupleMetadata getSchema() { return schema; }
 
+  public boolean isImplicitColumn(SchemaPath path, String partitionColumnLabel) {
+    return path.toString().matches(partitionColumnLabel + "\\d+");
+  }
+
   public abstract AbstractParquetRowGroupScan copy(List<SchemaPath> columns);
   @JsonIgnore
   public abstract Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index ffba7f2..f5c252c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -123,7 +123,7 @@
         String partitionColumnLabel = context.getOptions().getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
         for (SchemaPath path : schemaPathsInExpr) {
           if (rowGroupScan.supportsFileImplicitColumns() &&
-            path.toString().matches(partitionColumnLabel+"\\d+")) {
+            rowGroupScan.isImplicitColumn(path, partitionColumnLabel)) {
             continue;  // skip implicit columns like dir0, dir1
           }
           columnsInExpr.add(SchemaPath.getSimplePath(path.getRootSegmentPath()));
@@ -210,7 +210,7 @@
                     rowGroupSchema);
               }
 
-              matchResult = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount, rowGroupSchema, schemaPathsInExpr);
+              matchResult = FilterEvaluatorUtils.matches(filterPredicate, columnsStatistics, footerRowCount, rowGroupSchema, schemaPathsInExpr, context);
 
               // collect logging info
               long timeToRead = pruneTimer.elapsed(TimeUnit.MICROSECONDS);
@@ -343,10 +343,7 @@
         reader.getClass().getSimpleName());
     readers.add(reader);
 
-    List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
-    Map<String, String> implicitValues =
-        columnExplorer.populateColumns(rowGroup.getPath(), partitionValues,
-            rowGroupScan.supportsFileImplicitColumns(), fs, rowGroup.getRowGroupIndex(), rowGroup.getStart(), rowGroup.getLength());
+    Map<String, String> implicitValues = getImplicitValues(rowGroupScan, columnExplorer, rowGroup, fs);
     implicitColumns.add(implicitValues);
     if (implicitValues.size() > mapWithMaxColumns.size()) {
       mapWithMaxColumns = implicitValues;
@@ -354,6 +351,12 @@
     return mapWithMaxColumns;
   }
 
+  protected Map<String, String> getImplicitValues(AbstractParquetRowGroupScan rowGroupScan, ColumnExplorer columnExplorer, RowGroupReadEntry rowGroup, DrillFileSystem fs) {
+    List<String> partitionValues = rowGroupScan.getPartitionValues(rowGroup);
+    return columnExplorer.populateColumns(rowGroup.getPath(), partitionValues,
+        rowGroupScan.supportsFileImplicitColumns(), fs, rowGroup.getRowGroupIndex(), rowGroup.getStart(), rowGroup.getLength());
+  }
+
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
   private ParquetMetadata readFooter(Configuration conf, Path path, ParquetReaderConfig readerConfig) throws IOException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
index 8d35b1d..2c0942d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/FilterEvaluatorUtils.java
@@ -98,7 +98,7 @@
     FilterPredicate<?> parquetPredicate = FilterBuilder.buildFilterPredicate(
         materializedFilter, constantBoundaries, udfUtilities, true);
 
-    return matches(parquetPredicate, columnsStatistics, rowCount, schema, schemaPathsInExpr);
+    return matches(parquetPredicate, columnsStatistics, rowCount, schema, schemaPathsInExpr, udfUtilities);
   }
 
   @SuppressWarnings("unchecked")
@@ -106,12 +106,13 @@
                                   Map<SchemaPath, ColumnStatistics<?>> columnsStatistics,
                                   long rowCount,
                                   TupleMetadata fileMetadata,
-                                  Set<SchemaPath> schemaPathsInExpr) {
+                                  Set<SchemaPath> schemaPathsInExpr,
+                                  UdfUtilities udfUtilities) {
     if (parquetPredicate == null) {
       return RowsMatch.SOME;
     }
     @SuppressWarnings("rawtypes")
-    StatisticsProvider<T> rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount);
+    StatisticsProvider<T> rangeExprEvaluator = new StatisticsProvider(columnsStatistics, rowCount, udfUtilities);
     RowsMatch rowsMatch = parquetPredicate.matches(rangeExprEvaluator);
 
     if (rowsMatch == RowsMatch.ALL && isMetaNotApplicable(schemaPathsInExpr, fileMetadata)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 8c91200..d6fade8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -51,12 +51,12 @@
   /**
    * Creates file system only if it was not created before, otherwise returns already created instance.
    */
-  private class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
+  public static class ParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
 
     private final boolean useAsyncPageReader;
     private DrillFileSystem fs;
 
-    ParquetDrillFileSystemManager(OperatorContext operatorContext, boolean useAsyncPageReader) {
+    public ParquetDrillFileSystemManager(OperatorContext operatorContext, boolean useAsyncPageReader) {
       super(operatorContext);
       this.useAsyncPageReader = useAsyncPageReader;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
index 06384e5..17572f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/AbstractPluginImplementor.java
@@ -143,6 +143,11 @@
     return false;
   }
 
+  @Override
+  public boolean artificialFilter() {
+    return false;
+  }
+
   private UserException getUnsupported(String rel) {
     return UserException.unsupportedError()
         .message("Plugin implementor doesn't support push down for %s", rel)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
index f4589a6..7b54a6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/PluginImplementor.java
@@ -93,4 +93,12 @@
    * to ensure returning the correct rows number.
    */
   boolean artificialLimit();
+
+  /**
+   * If the plugin doesn't support native filter pushdown,
+   * but the reader can prune the set of rows to read.
+   * In this case filter operator on top of the scan should be preserved
+   * to ensure returning the correct subset of rows.
+   */
+  boolean artificialFilter();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
index e5e02e7..8a2cf7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginFilterRule.java
@@ -24,6 +24,8 @@
 import org.apache.drill.exec.store.plan.PluginImplementor;
 import org.apache.drill.exec.store.plan.rel.PluginFilterRel;
 
+import java.util.Collections;
+
 /**
  * The rule that converts provided filter operator to plugin-specific implementation.
  */
@@ -36,11 +38,15 @@
   @Override
   public RelNode convert(RelNode rel) {
     Filter filter = (Filter) rel;
-    return new PluginFilterRel(
-        getOutConvention(),
-        rel.getCluster(),
-        filter.getTraitSet().replace(getOutConvention()),
-        convert(filter.getInput(), filter.getTraitSet().replace(getOutConvention())),
-        filter.getCondition());
+    PluginFilterRel pluginFilterRel = new PluginFilterRel(
+      getOutConvention(),
+      rel.getCluster(),
+      filter.getTraitSet().replace(getOutConvention()),
+      convert(filter.getInput(), filter.getTraitSet().replace(getOutConvention())),
+      filter.getCondition());
+    if (getPluginImplementor().artificialFilter()) {
+      return filter.copy(filter.getTraitSet(), Collections.singletonList(pluginFilterRel));
+    }
+    return pluginFilterRel;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
index a71060b..9df71ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/plan/rule/PluginJoinRule.java
@@ -31,7 +31,7 @@
 public class PluginJoinRule extends PluginConverterRule {
 
   public PluginJoinRule(RelTrait in, Convention out, PluginImplementor pluginImplementor) {
-    super(Join.class, in, out, "PluginProjectRule", pluginImplementor);
+    super(Join.class, in, out, "PluginJoinRule", pluginImplementor);
   }
 
   @Override