Core: Ignore split offsets array when split offset is past file length (#8925) (#8938)
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 31e9145..daa209d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -460,21 +460,27 @@
@Override
public List<Long> splitOffsets() {
- if (splitOffsets == null || splitOffsets.length == 0) {
- return null;
+ if (hasWellDefinedOffsets()) {
+ return ArrayUtil.toUnmodifiableLongList(splitOffsets);
}
- // If the last split offset is past the file size this means the split offsets are corrupted and
-    // should not be used
- if (splitOffsets[splitOffsets.length - 1] >= fileSizeInBytes) {
- return null;
- }
-
- return ArrayUtil.toUnmodifiableLongList(splitOffsets);
+ return null;
}
long[] splitOffsetArray() {
- return splitOffsets;
+ if (hasWellDefinedOffsets()) {
+ return splitOffsets;
+ }
+
+ return null;
+ }
+
+ private boolean hasWellDefinedOffsets() {
+ // If the last split offset is past the file size this means the split offsets are corrupted and
+    // should not be used
+ return splitOffsets != null
+ && splitOffsets.length != 0
+ && splitOffsets[splitOffsets.length - 1] < fileSizeInBytes;
}
@Override
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
index 0dff941..cfe3cb6 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
@@ -25,19 +25,26 @@
import java.util.List;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.BaseFileScanTask;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MergeableScanTask;
import org.apache.iceberg.MockFileScanTask;
import org.apache.iceberg.PartitionScanTask;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.ScanTaskGroup;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SplittableScanTask;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -127,6 +134,45 @@
}
@Test
+ public void testTaskGroupPlanningCorruptedOffset() {
+ DataFile dataFile =
+ DataFiles.builder(TableTestBase.SPEC)
+ .withPath("/path/to/data-a.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("data_bucket=0")
+ .withRecordCount(1)
+ .withSplitOffsets(
+ ImmutableList.of(2L, 12L)) // the last offset is beyond the end of the file
+ .build();
+
+ ResidualEvaluator residualEvaluator =
+ ResidualEvaluator.of(TableTestBase.SPEC, Expressions.equal("id", 1), false);
+
+ BaseFileScanTask baseFileScanTask =
+ new BaseFileScanTask(
+ dataFile,
+ null,
+ SchemaParser.toJson(TableTestBase.SCHEMA),
+ PartitionSpecParser.toJson(TableTestBase.SPEC),
+ residualEvaluator);
+
+ List<BaseFileScanTask> baseFileScanTasks = ImmutableList.of(baseFileScanTask);
+
+ int taskCount = 0;
+ for (ScanTaskGroup<BaseFileScanTask> task :
+ TableScanUtil.planTaskGroups(CloseableIterable.withNoopClose(baseFileScanTasks), 1, 1, 0)) {
+ for (FileScanTask fileScanTask : task.tasks()) {
+ DataFile taskDataFile = fileScanTask.file();
+ Assertions.assertThat(taskDataFile.splitOffsets()).isNull();
+ taskCount++;
+ }
+ }
+
+    // 10 tasks since the split offsets are ignored and there are 1-byte splits for a 10-byte file
+ Assertions.assertThat(taskCount).isEqualTo(10);
+ }
+
+ @Test
public void testTaskMerging() {
List<ParentTask> tasks =
ImmutableList.of(