IGNITE-21992 Fix remove-after-insert in Data Streamer within the same batch (#3713)
Within the same streamer batch, skip a previous insert operation on a key when a delete operation for the same key is encountered, so that only the last operation per key takes effect.
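The fix restructures RW_UPSERT_ALL handling into two passes: a first pass records, per primary key, the index of the last operation in the batch and marks every earlier operation on the same key as skipped; a second pass then resolves and locks only the surviving rows. A minimal sketch of the first pass, with illustrative names rather than the actual PartitionReplicaListener code:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Minimal sketch of the de-duplication pass introduced by this fix; names are
 * illustrative, not the actual Ignite code. For a key that appears more than
 * once in a batch, every operation except the last one is marked as
 * superseded before any row resolution starts.
 */
final class BatchDedupSketch {
    /** Returns a mask where {@code true} means the row at that index is superseded. */
    static boolean[] supersededRows(List<ByteBuffer> pks) {
        boolean[] skip = new boolean[pks.size()];
        Map<ByteBuffer, Integer> lastIdxByKey = new HashMap<>();

        for (int i = 0; i < pks.size(); i++) {
            Integer prev = lastIdxByKey.put(pks.get(i), i);

            if (prev != null) {
                skip[prev] = true; // An earlier operation on this key is superseded by the current one.
            }
        }

        return skip;
    }
}
```

Deduplicating before resolution is what makes remove-after-insert work: previously, a delete for a key inserted earlier in the same batch resolved to a non-existent row and was silently dropped while the insert still went through.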
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index af8e4a3..169cf67 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -48,7 +48,6 @@
import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
@@ -290,8 +289,7 @@
@ParameterizedTest
@ValueSource(ints = {1, 2, 3})
- @Disabled("IGNITE-21992 Data Streamer removal does not work for a new key in the same batch")
- public void testSameItemInsertUpdateRemove(int pageSize) {
+ public void testSameItemInsertRemove(int pageSize) {
RecordView<Tuple> view = defaultTable().recordView();
CompletableFuture<Void> streamerFut;
int key = 333000;
@@ -308,6 +306,27 @@
assertNull(view.get(null, tupleKey(key)));
}
+ @ParameterizedTest
+ @ValueSource(ints = {1, 2, 3})
+ public void testSameItemInsertRemoveInsertUpdate(int pageSize) {
+ RecordView<Tuple> view = defaultTable().recordView();
+ CompletableFuture<Void> streamerFut;
+ int key = 333001;
+
+ try (var publisher = new SubmissionPublisher<DataStreamerItem<Tuple>>()) {
+ streamerFut = view.streamData(publisher, DataStreamerOptions.builder().pageSize(pageSize).build());
+
+ publisher.submit(DataStreamerItem.of(tuple(key, "foo")));
+ publisher.submit(DataStreamerItem.removed(tupleKey(key)));
+ publisher.submit(DataStreamerItem.of(tuple(key, "bar")));
+ publisher.submit(DataStreamerItem.of(tuple(key, "baz")));
+ }
+
+ streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+ assertEquals("baz", view.get(null, tupleKey(key)).stringValue("name"));
+ }
+
@SuppressWarnings("resource")
@Test
public void testSchemaUpdateWhileStreaming() throws InterruptedException {
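For context on the replicator changes below: the streamer transports inserts and removals in a single upsert-all batch, with a BitSet marking which row indices are deletes — the `request.deleted()` BitSet consumed in the RW_UPSERT_ALL branch. A rough sketch of that shape, using a hypothetical container class rather than the actual Ignite request type:

```java
import java.util.BitSet;
import java.util.List;

/**
 * Hypothetical container (not the actual Ignite request class) showing how a
 * mixed batch can be modeled: one row list plus a BitSet flagging which
 * positions are deletes, matching the request.deleted() contract.
 */
final class MixedBatchSketch<R> {
    private final List<R> rows;
    private final BitSet deleted; // null means "no deletes in this batch"

    MixedBatchSketch(List<R> rows, BitSet deleted) {
        this.rows = rows;
        this.deleted = deleted;
    }

    /** Mirrors the {@code deleted != null && deleted.get(i)} check in the replicator. */
    boolean isDelete(int i) {
        return deleted != null && deleted.get(i);
    }
}
```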
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 4c5bfaa..aa3aab5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -2219,7 +2219,7 @@
case RW_DELETE_EXACT_ALL: {
CompletableFuture<RowId>[] deleteExactLockFuts = new CompletableFuture[searchRows.size()];
- Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+ Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
for (int i = 0; i < searchRows.size(); i++) {
BinaryRow searchRow = searchRows.get(i);
@@ -2353,28 +2353,41 @@
}
case RW_UPSERT_ALL: {
CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] rowIdFuts = new CompletableFuture[searchRows.size()];
+ BinaryTuple[] pks = new BinaryTuple[searchRows.size()];
- Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+ Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
BitSet deleted = request.deleted();
// When the same key is updated multiple times within the same batch, we need to maintain operation order and apply
// only the last update. This map stores the previous searchRows index for each key.
- Map<ByteBuffer, Integer> newKeyMap = new HashMap<>();
+ Map<ByteBuffer, Integer> prevRowIdx = new HashMap<>();
for (int i = 0; i < searchRows.size(); i++) {
BinaryRow searchRow = searchRows.get(i);
-
boolean isDelete = deleted != null && deleted.get(i);
BinaryTuple pk = isDelete
? resolvePk(searchRow.tupleSlice())
: extractPk(searchRow);
- int rowIdx = i;
+ pks[i] = pk;
- rowIdFuts[i] = resolveRowByPk(pk, txId, (rowId, row, lastCommitTime) -> {
+ Integer prevRowIdx0 = prevRowIdx.put(pk.byteBuffer(), i);
+ if (prevRowIdx0 != null) {
+ rowIdFuts[prevRowIdx0] = nullCompletedFuture(); // Skip previous row with the same key.
+ }
+ }
+
+ for (int i = 0; i < searchRows.size(); i++) {
+ if (rowIdFuts[i] != null) {
+ continue; // Skip previous row with the same key.
+ }
+
+ BinaryRow searchRow = searchRows.get(i);
+ boolean isDelete = deleted != null && deleted.get(i);
+
+ rowIdFuts[i] = resolveRowByPk(pks[i], txId, (rowId, row, lastCommitTime) -> {
if (isDelete && rowId == null) {
- // Does not exist, nothing to delete.
return nullCompletedFuture();
}
@@ -2383,34 +2396,16 @@
lastCommitTimes.put(rowId.uuid(), lastCommitTime);
}
- boolean insert = rowId == null;
- RowId rowId0;
-
- if (insert) {
- Integer prevRowIdx = newKeyMap.put(pk.byteBuffer(), rowIdx);
-
- if (prevRowIdx != null) {
- // Return existing lock.
- CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = rowIdFuts[prevRowIdx];
-
- // Skip previous update with the same key.
- rowIdFuts[prevRowIdx] = nullCompletedFuture();
-
- return lockFut;
- }
-
- rowId0 = new RowId(partId(), UUID.randomUUID());
- } else {
- rowId0 = rowId;
- }
-
if (isDelete) {
assert row != null;
- return takeLocksForDelete(row, rowId0, txId)
+ return takeLocksForDelete(row, rowId, txId)
.thenApply(id -> new IgniteBiTuple<>(id, null));
}
+ boolean insert = rowId == null;
+ RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId;
+
return insert
? takeLocksForInsert(searchRow, rowId0, txId)
: takeLocksForUpdate(searchRow, rowId0, txId);
@@ -2526,7 +2521,7 @@
case RW_DELETE_ALL: {
CompletableFuture<RowId>[] rowIdLockFuts = new CompletableFuture[primaryKeys.size()];
- Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+ Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
for (int i = 0; i < primaryKeys.size(); i++) {
rowIdLockFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> {