IGNITE-21992 Fix remove-after-insert in Data Streamer within the same batch (#3713)

Within the same streamer batch, skip previous insert operations when a delete operation is encountered.
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
index af8e4a3..169cf67 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/streamer/ItAbstractDataStreamerTest.java
@@ -48,7 +48,6 @@
 import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -290,8 +289,7 @@
 
     @ParameterizedTest
     @ValueSource(ints = {1, 2, 3})
-    @Disabled("IGNITE-21992 Data Streamer removal does not work for a new key in the same batch")
-    public void testSameItemInsertUpdateRemove(int pageSize) {
+    public void testSameItemInsertRemove(int pageSize) {
         RecordView<Tuple> view = defaultTable().recordView();
         CompletableFuture<Void> streamerFut;
         int key = 333000;
@@ -308,6 +306,27 @@
         assertNull(view.get(null, tupleKey(key)));
     }
 
+    @ParameterizedTest
+    @ValueSource(ints = {1, 2, 3})
+    public void testSameItemInsertRemoveInsertUpdate(int pageSize) {
+        RecordView<Tuple> view = defaultTable().recordView();
+        CompletableFuture<Void> streamerFut;
+        int key = 333001;
+
+        try (var publisher = new SubmissionPublisher<DataStreamerItem<Tuple>>()) {
+            streamerFut = view.streamData(publisher, DataStreamerOptions.builder().pageSize(pageSize).build());
+
+            publisher.submit(DataStreamerItem.of(tuple(key, "foo")));
+            publisher.submit(DataStreamerItem.removed(tupleKey(key)));
+            publisher.submit(DataStreamerItem.of(tuple(key, "bar")));
+            publisher.submit(DataStreamerItem.of(tuple(key, "baz")));
+        }
+
+        streamerFut.orTimeout(1, TimeUnit.SECONDS).join();
+
+        assertEquals("baz", view.get(null, tupleKey(key)).stringValue("name"));
+    }
+
     @SuppressWarnings("resource")
     @Test
     public void testSchemaUpdateWhileStreaming() throws InterruptedException {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 4c5bfaa..aa3aab5 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -2219,7 +2219,7 @@
             case RW_DELETE_EXACT_ALL: {
                 CompletableFuture<RowId>[] deleteExactLockFuts = new CompletableFuture[searchRows.size()];
 
-                Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+                Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
 
                 for (int i = 0; i < searchRows.size(); i++) {
                     BinaryRow searchRow = searchRows.get(i);
@@ -2353,28 +2353,41 @@
             }
             case RW_UPSERT_ALL: {
                 CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>>[] rowIdFuts = new CompletableFuture[searchRows.size()];
+                BinaryTuple[] pks = new BinaryTuple[searchRows.size()];
 
-                Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+                Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
                 BitSet deleted = request.deleted();
 
                 // When the same key is updated multiple times within the same batch, we need to maintain operation order and apply
                 // only the last update. This map stores the previous searchRows index for each key.
-                Map<ByteBuffer, Integer> newKeyMap = new HashMap<>();
+                Map<ByteBuffer, Integer> prevRowIdx = new HashMap<>();
 
                 for (int i = 0; i < searchRows.size(); i++) {
                     BinaryRow searchRow = searchRows.get(i);
-
                     boolean isDelete = deleted != null && deleted.get(i);
 
                     BinaryTuple pk = isDelete
                             ? resolvePk(searchRow.tupleSlice())
                             : extractPk(searchRow);
 
-                    int rowIdx = i;
+                    pks[i] = pk;
 
-                    rowIdFuts[i] = resolveRowByPk(pk, txId, (rowId, row, lastCommitTime) -> {
+                    Integer prevRowIdx0 = prevRowIdx.put(pk.byteBuffer(), i);
+                    if (prevRowIdx0 != null) {
+                        rowIdFuts[prevRowIdx0] = nullCompletedFuture(); // Skip previous row with the same key.
+                    }
+                }
+
+                for (int i = 0; i < searchRows.size(); i++) {
+                    if (rowIdFuts[i] != null) {
+                        continue; // Skip previous row with the same key.
+                    }
+
+                    BinaryRow searchRow = searchRows.get(i);
+                    boolean isDelete = deleted != null && deleted.get(i);
+
+                    rowIdFuts[i] = resolveRowByPk(pks[i], txId, (rowId, row, lastCommitTime) -> {
                         if (isDelete && rowId == null) {
-                            // Does not exist, nothing to delete.
                             return nullCompletedFuture();
                         }
 
@@ -2383,34 +2396,16 @@
                             lastCommitTimes.put(rowId.uuid(), lastCommitTime);
                         }
 
-                        boolean insert = rowId == null;
-                        RowId rowId0;
-
-                        if (insert) {
-                            Integer prevRowIdx = newKeyMap.put(pk.byteBuffer(), rowIdx);
-
-                            if (prevRowIdx != null) {
-                                // Return existing lock.
-                                CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> lockFut = rowIdFuts[prevRowIdx];
-
-                                // Skip previous update with the same key.
-                                rowIdFuts[prevRowIdx] = nullCompletedFuture();
-
-                                return lockFut;
-                            }
-
-                            rowId0 = new RowId(partId(), UUID.randomUUID());
-                        } else {
-                            rowId0 = rowId;
-                        }
-
                         if (isDelete) {
                             assert row != null;
 
-                            return takeLocksForDelete(row, rowId0, txId)
+                            return takeLocksForDelete(row, rowId, txId)
                                     .thenApply(id -> new IgniteBiTuple<>(id, null));
                         }
 
+                        boolean insert = rowId == null;
+                        RowId rowId0 = insert ? new RowId(partId(), UUID.randomUUID()) : rowId;
+
                         return insert
                                 ? takeLocksForInsert(searchRow, rowId0, txId)
                                 : takeLocksForUpdate(searchRow, rowId0, txId);
@@ -2526,7 +2521,7 @@
             case RW_DELETE_ALL: {
                 CompletableFuture<RowId>[] rowIdLockFuts = new CompletableFuture[primaryKeys.size()];
 
-                Map<UUID, HybridTimestamp> lastCommitTimes = new HashMap<>();
+                Map<UUID, HybridTimestamp> lastCommitTimes = new ConcurrentHashMap<>();
 
                 for (int i = 0; i < primaryKeys.size(); i++) {
                     rowIdLockFuts[i] = resolveRowByPk(primaryKeys.get(i), txId, (rowId, row, lastCommitTime) -> {