Spark: Apply row-level delete files when reading (#1444)

diff --git a/api/src/main/java/org/apache/iceberg/Accessors.java b/api/src/main/java/org/apache/iceberg/Accessors.java
index a3eb5a5..e62012d 100644
--- a/api/src/main/java/org/apache/iceberg/Accessors.java
+++ b/api/src/main/java/org/apache/iceberg/Accessors.java
@@ -212,10 +212,6 @@
         }
       }
 
-      if (accessors.isEmpty()) {
-        return null;
-      }
-
       return accessors;
     }
 
diff --git a/build.gradle b/build.gradle
index 88f8cc9..c19d5ef 100644
--- a/build.gradle
+++ b/build.gradle
@@ -674,6 +674,7 @@
       }
       testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
       testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
+      testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
     }
 
     test {
@@ -781,6 +782,7 @@
     }
     testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
+    testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
   }
 
   test {
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index a69d8ed..a9c60df 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -31,6 +31,14 @@
   private TableScanUtil() {
   }
 
+  public static boolean hasDeletes(CombinedScanTask task) {
+    return task.files().stream().anyMatch(TableScanUtil::hasDeletes);
+  }
+
+  public static boolean hasDeletes(FileScanTask task) {
+    return !task.deletes().isEmpty();
+  }
+
   public static CloseableIterable<FileScanTask> splitFiles(CloseableIterable<FileScanTask> tasks, long splitSize) {
     Iterable<FileScanTask> splitTasks = FluentIterable
         .from(tasks)
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
new file mode 100644
index 0000000..2909a91
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.parquet.Preconditions;
+
+public abstract class DeleteFilter<T> {
+  private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
+  private static final Schema POS_DELETE_SCHEMA = new Schema(
+      MetadataColumns.DELETE_FILE_PATH,
+      MetadataColumns.DELETE_FILE_POS);
+
+  private final long setFilterThreshold;
+  private final DataFile dataFile;
+  private final List<DeleteFile> posDeletes;
+  private final List<DeleteFile> eqDeletes;
+  private final Schema requiredSchema;
+  private final Accessor<StructLike> posAccessor;
+
+  public DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+    this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
+    this.dataFile = task.file();
+
+    ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
+    ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
+    for (DeleteFile delete : task.deletes()) {
+      switch (delete.content()) {
+        case POSITION_DELETES:
+          posDeleteBuilder.add(delete);
+          break;
+        case EQUALITY_DELETES:
+          eqDeleteBuilder.add(delete);
+          break;
+        default:
+          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+      }
+    }
+
+    this.posDeletes = posDeleteBuilder.build();
+    this.eqDeletes = eqDeleteBuilder.build();
+    this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
+    this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+  }
+
+  public Schema requiredSchema() {
+    return requiredSchema;
+  }
+
+  Accessor<StructLike> posAccessor() {
+    return posAccessor;
+  }
+
+  protected abstract StructLike asStructLike(T record);
+
+  protected abstract InputFile getInputFile(String location);
+
+  protected long pos(T record) {
+    return (Long) posAccessor.get(asStructLike(record));
+  }
+
+  public CloseableIterable<T> filter(CloseableIterable<T> records) {
+    return applyEqDeletes(applyPosDeletes(records));
+  }
+
+  private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+    if (eqDeletes.isEmpty()) {
+      return records;
+    }
+
+    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+    for (DeleteFile delete : eqDeletes) {
+      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+    }
+
+    CloseableIterable<T> filteredRecords = records;
+    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+      Set<Integer> ids = entry.getKey();
+      Iterable<DeleteFile> deletes = entry.getValue();
+
+      Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+      // a projection to select and reorder fields of the file schema to match the delete rows
+      StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+          delete -> openDeletes(delete, deleteSchema));
+      StructLikeSet deleteSet = Deletes.toEqualitySet(
+          // copy the delete records because they will be held in a set
+          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+          deleteSchema.asStruct());
+
+      filteredRecords = Deletes.filter(filteredRecords,
+          record -> projectRow.wrap(asStructLike(record)), deleteSet);
+    }
+
+    return filteredRecords;
+  }
+
+  private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
+    if (posDeletes.isEmpty()) {
+      return records;
+    }
+
+    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+
+    // if there are fewer deletes than a reasonable number to keep in memory, use a set
+    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+      return Deletes.filter(
+          records, this::pos,
+          Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)));
+    }
+
+    return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
+  }
+
+  private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
+    return openDeletes(file, POS_DELETE_SCHEMA);
+  }
+
+  private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
+    InputFile input = getInputFile(deleteFile.path().toString());
+    switch (deleteFile.format()) {
+      case AVRO:
+        return Avro.read(input)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(DataReader::create)
+            .build();
+
+      case PARQUET:
+        Parquet.ReadBuilder builder = Parquet.read(input)
+            .project(deleteSchema)
+            .reuseContainers()
+            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));
+
+        if (deleteFile.content() == FileContent.POSITION_DELETES) {
+          builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
+        }
+
+        return builder.build();
+
+      case ORC:
+      default:
+        throw new UnsupportedOperationException(String.format(
+            "Cannot read deletes, %s is not a supported format: %s", deleteFile.format().name(), deleteFile.path()));
+    }
+  }
+
+  private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
+                                       List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
+    if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
+      return requestedSchema;
+    }
+
+    Set<Integer> requiredIds = Sets.newLinkedHashSet();
+    if (!posDeletes.isEmpty()) {
+      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
+    }
+
+    for (DeleteFile eqDelete : eqDeletes) {
+      requiredIds.addAll(eqDelete.equalityFieldIds());
+    }
+
+    Set<Integer> missingIds = Sets.newLinkedHashSet(
+        Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));
+
+    if (missingIds.isEmpty()) {
+      return requestedSchema;
+    }
+
+    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
+    List<Types.NestedField> columns = Lists.newArrayList(requestedSchema.columns());
+    for (int fieldId : missingIds) {
+      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+        continue; // add _pos at the end
+      }
+
+      Types.NestedField field = tableSchema.asStruct().field(fieldId);
+      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);
+
+      columns.add(field);
+    }
+
+    if (missingIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
+      columns.add(MetadataColumns.ROW_POSITION);
+    }
+
+    return new Schema(columns);
+  }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
new file mode 100644
index 0000000..30109f1
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+
+public class GenericDeleteFilter extends DeleteFilter<Record> {
+  private final FileIO io;
+  private final InternalRecordWrapper asStructLike;
+
+  public GenericDeleteFilter(FileIO io, FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+    super(task, tableSchema, requestedSchema);
+    this.io = io;
+    this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+  }
+
+  @Override
+  protected long pos(Record record) {
+    return (Long) posAccessor().get(record);
+  }
+
+  @Override
+  protected StructLike asStructLike(Record record) {
+    return asStructLike.wrap(record);
+  }
+
+  @Override
+  protected InputFile getInputFile(String location) {
+    return io.newInputFile(location);
+  }
+}
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
index e5f7117..913e783 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -20,26 +20,16 @@
 package org.apache.iceberg.data;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import org.apache.iceberg.Accessor;
 import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.deletes.Deletes;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -51,23 +41,11 @@
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
-import org.apache.iceberg.util.StructLikeSet;
-import org.apache.iceberg.util.StructProjection;
-import org.apache.parquet.Preconditions;
 
 class GenericReader implements Serializable {
-  private static final Schema POS_DELETE_SCHEMA = new Schema(
-      MetadataColumns.DELETE_FILE_PATH,
-      MetadataColumns.DELETE_FILE_POS);
-
   private final FileIO io;
   private final Schema tableSchema;
   private final Schema projection;
@@ -92,67 +70,16 @@
   }
 
   public CloseableIterable<Record> open(FileScanTask task) {
-    List<DeleteFile> posDeletes = Lists.newArrayList();
-    List<DeleteFile> eqDeletes = Lists.newArrayList();
-    for (DeleteFile delete : task.deletes()) {
-      switch (delete.content()) {
-        case POSITION_DELETES:
-          posDeletes.add(delete);
-          break;
-        case EQUALITY_DELETES:
-          eqDeletes.add(delete);
-          break;
-        default:
-          throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
-      }
-    }
+    DeleteFilter<Record> deletes = new GenericDeleteFilter(io, task, tableSchema, projection);
+    Schema readSchema = deletes.requiredSchema();
 
-    Schema fileProjection = fileProjection(posDeletes, eqDeletes);
-
-    CloseableIterable<Record> records = openFile(task, fileProjection);
-    records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
-    records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
-    records = applyResidual(records, fileProjection, task.residual());
+    CloseableIterable<Record> records = openFile(task, readSchema);
+    records = deletes.filter(records);
+    records = applyResidual(records, readSchema, task.residual());
 
     return records;
   }
 
