/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.raftsnapshot;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIMEM_PROFILE_NAME;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_ROCKSDB_PROFILE_NAME;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.raft.util.OptimizedMarshaller.NO_POOL;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.IgniteIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.cluster.management.CmgGroupId;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.command.SafeTimeSyncCommand;
import org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryStorageEngine;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.table.distributed.raft.snapshot.incoming.IncomingSnapshotCopier;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMetaResponse;
import org.apache.ignite.internal.table.distributed.schema.PartitionCommandsMarshallerImpl;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.log4j2.LogInspector;
import org.apache.ignite.internal.testframework.log4j2.LogInspector.Handler;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.core.Replicator;
import org.apache.ignite.raft.jraft.error.RaftError;
import org.apache.ignite.raft.jraft.rpc.Message;
import org.apache.ignite.raft.jraft.rpc.RaftRpcFactory;
import org.apache.ignite.raft.jraft.rpc.RaftServerService;
import org.apache.ignite.raft.jraft.rpc.RpcProcessor;
import org.apache.ignite.raft.jraft.rpc.RpcRequestClosure;
import org.apache.ignite.raft.jraft.rpc.RpcRequestProcessor;
import org.apache.ignite.raft.jraft.rpc.RpcRequests.AppendEntriesRequest;
import org.apache.ignite.raft.jraft.rpc.RpcServer;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer;
import org.apache.ignite.raft.jraft.rpc.impl.core.AppendEntriesRequestProcessor;
import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotExecutorImpl;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
 * Tests how RAFT snapshot installation works for table partitions.
*/
@SuppressWarnings("resource")
@Timeout(90)
class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
private static final IgniteLogger LOG = Loggers.forClass(ItTableRaftSnapshotsTest.class);
/**
* Nodes bootstrap configuration pattern.
*
 * <p>rpcInstallSnapshotTimeout is changed to 10 seconds so that sporadic snapshot installation failures still
 * allow tests to pass thanks to retries.
*/
private static final String NODE_BOOTSTRAP_CFG = "{\n"
+ " network: {\n"
+ " port: {},\n"
+ " nodeFinder.netClusterNodes: [ {} ]\n"
+ " },\n"
+ " raft.rpcInstallSnapshotTimeout: 10000,\n"
+ " storage.profiles: {"
+ " " + DEFAULT_AIPERSIST_PROFILE_NAME + ".engine: aipersist, "
+ " " + DEFAULT_AIMEM_PROFILE_NAME + ".engine: aimem, "
+ " " + DEFAULT_ROCKSDB_PROFILE_NAME + ".engine: rocksdb"
+ " },\n"
+ " clientConnector.port: {},\n"
+ " rest.port: {}\n"
+ "}";
/**
 * Marker that instructs to create a table with the default storage engine. Used in tests that are indifferent
 * to the storage engine being used.
*/
private static final String DEFAULT_STORAGE_ENGINE = "<default>";
@WorkDirectory
private Path workDir;
private Cluster cluster;
private LogInspector replicatorLogInspector;
private LogInspector copierLogInspector;
@BeforeEach
void createCluster(TestInfo testInfo) {
cluster = new Cluster(testInfo, workDir, NODE_BOOTSTRAP_CFG);
replicatorLogInspector = LogInspector.create(Replicator.class, true);
copierLogInspector = LogInspector.create(IncomingSnapshotCopier.class, true);
}
@AfterEach
@Timeout(60)
void shutdownCluster() {
replicatorLogInspector.stop();
copierLogInspector.stop();
cluster.shutdown();
}
/**
* Tests that a leader successfully feeds a follower with a RAFT snapshot on any of the supported storage engines.
*/
// TODO: IGNITE-18481 - make sure we don't forget to add new storage engines here
@ParameterizedTest
@ValueSource(strings = {
RocksDbStorageEngine.ENGINE_NAME,
PersistentPageMemoryStorageEngine.ENGINE_NAME
// TODO: uncomment when https://issues.apache.org/jira/browse/IGNITE-19234 is fixed
// VolatilePageMemoryStorageEngine.ENGINE_NAME
})
void leaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception {
testLeaderFeedsFollowerWithSnapshot(storageEngine);
}
/**
* Tests that a leader successfully feeds a follower with a RAFT snapshot (using the given storage engine).
*/
private void testLeaderFeedsFollowerWithSnapshot(String storageEngine) throws Exception {
feedNode2WithSnapshotOfOneRow(storageEngine);
transferLeadershipOnSolePartitionTo(2);
assertThat(getFromNode(2, 1), is("one"));
}
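/**
 * Reads the value for the given key from the TEST table through the node with the given index (in an implicit transaction).
 */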
private @Nullable String getFromNode(int clusterNode, int key) {
return tableViewAt(clusterNode).get(null, key);
}
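/**
 * Brings the cluster to a state where node 2 has received the sole partition of the TEST table (containing one row)
 * via a RAFT snapshot, using the default storage engine.
 */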
private void feedNode2WithSnapshotOfOneRow() throws InterruptedException {
feedNode2WithSnapshotOfOneRow(DEFAULT_STORAGE_ENGINE);
}
private void feedNode2WithSnapshotOfOneRow(String storageEngine) throws InterruptedException {
prepareClusterForInstallingSnapshotToNode2(storageEngine);
reanimateNode2AndWaitForSnapshotInstalled();
}
/**
 * Transfers the cluster to a state in which, when node 2 is reanimated after being knocked out, the only partition
 * of the only table (called TEST) is transferred to it using the RAFT snapshot installation mechanism.
*/
private void prepareClusterForInstallingSnapshotToNode2() throws InterruptedException {
prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE);
}
/**
 * Transfers the cluster to a state in which, when node 2 is reanimated after being knocked out, the only partition
 * of the only table (called TEST) is transferred to it using the RAFT snapshot installation mechanism.
*
* @param storageEngine Storage engine for the TEST table.
*/
private void prepareClusterForInstallingSnapshotToNode2(String storageEngine) throws InterruptedException {
prepareClusterForInstallingSnapshotToNode2(storageEngine, theCluster -> {});
}
/**
 * Transfers the cluster to a state in which, when node 2 is reanimated after being knocked out, the only partition
 * of the only table (called TEST) is transferred to it using the RAFT snapshot installation mechanism.
*
* @param storageEngine Storage engine for the TEST table.
* @param doOnClusterAfterInit Action executed just after the cluster is started and initialized.
*/
private void prepareClusterForInstallingSnapshotToNode2(
String storageEngine,
Consumer<Cluster> doOnClusterAfterInit
) throws InterruptedException {
startAndInitCluster();
doOnClusterAfterInit.accept(cluster);
createTestTableWith3Replicas(storageEngine);
// Prepare the scene: force node 0 to be a leader, and node 2 to be a follower.
transferLeadershipOnSolePartitionTo(0);
knockoutNode(2);
putToNode(0, 1, "one");
// Make sure AppendEntries from the leader to the follower is impossible, forcing the leader to use InstallSnapshot.
causeLogTruncationOnSolePartitionLeader(0);
}
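/** Starts a 3-node cluster and initializes it. */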
private void startAndInitCluster() {
cluster.startAndInit(3, IntStream.range(0, 3).toArray());
}
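/** Puts the given key-value pair into the TEST table through the node with the given index, outside an explicit transaction. */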
private void putToNode(int nodeIndex, int key, String value) {
putToNode(nodeIndex, key, value, null);
}
private void putToNode(int nodeIndex, int key, String value, @Nullable Transaction tx) {
tableViewAt(nodeIndex).put(tx, key, value);
}
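/** Returns a key-value view of the TEST table obtained through the node with the given index. */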
private KeyValueView<Integer, String> tableViewAt(int nodeIndex) {
Table table = cluster.node(nodeIndex).tables().table("test");
return table.keyValueView(Integer.class, String.class);
}
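/** Knocks the node with the given index out by stopping it. */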
private void knockoutNode(int nodeIndex) {
cluster.stopNode(nodeIndex);
LOG.info("Node {} knocked out", nodeIndex);
}
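/** Creates a zone with a single partition and 3 replicas, and the TEST table in it, using the given storage engine. */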
private void createTestTableWith3Replicas(String storageEngine) {
String storageProfile =
DEFAULT_STORAGE_ENGINE.equals(storageEngine) ? DEFAULT_STORAGE_PROFILE : "default_" + storageEngine.toLowerCase();
String zoneSql = "create zone test_zone with partitions=1, replicas=3, storage_profiles='" + storageProfile + "';";
String sql = "create table test (key int primary key, val varchar(20))"
+ " with primary_zone='TEST_ZONE', storage_profile='" + storageProfile + "';";
cluster.doInSession(0, session -> {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
});
}
/**
* Causes log truncation on the RAFT leader of the sole table partition that exists in the cluster.
* After such truncation, when a knocked-out follower gets reanimated, the leader will not be able to feed it
 * with AppendEntries (because the leader no longer has the log entries required to send AppendEntries
 * to the lagging follower), so the leader will have to send InstallSnapshot instead.
*/
private void causeLogTruncationOnSolePartitionLeader(int expectedLeaderNodeIndex) throws InterruptedException {
// Doing this twice because the first snapshot creation does not trigger log truncation.
doSnapshotOnSolePartitionLeader(expectedLeaderNodeIndex);
doSnapshotOnSolePartitionLeader(expectedLeaderNodeIndex);
}
/**
* Causes a RAFT snapshot to be taken on the RAFT leader of the sole table partition that exists in the cluster.
*/
private void doSnapshotOnSolePartitionLeader(int expectedLeaderNodeIndex) throws InterruptedException {
TablePartitionId tablePartitionId = cluster.solePartitionId();
doSnapshotOn(tablePartitionId, expectedLeaderNodeIndex);
}
/**
* Takes a RAFT snapshot on the leader of the RAFT group corresponding to the given table partition.
*/
private void doSnapshotOn(TablePartitionId tablePartitionId, int expectedLeaderNodeIndex) throws InterruptedException {
RaftGroupService raftGroupService = cluster.leaderServiceFor(tablePartitionId);
assertThat(
"Unexpected leadership change on group: " + tablePartitionId,
raftGroupService.getServerId().getConsistentId(), is(cluster.node(expectedLeaderNodeIndex).name())
);
CountDownLatch snapshotLatch = new CountDownLatch(1);
AtomicReference<Status> snapshotStatus = new AtomicReference<>();
raftGroupService.getRaftNode().snapshot(status -> {
snapshotStatus.set(status);
snapshotLatch.countDown();
});
assertTrue(snapshotLatch.await(10, TimeUnit.SECONDS), "Snapshot was not finished in time");
assertTrue(snapshotStatus.get().isOk(), "Snapshot failed: " + snapshotStatus.get());
}
/**
 * Reanimates (that is, reverts the effects of a knock-out) node 2 and waits until a RAFT snapshot is installed
* on it for the sole table partition in the cluster.
*/
private void reanimateNode2AndWaitForSnapshotInstalled() throws InterruptedException {
reanimateNodeAndWaitForSnapshotInstalled(2);
}
/**
 * Reanimates (that is, reverts the effects of a knock-out) a node with the given index and waits until a RAFT snapshot is installed
* on it for the sole table partition in the cluster.
*/
private void reanimateNodeAndWaitForSnapshotInstalled(int nodeIndex) throws InterruptedException {
CountDownLatch snapshotInstalledLatch = snapshotInstalledLatch(nodeIndex);
reanimateNode(nodeIndex);
assertTrue(snapshotInstalledLatch.await(60, TimeUnit.SECONDS), "Did not install a snapshot in time");
}
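/**
 * Returns a latch that is counted down when the replicator logs a successful InstallSnapshotResponse
 * from the node with the given index.
 */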
private CountDownLatch snapshotInstalledLatch(int nodeIndex) {
CountDownLatch snapshotInstalledLatch = new CountDownLatch(1);
replicatorLogInspector.addHandler(
evt -> evt.getMessage().getFormattedMessage().matches(
"Node \\S+ received InstallSnapshotResponse from \\S+_" + nodeIndex + " .+ success=true"),
snapshotInstalledLatch::countDown
);
return snapshotInstalledLatch;
}
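/** Reanimates the node with the given index by starting it again. */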
private void reanimateNode(int nodeIndex) {
cluster.startNode(nodeIndex);
}
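/** Transfers leadership over the sole table partition RAFT group to the node with the given index. */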
private void transferLeadershipOnSolePartitionTo(int nodeIndex) throws InterruptedException {
cluster.transferLeadershipTo(nodeIndex, cluster.solePartitionId());
}
/**
 * Tests that, if the first part of a transaction (everything before COMMIT) arrives using AppendEntries, and later the whole
 * partition state arrives in a RAFT snapshot, then the transaction is seen as committed (i.e., its effects are seen).
*/
@Test
void txSemanticsIsMaintained() throws Exception {
txSemanticsIsMaintainedAfterInstallingSnapshot();
}
/**
 * Tests that, if the first part of a transaction (everything before COMMIT) arrives using AppendEntries, and later the whole
 * partition state arrives in a RAFT snapshot, then the transaction is seen as committed (i.e., its effects are seen).
*/
private void txSemanticsIsMaintainedAfterInstallingSnapshot() throws Exception {
startAndInitCluster();
createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
// Prepare the scene: force node 0 to be a leader, and node 2 to be a follower.
transferLeadershipOnSolePartitionTo(0);
Transaction tx = cluster.node(0).transactions().begin();
putToNode(0, 1, "one", tx);
knockoutNode(2);
tx.commit();
// Make sure AppendEntries from the leader to the follower is impossible, forcing the leader to use InstallSnapshot.
causeLogTruncationOnSolePartitionLeader(0);
reanimateNode2AndWaitForSnapshotInstalled();
transferLeadershipOnSolePartitionTo(2);
assertThat(getFromNode(2, 1), is("one"));
}
/**
* Tests that entries can still be added to a follower using AppendEntries after it gets fed with a RAFT snapshot.
*/
@Test
void entriesKeepAppendedAfterSnapshotInstallation() throws Exception {
feedNode2WithSnapshotOfOneRow();
putToNode(0, 2, "two");
transferLeadershipOnSolePartitionTo(2);
assertThat(getFromNode(0, 1), is("one"));
assertThat(getFromNode(0, 2), is("two"));
}
/**
* Tests that, if commands are added to a leader while it installs a RAFT snapshot on a follower, these commands
* reach the follower and get applied after the snapshot is installed.
*/
@Test
void entriesKeepAppendedDuringSnapshotInstallation() throws Exception {
prepareClusterForInstallingSnapshotToNode2();
AtomicBoolean installedSnapshot = new AtomicBoolean(false);
AtomicInteger lastLoadedKey = new AtomicInteger();
CompletableFuture<?> loadingFuture = IgniteTestUtils.runAsync(() -> {
for (int key = 2; !installedSnapshot.get(); key++) {
putToNode(0, key, "extra");
lastLoadedKey.set(key);
}
});
reanimateNode2AndWaitForSnapshotInstalled();
installedSnapshot.set(true);
assertThat(loadingFuture, willSucceedIn(30, TimeUnit.SECONDS));
transferLeadershipOnSolePartitionTo(2);
assertThat(getFromNode(2, 1), is("one"));
List<Integer> expectedKeysAndNextKey = IntStream.rangeClosed(2, lastLoadedKey.get() + 1).boxed().collect(toList());
Map<Integer, String> keysToValues = tableViewAt(2).getAll(null, expectedKeysAndNextKey);
Set<Integer> expectedKeys = IntStream.rangeClosed(2, lastLoadedKey.get()).boxed().collect(toSet());
assertThat(keysToValues.keySet(), equalTo(expectedKeys));
assertThat(keysToValues.values(), everyItem(is("extra")));
}
/**
 * Tests that, after a node gets a RAFT snapshot installed on it and then becomes the leader, it can act as a leader
 * (and can install a RAFT snapshot on the ex-leader).
*/
@Test
void nodeCanInstallSnapshotsAfterSnapshotInstalledToIt() throws Exception {
feedNode2WithSnapshotOfOneRow();
// The leader (0) has fed the follower (2). Now, change roles: the new leader will be node 2, and it will feed node 0.
transferLeadershipOnSolePartitionTo(2);
knockoutNode(0);
putToNode(2, 2, "two");
// Make sure AppendEntries from the leader to the follower is impossible, forcing the leader to use InstallSnapshot.
causeLogTruncationOnSolePartitionLeader(2);
reanimateNodeAndWaitForSnapshotInstalled(0);
transferLeadershipOnSolePartitionTo(0);
assertThat(getFromNode(0, 1), is("one"));
assertThat(getFromNode(0, 2), is("two"));
}
/**
 * Tests that, if a snapshot installation fails for some reason, a subsequent retry triggered by a timeout succeeds.
*/
@Test
void snapshotInstallationRepeatsOnTimeout() throws Exception {
prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, theCluster -> {
theCluster.node(0).dropMessages(dropFirstSnapshotMetaResponse());
});
reanimateNode2AndWaitForSnapshotInstalled();
}
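/** Returns a predicate that drops only the first {@link SnapshotMetaResponse} destined for node 2. */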
private BiPredicate<String, NetworkMessage> dropFirstSnapshotMetaResponse() {
AtomicBoolean sentSnapshotMetaResponse = new AtomicBoolean(false);
return dropFirstSnapshotMetaResponse(sentSnapshotMetaResponse);
}
private BiPredicate<String, NetworkMessage> dropFirstSnapshotMetaResponse(AtomicBoolean sentSnapshotMetaResponse) {
String node2Name = cluster.node(2).name();
return (targetConsistentId, message) -> {
if (Objects.equals(targetConsistentId, node2Name) && message instanceof SnapshotMetaResponse) {
return sentSnapshotMetaResponse.compareAndSet(false, true);
} else {
return false;
}
};
}
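/**
 * Returns a predicate that drops every {@link SnapshotMetaResponse} destined for node 2 and completes the given future
 * when the first one gets dropped.
 */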
private BiPredicate<String, NetworkMessage> dropSnapshotMetaResponse(CompletableFuture<Void> sentFirstSnapshotMetaResponse) {
String node2Name = cluster.node(2).name();
return (targetConsistentId, message) -> {
if (Objects.equals(targetConsistentId, node2Name) && message instanceof SnapshotMetaResponse) {
sentFirstSnapshotMetaResponse.complete(null);
// Always drop.
return true;
} else {
return false;
}
};
}
/**
* This is a test for a tricky scenario:
*
* <ol>
 * <li>The first InstallSnapshot request is sent, and its processing starts hanging forever (it will be cancelled at step 3)</li>
 * <li>After a timeout, a second InstallSnapshot request is sent with the same index+term as the first one; in JRaft, this causes
 * special handling (the previous request processing is NOT cancelled)</li>
 * <li>After a timeout, a third InstallSnapshot request is sent with a DIFFERENT index, so it cancels the first snapshot processing,
 * effectively unblocking the first thread</li>
* </ol>
*
 * <p>In the original JRaft implementation, after being unblocked, the first thread fails to clean up, so subsequent retries
 * always see a phantom of an unfinished snapshot and the snapshotting process gets jammed. Also, node stop might
 * get stuck because one 'download' task will remain unfinished forever.
*/
@Test
void snapshotInstallTimeoutDoesNotBreakSubsequentInstallsWhenSecondAttemptIsIdenticalToFirst() throws Exception {
AtomicBoolean snapshotInstallFailedDueToIdenticalRetry = new AtomicBoolean(false);
LogInspector snapshotExecutorLogInspector = LogInspector.create(SnapshotExecutorImpl.class);
Handler snapshotInstallFailedDueToIdenticalRetryHandler =
snapshotExecutorLogInspector.addHandler(
evt -> evt.getMessage().getFormattedMessage().contains(
"Register DownloadingSnapshot failed: interrupted by retry installing request"),
() -> snapshotInstallFailedDueToIdenticalRetry.set(true));
snapshotExecutorLogInspector.start();
try {
prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, theCluster -> {
IgniteImpl node = theCluster.node(0);
MessageSerializationRegistry serializationRegistry = node.raftManager().service().serializationRegistry();
BiPredicate<String, NetworkMessage> dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed = (recipientId, message) ->
message instanceof WriteActionRequest
&& isSafeTimeSyncCommand((WriteActionRequest) message, serializationRegistry)
&& !snapshotInstallFailedDueToIdenticalRetry.get();
theCluster.node(0).dropMessages(
dropFirstSnapshotMetaResponse().or(dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed)
);
theCluster.node(1).dropMessages(dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed);
theCluster.node(2).dropMessages(dropSafeTimeUntilSecondInstallSnapshotRequestIsProcessed);
});
reanimateNode2AndWaitForSnapshotInstalled();
} finally {
snapshotExecutorLogInspector.removeHandler(snapshotInstallFailedDueToIdenticalRetryHandler);
snapshotExecutorLogInspector.stop();
}
}
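/** Returns {@code true} if the given request carries a {@link SafeTimeSyncCommand} (MetaStorage and CMG groups are excluded). */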
private static boolean isSafeTimeSyncCommand(WriteActionRequest request, MessageSerializationRegistry serializationRegistry) {
String groupId = request.groupId();
if (groupId.equals(MetastorageGroupId.INSTANCE.toString()) || groupId.equals(CmgGroupId.INSTANCE.toString())) {
return false;
}
var commandsMarshaller = new PartitionCommandsMarshallerImpl(serializationRegistry, NO_POOL);
return commandsMarshaller.unmarshall(request.command()) instanceof SafeTimeSyncCommand;
}
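/**
 * Tests that, if a snapshot installation from the current leader hangs in the middle, transferring leadership
 * to another node lets the snapshot be installed successfully from the new leader.
 */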
@Test
void testChangeLeaderOnInstallSnapshotInMiddle() throws Exception {
CompletableFuture<Void> sentSnapshotMetaResponseFromNode1Future = new CompletableFuture<>();
prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, cluster -> {
// Hang the InstallSnapshot from the leader with index 1 in the "middle".
cluster.node(1).dropMessages(dropSnapshotMetaResponse(sentSnapshotMetaResponseFromNode1Future));
});
// Change the leader and truncate its log so that InstallSnapshot occurs instead of AppendEntries.
transferLeadershipOnSolePartitionTo(1);
causeLogTruncationOnSolePartitionLeader(1);
CompletableFuture<Void> installSnapshotSuccessfulFuture = new CompletableFuture<>();
listenForSnapshotInstalledSuccessFromLogger(0, 2, installSnapshotSuccessfulFuture);
// Return node 2.
reanimateNode(2);
// Wait for the InstallSnapshot to node 2 to hang in the "middle".
assertThat(sentSnapshotMetaResponseFromNode1Future, willSucceedIn(1, TimeUnit.MINUTES));
// Change the leader to node 0.
transferLeadershipOnSolePartitionTo(0);
// Wait for the InstallSnapshot from node 0 to node 2 to succeed.
assertThat(installSnapshotSuccessfulFuture, willSucceedIn(1, TimeUnit.MINUTES));
// Make sure the rebalancing is complete.
assertThat(getFromNode(2, 1), is("one"));
}
/**
 * Adds a handler to the {@link #replicatorLogInspector} that completes the given future when a successful snapshot
 * installation from node {@code nodeIndexFrom} to node {@code nodeIndexTo} is logged.
*/
private void listenForSnapshotInstalledSuccessFromLogger(
int nodeIndexFrom,
int nodeIndexTo,
CompletableFuture<Void> snapshotInstallSuccessfullyFuture
) {
String regexp = "Node \\S+" + nodeIndexFrom + " received InstallSnapshotResponse from \\S+_" + nodeIndexTo + " .+ success=true";
replicatorLogInspector.addHandler(
evt -> evt.getMessage().getFormattedMessage().matches(regexp),
() -> snapshotInstallSuccessfullyFuture.complete(null)
);
}
/**
 * This tests the following scenario.
*
* <ol>
* <li>
 * A snapshot installation is started from node A, which is the leader, because A does not have enough RAFT log to feed a follower
 * with AppendEntries
* </li>
* <li>It is cancelled in the middle</li>
* <li>Node B is elected as a leader; B has enough log to feed the follower with AppendEntries</li>
 * <li>The follower gets data from the leader using AppendEntries, not using InstallSnapshot</li>
* </ol>
*/
@Test
void testChangeLeaderDuringSnapshotInstallationToLeaderWithEnoughLog() throws Exception {
CompletableFuture<Void> sentSnapshotMetaResponseFromNode0Future = new CompletableFuture<>();
prepareClusterForInstallingSnapshotToNode2(DEFAULT_STORAGE_ENGINE, cluster -> {
// Hang the InstallSnapshot from the leader with index 0 in the "middle".
cluster.node(0).dropMessages(dropSnapshotMetaResponse(sentSnapshotMetaResponseFromNode0Future));
});
CompletableFuture<Void> installSnapshotSuccessfulFuture = new CompletableFuture<>();
listenForSnapshotInstalledSuccessFromLogger(1, 2, installSnapshotSuccessfulFuture);
// Return node 2.
reanimateNode(2);
// Wait for the InstallSnapshot to node 2 to hang in the "middle".
assertThat(sentSnapshotMetaResponseFromNode0Future, willSucceedIn(1, TimeUnit.MINUTES));
// Change the leader to node 1.
transferLeadershipOnSolePartitionTo(1);
boolean replicated = waitForCondition(() -> getFromNode(2, 1) != null, 20_000);
assertTrue(replicated, "Data has not been replicated to node 2 in time");
// No snapshot must be installed.
assertFalse(installSnapshotSuccessfulFuture.isDone());
// Make sure the rebalancing is complete.
assertThat(getFromNode(2, 1), is("one"));
}
/**
* The replication mechanism must not replicate commands for which schemas are not yet available on the node
 * to which replication happens (in Raft terms, this means that followers/learners cannot receive commands that they
 * cannot execute without waiting for schemas). This method tests that snapshots carrying such commands are
* rejected, and that, when metadata catches up, the snapshot gets successfully installed.
*/
@Test
void laggingSchemasOnFollowerPreventSnapshotInstallation() throws Exception {
startAndInitCluster();
createTestTableWith3Replicas(DEFAULT_STORAGE_ENGINE);
// Prepare the scene: force node 0 to be a leader, and node 2 to be a follower.
final int leaderIndex = 0;
final int followerIndex = 2;
transferLeadershipOnSolePartitionTo(leaderIndex);
cluster.transferLeadershipTo(leaderIndex, MetastorageGroupId.INSTANCE);
// Block AppendEntries from being accepted on the follower so that the leader will have to use a snapshot.
blockIncomingAppendEntriesAt(followerIndex);
// Inhibit the MetaStorage on the follower to make snapshots not eligible for installation.
WatchListenerInhibitor listenerInhibitor = inhibitMetastorageListenersAt(followerIndex);
try {
// Add some data in a schema that is not yet available on the follower.
updateTableSchemaAt(leaderIndex);
putToTableAt(leaderIndex);
CountDownLatch installationRejected = installationRejectedLatch();
CountDownLatch snapshotInstalled = snapshotInstalledLatch(followerIndex);
// Force InstallSnapshot to be used.
causeLogTruncationOnSolePartitionLeader(leaderIndex);
assertTrue(installationRejected.await(20, TimeUnit.SECONDS), "Did not see snapshot installation rejection");
assertThat("Snapshot was installed before unblocking", snapshotInstalled.getCount(), is(not(0L)));
listenerInhibitor.stopInhibit();
assertTrue(snapshotInstalled.await(20, TimeUnit.SECONDS), "Did not see a snapshot installed");
} finally {
listenerInhibitor.stopInhibit();
}
}
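/** Adds a column to the TEST table through the node with the given index. */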
private void updateTableSchemaAt(int nodeIndex) {
cluster.doInSession(nodeIndex, session -> {
session.execute(null, "alter table test add column added int");
});
}
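/** Puts one row into the TEST table through the node with the given index using the binary (Tuple) key-value view. */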
private void putToTableAt(int nodeIndex) {
KeyValueView<Tuple, Tuple> kvView = cluster.node(nodeIndex)
.tables()
.table("test")
.keyValueView();
kvView.put(null, Tuple.create().set("key", 1), Tuple.create().set("val", "one"));
}
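/** Makes the node with the given index reject incoming AppendEntries requests for the sole table partition. */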
private void blockIncomingAppendEntriesAt(int nodeIndex) {
BlockingAppendEntriesRequestProcessor blockingProcessorOnFollower = installBlockingAppendEntriesProcessor(nodeIndex);
blockingProcessorOnFollower.startBlocking();
}
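/** Inhibits MetaStorage watch listeners on the node with the given index and returns the inhibitor. */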
private WatchListenerInhibitor inhibitMetastorageListenersAt(int nodeIndex) {
IgniteImpl nodeToInhibitMetaStorage = cluster.node(nodeIndex);
WatchListenerInhibitor listenerInhibitor = WatchListenerInhibitor.metastorageEventsInhibitor(nodeToInhibitMetaStorage);
listenerInhibitor.startInhibit();
return listenerInhibitor;
}
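/**
 * Returns a latch that is counted down when the incoming snapshot copier logs that a snapshot installation
 * was rejected because metadata is not yet available.
 */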
private CountDownLatch installationRejectedLatch() {
CountDownLatch installationRejected = new CountDownLatch(1);
copierLogInspector.addHandler(
event -> event.getMessage().getFormattedMessage().startsWith("Metadata not yet available, rejecting snapshot installation"),
installationRejected::countDown
);
return installationRejected;
}
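/**
 * Replaces the standard {@link AppendEntriesRequestProcessor} on the node with the given index with a
 * {@link BlockingAppendEntriesRequestProcessor} (reusing the original executor and message factory) and returns it.
 */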
private BlockingAppendEntriesRequestProcessor installBlockingAppendEntriesProcessor(int nodeIndex) {
RaftServer raftServer = cluster.node(nodeIndex).raftManager().server();
RpcServer<?> rpcServer = getFieldValue(raftServer, JraftServerImpl.class, "rpcServer");
Map<String, RpcProcessor<?>> processors = getFieldValue(rpcServer, IgniteRpcServer.class, "processors");
AppendEntriesRequestProcessor originalProcessor =
(AppendEntriesRequestProcessor) processors.get(AppendEntriesRequest.class.getName());
Executor appenderExecutor = getFieldValue(originalProcessor, RpcRequestProcessor.class, "executor");
RaftMessagesFactory raftMessagesFactory = getFieldValue(originalProcessor, RpcRequestProcessor.class, "msgFactory");
BlockingAppendEntriesRequestProcessor blockingProcessor = new BlockingAppendEntriesRequestProcessor(
appenderExecutor,
raftMessagesFactory,
cluster.solePartitionId().toString()
);
rpcServer.registerProcessor(blockingProcessor);
return blockingProcessor;
}
/**
 * {@link AppendEntriesRequestProcessor} that, when blocking is enabled, blocks all AppendEntriesRequests of
 * the given group (that is, returns the EBUSY error code, which makes JRaft retry them).
*/
private static class BlockingAppendEntriesRequestProcessor extends AppendEntriesRequestProcessor {
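/** ID of the RAFT group whose AppendEntries requests are blocked. */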
private final String idOfGroupToBlock;
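/** Whether blocking is currently enabled. */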
private volatile boolean block;
public BlockingAppendEntriesRequestProcessor(Executor executor,
RaftMessagesFactory msgFactory, String idOfGroupToBlock) {
super(executor, msgFactory);
this.idOfGroupToBlock = idOfGroupToBlock;
}
@Override
public Message processRequest0(RaftServerService service, AppendEntriesRequest request, RpcRequestClosure done) {
if (block && idOfGroupToBlock.equals(request.groupId())) {
return RaftRpcFactory.DEFAULT //
.newResponse(done.getMsgFactory(), RaftError.EBUSY,
"Blocking AppendEntries on '%s'.", request.groupId());
}
return super.processRequest0(service, request, done);
}
public void startBlocking() {
block = true;
}
}
}