blob: 223e1ae39be4d21c75731d005eb48ae866f2df8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.distributed;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.raft.PeersAndLearners.fromConsistentIds;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.lang.SafeTimeReorderException;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
/**
 * Replica safeTime propagation tests.
 */
@ExtendWith(ConfigurationExtension.class)
public class ReplicasSafeTimePropagationTest extends IgniteAbstractTest {
    @InjectConfiguration("mock: { fsync: false }")
    private RaftConfiguration raftConfiguration;

    private static final int BASE_PORT = 1234;

    private static final TestReplicationGroupId GROUP_ID = new TestReplicationGroupId("group_1");

    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();

    /**
     * Node finder listing five consecutive localhost addresses starting at {@link #BASE_PORT}. Note that every
     * {@link PartialNode#start()} (including a restart) takes the next port from {@link #port}.
     */
    private static final StaticNodeFinder NODE_FINDER = new StaticNodeFinder(
            IntStream.range(BASE_PORT, BASE_PORT + 5)
                    .mapToObj(p -> new NetworkAddress("localhost", p))
                    .collect(Collectors.toList())
    );

    /** Port for the next node start; incremented on every start, restarts included. */
    private final AtomicInteger port = new AtomicInteger(BASE_PORT);

    /** Nodes under test keyed by node name; {@code null} until a test creates the cluster. */
    private Map<String, PartialNode> cluster;

    /**
     * Stops every node of the cluster. Nodes that were already stopped by the test are ignored. A failure while
     * stopping one node does not prevent the remaining nodes from being stopped; the first failure is rethrown
     * at the end with the subsequent ones attached as suppressed exceptions.
     *
     * @throws Exception If stopping any node failed for a reason other than it being already stopped.
     */
    @AfterEach
    public void after() throws Exception {
        if (cluster == null) {
            // The test failed before the cluster was created: there is nothing to stop.
            return;
        }

        Throwable firstFailure = null;

        for (PartialNode partialNode : cluster.values()) {
            try {
                partialNode.stop();
            } catch (NodeStoppingException ignored) {
                // No-op, multiple stop: the test itself has already stopped this node.
            } catch (Exception | AssertionError e) {
                // Keep going so that the other nodes (and their ports) are released as well.
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }

        if (firstFailure instanceof Exception) {
            throw (Exception) firstFailure;
        }

        if (firstFailure != null) {
            throw (AssertionError) firstFailure;
        }
    }

    /**
     * Returns the current time of the given clock service moved forward by its maximum clock skew.
     *
     * @param clockService Clock service to read the time from.
     * @return Safe time candidate as a {@code long}.
     */
    private static long calculateSafeTime(ClockService clockService) {
        return clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue();
    }

    /**
     * Sends a safe time sync command through the given raft client and asserts its outcome.
     *
     * @param raftClient Raft client to send the command with.
     * @param safeTime Safe time to put into the command.
     * @param expectSafeTimeReorderException {@code true} if the command is expected to fail with
     *         {@link SafeTimeReorderException}, {@code false} if it is expected to complete successfully.
     */
    private static void sendSafeTimeSyncCommand(
            RaftGroupService raftClient,
            long safeTime,
            boolean expectSafeTimeReorderException
    ) {
        CompletableFuture<Object> safeTimeCommandFuture = raftClient.run(
                REPLICA_MESSAGES_FACTORY
                        .safeTimeSyncCommand()
                        .safeTimeLong(safeTime)
                        .build()
        );

        if (expectSafeTimeReorderException) {
            assertThat(safeTimeCommandFuture, willThrow(SafeTimeReorderException.class));
        } else {
            assertThat(safeTimeCommandFuture, willCompleteSuccessfully());
        }
    }

    /**
     * Test verifies that a new leader will reject a command with safeTime less than previously applied within old leader.
     * <ol>
     *     <li>Start three nodes and a raft group with three peers.</li>
     *     <li>Send command with safe time X.</li>
     *     <li>Stop the leader - the only node that actually does safeTime watermark validation within onBeforeApply.</li>
     *     <li>Send command with safe time less than X to the new leader and verify that SafeTimeReorderException is thrown.</li>
     * </ol>
     */
    @Test
    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21565")
    public void testSafeTimeReorderingOnLeaderReElection() throws Exception {
        // Start three nodes and a raft group with three peers.
        cluster = Stream.of("node1", "node2", "node3").collect(toMap(identity(), PartialNode::new));
        startCluster(cluster);

        PartialNode someNode = cluster.values().iterator().next();

        RaftGroupService raftClient = someNode.raftClient;

        assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());

        long firstSafeTime = calculateSafeTime(someNode.clockService);

        // Send command with safe time X.
        sendSafeTimeSyncCommand(raftClient, firstSafeTime, false);

        // Stop the leader - the only node that actually does safeTime watermark validation within onBeforeApply.
        Peer leader = raftClient.leader();
        assertNotNull(leader);

        PartialNode nodeToStop = cluster.get(leader.consistentId());
        assertNotNull(nodeToStop);

        nodeToStop.stop();

        // Select alive raft client.
        Optional<PartialNode> aliveNode = cluster.values().stream()
                .filter(node -> !node.nodeName.equals(nodeToStop.nodeName))
                .findFirst();

        assertTrue(aliveNode.isPresent());

        RaftGroupService anotherClient = aliveNode.get().raftClient;

        // Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown.
        sendSafeTimeSyncCommand(anotherClient, firstSafeTime - 1, true);

        sendSafeTimeSyncCommand(anotherClient, calculateSafeTime(aliveNode.get().clockService), false);
    }

