Spark: Apply row-level delete files when reading (#1444)
diff --git a/api/src/main/java/org/apache/iceberg/Accessors.java b/api/src/main/java/org/apache/iceberg/Accessors.java
index a3eb5a5..e62012d 100644
--- a/api/src/main/java/org/apache/iceberg/Accessors.java
+++ b/api/src/main/java/org/apache/iceberg/Accessors.java
@@ -212,10 +212,6 @@
}
}
- if (accessors.isEmpty()) {
- return null;
- }
-
return accessors;
}
diff --git a/build.gradle b/build.gradle
index 88f8cc9..c19d5ef 100644
--- a/build.gradle
+++ b/build.gradle
@@ -674,6 +674,7 @@
}
testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
+ testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
}
test {
@@ -781,6 +782,7 @@
}
testCompile project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
+ testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
}
test {
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index a69d8ed..a9c60df 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -31,6 +31,14 @@
private TableScanUtil() {
}
+ public static boolean hasDeletes(CombinedScanTask task) {
+ return task.files().stream().anyMatch(TableScanUtil::hasDeletes);
+ }
+
+ public static boolean hasDeletes(FileScanTask task) {
+ return !task.deletes().isEmpty();
+ }
+
public static CloseableIterable<FileScanTask> splitFiles(CloseableIterable<FileScanTask> tasks, long splitSize) {
Iterable<FileScanTask> splitTasks = FluentIterable
.from(tasks)
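
hasDeletes gives scan planners a cheap way to detect tasks that still need delete files applied; the Spark changes below use it to turn off vectorized batch reads, which cannot yet filter deleted rows. A minimal sketch of that kind of guard follows; the planner context and the class/method names here are assumptions for illustration, not code from this change:

import java.util.List;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.util.TableScanUtil;

class BatchReadGuard {
  // Sketch: fall back to row-at-a-time reads when any planned task carries delete files.
  static boolean useBatchReads(boolean batchReadsEnabled, List<CombinedScanTask> tasks) {
    boolean hasDeletes = tasks.stream().anyMatch(TableScanUtil::hasDeletes);
    return batchReadsEnabled && !hasDeletes;
  }
}
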
diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
new file mode 100644
index 0000000..2909a91
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.Accessor;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.deletes.Deletes;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+
+public abstract class DeleteFilter<T> {
+ private static final long DEFAULT_SET_FILTER_THRESHOLD = 100_000L;
+ private static final Schema POS_DELETE_SCHEMA = new Schema(
+ MetadataColumns.DELETE_FILE_PATH,
+ MetadataColumns.DELETE_FILE_POS);
+
+ private final long setFilterThreshold;
+ private final DataFile dataFile;
+ private final List<DeleteFile> posDeletes;
+ private final List<DeleteFile> eqDeletes;
+ private final Schema requiredSchema;
+ private final Accessor<StructLike> posAccessor;
+
+ public DeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+ this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD;
+ this.dataFile = task.file();
+
+ ImmutableList.Builder<DeleteFile> posDeleteBuilder = ImmutableList.builder();
+ ImmutableList.Builder<DeleteFile> eqDeleteBuilder = ImmutableList.builder();
+ for (DeleteFile delete : task.deletes()) {
+ switch (delete.content()) {
+ case POSITION_DELETES:
+ posDeleteBuilder.add(delete);
+ break;
+ case EQUALITY_DELETES:
+ eqDeleteBuilder.add(delete);
+ break;
+ default:
+ throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
+ }
+ }
+
+ this.posDeletes = posDeleteBuilder.build();
+ this.eqDeletes = eqDeleteBuilder.build();
+ this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes);
+ this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
+ }
+
+ public Schema requiredSchema() {
+ return requiredSchema;
+ }
+
+ Accessor<StructLike> posAccessor() {
+ return posAccessor;
+ }
+
+ protected abstract StructLike asStructLike(T record);
+
+ protected abstract InputFile getInputFile(String location);
+
+ protected long pos(T record) {
+ return (Long) posAccessor.get(asStructLike(record));
+ }
+
+ public CloseableIterable<T> filter(CloseableIterable<T> records) {
+ return applyEqDeletes(applyPosDeletes(records));
+ }
+
+ private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
+ if (eqDeletes.isEmpty()) {
+ return records;
+ }
+
+ Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
+ for (DeleteFile delete : eqDeletes) {
+ filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
+ }
+
+ CloseableIterable<T> filteredRecords = records;
+ for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
+ Set<Integer> ids = entry.getKey();
+ Iterable<DeleteFile> deletes = entry.getValue();
+
+ Schema deleteSchema = TypeUtil.select(requiredSchema, ids);
+
+ // a projection to select and reorder fields of the file schema to match the delete rows
+ StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema);
+
+ Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
+ delete -> openDeletes(delete, deleteSchema));
+ StructLikeSet deleteSet = Deletes.toEqualitySet(
+ // copy the delete records because they will be held in a set
+ CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
+ deleteSchema.asStruct());
+
+ filteredRecords = Deletes.filter(filteredRecords,
+ record -> projectRow.wrap(asStructLike(record)), deleteSet);
+ }
+
+ return filteredRecords;
+ }
+
+ private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
+ if (posDeletes.isEmpty()) {
+ return records;
+ }
+
+ List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);
+
+ // if there are fewer deletes than a reasonable number to keep in memory, use a set
+ if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
+ return Deletes.filter(
+ records, this::pos,
+ Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)));
+ }
+
+ return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
+ }
+
+ private CloseableIterable<Record> openPosDeletes(DeleteFile file) {
+ return openDeletes(file, POS_DELETE_SCHEMA);
+ }
+
+ private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, Schema deleteSchema) {
+ InputFile input = getInputFile(deleteFile.path().toString());
+ switch (deleteFile.format()) {
+ case AVRO:
+ return Avro.read(input)
+ .project(deleteSchema)
+ .reuseContainers()
+ .createReaderFunc(DataReader::create)
+ .build();
+
+ case PARQUET:
+ Parquet.ReadBuilder builder = Parquet.read(input)
+ .project(deleteSchema)
+ .reuseContainers()
+ .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));
+
+ if (deleteFile.content() == FileContent.POSITION_DELETES) {
+ builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
+ }
+
+ return builder.build();
+
+ case ORC:
+ default:
+ throw new UnsupportedOperationException(String.format(
+ "Cannot read deletes, %s is not a supported format: %s", deleteFile.format().name(), deleteFile.path()));
+ }
+ }
+
+ private static Schema fileProjection(Schema tableSchema, Schema requestedSchema,
+ List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
+ if (posDeletes.isEmpty() && eqDeletes.isEmpty()) {
+ return requestedSchema;
+ }
+
+ Set<Integer> requiredIds = Sets.newLinkedHashSet();
+ if (!posDeletes.isEmpty()) {
+ requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
+ }
+
+ for (DeleteFile eqDelete : eqDeletes) {
+ requiredIds.addAll(eqDelete.equalityFieldIds());
+ }
+
+ Set<Integer> missingIds = Sets.newLinkedHashSet(
+ Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema)));
+
+ if (missingIds.isEmpty()) {
+ return requestedSchema;
+ }
+
+ // TODO: support adding nested columns. this will currently fail when finding nested columns to add
+ List<Types.NestedField> columns = Lists.newArrayList(requestedSchema.columns());
+ for (int fieldId : missingIds) {
+ if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
+ continue; // add _pos at the end
+ }
+
+ Types.NestedField field = tableSchema.asStruct().field(fieldId);
+ Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);
+
+ columns.add(field);
+ }
+
+ if (missingIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
+ columns.add(MetadataColumns.ROW_POSITION);
+ }
+
+ return new Schema(columns);
+ }
+}
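
applyEqDeletes groups delete files by their equality field IDs, loads each group's rows into a StructLikeSet keyed by the delete schema, and drops any data row whose projection into that schema is present in the set. Below is a toy sketch of just that membership test, using classes from this module; it is illustration only, not the reader's actual code path:

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructProjection;

public class EqualityDeleteSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.IntegerType.get()),
        Types.NestedField.required(2, "data", Types.StringType.get()));
    // equality deletes in this sketch match on the "data" column (field id 2)
    Schema deleteSchema = TypeUtil.select(schema, Sets.newHashSet(2));

    // delete rows are collected into a set keyed by the delete schema
    StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct());
    deleteSet.add(GenericRecord.create(deleteSchema).copy("data", "a"));

    // data rows are projected to the delete schema before the membership test
    StructProjection projectRow = StructProjection.create(schema, deleteSchema);
    Record row = GenericRecord.create(schema).copy("id", 29, "data", "a");
    System.out.println("deleted? " + deleteSet.contains(projectRow.wrap(row))); // deleted? true
  }
}
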
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
new file mode 100644
index 0000000..30109f1
--- /dev/null
+++ b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+
+public class GenericDeleteFilter extends DeleteFilter<Record> {
+ private final FileIO io;
+ private final InternalRecordWrapper asStructLike;
+
+ public GenericDeleteFilter(FileIO io, FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+ super(task, tableSchema, requestedSchema);
+ this.io = io;
+ this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
+ }
+
+ @Override
+ protected long pos(Record record) {
+ return (Long) posAccessor().get(record);
+ }
+
+ @Override
+ protected StructLike asStructLike(Record record) {
+ return asStructLike.wrap(record);
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return io.newInputFile(location);
+ }
+}
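
Callers are expected to ask the filter for requiredSchema() — which may widen the requested projection with _pos or equality-delete columns — open the data file with that schema, and then run the rows through filter(); this is exactly what the GenericReader change below does. A hedged sketch of that wiring, where openDataFile stands in for the reader's actual file-opening code:

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericDeleteFilter;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;

abstract class DeleteAwareReader {
  // assumed helper: reads the task's data file with the given projection
  protected abstract CloseableIterable<Record> openDataFile(FileScanTask task, Schema projection);

  CloseableIterable<Record> open(FileIO io, FileScanTask task, Schema tableSchema, Schema requestedSchema) {
    GenericDeleteFilter deletes = new GenericDeleteFilter(io, task, tableSchema, requestedSchema);
    // the filter may add columns (e.g. _pos) needed to apply the task's delete files
    CloseableIterable<Record> records = openDataFile(task, deletes.requiredSchema());
    return deletes.filter(records);
  }
}
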
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
index e5f7117..913e783 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -20,26 +20,16 @@
package org.apache.iceberg.data;
import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import org.apache.iceberg.Accessor;
import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.deletes.Deletes;
import org.apache.iceberg.expressions.Evaluator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@@ -51,23 +41,11 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
-import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PartitionUtil;
-import org.apache.iceberg.util.StructLikeSet;
-import org.apache.iceberg.util.StructProjection;
-import org.apache.parquet.Preconditions;
class GenericReader implements Serializable {
- private static final Schema POS_DELETE_SCHEMA = new Schema(
- MetadataColumns.DELETE_FILE_PATH,
- MetadataColumns.DELETE_FILE_POS);
-
private final FileIO io;
private final Schema tableSchema;
private final Schema projection;
@@ -92,67 +70,16 @@
}
public CloseableIterable<Record> open(FileScanTask task) {
- List<DeleteFile> posDeletes = Lists.newArrayList();
- List<DeleteFile> eqDeletes = Lists.newArrayList();
- for (DeleteFile delete : task.deletes()) {
- switch (delete.content()) {
- case POSITION_DELETES:
- posDeletes.add(delete);
- break;
- case EQUALITY_DELETES:
- eqDeletes.add(delete);
- break;
- default:
- throw new UnsupportedOperationException("Unknown delete file content: " + delete.content());
- }
- }
+ DeleteFilter<Record> deletes = new GenericDeleteFilter(io, task, tableSchema, projection);
+ Schema readSchema = deletes.requiredSchema();
- Schema fileProjection = fileProjection(posDeletes, eqDeletes);
-
- CloseableIterable<Record> records = openFile(task, fileProjection);
- records = applyPosDeletes(records, fileProjection, task.file().path(), posDeletes, task.file());
- records = applyEqDeletes(records, fileProjection, eqDeletes, task.file());
- records = applyResidual(records, fileProjection, task.residual());
+ CloseableIterable<Record> records = openFile(task, readSchema);
+ records = deletes.filter(records);
+ records = applyResidual(records, readSchema, task.residual());
return records;
}
- private Schema fileProjection(List<DeleteFile> posDeletes, List<DeleteFile> eqDeletes) {
- Set<Integer> requiredIds = Sets.newLinkedHashSet();
- if (!posDeletes.isEmpty()) {
- requiredIds.add(MetadataColumns.ROW_POSITION.fieldId());
- }
-
- for (DeleteFile eqDelete : eqDeletes) {
- requiredIds.addAll(eqDelete.equalityFieldIds());
- }
-
- Set<Integer> missingIds = Sets.newLinkedHashSet(Sets.difference(requiredIds, TypeUtil.getProjectedIds(projection)));
-
- if (missingIds.isEmpty()) {
- return projection;
- }
-
- // TODO: support adding nested columns. this will currently fail when finding nested columns to add
- List<Types.NestedField> columns = Lists.newArrayList(projection.columns());
- for (int fieldId : missingIds) {
- if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
- continue; // add _pos at the end
- }
-
- Types.NestedField field = tableSchema.asStruct().field(fieldId);
- Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId);
-
- columns.add(field);
- }
-
- if (requiredIds.contains(MetadataColumns.ROW_POSITION.fieldId())) {
- columns.add(MetadataColumns.ROW_POSITION);
- }
-
- return new Schema(columns);
- }
-
private CloseableIterable<Record> applyResidual(CloseableIterable<Record> records, Schema recordSchema,
Expression residual) {
if (residual != null && residual != Expressions.alwaysTrue()) {
@@ -164,95 +91,6 @@
return records;
}
- private CloseableIterable<Record> applyEqDeletes(CloseableIterable<Record> records, Schema recordSchema,
- List<DeleteFile> eqDeletes, DataFile dataFile) {
- if (eqDeletes.isEmpty()) {
- return records;
- }
-
- Multimap<Set<Integer>, DeleteFile> filesByDeleteIds = Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList);
- for (DeleteFile delete : eqDeletes) {
- filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete);
- }
-
- CloseableIterable<Record> filteredRecords = records;
- for (Map.Entry<Set<Integer>, Collection<DeleteFile>> entry : filesByDeleteIds.asMap().entrySet()) {
- Set<Integer> ids = entry.getKey();
- Iterable<DeleteFile> deletes = entry.getValue();
-
- Schema deleteSchema = TypeUtil.select(recordSchema, ids);
-
- // a wrapper to translate from generic objects to internal representations
- InternalRecordWrapper asStructLike = new InternalRecordWrapper(recordSchema.asStruct());
-
- // a projection to select and reorder fields of the file schema to match the delete rows
- StructProjection projectRow = StructProjection.create(recordSchema, deleteSchema);
-
- Iterable<CloseableIterable<Record>> deleteRecords = Iterables.transform(deletes,
- delete -> openDeletes(delete, dataFile, deleteSchema));
- StructLikeSet deleteSet = Deletes.toEqualitySet(
- // copy the delete records because they will be held in a set
- CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy),
- deleteSchema.asStruct());
-
- filteredRecords = Deletes.filter(filteredRecords,
- record -> projectRow.wrap(asStructLike.wrap(record)), deleteSet);
- }
-
- return filteredRecords;
- }
-
- private CloseableIterable<Record> applyPosDeletes(CloseableIterable<Record> records, Schema recordSchema,
- CharSequence file, List<DeleteFile> posDeletes, DataFile dataFile) {
- if (posDeletes.isEmpty()) {
- return records;
- }
-
- Accessor<StructLike> posAccessor = recordSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId());
- Function<Record, Long> posGetter = record -> (Long) posAccessor.get(record);
- List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes,
- delete -> openPosDeletes(delete, dataFile));
-
- // if there are fewer deletes than a reasonable number to keep in memory, use a set
- if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < 100_000L) {
- return Deletes.filter(records, posGetter, Deletes.toPositionSet(file, CloseableIterable.concat(deletes)));
- }
-
- return Deletes.streamingFilter(records, posGetter, Deletes.deletePositions(file, deletes));
- }
-
- private CloseableIterable<Record> openPosDeletes(DeleteFile file, DataFile dataFile) {
- return openDeletes(file, dataFile, POS_DELETE_SCHEMA);
- }
-
- private CloseableIterable<Record> openDeletes(DeleteFile deleteFile, DataFile dataFile, Schema deleteSchema) {
- InputFile input = io.newInputFile(deleteFile.path().toString());
- switch (deleteFile.format()) {
- case AVRO:
- return Avro.read(input)
- .project(deleteSchema)
- .reuseContainers()
- .createReaderFunc(DataReader::create)
- .build();
-
- case PARQUET:
- Parquet.ReadBuilder builder = Parquet.read(input)
- .project(deleteSchema)
- .reuseContainers()
- .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(deleteSchema, fileSchema));
-
- if (deleteFile.content() == FileContent.POSITION_DELETES) {
- builder.filter(Expressions.equal(MetadataColumns.DELETE_FILE_PATH.name(), dataFile.path()));
- }
-
- return builder.build();
-
- case ORC:
- default:
- throw new UnsupportedOperationException(String.format(
- "Cannot read %s file: %s", deleteFile.format().name(), deleteFile.path()));
- }
- }
private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProjection) {
InputFile input = io.newInputFile(task.file().path().toString());
diff --git a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
index 210f231..3323d30 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestGenericReaderDeletes.java
@@ -35,6 +35,7 @@
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -108,8 +109,9 @@
.addDeletes(eqDeletes)
.commit();
- StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
- StructLikeSet actual = rowSet(table, "id"); // data is added by the reader to apply the eq deletes
+ StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
+ // the reader adds the data column to apply the eq deletes; project it out of the comparison
+ StructLikeSet actual = selectColumns(rowSet(table, "id"), "id");
Assert.assertEquals("Table should contain expected rows", expected, actual);
}
@@ -250,6 +252,15 @@
return set;
}
+ private StructLikeSet selectColumns(StructLikeSet rows, String... columns) {
+ Schema projection = table.schema().select(columns);
+ StructLikeSet set = StructLikeSet.create(projection.asStruct());
+ rows.stream()
+ .map(row -> StructProjection.create(table.schema(), projection).wrap(row))
+ .forEach(set::add);
+ return set;
+ }
+
private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 1eab0a5..3708273 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -25,18 +25,19 @@
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.spark.rdd.InputFileBlockHolder;
@@ -50,21 +51,20 @@
*/
abstract class BaseDataReader<T> implements Closeable {
private final Iterator<FileScanTask> tasks;
- private final FileIO fileIo;
private final Map<String, InputFile> inputFiles;
private CloseableIterator<T> currentIterator;
private T current = null;
- BaseDataReader(CombinedScanTask task, FileIO fileIo, EncryptionManager encryptionManager) {
- this.fileIo = fileIo;
+ BaseDataReader(CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
this.tasks = task.files().iterator();
- Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(Iterables.transform(
- task.files(),
- fileScanTask ->
- EncryptedFiles.encryptedInput(
- this.fileIo.newInputFile(fileScanTask.file().path().toString()),
- fileScanTask.file().keyMetadata())));
+ Stream<EncryptedInputFile> encrypted = task.files().stream()
+ .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+ .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()));
+
+ // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+ Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+
ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
this.inputFiles = inputFileBuilder.build();
@@ -104,11 +104,15 @@
}
}
- InputFile getInputFile(FileScanTask task) {
+ protected InputFile getInputFile(FileScanTask task) {
Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
return inputFiles.get(task.file().path().toString());
}
+ protected InputFile getInputFile(String location) {
+ return inputFiles.get(location);
+ }
+
protected static Object convertConstant(Type type, Object value) {
if (value == null) {
return null;
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index a7fbe90..1b06ffb 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -21,16 +21,16 @@
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
@@ -40,7 +40,6 @@
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -64,15 +63,17 @@
.impl(UnsafeProjection.class, InternalRow.class)
.build();
+ private final FileIO io;
private final Schema tableSchema;
private final Schema expectedSchema;
private final String nameMapping;
private final boolean caseSensitive;
RowDataReader(
- CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO fileIo,
+ CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
- super(task, fileIo, encryptionManager);
+ super(task, io, encryptionManager);
+ this.io = io;
this.tableSchema = tableSchema;
this.expectedSchema = expectedSchema;
this.nameMapping = nameMapping;
@@ -81,23 +82,17 @@
@Override
CloseableIterator<InternalRow> open(FileScanTask task) {
+ SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema);
+
+ // schema of rows returned by readers
+ Schema requiredSchema = deletes.requiredSchema();
+ Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
DataFile file = task.file();
// update the current file for Spark's filename() function
InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
- // schema or rows returned by readers
- PartitionSpec spec = task.spec();
- Set<Integer> idColumns = spec.identitySourceIds();
- Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
- boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();
-
- if (projectsIdentityPartitionColumns) {
- return open(task, expectedSchema, PartitionUtil.constantsMap(task, RowDataReader::convertConstant))
- .iterator();
- }
- // return the base iterator
- return open(task, expectedSchema, ImmutableMap.of()).iterator();
+ return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
}
private CloseableIterable<InternalRow> open(FileScanTask task, Schema readSchema, Map<Integer, ?> idToConstant) {
@@ -218,4 +213,23 @@
JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
}
+
+ private class SparkDeleteFilter extends DeleteFilter<InternalRow> {
+ private final InternalRowWrapper asStructLike;
+
+ SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
+ super(task, tableSchema, requestedSchema);
+ this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+ }
+
+ @Override
+ protected StructLike asStructLike(InternalRow row) {
+ return asStructLike.wrap(row);
+ }
+
+ @Override
+ protected InputFile getInputFile(String location) {
+ return RowDataReader.this.getInputFile(location);
+ }
+ }
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
index e66f3a4..3021288 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -23,6 +23,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -59,6 +61,12 @@
.getOrCreate();
SparkTestBase.catalog = new HiveCatalog(spark.sessionState().newHadoopConf());
+
+ try {
+ catalog.createNamespace(Namespace.of("default"));
+ } catch (AlreadyExistsException ignored) {
+ // the default namespace already exists. ignore the create error
+ }
}
@AfterClass
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
new file mode 100644
index 0000000..0a0dcd8
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.SparkStructLike;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.StructLikeSet;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.spark.sql.Dataset;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public abstract class TestSparkReaderDeletes extends SparkTestBase {
+ private static final Schema SCHEMA = new Schema(
+ Types.NestedField.required(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "data", Types.StringType.get()));
+ private Table table = null;
+ private List<Record> records = null;
+ private DataFile dataFile = null;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Before
+ public void createTable() throws IOException {
+ this.table = catalog.createTable(TableIdentifier.of("default", "table"), SCHEMA);
+ TableOperations ops = ((BaseTable) table).operations();
+ TableMetadata meta = ops.current();
+ ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+ this.records = Lists.newArrayList();
+
+ // records all use IDs that are in bucket id_bucket=0
+ GenericRecord record = GenericRecord.create(table.schema());
+ records.add(record.copy("id", 29, "data", "a"));
+ records.add(record.copy("id", 43, "data", "b"));
+ records.add(record.copy("id", 61, "data", "c"));
+ records.add(record.copy("id", 89, "data", "d"));
+ records.add(record.copy("id", 100, "data", "e"));
+ records.add(record.copy("id", 121, "data", "f"));
+ records.add(record.copy("id", 122, "data", "g"));
+
+ this.dataFile = FileHelpers.writeDataFile(table, Files.localOutput(temp.newFile()), Row.of(0), records);
+
+ table.newAppend()
+ .appendFile(dataFile)
+ .commit();
+ }
+
+ @After
+ public void dropTable() {
+ catalog.dropTable(TableIdentifier.of("default", "table"));
+ }
+
+ @Test
+ public void testEqualityDeletes() throws IOException {
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+ StructLikeSet actual = rowSet(table);
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ @Test
+ public void testEqualityDeletesWithRequiredEqColumn() throws IOException {
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = selectColumns(rowSetWithoutIds(29, 89, 122), "id");
+ StructLikeSet actual = rowSet(table, "id"); // data is added by the reader to apply the eq deletes
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ @Test
+ public void testPositionDeletes() throws IOException {
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 0L), // id = 29
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 6L) // id = 122
+ );
+
+ DeleteFile posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
+
+ table.newRowDelta()
+ .addDeletes(posDeletes)
+ .commit();
+
+ StructLikeSet expected = rowSetWithoutIds(29, 89, 122);
+ StructLikeSet actual = rowSet(table);
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ @Test
+ public void testMixedPositionAndEqualityDeletes() throws IOException {
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema);
+
+ List<Pair<CharSequence, Long>> deletes = Lists.newArrayList(
+ Pair.of(dataFile.path(), 3L), // id = 89
+ Pair.of(dataFile.path(), 5L) // id = 121
+ );
+
+ DeleteFile posDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), deletes);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .addDeletes(posDeletes)
+ .commit();
+
+ StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(table);
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ @Test
+ public void testMultipleEqualityDeleteSchemas() throws IOException {
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "a"), // id = 29
+ dataDelete.copy("data", "d"), // id = 89
+ dataDelete.copy("data", "g") // id = 122
+ );
+
+ DeleteFile dataEqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema);
+
+ Schema idSchema = table.schema().select("id");
+ Record idDelete = GenericRecord.create(idSchema);
+ List<Record> idDeletes = Lists.newArrayList(
+ idDelete.copy("id", 121), // id = 121
+ idDelete.copy("id", 29) // id = 29
+ );
+
+ DeleteFile idEqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), idDeletes, idSchema);
+
+ table.newRowDelta()
+ .addDeletes(dataEqDeletes)
+ .addDeletes(idEqDeletes)
+ .commit();
+
+ StructLikeSet expected = rowSetWithoutIds(29, 89, 121, 122);
+ StructLikeSet actual = rowSet(table);
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ @Test
+ public void testEqualityDeleteByNull() throws IOException {
+ // data is required in the test table; make it optional for this test
+ table.updateSchema()
+ .makeColumnOptional("data")
+ .commit();
+
+ // add a new data file with a record where data is null
+ Record record = GenericRecord.create(table.schema());
+ DataFile dataFileWithNull = FileHelpers.writeDataFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0),
+ Lists.newArrayList(record.copy("id", 131, "data", null)));
+
+ table.newAppend()
+ .appendFile(dataFileWithNull)
+ .commit();
+
+ // delete where data is null
+ Schema dataSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(dataSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", null) // id = 131
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), Row.of(0), dataDeletes, dataSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ StructLikeSet expected = rowSetWithoutIds(131);
+ StructLikeSet actual = rowSet(table);
+
+ Assert.assertEquals("Table should contain expected rows", expected, actual);
+ }
+
+ private static StructLikeSet rowSet(Table table) {
+ return rowSet(table, "*");
+ }
+
+ private static StructLikeSet rowSet(Table table, String... columns) {
+ Dataset<org.apache.spark.sql.Row> df = spark.read()
+ .format("iceberg")
+ .load("default.table")
+ .selectExpr(columns);
+
+ Types.StructType projection = table.schema().select(columns).asStruct();
+ StructLikeSet set = StructLikeSet.create(projection);
+ df.collectAsList().stream().forEach(row -> {
+ SparkStructLike rowWrapper = new SparkStructLike(projection);
+ set.add(rowWrapper.wrap(row));
+ });
+
+ return set;
+ }
+
+ private StructLikeSet selectColumns(StructLikeSet rows, String... columns) {
+ Schema projection = table.schema().select(columns);
+ StructLikeSet set = StructLikeSet.create(projection.asStruct());
+ rows.stream()
+ .map(row -> StructProjection.create(table.schema(), projection).wrap(row))
+ .forEach(set::add);
+ return set;
+ }
+
+ private StructLikeSet rowSetWithoutIds(int... idsToRemove) {
+ Set<Integer> deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove));
+ StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
+ records.stream()
+ .filter(row -> !deletedIds.contains(row.getField("id")))
+ .forEach(set::add);
+ return set;
+ }
+}
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index 564f8f2..826c450 100644
--- a/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -38,6 +38,7 @@
import org.apache.iceberg.TableScan;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hadoop.HadoopFileIO;
@@ -50,6 +51,7 @@
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -205,6 +207,9 @@
String expectedSchemaString = SchemaParser.toJson(lazySchema());
String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);
+ ValidationException.check(tasks().stream().noneMatch(TableScanUtil::hasDeletes),
+ "Cannot scan table %s: cannot apply required delete files", table);
+
List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
@@ -324,7 +329,9 @@
boolean onlyPrimitives = lazySchema().columns().stream().allMatch(c -> c.type().isPrimitiveType());
- this.readUsingBatch = batchReadsEnabled && (allOrcFileScanTasks ||
+ boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
+
+ this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
}
return readUsingBatch;
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java
new file mode 100644
index 0000000..5b13bff
--- /dev/null
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes24.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestSparkReaderDeletes24 extends TestSparkReaderDeletes {
+}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index 6cd166b..b84b56d 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -46,6 +46,7 @@
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.Batch;
@@ -168,7 +169,9 @@
boolean onlyPrimitives = expectedSchema.columns().stream().allMatch(c -> c.type().isPrimitiveType());
- boolean readUsingBatch = batchReadsEnabled && (allOrcFileScanTasks ||
+ boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
+
+ boolean readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
(allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
return new ReaderFactory(readUsingBatch ? batchSize : 0);
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java
new file mode 100644
index 0000000..82835d6
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes3.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestSparkReaderDeletes3 extends TestSparkReaderDeletes {
+}