IGNITE-22147 Fixed flaky ItTxResourcesVacuumTest.testRecoveryAfterPersistentStateVacuumized (#3689)

diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
index ccb91a8..95f2408 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -24,7 +24,6 @@
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.internal.tx.TxState.COMMITTED;
 import static org.apache.ignite.internal.tx.TxState.FINISHING;
 import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
 import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.findTupleToBeHostedOnNode;
@@ -229,7 +228,7 @@
                 if (finishRequest.txId().equals(txId)) {
                     finishStartedFuture.complete(null);
 
-                    finishAllowedFuture.join();
+                    joinWithTimeout(finishAllowedFuture);
                 }
             }
 
@@ -250,7 +249,7 @@
         // Check that the volatile state of the transaction is preserved.
         assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
 
-        finishAllowedFuture.complete(null);
+        assertTrue(finishAllowedFuture.complete(null));
 
         assertThat(commitFut, willCompleteSuccessfully());
 
@@ -382,7 +381,8 @@
 
         Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
 
-        log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+        log.info("Test: Commit partition [part={}, leaseholder={}, hostingNodes={}].", commitPartGrpId, commitPartitionLeaseholder.name(),
+                commitPartNodes);
 
         // Some node that does not host the commit partition, will be the primary node for upserting another tuple.
         IgniteImpl leaseholderForAnotherTuple = findNode(n -> !commitPartNodes.contains(n.name()));
@@ -405,7 +405,7 @@
                 log.info("Test: cleanup started.");
 
                 if (commitPartNodes.contains(n)) {
-                    cleanupAllowed.join();
+                    joinWithTimeout(cleanupAllowed);
                 }
             }
 
@@ -425,13 +425,13 @@
         assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()), txId, commitPartId, false);
 
         // Unblocking cleanup.
-        cleanupAllowed.complete(null);
+        assertTrue(cleanupAllowed.complete(null));
 
         assertThat(commitFut, willCompleteSuccessfully());
 
         Transaction roTxAfter = beginReadOnlyTx(anyNode());
 
-        waitForCondition(() -> volatileTxState(commitPartitionLeaseholder, txId) != null, 10_000);
+        waitForCleanupCompletion(commitPartNodes, txId);
 
         triggerVacuum();
         assertTxStateVacuumized(txId, commitPartId, true);
@@ -498,7 +498,7 @@
             if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
                 cleanupStarted.complete(null);
 
-                cleanupAllowedFut.join();
+                joinWithTimeout(cleanupAllowedFut);
             }
 
             return false;
@@ -512,7 +512,7 @@
 
         transferPrimary(cluster.runningNodes().collect(toSet()), commitPartGrpId, commitPartNodes::contains);
 
-        cleanupAllowedFut.complete(null);
+        assertTrue(cleanupAllowedFut.complete(null));
 
         cleanupAllowed[0] = true;
 
@@ -570,7 +570,8 @@
 
         Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
 
-        log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+        log.info("Test: Commit partition [part={}, leaseholder={}, hostingNodes={}].", commitPartGrpId, commitPartitionLeaseholder.name(),
+                commitPartNodes);
 
         view.upsert(tx, tuple);
 
@@ -578,17 +579,20 @@
         CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
         boolean[] cleanupAllowed = new boolean[1];
 
-        commitPartitionLeaseholder.dropMessages((n, msg) -> {
+        // Cleanup may be triggered by the primary replica reelection as well.
+        runningNodes().filter(n -> commitPartNodes.contains(n.name())).forEach(nd -> nd.dropMessages((n, msg) -> {
             if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
                 cleanupStarted.complete(null);
 
-                cleanupAllowedFut.join();
+                log.warn("Test: cleanup started.");
 
-                return true;
+                joinWithTimeout(cleanupAllowedFut);
+
+                log.info("Test: cleanup resumed.");
             }
 
             return false;
-        });
+        }));
 
         Transaction roTxBefore = beginReadOnlyTx(anyNode());
 
@@ -596,6 +600,8 @@
 
         waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
 
