blob: c6523efc8254c25e267a35798344ae6af210da0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static org.apache.ignite.internal.wrapper.Wrappers.unwrapNullable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.impl.schema.TestProfileConfigurationSchema;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.NodeUtils;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.flow.TestFlowUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
/**
* TODO: IGNITE-20485 Configure the lease interval as less as possible to decrease the duration of tests.
* The test class checks invariant of a primary replica choice.
*/
@SuppressWarnings("resource")
public class ItPrimaryReplicaChoiceTest extends ClusterPerTestIntegrationTest {
private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
private static final int PART_ID = 0;
private static final String ZONE_NAME = "ZONE_TABLE";
private static final String TABLE_NAME = "TEST_TABLE";
private static final String HASH_IDX = "HASH_IDX";
private static final String SORTED_IDX = "SORTED_IDX";
@BeforeEach
@Override
public void setup(TestInfo testInfo) throws Exception {
super.setup(testInfo);
String zoneSql = IgniteStringFormatter.format(
"CREATE ZONE IF NOT EXISTS {} WITH REPLICAS={}, PARTITIONS={}, STORAGE_PROFILES='{}'",
ZONE_NAME, 3, 1, TestProfileConfigurationSchema.TEST_PROFILE_NAME
);
String sql = IgniteStringFormatter.format(
"CREATE TABLE {} (key INT PRIMARY KEY, val VARCHAR(20)) WITH PRIMARY_ZONE='{}'",
TABLE_NAME, ZONE_NAME
);
String hashIdxSql = IgniteStringFormatter.format("CREATE INDEX {} ON {} USING HASH (val)",
HASH_IDX, TABLE_NAME);
String sortedIdxSql = IgniteStringFormatter.format("CREATE INDEX {} ON {} USING SORTED (val)",
SORTED_IDX, TABLE_NAME);
cluster.doInSession(0, session -> {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
executeUpdate(hashIdxSql, session);
executeUpdate(sortedIdxSql, session);
});
}
@Test
public void testPrimaryChangeSubscription() throws Exception {
TableViewInternal tbl = unwrapTableViewInternal(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
node(0).clock().now(),
AWAIT_PRIMARY_REPLICA_TIMEOUT,
SECONDS
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
String primary = primaryReplicaFut.get().getLeaseholder();
IgniteImpl ignite = node(primary);
AtomicBoolean primaryChanged = new AtomicBoolean();
ignite.placementDriver().listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, evt -> {
primaryChanged.set(true);
return falseCompletedFuture();
});
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp);
assertTrue(waitForCondition(primaryChanged::get, 10_000));
}
@Test
public void testPrimaryChangeLongHandling() throws Exception {
TableViewInternal tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME));
var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
node(0).clock().now(),
AWAIT_PRIMARY_REPLICA_TIMEOUT,
SECONDS
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
String primary = primaryReplicaFut.get().getLeaseholder();
IgniteImpl ignite = node(primary);
CompletableFuture<Boolean> primaryChangedHandling = new CompletableFuture<>();
ignite.placementDriver().listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, evt -> primaryChangedHandling);
log.info("Primary replica is: " + primary);
Collection<IgniteImpl> nodes = cluster.runningNodes().collect(toSet());
NodeUtils.transferPrimary(nodes, tblReplicationGrp);
CompletableFuture<String> primaryChangeTask =
IgniteTestUtils.runAsync(() -> NodeUtils.transferPrimary(nodes, tblReplicationGrp, primary));
waitingForLeaderCache(tbl, primary);
assertFalse(primaryChangeTask.isDone());
primaryChangedHandling.complete(false);
assertThat(primaryChangeTask, willCompleteSuccessfully());
assertEquals(primary, primaryChangeTask.get());
}
/**
* The test checks cleanup two types of resources: locks (which are used to support transaction logic) and cursors (which are used to
* hold a context of scan operations on the server side).
*
* @throws Exception If failed.
*/
@Test
public void testClearingTransactionResourcesWhenPrimaryChange() throws Exception {
Table publicTableOnNode0 = node(0).tables().table(TABLE_NAME);
TableViewInternal unwrappedTableOnNode0 = unwrapTableViewInternal(publicTableOnNode0);
for (int i = 0; i < 10; i++) {
assertTrue(publicTableOnNode0.recordView().insert(null, Tuple.create().set("key", i).set("val", "preload val")));
}
var tblReplicationGrp = new TablePartitionId(unwrappedTableOnNode0.tableId(), PART_ID);
CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
node(0).clock().now(),
AWAIT_PRIMARY_REPLICA_TIMEOUT,
SECONDS
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
String primary = primaryReplicaFut.get().getLeaseholder();
IgniteImpl primaryIgnite = node(primary);
int hashIdxId = getIndexId(HASH_IDX);
int sortedIdxId = getIndexId(SORTED_IDX);
InternalTransaction rwTx = (InternalTransaction) node(0).transactions().begin();
InternalTransaction roTx = (InternalTransaction) node(0).transactions().begin(new TransactionOptions().readOnly(true));
assertTrue(publicTableOnNode0.recordView().insert(rwTx, Tuple.create().set("key", 42).set("val", "val 42")));
BinaryTuple idxKey = new BinaryTuple(1, new BinaryTupleBuilder(1).appendString("preload val").build());
// Start cursors in the RW transaction.
scanSingleEntryAndLeaveCursorOpen(unwrappedTableOnNode0, rwTx, null, null);
scanSingleEntryAndLeaveCursorOpen(unwrappedTableOnNode0, rwTx, hashIdxId, idxKey);
scanSingleEntryAndLeaveCursorOpen(unwrappedTableOnNode0, rwTx, sortedIdxId, null);
// Start cursors in the RO transaction.
scanSingleEntryAndLeaveCursorOpen(unwrappedTableOnNode0, roTx, null, null);
scanSingleEntryAndLeaveCursorOpen(unwrappedTableOnNode0, roTx, hashIdxId, idxKey);
scanSingleEntryAndLeaveCursorOpen(unwrappedTableOnNode0, roTx, sortedIdxId, null);
TableViewInternal unwrappedTableFromPrimary = unwrapTableViewInternal(primaryIgnite.tables().table(TABLE_NAME));
TestMvPartitionStorage partitionStorage = unwrapNullable(
unwrappedTableFromPrimary.internalTable().storage().getMvPartition(PART_ID),
TestMvPartitionStorage.class
);
TestHashIndexStorage hashIdxStorage = unwrapNullable(
unwrappedTableFromPrimary.internalTable().storage().getIndex(PART_ID, hashIdxId),
TestHashIndexStorage.class
);
TestSortedIndexStorage sortedIdxStorage = unwrapNullable(
unwrappedTableFromPrimary.internalTable().storage().getIndex(PART_ID, sortedIdxId),
TestSortedIndexStorage.class
);
assertTrue(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
NodeUtils.transferPrimary(cluster.runningNodes().collect(toSet()), tblReplicationGrp);
assertTrue(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(6, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
rwTx.rollback();
assertFalse(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
assertEquals(3, partitionStorage.pendingCursors() + hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
}
/**
* Starts a scan procedure for a specific transaction and reads only the first line from the cursor.
*
* @param unwrappedTable Scanned table.
* @param tx Transaction.
* @param idxId Index id.
*/
private void scanSingleEntryAndLeaveCursorOpen(
TableViewInternal unwrappedTable,
InternalTransaction tx,
Integer idxId,
BinaryTuple exactKey
) {
bypassingThreadAssertions(() -> {
try {
scanSingleEntryAndLeaveCursorOpen0(unwrappedTable, tx, idxId, exactKey);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
private void scanSingleEntryAndLeaveCursorOpen0(
TableViewInternal unwrappedTable,
InternalTransaction tx,
Integer idxId,
BinaryTuple exactKey
) throws Exception {
InternalTable internalTable = unwrappedTable.internalTable();
Publisher<BinaryRow> publisher;
if (tx.isReadOnly()) {
CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().getPrimaryReplica(
new TablePartitionId(unwrappedTable.tableId(), PART_ID),
node(0).clock().now()
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
assertNotNull(primaryReplicaFut.join());
String primaryId = primaryReplicaFut.get().getLeaseholderId();
ClusterNode primaryNode = node(0).clusterNodes().stream().filter(node -> node.id().equals(primaryId)).findAny().get();
if (idxId == null) {
publisher = internalTable.scan(PART_ID, tx.id(), tx.readTimestamp(), primaryNode, tx.coordinatorId());
} else if (exactKey == null) {
publisher = internalTable.scan(PART_ID, tx.id(), tx.readTimestamp(), primaryNode, idxId, null, null, 0, null,
tx.coordinatorId());
} else {
publisher = internalTable.lookup(PART_ID, tx.id(), tx.readTimestamp(), primaryNode, idxId, exactKey, null,
tx.coordinatorId());
}
} else if (idxId == null) {
publisher = unwrappedTable.internalTable().scan(PART_ID, tx);
} else if (exactKey == null) {
publisher = unwrappedTable.internalTable().scan(PART_ID, tx, idxId, null, null, 0, null);
} else {
ReadWriteTransactionImpl rwTx = Wrappers.unwrap(tx, ReadWriteTransactionImpl.class);
CompletableFuture<ReplicaMeta> primaryReplicaFut = node(0).placementDriver().getPrimaryReplica(
new TablePartitionId(unwrappedTable.tableId(), PART_ID),
node(0).clock().now()
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
assertNotNull(primaryReplicaFut.join());
String primaryId = primaryReplicaFut.get().getLeaseholderId();
ClusterNode primaryNode = node(0).clusterNodes().stream().filter(node -> node.id().equals(primaryId)).findAny().get();
publisher = unwrappedTable.internalTable().lookup(
PART_ID,
rwTx.id(),
rwTx.commitPartition(),
rwTx.coordinatorId(),
new PrimaryReplica(primaryNode, primaryReplicaFut.get().getStartTime().longValue()),
idxId,
exactKey,
null
);
}
ArrayList<BinaryRow> scannedRows = new ArrayList<>();
CompletableFuture<Void> scanned = new CompletableFuture<>();
Subscription subscription = TestFlowUtils.subscribeToPublisher(scannedRows, publisher, scanned);
subscription.request(1);
assertTrue(waitForCondition(() -> scannedRows.size() == 1, 10_000));
assertFalse(scanned.isDone());
}
/**
* Gets index id by name.
*
* @param idxName Index name.
* @return Index id.
*/
private int getIndexId(String idxName) {
CatalogManager catalogManager = node(0).catalogManager();
int catalogVersion = catalogManager.latestCatalogVersion();
return catalogManager.indexes(catalogVersion).stream()
.filter(index -> {
log.info("Scanned idx " + index.name());
return idxName.equalsIgnoreCase(index.name());
})
.mapToInt(CatalogObjectDescriptor::id)
.findFirst()
.getAsInt();
}
/**
* Waits when the leader would be a different with the current primary replica.
*
* @param tbl Table.
* @param primary Current primary replica name.
* @throws InterruptedException If fail.
*/
private static void waitingForLeaderCache(TableViewInternal tbl, String primary) throws InterruptedException {
RaftGroupService raftSrvc = tbl.internalTable().tableRaftService().partitionRaftGroupService(0);
assertTrue(waitForCondition(() -> {
raftSrvc.refreshLeader();
Peer leader = raftSrvc.leader();
return leader != null;
}, 10_000));
}
/**
* Gets Ignite instance by the name.
*
* @param name Node name.
* @return Ignite instance.
*/
private @Nullable IgniteImpl node(String name) {
for (int i = 0; i < initialNodes(); i++) {
if (node(i).name().equals(name)) {
return node(i);
}
}
return null;
}
}