-  private Schema fileProjection(List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
-    Set<Integer> requiredIds = Sets.newLinkedHashSet();
-    if (!posDeletes.isEmpty()) {
-      requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
-    }
-
-    for (DeleteFile eqDelete : eqDeletes) {
-      requiredIds.addAll(eqDelete.equalityFieldIds());
-    }
-
-    Set<Integer> missingIds = Sets.newLinkedHashSet(Sets.difference(requiredIds, TypeUtil.getProjectedIds(projection)));
-
-    if (missingIds.isEmpty()) {
-      return projection;
-    }
-
-    // TODO: support adding nested columns. this will currently fail when finding nested columns to add
-    List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
-    for (int fieldId : missingIds) {
-      if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
-        continue; // add _pos at the end
-      }
-
-      Types.NestedField field = tableSchema.asStruct().field(fieldId);
-      Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);
-
-      columns.add(field);
-    }
-
-    if (requiredIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
-      columns.add(MetadataColumns.ROW_POSITION);
-    }
-
-    return new Schema(columns);
-  }
-
   private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
                                                   Expression residual) {
     if (residual != null && residual != Expressions.alwaysTrue()) {
@@ -164,95 +91,6 @@
     return records;
   }
 
-  private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
-                                                   List<DeleteFile> eqDeletes, DataFile dataFile) {
-    if (eqDeletes.isEmpty()) {
-      return records;
-    }
-
-    Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
-    for (DeleteFile delete : eqDeletes) {
-      filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
-    }
-
-    CloseableIterable<Record> filteredRecords = records;
-    for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
-      Set<Integer> ids = entry.getKey();
-      Iterable<DeleteFile> deletes = entry.getValue();
-
-      Schema deleteSchema = TypeUtil.select(recordSchema, ids);
-
-      // a wrapper to translate from generic objects to internal representations
-      InternalRecordWrapper asStructLike = new InternalRecordWrapper(recordSchema.asStruct());
-
-      // a projection to select and reorder fields of the file schema to match the delete rows
-      StructProjection projectRow = StructProjection.create(recordSchema, deleteSchema);
-
-      Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
-          delete -> openDeletes(delete, dataFile, deleteSchema));
-      StructLikeSet deleteSet = Deletes.toEqualitySet(
-          // copy the delete records because they will be held in a set
-          CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
-          deleteSchema.asStruct());
-
-      filteredRecords = Deletes.filter(filteredRecords,
-          record -> projectRow.wrap(asStructLike.wrap(record)), deleteSet);
-    }
-
-    return filteredRecords;
-  }
-
-  private CloseableIterable<Record> applyPosDeletes(CloseableIterable<Record> records, Schema recordSchema,
-                                                    CharSequence file, List<DeleteFile> posDeletes, DataFile dataFile) {
-    if (posDeletes.isEmpty()) {
-      return records;
-    }
-
-    Accessor<StructLike> posAccessor = recordSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
-    Function<Record, Long> posGetter = record -> (Long) posAccessor.get(record);
-    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes,
-        delete -> openPosDeletes(delete, dataFile));
-
-    // if there are fewer deletes than a reasonable number to keep in memory, use a set
-    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < 100_000L) {
-      return Deletes.filter(records, posGetter, Deletes.toPositionSet(file, CloseableIterable.concat(deletes)));
-    }
-
-    return Deletes.streamingFilter(records, posGetter, Deletes.deletePositions(file, deletes));
-  }
-
-  private CloseableIterable<Record> openPosDeletes(DeleteFile file, DataFile dataFile) {
-    return openDeletes(file, dataFile, POS_DELETE_SCHEMA);
-  }
-
-  private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, DataFile dataFile, Schema deleteSchema) {
-    InputFile input = io.newInputFile(deleteFile.path().toString());
-    switch (deleteFile.format()) {
-      case AVRO:
-        return Avro.read(input)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(DataReader::create)
-            .build();
-
-      case PARQUET:
-        Parquet.ReadBuilder builder = Parquet.read(input)
-            .project(deleteSchema)
-            .reuseContainers()
-            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));
-
-        if (deleteFile.content() == FileContent.POSITION_DELETES) {
-          builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
-        }
-
-        return builder.build();
-
-      case ORC:
-      default:
-        throw new UnsupportedOperationException(String.format(
-            "Cannot read %s file: %s", deleteFile.format().name(), deleteFile.path()));
-    }
-  }
 
   private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProjection) {
     InputFile input = io.newInputFile(task.file().path().toString());
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
index 210f231..3323d30 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
@@ -35,6 +35,7 @@
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -108,8 +109,9 @@
         .addDeletes(eqDeletes)
         .commit();
 
-    StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
-    StructLikeSet actual = rowSet(table, "id"); // data is added by the reader to apply the eq deletes
+    StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
+    // data is added by the reader to apply the eq deletes, use StructProjection to remove it from comparison
+    StructLikeSet actual = selectColumns(rowSet(table, "id"), "id");
 
     Assert.assertEquals("Table should contain expected rows", expected, actual);
   }
