[core] Fix deletion vectors mode with custom sequence field (#4075)
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index ca70694..328015e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
@@ -52,6 +53,7 @@
private final LookupLevels<T> lookupLevels;
private final MergeFunctionWrapperFactory<T> wrapperFactory;
+ private final boolean noSequenceField;
@Nullable private final DeletionVectorsMaintainer dvMaintainer;
public LookupMergeTreeCompactRewriter(
@@ -66,7 +68,8 @@
MergeSorter mergeSorter,
MergeFunctionWrapperFactory<T> wrapperFactory,
boolean produceChangelog,
- @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ @Nullable DeletionVectorsMaintainer dvMaintainer,
+ CoreOptions options) {
super(
maxLevel,
mergeEngine,
@@ -81,6 +84,7 @@
this.dvMaintainer = dvMaintainer;
this.lookupLevels = lookupLevels;
this.wrapperFactory = wrapperFactory;
+ this.noSequenceField = options.sequenceField().isEmpty();
}
@Override
@@ -114,7 +118,7 @@
// DEDUPLICATE retains the latest records as the final result, so merging has no impact on
// it at all.
- if (mergeEngine == MergeEngine.DEDUPLICATE) {
+ if (mergeEngine == MergeEngine.DEDUPLICATE && noSequenceField) {
return CHANGELOG_NO_REWRITE;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 9f4570c..e0018a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -318,7 +318,8 @@
? new PositionedKeyValueProcessor(
valueType,
lookupStrategy.produceChangelog
- || mergeEngine != DEDUPLICATE)
+ || mergeEngine != DEDUPLICATE
+ || !options.sequenceField().isEmpty())
: new KeyValueProcessor(valueType);
wrapperFactory =
new LookupMergeFunctionWrapperFactory<>(
@@ -339,7 +340,8 @@
mergeSorter,
wrapperFactory,
lookupStrategy.produceChangelog,
- dvMaintainer);
+ dvMaintainer,
+ options);
} else {
return new MergeTreeCompactRewriter(
readerFactory,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 9f27f94..832cdf2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -245,4 +245,21 @@
}
}
}
+
+ @ParameterizedTest
+ @ValueSource(strings = {"none", "lookup"})
+ public void testBatchReadDVTableWithSequenceField(String changelogProducer) {
+ sql(
+ String.format(
+ "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, sequence INT, name STRING) "
+ + "WITH ('deletion-vectors.enabled' = 'true', 'sequence.field' = 'sequence', 'changelog-producer' = '%s')",
+ changelogProducer));
+
+ sql("INSERT INTO T VALUES (1, 1, '1'), (2, 1, '2')");
+ sql("INSERT INTO T VALUES (1, 2, '1_1'), (2, 2, '2_1')");
+ sql("INSERT INTO T VALUES (1, 3, '1_2'), (2, 1, '2_2')");
+
+ assertThat(batchSql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(Row.of(1, 3, "1_2"), Row.of(2, 2, "2_1"));
+ }
}