Flink: Don't fail to serialize IcebergSourceSplit when there are too many delete files (#9464)
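The root cause is the 2-byte length prefix used by `DataOutputSerializer#writeUTF`: once a split's `FileScanTask` JSON references enough delete files, its modified-UTF-8 encoding exceeds 65535 bytes and serialization fails. Below is a minimal sketch (not part of the patch) of that failure mode; it assumes a Flink 1.18 `flink-core` dependency on the classpath and Java 11+ for `String#repeat`.

```java
import java.io.UTFDataFormatException;
import org.apache.flink.core.memory.DataOutputSerializer;

public class WriteUtfLimitSketch {
  public static void main(String[] args) throws Exception {
    DataOutputSerializer out = new DataOutputSerializer(64);
    // Stand-in for a FileScanTask JSON referencing thousands of delete files: its
    // encoded form exceeds the 65535-byte limit of writeUTF's 2-byte length prefix.
    String hugeTaskJson = "x".repeat(70_000);
    try {
      out.writeUTF(hugeTaskJson);
    } catch (UTFDataFormatException e) {
      // Expected: the V2 split format cannot carry this payload, hence the V3 format below.
      System.out.println("writeUTF rejected the payload: " + e.getMessage());
    }
  }
}
```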
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
index e4bfbf1..44e37af 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java
@@ -132,6 +132,14 @@
}
byte[] serializeV2() throws IOException {
+ return serialize(2);
+ }
+
+ byte[] serializeV3() throws IOException {
+ return serialize(3);
+ }
+
+ private byte[] serialize(int version) throws IOException {
if (serializedBytesCache == null) {
DataOutputSerializer out = SERIALIZER_CACHE.get();
Collection<FileScanTask> fileScanTasks = task.tasks();
@@ -147,7 +155,7 @@
for (FileScanTask fileScanTask : fileScanTasks) {
String taskJson = FileScanTaskParser.toJson(fileScanTask);
- out.writeUTF(taskJson);
+ writeTaskJson(out, taskJson, version);
}
serializedBytesCache = out.getCopyOfBuffer();
@@ -157,8 +165,32 @@
return serializedBytesCache;
}
+ private static void writeTaskJson(DataOutputSerializer out, String taskJson, int version)
+ throws IOException {
+ switch (version) {
+ case 2:
+ out.writeUTF(taskJson);
+ break;
+ case 3:
+ SerializerHelper.writeLongUTF(out, taskJson);
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported version: " + version);
+ }
+ }
+
static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive)
throws IOException {
+ return deserialize(serialized, caseSensitive, 2);
+ }
+
+ static IcebergSourceSplit deserializeV3(byte[] serialized, boolean caseSensitive)
+ throws IOException {
+ return deserialize(serialized, caseSensitive, 3);
+ }
+
+ private static IcebergSourceSplit deserialize(
+ byte[] serialized, boolean caseSensitive, int version) throws IOException {
DataInputDeserializer in = new DataInputDeserializer(serialized);
int fileOffset = in.readInt();
long recordOffset = in.readLong();
@@ -166,7 +198,7 @@
List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
for (int i = 0; i < taskCount; ++i) {
- String taskJson = in.readUTF();
+ String taskJson = readTaskJson(in, version);
FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
tasks.add(task);
}
@@ -174,4 +206,15 @@
CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks);
return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset);
}
+
+ private static String readTaskJson(DataInputDeserializer in, int version) throws IOException {
+ switch (version) {
+ case 2:
+ return in.readUTF();
+ case 3:
+ return SerializerHelper.readLongUTF(in);
+ default:
+ throw new IllegalArgumentException("Unsupported version: " + version);
+ }
+ }
}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
index 8c08981..d4b0f9e 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java
@@ -24,7 +24,7 @@
@Internal
public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer<IcebergSourceSplit> {
- private static final int VERSION = 2;
+ private static final int VERSION = 3;
private final boolean caseSensitive;
@@ -39,7 +39,7 @@
@Override
public byte[] serialize(IcebergSourceSplit split) throws IOException {
- return split.serializeV2();
+ return split.serializeV3();
}
@Override
@@ -49,6 +49,8 @@
return IcebergSourceSplit.deserializeV1(serialized);
case 2:
return IcebergSourceSplit.deserializeV2(serialized, caseSensitive);
+ case 3:
+ return IcebergSourceSplit.deserializeV3(serialized, caseSensitive);
default:
throw new IOException(
String.format(
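For restore compatibility, the serializer still accepts V1 and V2 payloads while new checkpoints are written as V3. The following is a hedged sketch (not part of the patch) of a round trip through the `SimpleVersionedSerializer` API; the hypothetical class name and the `split` argument are illustrative, and the boolean constructor argument is assumed to be case sensitivity, as in the field shown above.

```java
package org.apache.iceberg.flink.source.split;

import java.io.IOException;

class SplitSerializerRoundTripSketch {
  // `split` is assumed to come from planning or a test helper such as SplitHelpers.
  static IcebergSourceSplit roundTrip(IcebergSourceSplit split) throws IOException {
    IcebergSourceSplitSerializer serializer = new IcebergSourceSplitSerializer(true);
    byte[] bytes = serializer.serialize(split);                    // written as VERSION = 3
    return serializer.deserialize(serializer.getVersion(), bytes); // dispatches to deserializeV3
  }
}
```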
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
new file mode 100644
index 0000000..a0395f2
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.split;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UTFDataFormatException;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+/**
+ * Helper class to serialize and deserialize strings longer than 65,535 bytes. The implementation
+ * is largely adapted from org.apache.flink.core.memory.DataInputDeserializer#readUTF and
+ * org.apache.flink.core.memory.DataOutputSerializer#writeUTF.
+ */
+class SerializerHelper implements Serializable {
+
+ private SerializerHelper() {}
+
+ /**
+ * Similar to {@link DataOutputSerializer#writeUTF(String)}, except that it supports payloads of
+ * up to {@link Integer#MAX_VALUE} bytes.
+ *
+ * <p>Note: This method can be removed once Flink 1.20 is released, because {@link
+ * DataOutputSerializer} will then provide an equivalent method that should be used instead.
+ *
+ * <p>See <a href="https://issues.apache.org/jira/browse/FLINK-34228">FLINK-34228</a> and <a
+ * href="https://github.com/apache/flink/pull/24191">https://github.com/apache/flink/pull/24191</a>.
+ *
+ * @param out the output stream to write the string to.
+ * @param str the string value to be written.
+ */
+ public static void writeLongUTF(DataOutputSerializer out, String str) throws IOException {
+ int strlen = str.length();
+ long utflen = 0;
+ int ch;
+
+ /* use charAt instead of copying String to char array */
+ for (int i = 0; i < strlen; i++) {
+ ch = str.charAt(i);
+ utflen += getUTFBytesSize(ch);
+
+ if (utflen > Integer.MAX_VALUE) {
+ throw new UTFDataFormatException("Encoded string reached maximum length: " + utflen);
+ }
+ }
+
+ if (utflen > Integer.MAX_VALUE - 4) {
+ throw new UTFDataFormatException("Encoded string is too long: " + utflen);
+ }
+
+ out.writeInt((int) utflen);
+ writeUTFBytes(out, str, (int) utflen);
+ }
+
+ /**
+ * Similar to {@link DataInputDeserializer#readUTF()}, except that it supports payloads of up to
+ * {@link Integer#MAX_VALUE} bytes.
+ *
+ * <p>Note: This method can be removed once Flink 1.20 is released, because {@link
+ * DataInputDeserializer} will then provide an equivalent method that should be used instead.
+ *
+ * <p>See <a href="https://issues.apache.org/jira/browse/FLINK-34228">FLINK-34228</a> and <a
+ * href="https://github.com/apache/flink/pull/24191">https://github.com/apache/flink/pull/24191</a>.
+ *
+ * @param in the input stream to read the string from.
+ * @return the string value read from the input stream.
+ * @throws IOException if an I/O error occurs when reading from the input stream.
+ */
+ public static String readLongUTF(DataInputDeserializer in) throws IOException {
+ int utflen = in.readInt();
+ byte[] bytearr = new byte[utflen];
+ char[] chararr = new char[utflen];
+
+ int ch;
+ int char2;
+ int char3;
+ int count = 0;
+ int chararrCount = 0;
+
+ in.readFully(bytearr, 0, utflen);
+
+ while (count < utflen) {
+ ch = (int) bytearr[count] & 0xff;
+ if (ch > 127) {
+ break;
+ }
+ count++;
+ chararr[chararrCount++] = (char) ch;
+ }
+
+ while (count < utflen) {
+ ch = (int) bytearr[count] & 0xff;
+ switch (ch >> 4) {
+ case 0:
+ case 1:
+ case 2:
+ case 3:
+ case 4:
+ case 5:
+ case 6:
+ case 7:
+ /* 0xxxxxxx */
+ count++;
+ chararr[chararrCount++] = (char) ch;
+ break;
+ case 12:
+ case 13:
+ /* 110x xxxx 10xx xxxx */
+ count += 2;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 1];
+ if ((char2 & 0xC0) != 0x80) {
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ chararr[chararrCount++] = (char) (((ch & 0x1F) << 6) | (char2 & 0x3F));
+ break;
+ case 14:
+ /* 1110 xxxx 10xx xxxx 10xx xxxx */
+ count += 3;
+ if (count > utflen) {
+ throw new UTFDataFormatException("malformed input: partial character at end");
+ }
+ char2 = (int) bytearr[count - 2];
+ char3 = (int) bytearr[count - 1];
+ if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+ throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+ }
+ chararr[chararrCount++] =
+ (char) (((ch & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
+ break;
+ default:
+ /* 10xx xxxx, 1111 xxxx */
+ throw new UTFDataFormatException("malformed input around byte " + count);
+ }
+ }
+ // The number of chars produced may be less than utflen
+ return new String(chararr, 0, chararrCount);
+ }
+
+ private static int getUTFBytesSize(int ch) {
+ if ((ch >= 0x0001) && (ch <= 0x007F)) {
+ return 1;
+ } else if (ch > 0x07FF) {
+ return 3;
+ } else {
+ return 2;
+ }
+ }
+
+ private static void writeUTFBytes(DataOutputSerializer out, String str, int utflen)
+ throws IOException {
+ int strlen = str.length();
+ int ch;
+
+ int len = Math.max(1024, utflen);
+
+ byte[] bytearr = new byte[len];
+ int count = 0;
+
+ int index;
+ for (index = 0; index < strlen; index++) {
+ ch = str.charAt(index);
+ if (!((ch >= 0x0001) && (ch <= 0x007F))) {
+ break;
+ }
+ bytearr[count++] = (byte) ch;
+ }
+
+ for (; index < strlen; index++) {
+ ch = str.charAt(index);
+ if ((ch >= 0x0001) && (ch <= 0x007F)) {
+ bytearr[count++] = (byte) ch;
+ } else if (ch > 0x07FF) {
+ bytearr[count++] = (byte) (0xE0 | ((ch >> 12) & 0x0F));
+ bytearr[count++] = (byte) (0x80 | ((ch >> 6) & 0x3F));
+ bytearr[count++] = (byte) (0x80 | (ch & 0x3F));
+ } else {
+ bytearr[count++] = (byte) (0xC0 | ((ch >> 6) & 0x1F));
+ bytearr[count++] = (byte) (0x80 | (ch & 0x3F));
+ }
+ }
+
+ out.write(bytearr, 0, count);
+ }
+}
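The helper writes a 4-byte length prefix followed by the modified-UTF-8 bytes, so a task JSON is no longer capped at 65535 encoded bytes. Below is a hedged sketch (not part of the patch) of a round trip through the new helper; since `SerializerHelper` is package-private, the sketch class would have to live in the same package, its name is hypothetical, and the 5 MB payload merely stands in for an oversized task JSON.

```java
package org.apache.iceberg.flink.source.split;

import java.io.IOException;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

class SerializerHelperRoundTripSketch {
  static void roundTripLongUTF() throws IOException {
    DataOutputSerializer out = new DataOutputSerializer(1024);
    // Stand-in payload far beyond writeUTF's 65535-byte limit.
    String longJson = "a".repeat(5 * 1024 * 1024);
    SerializerHelper.writeLongUTF(out, longJson); // 4-byte length prefix, then modified UTF-8

    DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
    String restored = SerializerHelper.readLongUTF(in);
    // restored.equals(longJson) is expected to hold
  }
}
```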
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
index 3a80715..ebd220b 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java
@@ -18,19 +18,30 @@
*/
package org.apache.iceberg.flink.source;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.BaseFileScanTask;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.HadoopCatalog;
@@ -129,4 +140,64 @@
catalog.close();
}
}
+
+ /**
+ * This method equips the {@code icebergSourceSplits} with mock delete files.
+ * <li>For each split, create {@code deleteFilesPerSplit} delete files
+ * <li>Replace the original {@code FileScanTask} with a new {@code FileScanTask} that carries the
+ *     mock delete files
+ * <li>Callers should not attempt to read the delete files, since they are mocks and not real files
+ *
+ * @param icebergSourceSplits The real splits to equip with mock delete files
+ * @param temporaryFolder The temporary folder to create the mock delete files with
+ * @param deleteFilesPerSplit The number of delete files to create for each split
+ * @return The list of re-created splits with mock delete files
+ * @throws IOException If there is any error creating the mock delete files
+ */
+ public static List<IcebergSourceSplit> equipSplitsWithMockDeleteFiles(
+ List<IcebergSourceSplit> icebergSourceSplits,
+ TemporaryFolder temporaryFolder,
+ int deleteFilesPerSplit)
+ throws IOException {
+ List<IcebergSourceSplit> icebergSourceSplitsWithMockDeleteFiles = Lists.newArrayList();
+ for (IcebergSourceSplit split : icebergSourceSplits) {
+ final CombinedScanTask combinedScanTask = spy(split.task());
+
+ final List<DeleteFile> deleteFiles = Lists.newArrayList();
+ final PartitionSpec spec =
+ PartitionSpec.builderFor(TestFixtures.SCHEMA).withSpecId(0).build();
+
+ for (int i = 0; i < deleteFilesPerSplit; ++i) {
+ final DeleteFile deleteFile =
+ FileMetadata.deleteFileBuilder(spec)
+ .withFormat(FileFormat.PARQUET)
+ .withPath(temporaryFolder.newFile().getPath())
+ .ofPositionDeletes()
+ .withFileSizeInBytes(1000)
+ .withRecordCount(1000)
+ .build();
+ deleteFiles.add(deleteFile);
+ }
+
+ List<FileScanTask> newFileScanTasks = Lists.newArrayList();
+ for (FileScanTask task : combinedScanTask.tasks()) {
+ String schemaString = SchemaParser.toJson(task.schema());
+ String specString = PartitionSpecParser.toJson(task.spec());
+
+ BaseFileScanTask baseFileScanTask =
+ new BaseFileScanTask(
+ task.file(),
+ deleteFiles.toArray(new DeleteFile[] {}),
+ schemaString,
+ specString,
+ ResidualEvaluator.unpartitioned(task.residual()));
+ newFileScanTasks.add(baseFileScanTask);
+ }
+ doReturn(newFileScanTasks).when(combinedScanTask).tasks();
+ icebergSourceSplitsWithMockDeleteFiles.add(
+ IcebergSourceSplit.fromCombinedScanTask(
+ combinedScanTask, split.fileOffset(), split.recordOffset()));
+ }
+ return icebergSourceSplitsWithMockDeleteFiles;
+ }
}
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
index cd77830..c72d622 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java
@@ -101,6 +101,26 @@
}
@Test
+ public void testV3WithTooManyDeleteFiles() throws Exception {
+ serializeAndDeserializeV3(1, 1, 5000);
+ }
+
+ private void serializeAndDeserializeV3(int splitCount, int filesPerSplit, int mockDeletesPerSplit)
+ throws Exception {
+ final List<IcebergSourceSplit> splits =
+ SplitHelpers.createSplitsFromTransientHadoopTable(
+ TEMPORARY_FOLDER, splitCount, filesPerSplit);
+ final List<IcebergSourceSplit> splitsWithMockDeleteFiles =
+ SplitHelpers.equipSplitsWithMockDeleteFiles(splits, TEMPORARY_FOLDER, mockDeletesPerSplit);
+
+ for (IcebergSourceSplit split : splitsWithMockDeleteFiles) {
+ byte[] result = split.serializeV3();
+ IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV3(result, true);
+ assertSplitEquals(split, deserialized);
+ }
+ }
+
+ @Test
public void testDeserializeV1() throws Exception {
final List<IcebergSourceSplit> splits =
SplitHelpers.createSplitsFromTransientHadoopTable(TEMPORARY_FOLDER, 1, 1);