+        log.info("Test: state replicated.");
+
         assertThat(cleanupStarted, willCompleteSuccessfully());
 
         // Wait for volatile tx state vacuum. This is possible because tx finish is complete.
@@ -604,7 +610,7 @@
 
         log.info("Test: volatile state vacuumized");
 
-        cleanupAllowedFut.complete(null);
+        assertTrue(cleanupAllowedFut.complete(null));
 
         cleanupAllowed[0] = true;
 
@@ -614,11 +620,7 @@
 
         Transaction roTxAfter = beginReadOnlyTx(anyNode());
 
-        waitForCondition(() -> {
-            TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(commitPartitionLeaseholder, txId);
-
-            return txStateMeta != null && txStateMeta.cleanupCompletionTimestamp() != null;
-        }, 10_000);
+        waitForCleanupCompletion(commitPartNodes, txId);
 
         log.info("Test: cleanup completed.");
 
@@ -689,13 +691,7 @@
 
         tx0.commitAsync();
 
-        // Check that the final tx state COMMITTED is saved to the persistent tx storage.
-        assertTrue(waitForCondition(() -> cluster.runningNodes().filter(n -> commitPartitionNodes.contains(n.name())).allMatch(n -> {
-            TransactionMeta meta = persistentTxState(n, txId0, commitPartId);
-
-            return meta != null && meta.txState() == COMMITTED;
-        }), 10_000));
-
+        // Cleanup starts not earlier than the finish command is applied to commit partition group.
         assertThat(cleanupStarted, willCompleteSuccessfully());
 
         // Stop the first transaction coordinator.
@@ -737,8 +733,6 @@
      */
     @Test
     public void testRoReadTheCorrectDataInBetween() {
-        setTxResourceTtl(0);
-
         IgniteImpl node = anyNode();
 
         String tableName = TABLE_NAME + "_1";
@@ -885,6 +879,28 @@
     }
 
     /**
+     * Wait for cleanup completion timestamp on any node of commit partition group.
+     *
+     * @param commitPartitionNodeNames Node names of nodes in commit partition group.
+     * @param txId Transaction id.
+     */
+    private void waitForCleanupCompletion(Set<String> commitPartitionNodeNames, UUID txId) throws InterruptedException {
+        Set<IgniteImpl> commitPartitionNodes = runningNodes().filter(n -> commitPartitionNodeNames.contains(n.name())).collect(toSet());
+
+        assertTrue(waitForCondition(() -> {
+            boolean res = false;
+
+            for (IgniteImpl node : commitPartitionNodes) {
+                TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(node, txId);
+
+                res = res || txStateMeta != null && txStateMeta.cleanupCompletionTimestamp() != null;
+            }
+
+            return res;
+        }, 10_000));
+    }
+
+    /**
      * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on all nodes of the cluster.
      *
      * @param txId Transaction id.
@@ -1051,4 +1067,14 @@
                 .findFirst()
                 .get();
     }
+
+    private void joinWithTimeout(CompletableFuture<?> future) {
+        future.orTimeout(60, TimeUnit.SECONDS)
+                .exceptionally(e -> {
+                    log.error("Could not wait for the future.", e);
+
+                    return null;
+                })
+                .join();
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 56f1813..1df76b7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -726,7 +726,9 @@
                 commit ? COMMITTED : ABORTED,
                 old == null ? null : old.txCoordinatorId(),
                 old == null ? partId : old.commitPartitionId(),
-                commit ? commitTimestamp : null
+                commit ? commitTimestamp : null,
+                old == null ? null : old.initialVacuumObservationTimestamp(),
+                old == null ? null : old.cleanupCompletionTimestamp()
         ));
     }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index aa3aab5..52fef60 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -3922,7 +3922,9 @@
                 txState,
                 old == null ? null : old.txCoordinatorId(),
                 old == null ? null : old.commitPartitionId(),
-                txState == COMMITTED ? commitTimestamp : null
+                txState == COMMITTED ? commitTimestamp : null,
+                old == null ? null : old.initialVacuumObservationTimestamp(),
+                old == null ? null : old.cleanupCompletionTimestamp()
         ));
     }
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
index f73e9c9..1676be8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java
@@ -57,12 +57,12 @@
     public void start() {
         messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, correlationId) -> {
             if (msg instanceof FinishedTransactionsBatchMessage) {
-                processTxCleanup((FinishedTransactionsBatchMessage) msg);
+                processFinishedTransactionsBatchMessage((FinishedTransactionsBatchMessage) msg);
             }
         });
     }
 
-    private void processTxCleanup(FinishedTransactionsBatchMessage closeCursorsMessage) {
+    private void processFinishedTransactionsBatchMessage(FinishedTransactionsBatchMessage closeCursorsMessage) {
         asyncExecutor.execute(() -> closeCursorsMessage.transactions().forEach(resourcesRegistry::close));
     }
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index 7af5d71..9796727 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -139,7 +139,7 @@
                     if (ex == null) {
                         msg = prepareResponse();
                     } else {
-                        msg = prepareErrorResponse(ex);
+                        msg = prepareErrorResponse(txCleanupMessage.txId(), ex);
 
                         // Run durable cleanup for the partitions that we failed to cleanup properly.
                         // No need to wait on this future.
@@ -178,9 +178,10 @@
                 .build();
     }
 
-    private NetworkMessage prepareErrorResponse(Throwable th) {
+    private NetworkMessage prepareErrorResponse(UUID txId, Throwable th) {
         return FACTORY
                 .txCleanupMessageErrorResponse()
+                .txId(txId)
                 .throwable(th)
                 .timestampLong(clockService.nowLong())
                 .build();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index 7c43ebf..dcf97ff 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -32,12 +32,15 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl.TransactionFailureHandler;
 import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
+import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
 import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.util.CompletableFutures;
@@ -47,6 +50,9 @@
  * Sends TX Cleanup request.
  */
 public class TxCleanupRequestSender {
+    /** Logger. */
+    private final IgniteLogger log = Loggers.forClass(TxCleanupRequestSender.class);
+
     /** Placement driver helper. */
     private final PlacementDriverHelper placementDriverHelper;
 
@@ -86,6 +92,12 @@
                 if (result != null) {
                     onCleanupReplicated(result);
                 }
+
+                if (msg instanceof TxCleanupMessageErrorResponse) {
+                    TxCleanupMessageErrorResponse response = (TxCleanupMessageErrorResponse) msg;
+
+                    log.warn("Exception happened during transaction cleanup [txId={}].", response.throwable(), response.txId());
+                }
             }
         });
     }
