blob: 38f2e43aa751ce76aa20298a4bba5c8047b29026 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.SizeInBytes;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
static final Logger LOG = LoggerFactory.getLogger(InstallSnapshotNotificationTests.class);

/** Value handed to {@code RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold}. */
private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
/** Value handed to {@code RaftServerConfigKeys.Log.setPurgeGap}. */
private static final int PURGE_GAP = 8;

/** Snapshot published by a test so that the test state machine can copy it; null when unset. */
private static final AtomicReference<SnapshotInfo> leaderSnapshotInfoRef = new AtomicReference<>();
/** Counts the install-snapshot notifications that carried leader info. */
private static final AtomicInteger numSnapshotRequests = new AtomicInteger();

// Per-instance setup: raise the raft log verbosity, then configure the cluster properties.
{
  Log4jUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);

  final RaftProperties properties = getProperties();
  properties.setClass(
      MiniRaftCluster.STATEMACHINE_CLASS_KEY,
      StateMachine4InstallSnapshotNotificationTests.class,
      StateMachine.class);
  // The log appender must not install snapshots itself in these tests.
  RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, false);
  RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(properties, SNAPSHOT_TRIGGER_THRESHOLD);
  RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
  RaftServerConfigKeys.Log.setPurgeGap(properties, PURGE_GAP);
  RaftServerConfigKeys.Log.setSegmentSizeMax(properties, SizeInBytes.valueOf(1024)); // 1k segment
}
/**
 * State machine used by these tests. On an install-snapshot notification it copies the
 * snapshot file published in {@code leaderSnapshotInfoRef} into its own state machine
 * directory and counts the notification in {@code numSnapshotRequests}.
 */
private static class StateMachine4InstallSnapshotNotificationTests extends SimpleStateMachine4Testing {
  /**
   * Handles an install-snapshot notification.
   *
   * @param roleInfoProto role information; it must carry leader info in its follower info,
   *                      otherwise the notification is rejected and not counted
   * @param termIndex passed through to the super implementation when no leader snapshot
   *                  has been published
   * @return a future completed with the term-index of the published leader snapshot;
   *         the result of the super implementation when no snapshot has been published;
   *         or the result of {@code JavaUtils.completeExceptionally} when leader info is
   *         missing or the copy fails with an {@link IOException}
   */
  @Override
  public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
      RaftProtos.RoleInfoProto roleInfoProto,
      TermIndex termIndex) {
    if (!roleInfoProto.getFollowerInfo().hasLeaderInfo()) {
      return JavaUtils.completeExceptionally(new IOException("Failed " +
          "notifyInstallSnapshotFromLeader due to missing leader info"));
    }
    numSnapshotRequests.incrementAndGet();

    final SingleFileSnapshotInfo leaderSnapshotInfo = (SingleFileSnapshotInfo) leaderSnapshotInfoRef.get();
    LOG.info("{}: leaderSnapshotInfo = {}", getId(), leaderSnapshotInfo);
    if (leaderSnapshotInfo == null) {
      return super.notifyInstallSnapshotFromLeader(roleInfoProto, termIndex);
    }

    try {
      final Path leaderSnapshotFile = leaderSnapshotInfo.getFile().getPath();
      final File followerSnapshotFilePath = new File(getSMdir(),
          leaderSnapshotFile.getFileName().toString());
      if (followerSnapshotFilePath.exists()) {
        // Files.copy without REPLACE_EXISTING throws FileAlreadyExistsException, so a
        // repeated notification must not fail just because the file is already in place.
        // NOTE(review): assumes a same-named file holds the same snapshot -- confirm.
        LOG.warn("{}: {} exists", getId(), followerSnapshotFilePath);
      } else {
        Files.copy(leaderSnapshotFile, followerSnapshotFilePath.toPath());
      }
    } catch (IOException e) {
      LOG.error("Failed notifyInstallSnapshotFromLeader", e);
      return JavaUtils.completeExceptionally(e);
    }
    return CompletableFuture.completedFuture(leaderSnapshotInfo.getTermIndex());
  }
}
/**
 * Basic test for install snapshot notification: start a one-node cluster
 * (with the install snapshot option disabled) and let it generate a snapshot.
 * Then delete the log, restart the node, and add more nodes as followers.
 * The new follower nodes should get an install snapshot notification.
 */
@Test
public void testAddNewFollowers() throws Exception {
  runWithNewCluster(1, this::testAddNewFollowers);
}

