Flink: Don't fail to serialize IcebergSourceSplit when there are too many delete files (#9464)
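
DataOutputSerializer#writeUTF stores the encoded length of a string as an unsigned short, so
serializing a split fails with a UTFDataFormatException as soon as a FileScanTask's JSON form
exceeds 65535 bytes, which can happen once a task carries a few thousand delete files. Split
serialization version 3 keeps the version 2 layout but writes each task JSON through the new
SerializerHelper#writeLongUTF, which uses a 4-byte length prefix instead. A rough sketch of the
failure and the fix (the string below is synthetic, and SerializerHelper is package-private, so
this assumes code in org.apache.iceberg.flink.source.split):

    DataOutputSerializer out = new DataOutputSerializer(1 << 17);
    String hugeJson = new String(new char[70_000]).replace('\0', 'x');  // > 65535 UTF-8 bytes
    // out.writeUTF(hugeJson);                       // would throw UTFDataFormatException
    SerializerHelper.writeLongUTF(out, hugeJson);    // int length prefix, no 64 KB limit
    DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
    String roundTripped = SerializerHelper.readLongUTF(in);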

diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index e4bfbf1..44e37af 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -132,6 +132,14 @@
   }
 
   byte[] serializeV2() throws IOException {
+    return serialize(2);
+  }
+
+  byte[] serializeV3() throws IOException {
+    return serialize(3);
+  }
+
+  private byte[] serialize(int version) throws IOException {
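+    // Versions 2 and 3 share the same layout; only the encoding of each task JSON string
+    // differs (see writeTaskJson).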
     if (serializedBytesCache == null) {
       DataOutputSerializer out = SERIALIZER_CACHE.get();
       Collection<FileScanTask> fileScanTasks = task.tasks();
@@ -147,7 +155,7 @@
 
       for (FileScanTask fileScanTask : fileScanTasks) {
         String taskJson = FileScanTaskParser.toJson(fileScanTask);
-        out.writeUTF(taskJson);
+        writeTaskJson(out, taskJson, version);
       }
 
       serializedBytesCache = out.getCopyOfBuffer();
@@ -157,8 +165,32 @@
     return serializedBytesCache;
   }
 
+  private static void writeTaskJson(DataOutputSerializer out, String taskJson, int version)
+      throws IOException {
+    switch (version) {
+      case 2:
+        out.writeUTF(taskJson);
+        break;
+      case 3:
+        SerializerHelper.writeLongUTF(out, taskJson);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported version: " + version);
+    }
+  }
+
   static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive)
       throws IOException {
+    return deserialize(serialized, caseSensitive, 2);
+  }
+
+  static IcebergSourceSplit deserializeV3(byte[] serialized, boolean caseSensitive)
+      throws IOException {
+    return deserialize(serialized, caseSensitive, 3);
+  }
+
+  private static IcebergSourceSplit deserialize(
+      byte[] serialized, boolean caseSensitive, int version) throws IOException {
     DataInputDeserializer in = new DataInputDeserializer(serialized);
     int fileOffset = in.readInt();
     long recordOffset = in.readLong();
@@ -166,7 +198,7 @@
 
     List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
     for (int i = 0; i < taskCount; ++i) {
-      String taskJson = in.readUTF();
+      String taskJson = readTaskJson(in, version);
       FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
       tasks.add(task);
     }
@@ -174,4 +206,15 @@
     CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks);
     return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset);
   }
+
+  private static String readTaskJson(DataInputDeserializer in, int version) throws IOException {
+    switch (version) {
+      case 2:
+        return in.readUTF();
+      case 3:
+        return SerializerHelper.readLongUTF(in);
+      default:
+        throw new IllegalArgumentException("Unsupported version: " + version);
+    }
+  }
 }
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
index 8c08981..d4b0f9e 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
@@ -24,7 +24,7 @@
 
 @Internal
 public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
-  private static final int VERSION = 2;
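+  // Version 3 writes task JSON with SerializerHelper.writeLongUTF, lifting the 65535-byte limit
+  // of writeUTF for splits with many delete files.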
+  private static final int VERSION = 3;
 
   private final boolean caseSensitive;
 
@@ -39,7 +39,7 @@
 
   @Override
   public byte[] serialize(IcebergSourceSplit split) throws IOException {
-    return split.serializeV2();
+    return split.serializeV3();
   }
 
   @Override
@@ -49,6 +49,8 @@
         return IcebergSourceSplit.deserializeV1(serialized);
       case 2:
         return IcebergSourceSplit.deserializeV2(serialized, caseSensitive);
+      case 3:
+        return IcebergSourceSplit.deserializeV3(serialized, caseSensitive);
       default:
         throw new IOException(
             String.format(
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
new file mode 100644
index 0000000..a0395f2
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UTFDataFormatException;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+/**
+ * Helper class to serialize and deserialize strings longer than 65535 bytes, the limit of {@code
+ * writeUTF}. The implementation is adapted from {@link DataInputDeserializer#readUTF()} and
+ * {@link DataOutputSerializer#writeUTF(String)}.
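+ *
+ * <p>A minimal round-trip sketch (the input string name is illustrative):
+ *
+ * <pre>{@code
+ *   DataOutputSerializer out = new DataOutputSerializer(64);
+ *   SerializerHelper.writeLongUTF(out, veryLongTaskJson);
+ *   DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
+ *   String roundTripped = SerializerHelper.readLongUTF(in);
+ * }</pre>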
+ */
+class SerializerHelper implements Serializable {
+
+  private SerializerHelper() {}
+
+  /**
+   * Similar to {@link DataOutputSerializer#writeUTF(String)}, except that it supports payloads
+   * larger than 65535 bytes, up to {@link Integer#MAX_VALUE} bytes.
+   *
+   * <p>Note: This method can be removed once an equivalent method is available in {@link
+   * DataOutputSerializer} itself; switch to that method once it is released, expected in Flink
+   * 1.20.
+   *
+   * <p>See <a href="https://issues.apache.org/jira/browse/FLINK-34228">FLINK-34228</a> and <a
+   * href="https://github.com/apache/flink/pull/24191">https://github.com/apache/flink/pull/24191</a>.
+   *
+   * @param out the output stream to write the string to.
+   * @param str the string value to be written.
+   */
+  public static void writeLongUTF(DataOutputSerializer out, String str) throws IOException {
+    int strlen = str.length();
+    long utflen = 0;
+    int ch;
+
+    /* use charAt instead of copying String to char array */
+    for (int i = 0; i < strlen; i++) {
+      ch = str.charAt(i);
+      utflen += getUTFBytesSize(ch);
+
+      if (utflen > Integer.MAX_VALUE) {
+        throw new UTFDataFormatException("Encoded string reached maximum length: " + utflen);
+      }
+    }
+
+    if (utflen > Integer.MAX_VALUE - 4) {
+      throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+    }
+
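+    // Unlike writeUTF, the length prefix is a 4-byte int rather than an unsigned short.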
+    out.writeInt((int) utflen);
+    writeUTFBytes(out, str, (int) utflen);
+  }
+
+  /**
+   * Similar to {@link DataInputDeserializer#readUTF()}, except that it supports payloads larger
+   * than 65535 bytes, up to {@link Integer#MAX_VALUE} bytes.
+   *
+   * <p>Note: This method can be removed once an equivalent method is available in {@link
+   * DataInputDeserializer} itself; switch to that method once it is released, expected in Flink
+   * 1.20.
+   *
+   * <p>See <a href="https://issues.apache.org/jira/browse/FLINK-34228">FLINK-34228</a> and <a
+   * href="https://github.com/apache/flink/pull/24191">https://github.com/apache/flink/pull/24191</a>.
+   *
+   * @param in the input stream to read the string from.
+   * @return the string value read from the input stream.
+   * @throws IOException if an I/O error occurs when reading from the input stream.
+   */
+  public static String readLongUTF(DataInputDeserializer in) throws IOException {
+    int utflen = in.readInt();
+    byte[] bytearr = new byte[utflen];
+    char[] chararr = new char[utflen];
+
+    int ch;
+    int char2;
+    int char3;
+    int count = 0;
+    int chararrCount = 0;
+
+    in.readFully(bytearr, 0, utflen);
+
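+    // Fast path: copy leading single-byte (ASCII) characters before the multi-byte decode loop.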
+    while (count < utflen) {
+      ch = (int) bytearr[count] & 0xff;
+      if (ch > 127) {
+        break;
+      }
+      count++;
+      chararr[chararrCount++] = (char) ch;
+    }
+
+    while (count < utflen) {
+      ch = (int) bytearr[count] & 0xff;
+      switch (ch >> 4) {
+        case 0:
+        case 1:
+        case 2:
+        case 3:
+        case 4:
+        case 5:
+        case 6:
+        case 7:
+          /* 0xxxxxxx */
+          count++;
+          chararr[chararrCount++] = (char) ch;
+          break;
+        case 12:
+        case 13:
+          /* 110x xxxx 10xx xxxx */
+          count += 2;
+          if (count > utflen) {
+            throw new UTFDataFormatException("malformed input: partial character at end");
+          }
+          char2 = (int) bytearr[count - 1];
+          if ((char2 & 0xC0) != 0x80) {
+            throw new UTFDataFormatException("malformed input around byte " + count);
+          }
+          chararr[chararrCount++] = (char) (((ch & 0x1F) << 6) | (char2 & 0x3F));
+          break;
+        case 14:
+          /* 1110 xxxx 10xx xxxx 10xx xxxx */
+          count += 3;
+          if (count > utflen) {
+            throw new UTFDataFormatException("malformed input: partial character at end");
+          }
+          char2 = (int) bytearr[count - 2];
+          char3 = (int) bytearr[count - 1];
+          if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+            throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+          }
+          chararr[chararrCount++] =
+              (char) (((ch & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
+          break;
+        default:
+          /* 10xx xxxx, 1111 xxxx */
+          throw new UTFDataFormatException("malformed input around byte " + count);
+      }
+    }
+    // The number of chars produced may be less than utflen
+    return new String(chararr, 0, chararrCount);
+  }
+
+  private static int getUTFBytesSize(int ch) {
+    if ((ch >= 0x0001) && (ch <= 0x007F)) {
+      return 1;
+    } else if (ch > 0x07FF) {
+      return 3;
+    } else {
+      return 2;
+    }
+  }
+
+  private static void writeUTFBytes(DataOutputSerializer out, String str, int utflen)
+      throws IOException {
+    int strlen = str.length();
+    int ch;
+
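+    // The encoded output is at most utflen bytes, so this buffer (at least 1 KB) always has room.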
+    int len = Math.max(1024, utflen);
+
+    byte[] bytearr = new byte[len];
+    int count = 0;
+
+    int index;
+    for (index = 0; index < strlen; index++) {
+      ch = str.charAt(index);
+      if (!((ch >= 0x0001) && (ch <= 0x007F))) {
+        break;
+      }
+      bytearr[count++] = (byte) ch;
+    }
+
+    for (; index < strlen; index++) {
+      ch = str.charAt(index);
+      if ((ch >= 0x0001) && (ch <= 0x007F)) {
+        bytearr[count++] = (byte) ch;
+      } else if (ch > 0x07FF) {
+        bytearr[count++] = (byte) (0xE0 | ((ch >> 12) & 0x0F));
+        bytearr[count++] = (byte) (0x80 | ((ch >> 6) & 0x3F));
+        bytearr[count++] = (byte) (0x80 | (ch & 0x3F));
+      } else {
+        bytearr[count++] = (byte) (0xC0 | ((ch >> 6) & 0x1F));
+        bytearr[count++] = (byte) (0x80 | (ch & 0x3F));
+      }
+    }
+
+    out.write(bytearr, 0, count);
+  }
+}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
index 3a80715..ebd220b 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
@@ -18,19 +18,30 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderHelper;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.hadoop.HadoopCatalog;
@@ -129,4 +140,64 @@
       catalog.close();
     }
   }
+
+  /**
+   * Equips the {@code icebergSourceSplits} with mock delete files.
+   * <li>For each split, creates {@code deleteFilesPerSplit} delete files
+   * <li>Replaces each original {@code FileScanTask} with a new one carrying the mock delete files
+   * <li>Callers should not attempt to read the delete files, since they are mocks rather than
+   *     real files
+   *
+   * @param icebergSourceSplits The real splits to equip with mock delete files
+   * @param temporaryFolder The temporary folder to create the mock delete files with
+   * @param deleteFilesPerSplit The number of delete files to create for each split
+   * @return The list of re-created splits with mock delete files
+   * @throws IOException If there is any error creating the mock delete files
+   */
+  public static List<IcebergSourceSplit> equipSplitsWithMockDeleteFiles(
+      List<IcebergSourceSplit> icebergSourceSplits,
+      TemporaryFolder temporaryFolder,
+      int deleteFilesPerSplit)
+      throws IOException {
+    List<IcebergSourceSplit> icebergSourceSplitsWithMockDeleteFiles = Lists.newArrayList();
+    for (IcebergSourceSplit split : icebergSourceSplits) {
+      final CombinedScanTask combinedScanTask = spy(split.task());
+
+      final List<DeleteFile> deleteFiles = Lists.newArrayList();
+      final PartitionSpec spec =
+          PartitionSpec.builderFor(TestFixtures.SCHEMA).withSpecId(0).build();
+
+      for (int i = 0; i < deleteFilesPerSplit; ++i) {
+        final DeleteFile deleteFile =
+            FileMetadata.deleteFileBuilder(spec)
+                .withFormat(FileFormat.PARQUET)
+                .withPath(temporaryFolder.newFile().getPath())
+                .ofPositionDeletes()
+                .withFileSizeInBytes(1000)
+                .withRecordCount(1000)
+                .build();
+        deleteFiles.add(deleteFile);
+      }
+
+      List<FileScanTask> newFileScanTasks = Lists.newArrayList();
+      for (FileScanTask task : combinedScanTask.tasks()) {
+        String schemaString = SchemaParser.toJson(task.schema());
+        String specString = PartitionSpecParser.toJson(task.spec());
+
+        BaseFileScanTask baseFileScanTask =
+            new BaseFileScanTask(
+                task.file(),
+                deleteFiles.toArray(new DeleteFile[] {}),
+                schemaString,
+                specString,
+                ResidualEvaluator.unpartitioned(task.residual()));
+        newFileScanTasks.add(baseFileScanTask);
+      }
+      doReturn(newFileScanTasks).when(combinedScanTask).tasks();
+      icebergSourceSplitsWithMockDeleteFiles.add(
+          IcebergSourceSplit.fromCombinedScanTask(
+              combinedScanTask, split.fileOffset(), split.recordOffset()));
+    }
+    return icebergSourceSplitsWithMockDeleteFiles;
+  }
 }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
index cd77830..c72d622 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
@@ -101,6 +101,26 @@
   }
 
   @Test
+  public void testV3WithTooManyDeleteFiles() throws Exception {
+    serializeAndDeserializeV3(1, 1, 5000);
+  }
+
+  private void serializeAndDeserializeV3(int splitCount, int filesPerSplit, int mockDeletesPerSplit)
+      throws Exception {
+    final List<IcebergSourceSplit> splits =
+        SplitHelpers.createSplitsFromTransientHadoopTable(
+            TEMPORARY_FOLDER, splitCount, filesPerSplit);
+    final List<IcebergSourceSplit> splitsWithMockDeleteFiles =
+        SplitHelpers.equipSplitsWithMockDeleteFiles(splits, TEMPORARY_FOLDER, mockDeletesPerSplit);
+
+    for (IcebergSourceSplit split : splitsWithMockDeleteFiles) {
+      byte[] result = split.serializeV3();
+      IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV3(result, true);
+      assertSplitEquals(split, deserialized);
+    }
+  }
+
+  @Test
   public void testDeserializeV1() throws Exception {
     final List<IcebergSourceSplit> splits =
         SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);