| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ratis.statemachine; |
| |
| import static org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHINE_METRICS; |
| import static org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHINE_METRICS_DESC; |
| import static org.apache.ratis.server.impl.StateMachineMetrics.STATEMACHINE_TAKE_SNAPSHOT_TIMER; |
| import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS; |
| import static org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe; |
| |
| import org.apache.ratis.BaseTest; |
| import org.apache.ratis.metrics.LongCounter; |
| import org.apache.ratis.metrics.impl.DefaultTimekeeperImpl; |
| import org.apache.ratis.protocol.RaftPeer; |
| import org.apache.ratis.server.impl.MiniRaftCluster; |
| import org.apache.ratis.RaftTestUtil; |
| import org.apache.ratis.RaftTestUtil.SimpleMessage; |
| import org.apache.ratis.client.RaftClient; |
| import org.apache.ratis.conf.RaftProperties; |
| import org.apache.ratis.metrics.MetricRegistries; |
| import org.apache.ratis.metrics.MetricRegistryInfo; |
| import org.apache.ratis.metrics.RatisMetricRegistry; |
| import org.apache.ratis.protocol.RaftClientReply; |
| import org.apache.ratis.protocol.RaftPeerId; |
| import org.apache.ratis.server.RaftServer; |
| import org.apache.ratis.server.RaftServerConfigKeys; |
| import org.apache.ratis.server.impl.RaftServerTestUtil; |
| import org.apache.ratis.server.metrics.RaftServerMetricsImpl; |
| import org.apache.ratis.server.raftlog.RaftLog; |
| import org.apache.ratis.server.raftlog.segmented.LogSegmentPath; |
| import org.apache.ratis.proto.RaftProtos.LogEntryProto; |
| import org.apache.ratis.server.storage.RaftStorageTestUtils; |
| import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; |
| import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; |
| import org.apache.ratis.util.FileUtils; |
| import org.apache.ratis.util.JavaUtils; |
| import org.apache.ratis.util.LifeCycle; |
| import org.apache.ratis.util.Slf4jUtils; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.stream.Collectors; |
| import java.util.stream.LongStream; |
| |
| import org.apache.ratis.thirdparty.com.codahale.metrics.Timer; |
| import org.slf4j.event.Level; |
| |
| public abstract class RaftSnapshotBaseTest extends BaseTest { |
| { |
| Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); |
| Slf4jUtils.setLogLevel(RaftLog.LOG, Level.DEBUG); |
| Slf4jUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); |
| } |
| |
| static final Logger LOG = LoggerFactory.getLogger(RaftSnapshotBaseTest.class); |
| private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10; |
| |
| public static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, long endIndex) { |
| final RaftServer.Division leader = cluster.getLeader(); |
| final SimpleStateMachineStorage storage = SimpleStateMachine4Testing.get(leader).getStateMachineStorage(); |
| final long term = leader.getInfo().getCurrentTerm(); |
| return LongStream.range(startIndex, endIndex) |
| .mapToObj(i -> storage.getSnapshotFile(term, i)) |
| .collect(Collectors.toList()); |
| } |
| |
| |
| public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception { |
| final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster); |
| assertLogContent(leader, true); |
| } |
| |
| public static void assertLogContent(RaftServer.Division server, boolean isLeader) throws Exception { |
| final RaftLog log = server.getRaftLog(); |
| final long lastIndex = log.getLastEntryTermIndex().getIndex(); |
| final LogEntryProto e = getLogUnsafe(log, lastIndex); |
| Assert.assertTrue(e.hasMetadataEntry()); |
| |
| JavaUtils.attemptRepeatedly(() -> { |
| Assert.assertEquals(log.getLastCommittedIndex() - 1, e.getMetadataEntry().getCommitIndex()); |
| return null; |
| }, 50, BaseTest.HUNDRED_MILLIS, "CheckMetadataEntry", LOG); |
| |
| SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(server); |
| if (isLeader) { |
| Assert.assertTrue("Not notified as a leader", simpleStateMachine.isNotifiedAsLeader()); |
| } |
| final LogEntryProto[] entries = simpleStateMachine.getContent(); |
| long message = 0; |
| for (int i = 0; i < entries.length; i++) { |
| LOG.info("{}) {} {}", i, message, entries[i].toString().replace("\n", ", ")); |
| if (entries[i].hasStateMachineLogEntry()) { |
| final SimpleMessage m = new SimpleMessage("m" + message++); |
| Assert.assertArrayEquals(m.getContent().toByteArray(), |
| entries[i].getStateMachineLogEntry().getLogData().toByteArray()); |
| } |
| } |
| } |
| |
| private MiniRaftCluster cluster; |
| |
| public abstract MiniRaftCluster.Factory<?> getFactory(); |
| |
| @Before |
| public void setup() throws IOException { |
| final RaftProperties prop = new RaftProperties(); |
| prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, |
| SimpleStateMachine4Testing.class, StateMachine.class); |
| RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold( |
| prop, SNAPSHOT_TRIGGER_THRESHOLD); |
| RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true); |
| this.cluster = getFactory().newCluster(1, prop); |
| cluster.start(); |
| } |
| |
| @After |
| public void tearDown() { |
| if (cluster != null) { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Keep generating writing traffic and make sure snapshots are taken. |
| * We then restart the whole raft peer and check if it can correctly load |
| * snapshots + raft log. |
| */ |
| @Test |
| public void testRestartPeer() throws Exception { |
| RaftTestUtil.waitForLeader(cluster); |
| final RaftPeerId leaderId = cluster.getLeader().getId(); |
| int i = 0; |
| try(final RaftClient client = cluster.createClient(leaderId)) { |
| for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { |
| RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); |
| Assert.assertTrue(reply.isSuccess()); |
| } |
| } |
| |
| final long nextIndex = cluster.getLeader().getRaftLog().getNextIndex(); |
| LOG.info("nextIndex = {}", nextIndex); |
| // wait for the snapshot to be done |
| final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); |
| JavaUtils.attemptRepeatedly(() -> { |
| Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists)); |
| return null; |
| }, 10, ONE_SECOND, "snapshotFile.exist", LOG); |
| |
| // restart the peer and check if it can correctly load snapshot |
| cluster.restart(false); |
| try { |
| // 200 messages + two leader elections --> last committed = 201 |
| assertLeaderContent(cluster); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| public static boolean exists(File f) { |
| if (f.exists()) { |
| LOG.info("File exists: " + f); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Basic test for install snapshot: start a one node cluster and let it |
| * generate a snapshot. Then delete the log and restart the node, and add more |
| * nodes as followers. |
| */ |
| @Test |
| public void testBasicInstallSnapshot() throws Exception { |
| final List<LogSegmentPath> logs; |
| int i = 0; |
| try { |
| RaftTestUtil.waitForLeader(cluster); |
| final RaftPeerId leaderId = cluster.getLeader().getId(); |
| |
| try(final RaftClient client = cluster.createClient(leaderId)) { |
| for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { |
| RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); |
| Assert.assertTrue(reply.isSuccess()); |
| } |
| } |
| |
| // wait for the snapshot to be done |
| final long nextIndex = cluster.getLeader().getRaftLog().getNextIndex(); |
| LOG.info("nextIndex = {}", nextIndex); |
| final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); |
| JavaUtils.attemptRepeatedly(() -> { |
| Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists)); |
| return null; |
| }, 10, ONE_SECOND, "snapshotFile.exist", LOG); |
| verifyTakeSnapshotMetric(cluster.getLeader()); |
| logs = LogSegmentPath.getLogSegmentPaths(cluster.getLeader().getRaftStorage()); |
| } finally { |
| cluster.shutdown(); |
| } |
| |
| // delete the log segments from the leader |
| for (LogSegmentPath path : logs) { |
| FileUtils.delete(path.getPath()); |
| } |
| |
| // restart the peer |
| LOG.info("Restarting the cluster"); |
| cluster.restart(false); |
| try { |
| assertLeaderContent(cluster); |
| |
| // generate some more traffic |
| try(final RaftClient client = cluster.createClient(cluster.getLeader().getId())) { |
| Assert.assertTrue(client.io().send(new SimpleMessage("m" + i)).isSuccess()); |
| } |
| |
| // add two more peers |
| String[] newPeers = new String[]{"s3", "s4"}; |
| MiniRaftCluster.PeerChanges change = cluster.addNewPeers( |
| newPeers, true, false); |
| // trigger setConfiguration |
| RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), |
| peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); |
| |
| for (String newPeer : newPeers) { |
| final RaftServer.Division s = cluster.getDivision(RaftPeerId.valueOf(newPeer)); |
| SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(s); |
| Assert.assertSame(LifeCycle.State.RUNNING, simpleStateMachine.getLifeCycleState()); |
| } |
| |
| // Verify installSnapshot counter on leader before restart. |
| verifyInstallSnapshotMetric(cluster.getLeader()); |
| RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); |
| |
| Timer timer = getTakeSnapshotTimer(cluster.getLeader()); |
| long count = timer.getCount(); |
| |
| // restart the peer and check if it can correctly handle conf change |
| cluster.restartServer(cluster.getLeader().getId(), false); |
| assertLeaderContent(cluster); |
| |
| // verify that snapshot was taken when stopping the server |
| Assert.assertTrue(count < timer.getCount()); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| /** |
| * Test for install snapshot during a peer bootstrap: start a one node cluster |
| * and let it generate a snapshot. Add another node and verify that the new |
| * node installs a snapshot from the old node. |
| */ |
| @Test |
| public void testInstallSnapshotDuringBootstrap() throws Exception { |
| int i = 0; |
| try { |
| RaftTestUtil.waitForLeader(cluster); |
| final RaftPeerId leaderId = cluster.getLeader().getId(); |
| |
| try(final RaftClient client = cluster.createClient(leaderId)) { |
| for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) { |
| RaftClientReply reply = client.io().send(new SimpleMessage("m" + i)); |
| Assert.assertTrue(reply.isSuccess()); |
| } |
| } |
| |
| // wait for the snapshot to be done |
| final long nextIndex = cluster.getLeader().getRaftLog().getNextIndex(); |
| LOG.info("nextIndex = {}", nextIndex); |
| final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex); |
| JavaUtils.attemptRepeatedly(() -> { |
| Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists)); |
| return null; |
| }, 10, ONE_SECOND, "snapshotFile.exist", LOG); |
| verifyTakeSnapshotMetric(cluster.getLeader()); |
| |
| assertLeaderContent(cluster); |
| |
| // add two more peers |
| String[] newPeers = new String[]{"s3", "s4"}; |
| MiniRaftCluster.PeerChanges change = cluster.addNewPeers( |
| newPeers, true, false); |
| // trigger setConfiguration |
| RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf), |
| peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray()))); |
| |
| for (String newPeer : newPeers) { |
| final RaftServer.Division s = cluster.getDivision(RaftPeerId.valueOf(newPeer)); |
| SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(s); |
| Assert.assertSame(LifeCycle.State.RUNNING, simpleStateMachine.getLifeCycleState()); |
| } |
| |
| // Verify installSnapshot counter on leader |
| verifyInstallSnapshotMetric(cluster.getLeader()); |
| RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null); |
| } finally { |
| cluster.shutdown(); |
| } |
| } |
| |
| protected void verifyInstallSnapshotMetric(RaftServer.Division leader) { |
| final LongCounter installSnapshotCounter = ((RaftServerMetricsImpl)leader.getRaftServerMetrics()) |
| .getNumInstallSnapshot(); |
| Assert.assertNotNull(installSnapshotCounter); |
| Assert.assertTrue(installSnapshotCounter.getCount() >= 1); |
| } |
| |
| private static void verifyTakeSnapshotMetric(RaftServer.Division leader) { |
| Timer timer = getTakeSnapshotTimer(leader); |
| Assert.assertTrue(timer.getCount() > 0); |
| } |
| |
| private static Timer getTakeSnapshotTimer(RaftServer.Division leader) { |
| MetricRegistryInfo info = new MetricRegistryInfo(leader.getMemberId().toString(), |
| RATIS_APPLICATION_NAME_METRICS, |
| RATIS_STATEMACHINE_METRICS, RATIS_STATEMACHINE_METRICS_DESC); |
| Optional<RatisMetricRegistry> opt = MetricRegistries.global().get(info); |
| Assert.assertTrue(opt.isPresent()); |
| RatisMetricRegistry metricRegistry = opt.get(); |
| Assert.assertNotNull(metricRegistry); |
| return ((DefaultTimekeeperImpl)metricRegistry.timer(STATEMACHINE_TAKE_SNAPSHOT_TIMER)).getTimer(); |
| } |
| } |