| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ignite.raft.jraft.core; |
| |
| import com.codahale.metrics.ConsoleReporter; |
| import java.io.File; |
| import java.nio.ByteBuffer; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.Vector; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.BooleanSupplier; |
| import java.util.stream.Stream; |
| import org.apache.ignite.internal.testframework.WorkDirectory; |
| import org.apache.ignite.internal.testframework.WorkDirectoryExtension; |
| import org.apache.ignite.lang.IgniteLogger; |
| import org.apache.ignite.network.ClusterLocalConfiguration; |
| import org.apache.ignite.network.ClusterService; |
| import org.apache.ignite.network.NodeFinder; |
| import org.apache.ignite.network.StaticNodeFinder; |
| import org.apache.ignite.network.TestMessageSerializationRegistryImpl; |
| import org.apache.ignite.network.scalecube.TestScaleCubeClusterServiceFactory; |
| import org.apache.ignite.raft.jraft.Iterator; |
| import org.apache.ignite.raft.jraft.JRaftUtils; |
| import org.apache.ignite.raft.jraft.Node; |
| import org.apache.ignite.raft.jraft.NodeManager; |
| import org.apache.ignite.raft.jraft.RaftGroupService; |
| import org.apache.ignite.raft.jraft.StateMachine; |
| import org.apache.ignite.raft.jraft.Status; |
| import org.apache.ignite.raft.jraft.closure.JoinableClosure; |
| import org.apache.ignite.raft.jraft.closure.ReadIndexClosure; |
| import org.apache.ignite.raft.jraft.closure.SynchronizedClosure; |
| import org.apache.ignite.raft.jraft.closure.TaskClosure; |
| import org.apache.ignite.raft.jraft.conf.Configuration; |
| import org.apache.ignite.raft.jraft.disruptor.StripedDisruptor; |
| import org.apache.ignite.raft.jraft.entity.EnumOutter; |
| import org.apache.ignite.raft.jraft.entity.PeerId; |
| import org.apache.ignite.raft.jraft.entity.Task; |
| import org.apache.ignite.raft.jraft.entity.UserLog; |
| import org.apache.ignite.raft.jraft.error.LogIndexOutOfBoundsException; |
| import org.apache.ignite.raft.jraft.error.LogNotFoundException; |
| import org.apache.ignite.raft.jraft.error.RaftError; |
| import org.apache.ignite.raft.jraft.error.RaftException; |
| import org.apache.ignite.raft.jraft.option.BootstrapOptions; |
| import org.apache.ignite.raft.jraft.option.NodeOptions; |
| import org.apache.ignite.raft.jraft.option.RaftOptions; |
| import org.apache.ignite.raft.jraft.rpc.RpcClientEx; |
| import org.apache.ignite.raft.jraft.rpc.RpcRequests; |
| import org.apache.ignite.raft.jraft.rpc.RpcServer; |
| import org.apache.ignite.raft.jraft.rpc.TestIgniteRpcServer; |
| import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcClient; |
| import org.apache.ignite.raft.jraft.rpc.impl.IgniteRpcServer; |
| import org.apache.ignite.raft.jraft.rpc.impl.core.DefaultRaftClientService; |
| import org.apache.ignite.raft.jraft.storage.SnapshotThrottle; |
| import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl; |
| import org.apache.ignite.raft.jraft.storage.snapshot.SnapshotReader; |
| import org.apache.ignite.raft.jraft.storage.snapshot.ThroughputSnapshotThrottle; |
| import org.apache.ignite.raft.jraft.test.TestUtils; |
| import org.apache.ignite.raft.jraft.util.Bits; |
| import org.apache.ignite.raft.jraft.util.Endpoint; |
| import org.apache.ignite.raft.jraft.util.Utils; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Disabled; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.api.TestInfo; |
| import org.junit.jupiter.api.extension.ExtendWith; |
| |
| import static java.util.stream.Collectors.collectingAndThen; |
| import static java.util.stream.Collectors.toList; |
| import static org.junit.jupiter.api.Assertions.assertArrayEquals; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotEquals; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNotSame; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertSame; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.junit.jupiter.api.Assertions.fail; |
| |
| /** |
| * Integration tests for raft cluster. TODO asch get rid of sleeps wherever possible IGNITE-14832 |
| */ |
| @ExtendWith(WorkDirectoryExtension.class) |
| public class ITNodeTest { |
| private static final IgniteLogger LOG = IgniteLogger.forClass(ITNodeTest.class); |
| |
| private static DumpThread dumpThread; |
| |
| private static class DumpThread extends Thread { |
| private static final long DUMP_TIMEOUT_MS = 5 * 60 * 1000; |
| private volatile boolean stopped = false; |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("BusyWait") @Override |
| public void run() { |
| while (!stopped) { |
| try { |
| Thread.sleep(DUMP_TIMEOUT_MS); |
| LOG.info("Test hang too long, dump threads"); |
| TestUtils.dumpThreads(); |
| } |
| catch (InterruptedException e) { |
| // An interrupt is a dump-timeout reset request; continue waiting. |
| continue; |
| } |
| } |
| } |
| } |
| |
| private String dataPath; |
| |
| private final AtomicInteger startedCounter = new AtomicInteger(0); |
| |
| private final AtomicInteger stoppedCounter = new AtomicInteger(0); |
| |
| private long testStartMs; |
| |
| private TestCluster cluster; |
| |
| private final List<RaftGroupService> services = new ArrayList<>(); |
| |
| @BeforeAll |
| public static void setupNodeTest() { |
| dumpThread = new DumpThread(); |
| dumpThread.setName("NodeTest-DumpThread"); |
| dumpThread.setDaemon(true); |
| dumpThread.start(); |
| } |
| |
| @AfterAll |
| public static void tearNodeTest() throws Exception { |
| dumpThread.stopped = true; |
| dumpThread.interrupt(); |
| dumpThread.join(100); |
| } |
| |
| @BeforeEach |
| public void setup(TestInfo testInfo, @WorkDirectory Path workDir) throws Exception { |
| LOG.info(">>>>>>>>>>>>>>> Start test method: " + testInfo.getDisplayName()); |
| |
| dataPath = workDir.toString(); |
| |
| testStartMs = Utils.monotonicMs(); |
| dumpThread.interrupt(); // reset dump timeout |
| } |
| |
| @AfterEach |
| public void teardown(TestInfo testInfo) throws Exception { |
| services.forEach(service -> { |
| try { |
| service.shutdown(); |
| } |
| catch (Exception e) { |
| LOG.error("Error while closing a service", e); |
| } |
| }); |
| |
| if (cluster != null) |
| cluster.stopAll(); |
| |
| startedCounter.set(0); |
| stoppedCounter.set(0); |
| LOG.info(">>>>>>>>>>>>>>> End test method: " + testInfo.getDisplayName() + ", cost:" |
| + (Utils.monotonicMs() - testStartMs) + " ms."); |
| } |
| |
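| /** |
| * Smoke test: a single-node raft group service can be created and started without errors. |
| */ |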
| @Test |
| public void testInitShutdown() { |
| Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| NodeOptions nodeOptions = createNodeOptions(); |
| |
| nodeOptions.setFsm(new MockStateMachine(addr)); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| |
| RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions); |
| |
| service.start(); |
| } |
| |
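| /** |
| * Shrinks the disruptor buffer to 2 and floods a single node with tasks: every closure must still |
| * complete, either successfully or with EBUSY/EPERM, and none may be lost. |
| */ |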
| @Test |
| public void testNodeTaskOverload() throws Exception { |
| Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| PeerId peer = new PeerId(addr, 0); |
| |
| NodeOptions nodeOptions = createNodeOptions(); |
| RaftOptions raftOptions = new RaftOptions(); |
| raftOptions.setDisruptorBufferSize(2); |
| nodeOptions.setRaftOptions(raftOptions); |
| MockStateMachine fsm = new MockStateMachine(addr); |
| nodeOptions.setFsm(fsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer))); |
| |
| RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions); |
| |
| final Node node = service.start(); |
| |
| assertEquals(1, node.listPeers().size()); |
| assertTrue(node.listPeers().contains(peer)); |
| |
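| // Busy-wait until the single node elects itself leader. |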
| while (!node.isLeader()) |
| ; |
| |
| List<Task> tasks = new ArrayList<>(); |
| AtomicInteger c = new AtomicInteger(0); |
| for (int i = 0; i < 10; i++) { |
| ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes()); |
| Task task = new Task(data, new JoinableClosure(status -> { |
| System.out.println(status); |
| if (!status.isOk()) { |
| assertTrue( |
| status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM); |
| } |
| c.incrementAndGet(); |
| })); |
| node.apply(task); |
| tasks.add(task); |
| } |
| |
| Task.joinAll(tasks, TimeUnit.SECONDS.toMillis(30)); |
| assertEquals(10, c.get()); |
| } |
| |
| /** |
| * Tests state machine rollback combined with readIndex, covering issue 317: https://github.com/sofastack/sofa-jraft/issues/317 |
| */ |
| @Test |
| public void testRollbackStateMachineWithReadIndex_Issue317(TestInfo testInfo) throws Exception { |
| Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| PeerId peer = new PeerId(addr, 0); |
| |
| NodeOptions nodeOptions = createNodeOptions(); |
| CountDownLatch applyCompleteLatch = new CountDownLatch(1); |
| CountDownLatch applyLatch = new CountDownLatch(1); |
| CountDownLatch readIndexLatch = new CountDownLatch(1); |
| AtomicInteger currentValue = new AtomicInteger(-1); |
| String errorMsg = testInfo.getDisplayName(); |
| StateMachine fsm = new StateMachineAdapter() { |
| |
| @Override |
| public void onApply(Iterator iter) { |
| // Notify that #onApply is about to run. |
| readIndexLatch.countDown(); |
| // Wait until a read-index request has been submitted. |
| try { |
| applyLatch.await(); |
| } |
| catch (InterruptedException e) { |
| fail(); |
| } |
| int i = 0; |
| while (iter.hasNext()) { |
| byte[] data = iter.next().array(); |
| int v = Bits.getInt(data, 0); |
| assertEquals(i++, v); |
| currentValue.set(v); |
| } |
| if (i > 0) { |
| // rollback |
| currentValue.set(i - 1); |
| iter.setErrorAndRollback(1, new Status(-1, errorMsg)); |
| applyCompleteLatch.countDown(); |
| } |
| } |
| }; |
| nodeOptions.setFsm(fsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer))); |
| |
| RaftGroupService service = createService("unittest", peer, nodeOptions); |
| |
| final Node node = service.start(); |
| |
| assertEquals(1, node.listPeers().size()); |
| assertTrue(node.listPeers().contains(peer)); |
| |
| while (!node.isLeader()) |
| ; |
| |
| int n = 5; |
| { |
| // apply tasks |
| for (int i = 0; i < n; i++) { |
| byte[] b = new byte[4]; |
| Bits.putInt(b, 0, i); |
| node.apply(new Task(ByteBuffer.wrap(b), null)); |
| } |
| } |
| |
| AtomicInteger readIndexSuccesses = new AtomicInteger(0); |
| { |
| // Submit a read-index, wait for #onApply |
| readIndexLatch.await(); |
| CountDownLatch latch = new CountDownLatch(1); |
| node.readIndex(null, new ReadIndexClosure() { |
| |
| @Override |
| public void run(Status status, long index, byte[] reqCtx) { |
| try { |
| if (status.isOk()) |
| readIndexSuccesses.incrementAndGet(); |
| else { |
| assertTrue( |
| status.getErrorMsg().contains(errorMsg) || status.getRaftError() == RaftError.ETIMEDOUT |
| || status.getErrorMsg().contains("Invalid state for readIndex: STATE_ERROR"), |
| "Unexpected status: " + status); |
| } |
| } |
| finally { |
| latch.countDown(); |
| } |
| } |
| }); |
| // We have already submitted a read-index request, |
| // notify #onApply that it can proceed now. |
| applyLatch.countDown(); |
| |
| // The state machine is in error state, the node should step down. |
| waitForCondition(() -> !node.isLeader(), 5_000); |
| |
| latch.await(); |
| applyCompleteLatch.await(); |
| } |
| // No read-index request succeeded. |
| assertEquals(0, readIndexSuccesses.get()); |
| assertTrue(n - 1 >= currentValue.get()); |
| } |
| |
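| /** |
| * Single-node happy path: ten tasks applied to the leader must reach the state machine in order. |
| */ |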
| @Test |
| public void testSingleNode() throws Exception { |
| Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| PeerId peer = new PeerId(addr, 0); |
| |
| NodeOptions nodeOptions = createNodeOptions(); |
| MockStateMachine fsm = new MockStateMachine(addr); |
| nodeOptions.setFsm(fsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer))); |
| RaftGroupService service = createService("unittest", peer, nodeOptions); |
| |
| Node node = service.start(); |
| |
| assertEquals(1, node.listPeers().size()); |
| assertTrue(node.listPeers().contains(peer)); |
| |
| while (!node.isLeader()) |
| ; |
| |
| sendTestTaskAndWait(node); |
| assertEquals(10, fsm.getLogs().size()); |
| int i = 0; |
| for (ByteBuffer data : fsm.getLogs()) |
| assertEquals("hello" + i++, new String(data.array())); |
| } |
| |
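| /** |
| * Starts only one peer of a three-peer configuration: without a quorum no leader can be elected, so |
| * task submission and membership changes must fail with EPERM. |
| */ |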
| @Test |
| public void testNoLeader() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| |
| assertTrue(cluster.start(peers.get(0).getEndpoint())); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(1, followers.size()); |
| |
| Node follower = followers.get(0); |
| sendTestTaskAndWait(follower, 0, RaftError.EPERM); |
| |
| // try to add peer3; this must fail with EPERM because there is no leader |
| PeerId peer3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3); |
| CountDownLatch latch = new CountDownLatch(1); |
| follower.addPeer(peer3, new ExpectClosure(RaftError.EPERM, latch)); |
| waitLatch(latch); |
| |
| // try to remove peer0; this must also fail with EPERM |
| PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| latch = new CountDownLatch(1); |
| follower.removePeer(peer0, new ExpectClosure(RaftError.EPERM, latch)); |
| waitLatch(latch); |
| } |
| |
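| /** |
| * Registers two replicator state listeners and expects four onCreated events (two replicators on |
| * the leader times two listeners); removing one listener must be reflected on every node. |
| */ |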
| @Test |
| public void testTripleNodesWithReplicatorStateListener() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| UserReplicatorStateListener listener1 = new UserReplicatorStateListener(); |
| UserReplicatorStateListener listener2 = new UserReplicatorStateListener(); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, new LinkedHashSet<>(), 300, |
| opts -> opts.setReplicationStateListeners(List.of(listener1, listener2))); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| cluster.ensureLeader(cluster.getLeader()); |
| |
| for (Node follower : cluster.getFollowers()) |
| waitForCondition(() -> follower.getLeaderId() != null, 5_000); |
| |
| assertEquals(4, startedCounter.get()); |
| assertEquals(2, cluster.getLeader().getReplicatorStateListeners().size()); |
| assertEquals(2, cluster.getFollowers().get(0).getReplicatorStateListeners().size()); |
| assertEquals(2, cluster.getFollowers().get(1).getReplicatorStateListeners().size()); |
| |
| for (Node node : cluster.getNodes()) |
| node.removeReplicatorStateListener(listener1); |
| assertEquals(1, cluster.getLeader().getReplicatorStateListeners().size()); |
| assertEquals(1, cluster.getFollowers().get(0).getReplicatorStateListeners().size()); |
| assertEquals(1, cluster.getFollowers().get(1).getReplicatorStateListeners().size()); |
| } |
| |
| // TODO asch Broken when using volatile log. A follower with an empty log can become a leader IGNITE-14832. |
| @Test |
| @Disabled |
| public void testVoteTimedoutStepDown() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| // Stop all followers |
| List<Node> followers = cluster.getFollowers(); |
| assertFalse(followers.isEmpty()); |
| for (Node node : followers) |
| assertTrue(cluster.stop(node.getNodeId().getPeerId().getEndpoint())); |
| |
| // Wait leader to step down. |
| while (leader.isLeader()) |
| Thread.sleep(10); |
| |
| // The old leader tries to elect itself; it should fail. |
| ((NodeImpl) leader).tryElectSelf(); |
| Thread.sleep(1500); |
| |
| assertNull(cluster.getLeader()); |
| |
| // Start followers |
| for (Node node : followers) |
| assertTrue(cluster.start(node.getNodeId().getPeerId().getEndpoint())); |
| |
| cluster.ensureSame(); |
| } |
| |
| class UserReplicatorStateListener implements Replicator.ReplicatorStateListener { |
| /** {@inheritDoc} */ |
| @Override |
| public void onCreated(PeerId peer) { |
| int val = startedCounter.incrementAndGet(); |
| |
| LOG.info("Replicator has been created {} {}", peer, val); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void onError(PeerId peer, Status status) { |
| LOG.info("Replicator has errors {} {}", peer, status); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void onDestroyed(PeerId peer) { |
| int val = stoppedCounter.incrementAndGet(); |
| |
| LOG.info("Replicator has been destroyed {} {}", peer, val); |
| } |
| } |
| |
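| /** |
| * Transfers leadership to a follower and checks that the new leader's replicators fire onCreated |
| * events, then verifies clearReplicatorStateListeners() removes all listeners on every node. |
| */ |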
| @Test |
| public void testLeaderTransferWithReplicatorStateListener() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, new LinkedHashSet<>(), 300, |
| opts -> opts.setReplicationStateListeners(List.of(new UserReplicatorStateListener()))); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| cluster.ensureLeader(leader); |
| |
| sendTestTaskAndWait(leader); |
| Thread.sleep(100); |
| List<Node> followers = cluster.getFollowers(); |
| |
| assertTrue(waitForCondition(() -> startedCounter.get() == 2, 5_000), startedCounter.get() + ""); |
| |
| PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy(); |
| LOG.info("Transfer leadership from {} to {}", leader, targetPeer); |
| assertTrue(leader.transferLeadershipTo(targetPeer).isOk()); |
| Thread.sleep(1000); |
| cluster.waitLeader(); |
| |
| assertTrue(waitForCondition(() -> startedCounter.get() == 4, 5_000), startedCounter.get() + ""); |
| |
| for (Node node : cluster.getNodes()) |
| node.clearReplicatorStateListeners(); |
| assertEquals(0, cluster.getLeader().getReplicatorStateListeners().size()); |
| assertEquals(0, cluster.getFollowers().get(0).getReplicatorStateListeners().size()); |
| assertEquals(0, cluster.getFollowers().get(1).getReplicatorStateListeners().size()); |
| } |
| |
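| /** |
| * Basic three-node replication, including a task without a closure and a TaskClosure whose |
| * onCommitted callback must run before the apply callback. |
| */ |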
| @Test |
| public void testTripleNodes() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| { |
| ByteBuffer data = ByteBuffer.wrap("no closure".getBytes()); |
| Task task = new Task(data, null); |
| leader.apply(task); |
| } |
| |
| { |
| // task with TaskClosure |
| ByteBuffer data = ByteBuffer.wrap("task closure".getBytes()); |
| Vector<String> cbs = new Vector<>(); |
| CountDownLatch latch = new CountDownLatch(1); |
| Task task = new Task(data, new TaskClosure() { |
| |
| @Override |
| public void run(Status status) { |
| cbs.add("apply"); |
| latch.countDown(); |
| } |
| |
| @Override |
| public void onCommitted() { |
| cbs.add("commit"); |
| } |
| }); |
| leader.apply(task); |
| latch.await(); |
| assertEquals(2, cbs.size()); |
| assertEquals("commit", cbs.get(0)); |
| assertEquals("apply", cbs.get(1)); |
| } |
| |
| cluster.ensureSame(); |
| assertEquals(2, cluster.getFollowers().size()); |
| } |
| |
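| /** |
| * One voting peer plus one statically configured learner: entries applied on the leader must be |
| * replicated to the learner's state machine even though the learner never votes. |
| */ |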
| @Test |
| public void testSingleNodeWithLearner() throws Exception { |
| Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| PeerId peer = new PeerId(addr, 0); |
| |
| Endpoint learnerAddr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 1); |
| PeerId learnerPeer = new PeerId(learnerAddr, 0); |
| |
| final int cnt = 10; |
| MockStateMachine learnerFsm; |
| RaftGroupService learnerServer; |
| { |
| // Start learner |
| NodeOptions nodeOptions = createNodeOptions(); |
| learnerFsm = new MockStateMachine(learnerAddr); |
| nodeOptions.setFsm(learnerFsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log1"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta1"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot1"); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer), Collections |
| .singletonList(learnerPeer))); |
| |
| learnerServer = createService("unittest", new PeerId(learnerAddr, 0), nodeOptions); |
| learnerServer.start(); |
| } |
| |
| { |
| // Start leader |
| NodeOptions nodeOptions = createNodeOptions(); |
| MockStateMachine fsm = new MockStateMachine(addr); |
| nodeOptions.setFsm(fsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(peer), Collections |
| .singletonList(learnerPeer))); |
| |
| RaftGroupService server = createService("unittest", new PeerId(addr, 0), nodeOptions); |
| Node node = server.start(); |
| |
| assertEquals(1, node.listPeers().size()); |
| assertTrue(node.listPeers().contains(peer)); |
| assertTrue(waitForCondition(() -> node.isLeader(), 1_000)); |
| |
| sendTestTaskAndWait(node, cnt); |
| assertEquals(cnt, fsm.getLogs().size()); |
| int i = 0; |
| for (ByteBuffer data : fsm.getLogs()) |
| assertEquals("hello" + i++, new String(data.array())); |
| Thread.sleep(1000); // wait for entries to be replicated to the learner. |
| server.shutdown(); |
| } |
| { |
| // assert learner fsm |
| assertEquals(cnt, learnerFsm.getLogs().size()); |
| int i = 0; |
| for (ByteBuffer data : learnerFsm.getLogs()) |
| assertEquals("hello" + i++, new String(data.array())); |
| learnerServer.shutdown(); |
| } |
| } |
| |
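| /** |
| * Starts three peers and three learners, then shrinks the learner set via resetLearners() and |
| * removeLearners(), verifying that removed learners stop receiving new log entries. |
| */ |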
| @Test |
| public void testResetLearners() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| LinkedHashSet<PeerId> learners = new LinkedHashSet<>(); |
| |
| for (int i = 0; i < 3; i++) |
| learners.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3 + i)); |
| |
| cluster = new TestCluster("unittest", dataPath, peers, learners, 300); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| for (PeerId peer : learners) |
| assertTrue(cluster.startLearner(peer)); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| cluster.ensureLeader(leader); |
| |
| waitForCondition(() -> leader.listAlivePeers().size() == 3, 5_000); |
| waitForCondition(() -> leader.listAliveLearners().size() == 3, 5_000); |
| |
| sendTestTaskAndWait(leader); |
| List<MockStateMachine> fsms = cluster.getFsms(); |
| assertEquals(6, fsms.size()); |
| cluster.ensureSame(); |
| |
| { |
| // Reset learners to 2 nodes |
| PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3); |
| learners.remove(learnerPeer); |
| assertEquals(2, learners.size()); |
| |
| SynchronizedClosure done = new SynchronizedClosure(); |
| leader.resetLearners(new ArrayList<>(learners), done); |
| assertTrue(done.await().isOk()); |
| assertEquals(2, leader.listAliveLearners().size()); |
| assertEquals(2, leader.listLearners().size()); |
| sendTestTaskAndWait(leader); |
| Thread.sleep(500); |
| |
| assertEquals(6, fsms.size()); |
| |
| MockStateMachine fsm = fsms.remove(3); // get the removed learner's fsm |
| assertEquals(fsm.getAddress(), learnerPeer.getEndpoint()); |
| // Ensure no more logs replicated to the removed learner. |
| assertTrue(cluster.getLeaderFsm().getLogs().size() > fsm.getLogs().size()); |
| assertEquals(cluster.getLeaderFsm().getLogs().size(), 2 * fsm.getLogs().size()); |
| } |
| { |
| // remove another learner |
| PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 4); |
| SynchronizedClosure done = new SynchronizedClosure(); |
| leader.removeLearners(Arrays.asList(learnerPeer), done); |
| assertTrue(done.await().isOk()); |
| |
| sendTestTaskAndWait(leader); |
| Thread.sleep(500); |
| MockStateMachine fsm = fsms.remove(3); // get the removed learner's fsm |
| assertEquals(fsm.getAddress(), learnerPeer.getEndpoint()); |
| // Ensure no more logs replicated to the removed learner. |
| assertTrue(cluster.getLeaderFsm().getLogs().size() > fsm.getLogs().size()); |
| assertEquals(cluster.getLeaderFsm().getLogs().size(), fsm.getLogs().size() / 2 * 3); |
| } |
| |
| assertEquals(3, leader.listAlivePeers().size()); |
| assertEquals(1, leader.listAliveLearners().size()); |
| assertEquals(1, leader.listLearners().size()); |
| } |
| |
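| /** |
| * Declares a learner in the initial configuration but starts it only after the cluster is up: the |
| * leader must report it as alive and replicate all entries once it starts. |
| */ |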
| @Test |
| public void testTripleNodesWithStaticLearners() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| LinkedHashSet<PeerId> learners = new LinkedHashSet<>(); |
| PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3); |
| learners.add(learnerPeer); |
| cluster.setLearners(learners); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| assertEquals(1, leader.listLearners().size()); |
| assertTrue(leader.listLearners().contains(learnerPeer)); |
| assertTrue(leader.listAliveLearners().isEmpty()); |
| |
| // start learner after cluster setup. |
| assertTrue(cluster.start(learnerPeer.getEndpoint())); |
| |
| Thread.sleep(1000); |
| |
| assertEquals(3, leader.listPeers().size()); |
| assertEquals(1, leader.listLearners().size()); |
| assertEquals(1, leader.listAliveLearners().size()); |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| assertEquals(4, cluster.getFsms().size()); |
| } |
| |
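| /** |
| * Adds learners dynamically via addLearners(); learners replicate entries but do not count towards |
| * the quorum, so writes fail with EPERM once both followers are stopped. |
| */ |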
| @Test |
| public void testTripleNodesWithLearners() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| assertTrue(leader.listLearners().isEmpty()); |
| assertTrue(leader.listAliveLearners().isEmpty()); |
| |
| { |
| // Adds a learner |
| SynchronizedClosure done = new SynchronizedClosure(); |
| PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3); |
| // Start learner |
| assertTrue(cluster.startLearner(learnerPeer)); |
| leader.addLearners(Arrays.asList(learnerPeer), done); |
| assertTrue(done.await().isOk()); |
| assertEquals(1, leader.listAliveLearners().size()); |
| assertEquals(1, leader.listLearners().size()); |
| } |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| { |
| ByteBuffer data = ByteBuffer.wrap("no closure".getBytes()); |
| Task task = new Task(data, null); |
| leader.apply(task); |
| } |
| |
| { |
| // task with TaskClosure |
| ByteBuffer data = ByteBuffer.wrap("task closure".getBytes()); |
| Vector<String> cbs = new Vector<>(); |
| CountDownLatch latch = new CountDownLatch(1); |
| Task task = new Task(data, new TaskClosure() { |
| |
| @Override |
| public void run(Status status) { |
| cbs.add("apply"); |
| latch.countDown(); |
| } |
| |
| @Override |
| public void onCommitted() { |
| cbs.add("commit"); |
| } |
| }); |
| leader.apply(task); |
| latch.await(); |
| assertEquals(2, cbs.size()); |
| assertEquals("commit", cbs.get(0)); |
| assertEquals("apply", cbs.get(1)); |
| } |
| |
| assertEquals(4, cluster.getFsms().size()); |
| assertEquals(2, cluster.getFollowers().size()); |
| assertEquals(1, cluster.getLearners().size()); |
| cluster.ensureSame(); |
| |
| { |
| // Adds another learner |
| SynchronizedClosure done = new SynchronizedClosure(); |
| PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 4); |
| // Start learner |
| assertTrue(cluster.startLearner(learnerPeer)); |
| leader.addLearners(Arrays.asList(learnerPeer), done); |
| assertTrue(done.await().isOk()); |
| assertEquals(2, leader.listAliveLearners().size()); |
| assertEquals(2, leader.listLearners().size()); |
| cluster.ensureSame(); |
| } |
| { |
| // stop two followers |
| for (Node follower : cluster.getFollowers()) |
| assertTrue(cluster.stop(follower.getNodeId().getPeerId().getEndpoint())); |
| // send a new task |
| ByteBuffer data = ByteBuffer.wrap("task closure".getBytes()); |
| SynchronizedClosure done = new SynchronizedClosure(); |
| leader.apply(new Task(data, done)); |
| // should fail |
| assertFalse(done.await().isOk()); |
| assertEquals(RaftError.EPERM, done.getStatus().getRaftError()); |
| // One peer with two learners. |
| assertEquals(3, cluster.getFsms().size()); |
| } |
| } |
| |
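| /** |
| * With peer priorities 100/40/40 the highest-priority node is expected to win the election. |
| */ |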
| @Test |
| public void testNodesWithPriorityElection() throws Exception { |
| List<Integer> priorities = new ArrayList<>(); |
| priorities.add(100); |
| priorities.add(40); |
| priorities.add(40); |
| |
| List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| assertEquals(100, leader.getNodeTargetPriority()); |
| assertEquals(100, leader.getLeaderId().getPriority()); |
| assertEquals(2, cluster.getFollowers().size()); |
| } |
| |
| @Test |
| public void testNodesWithPartPriorityElection() throws Exception { |
| List<Integer> priorities = new ArrayList<>(); |
| priorities.add(100); |
| priorities.add(40); |
| priorities.add(-1); |
| |
| List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| assertEquals(2, cluster.getFollowers().size()); |
| } |
| |
| @Test |
| public void testNodesWithSpecialPriorityElection() throws Exception { |
| List<Integer> priorities = new ArrayList<>(); |
| priorities.add(0); |
| priorities.add(0); |
| priorities.add(-1); |
| |
| List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| assertEquals(2, cluster.getFollowers().size()); |
| } |
| |
| @Test |
| public void testNodesWithZeroValPriorityElection() throws Exception { |
| List<Integer> priorities = new ArrayList<>(); |
| priorities.add(50); |
| priorities.add(0); |
| priorities.add(0); |
| |
| List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| assertEquals(2, cluster.getFollowers().size()); |
| assertEquals(50, leader.getNodeTargetPriority()); |
| assertEquals(50, leader.getLeaderId().getPriority()); |
| } |
| |
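| /** |
| * When every peer has priority 0 (not electable), the group must stay leaderless: all three nodes |
| * remain followers. |
| */ |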
| @Test |
| public void testNoLeaderWithZeroValPriorityElection() throws Exception { |
| List<Integer> priorities = new ArrayList<>(); |
| priorities.add(0); |
| priorities.add(0); |
| priorities.add(0); |
| |
| List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority())); |
| |
| Thread.sleep(200); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(3, followers.size()); |
| |
| for (Node follower : followers) |
| assertEquals(0, follower.getNodeId().getPeerId().getPriority()); |
| } |
| |
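| /** |
| * After the priority-100 leader stops, the priority-60 node should win re-election while the target |
| * priority inherited from the old configuration stays at 100. |
| */ |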
| @Test |
| public void testLeaderStopAndReElectWithPriority() throws Exception { |
| List<Integer> priorities = new ArrayList<>(); |
| priorities.add(100); |
| priorities.add(60); |
| priorities.add(10); |
| |
| List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority())); |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| cluster.ensureLeader(leader); |
| |
| assertNotNull(leader); |
| assertEquals(100, leader.getNodeId().getPeerId().getPriority()); |
| assertEquals(100, leader.getNodeTargetPriority()); |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| // wait for all update received, before election of new leader |
| cluster.ensureSame(); |
| |
| // stop leader |
| assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint())); |
| |
| // elect new leader |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| |
| assertNotNull(leader); |
| |
| // nodes with the same log size will elect leader only by priority |
| assertEquals(60, leader.getNodeId().getPeerId().getPriority()); |
| assertEquals(100, leader.getNodeTargetPriority()); |
| } |
| |
| @Test |
| public void testRemoveLeaderWithPriority() throws Exception { |
| List<Integer> priorities = new ArrayList<>(); |
| priorities.add(100); |
| priorities.add(60); |
| priorities.add(10); |
| |
| List<PeerId> peers = TestUtils.generatePriorityPeers(3, priorities); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), peer.getPriority())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| assertEquals(100, leader.getNodeTargetPriority()); |
| assertEquals(100, leader.getNodeId().getPeerId().getPriority()); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| PeerId oldLeader = leader.getNodeId().getPeerId().copy(); |
| Endpoint oldLeaderAddr = oldLeader.getEndpoint(); |
| |
| // remove old leader |
| LOG.info("Remove old leader {}", oldLeader); |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.removePeer(oldLeader, new ExpectClosure(latch)); |
| waitLatch(latch); |
| assertEquals(60, leader.getNodeTargetPriority()); |
| |
| // stop and clean old leader |
| LOG.info("Stop and clean old leader {}", oldLeader); |
| assertTrue(cluster.stop(oldLeaderAddr)); |
| cluster.clean(oldLeaderAddr); |
| |
| // elect new leader |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| LOG.info("New leader is {}", leader); |
| assertNotNull(leader); |
| assertNotSame(leader, oldLeader); |
| } |
| |
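| /** |
| * Restarts the same cluster under four log-entry checksum configurations (all enabled, mixed, all |
| * disabled, all enabled again) and verifies replication converges in each case. |
| */ |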
| @Test |
| public void testChecksum() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| // start with checksum validation |
| { |
| TestCluster cluster = new TestCluster("unittest", dataPath, peers); |
| try { |
| RaftOptions raftOptions = new RaftOptions(); |
| raftOptions.setEnableLogEntryChecksum(true); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions)); |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(3, leader.listPeers().size()); |
| sendTestTaskAndWait(leader); |
| cluster.ensureSame(); |
| } |
| finally { |
| cluster.stopAll(); |
| } |
| } |
| |
| // restart with peer3 enable checksum validation |
| { |
| TestCluster cluster = new TestCluster("unittest", dataPath, peers); |
| try { |
| RaftOptions raftOptions = new RaftOptions(); |
| raftOptions.setEnableLogEntryChecksum(false); |
| for (PeerId peer : peers) { |
| if (peer.equals(peers.get(2))) { |
| raftOptions = new RaftOptions(); |
| raftOptions.setEnableLogEntryChecksum(true); |
| } |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions)); |
| } |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(3, leader.listPeers().size()); |
| sendTestTaskAndWait(leader); |
| cluster.ensureSame(); |
| } |
| finally { |
| cluster.stopAll(); |
| } |
| } |
| |
| // restart with no checksum validation |
| { |
| TestCluster cluster = new TestCluster("unittest", dataPath, peers); |
| try { |
| RaftOptions raftOptions = new RaftOptions(); |
| raftOptions.setEnableLogEntryChecksum(false); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions)); |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(3, leader.listPeers().size()); |
| sendTestTaskAndWait(leader); |
| cluster.ensureSame(); |
| } |
| finally { |
| cluster.stopAll(); |
| } |
| } |
| |
| // restart with all peers enable checksum validation |
| { |
| TestCluster cluster = new TestCluster("unittest", dataPath, peers); |
| try { |
| RaftOptions raftOptions = new RaftOptions(); |
| raftOptions.setEnableLogEntryChecksum(true); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true, null, raftOptions)); |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(3, leader.listPeers().size()); |
| sendTestTaskAndWait(leader); |
| cluster.ensureSame(); |
| } |
| finally { |
| cluster.stopAll(); |
| } |
| } |
| } |
| |
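| /** |
| * Verifies linearizable reads via readIndex() from the leader, from the followers and with a null |
| * request context. The callback pattern exercised throughout this test is roughly: |
| * <pre>{@code |
| * node.readIndex(ctx, new ReadIndexClosure() { |
| *     @Override public void run(Status status, long index, byte[] reqCtx) { |
| *         // status.isOk() means reading local state up to 'index' is linearizable |
| *     } |
| * }); |
| * }</pre> |
| */ |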
| @Test |
| public void testReadIndex() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| // The first call may fail fast when no connection is established yet, so retry once. |
| if (!assertReadIndex(leader, 11)) |
| assertTrue(assertReadIndex(leader, 11)); |
| |
| // read from follower |
| for (Node follower : cluster.getFollowers()) { |
| assertNotNull(follower); |
| |
| assertTrue(waitForCondition(() -> leader.getNodeId().getPeerId().equals(follower.getLeaderId()), 5_000)); |
| |
| assertReadIndex(follower, 11); |
| } |
| |
| // read with null request context |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.readIndex(null, new ReadIndexClosure() { |
| |
| @Override |
| public void run(Status status, long index, byte[] reqCtx) { |
| assertNull(reqCtx); |
| assertTrue(status.isOk()); |
| latch.countDown(); |
| } |
| }); |
| latch.await(); |
| } |
| |
| @Test // TODO asch do we need read index timeout? https://issues.apache.org/jira/browse/IGNITE-14832 |
| public void testReadIndexTimeout() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| |
| assertEquals(3, leader.listPeers().size()); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| // The first call may fail fast when no connection is established yet, so retry once. |
| if (!assertReadIndex(leader, 11)) |
| assertTrue(assertReadIndex(leader, 11)); |
| |
| // read from follower |
| for (Node follower : cluster.getFollowers()) { |
| assertNotNull(follower); |
| |
| assertTrue(waitForCondition(() -> leader.getNodeId().getPeerId().equals(follower.getLeaderId()), 5_000)); |
| |
| assertReadIndex(follower, 11); |
| } |
| |
| // read with null request context |
| CountDownLatch latch = new CountDownLatch(1); |
| long start = System.currentTimeMillis(); |
| leader.readIndex(null, new ReadIndexClosure() { |
| |
| @Override |
| public void run(Status status, long index, byte[] reqCtx) { |
| assertNull(reqCtx); |
| if (status.isOk()) |
| System.err.println("Read-index so fast: " + (System.currentTimeMillis() - start) + "ms"); |
| else { |
| assertEquals(new Status(RaftError.ETIMEDOUT, "read-index request timeout"), status); |
| assertEquals(-1, index); |
| } |
| latch.countDown(); |
| } |
| }); |
| latch.await(); |
| } |
| |
| @Test |
| public void testReadIndexFromLearner() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(3, leader.listPeers().size()); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| { |
| // Adds a learner |
| SynchronizedClosure done = new SynchronizedClosure(); |
| PeerId learnerPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3); |
| // Start learner |
| assertTrue(cluster.startLearner(learnerPeer)); |
| leader.addLearners(Arrays.asList(learnerPeer), done); |
| assertTrue(done.await().isOk()); |
| assertEquals(1, leader.listAliveLearners().size()); |
| assertEquals(1, leader.listLearners().size()); |
| } |
| |
| Thread.sleep(100); |
| // read from learner |
| Node learner = cluster.getNodes().get(3); |
| assertNotNull(learner); |
| assertReadIndex(learner, 12); |
| assertReadIndex(learner, 12); |
| } |
| |
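| /** |
| * Chaos test: ten threads concurrently apply tasks and issue readIndex() calls against random |
| * nodes; afterwards every state machine must hold all 10000 entries. |
| */ |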
| @Test |
| public void testReadIndexChaos() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(3, leader.listPeers().size()); |
| |
| CountDownLatch latch = new CountDownLatch(10); |
| for (int i = 0; i < 10; i++) { |
| new Thread() { |
| @Override |
| public void run() { |
| try { |
| for (int i = 0; i < 100; i++) { |
| try { |
| sendTestTaskAndWait(leader); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| readIndexRandom(cluster); |
| } |
| } |
| finally { |
| latch.countDown(); |
| } |
| } |
| |
| private void readIndexRandom(TestCluster cluster) { |
| CountDownLatch readLatch = new CountDownLatch(1); |
| byte[] requestContext = TestUtils.getRandomBytes(); |
| cluster.getNodes().get(ThreadLocalRandom.current().nextInt(3)) |
| .readIndex(requestContext, new ReadIndexClosure() { |
| |
| @Override |
| public void run(Status status, long index, byte[] reqCtx) { |
| if (status.isOk()) { |
| assertTrue(status.isOk(), status.toString()); |
| assertTrue(index > 0); |
| assertArrayEquals(requestContext, reqCtx); |
| } |
| readLatch.countDown(); |
| } |
| }); |
| try { |
| readLatch.await(); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| }.start(); |
| } |
| |
| latch.await(); |
| |
| cluster.ensureSame(); |
| |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(10000, fsm.getLogs().size()); |
| } |
| |
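| /** |
| * Smoke test for node metrics: applies tasks and dumps each node's metric registry through a |
| * ConsoleReporter. |
| */ |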
| @Test |
| public void testNodeMetrics() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300, true)); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(3, leader.listPeers().size()); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| { |
| ByteBuffer data = ByteBuffer.wrap("no closure".getBytes()); |
| Task task = new Task(data, null); |
| leader.apply(task); |
| } |
| |
| cluster.ensureSame(); |
| for (Node node : cluster.getNodes()) { |
| System.out.println("-------------" + node.getNodeId() + "-------------"); |
| ConsoleReporter reporter = ConsoleReporter.forRegistry(node.getNodeMetrics().getMetricRegistry()) |
| .build(); |
| reporter.report(); |
| reporter.close(); |
| System.out.println(); |
| } |
| // TODO check http status https://issues.apache.org/jira/browse/IGNITE-14832 |
| assertEquals(2, cluster.getFollowers().size()); |
| } |
| |
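| /** |
| * Stops the leader while non-pre-vote RequestVote messages are blocked on the followers, applies |
| * tasks through a follower during the leaderless window, then unblocks voting, waits for a new |
| * leader and verifies that all 30 entries converge after the old leader is restarted. |
| */ |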
| @Test |
| public void testLeaderFail() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| LOG.info("Current leader is {}", leader.getLeaderId()); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| List<Node> followers = cluster.getFollowers(); |
| |
| for (Node follower : followers) { |
| NodeImpl follower0 = (NodeImpl) follower; |
| DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService(); |
| RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient(); |
| rpcClientEx.blockMessages((msg, nodeId) -> { |
| if (msg instanceof RpcRequests.RequestVoteRequest) { |
| RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg; |
| |
| return !msg0.preVote(); |
| } |
| |
| return false; |
| }); |
| } |
| |
| // stop leader |
| LOG.warn("Stop leader {}", leader.getNodeId().getPeerId()); |
| PeerId oldLeader = leader.getNodeId().getPeerId(); |
| assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint())); |
| |
| // apply something when follower |
| assertFalse(followers.isEmpty()); |
| sendTestTaskAndWait("follower apply ", followers.get(0), -1); |
| |
| for (Node follower : followers) { |
| NodeImpl follower0 = (NodeImpl) follower; |
| DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService(); |
| RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient(); |
| rpcClientEx.stopBlock(); |
| } |
| |
| // elect new leader |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| LOG.info("Elect new leader is {}", leader.getLeaderId()); |
| // apply tasks to new leader |
| CountDownLatch latch = new CountDownLatch(10); |
| for (int i = 10; i < 20; i++) { |
| ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes()); |
| Task task = new Task(data, new ExpectClosure(latch)); |
| leader.apply(task); |
| } |
| waitLatch(latch); |
| |
| // restart old leader |
| LOG.info("restart old leader {}", oldLeader); |
| assertTrue(cluster.start(oldLeader.getEndpoint())); |
| // apply something |
| latch = new CountDownLatch(10); |
| for (int i = 20; i < 30; i++) { |
| ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes()); |
| Task task = new Task(data, new ExpectClosure(latch)); |
| leader.apply(task); |
| } |
| waitLatch(latch); |
| |
| // stop and clean old leader |
| cluster.stop(oldLeader.getEndpoint()); |
| cluster.clean(oldLeader.getEndpoint()); |
| |
| // restart old leader |
| LOG.info("restart old leader {}", oldLeader); |
| assertTrue(cluster.start(oldLeader.getEndpoint())); |
| cluster.ensureSame(); |
| |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(30, fsm.getLogs().size()); |
| } |
| |
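| /** |
| * Grows a single-node group one peer at a time: adding a stopped peer fails with ECATCHUP, a |
| * concurrent membership change fails with EBUSY, and re-adding an existing peer throws |
| * IllegalArgumentException. |
| */ |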
| @Test |
| public void testJoinNodes() throws Exception { |
| PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| PeerId peer1 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 1); |
| PeerId peer2 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 2); |
| PeerId peer3 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + 3); |
| |
| ArrayList<PeerId> peers = new ArrayList<>(); |
| peers.add(peer0); |
| |
| // start single cluster |
| cluster = new TestCluster("unittest", dataPath, peers); |
| assertTrue(cluster.start(peer0.getEndpoint())); |
| |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(leader.getNodeId().getPeerId(), peer0); |
| sendTestTaskAndWait(leader); |
| |
| // start peer1 |
| assertTrue(cluster.start(peer1.getEndpoint(), false, 300)); |
| |
| // add peer1 |
| CountDownLatch latch = new CountDownLatch(1); |
| peers.add(peer1); |
| leader.addPeer(peer1, new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| cluster.ensureSame(); |
| assertEquals(2, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(10, fsm.getLogs().size()); |
| |
| // add peer2 but not start |
| peers.add(peer2); |
| latch = new CountDownLatch(1); |
| leader.addPeer(peer2, new ExpectClosure(RaftError.ECATCHUP, latch)); |
| waitLatch(latch); |
| |
| // start peer2 after 2 seconds |
| Thread.sleep(2000); |
| assertTrue(cluster.start(peer2.getEndpoint(), false, 300)); |
| |
| // re-add peer2 |
| latch = new CountDownLatch(2); |
| leader.addPeer(peer2, new ExpectClosure(latch)); |
| // concurrent configuration change |
| leader.addPeer(peer3, new ExpectClosure(RaftError.EBUSY, latch)); |
| waitLatch(latch); |
| |
| // Re-adding peer2, which already exists in the configuration, must fail immediately. |
| try { |
| leader.addPeer(peer2, new ExpectClosure(latch)); |
| fail(); |
| } |
| catch (IllegalArgumentException e) { |
| assertEquals("Peer already exists in current configuration", e.getMessage()); |
| } |
| |
| cluster.ensureSame(); |
| assertEquals(3, cluster.getFsms().size()); |
| assertEquals(2, cluster.getFollowers().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(10, fsm.getLogs().size()); |
| } |
| |
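| /** |
| * Stops, cleans and removes a follower from the configuration, then restarts and re-adds it, |
| * verifying that the rejoined node catches up with all 20 entries. |
| */ |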
| @Test |
| public void testRemoveFollower() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| PeerId followerPeer = followers.get(0).getNodeId().getPeerId(); |
| Endpoint followerAddr = followerPeer.getEndpoint(); |
| |
| // stop and clean follower |
| LOG.info("Stop and clean follower {}", followerPeer); |
| assertTrue(cluster.stop(followerAddr)); |
| cluster.clean(followerAddr); |
| |
| // remove follower |
| LOG.info("Remove follower {}", followerPeer); |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.removePeer(followerPeer, new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); |
| followers = cluster.getFollowers(); |
| assertEquals(1, followers.size()); |
| |
| peers = TestUtils.generatePeers(3); |
| assertTrue(peers.remove(followerPeer)); |
| |
| // start follower |
| LOG.info("Start and add follower {}", followerPeer); |
| assertTrue(cluster.start(followerAddr)); |
| // re-add follower |
| latch = new CountDownLatch(1); |
| leader.addPeer(followerPeer, new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| cluster.ensureSame(); |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(20, fsm.getLogs().size()); |
| } |
| |
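| /** |
| * Removes the current leader from the configuration, waits for a new election, then re-adds the old |
| * leader as a follower and verifies full convergence on all 20 entries. |
| */ |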
| @Test |
| public void testRemoveLeader() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unittest", dataPath, peers); |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| // elect leader |
| cluster.waitLeader(); |
| |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| PeerId oldLeader = leader.getNodeId().getPeerId().copy(); |
| Endpoint oldLeaderAddr = oldLeader.getEndpoint(); |
| |
| // remove old leader |
| LOG.info("Remove old leader {}", oldLeader); |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.removePeer(oldLeader, new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| // elect new leader |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| LOG.info("New leader is {}", leader); |
| assertNotNull(leader); |
| // apply tasks to new leader |
| sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); |
| |
| // stop and clean old leader |
| LOG.info("Stop and clean old leader {}", oldLeader); |
| assertTrue(cluster.stop(oldLeaderAddr)); |
| cluster.clean(oldLeaderAddr); |
| |
| // Add and start old leader |
| LOG.info("Start and add old leader {}", oldLeader); |
| assertTrue(cluster.start(oldLeaderAddr)); |
| |
| peers = TestUtils.generatePeers(3); |
| assertTrue(peers.remove(oldLeader)); |
| latch = new CountDownLatch(1); |
| leader.addPeer(oldLeader, new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| cluster.ensureSame(); |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(20, fsm.getLogs().size()); |
| } |
| |
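/**
 * Tests that the leader's term does not change when a follower is removed and re-added, thanks to pre-vote.
 *
 * @throws Exception If failed.
 */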
| @Test |
| @Disabled("https://issues.apache.org/jira/browse/IGNITE-15312") |
| public void testPreVote() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| // get leader |
| Node leader = cluster.getLeader(); |
assertNotNull(leader);
long savedTerm = ((NodeImpl) leader).getCurrentTerm();
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| PeerId followerPeer = followers.get(0).getNodeId().getPeerId(); |
| Endpoint followerAddr = followerPeer.getEndpoint(); |
| |
| // remove follower |
| LOG.info("Remove follower {}", followerPeer); |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.removePeer(followerPeer, new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); |
| |
| Thread.sleep(2000); |
| |
| // add follower |
| LOG.info("Add follower {}", followerAddr); |
| peers = TestUtils.generatePeers(3); |
| assertTrue(peers.remove(followerPeer)); |
| latch = new CountDownLatch(1); |
| leader.addPeer(followerPeer, new ExpectClosure(latch)); |
| waitLatch(latch); |
| leader = cluster.getLeader(); |
| assertNotNull(leader); |
| // leader term should not be changed. |
| assertEquals(savedTerm, ((NodeImpl) leader).getCurrentTerm()); |
| } |
| |
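/**
 * Tests resetting peers on a single node that was started with an empty configuration.
 *
 * @throws Exception If failed.
 */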
| @Test |
| public void testSetPeer1() throws Exception { |
| cluster = new TestCluster("testSetPeer1", dataPath, new ArrayList<>()); |
| |
| PeerId bootPeer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| assertTrue(cluster.start(bootPeer.getEndpoint())); |
| List<Node> nodes = cluster.getFollowers(); |
| assertEquals(1, nodes.size()); |
| |
| List<PeerId> peers = new ArrayList<>(); |
| peers.add(bootPeer); |
| // reset peers from empty |
| assertTrue(nodes.get(0).resetPeers(new Configuration(peers)).isOk()); |
| cluster.waitLeader(); |
| assertNotNull(cluster.getLeader()); |
| } |
| |
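/**
 * Tests resetting peers to the sole surviving node after the quorum dies, then re-adding the old followers.
 *
 * @throws Exception If failed.
 */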
| @Test |
| public void testSetPeer2() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| PeerId followerPeer1 = followers.get(0).getNodeId().getPeerId(); |
| Endpoint followerAddr1 = followerPeer1.getEndpoint(); |
| PeerId followerPeer2 = followers.get(1).getNodeId().getPeerId(); |
| Endpoint followerAddr2 = followerPeer2.getEndpoint(); |
| |
| LOG.info("Stop and clean follower {}", followerPeer1); |
| assertTrue(cluster.stop(followerAddr1)); |
| cluster.clean(followerAddr1); |
| |
| // apply tasks to leader again |
| sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); |
// set peers while a quorum is still alive
| Endpoint leaderAddr = leader.getLeaderId().getEndpoint().copy(); |
| LOG.info("Set peers to {}", leaderAddr); |
| |
| LOG.info("Stop and clean follower {}", followerPeer2); |
| assertTrue(cluster.stop(followerAddr2)); |
| cluster.clean(followerAddr2); |
| |
| assertTrue(waitForTopology(cluster, leaderAddr, 1, 5_000)); |
| |
// the leader will step down and become a follower
| Thread.sleep(2000); |
| List<PeerId> newPeers = new ArrayList<>(); |
| newPeers.add(new PeerId(leaderAddr, 0)); |
| |
// new peers are equal to the current conf
assertTrue(leader.resetPeers(new Configuration(peers)).isOk());
// set peers after the quorum has died
LOG.warn("Set peers to {}", leaderAddr);
| assertTrue(leader.resetPeers(new Configuration(newPeers)).isOk()); |
| |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertEquals(leaderAddr, leader.getNodeId().getPeerId().getEndpoint()); |
| |
| LOG.info("start follower {}", followerAddr1); |
| assertTrue(cluster.start(followerAddr1, true, 300)); |
| LOG.info("start follower {}", followerAddr2); |
| assertTrue(cluster.start(followerAddr2, true, 300)); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| LOG.info("Add old follower {}", followerAddr1); |
| leader.addPeer(followerPeer1, new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| latch = new CountDownLatch(1); |
| LOG.info("Add old follower {}", followerAddr2); |
| leader.addPeer(followerPeer2, new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| newPeers.add(followerPeer1); |
| newPeers.add(followerPeer2); |
| |
| cluster.ensureSame(); |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(20, fsm.getLogs().size()); |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| @Test |
| public void testRestoreSnapshot() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| // get leader |
| Node leader = cluster.getLeader(); |
| |
| LOG.info("Leader: " + leader); |
| |
| assertNotNull(leader); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| triggerLeaderSnapshot(cluster, leader); |
| |
| // stop leader |
| Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy(); |
| assertTrue(cluster.stop(leaderAddr)); |
| |
| // restart leader |
| cluster.waitLeader(); |
| assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes()); |
| assertTrue(cluster.start(leaderAddr)); |
| cluster.ensureSame(); |
| assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes()); |
| } |
| |
| /** |
| * @throws Exception |
| */ |
| @Test |
| public void testRestoreSnapshotWithDelta() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| // get leader |
| Node leader = cluster.getLeader(); |
| |
| LOG.info("Leader: " + leader); |
| |
| assertNotNull(leader); |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| triggerLeaderSnapshot(cluster, leader); |
| |
| // stop leader |
| Endpoint leaderAddr = leader.getNodeId().getPeerId().getEndpoint().copy(); |
| assertTrue(cluster.stop(leaderAddr)); |
| |
| // restart leader |
| cluster.waitLeader(); |
| |
| sendTestTaskAndWait(cluster.getLeader(), 10, RaftError.SUCCESS); |
| |
| assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes()); |
| assertTrue(cluster.start(leaderAddr)); |
| |
| Node oldLeader = cluster.getNode(leaderAddr); |
| |
| cluster.ensureSame(); |
| assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes()); |
| |
| MockStateMachine fsm = (MockStateMachine) oldLeader.getOptions().getFsm(); |
| assertEquals(1, fsm.getLoadSnapshotTimes()); |
| } |
| |
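/**
 * Tests installing a snapshot on a cleaned and restarted follower with snapshot throughput throttling enabled.
 *
 * @throws Exception If failed.
 */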
| @Test |
| public void testInstallSnapshotWithThrottle() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 200, false, new ThroughputSnapshotThrottle(1024, 1))); |
| |
| cluster.waitLeader(); |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| |
| // stop follower1 |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint(); |
| assertTrue(cluster.stop(followerAddr)); |
| |
| cluster.waitLeader(); |
| |
| // apply something more |
| sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); |
| |
| Thread.sleep(1000); |
| |
| // trigger leader snapshot |
| triggerLeaderSnapshot(cluster, leader); |
| // apply something more |
| sendTestTaskAndWait(leader, 20, RaftError.SUCCESS); |
| // trigger leader snapshot |
| triggerLeaderSnapshot(cluster, leader, 2); |
| |
// wait for the leader to compact logs
| Thread.sleep(1000); |
| |
| // restart follower. |
| cluster.clean(followerAddr); |
| assertTrue(cluster.start(followerAddr, true, 300, false, new ThroughputSnapshotThrottle(1024, 1))); |
| |
| Thread.sleep(2000); |
| cluster.ensureSame(); |
| |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(30, fsm.getLogs().size()); |
| } |
| |
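/**
 * Tests installing a large snapshot on a newly added follower with snapshot throughput throttling enabled.
 *
 * @throws Exception If failed.
 */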
| @Test // TODO add test for timeout on snapshot install https://issues.apache.org/jira/browse/IGNITE-14832 |
| @Disabled("https://issues.apache.org/jira/browse/IGNITE-14943") |
| public void testInstallLargeSnapshotWithThrottle() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(4); |
| cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3)); |
| for (int i = 0; i < peers.size() - 1; i++) { |
| PeerId peer = peers.get(i); |
| boolean started = cluster.start(peer.getEndpoint(), false, 200, false); |
| assertTrue(started); |
| } |
| cluster.waitLeader(); |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader, 0, RaftError.SUCCESS); |
| |
| cluster.ensureSame(); |
| |
| // apply something more |
| for (int i = 1; i < 100; i++) |
| sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS); |
| |
| Thread.sleep(1000); |
| |
| // trigger leader snapshot |
| triggerLeaderSnapshot(cluster, leader); |
| |
| // apply something more |
| for (int i = 100; i < 200; i++) |
| sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS); |
| // trigger leader snapshot |
| triggerLeaderSnapshot(cluster, leader, 2); |
| |
// wait for the leader to compact logs
| Thread.sleep(1000); |
| |
| // add follower |
| PeerId newPeer = peers.get(3); |
| SnapshotThrottle snapshotThrottle = new ThroughputSnapshotThrottle(128, 1); |
| boolean started = cluster.start(newPeer.getEndpoint(), false, 300, false, snapshotThrottle); |
| assertTrue(started); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.addPeer(newPeer, status -> { |
| assertTrue(status.isOk(), status.toString()); |
| latch.countDown(); |
| }); |
| waitLatch(latch); |
| |
| cluster.ensureSame(); |
| |
| assertEquals(4, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(2000, fsm.getLogs().size()); |
| } |
| |
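/**
 * Tests installing a large snapshot on a newly added follower with a small maximum RPC payload size.
 *
 * @throws Exception If failed.
 */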
| @Test |
| public void testInstallLargeSnapshot() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(4); |
| cluster = new TestCluster("unitest", dataPath, peers.subList(0, 3)); |
| for (int i = 0; i < peers.size() - 1; i++) { |
| PeerId peer = peers.get(i); |
| boolean started = cluster.start(peer.getEndpoint(), false, 200, false); |
| assertTrue(started); |
| } |
| cluster.waitLeader(); |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader, 0, RaftError.SUCCESS); |
| |
| cluster.ensureSame(); |
| |
| // apply something more |
| for (int i = 1; i < 100; i++) |
| sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS); |
| |
| Thread.sleep(1000); |
| |
| // trigger leader snapshot |
| triggerLeaderSnapshot(cluster, leader); |
| |
| // apply something more |
| for (int i = 100; i < 200; i++) |
| sendTestTaskAndWait(leader, i * 10, RaftError.SUCCESS); |
| // trigger leader snapshot |
| triggerLeaderSnapshot(cluster, leader, 2); |
| |
// wait for the leader to compact logs
| Thread.sleep(1000); |
| |
| // add follower |
| PeerId newPeer = peers.get(3); |
| RaftOptions raftOptions = new RaftOptions(); |
| raftOptions.setMaxByteCountPerRpc(128); |
| boolean started = cluster.start(newPeer.getEndpoint(), false, 300, false, null, raftOptions); |
| assertTrue(started); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.addPeer(newPeer, status -> { |
| assertTrue(status.isOk(), status.toString()); |
| latch.countDown(); |
| }); |
| waitLatch(latch); |
| |
| cluster.ensureSame(); |
| |
| assertEquals(4, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(2000, fsm.getLogs().size()); |
| } |
| |
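/**
 * Tests that a cleaned and restarted follower catches up by installing a snapshot from the leader.
 *
 * @throws Exception If failed.
 */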
| @Test |
| public void testInstallSnapshot() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| // get leader |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| // apply tasks to leader |
| sendTestTaskAndWait(leader); |
| |
| cluster.ensureSame(); |
| |
| // stop follower1 |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint(); |
| assertTrue(cluster.stop(followerAddr)); |
| |
| // apply something more |
| sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); |
| |
| // trigger leader snapshot |
| triggerLeaderSnapshot(cluster, leader); |
| // apply something more |
| sendTestTaskAndWait(leader, 20, RaftError.SUCCESS); |
| triggerLeaderSnapshot(cluster, leader, 2); |
| |
// wait for the leader to compact logs
| Thread.sleep(50); |
| |
// restart follower.
| cluster.clean(followerAddr); |
| assertTrue(cluster.start(followerAddr, false, 300)); |
| |
| cluster.ensureSame(); |
| |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(30, fsm.getLogs().size(), fsm.getAddress().toString()); |
| } |
| |
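/**
 * Tests that a snapshot request fails with EINVAL when no snapshot URI is configured.
 *
 * @throws Exception If failed.
 */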
| @Test |
| public void testNoSnapshot() throws Exception { |
| Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| NodeOptions nodeOptions = createNodeOptions(); |
| MockStateMachine fsm = new MockStateMachine(addr); |
| nodeOptions.setFsm(fsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0)))); |
| |
| RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions); |
| Node node = service.start(); |
// wait for the node to elect itself as leader
Thread.sleep(2000);
| |
| sendTestTaskAndWait(node); |
| |
| assertEquals(0, fsm.getSaveSnapshotTimes()); |
// the snapshot request returns an error because no snapshot URI is configured
| CountDownLatch latch = new CountDownLatch(1); |
| node.snapshot(new ExpectClosure(RaftError.EINVAL, "Snapshot is not supported", latch)); |
| waitLatch(latch); |
| assertEquals(0, fsm.getSaveSnapshotTimes()); |
| } |
| |
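/**
 * Tests that snapshots are taken automatically when a snapshot interval is configured.
 *
 * @throws Exception If failed.
 */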
| @Test |
| public void testAutoSnapshot() throws Exception { |
| Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| NodeOptions nodeOptions = createNodeOptions(); |
| MockStateMachine fsm = new MockStateMachine(addr); |
| nodeOptions.setFsm(fsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setSnapshotIntervalSecs(10); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0)))); |
| |
| RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions); |
| Node node = service.start(); |
// wait for the node to elect itself as leader
| Thread.sleep(2000); |
| |
| sendTestTaskAndWait(node); |
| |
| // wait for auto snapshot |
| Thread.sleep(10000); |
| // first snapshot will be triggered randomly |
| int times = fsm.getSaveSnapshotTimes(); |
| assertTrue(times >= 1, "snapshotTimes=" + times); |
| assertTrue(fsm.getSnapshotIndex() > 0); |
| } |
| |
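/**
 * Tests that a stable cluster keeps the same leader and term over time.
 *
 * @throws Exception If failed.
 */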
| @Test |
| public void testLeaderShouldNotChange() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| // get leader |
| Node leader0 = cluster.getLeader(); |
| assertNotNull(leader0); |
| long savedTerm = ((NodeImpl) leader0).getCurrentTerm(); |
| LOG.info("Current leader is {}, term is {}", leader0, savedTerm); |
| Thread.sleep(5000); |
| cluster.waitLeader(); |
| Node leader1 = cluster.getLeader(); |
| assertNotNull(leader1); |
| LOG.info("Current leader is {}", leader1); |
| assertEquals(savedTerm, ((NodeImpl) leader1).getCurrentTerm()); |
| } |
| |
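/**
 * Tests that a stopped follower catches up with the group after a restart.
 *
 * @throws Exception If failed.
 */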
| @Test |
| public void testRecoverFollower() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| Endpoint followerAddr = followers.get(0).getNodeId().getPeerId().getEndpoint().copy(); |
| assertTrue(cluster.stop(followerAddr)); |
| |
| sendTestTaskAndWait(leader); |
| |
| for (int i = 10; i < 30; i++) { |
| ByteBuffer data = ByteBuffer.wrap(("no cluster" + i).getBytes()); |
| Task task = new Task(data, null); |
| leader.apply(task); |
| } |
// wait for the leader to compact logs
| Thread.sleep(5000); |
| // restart follower |
| assertTrue(cluster.start(followerAddr)); |
| cluster.ensureSame(); |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(30, fsm.getLogs().size()); |
| } |
| |
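/**
 * Tests transferring leadership to a designated follower.
 *
 * @throws Exception If failed.
 */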
| @Test |
| public void testLeaderTransfer() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 300); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| sendTestTaskAndWait(leader); |
| |
| Thread.sleep(100); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy(); |
| LOG.info("Transfer leadership from {} to {}", leader, targetPeer); |
| assertTrue(leader.transferLeadershipTo(targetPeer).isOk()); |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| assertEquals(leader.getNodeId().getPeerId(), targetPeer); |
| } |
| |
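/**
 * Tests transferring leadership to a lagging follower: applying tasks during the transfer fails with EBUSY.
 *
 * @throws Exception If failed.
 */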
| @Test |
| public void testLeaderTransferBeforeLogIsCompleted() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 300); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 1)); |
| |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy(); |
| assertTrue(cluster.stop(targetPeer.getEndpoint())); |
| |
| sendTestTaskAndWait(leader); |
| LOG.info("Transfer leadership from {} to {}", leader, targetPeer); |
| assertTrue(leader.transferLeadershipTo(targetPeer).isOk()); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| Task task = new Task(ByteBuffer.wrap("aaaaa".getBytes()), new ExpectClosure(RaftError.EBUSY, latch)); |
| leader.apply(task); |
| waitLatch(latch); |
| |
| cluster.waitLeader(); |
| |
| assertTrue(cluster.start(targetPeer.getEndpoint())); |
| |
| leader = cluster.getLeader(); |
| |
| assertNotEquals(targetPeer, leader.getNodeId().getPeerId()); |
| cluster.ensureSame(); |
| } |
| |
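/**
 * Tests that the leader resumes its duties when a leadership transfer to a stopped peer fails.
 *
 * @throws Exception If failed.
 */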
| @Test |
| public void testLeaderTransferResumeOnFailure() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 300); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint(), false, 1)); |
| |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| |
| PeerId targetPeer = followers.get(0).getNodeId().getPeerId().copy(); |
| assertTrue(cluster.stop(targetPeer.getEndpoint())); |
| |
| sendTestTaskAndWait(leader); |
| |
| assertTrue(leader.transferLeadershipTo(targetPeer).isOk()); |
| Node savedLeader = leader; |
| //try to apply task when transferring leadership |
| CountDownLatch latch = new CountDownLatch(1); |
| Task task = new Task(ByteBuffer.wrap("aaaaa".getBytes()), new ExpectClosure(RaftError.EBUSY, latch)); |
| leader.apply(task); |
| waitLatch(latch); |
| |
| Thread.sleep(100); |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| assertSame(leader, savedLeader); |
| |
| // restart target peer |
| assertTrue(cluster.start(targetPeer.getEndpoint())); |
| Thread.sleep(100); |
| // retry apply task |
| latch = new CountDownLatch(1); |
| task = new Task(ByteBuffer.wrap("aaaaa".getBytes()), new ExpectClosure(latch)); |
| leader.apply(task); |
| waitLatch(latch); |
| |
| cluster.ensureSame(); |
| } |
| |
| /** |
* Mock state machine that fails to load a snapshot.
| */ |
| static class MockFSM1 extends MockStateMachine { |
| |
| MockFSM1() { |
| this(new Endpoint(Utils.IP_ANY, 0)); |
| } |
| |
| MockFSM1(Endpoint address) { |
| super(address); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public boolean onSnapshotLoad(SnapshotReader reader) { |
| return false; |
| } |
| } |
| |
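/**
 * Tests that starting a node fails when the snapshot cannot be loaded, and that shutdown still works afterwards.
 *
 * @throws Exception If failed.
 */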
| @Test |
| public void testShutdownAndJoinWorkAfterInitFails() throws Exception { |
| Endpoint addr = new Endpoint(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| { |
| NodeOptions nodeOptions = createNodeOptions(); |
| MockStateMachine fsm = new MockStateMachine(addr); |
| nodeOptions.setFsm(fsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setSnapshotIntervalSecs(10); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0)))); |
| |
| RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions); |
| Node node = service.start(); |
| |
| Thread.sleep(1000); |
| sendTestTaskAndWait(node); |
| |
| // save snapshot |
| CountDownLatch latch = new CountDownLatch(1); |
| node.snapshot(new ExpectClosure(latch)); |
| waitLatch(latch); |
| service.shutdown(); |
| } |
| { |
| NodeOptions nodeOptions = createNodeOptions(); |
| MockStateMachine fsm = new MockFSM1(addr); |
| nodeOptions.setFsm(fsm); |
| nodeOptions.setLogUri(dataPath + File.separator + "log"); |
| nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOptions.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOptions.setSnapshotIntervalSecs(10); |
| nodeOptions.setInitialConf(new Configuration(Collections.singletonList(new PeerId(addr, 0)))); |
| |
| RaftGroupService service = createService("unittest", new PeerId(addr, 0), nodeOptions); |
| try { |
| service.start(); |
| |
| fail(); |
| } |
| catch (Exception e) { |
| // Expected. |
| } |
| } |
| } |
| |
| /** |
* Section 4.2.2 of the Raft dissertation: removing the current leader.
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testShuttingDownLeaderTriggerTimeoutNow() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 300); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| Node oldLeader = leader; |
| |
| LOG.info("Shutdown leader {}", leader); |
| leader.shutdown(); |
| leader.join(); |
| |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| |
| assertNotNull(leader); |
| assertNotSame(leader, oldLeader); |
| } |
| |
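/**
 * Tests that removing the current leader triggers an immediate election of a new leader.
 *
 * @throws Exception If failed.
 */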
| @Test |
| public void testRemovingLeaderTriggerTimeoutNow() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 300); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| |
// Ensure a quorum exists before removing the leader, otherwise removePeer can be rejected.
| for (Node follower : cluster.getFollowers()) |
| assertTrue(waitForCondition(() -> follower.getLeaderId() != null, 5_000)); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| Node oldLeader = leader; |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| oldLeader.removePeer(oldLeader.getNodeId().getPeerId(), new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| assertNotNull(leader); |
| assertNotSame(leader, oldLeader); |
| } |
| |
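/**
 * Tests that leadership can be transferred to a peer that recovered from a snapshot.
 *
 * @throws Exception If failed.
 */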
| @Test |
| public void testTransferShouldWorkAfterInstallSnapshot() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 1000); |
| |
| for (int i = 0; i < peers.size() - 1; i++) |
| assertTrue(cluster.start(peers.get(i).getEndpoint())); |
| |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| |
| sendTestTaskAndWait(leader); |
| |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(1, followers.size()); |
| |
| PeerId follower = followers.get(0).getNodeId().getPeerId(); |
| assertTrue(leader.transferLeadershipTo(follower).isOk()); |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| assertEquals(follower, leader.getNodeId().getPeerId()); |
| |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.snapshot(new ExpectClosure(latch)); |
| waitLatch(latch); |
| latch = new CountDownLatch(1); |
| leader.snapshot(new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
// start the last peer, which should recover from the snapshot.
| PeerId lastPeer = peers.get(2); |
| assertTrue(cluster.start(lastPeer.getEndpoint())); |
| Thread.sleep(5000); |
| assertTrue(leader.transferLeadershipTo(lastPeer).isOk()); |
| Thread.sleep(2000); |
| leader = cluster.getLeader(); |
| assertEquals(lastPeer, leader.getNodeId().getPeerId()); |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(10, fsm.getLogs().size()); |
| } |
| |
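/**
 * Tests that a follower put into an error state can be stopped, restarted and catch up with the group.
 *
 * @throws Exception If failed.
 */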
| @Test |
| public void testAppendEntriesWhenFollowerIsInErrorState() throws Exception { |
| // start five nodes |
| List<PeerId> peers = TestUtils.generatePeers(5); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 1000); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| Node oldLeader = cluster.getLeader(); |
| assertNotNull(oldLeader); |
| // apply something |
| sendTestTaskAndWait(oldLeader); |
| |
| // set one follower into error state |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(4, followers.size()); |
| Node errorNode = followers.get(0); |
| PeerId errorPeer = errorNode.getNodeId().getPeerId().copy(); |
| Endpoint errorFollowerAddr = errorPeer.getEndpoint(); |
| LOG.info("Set follower {} into error state", errorNode); |
| ((NodeImpl) errorNode).onError(new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(-1, |
| "Follower has something wrong."))); |
| |
| // increase term by stopping leader and electing a new leader again |
| Endpoint oldLeaderAddr = oldLeader.getNodeId().getPeerId().getEndpoint().copy(); |
| assertTrue(cluster.stop(oldLeaderAddr)); |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| LOG.info("Elect a new leader {}", leader); |
| // apply something again |
| sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); |
| |
| // stop error follower |
| Thread.sleep(20); |
| LOG.info("Stop error follower {}", errorNode); |
| assertTrue(cluster.stop(errorFollowerAddr)); |
| // restart error and old leader |
| LOG.info("Restart error follower {} and old leader {}", errorFollowerAddr, oldLeaderAddr); |
| |
| assertTrue(cluster.start(errorFollowerAddr)); |
| assertTrue(cluster.start(oldLeaderAddr)); |
| cluster.ensureSame(); |
| assertEquals(5, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(20, fsm.getLogs().size()); |
| } |
| |
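/**
 * Tests that the onStartFollowing/onStopFollowing callbacks fire the expected number of times across leader changes.
 *
 * @throws Exception If failed.
 */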
| @Test |
| public void testFollowerStartStopFollowing() throws Exception { |
| // start five nodes |
| List<PeerId> peers = TestUtils.generatePeers(5); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 1000); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| cluster.waitLeader(); |
| Node firstLeader = cluster.getLeader(); |
| assertNotNull(firstLeader); |
| cluster.ensureLeader(firstLeader); |
| |
| // apply something |
| sendTestTaskAndWait(firstLeader); |
| |
| // assert follow times |
| List<Node> firstFollowers = cluster.getFollowers(); |
| assertEquals(4, firstFollowers.size()); |
| for (Node node : firstFollowers) { |
| assertTrue( |
| waitForCondition(() -> ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes() == 1, 5_000)); |
| assertEquals(0, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes()); |
| } |
| |
| // stop leader and elect new one |
| Endpoint fstLeaderAddr = firstLeader.getNodeId().getPeerId().getEndpoint(); |
| assertTrue(cluster.stop(fstLeaderAddr)); |
| cluster.waitLeader(); |
| Node secondLeader = cluster.getLeader(); |
| assertNotNull(secondLeader); |
| sendTestTaskAndWait(secondLeader, 10, RaftError.SUCCESS); |
| |
| // ensure start/stop following times |
| List<Node> secondFollowers = cluster.getFollowers(); |
| assertEquals(3, secondFollowers.size()); |
| for (Node node : secondFollowers) { |
| assertTrue( |
| waitForCondition(() -> ((MockStateMachine) node.getOptions().getFsm()).getOnStartFollowingTimes() == 2, 5_000)); |
| assertEquals(1, ((MockStateMachine) node.getOptions().getFsm()).getOnStopFollowingTimes()); |
| } |
| |
| // transfer leadership to a follower |
| PeerId targetPeer = secondFollowers.get(0).getNodeId().getPeerId().copy(); |
| assertTrue(secondLeader.transferLeadershipTo(targetPeer).isOk()); |
| Thread.sleep(100); |
| cluster.waitLeader(); |
| Node thirdLeader = cluster.getLeader(); |
| assertEquals(targetPeer, thirdLeader.getNodeId().getPeerId()); |
| sendTestTaskAndWait(thirdLeader, 20, RaftError.SUCCESS); |
| |
| List<Node> thirdFollowers = cluster.getFollowers(); |
| assertEquals(3, thirdFollowers.size()); |
| for (int i = 0; i < 3; i++) { |
| Node follower = thirdFollowers.get(i); |
| if (follower.getNodeId().getPeerId().equals(secondLeader.getNodeId().getPeerId())) { |
| assertTrue( |
| waitForCondition(() -> ((MockStateMachine) follower.getOptions().getFsm()).getOnStartFollowingTimes() == 2, 5_000)); |
| assertEquals(1, |
| ((MockStateMachine) follower.getOptions().getFsm()).getOnStopFollowingTimes()); |
| continue; |
| } |
| |
| assertTrue(waitForCondition(() -> ((MockStateMachine) follower.getOptions().getFsm()).getOnStartFollowingTimes() == 3, 5_000)); |
| assertEquals(2, ((MockStateMachine) follower.getOptions().getFsm()).getOnStopFollowingTimes()); |
| } |
| |
| cluster.ensureSame(); |
| } |
| |
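/**
 * Tests reading committed user logs, skipping CONFIGURATION entries and handling invalid, out-of-bounds and deleted indexes.
 *
 * @throws Exception If failed.
 */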
| @Test |
| @Disabled("https://issues.apache.org/jira/browse/IGNITE-15202") |
| public void readCommittedUserLog() throws Exception { |
| // setup cluster |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| |
| cluster = new TestCluster("unitest", dataPath, peers, 1000); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| cluster.ensureLeader(leader); |
| |
| sendTestTaskAndWait(leader); |
| |
| // index == 1 is a CONFIGURATION log, so real_index will be 2 when returned. |
| UserLog userLog = leader.readCommittedUserLog(1); |
| assertNotNull(userLog); |
| assertEquals(2, userLog.getIndex()); |
| assertEquals("hello0", new String(userLog.getData().array())); |
| |
// index == 5 is a DATA log (a user log)
| userLog = leader.readCommittedUserLog(5); |
| assertNotNull(userLog); |
| assertEquals(5, userLog.getIndex()); |
| assertEquals("hello3", new String(userLog.getData().array())); |
| |
| // index == 15 is greater than last_committed_index |
| try { |
| assertNull(leader.readCommittedUserLog(15)); |
| fail(); |
| } |
| catch (LogIndexOutOfBoundsException e) { |
| assertEquals("Request index 15 is greater than lastAppliedIndex: 11", e.getMessage()); |
| } |
| |
// index == 0 is an invalid request
| try { |
| assertNull(leader.readCommittedUserLog(0)); |
| fail(); |
| } |
| catch (LogIndexOutOfBoundsException e) { |
| assertEquals("Request index is invalid: 0", e.getMessage()); |
| } |
| LOG.info("Trigger leader snapshot"); |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.snapshot(new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| // remove and add a peer to add two CONFIGURATION logs |
| List<Node> followers = cluster.getFollowers(); |
| assertEquals(2, followers.size()); |
| Node testFollower = followers.get(0); |
| latch = new CountDownLatch(1); |
| leader.removePeer(testFollower.getNodeId().getPeerId(), new ExpectClosure(latch)); |
| waitLatch(latch); |
| latch = new CountDownLatch(1); |
| leader.addPeer(testFollower.getNodeId().getPeerId(), new ExpectClosure(latch)); |
| waitLatch(latch); |
| |
| sendTestTaskAndWait(leader, 10, RaftError.SUCCESS); |
| |
// trigger a leader snapshot for the second time; after this, the logs at indexes 1~11 will be deleted.
| LOG.info("Trigger leader snapshot"); |
| latch = new CountDownLatch(1); |
| leader.snapshot(new ExpectClosure(latch)); |
| waitLatch(latch); |
| Thread.sleep(100); |
| |
| // index == 5 log has been deleted in log_storage. |
| try { |
| leader.readCommittedUserLog(5); |
| fail(); |
| } |
| catch (LogNotFoundException e) { |
| assertEquals("User log is deleted at index: 5", e.getMessage()); |
| } |
| |
// index == 12, index == 13, index == 14 and index == 15 are 4 CONFIGURATION logs (joint consensus), so real_index will be 16 when returned.
| userLog = leader.readCommittedUserLog(12); |
| assertNotNull(userLog); |
| assertEquals(16, userLog.getIndex()); |
| assertEquals("hello10", new String(userLog.getData().array())); |
| |
| // now index == 17 is a user log |
| userLog = leader.readCommittedUserLog(17); |
| assertNotNull(userLog); |
| assertEquals(17, userLog.getIndex()); |
| assertEquals("hello11", new String(userLog.getData().array())); |
| |
| cluster.ensureSame(); |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) { |
| assertEquals(20, fsm.getLogs().size()); |
| for (int i = 0; i < 20; i++) |
| assertEquals("hello" + i, new String(fsm.getLogs().get(i).array())); |
| } |
| } |
| |
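/**
 * Tests bootstrapping a node from a snapshot with pre-existing state machine data.
 *
 * @throws Exception If failed.
 */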
| @Test |
| public void testBootStrapWithSnapshot() throws Exception { |
| Endpoint addr = new Endpoint("127.0.0.1", 5006); |
| MockStateMachine fsm = new MockStateMachine(addr); |
| |
| for (char ch = 'a'; ch <= 'z'; ch++) |
| fsm.getLogs().add(ByteBuffer.wrap(new byte[] {(byte) ch})); |
| |
| BootstrapOptions opts = new BootstrapOptions(); |
| opts.setServiceFactory(new DefaultJRaftServiceFactory()); |
| opts.setLastLogIndex(fsm.getLogs().size()); |
| opts.setRaftMetaUri(dataPath + File.separator + "meta"); |
| opts.setLogUri(dataPath + File.separator + "log"); |
| opts.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| opts.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006")); |
| opts.setFsm(fsm); |
| |
| NodeOptions nodeOpts = createNodeOptions(); |
| opts.setNodeOptions(nodeOpts); |
| |
| assertTrue(JRaftUtils.bootstrap(opts)); |
| |
| nodeOpts.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOpts.setLogUri(dataPath + File.separator + "log"); |
| nodeOpts.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOpts.setFsm(fsm); |
| |
| RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts); |
| |
| Node node = service.start(); |
| assertEquals(26, fsm.getLogs().size()); |
| |
| for (int i = 0; i < 26; i++) |
| assertEquals('a' + i, fsm.getLogs().get(i).get()); |
| |
| // Group configuration will be restored from snapshot meta. |
| while (!node.isLeader()) |
| Thread.sleep(20); |
| sendTestTaskAndWait(node); |
| assertEquals(36, fsm.getLogs().size()); |
| } |
| |
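/**
 * Tests bootstrapping a node without a snapshot (the last log index is 0).
 *
 * @throws Exception If failed.
 */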
| @Test |
| public void testBootStrapWithoutSnapshot() throws Exception { |
| Endpoint addr = new Endpoint("127.0.0.1", 5006); |
| MockStateMachine fsm = new MockStateMachine(addr); |
| |
| BootstrapOptions opts = new BootstrapOptions(); |
| opts.setServiceFactory(new DefaultJRaftServiceFactory()); |
| opts.setLastLogIndex(0); |
| opts.setRaftMetaUri(dataPath + File.separator + "meta"); |
| opts.setLogUri(dataPath + File.separator + "log"); |
| opts.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| opts.setGroupConf(JRaftUtils.getConfiguration("127.0.0.1:5006")); |
| opts.setFsm(fsm); |
| NodeOptions nodeOpts = createNodeOptions(); |
| opts.setNodeOptions(nodeOpts); |
| |
| assertTrue(JRaftUtils.bootstrap(opts)); |
| |
| nodeOpts.setRaftMetaUri(dataPath + File.separator + "meta"); |
| nodeOpts.setLogUri(dataPath + File.separator + "log"); |
| nodeOpts.setSnapshotUri(dataPath + File.separator + "snapshot"); |
| nodeOpts.setFsm(fsm); |
| |
| RaftGroupService service = createService("test", new PeerId(addr, 0), nodeOpts); |
| |
| Node node = service.start(); |
| while (!node.isLeader()) |
| Thread.sleep(20); |
| sendTestTaskAndWait(node); |
| assertEquals(10, fsm.getLogs().size()); |
| } |
| |
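/**
 * Tests moving the group from one peer to the next, one configuration change at a time.
 *
 * @throws Exception If failed.
 */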
| @Test |
| public void testChangePeers() throws Exception { |
| PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0)); |
| assertTrue(cluster.start(peer0.getEndpoint())); |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| sendTestTaskAndWait(leader); |
| |
| for (int i = 1; i < 10; i++) { |
| PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i); |
| assertTrue(cluster.start(peer.getEndpoint(), false, 300)); |
| } |
| for (int i = 0; i < 9; i++) { |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| assertNotNull(leader); |
| PeerId peer = new PeerId(TestUtils.getLocalAddress(), peer0.getEndpoint().getPort() + i); |
| assertEquals(peer, leader.getNodeId().getPeerId()); |
| peer = new PeerId(TestUtils.getLocalAddress(), peer0.getEndpoint().getPort() + i + 1); |
| SynchronizedClosure done = new SynchronizedClosure(); |
| leader.changePeers(new Configuration(Collections.singletonList(peer)), done); |
| Status status = done.await(); |
| assertTrue(status.isOk(), status.getRaftError().toString()); |
| } |
| |
| cluster.waitLeader(); |
| |
| for (final MockStateMachine fsm : cluster.getFsms()) { |
| assertEquals(10, fsm.getLogs().size()); |
| } |
| } |
| |
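/**
 * Tests that changing peers fails with ECATCHUP until all of the new peers are started.
 *
 * @throws Exception If failed.
 */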
| @Test |
| public void testChangePeersAddMultiNodes() throws Exception { |
| PeerId peer0 = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT); |
| cluster = new TestCluster("testChangePeers", dataPath, Collections.singletonList(peer0)); |
| assertTrue(cluster.start(peer0.getEndpoint())); |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| sendTestTaskAndWait(leader); |
| |
| Configuration conf = new Configuration(); |
| for (int i = 0; i < 3; i++) { |
| PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i); |
| conf.addPeer(peer); |
| } |
| |
| PeerId peer = new PeerId(TestUtils.getLocalAddress(), peer0.getEndpoint().getPort() + 1); |
// fails because the peer is not started.
| SynchronizedClosure done = new SynchronizedClosure(); |
| leader.changePeers(new Configuration(Collections.singletonList(peer)), done); |
| assertEquals(RaftError.ECATCHUP, done.await().getRaftError()); |
| |
| // start peer1 |
| assertTrue(cluster.start(peer.getEndpoint())); |
| // still fail, because peer2 is not started |
| done.reset(); |
| leader.changePeers(conf, done); |
| assertEquals(RaftError.ECATCHUP, done.await().getRaftError()); |
| // start peer2 |
| peer = new PeerId(TestUtils.getLocalAddress(), peer0.getEndpoint().getPort() + 2); |
| assertTrue(cluster.start(peer.getEndpoint())); |
| done.reset(); |
| // works |
| leader.changePeers(conf, done); |
| assertTrue(done.await().isOk()); |
| |
| cluster.ensureSame(); |
| assertEquals(3, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertEquals(10, fsm.getLogs().size()); |
| } |
| |
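/**
 * Tests that the leader steps down when a configuration change cannot be completed in joint consensus.
 *
 * @throws Exception If failed.
 */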
| @Test |
| public void testChangePeersStepsDownInJointConsensus() throws Exception { |
| List<PeerId> peers = new ArrayList<>(); |
| |
| PeerId peer0 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5006"); |
| PeerId peer1 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5007"); |
| PeerId peer2 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5008"); |
| PeerId peer3 = JRaftUtils.getPeerId(TestUtils.getLocalAddress() + ":5009"); |
| |
| // start single cluster |
| peers.add(peer0); |
| cluster = new TestCluster("testChangePeersStepsDownInJointConsensus", dataPath, peers); |
| assertTrue(cluster.start(peer0.getEndpoint())); |
| |
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| assertNotNull(leader); |
| sendTestTaskAndWait(leader); |
| |
| // start peer1-3 |
| assertTrue(cluster.start(peer1.getEndpoint())); |
| assertTrue(cluster.start(peer2.getEndpoint())); |
| assertTrue(cluster.start(peer3.getEndpoint())); |
| |
| // Make sure the topology is ready before adding peers. |
| assertTrue(waitForTopology(cluster, leader.getNodeId().getPeerId().getEndpoint(), 4, 3_000)); |
| |
| Configuration conf = new Configuration(); |
| conf.addPeer(peer0); |
| conf.addPeer(peer1); |
| conf.addPeer(peer2); |
| conf.addPeer(peer3); |
| |
| // change peers |
| SynchronizedClosure done = new SynchronizedClosure(); |
| leader.changePeers(conf, done); |
| assertTrue(done.await().isOk()); |
| |
| // stop peer3 |
| assertTrue(cluster.stop(peer3.getEndpoint())); |
| |
| conf.removePeer(peer0); |
| conf.removePeer(peer1); |
| |
| // Change peers to [peer2, peer3], which must fail since peer3 is stopped |
| done.reset(); |
| leader.changePeers(conf, done); |
| assertEquals(RaftError.EPERM, done.await().getRaftError()); |
| LOG.info(done.getStatus().toString()); |
| |
| assertFalse(((NodeImpl) leader).getConf().isStable()); |
| |
| leader = cluster.getLeader(); |
| assertNull(leader); |
| |
| assertTrue(cluster.start(peer3.getEndpoint())); |
| Thread.sleep(1000); |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| List<PeerId> thePeers = leader.listPeers(); |
assertFalse(thePeers.isEmpty());
| assertEquals(conf.getPeerSet(), new HashSet<>(thePeers)); |
| } |
| |
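/**
 * Arguments for the background change-peers thread, see {@link #startChangePeersThread(ChangeArg)}.
 */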
| static class ChangeArg { |
| TestCluster c; |
| List<PeerId> peers; |
| volatile boolean stop; |
| boolean dontRemoveFirstPeer; |
| |
| ChangeArg(TestCluster c, List<PeerId> peers, boolean stop, |
| boolean dontRemoveFirstPeer) { |
| super(); |
| this.c = c; |
| this.peers = peers; |
| this.stop = stop; |
| this.dontRemoveFirstPeer = dontRemoveFirstPeer; |
| } |
| |
| } |
| |
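/**
 * Starts a background thread that repeatedly changes the group configuration to a random subset of the given peers.
 *
 * @param arg Change arguments.
 * @return A future that completes when the thread exits.
 */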
| private Future<?> startChangePeersThread(ChangeArg arg) { |
| Set<RaftError> expectedErrors = new HashSet<>(); |
| expectedErrors.add(RaftError.EBUSY); |
| expectedErrors.add(RaftError.EPERM); |
| expectedErrors.add(RaftError.ECATCHUP); |
| |
| ExecutorService executor = Executors.newSingleThreadExecutor(); |
| |
| return Utils.runInThread(executor, () -> { |
| try { |
| while (!arg.stop) { |
| arg.c.waitLeader(); |
| Node leader = arg.c.getLeader(); |
| if (leader == null) |
| continue; |
// select peers at random
| Configuration conf = new Configuration(); |
| if (arg.dontRemoveFirstPeer) |
| conf.addPeer(arg.peers.get(0)); |
| for (int i = 0; i < arg.peers.size(); i++) { |
| boolean select = ThreadLocalRandom.current().nextInt(64) < 32; |
| if (select && !conf.contains(arg.peers.get(i))) |
| conf.addPeer(arg.peers.get(i)); |
| } |
| if (conf.isEmpty()) { |
| LOG.warn("No peer has been selected"); |
| continue; |
| } |
| SynchronizedClosure done = new SynchronizedClosure(); |
| leader.changePeers(conf, done); |
| done.await(); |
| assertTrue(done.getStatus().isOk() || expectedErrors.contains(done.getStatus().getRaftError()), done.getStatus().toString()); |
| } |
| } |
| catch (InterruptedException e) { |
| LOG.error("ChangePeersThread is interrupted", e); |
| } |
| }); |
| } |
| |
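/**
 * Tests applying tasks while the peers are changed concurrently, with frequent snapshots enabled.
 *
 * @throws Exception If failed.
 */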
| @Test |
| public void testChangePeersChaosWithSnapshot() throws Exception { |
| // start cluster |
| List<PeerId> peers = new ArrayList<>(); |
| peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT)); |
| cluster = new TestCluster("unittest", dataPath, peers, 1000); |
| assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 2)); |
| // start other peers |
| for (int i = 1; i < 10; i++) { |
| PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i); |
| peers.add(peer); |
| assertTrue(cluster.start(peer.getEndpoint())); |
| } |
| |
| ChangeArg arg = new ChangeArg(cluster, peers, false, false); |
| |
| Future<?> future = startChangePeersThread(arg); |
for (int i = 0; i < 5000; ) { // i is incremented only when a task is applied successfully
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| if (leader == null) |
| continue; |
| SynchronizedClosure done = new SynchronizedClosure(); |
| Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done); |
| leader.apply(task); |
| Status status = done.await(); |
| if (status.isOk()) { |
| if (++i % 100 == 0) |
| System.out.println("Progress:" + i); |
| } |
| else |
| assertEquals(RaftError.EPERM, status.getRaftError()); |
| } |
| arg.stop = true; |
| future.get(); |
| cluster.waitLeader(); |
| SynchronizedClosure done = new SynchronizedClosure(); |
| Node leader = cluster.getLeader(); |
| leader.changePeers(new Configuration(peers), done); |
| Status st = done.await(); |
| assertTrue(st.isOk(), st.getErrorMsg()); |
| cluster.ensureSame(); |
| assertEquals(10, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) |
| assertTrue(fsm.getLogs().size() >= 5000); |
| } |
| |
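/**
 * Tests applying tasks while the peers are changed concurrently, with snapshots effectively disabled.
 *
 * @throws Exception If failed.
 */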
| @Test |
| public void testChangePeersChaosWithoutSnapshot() throws Exception { |
| // start cluster |
| List<PeerId> peers = new ArrayList<>(); |
| peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT)); |
| cluster = new TestCluster("unittest", dataPath, peers, 1000); |
| assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 100000)); |
| // start other peers |
| for (int i = 1; i < 10; i++) { |
| PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i); |
| peers.add(peer); |
| assertTrue(cluster.start(peer.getEndpoint(), false, 10000)); |
| } |
| |
| ChangeArg arg = new ChangeArg(cluster, peers, false, true); |
| |
| Future<?> future = startChangePeersThread(arg); |
| final int tasks = 5000; |
for (int i = 0; i < tasks; ) { // i is incremented only when a task is applied successfully
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| if (leader == null) |
| continue; |
| SynchronizedClosure done = new SynchronizedClosure(); |
| Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done); |
| leader.apply(task); |
| Status status = done.await(); |
| if (status.isOk()) { |
| if (++i % 100 == 0) |
| System.out.println("Progress:" + i); |
| } |
| else |
| assertEquals(RaftError.EPERM, status.getRaftError()); |
| } |
| arg.stop = true; |
| future.get(); |
| cluster.waitLeader(); |
| SynchronizedClosure done = new SynchronizedClosure(); |
| Node leader = cluster.getLeader(); |
| leader.changePeers(new Configuration(peers), done); |
| assertTrue(done.await().isOk()); |
| cluster.ensureSame(); |
| assertEquals(10, cluster.getFsms().size()); |
| for (MockStateMachine fsm : cluster.getFsms()) { |
| assertTrue(fsm.getLogs().size() >= tasks); |
| assertTrue(fsm.getLogs().size() - tasks < 100); |
| } |
| } |
| |
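/**
 * Tests applying tasks from multiple threads while the peers are changed concurrently.
 *
 * @throws Exception If failed.
 */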
| @Test |
| public void testChangePeersChaosApplyTasks() throws Exception { |
| // start cluster |
| List<PeerId> peers = new ArrayList<>(); |
| peers.add(new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT)); |
| cluster = new TestCluster("unittest", dataPath, peers, 1000); |
| assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 100000)); |
| // start other peers |
| for (int i = 1; i < 10; i++) { |
| PeerId peer = new PeerId(TestUtils.getLocalAddress(), TestUtils.INIT_PORT + i); |
| peers.add(peer); |
| assertTrue(cluster.start(peer.getEndpoint(), false, 100000)); |
| } |
| |
| final int threads = 3; |
| List<ChangeArg> args = new ArrayList<>(); |
| List<Future<?>> futures = new ArrayList<>(); |
| CountDownLatch latch = new CountDownLatch(threads); |
| |
| Executor executor = Executors.newFixedThreadPool(threads); |
| |
| for (int t = 0; t < threads; t++) { |
| ChangeArg arg = new ChangeArg(cluster, peers, false, true); |
| args.add(arg); |
| futures.add(startChangePeersThread(arg)); |
| |
| Utils.runInThread(executor, () -> { |
| try { |
for (int i = 0; i < 5000; ) { // i is incremented only when a task is applied successfully
| cluster.waitLeader(); |
| Node leader = cluster.getLeader(); |
| if (leader == null) |
| continue; |
| SynchronizedClosure done = new SynchronizedClosure(); |
| Task task = new Task(ByteBuffer.wrap(("hello" + i).getBytes()), done); |
| leader.apply(task); |
| Status status = done.await(); |
| if (status.isOk()) { |
| if (++i % 100 == 0) |
| System.out.println("Progress:" + i); |
| } |
| else |
| assertEquals(RaftError.EPERM, status.getRaftError()); |
| } |
| } |
| catch (Exception e) { |
| LOG.error("Failed to run tasks", e); |
| } |
| finally { |
| latch.countDown(); |
| } |
| }); |
| } |
| |
| latch.await(); |
| for (ChangeArg arg : args) |
| arg.stop = true; |
| for (Future<?> future : futures) |
| future.get(); |
| |
| cluster.waitLeader(); |
| SynchronizedClosure done = new SynchronizedClosure(); |
| Node leader = cluster.getLeader(); |
| leader.changePeers(new Configuration(peers), done); |
| assertTrue(done.await().isOk()); |
| cluster.ensureSame(); |
| assertEquals(10, cluster.getFsms().size()); |
| |
| for (MockStateMachine fsm : cluster.getFsms()) { |
| int logSize = fsm.getLogs().size(); |
| assertTrue(logSize >= 5000 * threads, "logSize= " + logSize); |
| assertTrue(logSize - 5000 * threads < 100, "logSize= " + logSize); |
| } |
| } |
| |
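/**
 * Tests that no leader is elected while RequestVote messages are blocked and that the election proceeds after unblocking.
 *
 * @throws Exception If failed.
 */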
| @Test |
| public void testBlockedElection() throws Exception { |
| List<PeerId> peers = TestUtils.generatePeers(3); |
| cluster = new TestCluster("unittest", dataPath, peers); |
| |
| for (PeerId peer : peers) |
| assertTrue(cluster.start(peer.getEndpoint())); |
| |
| cluster.waitLeader(); |
| |
| Node leader = cluster.getLeader(); |
| |
| LOG.warn("Current leader {}, electTimeout={}", leader.getNodeId().getPeerId(), ((NodeImpl) leader).getOptions().getElectionTimeoutMs()); |
| |
| List<Node> followers = cluster.getFollowers(); |
| |
| for (Node follower : followers) { |
| NodeImpl follower0 = (NodeImpl) follower; |
| DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService(); |
| RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient(); |
| rpcClientEx.blockMessages((msg, nodeId) -> { |
| if (msg instanceof RpcRequests.RequestVoteRequest) { |
| RpcRequests.RequestVoteRequest msg0 = (RpcRequests.RequestVoteRequest)msg; |
| |
| return !msg0.preVote(); |
| } |
| |
| return false; |
| }); |
| } |
| |
| LOG.warn("Stop leader {}, curTerm={}", leader.getNodeId().getPeerId(), ((NodeImpl) leader).getCurrentTerm()); |
| |
| assertTrue(cluster.stop(leader.getNodeId().getPeerId().getEndpoint())); |
| |
| assertNull(cluster.getLeader()); |
| |
| Thread.sleep(2000); |
| |
| assertNull(cluster.getLeader()); |
| |
| for (Node follower : followers) { |
| NodeImpl follower0 = (NodeImpl) follower; |
| DefaultRaftClientService rpcService = (DefaultRaftClientService) follower0.getRpcClientService(); |
| RpcClientEx rpcClientEx = (RpcClientEx) rpcService.getRpcClient(); |
| rpcClientEx.stopBlock(); |
| } |
| |
| // elect new leader |
| cluster.waitLeader(); |
| leader = cluster.getLeader(); |
| LOG.info("Elect new leader is {}, curTerm={}", leader.getLeaderId(), ((NodeImpl) leader).getCurrentTerm()); |
| } |
| |
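/**
 * @return Node options with common and striped executors configured.
 */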
| private NodeOptions createNodeOptions() { |
| NodeOptions options = new NodeOptions(); |
| |
| options.setCommonExecutor(JRaftUtils.createCommonExecutor(options)); |
| options.setStripedExecutor(JRaftUtils.createAppendEntriesExecutor(options)); |
| |
| return options; |
| } |
| |
| /** |
| * TODO asch get rid of waiting for topology IGNITE-14832 |
| * |
| * @param cluster |
| * @param addr |
| * @param expected |
| * @param timeout |
| * @return |
| */ |
| private boolean waitForTopology(TestCluster cluster, Endpoint addr, int expected, long timeout) { |
| RaftGroupService grp = cluster.getServer(addr); |
| |
| if (grp == null) { |
| LOG.warn("Node has not been found {}", addr); |
| |
| return false; |
| } |
| |
| RpcServer rpcServer = grp.getRpcServer(); |
| |
| if (!(rpcServer instanceof IgniteRpcServer)) |
| return true; |
| |
| ClusterService service = ((IgniteRpcServer) grp.getRpcServer()).clusterService(); |
| |
| long stop = System.currentTimeMillis() + timeout; |
| |
| while (System.currentTimeMillis() < stop) { |
| if (service.topologyService().allMembers().size() >= expected) |
| return true; |
| |
| try { |
| Thread.sleep(50); |
| } |
| catch (InterruptedException e) { |
| return false; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param cond The condition. |
| * @param timeout The timeout. |
| * @return {@code True} if the condition is satisfied. |
| */ |
| private boolean waitForCondition(BooleanSupplier cond, long timeout) { |
| long stop = System.currentTimeMillis() + timeout; |
| |
| while (System.currentTimeMillis() < stop) { |
| if (cond.getAsBoolean()) |
| return true; |
| |
| try { |
| Thread.sleep(50); |
| } |
| catch (InterruptedException e) { |
| return false; |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param groupId Group id. |
| * @param peerId Peer id. |
| * @param nodeOptions Node options. |
| * @return Raft group service. |
| */ |
| private RaftGroupService createService(String groupId, PeerId peerId, NodeOptions nodeOptions) { |
| Configuration initialConf = nodeOptions.getInitialConf(); |
| nodeOptions.setStripes(1); |
| |
| StripedDisruptor<FSMCallerImpl.ApplyTask> fsmCallerDusruptor; |
| StripedDisruptor<NodeImpl.LogEntryAndClosure> nodeDisruptor; |
| StripedDisruptor<ReadOnlyServiceImpl.ReadIndexEvent> readOnlyServiceDisruptor; |
| StripedDisruptor<LogManagerImpl.StableClosureEvent> logManagerDisruptor; |
| |
nodeOptions.setfSMCallerExecutorDisruptor(fsmCallerDisruptor = new StripedDisruptor<>(
| "JRaft-FSMCaller-Disruptor_ITNodeTest", |
| nodeOptions.getRaftOptions().getDisruptorBufferSize(), |
| () -> new FSMCallerImpl.ApplyTask(), |
| nodeOptions.getStripes())); |
| |
| nodeOptions.setNodeApplyDisruptor(nodeDisruptor = new StripedDisruptor<>( |
| "JRaft-NodeImpl-Disruptor_ITNodeTest", |
| nodeOptions.getRaftOptions().getDisruptorBufferSize(), |
| () -> new NodeImpl.LogEntryAndClosure(), |
| nodeOptions.getStripes())); |
| |
| nodeOptions.setReadOnlyServiceDisruptor(readOnlyServiceDisruptor = new StripedDisruptor<>( |
| "JRaft-ReadOnlyService-Disruptor_ITNodeTest", |
| nodeOptions.getRaftOptions().getDisruptorBufferSize(), |
| () -> new ReadOnlyServiceImpl.ReadIndexEvent(), |
| nodeOptions.getStripes())); |
| |
| nodeOptions.setLogManagerDisruptor(logManagerDisruptor = new StripedDisruptor<>( |
| "JRaft-LogManager-Disruptor_ITNodeTest", |
| nodeOptions.getRaftOptions().getDisruptorBufferSize(), |
| () -> new LogManagerImpl.StableClosureEvent(), |
| nodeOptions.getStripes())); |
| |
| Stream<PeerId> peers = initialConf == null ? |
| Stream.empty() : |
| Stream.concat(initialConf.getPeers().stream(), initialConf.getLearners().stream()); |
| |
| NodeFinder nodeFinder = peers |
| .map(PeerId::getEndpoint) |
| .map(JRaftUtils::addressFromEndpoint) |
| .collect(collectingAndThen(toList(), StaticNodeFinder::new)); |
| |
| var nodeManager = new NodeManager(); |
| |
| ClusterService clusterService = createClusterService(peerId.getEndpoint(), nodeFinder); |
| |
| IgniteRpcServer rpcServer = new TestIgniteRpcServer(clusterService, nodeManager, nodeOptions); |
| |
| nodeOptions.setRpcClient(new IgniteRpcClient(clusterService)); |
| |
| clusterService.start(); |
| |
| var service = new RaftGroupService(groupId, peerId, nodeOptions, rpcServer, nodeManager) { |
| @Override public synchronized void shutdown() { |
| super.shutdown(); |
| |
| clusterService.stop(); |
| |
| fsmCallerDisruptor.shutdown();
| nodeDisruptor.shutdown(); |
| readOnlyServiceDisruptor.shutdown(); |
| logManagerDisruptor.shutdown(); |
| } |
| }; |
| |
| services.add(service); |
| |
| return service; |
| } |
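| 
| // Usage sketch for createService(..) (illustrative; the address, port and group name are
| // hypothetical placeholders, not taken from any test in this class):
| //
| // PeerId peer = new PeerId(new Endpoint("127.0.0.1", 5003), 0);
| // NodeOptions opts = createNodeOptions();
| // opts.setInitialConf(new Configuration(Collections.singletonList(peer)));
| // RaftGroupService service = createService("unittest", peer, opts);
| // Node node = service.start();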
| |
| /** |
| * Creates a non-started {@link ClusterService}. |
| */ |
| private static ClusterService createClusterService(Endpoint endpoint, NodeFinder nodeFinder) { |
| var registry = new TestMessageSerializationRegistryImpl(); |
| |
| var clusterConfig = new ClusterLocalConfiguration(endpoint.toString(), endpoint.getPort(), nodeFinder, registry); |
| |
| var clusterServiceFactory = new TestScaleCubeClusterServiceFactory(); |
| |
| return clusterServiceFactory.createClusterService(clusterConfig); |
| } |
| |
| private void sendTestTaskAndWait(final Node node) throws InterruptedException { |
| this.sendTestTaskAndWait(node, 0, 10, RaftError.SUCCESS); |
| } |
| |
| private void sendTestTaskAndWait(final Node node, int amount) throws InterruptedException { |
| this.sendTestTaskAndWait(node, 0, amount, RaftError.SUCCESS); |
| } |
| |
| private void sendTestTaskAndWait(final Node node, final RaftError err) throws InterruptedException { |
| this.sendTestTaskAndWait(node, 0, 10, err); |
| } |
| |
| /**
| * Applies {@code amount} test tasks to the node, starting from the given index, and waits
| * for every closure to complete with the expected error code.
| */
| private void sendTestTaskAndWait(final Node node, final int start, int amount,
| final RaftError err) throws InterruptedException {
| final CountDownLatch latch = new CountDownLatch(amount); |
| for (int i = start; i < start + amount; i++) { |
| final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes()); |
| final Task task = new Task(data, new ExpectClosure(err, latch)); |
| node.apply(task); |
| } |
| waitLatch(latch); |
| } |
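| 
| // Example (illustrative): apply ten "hello0".."hello9" tasks on the leader expecting success,
| // then attempt the same on a follower expecting EPERM, since only the leader accepts writes.
| //
| // sendTestTaskAndWait(leader);
| // sendTestTaskAndWait(follower, 0, RaftError.EPERM);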
| |
| private void sendTestTaskAndWait(final Node node, final int start, |
| final RaftError err) throws InterruptedException { |
| sendTestTaskAndWait(node, start, 10, err); |
| } |
| |
| @SuppressWarnings("SameParameterValue") |
| private void sendTestTaskAndWait(final String prefix, final Node node, final int code) throws InterruptedException { |
| sendTestTaskAndWait(prefix, node, 10, code); |
| } |
| |
| @SuppressWarnings("SameParameterValue") |
| private void sendTestTaskAndWait(final String prefix, final Node node, int amount, |
| final int code) throws InterruptedException { |
| final CountDownLatch latch = new CountDownLatch(amount);
| for (int i = 0; i < amount; i++) { |
| final ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes()); |
| final Task task = new Task(data, new ExpectClosure(code, null, latch)); |
| node.apply(task); |
| } |
| waitLatch(latch); |
| } |
| |
| private void triggerLeaderSnapshot(TestCluster cluster, Node leader) throws InterruptedException { |
| triggerLeaderSnapshot(cluster, leader, 1); |
| } |
| |
| /**
| * Triggers a snapshot on the leader and verifies that the state machine's save-snapshot
| * counter advances accordingly.
| */
| private void triggerLeaderSnapshot(TestCluster cluster, Node leader, int times)
| throws InterruptedException {
| // Trigger a snapshot on the leader. The first snapshot may have been triggered
| // earlier at a random point, so the recorded count is allowed to be off by one.
| int snapshotTimes = cluster.getLeaderFsm().getSaveSnapshotTimes(); |
| assertTrue(snapshotTimes == times - 1 || snapshotTimes == times, "snapshotTimes=" + snapshotTimes + ", times=" + times); |
| CountDownLatch latch = new CountDownLatch(1); |
| leader.snapshot(new ExpectClosure(latch)); |
| waitLatch(latch); |
| assertEquals(snapshotTimes + 1, cluster.getLeaderFsm().getSaveSnapshotTimes()); |
| } |
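| 
| // Typical sequence (illustrative): write entries, force a snapshot, then restart a follower
| // so it recovers from the snapshot rather than replaying the whole log. followerAddr is a
| // hypothetical Endpoint of the restarted server.
| //
| // sendTestTaskAndWait(leader);
| // triggerLeaderSnapshot(cluster, leader);
| // cluster.stop(followerAddr);
| // cluster.start(followerAddr);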
| |
| private void waitLatch(final CountDownLatch latch) throws InterruptedException { |
| assertTrue(latch.await(30, TimeUnit.SECONDS)); |
| } |
| |
| @SuppressWarnings({"unused", "SameParameterValue"}) |
| private boolean assertReadIndex(Node node, int index) throws InterruptedException { |
| CountDownLatch latch = new CountDownLatch(1); |
| byte[] requestContext = TestUtils.getRandomBytes(); |
| AtomicBoolean success = new AtomicBoolean(false); |
| node.readIndex(requestContext, new ReadIndexClosure() { |
| |
| @Override |
| public void run(Status status, long theIndex, byte[] reqCtx) { |
| if (status.isOk()) { |
| assertEquals(index, theIndex); |
| assertArrayEquals(requestContext, reqCtx); |
| success.set(true); |
| } |
| else { |
| assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["), status.getErrorMsg()); |
| assertTrue(status.getErrorMsg().contains("] fail and try to create new one"), status.getErrorMsg()); |
| } |
| latch.countDown(); |
| } |
| }); |
| latch.await(); |
| return success.get(); |
| } |
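| 
| // Because assertReadIndex(..) returns false on a recoverable RPC failure, callers can
| // combine it with waitForCondition(..) to retry (illustrative sketch):
| //
| // assertTrue(waitForCondition(() -> {
| //     try {
| //         return assertReadIndex(node, index);
| //     }
| //     catch (InterruptedException e) {
| //         return false;
| //     }
| // }, 5_000));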
| } |