@@ -250,6 +252,15 @@
     return set;
   }
 
+  private StructLikeSet selectColumns(StructLikeSet rows, String... columns) {
+    Schema projection = table.schema().select(columns);
+    StructLikeSet set = StructLikeSet.create(projection.asStruct());
+    rows.stream()
+        .map(row -> StructProjection.create(table.schema(), projection).wrap(row))
+        .forEach(set::add);
+    return set;
+  }
+
   private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
     Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 1eab0a5..3708273 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -25,18 +25,19 @@
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.util.ByteBuffers;
 import org.apache.spark.rdd.InputFileBlockHolder;
@@ -50,21 +51,20 @@
  */
 abstract class BaseDataReader<T> implements Closeable {
   private final Iterator<FileScanTask> tasks;
-  private final FileIO fileIo;
   private final Map<String, InputFile> inputFiles;
 
   private CloseableIterator<T> currentIterator;
   private T current = null;
 
-  BaseDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryptionManager) {
-    this.fileIo = fileIo;
+  BaseDataReader(CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
     this.tasks = task.files().iterator();
-    Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(Iterables.transform(
-        task.files(),
-        fileScanTask ->
-            EncryptedFiles.encryptedInput(
-                this.fileIo.newInputFile(fileScanTask.file().path().toString()),
-                fileScanTask.file().keyMetadata())));
+    Stream<EncryptedInputFile> encrypted = task.files().stream()
+        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+        .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()));
+
+    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+    Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+
     ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
     decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
     this.inputFiles = inputFileBuilder.build();
