blob: 08b79a07acb5fcb3cb30f2d6b56f907e5ec99425 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteTransaction;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.waitAndGetPrimaryReplica;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
/**
 * Integration tests for the transactions running while the primary changes, not related to the tx recovery.
 *
 * <p>Every test runs against a table named {@link #TABLE_NAME} that is created in {@link #setup(TestInfo)} inside a zone with
 * a single partition and three replicas.
 */
public class ItTransactionPrimaryChangeTest extends ClusterPerTestIntegrationTest {
    /** Table name. */
    private static final String TABLE_NAME = "test_table";

    /** Nodes bootstrap configuration pattern. */
    private static final String NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
            + " network: {\n"
            + " port: {},\n"
            + " nodeFinder: {\n"
            + " netClusterNodes: [ {} ]\n"
            + " }\n"
            + " },\n"
            + " clientConnector: { port:{} },\n"
            + " rest.port: {},\n"
            + " raft: { responseTimeout: 30000 },"
            + " compute.threadPoolSize: 1\n"
            + "}";

    /**
     * Runs the parent setup and then creates the test zone (1 partition, 3 replicas) and the test table in it.
     *
     * @param testInfo Test information passed through to the parent setup.
     * @throws Exception If the parent setup fails.
     */
    @BeforeEach
    @Override
    public void setup(TestInfo testInfo) throws Exception {
        super.setup(testInfo);

        String zoneSql = "create zone test_zone with partitions=1, replicas=3, storage_profiles='"
                + DEFAULT_AIPERSIST_PROFILE_NAME
                + "'";

        String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20)) with primary_zone='TEST_ZONE'";

        cluster.doInSession(0, session -> {
            executeUpdate(zoneSql, session);
            executeUpdate(sql, session);
        });
    }

    /**
     * Adds the cluster configuration used by this test on top of the parent customization: the implicit transaction timeout and
     * the replication RPC timeout are set to 30000, and the transaction resource TTL is set to 2.
     *
     * @param builder Init parameters builder to customize.
     */
    @Override
    protected void customizeInitParameters(InitParametersBuilder builder) {
        super.customizeInitParameters(builder);

        builder.clusterConfiguration("{"
                + " transaction: {"
                + " implicitTransactionTimeout: 30000,"
                + " txnResourceTtl: 2"
                + " },"
                + " replication: {"
                + " rpcTimeout: 30000"
                + " },"
                + "}");
    }

    /**
     * Returns node bootstrap config template.
     *
     * @return Node bootstrap config template.
     */
    @Override
    protected String getNodeBootstrapConfigTemplate() {
        return NODE_BOOTSTRAP_CFG_TEMPLATE;
    }

    /**
     * Checks that a full (implicit, single-operation) transaction stays consistent with a regular transaction when the primary
     * replica is changed while the full transaction is in progress.
     *
     * <p>Scenario:
     * <ol>
     *     <li>A row {@code (1, "1")} is preloaded.</li>
     *     <li>An implicit {@code getAndDelete} is started; its write command is held on the current leaseholder.</li>
     *     <li>The primary is transferred to the transaction coordinator node.</li>
     *     <li>A regular transaction reads the row, sees {@code "1"}, updates the value to {@code "2"} and commits.</li>
     *     <li>The held write command is released; the full transaction must complete and return {@code "2"}.</li>
     * </ol>
     *
     * @throws InterruptedException If the test thread is interrupted.
     */
    @Test
    public void testFullTxConsistency() throws InterruptedException {
        TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));

        int partId = 0;

        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), partId);

        String leaseholder = waitAndGetPrimaryReplica(node(0), tblReplicationGrp).getLeaseholder();

        IgniteImpl firstLeaseholderNode = findNodeByName(leaseholder);

        log.info("Test: Full transaction will be executed on [node={}].", firstLeaseholderNode.name());

        // The coordinator is deliberately any node other than the current leaseholder.
        IgniteImpl txCrdNode = findNode(0, initialNodes(), n -> !leaseholder.equals(n.name()));

        log.info("Test: Transaction coordinator is [node={}].", txCrdNode.name());

        RecordView<Tuple> view = txCrdNode.tables().table(TABLE_NAME).recordView();

        // Put some value into the table.
        Transaction txPreload = txCrdNode.transactions().begin();

        log.info("Test: Preloading the data [tx={}].", ((ReadWriteTransactionImpl) unwrapIgniteTransaction(txPreload)).id());

        view.upsert(txPreload, Tuple.create().set("key", 1).set("val", "1"));

        txPreload.commit();

        // Completed with the id of the intercepted full transaction as soon as its write command is about to be sent.
        CompletableFuture<Object> fullTxReplicationAttemptFuture = new CompletableFuture<>();

        // Completed when the regular transaction is over (successfully or not); releases the intercepted write command.
        CompletableFuture<Void> regularTxComplete = new CompletableFuture<>();

        firstLeaseholderNode.dropMessages((node, msg) -> {
            if (msg instanceof WriteActionRequest) {
                WriteActionRequest writeActionRequest = (WriteActionRequest) msg;

                if (tblReplicationGrp.toString().equals(writeActionRequest.groupId())
                        && writeActionRequest.deserializedCommand() instanceof UpdateCommand
                        && !fullTxReplicationAttemptFuture.isDone()) {
                    UpdateCommand updateCommand = (UpdateCommand) writeActionRequest.deserializedCommand();

                    if (updateCommand.full()) {
                        fullTxReplicationAttemptFuture.complete(updateCommand.txId());

                        log.info("Test: Stopped the full tx before sending write command [txId={}].", updateCommand.txId());

                        // Hold the command here until the regular transaction is finished.
                        regularTxComplete.join();
                    }
                }
            }

            // Nothing is actually dropped: the predicate is used only to delay the full transaction's write command.
            return false;
        });

        // Starting the full transaction.
        log.info("Test: Starting the full transaction.");

        CompletableFuture<Tuple> fullTxFut = view.getAndDeleteAsync(null, Tuple.create().set("key", 1));

        try {
            assertThat(fullTxReplicationAttemptFuture, willCompleteSuccessfully());

            // Changing the primary.
            NodeUtils.transferPrimary(cluster.runningNodes().collect(toList()), tblReplicationGrp, txCrdNode.name());

            // Start a regular transaction that increments the value. It should see the initially inserted value and its commit should
            // succeed.
            Transaction tx = txCrdNode.transactions().begin();

            log.info("Test: Started the regular transaction [txId={}].", ((ReadWriteTransactionImpl) unwrapIgniteTransaction(tx)).id());

            Tuple t = view.get(tx, Tuple.create().set("key", 1));

            assertEquals("1", t.value("val"));

            view.upsert(tx, Tuple.create().set("key", 1).set("val", "2"));

            tx.commit();

            log.info("Test: Completed the regular transaction [txId={}].", ((ReadWriteTransactionImpl) unwrapIgniteTransaction(tx)).id());
        } finally {
            // Always release the intercepted write command, otherwise the message handler thread would stay blocked
            // if any of the steps above fails.
            regularTxComplete.complete(null);
        }

        // NOTE(review): this wait is unbounded; presumably intentional because the full transaction may need longer than the
        // matcher below allows - confirm before replacing it with a timed wait.
        fullTxFut.join();

        // Full transaction should finally complete.
        assertThat(fullTxFut, willCompleteSuccessfully());

        // The attempt future is guaranteed to be completed here: it was asserted inside the try block above.
        log.info("Test: Completed the full transaction [txId={}].", fullTxReplicationAttemptFuture.join());

        // Transaction "tx" got the value, this means the full transaction hasn't executed yet at the moment of "tx" commit.
        // Hence the retrieved value should always be 2.
        assertEquals("2", fullTxFut.join().value("val"));
    }

    /**
     * Finds the first node, among the nodes with indexes in {@code [startRange, endRange)}, that satisfies the filter.
     *
     * @param startRange First node index to check, inclusive.
     * @param endRange Last node index to check, exclusive.
     * @param filter Condition the node must satisfy.
     * @return First matching node.
     * @throws AssertionError If no node in the range satisfies the filter.
     */
    private IgniteImpl findNode(int startRange, int endRange, Predicate<IgniteImpl> filter) {
        return IntStream.range(startRange, endRange)
                .mapToObj(this::node)
                .filter(filter)
                .findFirst()
                .orElseThrow(() -> new AssertionError(
                        "No node matching the filter was found [startRange=" + startRange + ", endRange=" + endRange + "]."
                ));
    }

    /**
     * Finds a node with the given name among the initial nodes of the cluster.
     *
     * @param leaseholder Name of the node to find.
     * @return Node with the given name.
     */
    private IgniteImpl findNodeByName(String leaseholder) {
        return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
    }
}