/** Test body of {@link #testAddNewFollowers()}, run against the given cluster. */
private void testAddNewFollowers(CLUSTER cluster) throws Exception {
  leaderSnapshotInfoRef.set(null);

  final int initialMessageCount = SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1;
  final List<LogSegmentPath> segmentPaths;
  int sent = 0;
  try {
    RaftTestUtil.waitForLeader(cluster);
    final RaftPeerId leaderId = cluster.getLeader().getId();

    try (final RaftClient client = cluster.createClient(leaderId)) {
      while (sent < initialMessageCount) {
        final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + sent));
        Assert.assertTrue(reply.isSuccess());
        sent++;
      }
    }

    // Wait until a snapshot file shows up.
    final RaftServer.Division leader = cluster.getLeader();
    final long nextIndex = leader.getRaftLog().getNextIndex();
    LOG.info("nextIndex = {}", nextIndex);
    final long firstCandidateIndex = nextIndex - SNAPSHOT_TRIGGER_THRESHOLD;
    final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(
        cluster, firstCandidateIndex, nextIndex);
    JavaUtils.attemptRepeatedly(() -> {
      Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
      return null;
    }, 10, ONE_SECOND, "snapshotFile.exist", LOG);

    segmentPaths = LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage());
  } finally {
    cluster.shutdown();
  }

  // Remove the leader's log segments while the cluster is down.
  LOG.info("Delete logs {}", segmentPaths);
  for (LogSegmentPath segment : segmentPaths) {
    FileUtils.deleteFully(segment.getPath()); // the log may already be purged
  }

  // Bring the peer back up.
  LOG.info("Restarting the cluster");
  cluster.restart(false);
  try {
    RaftSnapshotBaseTest.assertLeaderContent(cluster);

    // Send one more message.
    try (final RaftClient client = cluster.createClient(cluster.getLeader().getId())) {
      final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + sent));
      Assert.assertTrue(reply.isSuccess());
    }

    // Publish the leader's latest snapshot for the test state machine.
    final SnapshotInfo leaderSnapshotInfo = cluster.getLeader().getStateMachine().getLatestSnapshot();
    Assert.assertTrue(leaderSnapshotInfoRef.compareAndSet(null, leaderSnapshotInfo));

    // Add two more peers and trigger setConfiguration.
    final MiniRaftCluster.PeerChanges peerChanges = cluster.addNewPeers(2, true, true);
    cluster.setConfiguration(peerChanges.allPeersInNewConf);
    RaftServerTestUtil.waitAndCheckNewConf(cluster, peerChanges.allPeersInNewConf, 0, null);

    // The installed snapshot index on each follower must match the leader snapshot.
    for (RaftServer.Division follower : cluster.getFollowers()) {
      Assert.assertEquals(leaderSnapshotInfo.getIndex(),
          RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
    }

    // Restart the leader and check that it can correctly handle the conf change.
    cluster.restartServer(cluster.getLeader().getId(), false);
    RaftSnapshotBaseTest.assertLeaderContent(cluster);
  } finally {
    cluster.shutdown();
  }
}
/**
 * Kills a follower after the leader has taken a snapshot, sends more traffic,
 * restarts the follower, and checks that its next log index catches up with the leader's.
 */
@Test
public void testRestartFollower() throws Exception {
  runWithNewCluster(3, this::testRestartFollower);
}

/** Test body of {@link #testRestartFollower()}, run against the given cluster. */
private void testRestartFollower(CLUSTER cluster) throws Exception {
  leaderSnapshotInfoRef.set(null);

  final int initialMessageCount = SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1;
  int sent = 0;
  final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
  final RaftPeerId leaderId = leader.getId();

  try (final RaftClient client = cluster.createClient(leaderId)) {
    while (sent < initialMessageCount) {
      final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + sent));
      Assert.assertTrue(reply.isSuccess());
      sent++;
    }
  }

  // Wait until a snapshot file shows up.
  final long oldLeaderNextIndex = leader.getRaftLog().getNextIndex();
  LOG.info("{}: oldLeaderNextIndex = {}", leaderId, oldLeaderNextIndex);
  final long firstCandidateIndex = oldLeaderNextIndex - SNAPSHOT_TRIGGER_THRESHOLD;
  final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(
      cluster, firstCandidateIndex, oldLeaderNextIndex);
  JavaUtils.attemptRepeatedly(() -> {
    Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
    return null;
  }, 10, ONE_SECOND, "snapshotFile.exist", LOG);

  // Take one follower down.
  final RaftPeerId followerId = cluster.getFollowers().get(0).getId();
  cluster.killServer(followerId);

  // Send one more message while the follower is down.
  try (final RaftClient client = cluster.createClient(leader.getId())) {
    final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + sent));
    Assert.assertTrue(reply.isSuccess());
  }

  FIVE_SECONDS.sleep();

  // Bring the follower back and wait for its log to reach the leader's next index.
  cluster.restartServer(followerId, false);
  final RaftServer.Division restartedFollower = cluster.getDivision(followerId);
  JavaUtils.attempt(() -> {
    final long newLeaderNextIndex = leader.getRaftLog().getNextIndex();
    LOG.info("{}: newLeaderNextIndex = {}", leaderId, newLeaderNextIndex);
    Assert.assertTrue(newLeaderNextIndex > oldLeaderNextIndex);
    Assert.assertEquals(newLeaderNextIndex, restartedFollower.getRaftLog().getNextIndex());
  }, 10, ONE_SECOND, "followerNextIndex", LOG);
}
/**
 * Checks the number of install-snapshot notifications: none while the existing
 * followers are up to date, and two after two new peers are added.
 */