@@ -104,11 +104,15 @@
     }
   }
 
-  InputFile getInputFile(FileScanTask task) {
+  protected InputFile getInputFile(FileScanTask task) {
     Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
     return inputFiles.get(task.file().path().toString());
   }
 
+  protected InputFile getInputFile(String location) {
+    return inputFiles.get(location);
+  }
+
   protected static Object convertConstant(Type type, Object value) {
     if (value == null) {
       return null;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index a7fbe90..1b06ffb 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -21,16 +21,16 @@
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
@@ -40,7 +40,6 @@
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -64,15 +63,17 @@
       .impl(UnsafeProjection.class, InternalRow.class)
       .build();
 
+  private final FileIO io;
   private final Schema tableSchema;
   private final Schema expectedSchema;
   private final String nameMapping;
   private final boolean caseSensitive;
 
   RowDataReader(
-      CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO fileIo,
+      CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
       EncryptionManager encryptionManager, boolean caseSensitive) {
-    super(task, fileIo, encryptionManager);
+    super(task, io, encryptionManager);
+    this.io = io;
     this.tableSchema = tableSchema;
     this.expectedSchema = expectedSchema;
     this.nameMapping = nameMapping;
@@ -81,23 +82,17 @@
 
   @Override
   CloseableIterator<InternalRow> open(FileScanTask task) {
+    SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema);
+
+    // schema or rows returned by readers
+    Schema requiredSchema = deletes.requiredSchema();
+    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
     DataFile file = task.file();
 
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
 
-    // schema or rows returned by readers
-    PartitionSpec spec = task.spec();
-    Set<Integer> idColumns = spec.identitySourceIds();
-    Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
-    boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
-
-    if (projectsIdentityPartitionColumns) {
-      return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant))
-          .iterator();
-    }
-    // return the base iterator
-    return open(task, expectedSchema, ImmutableMap.of()).iterator();
+    return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
   }
 
   private CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
@@ -218,4 +213,23 @@
         JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
         JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
   }
