IGNITE-21763 Adjust TxnResourceVacuumTask in order to vacuum persistent txn state (#3591)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 0ba31a2..2494b21 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -213,8 +213,8 @@
     }
 
     @Override
-    public void vacuum() {
-        // No-op.
+    public CompletableFuture<Void> vacuum() {
+        return nullCompletedFuture();
     }
 
     @Override
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 05179d3..c6523ef 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -147,7 +147,7 @@
             return falseCompletedFuture();
         });
 
-        NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp, null);
+        NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp);
 
         assertTrue(waitForCondition(primaryChanged::get, 10_000));
     }
@@ -179,7 +179,7 @@
 
         Collection<IgniteImpl> nodes = cluster.runningNodes().collect(toSet());
 
-        NodeUtils.transferPrimary(nodes, tblReplicationGrp, null);
+        NodeUtils.transferPrimary(nodes, tblReplicationGrp);
 
         CompletableFuture<String> primaryChangeTask =
                 IgniteTestUtils.runAsync(() -> NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));
@@ -264,7 +264,7 @@
         assertTrue(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
         assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
 
-        NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp, null);
+        NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp);
 
         assertTrue(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
         assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index cc54faf..abcef59 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -314,7 +314,7 @@
     }
 
     @Test
-    public void prolongAfterActiveActorChanger() throws Exception {
+    public void prolongAfterActiveActorChanged() throws Exception {
         var acceptedNodeRef = new AtomicReference<String>();
 
         leaseGrantHandler = (msg, from, to) -> {
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index bc85ce3..3ed175c 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.placementdriver;
 
+import static java.util.Objects.hash;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
@@ -247,10 +248,15 @@
      * Finds a node that can be the leaseholder.
      *
      * @param assignments Replication group assignment.
+     * @param grpId Group id.
      * @param proposedConsistentId Proposed consistent id, found out of a lease negotiation. The parameter might be {@code null}.
      * @return Cluster node, or {@code null} if no node in assignments can be the leaseholder.
      */
-    private @Nullable ClusterNode nextLeaseHolder(Set<Assignment> assignments, @Nullable String proposedConsistentId) {
+    private @Nullable ClusterNode nextLeaseHolder(
+            Set<Assignment> assignments,
+            ReplicationGroupId grpId,
+            @Nullable String proposedConsistentId
+    ) {
         // TODO: IGNITE-18879 Implement more intellectual algorithm to choose a node.
         ClusterNode primaryCandidate = null;
 
@@ -267,8 +273,15 @@
                 primaryCandidate = candidateNode;
 
                 break;
-            } else if (primaryCandidate == null || primaryCandidate.name().hashCode() > assignment.consistentId().hashCode()) {
+            } else if (primaryCandidate == null) {
                 primaryCandidate = candidateNode;
+            } else {
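+                // Mix the group id into the hash so that the deterministic candidate choice differs between replication groups.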
+                int candidateHash = hash(primaryCandidate.name(), grpId);
+                int assignmentHash = hash(assignment.consistentId(), grpId);
+
+                if (candidateHash > assignmentHash) {
+                    primaryCandidate = candidateNode;
+                }
             }
         }
 
@@ -360,7 +373,7 @@
                         continue;
                     } else if (agreement.isDeclined()) {
                         // Here we initiate negotiations for UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
-                        ClusterNode candidate = nextLeaseHolder(assignments, agreement.getRedirectTo());
+                        ClusterNode candidate = nextLeaseHolder(assignments, grpId, agreement.getRedirectTo());
 
                         if (candidate == null) {
                             leaseUpdateStatistics.onLeaseWithoutCandidate();
@@ -385,7 +398,7 @@
                             ? lease.getLeaseholder()
                             : lease.proposedCandidate();
 
-                    ClusterNode candidate = nextLeaseHolder(assignments, proposedLeaseholder);
+                    ClusterNode candidate = nextLeaseHolder(assignments, grpId, proposedLeaseholder);
 
                     if (candidate == null) {
                         leaseUpdateStatistics.onLeaseWithoutCandidate();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 02f1b08..a36a388 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -188,7 +188,7 @@
 
                 logger().info("Start transferring primary.");
 
-                NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)), null);
+                NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)));
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             } finally {
@@ -287,7 +287,7 @@
 
                 logger().info("Start transferring primary.");
 
-                NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)), null);
+                NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)));
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             } finally {
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
index cf2a29c..6a574d9 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
@@ -25,6 +25,7 @@
 
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -45,20 +46,52 @@
     private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
 
     /**
+     * Transfers the primary rights to another node, choosing any node from the cluster except the current leaseholder.
+     *
+     * @param nodes Nodes collection.
+     * @param groupId Group id.
+     * @return New primary replica name.
+     * @throws InterruptedException If failed.
+     */
+    public static String transferPrimary(
+            Collection<IgniteImpl> nodes,
+            ReplicationGroupId groupId
+    ) throws InterruptedException {
+        return transferPrimary(nodes, groupId, (Predicate<String>) null);
+    }
+
+    /**
      * Transfers the primary rights to another node.
      *
      * @param nodes Nodes collection.
      * @param groupId Group id.
-     * @param preferablePrimary Primary replica name which is preferred for being primary or {@code null}.
+     * @param preferablePrimary Name of the node preferred as the new primary replica.
      * @return New primary replica name.
      * @throws InterruptedException If failed.
      */
     public static String transferPrimary(
             Collection<IgniteImpl> nodes,
             ReplicationGroupId groupId,
-            @Nullable String preferablePrimary
+            String preferablePrimary
     ) throws InterruptedException {
-        LOG.info("Moving the primary replica [preferablePrimary=" + preferablePrimary + "].");
+        return transferPrimary(nodes, groupId, s -> s.equals(preferablePrimary));
+    }
+
+    /**
+     * Transfers the primary rights to another node.
+     *
+     * @param nodes Nodes collection.
+     * @param groupId Group id.
+     * @param preferablePrimaryFilter Filter by node consistent id for nodes preferred as the new primary replica.
+     * @return New primary replica name.
+     * @throws InterruptedException If failed.
+     */
+    public static String transferPrimary(
+            Collection<IgniteImpl> nodes,
+            ReplicationGroupId groupId,
+            @Nullable Predicate<String> preferablePrimaryFilter
+    ) throws InterruptedException {
+        LOG.info("Moving the primary replica [groupId={}].", groupId);
 
         IgniteImpl node = nodes.stream().findAny().orElseThrow();
 
@@ -68,16 +101,50 @@
                 .filter(n -> n.id().equals(currentLeaseholder.getLeaseholderId()))
                 .findFirst().orElseThrow();
 
-        if (preferablePrimary == null) {
-            preferablePrimary = nodes.stream()
-                    .map(IgniteImpl::name)
-                    .filter(n -> !n.equals(currentLeaseholder.getLeaseholder()))
-                    .findFirst()
-                    .orElseThrow();
+        Predicate<String> filter = preferablePrimaryFilter == null ? name -> true : preferablePrimaryFilter::test;
+
+        String finalPreferablePrimary = nodes.stream()
+                .map(IgniteImpl::name)
+                .filter(n -> !n.equals(currentLeaseholder.getLeaseholder()) && filter.test(n))
+                .findFirst()
+                .orElseThrow();
+
+        LOG.info("Moving the primary replica [groupId={}, currentLeaseholder={}, preferablePrimary={}].", groupId, leaseholderNode.name(),
+                finalPreferablePrimary);
+
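+        // Holders mutated from the wait-condition lambda below: the latest observed primary and whether to send another stop-lease message.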
+        ReplicaMeta[] newPrimaryReplica = new ReplicaMeta[1];
+        boolean[] stopLeaseNeeded = { true };
+
+        boolean success = waitForCondition(() -> {
+            if (stopLeaseNeeded[0]) {
+                stopLeaseProlongation(nodes, leaseholderNode, groupId, finalPreferablePrimary);
+            }
+
+            ReplicaMeta previousPrimary = newPrimaryReplica[0] == null ? currentLeaseholder : newPrimaryReplica[0];
+
+            newPrimaryReplica[0] = leaseholder(node, groupId);
+
+            // If the lease is changed to an unsuitable one, then stopLeaseProlongation will be retried, otherwise the cycle will be stopped.
+            stopLeaseNeeded[0] =
+                    !previousPrimary.getStartTime().equals(newPrimaryReplica[0].getStartTime())                // if lease changed
+                    || !previousPrimary.getExpirationTime().equals(newPrimaryReplica[0].getExpirationTime());  // if lease prolonged
+
+            return newPrimaryReplica[0].getLeaseholder().equals(finalPreferablePrimary);
+        }, 30_000);
+
+        if (success) {
+            LOG.info("Primary replica moved successfully from [{}] to [{}].", currentLeaseholder.getLeaseholder(), finalPreferablePrimary);
+        } else {
+            LOG.info("Moving the primary replica failed [groupId={}, actualPrimary={}].", groupId, newPrimaryReplica[0]);
         }
 
-        String finalPreferablePrimary = preferablePrimary;
+        assertTrue(success);
 
+        return finalPreferablePrimary;
+    }
+
+    private static void stopLeaseProlongation(Collection<IgniteImpl> nodes, IgniteImpl leaseholderNode, ReplicationGroupId groupId,
+            String preferablePrimary) {
         StopLeaseProlongationMessage msg = PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
                 .groupId(groupId)
                 .redirectProposal(preferablePrimary)
@@ -86,17 +153,6 @@
         nodes.forEach(
                 n -> leaseholderNode.clusterService().messagingService().send(n.clusterService().topologyService().localMember(), msg)
         );
-
-        assertTrue(waitForCondition(() -> {
-            ReplicaMeta newPrimaryReplica = leaseholder(node, groupId);
-
-            return newPrimaryReplica.getLeaseholder().equals(finalPreferablePrimary);
-        }, 10_000));
-
-        LOG.info("Primary replica moved successfully from [{}] to [{}].",
-                currentLeaseholder.getLeaseholder(), finalPreferablePrimary);
-
-        return finalPreferablePrimary;
     }
 
     private static ReplicaMeta leaseholder(IgniteImpl node, ReplicationGroupId groupId) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
index d4617d3..08b79a0 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.internal.table;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTransaction;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -33,8 +33,6 @@
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
@@ -87,7 +85,8 @@
 
         builder.clusterConfiguration("{"
                 + "  transaction: {"
-                + "      implicitTransactionTimeout: 30000"
+                + "      implicitTransactionTimeout: 30000,"
+                + "      txnResourceTtl: 2"
                 + "  },"
                 + "  replication: {"
                 + "      rpcTimeout: 30000"
@@ -205,17 +204,4 @@
     private IgniteImpl findNodeByName(String leaseholder) {
         return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
     }
-
-    private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node.clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        return primaryReplicaFut.join();
-    }
 }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index 105d8d3..5147713 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -26,6 +26,7 @@
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
 import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,7 +58,6 @@
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.network.DefaultMessagingService;
 import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
 import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -1149,19 +1149,6 @@
         return findNode(1, initialNodes(), n -> !leaseholder.equals(n.name()));
     }
 
-    private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
-        CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
-                tblReplicationGrp,
-                node.clock().now(),
-                10,
-                SECONDS
-        );
-
-        assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
-        return primaryReplicaFut.join();
-    }
-
     private static String waitAndGetLeaseholder(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
         return waitAndGetPrimaryReplica(node, tblReplicationGrp).getLeaseholder();
     }
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
new file mode 100644
index 0000000..eb02e0e
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -0,0 +1,1052 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.findTupleToBeHostedOnNode;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.partitionAssignment;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.partitionIdForTuple;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.table;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.tableId;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.txId;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.thread.ThreadOperation;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxCleanupMessage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for tx resources vacuum.
+ */
+@ExtendWith(SystemPropertiesExtension.class)
+@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "1000")
+public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
+    /** Table name. */
+    private static final String TABLE_NAME = "test_table";
+
+    private static final Tuple INITIAL_TUPLE = Tuple.create().set("key", 1L).set("val", "1");
+
+    private static final Function<Tuple, Tuple> NEXT_TUPLE = t -> Tuple.create()
+            .set("key", t.longValue("key") + 1)
+            .set("val", "" + (t.longValue("key") + 1));
+
+    private static final int REPLICAS = 2;
+
+    /** Nodes bootstrap configuration pattern. */
+    private static final String NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+            + "  network: {\n"
+            + "    port: {},\n"
+            + "    nodeFinder: {\n"
+            + "      netClusterNodes: [ {} ]\n"
+            + "    }\n"
+            + "  },\n"
+            + "  clientConnector: { port:{} },\n"
+            + "  rest.port: {},\n"
+            + "  raft: { responseTimeout: 30000 },"
+            + "  compute.threadPoolSize: 1\n"
+            + "}";
+
+    private ExecutorService txStateStorageExecutor;
+
+    @BeforeEach
+    @Override
+    public void setup(TestInfo testInfo) throws Exception {
+        super.setup(testInfo);
+
+        String zoneSql = "create zone test_zone with partitions=20, replicas=" + REPLICAS
+                + ", storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
+        String sql = "create table " + TABLE_NAME + " (key bigint primary key, val varchar(20)) with primary_zone='TEST_ZONE'";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(zoneSql, session);
+            executeUpdate(sql, session);
+        });
+
+        txStateStorageExecutor = Executors.newSingleThreadExecutor(IgniteThreadFactory.create("test", "tx-state-storage-test-pool", log,
+                ThreadOperation.STORAGE_READ));
+    }
+
+    @Override
+    @AfterEach
+    public void tearDown() {
+        shutdownAndAwaitTermination(txStateStorageExecutor, 10, TimeUnit.SECONDS);
+
+        super.tearDown();
+    }
+
+    @Override
+    protected void customizeInitParameters(InitParametersBuilder builder) {
+        super.customizeInitParameters(builder);
+
+        builder.clusterConfiguration("{"
+                + "  transaction: {"
+                + "      txnResourceTtl: 0"
+                + "  },"
+                + "  replication: {"
+                + "      rpcTimeout: 30000"
+                + "  },"
+                + "}");
+    }
+
+    /**
+     * Returns node bootstrap config template.
+     *
+     * @return Node bootstrap config template.
+     */
+    @Override
+    protected String getNodeBootstrapConfigTemplate() {
+        return NODE_BOOTSTRAP_CFG_TEMPLATE;
+    }
+
+    /**
+     * Simple TTL-triggered vacuum test that also checks that PENDING and FINISHING states are not removed.
+     *
+     * <ul>
+     *     <li>Run a transaction;</li>
+     *     <li>Run a parallel transaction;</li>
+     *     <li>Insert values within both transactions;</li>
+     *     <li>Commit the parallel transaction and wait for vacuum of its state;</li>
+     *     <li>Run another parallel transaction;</li>
+     *     <li>Check that the volatile PENDING state of the transaction is preserved;</li>
+     *     <li>Block {@link TxFinishReplicaRequest} for the pending transaction;</li>
+     *     <li>Start the tx commit;</li>
+     *     <li>While the state is FINISHING, commit the parallel transaction and wait for vacuum of its state;</li>
+     *     <li>Check that the volatile state of the transaction is preserved;</li>
+     *     <li>Unblock {@link TxFinishReplicaRequest};</li>
+     *     <li>Check that both volatile and persistent states are vacuumized;</li>
+     *     <li>Check that the committed value is correct.</li>
+     * </ul>
+     */
+    @Test
+    public void testVacuum() throws InterruptedException {
+        // We should test the TTL-triggered vacuum.
+        setTxResourceTtl(1);
+
+        IgniteImpl node = anyNode();
+
+        RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+        // Put some value into the table.
+        Transaction tx = node.transactions().begin();
+        Transaction parallelTx1 = node.transactions().begin();
+        UUID txId = txId(tx);
+        UUID parallelTx1Id = txId(parallelTx1);
+
+        log.info("Test: Loading the data [tx={}].", txId);
+
+        Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+        Tuple tupleForParallelTx = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, NEXT_TUPLE.apply(tuple), NEXT_TUPLE, true);
+        int partIdForParallelTx = partitionIdForTuple(anyNode(), TABLE_NAME, tupleForParallelTx, parallelTx1);
+
+        int partId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
+
+        Set<String> nodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), partId));
+
+        view.upsert(tx, tuple);
+        view.upsert(parallelTx1, tupleForParallelTx);
+
+        // Commit the parallel tx, wait for vacuum of its state and check that the volatile PENDING state of the main tx is preserved.
+        parallelTx1.commit();
+        waitForTxStateVacuum(nodes, parallelTx1Id, partIdForParallelTx, true, 10_000);
+        assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
+
+        Transaction parallelTx2 = node.transactions().begin();
+        UUID parallelTx2Id = txId(parallelTx2);
+        view.upsert(parallelTx2, tupleForParallelTx);
+
+        CompletableFuture<Void> finishStartedFuture = new CompletableFuture<>();
+        CompletableFuture<Void> finishAllowedFuture = new CompletableFuture<>();
+
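+        // Delay processing of the TxFinishReplicaRequest of this transaction until finishAllowedFuture is completed.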
+        node.dropMessages((n, msg) -> {
+            if (msg instanceof TxFinishReplicaRequest) {
+                TxFinishReplicaRequest finishRequest = (TxFinishReplicaRequest) msg;
+
+                if (finishRequest.txId().equals(txId)) {
+                    finishStartedFuture.complete(null);
+
+                    finishAllowedFuture.join();
+                }
+            }
+
+            return false;
+        });
+
+        Transaction roTxBefore = beginReadOnlyTx(anyNode());
+
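+        // Commit asynchronously; the commit will hang on the blocked TxFinishReplicaRequest until it is allowed to proceed.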
+        CompletableFuture<Void> commitFut = runAsync(tx::commit);
+
+        assertThat(finishStartedFuture, willCompleteSuccessfully());
+
+        // While the state is FINISHING, commit the parallel transaction and wait for vacuum of its state.
+        assertEquals(FINISHING, volatileTxState(node, txId).txState());
+        parallelTx2.commit();
+        waitForTxStateVacuum(nodes, parallelTx2Id, partIdForParallelTx, true, 10_000);
+
+        // Check that the volatile state of the transaction is preserved.
+        assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
+
+        finishAllowedFuture.complete(null);
+
+        assertThat(commitFut, willCompleteSuccessfully());
+
+        log.info("Test: Tx committed [tx={}].", txId);
+
+        Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
+        waitForTxStateReplication(nodes, txId, partId, 10_000);
+
+        // Check that both volatile and persistent states are vacuumized.
+        waitForTxStateVacuum(txId, partId, true, 10_000);
+
+        // Trying to read the value.
+        Tuple keyRec = Tuple.create().set("key", tuple.longValue("key"));
+        checkValueReadOnly(view, roTxBefore, keyRec, null);
+        checkValueReadOnly(view, roTxAfter, keyRec, tuple);
+    }
+
+    /**
+     * Check that the ABANDONED transaction state is preserved until recovery.
+     *
+     * <ul>
+     *     <li>Start a transaction from a coordinator that is not included in the commit partition group;</li>
+     *     <li>Start a parallel transaction;</li>
+     *     <li>Find a tuple for parallel tx that would be hosted on the same partition as a tuple for the abandoned tx;</li>
+     *     <li>Insert values within both transactions;</li>
+     *     <li>Commit the parallel transaction;</li>
+     *     <li>Stop the tx coordinator;</li>
+     *     <li>Wait for tx state of parallel tx to be vacuumized;</li>
+     *     <li>Check that the volatile state of the transaction is preserved;</li>
+     *     <li>Try to read the value using another transaction, which starts the tx recovery;</li>
+     *     <li>Check that abandoned tx is rolled back and thus the value is null;</li>
+     *     <li>Check that the abandoned transaction is recovered; its volatile and persistent states are vacuumized.</li>
+     * </ul>
+     */
+    @Test
+    public void testAbandonedTxnsAreNotVacuumizedUntilRecovered() throws InterruptedException {
+        setTxResourceTtl(1);
+
+        IgniteImpl leaseholder = cluster.node(0);
+
+        Tuple tuple = findTupleToBeHostedOnNode(leaseholder, TABLE_NAME, null, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+        int partId = partitionIdForTuple(anyNode(), TABLE_NAME, tuple, null);
+
+        TablePartitionId groupId = new TablePartitionId(tableId(anyNode(), TABLE_NAME), partId);
+
+        Set<String> txNodes = partitionAssignment(anyNode(), groupId);
+
+        IgniteImpl abandonedTxCoord = findNode(n -> !txNodes.contains(n.name()));
+
+        RecordView<Tuple> view = abandonedTxCoord.tables().table(TABLE_NAME).recordView();
+
+        Transaction abandonedTx = abandonedTxCoord.transactions().begin();
+        UUID abandonedTxId = txId(abandonedTx);
+        Transaction parallelTx = abandonedTxCoord.transactions().begin();
+        UUID parallelTxId = txId(parallelTx);
+
+        // Find a tuple hosted on the same partition.
+        Tuple tupleForParallelTx = tuple;
+        int partIdForParallelTx = -1;
+        while (partIdForParallelTx != partId) {
+            tupleForParallelTx = findTupleToBeHostedOnNode(leaseholder, TABLE_NAME, null, NEXT_TUPLE.apply(tupleForParallelTx), NEXT_TUPLE,
+                    true);
+
+            partIdForParallelTx = partitionIdForTuple(anyNode(), TABLE_NAME, tupleForParallelTx, parallelTx);
+        }
+
+        view.upsert(abandonedTx, tuple);
+        view.upsert(parallelTx, tupleForParallelTx);
+
+        parallelTx.commit();
+
+        stopNode(abandonedTxCoord.name());
+
+        waitForTxStateVacuum(txNodes, parallelTxId, partIdForParallelTx, true, 10_000);
+
+        // Check that the volatile state of the transaction is preserved.
+        assertTrue(checkVolatileTxStateOnNodes(txNodes, abandonedTxId));
+
+        // Try to read the value using another transaction, which starts the tx recovery.
+        RecordView<Tuple> viewLh = leaseholder.tables().table(TABLE_NAME).recordView();
+        Tuple value = viewLh.get(null, Tuple.create().set("key", tuple.longValue("key")));
+        // Check that abandoned tx is rolled back and thus the value is null.
+        assertNull(value);
+
+        // Check that the abandoned transaction is recovered; its volatile and persistent states are vacuumized.
+        // Wait for it, because we don't have the recovery completion future.
+        waitForTxStateVacuum(txNodes, abandonedTxId, partId, true, 10_000);
+    }
+
+    /**
+     * Check that the tx state on commit partition is vacuumized only when cleanup is completed.
+     *
+     * <ul>
+     *     <li>Start a transaction;</li>
+     *     <li>Generate some tuple and define on which nodes it would be hosted;</li>
+     *     <li>Choose one more node that doesn't host the first tuple and choose a tuple that will be sent on this node as primary;</li>
+     *     <li>Upsert both tuples within a transaction;</li>
+     *     <li>Block {@link TxCleanupMessage}-s from commit partition primary;</li>
+     *     <li>Start a tx commit;</li>
+     *     <li>Wait for vacuum completion on a node that doesn't host the commit partition;</li>
+     *     <li>Unblock {@link TxCleanupMessage}-s;</li>
+     *     <li>Wait for the tx state vacuum on the commit partition group.</li>
+     * </ul>
+     */
+    @Test
+    @WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "0")
+    public void testVacuumWithCleanupDelay() throws InterruptedException {
+        IgniteImpl node = anyNode();
+
+        RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+        // Put some value into the table.
+        Transaction tx = node.transactions().begin();
+        UUID txId = txId(tx);
+
+        log.info("Test: Loading the data [tx={}].", txId);
+
+        // Generate some tuple and define on which nodes it would be hosted.
+        Tuple tuple0 = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+        int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple0, tx);
+
+        TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
+
+        ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
+        IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
+
+        Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
+
+        log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+
+        // Some node that does not host the commit partition will be the primary node for upserting another tuple.
+        IgniteImpl leaseholderForAnotherTuple = findNode(n -> !commitPartNodes.contains(n.name()));
+
+        log.info("Test: leaseholderForAnotherTuple={}", leaseholderForAnotherTuple.name());
+
+        Tuple tuple1 = findTupleToBeHostedOnNode(leaseholderForAnotherTuple, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+        // Upsert both tuples within a transaction.
+        view.upsert(tx, tuple0);
+        view.upsert(tx, tuple1);
+
+        CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
+        CompletableFuture<Void> cleanupAllowed = new CompletableFuture<>();
+
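+        // Block TxCleanupMessage-s destined to the commit partition nodes until cleanup is allowed.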
+        commitPartitionLeaseholder.dropMessages((n, msg) -> {
+            if (msg instanceof TxCleanupMessage) {
+                cleanupStarted.complete(null);
+
+                log.info("Test: cleanup started.");
+
+                if (commitPartNodes.contains(n)) {
+                    cleanupAllowed.join();
+                }
+            }
+
+            return false;
+        });
+
+        Transaction roTxBefore = beginReadOnlyTx(anyNode());
+
+        CompletableFuture<Void> commitFut = tx.commitAsync();
+
+        waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+
+        assertThat(cleanupStarted, willCompleteSuccessfully());
+
+        // Check the vacuum result on a node that doesn't host the commit partition.
+        triggerVacuum();
+        assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()), txId, commitPartId, false);
+
+        // Unblocking cleanup.
+        cleanupAllowed.complete(null);
+
+        assertThat(commitFut, willCompleteSuccessfully());
+
+        Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
+        waitForCondition(() -> volatileTxState(commitPartitionLeaseholder, txId) != null, 10_000);
+
+        triggerVacuum();
+        assertTxStateVacuumized(txId, commitPartId, true);
+
+        // Trying to read the values.
+        Tuple key0 = Tuple.create().set("key", tuple0.longValue("key"));
+        Tuple key1 = Tuple.create().set("key", tuple1.longValue("key"));
+        checkValueReadOnly(view, roTxBefore, key0, null);
+        checkValueReadOnly(view, roTxAfter, key0, tuple0);
+        checkValueReadOnly(view, roTxBefore, key1, null);
+        checkValueReadOnly(view, roTxAfter, key1, tuple1);
+    }
+
+    /**
+     * Check that the tx state is eventually vacuumized when the commit partition primary changes during cleanup.
+     *
+     * <ul>
+     *     <li>Start a transaction;</li>
+     *     <li>Upsert a value;</li>
+     *     <li>Block {@link TxCleanupMessage}-s;</li>
+     *     <li>Start a tx commit;</li>
+     *     <li>Transfer the primary replica;</li>
+     *     <li>Unblock the {@link TxCleanupMessage}-s;</li>
+     *     <li>Ensure that tx states are finally vacuumized.</li>
+     * </ul>
+     */
+    @Test
+    public void testCommitPartitionPrimaryChangesBeforeVacuum() throws InterruptedException {
+        // We can't leave TTL as 0 here, because the primary replica is changed during cleanup, which means that
+        // WriteIntentSwitchReplicaRequest will be processed on a node that is no longer the primary. Removing the tx state instantly
+        // would cause incorrect tx recovery and a write intent switch with the tx state as ABORTED.
+        setTxResourceTtl(1);
+
+        IgniteImpl node = anyNode();
+
+        RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+        // Put some value into the table.
+        Transaction tx = node.transactions().begin();
+        UUID txId = txId(tx);
+
+        log.info("Test: Loading the data [tx={}].", txId);
+
+        Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+        int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
+
+        TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
+
+        ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
+        IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
+
+        Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
+
+        log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+
+        view.upsert(tx, tuple);
+
+        CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
+        CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
+        boolean[] cleanupAllowed = new boolean[1];
+
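+        // Block TxCleanupMessage-s on the commit partition leaseholder until the primary is transferred and cleanup is allowed.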
+        commitPartitionLeaseholder.dropMessages((n, msg) -> {
+            if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
+                cleanupStarted.complete(null);
+
+                cleanupAllowedFut.join();
+            }
+
+            return false;
+        });
+
+        Transaction roTxBefore = beginReadOnlyTx(anyNode());
+
+        CompletableFuture<Void> commitFut = tx.commitAsync();
+
+        assertThat(cleanupStarted, willCompleteSuccessfully());
+
+        transferPrimary(cluster.runningNodes().collect(toSet()), commitPartGrpId, commitPartNodes::contains);
+
+        cleanupAllowedFut.complete(null);
+
+        cleanupAllowed[0] = true;
+
+        assertThat(commitFut, willCompleteSuccessfully());
+
+        log.info("Test: tx committed.");
+
+        waitForTxStateVacuum(txId, commitPartId, true, 10_000);
+
+        Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
+        log.info("Test: checking values.");
+
+        // Trying to read the value.
+        Tuple key = Tuple.create().set("key", tuple.longValue("key"));
+        checkValueReadOnly(view, roTxBefore, key, null);
+        checkValueReadOnly(view, roTxAfter, key, tuple);
+    }
+
+    /**
+     * Check that the persistent tx state on the commit partition is vacuumized only when cleanup is completed, even if the volatile
+     * state has already been vacuumized.
+     *
+     * <ul>
+     *     <li>Start a transaction;</li>
+     *     <li>Upsert a tuple;</li>
+     *     <li>Block {@link TxCleanupMessage}-s from commit partition primary;</li>
+     *     <li>Start a tx commit;</li>
+     *     <li>Wait for tx cleanup to start;</li>
+     *     <li>Wait for volatile tx state vacuum;</li>
+     *     <li>Unblock {@link TxCleanupMessage}-s;</li>
+     *     <li>Wait for the tx state vacuum on the commit partition group.</li>
+     * </ul>
+     */
+    @Test
+    @WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "0")
+    public void testVacuumPersistentStateAfterCleanupDelayAndVolatileStateVacuum() throws InterruptedException {
+        IgniteImpl node = anyNode();
+
+        RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+        // Put some value into the table.
+        Transaction tx = node.transactions().begin();
+        UUID txId = txId(tx);
+
+        log.info("Test: Loading the data [tx={}].", txId);
+
+        Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+        int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
+
+        TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
+
+        ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
+        IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
+
+        Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
+
+        log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+
+        view.upsert(tx, tuple);
+
+        CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
+        CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
+        boolean[] cleanupAllowed = new boolean[1];
+
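+        // Drop TxCleanupMessage-s on the commit partition leaseholder until cleanup is explicitly allowed.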
+        commitPartitionLeaseholder.dropMessages((n, msg) -> {
+            if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
+                cleanupStarted.complete(null);
+
+                cleanupAllowedFut.join();
+
+                return true;
+            }
+
+            return false;
+        });
+
+        Transaction roTxBefore = beginReadOnlyTx(anyNode());
+
+        CompletableFuture<Void> commitFut = tx.commitAsync();
+
+        waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+
+        assertThat(cleanupStarted, willCompleteSuccessfully());
+
+        // Wait for volatile tx state vacuum. This is possible because tx finish is complete.
+        triggerVacuum();
+        assertTxStateVacuumized(txId, commitPartId, false);
+
+        log.info("Test: volatile state vacuumized");
+
+        cleanupAllowedFut.complete(null);
+
+        cleanupAllowed[0] = true;
+
+        assertThat(commitFut, willCompleteSuccessfully());
+
+        log.info("Test: commit completed.");
+
+        Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
+        waitForCondition(() -> {
+            TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(commitPartitionLeaseholder, txId);
+
+            return txStateMeta != null && txStateMeta.cleanupCompletionTimestamp() != null;
+        }, 10_000);
+
+        log.info("Test: cleanup completed.");
+
+        triggerVacuum();
+        assertTxStateVacuumized(txId, commitPartId, true);
+
+        // Trying to read the data.
+        Tuple key = Tuple.create().set("key", tuple.longValue("key"));
+        checkValueReadOnly(view, roTxBefore, key, null);
+        checkValueReadOnly(view, roTxAfter, key, tuple);
+    }
+
+    /**
+     * Checks that tx recovery doesn't change the tx finish result from COMMITTED to ABORTED once it is saved in the persistent storage.
+     *
+     * <ul>
+     *     <li>Start a transaction tx0;</li>
+     *     <li>Upsert some value;</li>
+     *     <li>Block {@link TxCleanupMessage}-s;</li>
+     *     <li>Start the commit of tx0 and wait for the tx state COMMITTED to be replicated to the persistent storage;</li>
+     *     <li>Stop the tx0's coordinator;</li>
+     *     <li>Wait for tx0's state vacuum;</li>
+     *     <li>Try to get the data that has been committed by tx0, ensure the data is correct.</li>
+     * </ul>
+     */
+    @Test
+    public void testRecoveryAfterPersistentStateVacuumized() throws InterruptedException {
+        // This node isn't going to be stopped, so let it be node 0.
+        IgniteImpl commitPartitionLeaseholder = cluster.node(0);
+
+        Tuple tuple0 = findTupleToBeHostedOnNode(commitPartitionLeaseholder, TABLE_NAME, null, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+        int commitPartId = partitionIdForTuple(commitPartitionLeaseholder, TABLE_NAME, tuple0, null);
+
+        Set<String> commitPartitionNodes = partitionAssignment(commitPartitionLeaseholder,
+                new TablePartitionId(tableId(commitPartitionLeaseholder, TABLE_NAME), commitPartId));
+
+        // Choose some node that doesn't host the partition as a tx coordinator.
+        IgniteImpl coord0 = findNode(n -> !commitPartitionNodes.contains(n.name()));
+
+        RecordView<Tuple> view0 = coord0.tables().table(TABLE_NAME).recordView();
+
+        // Put some value into the table.
+        Transaction tx0 = coord0.transactions().begin();
+        UUID txId0 = txId(tx0);
+
+        log.info("Test: Transaction 0 [tx={}].", txId0);
+
+        log.info("Test: Commit partition of transaction 0 [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(),
+                commitPartitionNodes);
+
+        view0.upsert(tx0, tuple0);
+
+        CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
+
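+        // Only track that cleanup has started; the TxCleanupMessage-s themselves are not blocked.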
+        commitPartitionLeaseholder.dropMessages((n, msg) -> {
+            if (msg instanceof TxCleanupMessage) {
+                cleanupStarted.complete(null);
+
+                return false;
+            }
+
+            return false;
+        });
+
+        log.info("Test: Committing the transaction 0 [tx={}].", txId0);
+
+        tx0.commitAsync();
+
+        // Check that the final tx state COMMITTED is saved to the persistent tx storage.
+        assertTrue(waitForCondition(() -> cluster.runningNodes().filter(n -> commitPartitionNodes.contains(n.name())).allMatch(n -> {
+            TransactionMeta meta = persistentTxState(n, txId0, commitPartId);
+
+            return meta != null && meta.txState() == COMMITTED;
+        }), 10_000));
+
+        assertThat(cleanupStarted, willCompleteSuccessfully());
+
+        // Stop the first transaction coordinator.
+        stopNode(coord0.name());
+
+        // No cleanup happened, waiting for vacuum on the remaining nodes that participated in tx0.
+        waitForTxStateVacuum(txId0, commitPartId, true, 10_000);
+
+        // Preparing to run another tx.
+        IgniteImpl coord1 = anyNode();
+
+        RecordView<Tuple> view1 = coord1.tables().table(TABLE_NAME).recordView();
+
+        // Another tx should get the data committed by tx 0.
+        Tuple keyTuple = Tuple.create().set("key", tuple0.longValue("key"));
+        Tuple tx0Data = view1.get(null, keyTuple);
+        assertEquals(tuple0.longValue("key"), tx0Data.longValue("key"));
+        assertEquals(tuple0.stringValue("val"), tx0Data.stringValue("val"));
+
+        // Waiting for vacuum, because there is no recovery future here.
+        waitForTxStateVacuum(txId0, commitPartId, true, 10_000);
+    }
+
+    /**
+     * Check that RO txns read the correct data consistent with commit timestamps.
+     *
+     * <ul>
+     *     <li>For this test, create another zone and table with number of replicas that is equal to number of nodes;</li>
+     *     <li>Start RO tx 1;</li>
+     *     <li>Upsert (k1, v1) within RW tx 1 and commit it;</li>
+     *     <li>Start RO tx 2;</li>
+     *     <li>Upsert (k1, v2) within RW tx 2 and commit it;</li>
+     *     <li>Start RO tx 3;</li>
+     *     <li>Wait for vacuum of the states of RW tx 1 and RW tx 2;</li>
+     *     <li>Read the data by k1 within RO tx 1, should be null;</li>
+     *     <li>Read the data by k1 within RO tx 2, should be v1;</li>
+     *     <li>Read the data by k1 within RO tx 3, should be v2.</li>
+     * </ul>
+     */
+    @Test
+    public void testRoReadTheCorrectDataInBetween() {
+        setTxResourceTtl(0);
+
+        IgniteImpl node = anyNode();
+
+        String tableName = TABLE_NAME + "_1";
+
+        // For this test, create another zone and table with number of replicas that is equal to number of nodes.
+        String zoneSql = "create zone test_zone_1 with partitions=20, replicas=" + initialNodes()
+                + ", storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
+        String sql = "create table " + tableName + " (key bigint primary key, val varchar(20)) with primary_zone='TEST_ZONE_1'";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(zoneSql, session);
+            executeUpdate(sql, session);
+        });
+
+        Transaction roTx1 = beginReadOnlyTx(node);
+
+        Tuple t1 = Tuple.create().set("key", 1L).set("val", "val1");
+        Tuple t2 = Tuple.create().set("key", 1L).set("val", "val2");
+
+        RecordView<Tuple> view = table(node, tableName).recordView();
+
+        Transaction rwTx1 = node.transactions().begin();
+        view.upsert(rwTx1, t1);
+        rwTx1.commit();
+        UUID rwTxId1 = txId(rwTx1);
+
+        Transaction roTx2 = beginReadOnlyTx(node);
+
+        Transaction rwTx2 = node.transactions().begin();
+        view.upsert(rwTx2, t2);
+        rwTx2.commit();
+        UUID rwTxId2 = txId(rwTx2);
+
+        Transaction roTx3 = beginReadOnlyTx(node);
+
+        triggerVacuum();
+
+        assertTxStateVacuumized(rwTxId1, tableName, partitionIdForTuple(node, tableName, t1, rwTx1), true);
+        assertTxStateVacuumized(rwTxId2, tableName, partitionIdForTuple(node, tableName, t2, rwTx2), true);
+
+        Tuple keyRec = Tuple.create().set("key", 1L);
+
+        checkValueReadOnly(view, roTx1, keyRec, null);
+        checkValueReadOnly(view, roTx2, keyRec, t1);
+        checkValueReadOnly(view, roTx3, keyRec, t2);
+    }
+
+    private static Transaction beginReadOnlyTx(IgniteImpl node) {
+        return node.transactions().begin(new TransactionOptions().readOnly(true));
+    }
+
+    /**
+     * Check the value using the given read-only tx.
+     *
+     * @param view Record view.
+     * @param readOnlyTx RO tx.
+     * @param keyTuple Key tuple.
+     * @param expected Expected tuple.
+     */
+    private static void checkValueReadOnly(RecordView<Tuple> view, Transaction readOnlyTx, Tuple keyTuple, @Nullable Tuple expected) {
+        Tuple actual = view.get(readOnlyTx, keyTuple);
+
+        if (expected == null) {
+            assertNull(actual);
+        } else {
+            assertEquals(expected.stringValue("val"), actual.stringValue("val"));
+        }
+    }
+
+    private void setTxResourceTtl(long ttl) {
+        CompletableFuture<Void> changeFuture = anyNode().clusterConfiguration().change(c ->
+                c.changeRoot(TransactionConfiguration.KEY).changeTxnResourceTtl(ttl));
+
+        assertThat(changeFuture, willCompleteSuccessfully());
+    }
+
+    /**
+     * To use it, the tx resource TTL should be set to {@code 0}, see {@link #setTxResourceTtl(long)}.
+     */
+    private void triggerVacuum() {
+        runningNodes().forEach(node -> {
+            log.info("Test: triggering vacuum manually on node: " + node.name());
+
+            CompletableFuture<Void> vacuumFut = node.txManager().vacuum();
+            assertThat(vacuumFut, willCompleteSuccessfully());
+        });
+    }
+
+    private boolean checkVolatileTxStateOnNodes(Set<String> nodeConsistentIds, UUID txId) {
+        return cluster.runningNodes()
+                .filter(n -> nodeConsistentIds.contains(n.name()))
+                .allMatch(n -> volatileTxState(n, txId) != null);
+    }
+
+    private boolean checkPersistentTxStateOnNodes(Set<String> nodeConsistentIds, UUID txId, int partId) {
+        return cluster.runningNodes()
+                .filter(n -> nodeConsistentIds.contains(n.name()))
+                .allMatch(n -> persistentTxState(n, txId, partId) != null);
+    }
+
+    /**
+     * Waits for persistent tx state to be replicated on the given nodes.
+     *
+     * @param nodeConsistentIds Node names.
+     * @param txId Transaction id.
+     * @param partId Commit partition id.
+     * @param timeMs Time to wait.
+     */
+    private void waitForTxStateReplication(Set<String> nodeConsistentIds, UUID txId, int partId, long timeMs)
+            throws InterruptedException {
+        assertTrue(waitForCondition(() -> checkPersistentTxStateOnNodes(nodeConsistentIds, txId, partId), timeMs));
+    }
+
+    /**
+     * Waits for vacuum of volatile (and if needed, persistent) state of the given tx on all nodes of the cluster.
+     *
+     * @param txId Transaction id.
+     * @param partId Commit partition id to check the persistent tx state storage of this partition.
+     * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+     * @param timeMs Time to wait.
+     */
+    private void waitForTxStateVacuum(UUID txId, int partId, boolean checkPersistent, long timeMs) throws InterruptedException {
+        waitForTxStateVacuum(cluster.runningNodes().map(IgniteImpl::name).collect(toSet()), txId, partId, checkPersistent, timeMs);
+    }
+
+    /**
+     * Waits for vacuum of volatile (and if needed, persistent) state of the given tx on the given nodes.
+     *
+     * @param nodeConsistentIds Node names.
+     * @param txId Transaction id.
+     * @param partId Commit partition id to check the persistent tx state storage of this partition.
+     * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+     * @param timeMs Time to wait.
+     */
+    private void waitForTxStateVacuum(Set<String> nodeConsistentIds, UUID txId, int partId, boolean checkPersistent, long timeMs)
+            throws InterruptedException {
+        boolean r = waitForCondition(() -> txStateIsAbsent(nodeConsistentIds, txId, TABLE_NAME, partId, checkPersistent, false), timeMs);
+
+        if (!r) {
+            logCurrentTxState(nodeConsistentIds, txId, TABLE_NAME, partId);
+        }
+
+        assertTrue(r);
+    }
+
+    /**
+     * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on all nodes of the cluster.
+     *
+     * @param txId Transaction id.
+     * @param partId Commit partition id to check the persistent tx state storage of this partition.
+     * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+     */
+    private void assertTxStateVacuumized(UUID txId, int partId, boolean checkPersistent) {
+        assertTxStateVacuumized(txId, TABLE_NAME, partId, checkPersistent);
+    }
+
+    /**
+     * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on all nodes of the cluster.
+     *
+     * @param txId Transaction id.
+     * @param tableName Table name of the table that commit partition belongs to.
+     * @param partId Commit partition id to check the persistent tx state storage of this partition.
+     * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+     */
+    private void assertTxStateVacuumized(UUID txId, String tableName, int partId, boolean checkPersistent) {
+        Set<String> allNodes = cluster.runningNodes().map(IgniteImpl::name).collect(toSet());
+
+        assertTxStateVacuumized(allNodes, txId, tableName, partId, checkPersistent);
+    }
+
+    /**
+     * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on the given nodes. Uses default
+     * {@link #TABLE_NAME}.
+     *
+     * @param nodeConsistentIds Node names.
+     * @param txId Transaction id.
+     * @param partId Commit partition id to check the persistent tx state storage of this partition.
+     * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+     */
+    private void assertTxStateVacuumized(Set<String> nodeConsistentIds, UUID txId, int partId, boolean checkPersistent) {
+        assertTxStateVacuumized(nodeConsistentIds, txId, TABLE_NAME, partId, checkPersistent);
+    }
+
+    /**
+     * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on the given nodes.
+     *
+     * @param nodeConsistentIds Node names.
+     * @param txId Transaction id.
+     * @param tableName Table name of the table that commit partition belongs to.
+     * @param partId Commit partition id to check the persistent tx state storage of this partition.
+     * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+     */
+    private void assertTxStateVacuumized(Set<String> nodeConsistentIds, UUID txId, String tableName, int partId, boolean checkPersistent) {
+        boolean result = txStateIsAbsent(nodeConsistentIds, txId, tableName, partId, checkPersistent, true);
+
+        if (!result) {
+            triggerVacuum();
+
+            result = txStateIsAbsent(nodeConsistentIds, txId, tableName, partId, checkPersistent, true);
+
+            if (!result) {
+                logCurrentTxState(nodeConsistentIds, txId, tableName, partId);
+            }
+        }
+
+        assertTrue(result);
+    }
+
+    /**
+     * Checks whether the tx state is absent on all of the given nodes.
+     *
+     * @param nodeConsistentIds Set of node names to check.
+     * @param txId Transaction id.
+     * @param tableName Table name of the table that commit partition belongs to.
+     * @param partId Commit partition id.
+     * @param checkPersistent Whether the persistent state should be checked.
+     * @param checkCpPrimaryOnly If {@code true}, the persistent state is checked only on the commit partition primary,
+     *     otherwise it is checked on every given node.
+     * @return {@code true} if tx state is absent, {@code false} otherwise. Call {@link #logCurrentTxState(Set, UUID, String, int)}
+     *     for details.
+     */
+    private boolean txStateIsAbsent(
+            Set<String> nodeConsistentIds,
+            UUID txId,
+            String tableName,
+            int partId,
+            boolean checkPersistent,
+            boolean checkCpPrimaryOnly
+    ) {
+        boolean result = true;
+
+        String cpPrimaryId = null;
+
+        if (checkCpPrimaryOnly) {
+            IgniteImpl node = anyNode();
+
+            TablePartitionId tablePartitionId = new TablePartitionId(tableId(node, tableName), partId);
+
+            CompletableFuture<ReplicaMeta> replicaFut = node.placementDriver().getPrimaryReplica(tablePartitionId, node.clock().now());
+            assertThat(replicaFut, willCompleteSuccessfully());
+
+            ReplicaMeta replicaMeta = replicaFut.join();
+            // The test doesn't make sense if there is no primary right now.
+            assertNotNull(replicaMeta);
+
+            cpPrimaryId = replicaMeta.getLeaseholderId();
+        }
+
+        for (Iterator<IgniteImpl> iterator = cluster.runningNodes().iterator(); iterator.hasNext();) {
+            IgniteImpl node = iterator.next();
+
+            if (!nodeConsistentIds.contains(node.name())) {
+                continue;
+            }
+
+            result = result
+                    && volatileTxState(node, txId) == null
+                    && (!checkPersistent || !node.id().equals(cpPrimaryId) || persistentTxState(node, txId, tableName, partId) == null);
+        }
+
+        return result;
+    }
+
+    private void logCurrentTxState(Set<String> nodeConsistentIds, UUID txId, String table, int partId) {
+        cluster.runningNodes().filter(n -> nodeConsistentIds.contains(n.name())).forEach(node -> {
+            log.info("Test: volatile   state [tx={}, node={}, state={}].", txId, node.name(), volatileTxState(node, txId));
+            log.info("Test: persistent state [tx={}, node={}, state={}].", txId, node.name(), persistentTxState(node, txId, table, partId));
+        });
+    }
+
+    private IgniteImpl anyNode() {
+        return runningNodes().findFirst().orElseThrow();
+    }
+
+    @Nullable
+    private static TransactionMeta volatileTxState(IgniteImpl node, UUID txId) {
+        TxManagerImpl txManager = (TxManagerImpl) node.txManager();
+        return txManager.stateMeta(txId);
+    }
+
+    @Nullable
+    private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, int partId) {
+        return persistentTxState(node, txId, TABLE_NAME, partId);
+    }
+
+    @Nullable
+    private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, String tableName, int partId) {
+        TransactionMeta[] meta = new TransactionMeta[1];
+
+        Future<?> f = txStateStorageExecutor.submit(() -> {
+            TxStateStorage txStateStorage = table(node, tableName).internalTable().txStateStorage().getTxStateStorage(partId);
+
+            assertNotNull(txStateStorage);
+
+            meta[0] = txStateStorage.get(txId);
+        });
+
+        try {
+            f.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        return meta[0];
+    }
+
+    private IgniteImpl findNode(Predicate<IgniteImpl> filter) {
+        return cluster.runningNodes()
+                .filter(n -> n != null && filter.test(n))
+                .findFirst()
+                .get();
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 94903d0..7ca6b96 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
 
 import java.util.ArrayList;
@@ -47,6 +48,8 @@
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
 
 /**
  * Table view implementation for binary objects.
@@ -403,6 +406,25 @@
     }
 
     /**
+     * Marshal a tuple to a row. Test-only public method.
+     *
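+     * <p>A usage sketch (assuming {@code view} is a {@code RecordBinaryViewImpl} obtained from a test table):
+     * <pre>{@code
+     * CompletableFuture<BinaryRowEx> rowFut = view.tupleToBinaryRow(null, Tuple.create().set("key", 1L));
+     * }</pre>
+     *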
+     * @param tx Transaction, if present.
+     * @param rec Tuple record.
+     * @return A future that completes with the marshalled row.
+     */
+    @TestOnly
+    @VisibleForTesting
+    public CompletableFuture<BinaryRowEx> tupleToBinaryRow(@Nullable Transaction tx, Tuple rec) {
+        Objects.requireNonNull(rec);
+
+        return doOperation(tx, schemaVersion -> {
+            Row row = marshal(rec, schemaVersion, false);
+
+            return completedFuture(row);
+        });
+    }
+
+    /**
      * Returns table row tuple.
      *
      * @param row Binary row.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 32aaa5c..56f1813 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -79,6 +79,7 @@
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.UpdateCommandResult;
+import org.apache.ignite.internal.tx.message.VacuumTxStatesCommand;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.TrackerClosedException;
@@ -220,6 +221,8 @@
                     handleBuildIndexCommand((BuildIndexCommand) command, commandIndex, commandTerm);
                 } else if (command instanceof PrimaryReplicaChangeCommand) {
                     handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm);
+                } else if (command instanceof VacuumTxStatesCommand) {
+                    handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm);
                 } else {
                     assert false : "Command was not found [cmd=" + command + ']';
                 }
@@ -397,7 +400,10 @@
                 commandTerm
         );
 
-        markFinished(txId, cmd.commit(), cmd.commitTimestamp());
+        // Assume that we handle the finish command only on the commit partition.
+        TablePartitionId commitPartitionId = new TablePartitionId(storage.tableId(), storage.partitionId());
+
+        markFinished(txId, cmd.commit(), cmd.commitTimestamp(), commitPartitionId);
 
         LOG.debug("Finish the transaction txId = {}, state = {}, txStateChangeRes = {}", txId, txMetaToSet, txStateChangeRes);
 
@@ -433,7 +439,7 @@
 
         UUID txId = cmd.txId();
 
-        markFinished(txId, cmd.commit(), cmd.commitTimestamp());
+        markFinished(txId, cmd.commit(), cmd.commitTimestamp(), null);
 
         storageUpdateHandler.switchWriteIntents(
                 txId,
@@ -639,6 +645,24 @@
         });
     }
 
+    /**
+     * Handler for {@link VacuumTxStatesCommand}.
+     *
+     * @param cmd Command.
+     * @param commandIndex Command index.
+     * @param commandTerm Command term.
+     */
+    private void handleVacuumTxStatesCommand(VacuumTxStatesCommand cmd, long commandIndex, long commandTerm) {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
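+        // TxStateStorage#remove also advances lastAppliedIndex and lastAppliedTerm to the given command index and term.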
+        for (UUID txId : cmd.txIds()) {
+            txStateStorage.remove(txId, commandIndex, commandTerm);
+        }
+    }
+
     private static void onTxStateStorageCasFail(UUID txId, TxMeta txMetaBeforeCas, TxMeta txMetaToSet) {
         String errorMsg = format("Failed to update tx state in the storage, transaction txId = {} because of inconsistent state,"
                         + " expected state = {}, state to set = {}",
@@ -697,11 +721,11 @@
         ));
     }
 
-    private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) {
+    private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp, @Nullable TablePartitionId partId) {
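+        // The partId is only used as a fallback commit partition id when there is no previous volatile meta for the transaction.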
         txManager.updateTxMeta(txId, old -> new TxStateMeta(
                 commit ? COMMITTED : ABORTED,
                 old == null ? null : old.txCoordinatorId(),
-                old == null ? null : old.commitPartitionId(),
+                old == null ? partId : old.commitPartitionId(),
                 commit ? commitTimestamp : null
         ));
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b4589a0..4c5bfaa 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -176,8 +176,11 @@
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
 import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
 import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
+import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
+import org.apache.ignite.internal.tx.message.VacuumTxStatesCommand;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -232,6 +235,9 @@
     /** Factory for creating replica command messages. */
     private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
 
+    /** Factory for creating transaction command messages. */
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
+
     /** Replication retries limit. */
     private static final int MAX_RETIES_ON_SAFE_TIME_REORDERING = 1000;
 
@@ -536,7 +542,7 @@
                         new HybridTimestampTracker(),
                         replicationGroupId,
                         false,
-                        // term is not required for the rollback.
+                        // Enlistment consistency token is not required for the rollback, so it is 0L.
                         Map.of(replicationGroupId, new IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)),
                         txId
                 )
@@ -764,6 +770,8 @@
             return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request, opStartTsIfDirectRo);
         } else if (request instanceof TxStateCommitPartitionRequest) {
             return processTxStateCommitPartitionRequest((TxStateCommitPartitionRequest) request);
+        } else if (request instanceof VacuumTxStateReplicaRequest) {
+            return processVacuumTxStateReplicaRequest((VacuumTxStateReplicaRequest) request);
         } else {
             throw new UnsupportedReplicaRequestException(request.getClass());
         }
@@ -3686,12 +3694,12 @@
 
             // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
             return txManager.executeWriteIntentSwitchAsync(() -> inBusyLock(busyLock,
-                    () -> storageUpdateHandler.switchWriteIntents(
-                            txId,
-                            txState == COMMITTED,
-                            commitTimestamp,
-                            indexIdsAtRwTxBeginTs(txId)
-                    )
+                    () -> storageUpdateHandler.switchWriteIntents(
+                            txId,
+                            txState == COMMITTED,
+                            commitTimestamp,
+                            indexIdsAtRwTxBeginTs(txId)
+                    )
             )).whenComplete((unused, e) -> {
                 if (e != null) {
                     LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', e);
@@ -4068,6 +4076,14 @@
         return source == null ? null : new BinaryRowUpgrader(schemaRegistry, targetSchemaVersion).upgrade(source);
     }
 
+    private CompletableFuture<?> processVacuumTxStateReplicaRequest(VacuumTxStateReplicaRequest request) {
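+        // Replicate the vacuum through the raft group so that every replica of the commit partition removes the persistent tx states.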
+        VacuumTxStatesCommand cmd = TX_MESSAGES_FACTORY.vacuumTxStatesCommand()
+                .txIds(request.transactionIds())
+                .build();
+
+        return raftClient.run(cmd);
+    }
+
     /**
      * Operation unique identifier.
      */
diff --git a/modules/transactions/build.gradle b/modules/transactions/build.gradle
index 371e5e0..ac8c827 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -55,12 +55,19 @@
 
     integrationTestImplementation project(':ignite-api')
     integrationTestImplementation(testFixtures(project(':ignite-core')))
+    integrationTestImplementation(testFixtures(project(':ignite-transactions')))
     integrationTestImplementation(testFixtures(project(':ignite-runner')))
 
     testFixturesImplementation project(':ignite-configuration')
     testFixturesImplementation project(':ignite-core')
     testFixturesImplementation project(':ignite-api')
     testFixturesImplementation project(':ignite-schema')
+    testFixturesImplementation project(':ignite-runner')
+    testFixturesImplementation project(':ignite-affinity')
+    testFixturesImplementation project(':ignite-metastorage-api')
+    testFixturesImplementation project(':ignite-placement-driver-api')
+    testFixturesImplementation project(':ignite-distribution-zones')
+    testFixturesImplementation project(':ignite-table')
     testFixturesImplementation(testFixtures(project(':ignite-core')))
     testFixturesImplementation libs.jetbrains.annotations
     testFixturesImplementation libs.mockito.junit
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index e01e279..badc077 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -129,7 +129,7 @@
      *         should pass its own tracker to provide linearizability between read-write and read-only transactions started by this client.
      * @param commitPartition Partition to store a transaction state.
      * @param commit {@code true} if a commit requested.
-     * @param enlistedGroups Enlisted partition groups with consistency token.
+     * @param enlistedGroups Enlisted partition groups with consistency tokens.
      * @param txId Transaction id.
      */
     CompletableFuture<Void> finish(
@@ -185,8 +185,12 @@
      */
     CompletableFuture<Void> cleanup(String node, UUID txId);
 
-    /** Locally vacuums no longer needed transactional resources, like txnState both persistent and volatile. */
-    void vacuum();
+    /**
+     * Locally vacuums transactional resources that are no longer needed, such as both volatile and persistent txn state.
+     *
+     * @return Vacuum complete future.
+     */
+    CompletableFuture<Void> vacuum();
 
     /**
      * Returns a number of finished transactions.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index 686e86f..f489d4a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -43,6 +43,7 @@
 
     private final Long initialVacuumObservationTimestamp;
 
+    @Nullable
     private final Long cleanupCompletionTimestamp;
 
     /**
@@ -69,6 +70,7 @@
      * @param txCoordinatorId Transaction coordinator id.
      * @param commitPartitionId Commit partition replication group id.
      * @param commitTimestamp Commit timestamp.
+     * @param initialVacuumObservationTimestamp Initial vacuum observation timestamp.
      */
     public TxStateMeta(
             TxState txState,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
new file mode 100644
index 0000000..52beab9
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implements the logic of persistent tx states vacuum.
+ */
+public class PersistentTxStateVacuumizer {
+    private static final IgniteLogger LOG = Loggers.forClass(PersistentTxStateVacuumizer.class);
+
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
+
+    private final ReplicaService replicaService;
+
+    private final ClusterNode localNode;
+
+    private final ClockService clockService;
+
+    private final PlacementDriver placementDriver;
+
+    /**
+     * Constructor.
+     *
+     * @param replicaService Replica service.
+     * @param localNode Local node.
+     * @param clockService Clock service.
+     * @param placementDriver Placement driver.
+     */
+    public PersistentTxStateVacuumizer(
+            ReplicaService replicaService,
+            ClusterNode localNode,
+            ClockService clockService,
+            PlacementDriver placementDriver
+    ) {
+        this.replicaService = replicaService;
+        this.localNode = localNode;
+        this.clockService = clockService;
+        this.placementDriver = placementDriver;
+    }
+
+    /**
+     * Vacuum persistent tx states.
+     *
+     * @param txIds Transaction ids to vacuum; map of commit partition ids to sets of tx ids.
+     * @return A future whose result is the set of successfully vacuumized txn states.
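+     *     A tx id gets into the result either when the vacuum request has been applied on the local commit partition primary,
+     *     or when the local node is not the primary of that partition (then the actual primary is responsible for the vacuum).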
+     */
+    public CompletableFuture<Set<UUID>> vacuumPersistentTxStates(Map<TablePartitionId, Set<UUID>> txIds) {
+        Set<UUID> successful = ConcurrentHashMap.newKeySet();
+        List<CompletableFuture<?>> futures = new ArrayList<>();
+        HybridTimestamp now = clockService.now();
+
+        txIds.forEach((commitPartitionId, txs) -> {
+            CompletableFuture<?> future = placementDriver.getPrimaryReplica(commitPartitionId, now)
+                    .thenCompose(replicaMeta -> {
+                        // If the primary replica is absent or is not located on the local node, then the primary is either on another
+                        // node or is going to be re-elected on the local one; in both cases the volatile state (as well as the cleanup
+                        // completion timestamp) will be updated on the new primary, and this operation will be called from there.
+                        // Also, we are going to send the vacuum request only to the local node.
+                        if (replicaMeta != null && localNode.id().equals(replicaMeta.getLeaseholderId())) {
+                            VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
+                                    .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
+                                    .groupId(commitPartitionId)
+                                    .transactionIds(txs)
+                                    .build();
+
+                            return replicaService.invoke(localNode, request).whenComplete((v, e) -> {
+                                if (e == null) {
+                                    successful.addAll(txs);
+                                    // We can log the exceptions without further handling because the txns of failed requests are not
+                                    // added to the set of successful ones and will be retried on the next vacuum run.
+                                    // PrimaryReplicaMissException is a part of the regular flow and is only logged at debug level.
+                                } else if (unwrapCause(e) instanceof PrimaryReplicaMissException) {
+                                    LOG.debug("Failed to vacuum tx states from the persistent storage.", e);
+                                } else {
+                                    LOG.warn("Failed to vacuum tx states from the persistent storage.", e);
+                                }
+                            });
+                        } else {
+                            successful.addAll(txs);
+
+                            return nullCompletedFuture();
+                        }
+                    });
+
+            futures.add(future);
+        });
+
+        return allOf(futures.toArray(new CompletableFuture[0]))
+                .handle((unused, unusedEx) -> successful);
+    }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
index c4d2c1a..b355ef8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
@@ -106,19 +106,21 @@
 
     @Override
     public CompletableFuture<Void> startAsync() {
-        resourceVacuumExecutor.scheduleAtFixedRate(
-                this::runVacuumOperations,
-                0,
-                resourceVacuumIntervalMilliseconds,
-                TimeUnit.MILLISECONDS
-        );
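+        // A non-positive vacuum interval disables both the periodic vacuum and the periodic broadcast of closed RO transactions.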
+        if (resourceVacuumIntervalMilliseconds > 0) {
+            resourceVacuumExecutor.scheduleAtFixedRate(
+                    this::runVacuumOperations,
+                    0,
+                    resourceVacuumIntervalMilliseconds,
+                    TimeUnit.MILLISECONDS
+            );
 
-        resourceVacuumExecutor.scheduleAtFixedRate(
-                finishedReadOnlyTransactionTracker::broadcastClosedTransactions,
-                0,
-                resourceVacuumIntervalMilliseconds,
-                TimeUnit.MILLISECONDS
-        );
+            resourceVacuumExecutor.scheduleAtFixedRate(
+                    finishedReadOnlyTransactionTracker::broadcastClosedTransactions,
+                    0,
+                    resourceVacuumIntervalMilliseconds,
+                    TimeUnit.MILLISECONDS
+            );
+        }
 
         finishedTransactionBatchRequestHandler.start();
 
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index fedacac..a8c8414 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -187,7 +187,7 @@
     private final MessagingService messagingService;
 
     /** Local node network identity. This id is available only after the network has started. */
-    private String localNodeId;
+    private volatile String localNodeId;
 
     /** Server cleanup processor. */
     private final TxCleanupRequestHandler txCleanupRequestHandler;
@@ -211,6 +211,10 @@
 
     private final TransactionInflights transactionInflights;
 
+    private final ReplicaService replicaService;
+
+    private volatile PersistentTxStateVacuumizer persistentTxStateVacuumizer;
+
     /**
      * Test-only constructor.
      *
@@ -310,6 +314,7 @@
         this.partitionOperationsExecutor = partitionOperationsExecutor;
         this.transactionInflights = transactionInflights;
         this.lowWatermark = lowWatermark;
+        this.replicaService = replicaService;
 
         placementDriverHelper = new PlacementDriverHelper(placementDriver, clockService);
 
@@ -643,7 +648,9 @@
                                             result.transactionState(),
                                             old == null ? null : old.txCoordinatorId(),
                                             commitPartition,
-                                            result.commitTimestamp()
+                                            result.commitTimestamp(),
+                                            old == null ? null : old.initialVacuumObservationTimestamp(),
+                                            old == null ? null : old.cleanupCompletionTimestamp()
                                     )
                             );
 
@@ -707,7 +714,9 @@
                                     txResult.transactionState(),
                                     localNodeId,
                                     old == null ? null : old.commitPartitionId(),
-                                    txResult.commitTimestamp()
+                                    txResult.commitTimestamp(),
+                                    old == null ? null : old.initialVacuumObservationTimestamp(),
+                                    old == null ? null : old.cleanupCompletionTimestamp()
                             ));
 
                     assert isFinalState(updatedMeta.txState()) :
@@ -754,6 +763,9 @@
 
             messagingService.addMessageHandler(ReplicaMessageGroup.class, this);
 
+            persistentTxStateVacuumizer = new PersistentTxStateVacuumizer(replicaService, topologyService.localMember(), clockService,
+                    placementDriver);
+
             txStateVolatileStorage.start();
 
             orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs());
@@ -833,10 +845,15 @@
     }
 
     @Override
-    public void vacuum() {
+    public CompletableFuture<Void> vacuum() {
+        if (persistentTxStateVacuumizer == null) {
+            return nullCompletedFuture(); // Not started yet.
+        }
+
         long vacuumObservationTimestamp = System.currentTimeMillis();
 
-        txStateVolatileStorage.vacuum(vacuumObservationTimestamp, txConfig.txnResourceTtl().value());
+        return txStateVolatileStorage.vacuum(vacuumObservationTimestamp, txConfig.txnResourceTtl().value(),
+                persistentTxStateVacuumizer::vacuumPersistentTxStates);
     }
 
     @Override
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
index 6997173..d0e65a9 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
@@ -17,16 +17,24 @@
 
 package org.apache.ignite.internal.tx.impl;
 
+import static java.lang.Math.max;
 import static org.apache.ignite.internal.tx.TxState.PENDING;
 import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.jetbrains.annotations.Nullable;
@@ -36,6 +44,7 @@
  */
 public class VolatileTxStateMetaStorage {
     private static final IgniteLogger LOG = Loggers.forClass(VolatileTxStateMetaStorage.class);
+
     /** The local map for tx states. */
     private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
 
@@ -117,8 +126,15 @@
      *
      * @param vacuumObservationTimestamp Timestamp of the vacuum attempt.
      * @param txnResourceTtl Transactional resource time to live in milliseconds.
+     * @param persistentVacuumOp Persistent vacuum operation. Accepts a map of commit partition ids to sets of tx ids and
+     *     returns a future with the set of successfully vacuumized tx ids.
+     * @return Vacuum complete future.
      */
-    public void vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
+    public CompletableFuture<Void> vacuum(
+            long vacuumObservationTimestamp,
+            long txnResourceTtl,
+            Function<Map<TablePartitionId, Set<UUID>>, CompletableFuture<Set<UUID>>> persistentVacuumOp
+    ) {
         LOG.info("Vacuum started [vacuumObservationTimestamp={}, txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
 
         AtomicInteger vacuumizedTxnsCount = new AtomicInteger(0);
@@ -126,27 +142,44 @@
         AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
         AtomicInteger skippedForFurtherProcessingUnfinishedTxnsCount = new AtomicInteger(0);
 
+        Map<TablePartitionId, Set<UUID>> txIds = new HashMap<>();
+        Map<UUID, Long> cleanupCompletionTimestamps = new HashMap<>();
+
         txStateMap.forEach((txId, meta) -> {
             txStateMap.computeIfPresent(txId, (txId0, meta0) -> {
                 if (TxState.isFinalState(meta0.txState())) {
-                    if (txnResourceTtl == 0) {
-                        vacuumizedTxnsCount.incrementAndGet();
-                        return null;
-                    } else if (meta0.initialVacuumObservationTimestamp() == null) {
+                    Long initialVacuumObservationTimestamp = meta0.initialVacuumObservationTimestamp();
+
+                    if (initialVacuumObservationTimestamp == null && txnResourceTtl > 0) {
                         markedAsInitiallyDetectedTxnsCount.incrementAndGet();
-                        return new TxStateMeta(
-                                meta0.txState(),
-                                meta0.txCoordinatorId(),
-                                meta0.commitPartitionId(),
-                                meta0.commitTimestamp(),
-                                vacuumObservationTimestamp
-                        );
-                    } else if (meta0.initialVacuumObservationTimestamp() + txnResourceTtl < vacuumObservationTimestamp) {
-                        vacuumizedTxnsCount.incrementAndGet();
-                        return null;
+
+                        return markInitialVacuumObservationTimestamp(meta0, vacuumObservationTimestamp);
                     } else {
-                        alreadyMarkedTxnsCount.incrementAndGet();
-                        return meta0;
+                        Long cleanupCompletionTimestamp = meta0.cleanupCompletionTimestamp();
+
+                        boolean shouldBeVacuumized = shouldBeVacuumized(initialVacuumObservationTimestamp,
+                                cleanupCompletionTimestamp, txnResourceTtl, vacuumObservationTimestamp);
+
+                        if (shouldBeVacuumized) {
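+                            // Without a commit partition id the persistent tx state cannot be located, so only the volatile meta
+                            // is removed right away.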
+                            if (meta0.commitPartitionId() == null) {
+                                vacuumizedTxnsCount.incrementAndGet();
+
+                                return null;
+                            } else {
+                                Set<UUID> ids = txIds.computeIfAbsent(meta0.commitPartitionId(), k -> new HashSet<>());
+                                ids.add(txId);
+
+                                if (cleanupCompletionTimestamp != null) {
+                                    cleanupCompletionTimestamps.put(txId, cleanupCompletionTimestamp);
+                                }
+
+                                return meta0;
+                            }
+                        } else {
+                            alreadyMarkedTxnsCount.incrementAndGet();
+
+                            return meta0;
+                        }
                     }
                 } else {
                     skippedForFurtherProcessingUnfinishedTxnsCount.incrementAndGet();
@@ -155,14 +188,71 @@
             });
         });
 
-        LOG.info("Vacuum finished [vacuumObservationTimestamp={}, txnResourceTtl={}, vacuumizedTxnsCount={},"
-                + " markedAsInitiallyDetectedTxnsCount={}, alreadyMarkedTxnsCount={}, skippedForFurtherProcessingUnfinishedTxnsCount={}].",
+        return persistentVacuumOp.apply(txIds)
+                .thenAccept(successful -> {
+                    for (UUID txId : successful) {
+                        txStateMap.compute(txId, (k, v) -> {
+                            if (v == null) {
+                                return null;
+                            } else {
+                                Long cleanupCompletionTs = cleanupCompletionTimestamps.get(txId);
+
+                                TxStateMeta newMeta = (Objects.equals(cleanupCompletionTs, v.cleanupCompletionTimestamp())) ? null : v;
+
+                                if (newMeta == null) {
+                                    vacuumizedTxnsCount.incrementAndGet();
+                                }
+
+                                return newMeta;
+                            }
+                        });
+                    }
+
+                    LOG.info("Vacuum finished [vacuumObservationTimestamp={}, "
+                                    + "txnResourceTtl={}, "
+                                    + "vacuumizedTxnsCount={}, "
+                                    + "vacuumizedPersistentTxnStatesCount={}, "
+                                    + "markedAsInitiallyDetectedTxnsCount={}, "
+                                    + "alreadyMarkedTxnsCount={}, "
+                                    + "skippedForFurtherProcessingUnfinishedTxnsCount={}].",
+                            vacuumObservationTimestamp,
+                            txnResourceTtl,
+                            vacuumizedTxnsCount,
+                            successful.size(),
+                            markedAsInitiallyDetectedTxnsCount,
+                            alreadyMarkedTxnsCount,
+                            skippedForFurtherProcessingUnfinishedTxnsCount
+                    );
+                });
+    }
+
+    private static TxStateMeta markInitialVacuumObservationTimestamp(TxStateMeta meta, long vacuumObservationTimestamp) {
+        return new TxStateMeta(
+                meta.txState(),
+                meta.txCoordinatorId(),
+                meta.commitPartitionId(),
+                meta.commitTimestamp(),
                 vacuumObservationTimestamp,
-                txnResourceTtl,
-                vacuumizedTxnsCount,
-                markedAsInitiallyDetectedTxnsCount,
-                alreadyMarkedTxnsCount,
-                skippedForFurtherProcessingUnfinishedTxnsCount
+                meta.cleanupCompletionTimestamp()
         );
     }
+
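+    /**
+     * Decides whether a finished transaction's volatile meta is old enough to be vacuumized. With {@code txnResourceTtl == 0} the
+     * state is vacuumized immediately; otherwise the TTL is counted from the latest of the cleanup completion timestamp and the
+     * initial vacuum observation timestamp.
+     */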
+    private static boolean shouldBeVacuumized(
+            @Nullable Long initialVacuumObservationTimestamp,
+            @Nullable Long cleanupCompletionTimestamp,
+            long txnResourceTtl,
+            long vacuumObservationTimestamp) {
+        if (txnResourceTtl == 0) {
+            return true;
+        }
+
+        assert initialVacuumObservationTimestamp != null : "initialVacuumObservationTimestamp should have been set if txnResourceTtl > 0 "
+                + "[txnResourceTtl=" + txnResourceTtl + "].";
+
+        if (cleanupCompletionTimestamp == null) {
+            return initialVacuumObservationTimestamp + txnResourceTtl < vacuumObservationTimestamp;
+        } else {
+            return max(cleanupCompletionTimestamp, initialVacuumObservationTimestamp) + txnResourceTtl < vacuumObservationTimestamp;
+        }
+    }
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index ed93c53..68fd6cc 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -84,4 +84,14 @@
      */
     public static final short TX_CLEANUP_RECOVERY = 11;
 
+    /**
+     * Message type for {@link VacuumTxStateReplicaRequest}.
+     */
+    public static final short VACUUM_TX_STATE_REPLICA_REQUEST = 12;
+
+    /**
+     * Message type for {@link VacuumTxStatesCommand}.
+     */
+    public static final short VACUUM_TX_STATE_COMMAND = 13;
+
 }
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStateReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStateReplicaRequest.java
new file mode 100644
index 0000000..8c287db
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStateReplicaRequest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static org.apache.ignite.internal.tx.message.TxMessageGroup.VACUUM_TX_STATE_REPLICA_REQUEST;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+
+/**
+ * Request that is sent to vacuumize the transaction states.
+ */
+@Transferable(VACUUM_TX_STATE_REPLICA_REQUEST)
+public interface VacuumTxStateReplicaRequest extends PrimaryReplicaRequest {
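+    /** Ids of the transactions whose persistent states should be vacuumized on the commit partition. */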
+    Set<UUID> transactionIds();
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStatesCommand.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStatesCommand.java
new file mode 100644
index 0000000..3e6d10f
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStatesCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static org.apache.ignite.internal.tx.message.TxMessageGroup.VACUUM_TX_STATE_COMMAND;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.raft.WriteCommand;
+
+/**
+ * Command that vacuumizes the transaction states.
+ */
+@Transferable(VACUUM_TX_STATE_COMMAND)
+public interface VacuumTxStatesCommand extends WriteCommand {
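+    /** Ids of the transactions whose states should be removed from the tx state storage. */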
+    Set<UUID> txIds();
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
index 9e8beb5..0294826 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
@@ -67,10 +67,10 @@
     }
 
     @Override
-    public void remove(UUID txId) {
+    public void remove(UUID txId, long commandIndex, long commandTerm) {
         assertThreadAllowsToWrite();
 
-        storage.remove(txId);
+        storage.remove(txId, commandIndex, commandTerm);
     }
 
     @Override
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index 42ad627..81c42d7 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -74,9 +74,11 @@
      * Removes the tx meta from the storage.
      *
      * @param txId Tx id.
+     * @param commandIndex New value for {@link #lastAppliedIndex()}.
+     * @param commandTerm New value for {@link #lastAppliedTerm()}.
      * @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_ERR} error code in case when the operation has failed.
      */
-    void remove(UUID txId);
+    void remove(UUID txId, long commandIndex, long commandTerm);
 
     /**
      * Creates a cursor to scan all data in the storage.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 563f286..53439cc0 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -219,12 +219,21 @@
     }
 
     @Override
-    public void remove(UUID txId) {
+    public void remove(UUID txId, long commandIndex, long commandTerm) {
         busy(() -> {
-            try {
+            try (WriteBatch writeBatch = new WriteBatch()) {
                 throwExceptionIfStorageInProgressOfRebalance();
 
-                sharedStorage.db().delete(txIdToKey(txId));
+                writeBatch.delete(txIdToKey(txId));
+
+                // If the storage is in the process of rebalancing, then there is no need to update lastAppliedIndex and lastAppliedTerm.
+                // This is necessary to prevent a situation where the node is restarted in the middle of the rebalance and we end up with
+                // an inconsistent storage. They will be updated by either #abortRebalance() or #finishRebalance(long, long).
+                if (state.get() != StorageState.REBALANCE) {
+                    updateLastApplied(writeBatch, commandIndex, commandTerm);
+                }
+
+                sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);
 
                 return null;
             } catch (RocksDBException e) {
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index 6015ee9..0d6357b 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -103,7 +103,7 @@
 
         for (int i = 0; i < 100; i++) {
             if (i % 2 == 0) {
-                storage.remove(txIds.get(i));
+                storage.remove(txIds.get(i), i, 1);
             }
         }
 
@@ -446,7 +446,7 @@
 
         assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.lastApplied(100, 500));
         assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.get(UUID.randomUUID()));
-        assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.remove(UUID.randomUUID()));
+        assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.remove(UUID.randomUUID(), 1, 1));
         assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, storage::scan);
     }
 
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
index 689bd0b..047dd1c 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
@@ -99,10 +99,15 @@
     }
 
     @Override
-    public void remove(UUID txId) {
+    public void remove(UUID txId, long commandIndex, long commandTerm) {
         checkStorageClosedOrInProgressOfRebalance();
 
         storage.remove(txId);
+
+        if (rebalanceFutureReference.get() == null) {
+            lastAppliedIndex = commandIndex;
+            lastAppliedTerm = commandTerm;
+        }
     }
 
     @Override
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
new file mode 100644
index 0000000..12aa397
--- /dev/null
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.test;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.table.RecordBinaryViewImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test utils for transaction integration tests.
+ */
+public class ItTransactionTestUtils {
+    /**
+     * Gets the names of the nodes that are peers in the stable assignments of the given partition.
+     *
+     * @param node Any node in the cluster.
+     * @param grpId Group id.
+     * @return Node names.
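+     *
+     * <p>Illustrative usage (the table name and partition number are assumptions for the example):
+     * <pre>{@code
+     * TablePartitionId grpId = new TablePartitionId(tableId(node, "TEST_TABLE"), 0);
+     * Set<String> hostingNodes = partitionAssignment(node, grpId);
+     * }</pre>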
+     */
+    public static Set<String> partitionAssignment(IgniteImpl node, TablePartitionId grpId) {
+        MetaStorageManager metaStorageManager = node.metaStorageManager();
+
+        ByteArray stableAssignmentKey = stablePartAssignmentsKey(grpId);
+
+        CompletableFuture<Entry> assignmentEntryFut = metaStorageManager.get(stableAssignmentKey);
+
+        assertThat(assignmentEntryFut, willCompleteSuccessfully());
+
+        Entry e = assignmentEntryFut.join();
+
+        assertNotNull(e);
+        assertFalse(e.empty());
+        assertFalse(e.tombstone());
+
+        Set<Assignment> a = requireNonNull(Assignments.fromBytes(e.value())).nodes();
+
+        return a.stream().filter(Assignment::isPeer).map(Assignment::consistentId).collect(toSet());
+    }
+
+    /**
+     * Calculates the id of the partition in which the given tuple would be placed.
+     *
+     * @param node Any node in the cluster.
+     * @param tableName Table name.
+     * @param tuple Data tuple.
+     * @param tx Transaction, if present.
+     * @return Partition id.
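+     *
+     * <p>Illustrative usage (the table and column names are assumptions for the example):
+     * <pre>{@code
+     * int partId = partitionIdForTuple(node, "TEST_TABLE", Tuple.create().set("KEY", 42L), null);
+     * }</pre>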
+     */
+    public static int partitionIdForTuple(IgniteImpl node, String tableName, Tuple tuple, @Nullable Transaction tx) {
+        TableImpl table = table(node, tableName);
+        RecordBinaryViewImpl view = unwrapRecordBinaryViewImpl(table.recordView());
+
+        CompletableFuture<BinaryRowEx> rowFut = view.tupleToBinaryRow(tx, tuple);
+        assertThat(rowFut, willCompleteSuccessfully());
+        BinaryRowEx row = rowFut.join();
+
+        return table.internalTable().partitionId(row);
+    }
+
+    /**
+     * Generates a tuple that would be placed in a partition hosted on the given node of the cluster.
+     *
+     * @param node Node that should host the resulting tuple.
+     * @param tableName Table name.
+     * @param tx Transaction, if present.
+     * @param initialTuple Initial tuple to start the search from.
+     * @param nextTuple Function used to generate the next candidate tuple until a suitable one is found.
+     * @param primary Whether the given node should be the primary replica of the tuple's partition.
+     * @return Tuple that would be placed on the given node.
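+     *
+     * <p>Illustrative usage (the table name and tuple layout are assumptions for the example):
+     * <pre>{@code
+     * Tuple tuple = findTupleToBeHostedOnNode(
+     *         node,
+     *         "TEST_TABLE",
+     *         null,
+     *         Tuple.create().set("KEY", 1L),
+     *         t -> Tuple.create().set("KEY", t.longValue("KEY") + 1),
+     *         true // search for a tuple whose partition has its primary replica on the given node
+     * );
+     * }</pre>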
+     */
+    public static Tuple findTupleToBeHostedOnNode(
+            IgniteImpl node,
+            String tableName,
+            @Nullable Transaction tx,
+            Tuple initialTuple,
+            Function<Tuple, Tuple> nextTuple,
+            boolean primary
+    ) {
+        Tuple t = initialTuple;
+        int tableId = tableId(node, tableName);
+
+        int maxAttempts = 100;
+
+        while (maxAttempts >= 0) {
+            int partId = partitionIdForTuple(node, tableName, t, tx);
+
+            TablePartitionId grpId = new TablePartitionId(tableId, partId);
+
+            if (primary) {
+                ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, grpId);
+
+                if (node.id().equals(replicaMeta.getLeaseholderId())) {
+                    return t;
+                }
+            } else {
+                Set<String> assignments = partitionAssignment(node, grpId);
+
+                if (assignments.contains(node.name())) {
+                    return t;
+                }
+            }
+
+            t = nextTuple.apply(t);
+
+            maxAttempts--;
+        }
+
+        throw new AssertionError("Failed to find a suitable tuple.");
+    }
+
+    /**
+     * Returns table instance.
+     *
+     * @param node Ignite node.
+     * @param tableName Table name.
+     * @return Table instance.
+     */
+    public static TableImpl table(IgniteImpl node, String tableName) {
+        return unwrapTableImpl(node.tables().table(tableName));
+    }
+
+    /**
+     * Returns the table id.
+     *
+     * @param node Any node in the cluster.
+     * @param tableName Table name.
+     * @return Table id.
+     */
+    public static int tableId(IgniteImpl node, String tableName) {
+        return table(node, tableName).tableId();
+    }
+
+    /**
+     * Returns the id of the given transaction.
+     *
+     * @param tx Transaction.
+     * @return Transaction id.
+     */
+    public static UUID txId(Transaction tx) {
+        return ((ReadWriteTransactionImpl) unwrapIgniteTransaction(tx)).id();
+    }
+
+    /**
+     * Waits for a primary replica to appear for the given replication group and returns its meta.
+     *
+     * @param node Any node in the cluster.
+     * @param replicationGrpId Replication group.
+     * @return Primary replica meta.
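+     *
+     * <p>Illustrative usage (the group id construction is an assumption for the example):
+     * <pre>{@code
+     * ReplicaMeta primary = waitAndGetPrimaryReplica(node, new TablePartitionId(tableId(node, "TEST_TABLE"), 0));
+     * String leaseholderId = primary.getLeaseholderId();
+     * }</pre>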
+     */
+    public static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId replicationGrpId) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
+                replicationGrpId,
+                node.clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        return primaryReplicaFut.join();
+    }
+
+    /**
+     * Unwraps {@link RecordBinaryViewImpl} from a {@link RecordView}.
+     *
+     * @param view View to unwrap.
+     */
+    private static RecordBinaryViewImpl unwrapRecordBinaryViewImpl(RecordView view) {
+        return Wrappers.unwrap(view, RecordBinaryViewImpl.class);
+    }
+
+    /**
+     * Unwraps {@link TableImpl} from a {@link Table}.
+     *
+     * @param table Table to unwrap.
+     */
+    private static TableImpl unwrapTableImpl(Table table) {
+        return Wrappers.unwrap(table, TableImpl.class);
+    }
+
+    /**
+     * Unwraps the internal {@link Transaction} from a potentially wrapped {@link Transaction}.
+     *
+     * @param tx Object to unwrap.
+     */
+    private static Transaction unwrapIgniteTransaction(Transaction tx) {
+        return Wrappers.unwrap(tx, Transaction.class);
+    }
+}