@Test
public void testInstallSnapshotNotificationCount() throws Exception {
  runWithNewCluster(3, this::testInstallSnapshotNotificationCount);
}

/** Test body of {@link #testInstallSnapshotNotificationCount()}, run against the given cluster. */
private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Exception {
  leaderSnapshotInfoRef.set(null);
  numSnapshotRequests.set(0);

  int sent = 0;
  try {
    RaftTestUtil.waitForLeader(cluster);
    final RaftPeerId leaderId = cluster.getLeader().getId();

    // Let a few heartbeats pass; none of them may carry a snapshot request.
    ONE_SECOND.sleep();
    Assert.assertEquals(0, numSnapshotRequests.get());

    // Generate data.
    try (final RaftClient client = cluster.createClient(leaderId)) {
      while (sent < 10) {
        final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + sent));
        Assert.assertTrue(reply.isSuccess());
        sent++;
      }
    }

    // Wait until the applied index has been updated.
    RaftTestUtil.waitFor(
        () -> cluster.getLeader().getStateMachine().getLastAppliedTermIndex().getIndex() == 20,
        300, 15000);

    // Take a snapshot and check the result.
    final long takenSnapshotIndex = cluster.getLeader().getStateMachine().takeSnapshot();
    Assert.assertEquals(20, takenSnapshotIndex);
    final SnapshotInfo leaderSnapshotInfo = cluster.getLeader().getStateMachine().getLatestSnapshot();
    Assert.assertEquals(20, leaderSnapshotInfo.getIndex());
    Assert.assertTrue(leaderSnapshotInfoRef.compareAndSet(null, leaderSnapshotInfo));

    // Wait for the snapshot file to exist.
    final RaftServer.Division leader = cluster.getLeader();
    final long nextIndex = leader.getRaftLog().getNextIndex();
    Assert.assertEquals(21, nextIndex);
    // End index is exclusive.
    final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster, 0, nextIndex);
    JavaUtils.attemptRepeatedly(() -> {
      Assert.assertTrue(snapshotFiles.stream().anyMatch(RaftSnapshotBaseTest::exists));
      return null;
    }, 10, ONE_SECOND, "snapshotFile.exist", LOG);

    // Clear all log files and reset the cached log start index.
    final long installedIndex =
        leader.getRaftLog().onSnapshotInstalled(leader.getRaftLog().getLastCommittedIndex()).get();
    Assert.assertEquals(20, installedIndex);

    // Check that the logs are gone.
    Assert.assertEquals(0, LogSegmentPath.getLogSegmentPaths(leader.getRaftStorage()).size());
    Assert.assertEquals(RaftLog.INVALID_LOG_INDEX, leader.getRaftLog().getStartIndex());

    // Allow some heartbeats to go through, then make sure none of them had
    // snapshot requests.
    ONE_SECOND.sleep();
    Assert.assertEquals(0, numSnapshotRequests.get());

    // Make sure the leader and followers are still up to date.
    for (RaftServer.Division follower : cluster.getFollowers()) {
      Assert.assertEquals(
          leader.getRaftLog().getNextIndex(),
          follower.getRaftLog().getNextIndex());
    }

    // Add two more peers who will need snapshots from the leader, then
    // trigger setConfiguration.
    final MiniRaftCluster.PeerChanges peerChanges = cluster.addNewPeers(2, true, true);
    cluster.setConfiguration(peerChanges.allPeersInNewConf);
    RaftServerTestUtil.waitAndCheckNewConf(cluster, peerChanges.allPeersInNewConf, 0, null);

    // Generate more data.
    try (final RaftClient client = cluster.createClient(leader.getId())) {
      final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + sent));
      Assert.assertTrue(reply.isSuccess());
    }

    // Make sure the leader and followers are still up to date.
    for (RaftServer.Division follower : cluster.getFollowers()) {
      // Give the follower a little time to catch up.
      RaftTestUtil.waitFor(
          () -> leader.getRaftLog().getNextIndex() == follower.getRaftLog().getNextIndex(),
          300, 15000);
    }

    // Make sure each new peer got one snapshot notification.
    Assert.assertEquals(2, numSnapshotRequests.get());
  } finally {
    cluster.shutdown();
  }
}
}