blob: 58a51e0514a57264369f8c632c32a5a92270a6d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
import org.junit.Assert;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class RaftServerTestUtil {
static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
public static final RaftGroupMemberId TEST_MEMBER_ID = RaftGroupMemberId.valueOf(
RaftPeerId.valueOf("test"), RaftGroupId.emptyGroupId());
public static DelayLocalExecutionInjection getLogSyncDelay() {
return new DelayLocalExecutionInjection(RaftServerImpl.LOG_SYNC);
}
public static void setStateMachineUpdaterLogLevel(Level level) {
Slf4jUtils.setLogLevel(StateMachineUpdater.LOG, level);
}
public static void setWatchRequestsLogLevel(Level level) {
Slf4jUtils.setLogLevel(WatchRequests.LOG, level);
}
public static void setPendingRequestsLogLevel(Level level) {
Slf4jUtils.setLogLevel(PendingRequests.LOG, level);
}
public static void waitAndCheckNewConf(MiniRaftCluster cluster,
RaftPeer[] peers, int numOfNewPeers, int numOfRemovedPeers, Collection<RaftPeerId> deadPeers)
throws Exception {
final TimeDuration sleepTime = cluster.getTimeoutMax().apply(n -> n * (numOfRemovedPeers + numOfNewPeers + 2));
JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, Arrays.asList(peers), deadPeers),
10, sleepTime, "waitAndCheckNewConf", LOG);
}
public static void waitAndCheckNewConf(MiniRaftCluster cluster,
RaftPeer[] peers, int numOfRemovedPeers, Collection<RaftPeerId> deadPeers)
throws Exception {
final TimeDuration sleepTime = cluster.getTimeoutMax().apply(n -> n * (numOfRemovedPeers + 2));
JavaUtils.attempt(() -> waitAndCheckNewConf(cluster, Arrays.asList(peers), deadPeers),
10, sleepTime, "waitAndCheckNewConf", LOG);
}
private static void waitAndCheckNewConf(MiniRaftCluster cluster,
Collection<RaftPeer> peers, Collection<RaftPeerId> deadPeers) {
LOG.info("waitAndCheckNewConf: peers={}, deadPeers={}, {}", peers, deadPeers, cluster.printServers());
Assert.assertNotNull(cluster.getLeader());
int numIncluded = 0;
int deadIncluded = 0;
final RaftConfigurationImpl current = RaftConfigurationImpl.newBuilder()
.setConf(peers).setLogEntryIndex(0).build();
for (RaftServer.Division d : cluster.iterateDivisions()) {
final RaftServerImpl server = (RaftServerImpl)d;
LOG.info("checking {}", server);
if (deadPeers != null && deadPeers.contains(server.getId())) {
if (current.containsInConf(server.getId())) {
deadIncluded++;
}
continue;
}
final RaftConfigurationImpl conf = server.getState().getRaftConf();
if (current.containsInConf(server.getId())) {
numIncluded++;
Assert.assertTrue(conf.isStable());
Assert.assertTrue(conf.hasNoChange(peers, Collections.emptyList()));
} else if (server.getInfo().isAlive()) {
// The server is successfully removed from the conf
// It may not be shutdown since it may not be able to talk to the new leader (who is not in its conf).
Assert.assertTrue(conf.isStable());
Assert.assertFalse(conf.containsInConf(server.getId()));
}
}
Assert.assertEquals(peers.size(), numIncluded + deadIncluded);
}
public static long getNextIndex(RaftServer.Division server) {
return ((RaftServerImpl)server).getState().getNextIndex();
}
public static long getLatestInstalledSnapshotIndex(RaftServer.Division server) {
return ((RaftServerImpl)server).getState().getLatestInstalledSnapshotIndex();
}
static ServerState getState(RaftServer.Division server) {
return ((RaftServerImpl)server).getState();
}
public static ConfigurationManager getConfigurationManager(RaftServer.Division server) {
return (ConfigurationManager) RaftTestUtil.getDeclaredField(getState(server), "configurationManager");
}
public static Logger getTransactionContextLog() {
return TransactionManager.LOG;
}
public static Map<TermIndex, MemoizedSupplier<TransactionContext>> getTransactionContextMap(
RaftServer.Division server) {
return ((RaftServerImpl)server).getTransactionContextMapForTesting();
}
public static RaftConfiguration newRaftConfiguration(Collection<RaftPeer> peers) {
return RaftConfigurationImpl.newBuilder().setConf(peers).build();
}
public static void setRaftConf(RaftServer proxy, RaftGroupId groupId, RaftConfiguration conf) {
((RaftServerImpl)getDivision(proxy, groupId)).getState().setRaftConf(conf);
}
public static RaftServerRpc getServerRpc(RaftServer.Division server) {
return ((RaftServerImpl)server).getRaftServer().getServerRpc();
}
private static Optional<LeaderStateImpl> getLeaderState(RaftServer.Division server) {
return ((RaftServerImpl)server).getRole().getLeaderState();
}
public static Stream<LogAppender> getLogAppenders(RaftServer.Division server) {
return getLeaderState(server).map(LeaderStateImpl::getLogAppenders).orElse(null);
}
public static void assertLeaderLease(RaftServer.Division leader, boolean hasLease) {
final LeaderStateImpl l = getLeaderState(leader).orElse(null);
Assert.assertNotNull(l);
Assert.assertEquals(l.hasLease(), hasLease);
}
public static void restartLogAppenders(RaftServer.Division server) {
final LeaderStateImpl leaderState = getLeaderState(server).orElseThrow(
() -> new IllegalStateException(server + " is not the leader"));
leaderState.getLogAppenders().forEach(leaderState::restart);
}
public static RaftServer.Division getDivision(RaftServer server, RaftGroupId groupId) {
return JavaUtils.callAsUnchecked(() -> server.getDivision(groupId));
}
public static DataStreamServer newDataStreamServer(RaftServer server) {
return new DataStreamServerImpl(server, null);
}
public static DataStreamMap newDataStreamMap(Object name) {
return new DataStreamMapImpl(name);
}
public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) {
final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null);
Assert.assertNotNull(f);
Assert.assertTrue(f.lostMajorityHeartbeatsRecently());
}
public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, DivisionInfo info,
RaftStorage storage, RaftProperties properties) {
final RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
Mockito.when(server.getInfo()).thenReturn(info);
return SegmentedRaftLog.newBuilder()
.setMemberId(memberId)
.setServer(server)
.setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry)
.setGetTransactionContext(server::getTransactionContext)
.setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent)
.setStorage(storage)
.setProperties(properties)
.build();
}
public static boolean isHighestPriority(RaftConfiguration config, RaftPeerId peerId) {
return ((RaftConfigurationImpl)config).isHighestPriority(peerId);
}
public static void runWithMinorityPeers(MiniRaftCluster cluster, Collection<RaftPeer> peersInNewConf,
CheckedConsumer<Collection<RaftPeer>, IOException> consumer) throws IOException {
Collection<RaftPeer> peers = parseMinorityPeers(cluster, peersInNewConf);
while (peers != null) {
consumer.accept(peers);
peers = parseMinorityPeers(cluster, peersInNewConf);
}
}
private static Collection<RaftPeer> parseMinorityPeers(MiniRaftCluster cluster, Collection<RaftPeer> peersInNewConf) {
RaftConfigurationImpl conf = (RaftConfigurationImpl) cluster.getLeader().getRaftConf();
Set<RaftPeer> peers = new HashSet<>(conf.getCurrentPeers());
// Add new peers to construct minority conf peers.
List<RaftPeer> peersToAdd = peersInNewConf.stream().filter(
peer -> !conf.containsInConf(peer.getId(), RaftProtos.RaftPeerRole.FOLLOWER)).collect(Collectors.toList());
if (!peersToAdd.isEmpty()) {
for (RaftPeer peer : peersToAdd) {
if (peers.add(peer) && conf.changeMajority(peers)) {
peers.remove(peer);
break;
}
}
return peers;
}
// All new peers has been added. Handle the removed peers.
List<RaftPeer> peersToRemove = peers.stream().filter(peer -> !peersInNewConf.contains(peer)).collect(Collectors.toList());
if (!peersToRemove.isEmpty()) {
return peersInNewConf;
}
// The peers in new conf are the same as current conf, return null.
return null;
}
}