IGNITE-21763 Adjust TxnResourceVacuumTask in order to vacuum persistent txn state (#3591)
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 0ba31a2..2494b21 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -213,8 +213,8 @@
}
@Override
- public void vacuum() {
- // No-op.
+ public CompletableFuture<Void> vacuum() {
+ return nullCompletedFuture();
}
@Override
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 05179d3..c6523ef 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -147,7 +147,7 @@
return falseCompletedFuture();
});
- NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp, null);
+ NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp);
assertTrue(waitForCondition(primaryChanged::get, 10_000));
}
@@ -179,7 +179,7 @@
Collection<IgniteImpl> nodes = cluster.runningNodes().collect(toSet());
- NodeUtils.transferPrimary(nodes, tblReplicationGrp, null);
+ NodeUtils.transferPrimary(nodes, tblReplicationGrp);
CompletableFuture<String> primaryChangeTask =
IgniteTestUtils.runAsync(() -> NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));
@@ -264,7 +264,7 @@
assertTrue(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
- NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp, null);
+ NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp);
assertTrue(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index cc54faf..abcef59 100644
--- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -314,7 +314,7 @@
}
@Test
- public void prolongAfterActiveActorChanger() throws Exception {
+ public void prolongAfterActiveActorChanged() throws Exception {
var acceptedNodeRef = new AtomicReference<String>();
leaseGrantHandler = (msg, from, to) -> {
diff --git a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index bc85ce3..3ed175c 100644
--- a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.placementdriver;
+import static java.util.Objects.hash;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
@@ -247,10 +248,15 @@
* Finds a node that can be the leaseholder.
*
* @param assignments Replication group assignment.
+ * @param grpId Group id.
* @param proposedConsistentId Proposed consistent id, found out of a lease negotiation. The parameter might be {@code null}.
* @return Cluster node, or {@code null} if no node in assignments can be the leaseholder.
*/
- private @Nullable ClusterNode nextLeaseHolder(Set<Assignment> assignments, @Nullable String proposedConsistentId) {
+ private @Nullable ClusterNode nextLeaseHolder(
+ Set<Assignment> assignments,
+ ReplicationGroupId grpId,
+ @Nullable String proposedConsistentId
+ ) {
// TODO: IGNITE-18879 Implement more intellectual algorithm to choose a node.
ClusterNode primaryCandidate = null;
@@ -267,8 +273,15 @@
primaryCandidate = candidateNode;
break;
- } else if (primaryCandidate == null || primaryCandidate.name().hashCode() > assignment.consistentId().hashCode()) {
+ } else if (primaryCandidate == null) {
primaryCandidate = candidateNode;
+ } else {
+ int candidateHash = hash(primaryCandidate.name(), grpId);
+ int assignmentHash = hash(assignment.consistentId(), grpId);
+
+ if (candidateHash > assignmentHash) {
+ primaryCandidate = candidateNode;
+ }
}
}
@@ -360,7 +373,7 @@
continue;
} else if (agreement.isDeclined()) {
// Here we initiate negotiations for UNDEFINED_AGREEMENT and retry them on newly started active actor as well.
- ClusterNode candidate = nextLeaseHolder(assignments, agreement.getRedirectTo());
+ ClusterNode candidate = nextLeaseHolder(assignments, grpId, agreement.getRedirectTo());
if (candidate == null) {
leaseUpdateStatistics.onLeaseWithoutCandidate();
@@ -385,7 +398,7 @@
? lease.getLeaseholder()
: lease.proposedCandidate();
- ClusterNode candidate = nextLeaseHolder(assignments, proposedLeaseholder);
+ ClusterNode candidate = nextLeaseHolder(assignments, grpId, proposedLeaseholder);
if (candidate == null) {
leaseUpdateStatistics.onLeaseWithoutCandidate();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
index 02f1b08..a36a388 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java
@@ -188,7 +188,7 @@
logger().info("Start transferring primary.");
- NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)), null);
+ NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
@@ -287,7 +287,7 @@
logger().info("Start transferring primary.");
- NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)), null);
+ NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)));
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
index cf2a29c..6a574d9 100644
--- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
+++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/table/NodeUtils.java
@@ -25,6 +25,7 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -45,20 +46,52 @@
private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory();
/**
+ * Transfers the primary rights to another node, choosing any node from the cluster except the current leaseholder.
+ *
+ * @param nodes Nodes collection.
+ * @param groupId Group id.
+ * @return New primary replica name.
+ * @throws InterruptedException If failed.
+ */
+ public static String transferPrimary(
+ Collection<IgniteImpl> nodes,
+ ReplicationGroupId groupId
+ ) throws InterruptedException {
+ return transferPrimary(nodes, groupId, (Predicate) null);
+ }
+
+ /**
* Transfers the primary rights to another node.
*
* @param nodes Nodes collection.
* @param groupId Group id.
- * @param preferablePrimary Primary replica name which is preferred for being primary or {@code null}.
+ * @param preferablePrimary Primary replica preferable node name.
* @return New primary replica name.
* @throws InterruptedException If failed.
*/
public static String transferPrimary(
Collection<IgniteImpl> nodes,
ReplicationGroupId groupId,
- @Nullable String preferablePrimary
+ String preferablePrimary
) throws InterruptedException {
- LOG.info("Moving the primary replica [preferablePrimary=" + preferablePrimary + "].");
+ return transferPrimary(nodes, groupId, s -> s.equals(preferablePrimary));
+ }
+
+ /**
+ * Transfers the primary rights to another node.
+ *
+ * @param nodes Nodes collection.
+ * @param groupId Group id.
+ * @param preferablePrimaryFilter Primary replica preferable nodes filter, accepts the node consistent id.
+ * @return New primary replica name.
+ * @throws InterruptedException If failed.
+ */
+ public static String transferPrimary(
+ Collection<IgniteImpl> nodes,
+ ReplicationGroupId groupId,
+ @Nullable Predicate<String> preferablePrimaryFilter
+ ) throws InterruptedException {
+ LOG.info("Moving the primary replica [groupId={}].", groupId);
IgniteImpl node = nodes.stream().findAny().orElseThrow();
@@ -68,16 +101,50 @@
.filter(n -> n.id().equals(currentLeaseholder.getLeaseholderId()))
.findFirst().orElseThrow();
- if (preferablePrimary == null) {
- preferablePrimary = nodes.stream()
- .map(IgniteImpl::name)
- .filter(n -> !n.equals(currentLeaseholder.getLeaseholder()))
- .findFirst()
- .orElseThrow();
+ Predicate<String> filter = preferablePrimaryFilter == null ? name -> true : preferablePrimaryFilter::test;
+
+ String finalPreferablePrimary = nodes.stream()
+ .map(IgniteImpl::name)
+ .filter(n -> !n.equals(currentLeaseholder.getLeaseholder()) && filter.test(n))
+ .findFirst()
+ .orElseThrow();
+
+ LOG.info("Moving the primary replica [groupId={}, currentLeaseholder={}, preferablePrimary={}].", groupId, leaseholderNode.name(),
+ finalPreferablePrimary);
+
+ ReplicaMeta[] newPrimaryReplica = new ReplicaMeta[1];
+ boolean[] stopLeaseNeeded = { true };
+
+ boolean success = waitForCondition(() -> {
+ if (stopLeaseNeeded[0]) {
+ stopLeaseProlongation(nodes, leaseholderNode, groupId, finalPreferablePrimary);
+ }
+
+ ReplicaMeta previousPrimary = newPrimaryReplica[0] == null ? currentLeaseholder : newPrimaryReplica[0];
+
+ newPrimaryReplica[0] = leaseholder(node, groupId);
+
+ // If the lease is changed to not suitable one, then stopLeaseProlongation will be retried, otherwise the cycle will be stopped.
+ stopLeaseNeeded[0] =
+ !previousPrimary.getStartTime().equals(newPrimaryReplica[0].getStartTime()) // if lease changed
+ || !previousPrimary.getExpirationTime().equals(newPrimaryReplica[0].getExpirationTime()); // if lease prolonged
+
+ return newPrimaryReplica[0].getLeaseholder().equals(finalPreferablePrimary);
+ }, 30_000);
+
+ if (success) {
+ LOG.info("Primary replica moved successfully from [{}] to [{}].", currentLeaseholder.getLeaseholder(), finalPreferablePrimary);
+ } else {
+ LOG.info("Moving the primary replica failed [groupId={}, actualPrimary={}].", groupId, newPrimaryReplica[0]);
}
- String finalPreferablePrimary = preferablePrimary;
+ assertTrue(success);
+ return finalPreferablePrimary;
+ }
+
+ private static void stopLeaseProlongation(Collection<IgniteImpl> nodes, IgniteImpl leaseholderNode, ReplicationGroupId groupId,
+ String preferablePrimary) {
StopLeaseProlongationMessage msg = PLACEMENT_DRIVER_MESSAGES_FACTORY.stopLeaseProlongationMessage()
.groupId(groupId)
.redirectProposal(preferablePrimary)
@@ -86,17 +153,6 @@
nodes.forEach(
n -> leaseholderNode.clusterService().messagingService().send(n.clusterService().topologyService().localMember(), msg)
);
-
- assertTrue(waitForCondition(() -> {
- ReplicaMeta newPrimaryReplica = leaseholder(node, groupId);
-
- return newPrimaryReplica.getLeaseholder().equals(finalPreferablePrimary);
- }, 10_000));
-
- LOG.info("Primary replica moved successfully from [{}] to [{}].",
- currentLeaseholder.getLeaseholder(), finalPreferablePrimary);
-
- return finalPreferablePrimary;
}
private static ReplicaMeta leaseholder(IgniteImpl node, ReplicationGroupId groupId) {
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
index d4617d3..08b79a0 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionPrimaryChangeTest.java
@@ -17,13 +17,13 @@
package org.apache.ignite.internal.table;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTransaction;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -33,8 +33,6 @@
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
@@ -87,7 +85,8 @@
builder.clusterConfiguration("{"
+ " transaction: {"
- + " implicitTransactionTimeout: 30000"
+ + " implicitTransactionTimeout: 30000,"
+ + " txnResourceTtl: 2"
+ " },"
+ " replication: {"
+ " rpcTimeout: 30000"
@@ -205,17 +204,4 @@
private IgniteImpl findNodeByName(String leaseholder) {
return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
}
-
- private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
- CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node.clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- return primaryReplicaFut.join();
- }
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index 105d8d3..5147713 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -26,6 +26,7 @@
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
import static org.apache.ignite.internal.util.ExceptionUtils.extractCodeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -57,7 +58,6 @@
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
-import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -1149,19 +1149,6 @@
return findNode(1, initialNodes(), n -> !leaseholder.equals(n.name()));
}
- private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
- CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
- tblReplicationGrp,
- node.clock().now(),
- 10,
- SECONDS
- );
-
- assertThat(primaryReplicaFut, willCompleteSuccessfully());
-
- return primaryReplicaFut.join();
- }
-
private static String waitAndGetLeaseholder(IgniteImpl node, ReplicationGroupId tblReplicationGrp) {
return waitAndGetPrimaryReplica(node, tblReplicationGrp).getLeaseholder();
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
new file mode 100644
index 0000000..eb02e0e
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTxResourcesVacuumTest.java
@@ -0,0 +1,1052 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.FINISHING;
+import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.findTupleToBeHostedOnNode;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.partitionAssignment;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.partitionIdForTuple;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.table;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.tableId;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.txId;
+import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
+import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.ignite.InitParametersBuilder;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.internal.thread.ThreadOperation;
+import org.apache.ignite.internal.tx.TransactionMeta;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.message.TxCleanupMessage;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for tx recources vacuum.
+ */
+@ExtendWith(SystemPropertiesExtension.class)
+@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "1000")
+public class ItTxResourcesVacuumTest extends ClusterPerTestIntegrationTest {
+ /** Table name. */
+ private static final String TABLE_NAME = "test_table";
+
+ private static final Tuple INITIAL_TUPLE = Tuple.create().set("key", 1L).set("val", "1");
+
+ private static final Function<Tuple, Tuple> NEXT_TUPLE = t -> Tuple.create()
+ .set("key", t.longValue("key") + 1)
+ .set("val", "" + (t.longValue("key") + 1));
+
+ private static final int REPLICAS = 2;
+
+ /** Nodes bootstrap configuration pattern. */
+ private static final String NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+ + " network: {\n"
+ + " port: {},\n"
+ + " nodeFinder: {\n"
+ + " netClusterNodes: [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " clientConnector: { port:{} },\n"
+ + " rest.port: {},\n"
+ + " raft: { responseTimeout: 30000 },"
+ + " compute.threadPoolSize: 1\n"
+ + "}";
+
+ private ExecutorService txStateStorageExecutor = Executors.newSingleThreadExecutor();
+
+ @BeforeEach
+ @Override
+ public void setup(TestInfo testInfo) throws Exception {
+ super.setup(testInfo);
+
+ String zoneSql = "create zone test_zone with partitions=20, replicas=" + REPLICAS
+ + ", storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
+ String sql = "create table " + TABLE_NAME + " (key bigint primary key, val varchar(20)) with primary_zone='TEST_ZONE'";
+
+ cluster.doInSession(0, session -> {
+ executeUpdate(zoneSql, session);
+ executeUpdate(sql, session);
+ });
+
+ txStateStorageExecutor = Executors.newSingleThreadExecutor(IgniteThreadFactory.create("test", "tx-state-storage-test-pool", log,
+ ThreadOperation.STORAGE_READ));
+ }
+
+ @Override
+ @AfterEach
+ public void tearDown() {
+ shutdownAndAwaitTermination(txStateStorageExecutor, 10, TimeUnit.SECONDS);
+
+ super.tearDown();
+ }
+
+ @Override
+ protected void customizeInitParameters(InitParametersBuilder builder) {
+ super.customizeInitParameters(builder);
+
+ builder.clusterConfiguration("{"
+ + " transaction: {"
+ + " txnResourceTtl: 0"
+ + " },"
+ + " replication: {"
+ + " rpcTimeout: 30000"
+ + " },"
+ + "}");
+ }
+
+ /**
+ * Returns node bootstrap config template.
+ *
+ * @return Node bootstrap config template.
+ */
+ @Override
+ protected String getNodeBootstrapConfigTemplate() {
+ return NODE_BOOTSTRAP_CFG_TEMPLATE;
+ }
+
+ /**
+ * Simple TTL-triggered vacuum test, checking also that PENDING and FINISHING states are not removed.
+ *
+ * <ul>
+ * <li>Run a transaction;</li>
+ * <li>Run a parallel transaction;</li>
+ * <li>Insert values within both transactions;</li>
+ * <li>Commit the parallel transaction and wait for vacuum of its state;</li>
+ * <li>Run another parallel transaction;</li>
+ * <li>Check that the volatile PENDING state of the transaction is preserved;</li>
+ * <li>Block {@link TxFinishReplicaRequest} for the pending transaction;</li>
+ * <li>Start the tx commit;</li>
+ * <li>While the state is FINISHING, commit the parallel transaction and wait for vacuum of its state;</li>
+ * <li>Check that the volatile state of the transaction is preserved;</li>
+ * <li>Unblock {@link TxFinishReplicaRequest};</li>
+ * <li>Check that both volatile and persistent state is vacuumized;</li>
+ * <li>Check that the committed value is correct.</li>
+ * </ul>
+ */
+ @Test
+ public void testVacuum() throws InterruptedException {
+ // We should test the TTL-triggered vacuum.
+ setTxResourceTtl(1);
+
+ IgniteImpl node = anyNode();
+
+ RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+ // Put some value into the table.
+ Transaction tx = node.transactions().begin();
+ Transaction parallelTx1 = node.transactions().begin();
+ UUID txId = txId(tx);
+ UUID parallelTx1Id = txId(parallelTx1);
+
+ log.info("Test: Loading the data [tx={}].", txId);
+
+ Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+ Tuple tupleForParallelTx = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, NEXT_TUPLE.apply(tuple), NEXT_TUPLE, true);
+ int partIdForParallelTx = partitionIdForTuple(anyNode(), TABLE_NAME, tupleForParallelTx, parallelTx1);
+
+ int partId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
+
+ Set<String> nodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), partId));
+
+ view.upsert(tx, tuple);
+ view.upsert(parallelTx1, tupleForParallelTx);
+
+ // Check that the volatile PENDING state of the transaction is preserved.
+ parallelTx1.commit();
+ waitForTxStateVacuum(nodes, parallelTx1Id, partIdForParallelTx, true, 10_000);
+ assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
+
+ Transaction parallelTx2 = node.transactions().begin();
+ UUID parallelTx2Id = txId(parallelTx2);
+ view.upsert(parallelTx2, tupleForParallelTx);
+
+ CompletableFuture<Void> finishStartedFuture = new CompletableFuture<>();
+ CompletableFuture<Void> finishAllowedFuture = new CompletableFuture<>();
+
+ node.dropMessages((n, msg) -> {
+ if (msg instanceof TxFinishReplicaRequest) {
+ TxFinishReplicaRequest finishRequest = (TxFinishReplicaRequest) msg;
+
+ if (finishRequest.txId().equals(txId)) {
+ finishStartedFuture.complete(null);
+
+ finishAllowedFuture.join();
+ }
+ }
+
+ return false;
+ });
+
+ Transaction roTxBefore = beginReadOnlyTx(anyNode());
+
+ CompletableFuture<Void> commitFut = runAsync(tx::commit);
+
+ assertThat(finishStartedFuture, willCompleteSuccessfully());
+
+ // While the state is FINISHING, wait 3 seconds.
+ assertEquals(FINISHING, volatileTxState(node, txId).txState());
+ parallelTx2.commit();
+ waitForTxStateVacuum(nodes, parallelTx2Id, partId, true, 10_000);
+
+ // Check that the volatile state of the transaction is preserved.
+ assertTrue(checkVolatileTxStateOnNodes(nodes, txId));
+
+ finishAllowedFuture.complete(null);
+
+ assertThat(commitFut, willCompleteSuccessfully());
+
+ log.info("Test: Tx committed [tx={}].", txId);
+
+ Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
+ waitForTxStateReplication(nodes, txId, partId, 10_000);
+
+ // Check that both volatile and persistent state is vacuumized..
+ waitForTxStateVacuum(txId, partId, true, 10_000);
+
+ // Trying to read the value.
+ Tuple keyRec = Tuple.create().set("key", tuple.longValue("key"));
+ checkValueReadOnly(view, roTxBefore, keyRec, null);
+ checkValueReadOnly(view, roTxAfter, keyRec, tuple);
+ }
+
+ /**
+ * Check that the ABANDONED transaction state is preserved until recovery.
+ *
+ * <ul>
+ * <li>Start a transaction from a coordinator that would be not included into commit partition group;</li>
+ * <li>Start a parallel transaction;</li>
+ * <li>Find a tuple for parallel tx that would be hosted on the same partition as a tuple for the abandoned tx;</li>
+ * <li>Insert values within both transactions;</li>
+ * <li>Commit the parallel transaction;</li>
+ * <li>Stop the tx coordinator;</li>
+ * <li>Wait for tx state of parallel tx to be vacuumized;</li>
+ * <li>Check that the volatile state of the transaction is preserved;</li>
+ * <li>Try to read the value using another transaction, which starts the tx recovery;</li>
+ * <li>Check that abandoned tx is rolled back and thus the value is null;</li>
+ * <li>Check that the abandoned transaction is recovered; its volatile and persistent states are vacuumized.</li>
+ * </ul>
+ */
+ @Test
+ public void testAbandonedTxnsAreNotVacuumizedUntilRecovered() throws InterruptedException {
+ setTxResourceTtl(1);
+
+ IgniteImpl leaseholder = cluster.node(0);
+
+ Tuple tuple = findTupleToBeHostedOnNode(leaseholder, TABLE_NAME, null, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+ int partId = partitionIdForTuple(anyNode(), TABLE_NAME, tuple, null);
+
+ TablePartitionId groupId = new TablePartitionId(tableId(anyNode(), TABLE_NAME), partId);
+
+ Set<String> txNodes = partitionAssignment(anyNode(), groupId);
+
+ IgniteImpl abandonedTxCoord = findNode(n -> !txNodes.contains(n.name()));
+
+ RecordView<Tuple> view = abandonedTxCoord.tables().table(TABLE_NAME).recordView();
+
+ Transaction abandonedTx = abandonedTxCoord.transactions().begin();
+ UUID abandonedTxId = txId(abandonedTx);
+ Transaction parallelTx = abandonedTxCoord.transactions().begin();
+ UUID parallelTxId = txId(parallelTx);
+
+ // Find a tuple hosted on the same partition.
+ Tuple tupleForParallelTx = tuple;
+ int partIdForParallelTx = -1;
+ while (partIdForParallelTx != partId) {
+ tupleForParallelTx = findTupleToBeHostedOnNode(leaseholder, TABLE_NAME, null, NEXT_TUPLE.apply(tupleForParallelTx), NEXT_TUPLE,
+ true);
+
+ partIdForParallelTx = partitionIdForTuple(anyNode(), TABLE_NAME, tupleForParallelTx, parallelTx);
+ }
+
+ view.upsert(abandonedTx, tuple);
+ view.upsert(parallelTx, tupleForParallelTx);
+
+ parallelTx.commit();
+
+ stopNode(abandonedTxCoord.name());
+
+ waitForTxStateVacuum(txNodes, parallelTxId, partIdForParallelTx, true, 10_000);
+
+ // Check that the volatile state of the transaction is preserved.
+ assertTrue(checkVolatileTxStateOnNodes(txNodes, abandonedTxId));
+
+ // Try to read the value using another transaction, which starts the tx recovery.
+ RecordView<Tuple> viewLh = leaseholder.tables().table(TABLE_NAME).recordView();
+ Tuple value = viewLh.get(null, Tuple.create().set("key", tuple.longValue("key")));
+ // Check that abandoned tx is rolled back and thus the value is null.
+ assertNull(value);
+
+ // Check that the abandoned transaction is recovered; its volatile and persistent states are vacuumized.
+ // Wait for it, because we don't have the recovery completion future.
+ waitForTxStateVacuum(txNodes, abandonedTxId, partId, true, 10_000);
+ }
+
+ /**
+ * Check that the tx state on commit partition is vacuumized only when cleanup is completed.
+ *
+ * <ul>
+ * <li>Start a transaction;</li>
+ * <li>Generate some tuple and define on which nodes it would be hosted;</li>
+ * <li>Choose one more node that doesn't host the first tuple and choose a tuple that will be sent on this node as primary;</li>
+ * <li>Upsert both tuples within a transaction;</li>
+ * <li>Block {@link TxCleanupMessage}-s from commit partition primary;</li>
+ * <li>Start a tx commit;</li>
+ * <li>Wait for vacuum completion on a node that doesn't host the commit partition;</li>
+ * <li>Unblock {@link TxCleanupMessage}-s;</li>
+ * <li>Wait for the tx state vacuum on the commit partition group.</li>
+ * </ul>
+ */
+ @Test
+ @WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "0")
+ public void testVacuumWithCleanupDelay() throws InterruptedException {
+ IgniteImpl node = anyNode();
+
+ RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+ // Put some value into the table.
+ Transaction tx = node.transactions().begin();
+ UUID txId = txId(tx);
+
+ log.info("Test: Loading the data [tx={}].", txId);
+
+ // Generate some tuple and define on which nodes it would be hosted.
+ Tuple tuple0 = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+ int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple0, tx);
+
+ TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
+
+ ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
+ IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
+
+ Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
+
+ log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+
+ // Some node that does not host the commit partition, will be the primary node for upserting another tuple.
+ IgniteImpl leaseholderForAnotherTuple = findNode(n -> !commitPartNodes.contains(n.name()));
+
+ log.info("Test: leaseholderForAnotherTuple={}", leaseholderForAnotherTuple.name());
+
+ Tuple tuple1 = findTupleToBeHostedOnNode(leaseholderForAnotherTuple, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+ // Upsert both tuples within a transaction.
+ view.upsert(tx, tuple0);
+ view.upsert(tx, tuple1);
+
+ CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
+ CompletableFuture<Void> cleanupAllowed = new CompletableFuture<>();
+
+ commitPartitionLeaseholder.dropMessages((n, msg) -> {
+ if (msg instanceof TxCleanupMessage) {
+ cleanupStarted.complete(null);
+
+ log.info("Test: cleanup started.");
+
+ if (commitPartNodes.contains(n)) {
+ cleanupAllowed.join();
+ }
+ }
+
+ return false;
+ });
+
+ Transaction roTxBefore = beginReadOnlyTx(anyNode());
+
+ CompletableFuture<Void> commitFut = tx.commitAsync();
+
+ waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+
+ assertThat(cleanupStarted, willCompleteSuccessfully());
+
+ // Check the vacuum result on a node that doesn't host the commit partition.
+ triggerVacuum();
+ assertTxStateVacuumized(Set.of(leaseholderForAnotherTuple.name()), txId, commitPartId, false);
+
+ // Unblocking cleanup.
+ cleanupAllowed.complete(null);
+
+ assertThat(commitFut, willCompleteSuccessfully());
+
+ Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
+ waitForCondition(() -> volatileTxState(commitPartitionLeaseholder, txId) != null, 10_000);
+
+ triggerVacuum();
+ assertTxStateVacuumized(txId, commitPartId, true);
+
+ // Trying to read the values.
+ Tuple key0 = Tuple.create().set("key", tuple0.longValue("key"));
+ Tuple key1 = Tuple.create().set("key", tuple1.longValue("key"));
+ checkValueReadOnly(view, roTxBefore, key0, null);
+ checkValueReadOnly(view, roTxAfter, key0, tuple0);
+ checkValueReadOnly(view, roTxBefore, key1, null);
+ checkValueReadOnly(view, roTxAfter, key1, tuple1);
+ }
+
+ /**
+ * Check that the tx state on commit partition is vacuumized only when cleanup is completed.
+ *
+ * <ul>
+ * <li>Start a transaction;</li>
+ * <li>Upsert a value;</li>
+ * <li>Block {@link TxCleanupMessage}-s;</li>
+ * <li>Start a tx commit;</li>
+ * <li>Transfer the primary replica;</li>
+ * <li>Unblock the {@link TxCleanupMessage}-s;</li>
+ * <li>Ensure that tx states are finally vacuumized.</li>
+ * </ul>
+ */
+ @Test
+ public void testCommitPartitionPrimaryChangesBeforeVacuum() throws InterruptedException {
+ // We can't leave TTL as 0 here, because the primary replica is changed during cleanup, and this means
+ // WriteIntentSwitchReplicaRequest will be processed not on the primary. Removing tx state instantly will cause incorrect
+ // tx recovery and write intent switch with tx state as ABORTED.
+ setTxResourceTtl(1);
+
+ IgniteImpl node = anyNode();
+
+ RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+ // Put some value into the table.
+ Transaction tx = node.transactions().begin();
+ UUID txId = txId(tx);
+
+ log.info("Test: Loading the data [tx={}].", txId);
+
+ Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+ int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
+
+ TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
+
+ ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
+ IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
+
+ Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
+
+ log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+
+ view.upsert(tx, tuple);
+
+ CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
+ CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
+ boolean[] cleanupAllowed = new boolean[1];
+
+ commitPartitionLeaseholder.dropMessages((n, msg) -> {
+ if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
+ cleanupStarted.complete(null);
+
+ cleanupAllowedFut.join();
+ }
+
+ return false;
+ });
+
+ Transaction roTxBefore = beginReadOnlyTx(anyNode());
+
+ CompletableFuture<Void> commitFut = tx.commitAsync();
+
+ assertThat(cleanupStarted, willCompleteSuccessfully());
+
+ transferPrimary(cluster.runningNodes().collect(toSet()), commitPartGrpId, commitPartNodes::contains);
+
+ cleanupAllowedFut.complete(null);
+
+ cleanupAllowed[0] = true;
+
+ assertThat(commitFut, willCompleteSuccessfully());
+
+ log.info("Test: tx committed.");
+
+ waitForTxStateVacuum(txId, commitPartId, true, 10_000);
+
+ Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
+ log.info("Test: checking values.");
+
+ // Trying to read the value.
+ Tuple key = Tuple.create().set("key", tuple.longValue("key"));
+ checkValueReadOnly(view, roTxBefore, key, null);
+ checkValueReadOnly(view, roTxAfter, key, tuple);
+ }
+
+ /**
+ * Check that the tx state on commit partition is vacuumized only when cleanup is completed.
+ *
+ * <ul>
+ * <li>Start a transaction;</li>
+ * <li>Upsert a tuple;</li>
+ * <li>Block {@link TxCleanupMessage}-s from commit partition primary;</li>
+ * <li>Start a tx commit;</li>
+ * <li>Wait for tx cleanup to start;</li>
+ * <li>Wait for volatile tx state vacuum;</li>
+ * <li>Unblock {@link TxCleanupMessage}-s;</li>
+ * <li>Wait for the tx state vacuum on the commit partition group.</li>
+ * </ul>
+ */
+ @Test
+ @WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "0")
+ public void testVacuumPersistentStateAfterCleanupDelayAndVolatileStateVacuum() throws InterruptedException {
+ IgniteImpl node = anyNode();
+
+ RecordView<Tuple> view = node.tables().table(TABLE_NAME).recordView();
+
+ // Put some value into the table.
+ Transaction tx = node.transactions().begin();
+ UUID txId = txId(tx);
+
+ log.info("Test: Loading the data [tx={}].", txId);
+
+ Tuple tuple = findTupleToBeHostedOnNode(node, TABLE_NAME, tx, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+ int commitPartId = partitionIdForTuple(node, TABLE_NAME, tuple, tx);
+
+ TablePartitionId commitPartGrpId = new TablePartitionId(tableId(node, TABLE_NAME), commitPartId);
+
+ ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, commitPartGrpId);
+ IgniteImpl commitPartitionLeaseholder = findNode(n -> n.id().equals(replicaMeta.getLeaseholderId()));
+
+ Set<String> commitPartNodes = partitionAssignment(node, new TablePartitionId(tableId(node, TABLE_NAME), commitPartId));
+
+ log.info("Test: Commit partition [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(), commitPartNodes);
+
+ view.upsert(tx, tuple);
+
+ CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
+ CompletableFuture<Void> cleanupAllowedFut = new CompletableFuture<>();
+ boolean[] cleanupAllowed = new boolean[1];
+
+ commitPartitionLeaseholder.dropMessages((n, msg) -> {
+ if (msg instanceof TxCleanupMessage && !cleanupAllowed[0]) {
+ cleanupStarted.complete(null);
+
+ cleanupAllowedFut.join();
+
+ return true;
+ }
+
+ return false;
+ });
+
+ Transaction roTxBefore = beginReadOnlyTx(anyNode());
+
+ CompletableFuture<Void> commitFut = tx.commitAsync();
+
+ waitForTxStateReplication(commitPartNodes, txId, commitPartId, 10_000);
+
+ assertThat(cleanupStarted, willCompleteSuccessfully());
+
+ // Wait for volatile tx state vacuum. This is possible because tx finish is complete.
+ triggerVacuum();
+ assertTxStateVacuumized(txId, commitPartId, false);
+
+ log.info("Test: volatile state vacuumized");
+
+ cleanupAllowedFut.complete(null);
+
+ cleanupAllowed[0] = true;
+
+ assertThat(commitFut, willCompleteSuccessfully());
+
+ log.info("Test: commit completed.");
+
+ Transaction roTxAfter = beginReadOnlyTx(anyNode());
+
+ waitForCondition(() -> {
+ TxStateMeta txStateMeta = (TxStateMeta) volatileTxState(commitPartitionLeaseholder, txId);
+
+ return txStateMeta != null && txStateMeta.cleanupCompletionTimestamp() != null;
+ }, 10_000);
+
+ log.info("Test: cleanup completed.");
+
+ triggerVacuum();
+ assertTxStateVacuumized(txId, commitPartId, true);
+
+ // Trying to read the data.
+ Tuple key = Tuple.create().set("key", tuple.longValue("key"));
+ checkValueReadOnly(view, roTxBefore, key, null);
+ checkValueReadOnly(view, roTxAfter, key, tuple);
+ }
+
+ /**
+ * Checks that the tx recovery doesn't change tx finish result from COMMITTED to ABORTED if it once saved in the persistent storage.
+ *
+ * <ul>
+ * <li>Start a transaction tx0;</li>
+ * <li>Upsert some value;</li>
+ * <li>Block {@link TxCleanupMessage}-s;</li>
+ * <li>Start the commit of tx0 and with for tx state COMMITTED to be replicated in persistent storage;</li>
+ * <li>Stop the tx0's coordinator;</li>
+ * <li>Wait for tx0's state vacuum;</li>
+ * <li>Try to get the data that has been committed by tx0, ensure the data is correct.</li>
+ * </ul>
+ */
+ @Test
+ public void testRecoveryAfterPersistentStateVacuumized() throws InterruptedException {
+ // This node isn't going to be stopped, so let it be node 0.
+ IgniteImpl commitPartitionLeaseholder = cluster.node(0);
+
+ Tuple tuple0 = findTupleToBeHostedOnNode(commitPartitionLeaseholder, TABLE_NAME, null, INITIAL_TUPLE, NEXT_TUPLE, true);
+
+ int commitPartId = partitionIdForTuple(commitPartitionLeaseholder, TABLE_NAME, tuple0, null);
+
+ Set<String> commitPartitionNodes = partitionAssignment(commitPartitionLeaseholder,
+ new TablePartitionId(tableId(commitPartitionLeaseholder, TABLE_NAME), commitPartId));
+
+ // Choose some node that doesn't host the partition as a tx coordinator.
+ IgniteImpl coord0 = findNode(n -> !commitPartitionNodes.contains(n.name()));
+
+ RecordView<Tuple> view0 = coord0.tables().table(TABLE_NAME).recordView();
+
+ // Put some value into the table.
+ Transaction tx0 = coord0.transactions().begin();
+ UUID txId0 = txId(tx0);
+
+ log.info("Test: Transaction 0 [tx={}].", txId0);
+
+ log.info("Test: Commit partition of transaction 0 [leaseholder={}, hostingNodes={}].", commitPartitionLeaseholder.name(),
+ commitPartitionNodes);
+
+ view0.upsert(tx0, tuple0);
+
+ CompletableFuture<Void> cleanupStarted = new CompletableFuture<>();
+
+ commitPartitionLeaseholder.dropMessages((n, msg) -> {
+ if (msg instanceof TxCleanupMessage) {
+ cleanupStarted.complete(null);
+
+ return false;
+ }
+
+ return false;
+ });
+
+ log.info("Test: Committing the transaction 0 [tx={}].", txId0);
+
+ tx0.commitAsync();
+
+ // Check that the final tx state COMMITTED is saved to the persistent tx storage.
+ assertTrue(waitForCondition(() -> cluster.runningNodes().filter(n -> commitPartitionNodes.contains(n.name())).allMatch(n -> {
+ TransactionMeta meta = persistentTxState(n, txId0, commitPartId);
+
+ return meta != null && meta.txState() == COMMITTED;
+ }), 10_000));
+
+ assertThat(cleanupStarted, willCompleteSuccessfully());
+
+ // Stop the first transaction coordinator.
+ stopNode(coord0.name());
+
+ // No cleanup happened, waiting for vacuum on the remaining nodes that participated on tx0.
+ waitForTxStateVacuum(txId0, commitPartId, true, 10_000);
+
+ // Preparing to run another tx.
+ IgniteImpl coord1 = anyNode();
+
+ RecordView<Tuple> view1 = coord1.tables().table(TABLE_NAME).recordView();
+
+ // Another tx should get the data committed by tx 0.
+ Tuple keyTuple = Tuple.create().set("key", tuple0.longValue("key"));
+ Tuple tx0Data = view1.get(null, keyTuple);
+ assertEquals(tuple0.longValue("key"), tx0Data.longValue("key"));
+ assertEquals(tuple0.stringValue("val"), tx0Data.stringValue("val"));
+
+ // Waiting for vacuum, because there is no recovery future here.
+ waitForTxStateVacuum(txId0, commitPartId, true, 10_000);
+ }
+
+ /**
+ * Check that RO txns read the correct data consistent with commit timestamps.
+ *
+ * <ul>
+ * <li>For this test, create another zone and table with number of replicas that is equal to number of nodes;</li>
+ * <li>Start RO tx 1;</li>
+ * <li>Upsert (k1, v1) within RW tx 1 and commit it;</li>
+ * <li>Start RO tx 2;</li>
+ * <li>Upsert (k1, v2) within RW tx 2 and commit it;</li>
+ * <li>Start RO tx 3;</li>
+ * <li>Wait for vacuum of the states of RW tx 1 and RW tx 2;</li>
+ * <li>Read the data by k1 within RO tx 1, should be null;</li>
+ * <li>Read the data by k1 within RO tx 2, should be v1;</li>
+ * <li>Read the data by k1 within RO tx 3, should be v2.</li>
+ * </ul>
+ */
+ @Test
+ public void testRoReadTheCorrectDataInBetween() {
+ setTxResourceTtl(0);
+
+ IgniteImpl node = anyNode();
+
+ String tableName = TABLE_NAME + "_1";
+
+ // For this test, create another zone and table with number of replicas that is equal to number of nodes.
+ String zoneSql = "create zone test_zone_1 with partitions=20, replicas=" + initialNodes()
+ + ", storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
+ String sql = "create table " + tableName + " (key bigint primary key, val varchar(20)) with primary_zone='TEST_ZONE_1'";
+
+ cluster.doInSession(0, session -> {
+ executeUpdate(zoneSql, session);
+ executeUpdate(sql, session);
+ });
+
+ Transaction roTx1 = beginReadOnlyTx(node);
+
+ Tuple t1 = Tuple.create().set("key", 1L).set("val", "val1");
+ Tuple t2 = Tuple.create().set("key", 1L).set("val", "val2");
+
+ RecordView<Tuple> view = table(node, tableName).recordView();
+
+ Transaction rwTx1 = node.transactions().begin();
+ view.upsert(rwTx1, t1);
+ rwTx1.commit();
+ UUID rwTxId1 = txId(rwTx1);
+
+ Transaction roTx2 = beginReadOnlyTx(node);
+
+ Transaction rwTx2 = node.transactions().begin();
+ view.upsert(rwTx2, t2);
+ rwTx2.commit();
+ UUID rwTxId2 = txId(rwTx1);
+
+ Transaction roTx3 = beginReadOnlyTx(node);
+
+ triggerVacuum();
+
+ assertTxStateVacuumized(rwTxId1, tableName, partitionIdForTuple(node, tableName, t1, rwTx1), true);
+ assertTxStateVacuumized(rwTxId2, tableName, partitionIdForTuple(node, tableName, t2, rwTx2), true);
+
+ Tuple keyRec = Tuple.create().set("key", 1L);
+
+ checkValueReadOnly(view, roTx1, keyRec, null);
+ checkValueReadOnly(view, roTx2, keyRec, t1);
+ checkValueReadOnly(view, roTx3, keyRec, t2);
+ }
+
+ private static Transaction beginReadOnlyTx(IgniteImpl node) {
+ return node.transactions().begin(new TransactionOptions().readOnly(true));
+ }
+
+ /**
+ * Check value using given read only tx.
+ *
+ * @param view Record view.
+ * @param readOnlyTx RO tx.
+ * @param keyTuple Key tuple.
+ * @param expected Expected tuple.
+ */
+ private static void checkValueReadOnly(RecordView<Tuple> view, Transaction readOnlyTx, Tuple keyTuple, @Nullable Tuple expected) {
+ Tuple actual = view.get(readOnlyTx, keyTuple);
+
+ if (expected == null) {
+ assertNull(actual);
+ } else {
+ assertEquals(expected.stringValue("val"), actual.stringValue("val"));
+ }
+ }
+
+ private void setTxResourceTtl(long ttl) {
+ CompletableFuture<Void> changeFuture = anyNode().clusterConfiguration().change(c ->
+ c.changeRoot(TransactionConfiguration.KEY).changeTxnResourceTtl(ttl));
+
+ assertThat(changeFuture, willCompleteSuccessfully());
+ }
+
+ /**
+ * To use it, set tx resource TTL should be set to {@code 0}, see {@link #setTxResourceTtl(long)}.
+ */
+ private void triggerVacuum() {
+ runningNodes().forEach(node -> {
+ log.info("Test: triggering vacuum manually on node: " + node.name());
+
+ CompletableFuture<Void> vacuumFut = node.txManager().vacuum();
+ assertThat(vacuumFut, willCompleteSuccessfully());
+ });
+ }
+
+ private boolean checkVolatileTxStateOnNodes(Set<String> nodeConsistentIds, UUID txId) {
+ return cluster.runningNodes()
+ .filter(n -> nodeConsistentIds.contains(n.name()))
+ .allMatch(n -> volatileTxState(n, txId) != null);
+ }
+
+ private boolean checkPersistentTxStateOnNodes(Set<String> nodeConsistentIds, UUID txId, int partId) {
+ return cluster.runningNodes()
+ .filter(n -> nodeConsistentIds.contains(n.name()))
+ .allMatch(n -> persistentTxState(n, txId, partId) != null);
+ }
+
+ /**
+ * Waits for persistent tx state to be replicated on the given nodes.
+ *
+ * @param nodeConsistentIds Node names.
+ * @param txId Transaction id.
+ * @param partId Commit partition id.
+ * @param timeMs Time to wait.
+ */
+ private void waitForTxStateReplication(Set<String> nodeConsistentIds, UUID txId, int partId, long timeMs)
+ throws InterruptedException {
+ assertTrue(waitForCondition(() -> checkPersistentTxStateOnNodes(nodeConsistentIds, txId, partId), timeMs));
+ }
+
+ /**
+ * Waits for vacuum of volatile (and if needed, persistent) state of the given tx on all nodes of the cluster.
+ *
+ * @param txId Transaction id.
+ * @param partId Commit partition id to check the persistent tx state storage of this partition.
+ * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+ * @param timeMs Time to wait.
+ */
+ private void waitForTxStateVacuum(UUID txId, int partId, boolean checkPersistent, long timeMs) throws InterruptedException {
+ waitForTxStateVacuum(cluster.runningNodes().map(IgniteImpl::name).collect(toSet()), txId, partId, checkPersistent, timeMs);
+ }
+
+ /**
+ * Waits for vacuum of volatile (and if needed, persistent) state of the given tx on the given nodes.
+ *
+ * @param nodeConsistentIds Node names.
+ * @param txId Transaction id.
+ * @param partId Commit partition id to check the persistent tx state storage of this partition.
+ * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+ * @param timeMs Time to wait.
+ */
+ private void waitForTxStateVacuum(Set<String> nodeConsistentIds, UUID txId, int partId, boolean checkPersistent, long timeMs)
+ throws InterruptedException {
+ boolean r = waitForCondition(() -> txStateIsAbsent(nodeConsistentIds, txId, TABLE_NAME, partId, checkPersistent, false), timeMs);
+
+ if (!r) {
+ logCurrentTxState(nodeConsistentIds, txId, TABLE_NAME, partId);
+ }
+
+ assertTrue(r);
+ }
+
+ /**
+ * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on all nodes of the cluster.
+ *
+ * @param txId Transaction id.
+ * @param partId Commit partition id to check the persistent tx state storage of this partition.
+ * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+ */
+ private void assertTxStateVacuumized(UUID txId, int partId, boolean checkPersistent) {
+ assertTxStateVacuumized(txId, TABLE_NAME, partId, checkPersistent);
+ }
+
+ /**
+ * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on all nodes of the cluster.
+ *
+ * @param txId Transaction id.
+ * @param tableName Table name of the table that commit partition belongs to.
+ * @param partId Commit partition id to check the persistent tx state storage of this partition.
+ * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+ */
+ private void assertTxStateVacuumized(UUID txId, String tableName, int partId, boolean checkPersistent) {
+ Set<String> allNodes = cluster.runningNodes().map(IgniteImpl::name).collect(toSet());
+
+ assertTxStateVacuumized(allNodes, txId, tableName, partId, checkPersistent);
+ }
+
+ /**
+ * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on the given nodes. Uses default
+ * {@link #TABLE_NAME}.
+ *
+ * @param nodeConsistentIds Node names.
+ * @param txId Transaction id.
+ * @param partId Commit partition id to check the persistent tx state storage of this partition.
+ * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+ */
+ private void assertTxStateVacuumized(Set<String> nodeConsistentIds, UUID txId, int partId, boolean checkPersistent) {
+ assertTxStateVacuumized(nodeConsistentIds, txId, TABLE_NAME, partId, checkPersistent);
+ }
+
+ /**
+ * Assert that volatile (and if needed, persistent) state of the given tx is vacuumized on the given nodes.
+ *
+ * @param nodeConsistentIds Node names.
+ * @param txId Transaction id.
+ * @param tableName Table name of the table that commit partition belongs to.
+ * @param partId Commit partition id to check the persistent tx state storage of this partition.
+ * @param checkPersistent Whether to wait for vacuum of persistent tx state as well.
+ */
+ private void assertTxStateVacuumized(Set<String> nodeConsistentIds, UUID txId, String tableName, int partId, boolean checkPersistent) {
+ boolean result = txStateIsAbsent(nodeConsistentIds, txId, tableName, partId, checkPersistent, true);
+
+ if (!result) {
+ triggerVacuum();
+
+ result = txStateIsAbsent(nodeConsistentIds, txId, tableName, partId, checkPersistent, true);
+
+ if (!result) {
+ logCurrentTxState(nodeConsistentIds, txId, tableName, partId);
+ }
+ }
+
+ assertTrue(result);
+ }
+
+ /**
+ * Checks whether the tx state is absent on all of the given nodes.
+ *
+ * @param nodeConsistentIds Set of node names to check.
+ * @param txId Transaction id.
+ * @param tableName Table name of the table that commit partition belongs to.
+ * @param partId Commit partition id.
+ * @param checkPersistent Whether the persistent state should be checked.
+ * @param checkCpPrimaryOnly If {@code} true, the persistent state should be checked only on the commit partition primary,
+ * otherwise it would be checked on every given node.
+ * @return {@code true} if tx state is absent, {@code false} otherwise. Call {@link #logCurrentTxState(Set, UUID, String, int)}
+ * for details.
+ */
+ private boolean txStateIsAbsent(
+ Set<String> nodeConsistentIds,
+ UUID txId,
+ String tableName,
+ int partId,
+ boolean checkPersistent,
+ boolean checkCpPrimaryOnly
+ ) {
+ boolean result = true;
+
+ String cpPrimaryId = null;
+
+ if (checkCpPrimaryOnly) {
+ IgniteImpl node = anyNode();
+
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId(node, tableName), partId);
+
+ CompletableFuture<ReplicaMeta> replicaFut = node.placementDriver().getPrimaryReplica(tablePartitionId, node.clock().now());
+ assertThat(replicaFut, willCompleteSuccessfully());
+
+ ReplicaMeta replicaMeta = replicaFut.join();
+ // The test doesn't make sense if there is no primary right now.
+ assertNotNull(replicaMeta);
+
+ cpPrimaryId = replicaMeta.getLeaseholderId();
+ }
+
+ for (Iterator<IgniteImpl> iterator = cluster.runningNodes().iterator(); iterator.hasNext();) {
+ IgniteImpl node = iterator.next();
+
+ if (!nodeConsistentIds.contains(node.name())) {
+ continue;
+ }
+
+ result = result
+ && volatileTxState(node, txId) == null
+ && (!checkPersistent || !node.id().equals(cpPrimaryId) || persistentTxState(node, txId, partId) == null);
+ }
+
+ return result;
+ }
+
+ private void logCurrentTxState(Set<String> nodeConsistentIds, UUID txId, String table, int partId) {
+ cluster.runningNodes().filter(n -> nodeConsistentIds.contains(n.name())).forEach(node -> {
+ log.info("Test: volatile state [tx={}, node={}, state={}].", txId, node.name(), volatileTxState(node, txId));
+ log.info("Test: persistent state [tx={}, node={}, state={}].", txId, node.name(), persistentTxState(node, txId, table, partId));
+ });
+ }
+
+ private IgniteImpl anyNode() {
+ return runningNodes().findFirst().orElseThrow();
+ }
+
+ @Nullable
+ private static TransactionMeta volatileTxState(IgniteImpl node, UUID txId) {
+ TxManagerImpl txManager = (TxManagerImpl) node.txManager();
+ return txManager.stateMeta(txId);
+ }
+
+ @Nullable
+ private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, int partId) {
+ return persistentTxState(node, txId, TABLE_NAME, partId);
+ }
+
+ @Nullable
+ private TransactionMeta persistentTxState(IgniteImpl node, UUID txId, String tableName, int partId) {
+ TransactionMeta[] meta = new TransactionMeta[1];
+
+ Future f = txStateStorageExecutor.submit(() -> {
+ TxStateStorage txStateStorage = table(node, tableName).internalTable().txStateStorage().getTxStateStorage(partId);
+
+ assertNotNull(txStateStorage);
+
+ meta[0] = txStateStorage.get(txId);
+ });
+
+ try {
+ f.get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+
+ return meta[0];
+ }
+
+ private IgniteImpl findNode(Predicate<IgniteImpl> filter) {
+ return cluster.runningNodes()
+ .filter(n -> n != null && filter.test(n))
+ .findFirst()
+ .get();
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 94903d0..7ca6b96 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
import java.util.ArrayList;
@@ -47,6 +48,8 @@
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
/**
* Table view implementation for binary objects.
@@ -403,6 +406,25 @@
}
/**
+ * Marshal a tuple to a row. Test-only public method.
+ *
+ * @param tx Transaction, if present.
+ * @param rec Tuple record.
+ * @return A future, with row as a result.
+ */
+ @TestOnly
+ @VisibleForTesting
+ public CompletableFuture<BinaryRowEx> tupleToBinaryRow(@Nullable Transaction tx, Tuple rec) {
+ Objects.requireNonNull(rec);
+
+ return doOperation(tx, schemaVersion -> {
+ Row row = marshal(rec, schemaVersion, false);
+
+ return completedFuture(row);
+ });
+ }
+
+ /**
* Returns table row tuple.
*
* @param row Binary row.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 32aaa5c..56f1813 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -79,6 +79,7 @@
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.UpdateCommandResult;
+import org.apache.ignite.internal.tx.message.VacuumTxStatesCommand;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
@@ -220,6 +221,8 @@
handleBuildIndexCommand((BuildIndexCommand) command, commandIndex, commandTerm);
} else if (command instanceof PrimaryReplicaChangeCommand) {
handlePrimaryReplicaChangeCommand((PrimaryReplicaChangeCommand) command, commandIndex, commandTerm);
+ } else if (command instanceof VacuumTxStatesCommand) {
+ handleVacuumTxStatesCommand((VacuumTxStatesCommand) command, commandIndex, commandTerm);
} else {
assert false : "Command was not found [cmd=" + command + ']';
}
@@ -397,7 +400,10 @@
commandTerm
);
- markFinished(txId, cmd.commit(), cmd.commitTimestamp());
+ // Assume that we handle the finish command only on the commit partition.
+ TablePartitionId commitPartitionId = new TablePartitionId(storage.tableId(), storage.partitionId());
+
+ markFinished(txId, cmd.commit(), cmd.commitTimestamp(), commitPartitionId);
LOG.debug("Finish the transaction txId = {}, state = {}, txStateChangeRes = {}", txId, txMetaToSet, txStateChangeRes);
@@ -433,7 +439,7 @@
UUID txId = cmd.txId();
- markFinished(txId, cmd.commit(), cmd.commitTimestamp());
+ markFinished(txId, cmd.commit(), cmd.commitTimestamp(), null);
storageUpdateHandler.switchWriteIntents(
txId,
@@ -639,6 +645,24 @@
});
}
+ /**
+ * Handler for {@link VacuumTxStatesCommand}.
+ *
+ * @param cmd Command.
+ * @param commandIndex Command index.
+ * @param commandTerm Command term.
+ */
+ private void handleVacuumTxStatesCommand(VacuumTxStatesCommand cmd, long commandIndex, long commandTerm) {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
+ for (UUID txId : cmd.txIds()) {
+ txStateStorage.remove(txId, commandIndex, commandTerm);
+ }
+ }
+
private static void onTxStateStorageCasFail(UUID txId, TxMeta txMetaBeforeCas, TxMeta txMetaToSet) {
String errorMsg = format("Failed to update tx state in the storage, transaction txId = {} because of inconsistent state,"
+ " expected state = {}, state to set = {}",
@@ -697,11 +721,11 @@
));
}
- private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) {
+ private void markFinished(UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp, @Nullable TablePartitionId partId) {
txManager.updateTxMeta(txId, old -> new TxStateMeta(
commit ? COMMITTED : ABORTED,
old == null ? null : old.txCoordinatorId(),
- old == null ? null : old.commitPartitionId(),
+ old == null ? partId : old.commitPartitionId(),
commit ? commitTimestamp : null
));
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index b4589a0..4c5bfaa 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -176,8 +176,11 @@
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.message.TxCleanupRecoveryRequest;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxRecoveryMessage;
import org.apache.ignite.internal.tx.message.TxStateCommitPartitionRequest;
+import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
+import org.apache.ignite.internal.tx.message.VacuumTxStatesCommand;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicatedInfo;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -232,6 +235,9 @@
/** Factory for creating replica command messages. */
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
+ /** Factory for creating transaction command messages. */
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
+
/** Replication retries limit. */
private static final int MAX_RETIES_ON_SAFE_TIME_REORDERING = 1000;
@@ -536,7 +542,7 @@
new HybridTimestampTracker(),
replicationGroupId,
false,
- // term is not required for the rollback.
+ // Enlistment consistency token is not required for the rollback, so it is 0L.
Map.of(replicationGroupId, new IgniteBiTuple<>(clusterNodeResolver.getById(senderId), 0L)),
txId
)
@@ -764,6 +770,8 @@
return processReadOnlyDirectMultiEntryAction((ReadOnlyDirectMultiRowReplicaRequest) request, opStartTsIfDirectRo);
} else if (request instanceof TxStateCommitPartitionRequest) {
return processTxStateCommitPartitionRequest((TxStateCommitPartitionRequest) request);
+ } else if (request instanceof VacuumTxStateReplicaRequest) {
+ return processVacuumTxStateReplicaRequest((VacuumTxStateReplicaRequest) request);
} else {
throw new UnsupportedReplicaRequestException(request.getClass());
}
@@ -3686,12 +3694,12 @@
// We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
return txManager.executeWriteIntentSwitchAsync(() -> inBusyLock(busyLock,
- () -> storageUpdateHandler.switchWriteIntents(
- txId,
- txState == COMMITTED,
- commitTimestamp,
- indexIdsAtRwTxBeginTs(txId)
- )
+ () -> storageUpdateHandler.switchWriteIntents(
+ txId,
+ txState == COMMITTED,
+ commitTimestamp,
+ indexIdsAtRwTxBeginTs(txId)
+ )
)).whenComplete((unused, e) -> {
if (e != null) {
LOG.warn("Failed to complete transaction cleanup command [txId=" + txId + ']', e);
@@ -4068,6 +4076,14 @@
return source == null ? null : new BinaryRowUpgrader(schemaRegistry, targetSchemaVersion).upgrade(source);
}
+ private CompletableFuture<?> processVacuumTxStateReplicaRequest(VacuumTxStateReplicaRequest request) {
+ VacuumTxStatesCommand cmd = TX_MESSAGES_FACTORY.vacuumTxStatesCommand()
+ .txIds(request.transactionIds())
+ .build();
+
+ return raftClient.run(cmd);
+ }
+
/**
* Operation unique identifier.
*/
diff --git a/modules/transactions/build.gradle b/modules/transactions/build.gradle
index 371e5e0..ac8c827 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -55,12 +55,19 @@
integrationTestImplementation project(':ignite-api')
integrationTestImplementation(testFixtures(project(':ignite-core')))
+ integrationTestImplementation(testFixtures(project(':ignite-transactions')))
integrationTestImplementation(testFixtures(project(':ignite-runner')))
testFixturesImplementation project(':ignite-configuration')
testFixturesImplementation project(':ignite-core')
testFixturesImplementation project(':ignite-api')
testFixturesImplementation project(':ignite-schema')
+ testFixturesImplementation project(':ignite-runner')
+ testFixturesImplementation project(':ignite-affinity')
+ testFixturesImplementation project(':ignite-metastorage-api')
+ testFixturesImplementation project(':ignite-placement-driver-api')
+ testFixturesImplementation project(':ignite-distribution-zones')
+ testFixturesImplementation project(':ignite-table')
testFixturesImplementation(testFixtures(project(':ignite-core')))
testFixturesImplementation libs.jetbrains.annotations
testFixturesImplementation libs.mockito.junit
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index e01e279..badc077 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -129,7 +129,7 @@
* should pass its own tracker to provide linearizability between read-write and read-only transactions started by this client.
* @param commitPartition Partition to store a transaction state.
* @param commit {@code true} if a commit requested.
- * @param enlistedGroups Enlisted partition groups with consistency token.
+ * @param enlistedGroups Enlisted partition groups with consistency tokens.
* @param txId Transaction id.
*/
CompletableFuture<Void> finish(
@@ -185,8 +185,12 @@
*/
CompletableFuture<Void> cleanup(String node, UUID txId);
- /** Locally vacuums no longer needed transactional resources, like txnState both persistent and volatile. */
- void vacuum();
+ /**
+ * Locally vacuums no longer needed transactional resources, like txnState both persistent and volatile.
+ *
+ * @return Vacuum complete future.
+ */
+ CompletableFuture<Void> vacuum();
/**
* Returns a number of finished transactions.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
index 686e86f..f489d4a 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java
@@ -43,6 +43,7 @@
private final Long initialVacuumObservationTimestamp;
+ @Nullable
private final Long cleanupCompletionTimestamp;
/**
@@ -69,6 +70,7 @@
* @param txCoordinatorId Transaction coordinator id.
* @param commitPartitionId Commit partition replication group id.
* @param commitTimestamp Commit timestamp.
+ * @param initialVacuumObservationTimestamp Initial vacuum observation timestamp.
*/
public TxStateMeta(
TxState txState,
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
new file mode 100644
index 0000000..52beab9
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Implements the logic of persistent tx states vacuum.
+ */
+public class PersistentTxStateVacuumizer {
+ private static final IgniteLogger LOG = Loggers.forClass(PersistentTxStateVacuumizer.class);
+
+ private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
+
+ private final ReplicaService replicaService;
+
+ private final ClusterNode localNode;
+
+ private final ClockService clockService;
+
+ private final PlacementDriver placementDriver;
+
+ /**
+ * Constructor.
+ *
+ * @param replicaService Replica service.
+ * @param localNode Local node.
+ * @param clockService Clock service.
+ * @param placementDriver Placement driver.
+ */
+ public PersistentTxStateVacuumizer(
+ ReplicaService replicaService,
+ ClusterNode localNode,
+ ClockService clockService,
+ PlacementDriver placementDriver
+ ) {
+ this.replicaService = replicaService;
+ this.localNode = localNode;
+ this.clockService = clockService;
+ this.placementDriver = placementDriver;
+ }
+
+ /**
+ * Vacuum persistent tx states.
+ *
+ * @param txIds Transaction ids to vacuum; map of commit partition ids to sets of tx ids.
+ * @return A future, result is the set of successfully vacuumized txn states.
+ */
+ public CompletableFuture<Set<UUID>> vacuumPersistentTxStates(Map<TablePartitionId, Set<UUID>> txIds) {
+ Set<UUID> successful = ConcurrentHashMap.newKeySet();
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+ HybridTimestamp now = clockService.now();
+
+ txIds.forEach((commitPartitionId, txs) -> {
+ CompletableFuture<?> future = placementDriver.getPrimaryReplica(commitPartitionId, now)
+ .thenCompose(replicaMeta -> {
+ // If the primary replica is absent or is not located on the local node, this means that the primary either is
+ // on another node or would be re-elected on local one; then the volatile state (as well as cleanup completion
+ // timestamp) would be updated there, and then this operation would be called from there.
+ // Also, we are going to send the vacuum request only to the local node.
+ if (replicaMeta != null && localNode.id().equals(replicaMeta.getLeaseholderId())) {
+ VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
+ .enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
+ .groupId(commitPartitionId)
+ .transactionIds(txs)
+ .build();
+
+ return replicaService.invoke(localNode, request).whenComplete((v, e) -> {
+ if (e == null) {
+ successful.addAll(txs);
+ // We can log the exceptions without further handling because failed requests' txns are not added
+ // to the set of successful and will be retried. PrimaryReplicaMissException can be considered as
+ // a part of regular flow and doesn't need to be logged.
+ } else if (unwrapCause(e) instanceof PrimaryReplicaMissException) {
+ LOG.debug("Failed to vacuum tx states from the persistent storage.", e);
+ } else {
+ LOG.warn("Failed to vacuum tx states from the persistent storage.", e);
+ }
+ });
+ } else {
+ successful.addAll(txs);
+
+ return nullCompletedFuture();
+ }
+ });
+
+ futures.add(future);
+ });
+
+ return allOf(futures.toArray(new CompletableFuture[0]))
+ .handle((unused, unusedEx) -> successful);
+ }
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
index c4d2c1a..b355ef8 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java
@@ -106,19 +106,21 @@
@Override
public CompletableFuture<Void> startAsync() {
- resourceVacuumExecutor.scheduleAtFixedRate(
- this::runVacuumOperations,
- 0,
- resourceVacuumIntervalMilliseconds,
- TimeUnit.MILLISECONDS
- );
+ if (resourceVacuumIntervalMilliseconds > 0) {
+ resourceVacuumExecutor.scheduleAtFixedRate(
+ this::runVacuumOperations,
+ 0,
+ resourceVacuumIntervalMilliseconds,
+ TimeUnit.MILLISECONDS
+ );
- resourceVacuumExecutor.scheduleAtFixedRate(
- finishedReadOnlyTransactionTracker::broadcastClosedTransactions,
- 0,
- resourceVacuumIntervalMilliseconds,
- TimeUnit.MILLISECONDS
- );
+ resourceVacuumExecutor.scheduleAtFixedRate(
+ finishedReadOnlyTransactionTracker::broadcastClosedTransactions,
+ 0,
+ resourceVacuumIntervalMilliseconds,
+ TimeUnit.MILLISECONDS
+ );
+ }
finishedTransactionBatchRequestHandler.start();
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index fedacac..a8c8414 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -187,7 +187,7 @@
private final MessagingService messagingService;
/** Local node network identity. This id is available only after the network has started. */
- private String localNodeId;
+ private volatile String localNodeId;
/** Server cleanup processor. */
private final TxCleanupRequestHandler txCleanupRequestHandler;
@@ -211,6 +211,10 @@
private final TransactionInflights transactionInflights;
+ private final ReplicaService replicaService;
+
+ private volatile PersistentTxStateVacuumizer persistentTxStateVacuumizer;
+
/**
* Test-only constructor.
*
@@ -310,6 +314,7 @@
this.partitionOperationsExecutor = partitionOperationsExecutor;
this.transactionInflights = transactionInflights;
this.lowWatermark = lowWatermark;
+ this.replicaService = replicaService;
placementDriverHelper = new PlacementDriverHelper(placementDriver, clockService);
@@ -643,7 +648,9 @@
result.transactionState(),
old == null ? null : old.txCoordinatorId(),
commitPartition,
- result.commitTimestamp()
+ result.commitTimestamp(),
+ old == null ? null : old.initialVacuumObservationTimestamp(),
+ old == null ? null : old.cleanupCompletionTimestamp()
)
);
@@ -707,7 +714,9 @@
txResult.transactionState(),
localNodeId,
old == null ? null : old.commitPartitionId(),
- txResult.commitTimestamp()
+ txResult.commitTimestamp(),
+ old == null ? null : old.initialVacuumObservationTimestamp(),
+ old == null ? null : old.cleanupCompletionTimestamp()
));
assert isFinalState(updatedMeta.txState()) :
@@ -754,6 +763,9 @@
messagingService.addMessageHandler(ReplicaMessageGroup.class, this);
+ persistentTxStateVacuumizer = new PersistentTxStateVacuumizer(replicaService, topologyService.localMember(), clockService,
+ placementDriver);
+
txStateVolatileStorage.start();
orphanDetector.start(txStateVolatileStorage, txConfig.abandonedCheckTs());
@@ -833,10 +845,15 @@
}
@Override
- public void vacuum() {
+ public CompletableFuture<Void> vacuum() {
+ if (persistentTxStateVacuumizer == null) {
+ return nullCompletedFuture(); // Not started yet.
+ }
+
long vacuumObservationTimestamp = System.currentTimeMillis();
- txStateVolatileStorage.vacuum(vacuumObservationTimestamp, txConfig.txnResourceTtl().value());
+ return txStateVolatileStorage.vacuum(vacuumObservationTimestamp, txConfig.txnResourceTtl().value(),
+ persistentTxStateVacuumizer::vacuumPersistentTxStates);
}
@Override
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
index 6997173..d0e65a9 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java
@@ -17,16 +17,24 @@
package org.apache.ignite.internal.tx.impl;
+import static java.lang.Math.max;
import static org.apache.ignite.internal.tx.TxState.PENDING;
import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.jetbrains.annotations.Nullable;
@@ -36,6 +44,7 @@
*/
public class VolatileTxStateMetaStorage {
private static final IgniteLogger LOG = Loggers.forClass(VolatileTxStateMetaStorage.class);
+
/** The local map for tx states. */
private ConcurrentHashMap<UUID, TxStateMeta> txStateMap;
@@ -117,8 +126,15 @@
*
* @param vacuumObservationTimestamp Timestamp of the vacuum attempt.
* @param txnResourceTtl Transactional resource time to live in milliseconds.
+ * @param persistentVacuumOp Persistent vacuum operation. Accepts the map of commit partition ids to set of
+ * tx ids, returns a future with set of successfully vacuumized tx ids.
+ * @return Vacuum complete future.
*/
- public void vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
+ public CompletableFuture<Void> vacuum(
+ long vacuumObservationTimestamp,
+ long txnResourceTtl,
+ Function<Map<TablePartitionId, Set<UUID>>, CompletableFuture<Set<UUID>>> persistentVacuumOp
+ ) {
LOG.info("Vacuum started [vacuumObservationTimestamp={}, txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
AtomicInteger vacuumizedTxnsCount = new AtomicInteger(0);
@@ -126,27 +142,44 @@
AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
AtomicInteger skippedForFurtherProcessingUnfinishedTxnsCount = new AtomicInteger(0);
+ Map<TablePartitionId, Set<UUID>> txIds = new HashMap<>();
+ Map<UUID, Long> cleanupCompletionTimestamps = new HashMap<>();
+
txStateMap.forEach((txId, meta) -> {
txStateMap.computeIfPresent(txId, (txId0, meta0) -> {
if (TxState.isFinalState(meta0.txState())) {
- if (txnResourceTtl == 0) {
- vacuumizedTxnsCount.incrementAndGet();
- return null;
- } else if (meta0.initialVacuumObservationTimestamp() == null) {
+ Long initialVacuumObservationTimestamp = meta0.initialVacuumObservationTimestamp();
+
+ if (initialVacuumObservationTimestamp == null && txnResourceTtl > 0) {
markedAsInitiallyDetectedTxnsCount.incrementAndGet();
- return new TxStateMeta(
- meta0.txState(),
- meta0.txCoordinatorId(),
- meta0.commitPartitionId(),
- meta0.commitTimestamp(),
- vacuumObservationTimestamp
- );
- } else if (meta0.initialVacuumObservationTimestamp() + txnResourceTtl < vacuumObservationTimestamp) {
- vacuumizedTxnsCount.incrementAndGet();
- return null;
+
+ return markInitialVacuumObservationTimestamp(meta0, vacuumObservationTimestamp);
} else {
- alreadyMarkedTxnsCount.incrementAndGet();
- return meta0;
+ Long cleanupCompletionTimestamp = meta0.cleanupCompletionTimestamp();
+
+ boolean shouldBeVacuumized = shouldBeVacuumized(initialVacuumObservationTimestamp,
+ cleanupCompletionTimestamp, txnResourceTtl, vacuumObservationTimestamp);
+
+ if (shouldBeVacuumized) {
+ if (meta0.commitPartitionId() == null) {
+ vacuumizedTxnsCount.incrementAndGet();
+
+ return null;
+ } else {
+ Set<UUID> ids = txIds.computeIfAbsent(meta0.commitPartitionId(), k -> new HashSet<>());
+ ids.add(txId);
+
+ if (cleanupCompletionTimestamp != null) {
+ cleanupCompletionTimestamps.put(txId, cleanupCompletionTimestamp);
+ }
+
+ return meta0;
+ }
+ } else {
+ alreadyMarkedTxnsCount.incrementAndGet();
+
+ return meta0;
+ }
}
} else {
skippedForFurtherProcessingUnfinishedTxnsCount.incrementAndGet();
@@ -155,14 +188,71 @@
});
});
- LOG.info("Vacuum finished [vacuumObservationTimestamp={}, txnResourceTtl={}, vacuumizedTxnsCount={},"
- + " markedAsInitiallyDetectedTxnsCount={}, alreadyMarkedTxnsCount={}, skippedForFurtherProcessingUnfinishedTxnsCount={}].",
+ return persistentVacuumOp.apply(txIds)
+ .thenAccept(successful -> {
+ for (UUID txId : successful) {
+ txStateMap.compute(txId, (k, v) -> {
+ if (v == null) {
+ return null;
+ } else {
+ Long cleanupCompletionTs = cleanupCompletionTimestamps.get(txId);
+
+ TxStateMeta newMeta = (Objects.equals(cleanupCompletionTs, v.cleanupCompletionTimestamp())) ? null : v;
+
+ if (newMeta == null) {
+ vacuumizedTxnsCount.incrementAndGet();
+ }
+
+ return newMeta;
+ }
+ });
+ }
+
+ LOG.info("Vacuum finished [vacuumObservationTimestamp={}, "
+ + "txnResourceTtl={}, "
+ + "vacuumizedTxnsCount={}, "
+ + "vacuumizedPersistentTxnStatesCount={}, "
+ + "markedAsInitiallyDetectedTxnsCount={}, "
+ + "alreadyMarkedTxnsCount={}, "
+ + "skippedForFurtherProcessingUnfinishedTxnsCount={}].",
+ vacuumObservationTimestamp,
+ txnResourceTtl,
+ vacuumizedTxnsCount,
+ successful.size(),
+ markedAsInitiallyDetectedTxnsCount,
+ alreadyMarkedTxnsCount,
+ skippedForFurtherProcessingUnfinishedTxnsCount
+ );
+ });
+ }
+
+ private static TxStateMeta markInitialVacuumObservationTimestamp(TxStateMeta meta, long vacuumObservationTimestamp) {
+ return new TxStateMeta(
+ meta.txState(),
+ meta.txCoordinatorId(),
+ meta.commitPartitionId(),
+ meta.commitTimestamp(),
vacuumObservationTimestamp,
- txnResourceTtl,
- vacuumizedTxnsCount,
- markedAsInitiallyDetectedTxnsCount,
- alreadyMarkedTxnsCount,
- skippedForFurtherProcessingUnfinishedTxnsCount
+ meta.cleanupCompletionTimestamp()
);
}
+
+ private static boolean shouldBeVacuumized(
+ @Nullable Long initialVacuumObservationTimestamp,
+ @Nullable Long cleanupCompletionTimestamp,
+ long txnResourceTtl,
+ long vacuumObservationTimestamp) {
+ if (txnResourceTtl == 0) {
+ return true;
+ }
+
+ assert initialVacuumObservationTimestamp != null : "initialVacuumObservationTimestamp should have been set if txnResourceTtl > 0 "
+ + "[txnResourceTtl=" + txnResourceTtl + "].";
+
+ if (cleanupCompletionTimestamp == null) {
+ return initialVacuumObservationTimestamp + txnResourceTtl < vacuumObservationTimestamp;
+ } else {
+ return max(cleanupCompletionTimestamp, initialVacuumObservationTimestamp) + txnResourceTtl < vacuumObservationTimestamp;
+ }
+ }
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
index ed93c53..68fd6cc 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxMessageGroup.java
@@ -84,4 +84,14 @@
*/
public static final short TX_CLEANUP_RECOVERY = 11;
+ /**
+ * Message type for {@link VacuumTxStateReplicaRequest}.
+ */
+ public static final short VACUUM_TX_STATE_REPLICA_REQUEST = 12;
+
+ /**
+ * Message type for {@link VacuumTxStatesCommand}.
+ */
+ public static final short VACUUM_TX_STATE_COMMAND = 13;
+
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStateReplicaRequest.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStateReplicaRequest.java
new file mode 100644
index 0000000..8c287db
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStateReplicaRequest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static org.apache.ignite.internal.tx.message.TxMessageGroup.VACUUM_TX_STATE_REPLICA_REQUEST;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.message.PrimaryReplicaRequest;
+
+/**
+ * Request that is sent to vacuumize the transaction states.
+ */
+@Transferable(VACUUM_TX_STATE_REPLICA_REQUEST)
+public interface VacuumTxStateReplicaRequest extends PrimaryReplicaRequest {
+ Set<UUID> transactionIds();
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStatesCommand.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStatesCommand.java
new file mode 100644
index 0000000..3e6d10f
--- /dev/null
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStatesCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static org.apache.ignite.internal.tx.message.TxMessageGroup.VACUUM_TX_STATE_COMMAND;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.raft.WriteCommand;
+
+/**
+ * Command that vacuumizes the transaction states.
+ */
+@Transferable(VACUUM_TX_STATE_COMMAND)
+public interface VacuumTxStatesCommand extends WriteCommand {
+ Set<UUID> txIds();
+}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
index 9e8beb5..0294826 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/ThreadAssertingTxStateStorage.java
@@ -67,10 +67,10 @@
}
@Override
- public void remove(UUID txId) {
+ public void remove(UUID txId, long commandIndex, long commandTerm) {
assertThreadAllowsToWrite();
- storage.remove(txId);
+ storage.remove(txId, commandIndex, commandTerm);
}
@Override
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
index 42ad627..81c42d7 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/TxStateStorage.java
@@ -74,9 +74,11 @@
* Removes the tx meta from the storage.
*
* @param txId Tx id.
+ * @param commandIndex New value for {@link #lastAppliedIndex()}.
+ * @param commandTerm New value for {@link #lastAppliedTerm()}.
* @throws IgniteInternalException with {@link Transactions#TX_STATE_STORAGE_ERR} error code in case when the operation has failed.
*/
- void remove(UUID txId);
+ void remove(UUID txId, long commandIndex, long commandTerm);
/**
* Creates a cursor to scan all data in the storage.
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
index 563f286..53439cc0 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java
@@ -219,12 +219,21 @@
}
@Override
- public void remove(UUID txId) {
+ public void remove(UUID txId, long commandIndex, long commandTerm) {
busy(() -> {
- try {
+ try (WriteBatch writeBatch = new WriteBatch()) {
throwExceptionIfStorageInProgressOfRebalance();
- sharedStorage.db().delete(txIdToKey(txId));
+ writeBatch.delete(txIdToKey(txId));
+
+ // If the store is in the process of rebalancing, then there is no need to update lastAppliedIndex and lastAppliedTerm.
+ // This is necessary to prevent a situation where, in the middle of the rebalance, the node will be restarted and we will
+ // have non-consistent storage. They will be updated by either #abortRebalance() or #finishRebalance(long, long).
+ if (state.get() != StorageState.REBALANCE) {
+ updateLastApplied(writeBatch, commandIndex, commandTerm);
+ }
+
+ sharedStorage.db().write(sharedStorage.writeOptions, writeBatch);
return null;
} catch (RocksDBException e) {
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
index 6015ee9..0d6357b 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/AbstractTxStateStorageTest.java
@@ -103,7 +103,7 @@
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
- storage.remove(txIds.get(i));
+ storage.remove(txIds.get(i), i, 1);
}
}
@@ -446,7 +446,7 @@
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.lastApplied(100, 500));
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.get(UUID.randomUUID()));
- assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.remove(UUID.randomUUID()));
+ assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.remove(UUID.randomUUID(), 1, 1));
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, storage::scan);
}
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
index 689bd0b..047dd1c 100644
--- a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java
@@ -99,10 +99,15 @@
}
@Override
- public void remove(UUID txId) {
+ public void remove(UUID txId, long commandIndex, long commandTerm) {
checkStorageClosedOrInProgressOfRebalance();
storage.remove(txId);
+
+ if (rebalanceFutureReference.get() == null) {
+ lastAppliedIndex = commandIndex;
+ lastAppliedTerm = commandTerm;
+ }
}
@Override
diff --git a/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
new file mode 100644
index 0000000..12aa397
--- /dev/null
+++ b/modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/ItTransactionTestUtils.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.test;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toSet;
+import static org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.lang.ByteArray;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.table.RecordBinaryViewImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Test utils for transaction integration tests.
+ */
+public class ItTransactionTestUtils {
+ /**
+ * Get the names of the nodes that are assignments of the given partition.
+ *
+ * @param node Any node in the cluster.
+ * @param grpId Group id.
+ * @return Node names.
+ */
+ public static Set<String> partitionAssignment(IgniteImpl node, TablePartitionId grpId) {
+ MetaStorageManager metaStorageManager = node.metaStorageManager();
+
+ ByteArray stableAssignmentKey = stablePartAssignmentsKey(grpId);
+
+ CompletableFuture<Entry> assignmentEntryFut = metaStorageManager.get(stableAssignmentKey);
+
+ assertThat(assignmentEntryFut, willCompleteSuccessfully());
+
+ Entry e = assignmentEntryFut.join();
+
+ assertNotNull(e);
+ assertFalse(e.empty());
+ assertFalse(e.tombstone());
+
+ Set<Assignment> a = requireNonNull(Assignments.fromBytes(e.value())).nodes();
+
+ return a.stream().filter(Assignment::isPeer).map(Assignment::consistentId).collect(toSet());
+ }
+
+ /**
+ * Calculate the partition id on which the given tuple would be placed.
+ *
+ * @param node Any node in the cluster.
+ * @param tableName Table name.
+ * @param tuple Data tuple.
+ * @param tx Transaction, if present.
+ * @return Partition id.
+ */
+ public static int partitionIdForTuple(IgniteImpl node, String tableName, Tuple tuple, @Nullable Transaction tx) {
+ TableImpl table = table(node, tableName);
+ RecordBinaryViewImpl view = unwrapRecordBinaryViewImpl(table.recordView());
+
+ CompletableFuture<BinaryRowEx> rowFut = view.tupleToBinaryRow(tx, tuple);
+ assertThat(rowFut, willCompleteSuccessfully());
+ BinaryRowEx row = rowFut.join();
+
+ return table.internalTable().partitionId(row);
+ }
+
+ /**
+ * Generates some tuple that would be placed in the partition that is hosted on the given node in the cluster.
+ *
+ * @param node Node that should host the result tuple.
+ * @param tableName Table name.
+ * @param tx Transaction, if present.
+ * @param initialTuple Initial tuple, for calculation.
+ * @param nextTuple This function will be used to generate new tuples in order to find suitable one.
+ * @param primary Whether the given node should be the primary node.
+ * @return Tuple that would be placed on the given node.
+ */
+ public static Tuple findTupleToBeHostedOnNode(
+ IgniteImpl node,
+ String tableName,
+ @Nullable Transaction tx,
+ Tuple initialTuple,
+ Function<Tuple, Tuple> nextTuple,
+ boolean primary
+ ) {
+ Tuple t = initialTuple;
+ int tableId = tableId(node, tableName);
+
+ int maxAttempts = 100;
+
+ while (maxAttempts >= 0) {
+ int partId = partitionIdForTuple(node, tableName, t, tx);
+
+ TablePartitionId grpId = new TablePartitionId(tableId, partId);
+
+ if (primary) {
+ ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, grpId);
+
+ if (node.id().equals(replicaMeta.getLeaseholderId())) {
+ return t;
+ }
+ } else {
+ Set<String> assignments = partitionAssignment(node, grpId);
+
+ if (assignments.contains(node.name())) {
+ return t;
+ }
+ }
+
+ t = nextTuple.apply(t);
+
+ maxAttempts--;
+ }
+
+ throw new AssertionError("Failed to find a suitable tuple.");
+ }
+
+ /**
+ * Returns table instance.
+ *
+ * @param node Ignite node.
+ * @param tableName Table name.
+ * @return Table instance.
+ */
+ public static TableImpl table(IgniteImpl node, String tableName) {
+ return unwrapTableImpl(node.tables().table(tableName));
+ }
+
+ /**
+ * Returns the table id.
+ *
+ * @param node Any node in the cluster.
+ * @param tableName Table name.
+ * @return Table id.
+ */
+ public static int tableId(IgniteImpl node, String tableName) {
+ return table(node, tableName).tableId();
+ }
+
+ /**
+ * Transaction id.
+ *
+ * @param tx Transaction.
+ * @return Transaction id.
+ */
+ public static UUID txId(Transaction tx) {
+ return ((ReadWriteTransactionImpl) unwrapIgniteTransaction(tx)).id();
+ }
+
+ /**
+ * Waits for the primary replica appearance for the given replication group and returns it.
+ *
+ * @param node Any node in the cluster.
+ * @param replicationGrpId Replication group.
+ * @return Primary replica meta.
+ */
+ public static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ReplicationGroupId replicationGrpId) {
+ CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
+ replicationGrpId,
+ node.clock().now(),
+ 10,
+ SECONDS
+ );
+
+ assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+ return primaryReplicaFut.join();
+ }
+
+ /**
+ * Unwraps {@link RecordBinaryViewImpl} from a {@link RecordView}.
+ *
+ * @param view View to unwrap.
+ */
+ private static RecordBinaryViewImpl unwrapRecordBinaryViewImpl(RecordView view) {
+ return Wrappers.unwrap(view, RecordBinaryViewImpl.class);
+ }
+
+ /**
+ * Unwraps {@link TableImpl} from a {@link Table}.
+ *
+ * @param table Table to unwrap.
+ */
+ private static TableImpl unwrapTableImpl(Table table) {
+ return Wrappers.unwrap(table, TableImpl.class);
+ }
+
+ /**
+ * Unwraps {@link Transaction} from an {@link Transaction}.
+ *
+ * @param tx Object to unwrap.
+ */
+ private static Transaction unwrapIgniteTransaction(Transaction tx) {
+ return Wrappers.unwrap(tx, Transaction.class);
+ }
+}