@@ -168,6 +180,9 @@
             @Nullable HybridTimestamp commitTimestamp,
             UUID txId
     ) {
+        // Start tracking the partitions we want to learn the replication confirmation from.
+        writeIntentsReplicated.put(txId, new CleanupContext(new HashSet<>(partitionIds), commit ? TxState.COMMITTED : TxState.ABORTED));
+
         return placementDriverHelper.findPrimaryReplicas(partitionIds)
                 .thenCompose(partitionData -> {
                     cleanupPartitionsWithoutPrimary(commit, commitTimestamp, txId, partitionData.partitionsWithoutPrimary);
@@ -257,7 +272,7 @@
          */
         private final TxState txState;
 
-        public CleanupContext(Set<TablePartitionId> partitions, TxState txState) {
+        private CleanupContext(Set<TablePartitionId> partitions, TxState txState) {
             this.partitions = partitions;
 
             this.txState = txState;
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
index 5be7788..8539742 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageErrorResponse.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.tx.message;
 
+import java.util.UUID;
 import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
 
@@ -26,6 +27,13 @@
 @Transferable(TxMessageGroup.TX_CLEANUP_MSG_ERR_RESPONSE)
 public interface TxCleanupMessageErrorResponse extends TxCleanupMessageResponse {
     /**
+     * Transaction id.
+     *
+     * @return Transaction id.
+     */
+    UUID txId();
+
+    /**
      * Returns a {@link Throwable} that was thrown during handling a lock release message.
      *
      * @return {@link Throwable} that was thrown during handling a lock release message.