+
+  private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+    private final InternalRowWrapper asStructLike;
+
+    SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+      super(task, tableSchema, requestedSchema);
+      this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+    }
+
+    @Override
+    protected StructLike asStructLike(InternalRow row) {
+      return asStructLike.wrap(row);
+    }
+
+    @Override
+    protected InputFile getInputFile(String location) {
+      return RowDataReader.this.getInputFile(location);
+    }
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
index e66f3a4..3021288 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -23,6 +23,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.TestHiveMetastore;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -59,6 +61,12 @@
         .getOrCreate();
 
     SparkTestBase.catalog = new HiveCatalog(spark.sessionState().newHadoopConf());
+
+    try {
+      catalog.createNamespace(Namespace.of("default"));
+    } catch (AlreadyExistsException ignored) {
+      // the default namespace already exists. ignore the create error
+    }
   }
 
   @AfterClass
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
new file mode 100644
index 0000000..0a0dcd8
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkStructLike;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.spark.sql.Dataset;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public abstract class TestSparkReaderDeletes extends SparkTestBase {
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.IntegerType.get()),
+      Types.NestedField.required(2, "data", Types.StringType.get()));
+  private Table table = null;
+  private List<Record> records = null;
+  private DataFile dataFile = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Before
+  public void createTable() throws IOException {
+    this.table = catalog.createTable(TableIdentifier.of("default", "table"), SCHEMA);
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    this.records = Lists.newArrayList();
+
+    // records all use IDs that are in bucket id_bucket=0
+    GenericRecord record = GenericRecord.create(table.schema());
+    records.add(record.copy("id", 29, "data", "a"));
+    records.add(record.copy("id", 43, "data", "b"));
+    records.add(record.copy("id", 61, "data", "c"));
+    records.add(record.copy("id", 89, "data", "d"));
+    records.add(record.copy("id", 100, "data", "e"));
+    records.add(record.copy("id", 121, "data", "f"));
+    records.add(record.copy("id", 122, "data", "g"));
+
+    this.dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records);
+
+    table.newAppend()
+        .appendFile(dataFile)
+        .commit();
+  }
+
+  @After
+  public void dropTable() {
+    catalog.dropTable(TableIdentifier.of("default", "table"));
+  }
+
+  @Test
+  public void testEqualityDeletes() throws IOException {
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+    StructLikeSet actual = rowSet(table);
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
+  public void testEqualityDeletesWithRequiredEqColumn() throws IOException {
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
+    StructLikeSet actual = rowSet(table, "id"); // data is added by the reader to apply the eq deletes
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
+  public void testPositionDeletes() throws IOException {
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 0L), // id = 29
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 6L) // id = 122
+    );
+
+    DeleteFile posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(posDeletes)
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+    StructLikeSet actual = rowSet(table);
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
+  public void testMixedPositionAndEqualityDeletes() throws IOException {
+    Schema dataSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(dataSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema);
+
+    List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+        Pair.of(dataFile.path(), 3L), // id = 89
+        Pair.of(dataFile.path(), 5L) // id = 121
+    );
+
+    DeleteFile posDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .addDeletes(posDeletes)
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+    StructLikeSet actual = rowSet(table);
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
+  public void testMultipleEqualityDeleteSchemas() throws IOException {
+    Schema dataSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(dataSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "a"), // id = 29
+        dataDelete.copy("data", "d"), // id = 89
+        dataDelete.copy("data", "g") // id = 122
+    );
+
+    DeleteFile dataEqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema);
+
+    Schema idSchema = table.schema().select("id");
+    Record idDelete = GenericRecord.create(idSchema);
+    List<Record> idDeletes = Lists.newArrayList(
+        idDelete.copy("id", 121), // id = 121
+        idDelete.copy("id", 29) // id = 29
+    );
+
+    DeleteFile idEqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), idDeletes, idSchema);
+
+    table.newRowDelta()
+        .addDeletes(dataEqDeletes)
+        .addDeletes(idEqDeletes)
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+    StructLikeSet actual = rowSet(table);
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  @Test
+  public void testEqualityDeleteByNull() throws IOException {
+    // data is required in the test table; make it optional for this test
+    table.updateSchema()
+        .makeColumnOptional("data")
+        .commit();
+
+    // add a new data file with a record where data is null
+    Record record = GenericRecord.create(table.schema());
+    DataFile dataFileWithNull = FileHelpers.writeDataFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0),
+        Lists.newArrayList(record.copy("id", 131, "data", null)));
+
+    table.newAppend()
+        .appendFile(dataFileWithNull)
+        .commit();
+
+    // delete where data is null
+    Schema dataSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(dataSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", null) // id = 131
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    StructLikeSet expected = rowSetWithoutIds(131);
+    StructLikeSet actual = rowSet(table);
+
+    Assert.assertEquals("Table should contain expected rows", expected, actual);
+  }
+
+  private static StructLikeSet rowSet(Table table) {
+    return rowSet(table, "*");
+  }
+
+  private static StructLikeSet rowSet(Table table, String... columns) {
+    Dataset<org.apache.spark.sql.Row> df = spark.read()
+        .format("iceberg")
+        .load("default.table")
+        .selectExpr(columns);
+
+    Types.StructType projection = table.schema().select(columns).asStruct();
+    StructLikeSet set = StructLikeSet.create(projection);
+    df.collectAsList().stream().forEach(row -> {
+      SparkStructLike rowWrapper = new SparkStructLike(projection);
+      set.add(rowWrapper.wrap(row));
+    });
+
+    return set;
+  }
+
+  private StructLikeSet selectColumns(StructLikeSet rows, String... columns) {
+    Schema projection = table.schema().select(columns);
+    StructLikeSet set = StructLikeSet.create(projection.asStruct());
+    rows.stream()
+        .map(row -> StructProjection.create(table.schema(), projection).wrap(row))
+        .forEach(set::add);
+    return set;
+  }
+
+  private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
+    Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
+    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+    records.stream()
+        .filter(row -> !deletedIds.contains(row.getField("id")))
+        .forEach(set::add);
+    return set;
+  }
+}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 564f8f2..826c450 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -38,6 +38,7 @@
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopFileIO;
@@ -50,6 +51,7 @@
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -205,6 +207,9 @@
     String expectedSchemaString = SchemaParser.toJson(lazySchema());
     String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
 
+    ValidationException.check(tasks().stream().noneMatch(TableScanUtil::hasDeletes),
+        "Cannot scan table %s: cannot apply required delete files", table);
+
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
     for (CombinedScanTask task : tasks()) {
       readTasks.add(new ReadTask<>(
@@ -324,7 +329,9 @@
 
       boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());
 
-      this.readUsingBatch = batchReadsEnabled && (allOrcFileScanTasks ||
+      boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
+
+      this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
           (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
     }
     return readUsingBatch;
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java
new file mode 100644
index 0000000..5b13bff
--- /dev/null
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestSparkReaderDeletes24 extends TestSparkReaderDeletes {
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 6cd166b..b84b56d 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -46,6 +46,7 @@
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.Batch;
@@ -168,7 +169,9 @@
 
     boolean onlyPrimitives = expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType());
 
-    boolean readUsingBatch = batchReadsEnabled && (allOrcFileScanTasks ||
+    boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
+
+    boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
         (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
 
     return new ReaderFactory(readUsingBatch ? batchSize : 0);
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java
new file mode 100644
index 0000000..82835d6
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestSparkReaderDeletes3 extends TestSparkReaderDeletes {
+}