    /**
     * Starts every given node and waits until all of them have started their raft group node.
     *
     * @param nodes Nodes to start, keyed by node name.
     * @throws Exception If a node failed to start synchronously.
     */
    private void startCluster(Map<String, PartialNode> nodes) throws Exception {
        Collection<CompletableFuture<Void>> startingFutures = new ArrayList<>(nodes.size());

        for (PartialNode node : nodes.values()) {
            startingFutures.add(node.start());
        }

        CompletableFuture<Void> clusterReadyFuture = CompletableFuture.allOf(startingFutures.toArray(CompletableFuture[]::new));

        assertThat(clusterReadyFuture, willCompleteSuccessfully());
    }

    /**
     * Test verifies that a leader will reject a command with safeTime less than previously applied within leader restart.
     * <ol>
     *     <li>Start two nodes and a raft group with two peers.</li>
     *     <li>Send command with safe time X.</li>
     *     <li>Restart the cluster.</li>
     *     <li>Send command with safe time less than previously applied to the leader before the restart
     *     and verify that SafeTimeReorderException is thrown.</li>
     * </ol>
     */
    @Test
    public void testSafeTimeReorderingOnLeaderRestart() throws Exception {
        // Start two nodes and a raft group with two peers.
        cluster = Set.of("node1", "node2").stream().collect(toMap(identity(), PartialNode::new));
        startCluster(cluster);

        PartialNode someNode = cluster.values().iterator().next();

        RaftGroupService raftClient = someNode.raftClient;

        assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());

        long firstSafeTime = calculateSafeTime(someNode.clockService);

        // Send command with safe time X.
        sendSafeTimeSyncCommand(raftClient, firstSafeTime, false);

        // Stop all nodes.
        for (PartialNode node : cluster.values()) {
            node.stop();
        }

        // And restart: every node takes a new port but reuses its raft work directory.
        startCluster(cluster);

        // Send command with safe time less than previously applied to the leader before the restart
        // and verify that SafeTimeReorderException is thrown.
        sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime - 1, true);

        sendSafeTimeSyncCommand(someNode.raftClient, calculateSafeTime(someNode.clockService), false);
    }

    /**
     * Minimal node for these tests: a cluster service, a {@link Loza} raft manager and a raft group node for
     * {@link #GROUP_ID} whose state machine is a {@link PartitionListener} over mocked dependencies.
     */
    private class PartialNode {
        private final String nodeName;

        /** Clock service handed to the {@link PartitionListener}; kept across restarts. */
        private final ClockService clockService = new TestClockService(new HybridClockImpl());

        private ClusterService clusterService;

        private Loza raftManager;

        /** Raft client of the started raft group node; set once {@link #start()}'s future completes. */
        private RaftGroupService raftClient;

        PartialNode(String nodeName) {
            this.nodeName = nodeName;
        }

        /**
         * Starts the cluster service and the raft manager (waiting for both), then starts the raft group node
         * with all nodes of {@link #cluster} as peers.
         *
         * @return Future that completes once the raft group node has started and {@link #raftClient} is set.
         * @throws Exception If a component failed to start synchronously.
         */
        CompletableFuture<Void> start() throws Exception {
            clusterService = ClusterServiceTestUtils.clusterService(nodeName, port.getAndIncrement(), NODE_FINDER);

            assertThat(clusterService.startAsync(), willCompleteSuccessfully());

            raftManager = new Loza(
                    clusterService,
                    raftConfiguration,
                    workDir.resolve(nodeName + "_loza"),
                    new HybridClockImpl(),
                    new RaftGroupEventsClientListener()
            );

            assertThat(raftManager.startAsync(), willCompleteSuccessfully());

            return raftManager.startRaftGroupNode(
                            new RaftNodeId(GROUP_ID, new Peer(nodeName)),
                            fromConsistentIds(cluster.keySet()),
                            new PartitionListener(
                                    mock(TxManager.class),
                                    mock(PartitionDataStorage.class),
                                    mock(StorageUpdateHandler.class),
                                    mock(TxStateStorage.class),
                                    mock(PendingComparableValuesTracker.class),
                                    mock(PendingComparableValuesTracker.class),
                                    mock(CatalogService.class),
                                    mock(SchemaRegistry.class),
                                    clockService
                            ),
                            RaftGroupEventsListener.noopLsnr,
                            RaftGroupOptions.defaults()
                    )
                    .thenAccept(client -> this.raftClient = client);
        }

        /**
         * Stops the raft group node, then the raft manager and the cluster service. Components that were never
         * created are skipped.
         *
         * @throws Exception If stopping any of the components failed.
         */
        void stop() throws Exception {
            closeAll(
                    raftManager == null ? null : () -> raftManager.stopRaftNodes(GROUP_ID),
                    raftManager == null ? null : raftManager::beforeNodeStop,
                    clusterService == null ? null : clusterService::beforeNodeStop,
                    raftManager == null ? null : () -> assertThat(raftManager.stopAsync(), willCompleteSuccessfully()),
                    clusterService == null ? null : () -> assertThat(clusterService.stopAsync(), willCompleteSuccessfully())
            );
        }
    }
}