Core: Ignore split offsets array when split offset is past file length (#8925) (#8938)

diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 31e9145..daa209d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -460,21 +460,27 @@
 
   @Override
   public List<Long> splitOffsets() {
-    if (splitOffsets == null || splitOffsets.length == 0) {
-      return null;
+    if (hasWellDefinedOffsets()) {
+      return ArrayUtil.toUnmodifiableLongList(splitOffsets);
     }
 
-    // If the last split offset is past the file size this means the split offsets are corrupted and
-    // should not be used
-    if (splitOffsets[splitOffsets.length - 1] >= fileSizeInBytes) {
-      return null;
-    }
-
-    return ArrayUtil.toUnmodifiableLongList(splitOffsets);
+    return null;
   }
 
   long[] splitOffsetArray() {
-    return splitOffsets;
+    if (hasWellDefinedOffsets()) {
+      return splitOffsets;
+    }
+
+    return null;
+  }
+
+  private boolean hasWellDefinedOffsets() {
+    // If the last split offset is past the file size this means the split offsets are corrupted and
+    // should not be used
+    return splitOffsets != null
+        && splitOffsets.length != 0
+        && splitOffsets[splitOffsets.length - 1] < fileSizeInBytes;
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
index 0dff941..cfe3cb6 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java
@@ -25,19 +25,26 @@
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.BaseFileScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MergeableScanTask;
 import org.apache.iceberg.MockFileScanTask;
 import org.apache.iceberg.PartitionScanTask;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SplittableScanTask;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableTestBase;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -127,6 +134,45 @@
   }
 
   @Test
+  public void testTaskGroupPlanningCorruptedOffset() {
+    DataFile dataFile =
+        DataFiles.builder(TableTestBase.SPEC)
+            .withPath("/path/to/data-a.parquet")
+            .withFileSizeInBytes(10)
+            .withPartitionPath("data_bucket=0")
+            .withRecordCount(1)
+            .withSplitOffsets(
+                ImmutableList.of(2L, 12L)) // the last offset is beyond the end of the file
+            .build();
+
+    ResidualEvaluator residualEvaluator =
+        ResidualEvaluator.of(TableTestBase.SPEC, Expressions.equal("id", 1), false);
+
+    BaseFileScanTask baseFileScanTask =
+        new BaseFileScanTask(
+            dataFile,
+            null,
+            SchemaParser.toJson(TableTestBase.SCHEMA),
+            PartitionSpecParser.toJson(TableTestBase.SPEC),
+            residualEvaluator);
+
+    List<BaseFileScanTask> baseFileScanTasks = ImmutableList.of(baseFileScanTask);
+
+    int taskCount = 0;
+    for (ScanTaskGroup<BaseFileScanTask> task :
+        TableScanUtil.planTaskGroups(CloseableIterable.withNoopClose(baseFileScanTasks), 1, 1, 0)) {
+      for (FileScanTask fileScanTask : task.tasks()) {
+        DataFile taskDataFile = fileScanTask.file();
+        Assertions.assertThat(taskDataFile.splitOffsets()).isNull();
+        taskCount++;
+      }
+    }
+
+    // 10 tasks since the split offsets are ignored and there are 1 byte splits for a 10 byte file
+    Assertions.assertThat(taskCount).isEqualTo(10);
+  }
+
+  @Test
   public void testTaskMerging() {
     List<ParentTask> tasks =
         ImmutableList.of(