blob: a36a3886854269935e561c62410b77bf8dcb830b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.DefaultMessagingService;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeException;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.message.CleanupReplicatedInfo;
import org.apache.ignite.internal.tx.message.TxCleanupMessage;
import org.apache.ignite.internal.tx.message.TxCleanupMessageErrorResponse;
import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse;
import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
/**
 * Test resending the finish request from the coordinator when the previous attempts failed for any reason.
 */
public class ItDurableFinishTest extends ClusterPerTestIntegrationTest {
    /** Timeout, in seconds, for awaiting the primary replica of the test partition. */
    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;

    /** Name of the table every test writes to. */
    private static final String TABLE_NAME = "TEST_FINISH";

    /**
     * Futures of finish requests re-sent by the man-in-the-middle drop predicate in {@link #coordinatorDropsFirstFinishMessage}.
     * The predicate is not guaranteed to run on the test thread, so the list is synchronized to make its additions visible to
     * the test thread.
     */
    private final Collection<CompletableFuture<?>> futures = Collections.synchronizedList(new ArrayList<>());

    /** Creates {@link #TABLE_NAME} in a single-partition zone with three replicas. */
    private void createTestTableWith3Replicas() {
        String zoneSql = "create zone test_zone with partitions=1, replicas=3, storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";

        String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20))"
                + " with primary_zone='TEST_ZONE'";

        cluster.doInSession(0, session -> {
            executeUpdate(zoneSql, session);
            executeUpdate(sql, session);
        });
    }

    /**
     * Creates the test table, locates the primary replica of its only partition, and starts an RW transaction on a different node
     * (the coordinator) that upserts a single row but is not yet committed.
     *
     * @return Context holding the primary node, the coordinator node, the table, the open transaction and the written key.
     */
    private Context prepareTransactionData() throws ExecutionException, InterruptedException {
        createTestTableWith3Replicas();

        var tblReplicationGrp = defaultTablePartitionId(node(0));

        CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica(
                tblReplicationGrp,
                node(0).clock().now(),
                AWAIT_PRIMARY_REPLICA_TIMEOUT,
                SECONDS
        );

        assertThat(primaryReplicaFut, willCompleteSuccessfully());

        String primary = primaryReplicaFut.get().getLeaseholder();

        // nodeIndex() returns null for an unknown node name: fail with a clear message instead of an unboxing NPE.
        Integer primaryIndex = nodeIndex(primary);
        assertNotNull(primaryIndex, "Primary replica is not one of the test nodes: " + primary);

        IgniteImpl primaryNode = node(primaryIndex);
        // The coordinator is deliberately a node other than the primary, so the finish request has to go over the network.
        IgniteImpl coordinatorNode = node((primaryIndex + 1) % initialNodes());

        InternalTransaction rwTx = (InternalTransaction) coordinatorNode.transactions().begin();

        Tuple keyTpl = Tuple.create().set("key", 42);
        Tuple tpl = Tuple.create().set("key", 42).set("val", "val 42");

        Table publicTable = coordinatorNode.tables().table(TABLE_NAME);

        publicTable.recordView().upsert(rwTx, tpl);

        return new Context(primaryNode, coordinatorNode, publicTable, rwTx, keyTpl);
    }

    /** Returns the replication group id of partition 0 of the test table, as seen by {@code node}. */
    private TablePartitionId defaultTablePartitionId(IgniteImpl node) {
        TableViewInternal table = unwrapTableViewInternal(node.tables().table(TABLE_NAME));

        return new TablePartitionId(table.tableId(), 0);
    }

    /** Commits {@code rwTx} and checks that the row written by {@link #prepareTransactionData()} is visible afterwards. */
    private void commitAndValidate(InternalTransaction rwTx, Table publicTable, Tuple keyTpl) {
        rwTx.commit();

        Tuple storedData = publicTable.recordView().get(null, keyTpl);

        assertNotNull(storedData);

        assertEquals("val 42", storedData.stringValue("val"));
    }

    /** Returns the messaging service of {@code node}, reading its {@code clusterSvc} field via reflection. */
    private DefaultMessagingService messaging(IgniteImpl node) {
        ClusterService clusterService = IgniteTestUtils.getFieldValue(node, IgniteImpl.class, "clusterSvc");

        return (DefaultMessagingService) clusterService.messagingService();
    }

    @Test
    void testChangedPrimaryOnFinish() throws Exception {
        Context context = prepareTransactionData();

        // Drop all finish messages to the old primary, pick a new one.
        // The coordinator will get a response from the new primary.
        CompletableFuture<Void> transferPrimaryFuture = changePrimaryOnFinish(context.coordinatorNode);

        // The primary is changed after calculating the outcome and commit timestamp.
        // The new primary successfully commits such transaction.
        commitAndValidate(context.tx, context.publicTable, context.keyTpl);

        assertThat(transferPrimaryFuture, willCompleteSuccessfully());
    }

    /**
     * Drops the coordinator's finish requests and transfers the primary once the first one has been dropped.
     *
     * <p>The finish request is dropped only after it has been prepared (outcome, commit timestamp, primary node, etc. are set),
     * which simulates network issues between the coordinator and the old primary.
     *
     * @return Future of the asynchronous primary transfer.
     */
    private CompletableFuture<Void> changePrimaryOnFinish(IgniteImpl coordinatorNode) {
        return transferPrimaryOnFirstDroppedMessage(messaging(coordinatorNode), TxFinishReplicaRequest.class, "Dropping: {}.");
    }

    @Test
    void testCommitOverCommit() throws ExecutionException, InterruptedException {
        Context context = prepareTransactionData();

        // The coordinator does not get the response from the first commit message, but it anyway reaches the primary and succeeds.
        // The coordinator has to retry the finish request and survive a COMMIT over COMMIT.
        coordinatorDropsFirstFinishMessage(context.coordinatorNode);

        commitAndValidate(context.tx, context.publicTable, context.keyTpl);

        // Without this check the loop below would pass vacuously if no finish request had been intercepted.
        assertEquals(1, futures.size(), "Exactly one finish request is expected to be intercepted.");

        for (CompletableFuture<?> future : new ArrayList<>(futures)) {
            assertThat(future, willCompleteSuccessfully());
        }
    }

    /**
     * Intercepts the first finish request sent by the coordinator. The request is re-sent to the same recipient on the
     * coordinator's behalf, and its future is recorded in {@link #futures}. The coordinator's original attempt is dropped.
     */
    private void coordinatorDropsFirstFinishMessage(IgniteImpl coordinatorNode) {
        DefaultMessagingService coordinatorMessaging = messaging(coordinatorNode);

        // Make sure the finish message is prepared, i.e. the outcome, commit timestamp, primary node, etc. have been set,
        // and then temporarily block the messaging to simulate network issues.
        AtomicBoolean messageHandled = new AtomicBoolean();

        coordinatorMessaging.dropMessages((s, networkMessage) -> {
            // compareAndSet ensures at most one request is intercepted even if the predicate runs concurrently. The flag is raised
            // before the re-send below, so the re-sent request is not dropped if it goes through this predicate too.
            if (networkMessage instanceof TxFinishReplicaRequest && messageHandled.compareAndSet(false, true)) {
                logger().info("Drop message [msg={}].", networkMessage);

                // Here we act as a man-in-the-middle: the finish request is intercepted and further routed to
                // the commit partition as normal. The coordinator instead fails with a timeout (see DefaultMessagingService.invoke0)
                // and has to retry the finish request according to the durable finish logic.
                // The test checks that the second coordinator attempt to commit succeeds
                // and the server is able to apply a COMMIT over COMMIT without exceptions.
                CompletableFuture<NetworkMessage> finish = coordinatorMessaging.invoke(s, networkMessage, 3000);

                futures.add(finish);

                return true;
            }

            return false;
        });
    }

    @Test
    void testChangePrimaryOnCleanup() throws ExecutionException, InterruptedException {
        // Wait for the configuration change to be applied; otherwise the scenario may run with the old RPC timeout.
        assertThat(
                node(0).clusterConfiguration().getConfiguration(ReplicationConfiguration.KEY)
                        .change(replicationChange -> replicationChange.changeRpcTimeout(3000)),
                willCompleteSuccessfully()
        );

        Context context = prepareTransactionData();

        // The transaction is committed but the primary expires right before applying the cleanup message.
        CompletableFuture<Void> transferPrimaryFuture = changePrimaryOnCleanup(context.primaryNode);

        commitAndValidate(context.tx, context.publicTable, context.keyTpl);

        assertThat(transferPrimaryFuture, willCompleteSuccessfully());
    }

    /**
     * Drops the cleanup messages sent by the primary node and transfers the primary once the first one has been dropped.
     * This simulates the primary expiring right before the cleanup is applied.
     *
     * @return Future of the asynchronous primary transfer.
     */
    private CompletableFuture<Void> changePrimaryOnCleanup(IgniteImpl primaryNode) {
        return transferPrimaryOnFirstDroppedMessage(messaging(primaryNode), TxCleanupMessage.class, "Dropping message: {}.");
    }

    /**
     * Drops every message of type {@code messageClass} sent through {@code messaging} until the primary replica of the test
     * partition has been transferred. The transfer starts asynchronously as soon as the first such message is dropped.
     *
     * <p>The transfer runs asynchronously because the drop predicate is invoked on the thread that sends the message. Message
     * dropping is switched off once the transfer attempt finishes, whether or not it succeeded.
     *
     * @param messaging Messaging service to install the drop predicate on.
     * @param messageClass Type of messages to drop.
     * @param dropLogMessage Log template for a dropped message; the message itself is its only parameter.
     * @return Future of the asynchronous primary transfer.
     */
    private CompletableFuture<Void> transferPrimaryOnFirstDroppedMessage(
            DefaultMessagingService messaging,
            Class<?> messageClass,
            String dropLogMessage
    ) {
        AtomicBoolean dropMessage = new AtomicBoolean(true);
        CountDownLatch firstMessageDroppedLatch = new CountDownLatch(1);

        messaging.dropMessages((s, networkMessage) -> {
            if (messageClass.isInstance(networkMessage) && dropMessage.get()) {
                logger().info(dropLogMessage, networkMessage);

                firstMessageDroppedLatch.countDown();

                return true;
            }

            return false;
        });

        return CompletableFuture.runAsync(() -> {
            try {
                firstMessageDroppedLatch.await();

                logger().info("Start transferring primary.");

                NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), defaultTablePartitionId(node(0)));
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating, so the executing thread is not silently un-interrupted.
                Thread.currentThread().interrupt();

                throw new RuntimeException(e);
            } finally {
                logger().info("Finished transferring primary.");

                dropMessage.set(false);
            }
        });
    }

    @Test
    void testCommitAlreadyAbortedTx() throws ExecutionException, InterruptedException {
        Context context = prepareTransactionData();

        // Simulate the state when a tx has already been aborted by writing a corresponding state into tx state storage.
        markTxAbortedInTxStateStorage(context.primaryNode, context.tx, context.publicTable);

        // Tx.commit should throw MismatchingTransactionOutcomeException.
        TransactionException transactionException = assertThrows(TransactionException.class, context.tx::commit);

        Throwable cause = ExceptionUtils.unwrapCause(transactionException.getCause());

        assertInstanceOf(MismatchingTransactionOutcomeException.class, cause);
    }

    /** Writes an {@code ABORTED} state for {@code tx} directly into the tx state storage of partition 0 on the primary node. */
    private void markTxAbortedInTxStateStorage(IgniteImpl primaryNode, InternalTransaction tx, Table publicTable) {
        TableImpl tableImpl = unwrapTableImpl(publicTable);

        TableViewInternal primaryTbl = unwrapTableViewInternal(primaryNode.tables().table(TABLE_NAME));

        TxStateStorage storage = primaryTbl.internalTable().txStateStorage().getTxStateStorage(0);

        TxMeta txMetaToSet = new TxMeta(
                ABORTED,
                asList(new TablePartitionId(tableImpl.tableId(), 0)),
                null
        );

        bypassingThreadAssertions(() -> storage.put(tx.id(), txMetaToSet));
    }

    @Test
    void testCleanupReplicatedMessage() throws ExecutionException, InterruptedException {
        Context context = prepareTransactionData();

        DefaultMessagingService primaryMessaging = messaging(context.primaryNode);

        CompletableFuture<Void> cleanupReplicatedFuture = new CompletableFuture<>();

        // Observe (never drop) cleanup responses and complete the future once one carries replication info.
        primaryMessaging.dropMessages((s, networkMessage) -> {
            if (networkMessage instanceof TxCleanupMessageResponse) {
                logger().info("Received message: {}.", networkMessage);

                TxCleanupMessageResponse message = (TxCleanupMessageResponse) networkMessage;

                if (message instanceof TxCleanupMessageErrorResponse) {
                    TxCleanupMessageErrorResponse error = (TxCleanupMessageErrorResponse) message;

                    // The response is not a Throwable, so it must be referenced by a placeholder to appear in the log.
                    logger().error("Cleanup Error: {}.", error);

                    return false;
                }

                CleanupReplicatedInfo result = message.result();

                if (result != null) {
                    cleanupReplicatedFuture.complete(null);
                }
            }

            return false;
        });

        commitAndValidate(context.tx, context.publicTable, context.keyTpl);

        assertThat(cleanupReplicatedFuture, willCompleteSuccessfully());

        assertTrue(waitForCondition(
                () -> {
                    TxStateMeta stateMeta = context.primaryNode.txManager().stateMeta(context.tx.id());

                    return stateMeta != null && stateMeta.cleanupCompletionTimestamp() != null;
                },
                10_000)
        );
    }

    /**
     * Returns the index of the node with the given name among the initial nodes.
     *
     * @return Node index, or {@code null} if no initial node has that name.
     */
    private @Nullable Integer nodeIndex(String name) {
        for (int i = 0; i < initialNodes(); i++) {
            if (node(i).name().equals(name)) {
                return i;
            }
        }

        return null;
    }

    /** Everything a test needs after {@link #prepareTransactionData()}: participating nodes, table, open tx and written key. */
    private static class Context {
        private final IgniteImpl primaryNode;

        private final IgniteImpl coordinatorNode;

        private final Table publicTable;

        private final InternalTransaction tx;

        private final Tuple keyTpl;

        private Context(IgniteImpl primaryNode, IgniteImpl coordinatorNode, Table publicTable, InternalTransaction tx, Tuple keyTpl) {
            this.primaryNode = primaryNode;
            this.coordinatorNode = coordinatorNode;
            this.publicTable = publicTable;
            this.tx = tx;
            this.keyTpl = keyTpl;
        }
    }
}