blob: 933a995cc9661ae7308f8b6a8a0c5d7402a8f4c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedLeakDetector;
import org.apache.ratis.util.ReflectionUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public abstract class MiniRaftCluster implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
public static final String CLASS_NAME = JavaUtils.getClassSimpleName(MiniRaftCluster.class);
public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine();
private static final TimeDuration RETRY_INTERVAL_DEFAULT =
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
public interface Get<CLUSTER extends MiniRaftCluster> {
Supplier<RaftProperties> properties = JavaUtils.memoize(RaftProperties::new);
Factory<CLUSTER> getFactory();
default RaftProperties getProperties() {
return properties.get();
}
default RaftProperties setStateMachine(Class<? extends StateMachine> stateMachineClass) {
final RaftProperties p = getProperties();
p.setClass(STATEMACHINE_CLASS_KEY, stateMachineClass, StateMachine.class);
return p;
}
default CLUSTER newCluster(int numPeers) {
return newCluster(numPeers, 0);
}
default CLUSTER newCluster(int numPeers, int numListeners) {
return getFactory().newCluster(numPeers, numListeners, getProperties());
}
default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
runWithNewCluster(numServers, 0, true, testCase);
}
default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
runWithNewCluster(numServers, 0, startCluster, testCase);
}
default void runWithNewCluster(int numServers, int numListeners, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
runWithNewCluster(numServers, numListeners, true, testCase);
}
default void runWithNewCluster(int numServers, int numListeners, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase)
throws Exception {
final StackTraceElement caller = JavaUtils.getCallerStackTraceElement();
LOG.info("Running " + caller.getMethodName());
final CLUSTER cluster = newCluster(numServers, numListeners);
try {
if (startCluster) {
cluster.start();
}
testCase.accept(cluster);
} catch(Exception t) {
LOG.info(cluster.printServers());
LOG.error("Failed " + caller, t);
throw t;
} finally {
cluster.shutdown();
}
}
default void runWithSameCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
runWithSameCluster(numServers, 0, testCase);
}
default void runWithSameCluster(int numServers, int numListeners, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
final StackTraceElement caller = JavaUtils.getCallerStackTraceElement();
LOG.info("Running " + caller.getMethodName());
CLUSTER cluster = null;
try {
cluster = getFactory().reuseCluster(numServers, numListeners, getProperties());
testCase.accept(cluster);
} catch(Exception t) {
if (cluster != null) {
LOG.info(cluster.printServers());
}
LOG.error("Failed " + caller, t);
throw t;
}
}
}
private final AtomicReference<CLUSTER> reusableCluster = new AtomicReference<>();
private CLUSTER reuseCluster(int numServers, int numListeners, RaftProperties prop) throws IOException {
for(;;) {
final CLUSTER cluster = reusableCluster.get();
if (cluster != null) {
return cluster;
}
final CLUSTER newCluster = newCluster(numServers, numListeners, prop);
if (reusableCluster.compareAndSet(null, newCluster)) {
newCluster.start();
Runtime.getRuntime().addShutdownHook(new Thread(newCluster::shutdown));
return newCluster;
}
}
}
public abstract CLUSTER newCluster(
String[] ids, String[] listenerIds, RaftProperties prop);
public CLUSTER newCluster(int numServer, RaftProperties prop) {
return newCluster(numServer, 0, prop);
}
public CLUSTER newCluster(int numServer, int numListeners, RaftProperties prop) {
return newCluster(generateIds(numServer, 0), generateIds(numListeners, numServer), prop);
}
}
public static abstract class RpcBase extends MiniRaftCluster {
public RpcBase(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) {
super(ids, listenerIds, properties, parameters);
}
@Override
public void setBlockRequestsFrom(String src, boolean block) {
if (block) {
BlockRequestHandlingInjection.getInstance().blockRequestor(src);
} else {
BlockRequestHandlingInjection.getInstance().unblockRequestor(src);
}
}
protected int getPort(RaftPeerId id, RaftGroup g) {
return getPort(getAddress(id, g, RaftPeer::getAddress));
}
protected String getAddress(RaftPeerId id, RaftGroup g, Function<RaftPeer, String> getAddress) {
final RaftPeer p = getPeer(id, g);
return p == null? null : getAddress.apply(p);
}
protected int getDataStreamPort(RaftPeerId id, RaftGroup g) {
final RaftPeer p = getPeer(id, g);
final String address = p == null? null : p.getDataStreamAddress();
return getPort(address);
}
private int getPort(String address) {
return Optional.ofNullable(address)
.map(NetUtils::createSocketAddr)
.map(InetSocketAddress::getPort)
.orElseGet(NetUtils::getFreePort);
}
}
public static class PeerChanges {
public final RaftPeer[] allPeersInNewConf;
public final RaftPeer[] newPeers;
public final RaftPeer[] removedPeers;
public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
this.allPeersInNewConf = all;
this.newPeers = newPeers;
this.removedPeers = removed;
}
}
public static RaftGroup initRaftGroup(Collection<String> ids, Collection<String> listenerIds) {
Stream<RaftPeer> peer = ids.stream()
.map(id -> RaftPeer.newBuilder().setId(id))
.map(MiniRaftCluster::assignAddresses)
.map(RaftPeer.Builder::build);
Stream<RaftPeer> listener = listenerIds.stream()
.map(id -> RaftPeer.newBuilder().setId(id))
.map(MiniRaftCluster::assignAddresses)
.map(p -> p.setStartupRole(RaftProtos.RaftPeerRole.LISTENER))
.map(RaftPeer.Builder::build);
final RaftPeer[] peers = Stream.concat(peer, listener).toArray(RaftPeer[]::new);
return RaftGroup.valueOf(RaftGroupId.randomId(), peers);
}
private static RaftPeer.Builder assignAddresses(RaftPeer.Builder builder) {
return builder
.setAddress(NetUtils.localhostWithFreePort())
.setAdminAddress(NetUtils.localhostWithFreePort())
.setClientAddress(NetUtils.localhostWithFreePort())
.setDataStreamAddress(NetUtils.localhostWithFreePort());
}
private final Supplier<File> rootTestDir = JavaUtils.memoize(
() -> new File(BaseTest.getRootTestDir(),
JavaUtils.getClassSimpleName(getClass()) + Integer.toHexString(ThreadLocalRandom.current().nextInt())));
public File getStorageDir(RaftPeerId id) {
return new File(rootTestDir.get(), id.toString());
}
public static String[] generateIds(int numServers, int base) {
String[] ids = new String[numServers];
for (int i = 0; i < numServers; i++) {
ids[i] = "s" + (i + base);
}
return ids;
}
public static int getIdIndex(String id) {
return Integer.parseInt(id.substring(1));
}
protected RaftGroup group;
protected final RaftProperties properties;
protected final Parameters parameters;
protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>();
protected final Map<RaftPeerId, RaftPeer> peers = new ConcurrentHashMap<>();
private volatile StateMachine.Registry stateMachineRegistry = null;
private final AtomicReference<Timer> timer = new AtomicReference<>();
protected MiniRaftCluster(String[] ids, String[] listenerIds, RaftProperties properties, Parameters parameters) {
this.group = initRaftGroup(Arrays.asList(ids), Arrays.asList(listenerIds));
LOG.info("new {} with {}", JavaUtils.getClassSimpleName(getClass()), group);
this.properties = new RaftProperties(properties);
this.parameters = parameters;
ExitUtils.disableSystemExit();
}
public RaftProperties getProperties() {
return properties;
}
public MiniRaftCluster initServers() {
LOG.info("servers = " + servers);
if (servers.isEmpty()) {
putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true, group);
}
return this;
}
public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) {
final RaftServerProxy s = newRaftServer(id, group, format);
peers.put(s.getId(), s.getPeer());
Preconditions.assertTrue(servers.put(id, s) == null);
return s;
}
private Collection<RaftServer> putNewServers(Iterable<RaftPeerId> peers, boolean format, RaftGroup raftGroup) {
return StreamSupport.stream(peers.spliterator(), false)
.map(id -> putNewServer(id, raftGroup, format))
.collect(Collectors.toList());
}
public void start() throws IOException {
LOG.info(".............................................................. ");
LOG.info("... ");
LOG.info("... Starting " + JavaUtils.getClassSimpleName(getClass()));
LOG.info("... ");
LOG.info(".............................................................. ");
initServers();
startServers(servers.values());
this.timer.updateAndGet(t -> t != null? t
: JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT: " + printServers()), 10, 10, TimeUnit.SECONDS));
}
/**
* start a stopped server again.
*/
public RaftServer.Division restartServer(RaftPeerId serverId, boolean format) throws IOException {
return restartServer(serverId, group, format);
}
public RaftServer.Division restartServer(RaftPeerId serverId, RaftGroup group, boolean format) throws IOException {
killServer(serverId);
servers.remove(serverId);
final RaftServer proxy = putNewServer(serverId, group, format);
proxy.start();
return group == null? null: proxy.getDivision(group.getGroupId());
}
public void restart(boolean format) throws IOException {
shutdown();
List<RaftPeerId> idList = new ArrayList<>(servers.keySet());
servers.clear();
putNewServers(idList, format, group);
start();
}
public TimeDuration getTimeoutMax() {
return RaftServerConfigKeys.Rpc.timeoutMax(properties);
}
private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, boolean format) {
LOG.info("newRaftServer: {}, {}, format? {}", id, group, format);
try {
final File dir = getStorageDir(id);
if (format) {
FileUtils.deleteFully(dir);
LOG.info("Formatted directory {}", dir);
}
final RaftProperties prop = new RaftProperties(properties);
RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(dir));
return ServerImplUtils.newRaftServer(id, group,
format? RaftStorage.StartupOption.FORMAT: RaftStorage.StartupOption.RECOVER,
getStateMachineRegistry(prop), null, prop, setPropertiesAndInitParameters(id, group, prop));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
protected abstract Parameters setPropertiesAndInitParameters(
RaftPeerId id, RaftGroup group, RaftProperties properties);
public void setStateMachineRegistry(StateMachine.Registry stateMachineRegistry) {
this.stateMachineRegistry = stateMachineRegistry;
}
StateMachine.Registry getStateMachineRegistry(RaftProperties properties) {
if (stateMachineRegistry != null) {
return stateMachineRegistry;
}
final Class<? extends StateMachine> smClass = properties.getClass(
STATEMACHINE_CLASS_KEY, null, StateMachine.class);
if (smClass == null) {
return STATEMACHINE_REGISTRY_DEFAULT;
}
return gid -> {
final RuntimeException exception;
try {
return ReflectionUtils.newInstance(smClass);
} catch(RuntimeException e) {
exception = e;
}
try {
final Class<?>[] argClasses = {RaftProperties.class};
return ReflectionUtils.newInstance(smClass, argClasses, properties);
} catch(RuntimeException e) {
exception.addSuppressed(e);
}
throw exception;
};
}
private static List<RaftPeer> toRaftPeers(Iterable<RaftServer> servers) {
return StreamSupport.stream(servers.spliterator(), false)
.map(RaftServer::getPeer)
.collect(Collectors.toList());
}
public PeerChanges addNewPeers(int number, boolean startNewPeer)
throws IOException {
return addNewPeers(generateIds(number, servers.size()), startNewPeer, false);
}
public PeerChanges addNewPeers(int number, boolean startNewPeer,
boolean emptyPeer) throws IOException {
return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer,
RaftProtos.RaftPeerRole.FOLLOWER);
}
public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
boolean emptyPeer) throws IOException {
return addNewPeers(ids, startNewPeer, emptyPeer, RaftProtos.RaftPeerRole.FOLLOWER);
}
public PeerChanges addNewPeers(int number, boolean startNewPeer,
boolean emptyPeer, RaftProtos.RaftPeerRole startRole) throws IOException {
return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer, startRole);
}
public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
boolean emptyPeer, RaftProtos.RaftPeerRole startRole) throws IOException {
LOG.info("Add new peers {}", Arrays.asList(ids));
final Iterable<RaftPeerId> peerIds = CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf);
final RaftGroup raftGroup;
if (emptyPeer) {
raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList());
} else {
final Collection<RaftPeer> newPeers = StreamSupport.stream(peerIds.spliterator(), false)
.map(id -> RaftPeer.newBuilder().setId(id)
.setStartupRole(startRole))
.map(MiniRaftCluster::assignAddresses)
.map(RaftPeer.Builder::build)
.collect(Collectors.toSet());
newPeers.addAll(group.getPeers());
raftGroup = RaftGroup.valueOf(group.getGroupId(), newPeers);
}
// create and add new RaftServers
final Collection<RaftServer> newServers = putNewServers(peerIds, true, raftGroup);
if (startNewPeer) {
// start the server
for(RaftServer s : newServers) {
s.start();
}
}
final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
final RaftPeer[] np = newPeers.toArray(RaftPeer.emptyArray());
newPeers.addAll(group.getPeers());
RaftPeer[] p = newPeers.toArray(RaftPeer.emptyArray());
group = RaftGroup.valueOf(group.getGroupId(), p);
return new PeerChanges(p, np, RaftPeer.emptyArray());
}
void startServers(Iterable<? extends RaftServer> servers) throws IOException {
for(RaftServer s : servers) {
s.start();
peers.put(s.getId(), s.getPeer());
}
}
/**
* prepare the peer list when removing some peers from the conf
*/
public PeerChanges removePeers(int number, boolean removeLeader,
Collection<RaftPeer> excluded) throws InterruptedException {
Collection<RaftPeer> peers = new ArrayList<>(group.getPeers());
List<RaftPeer> removedPeers = new ArrayList<>(number);
if (removeLeader) {
final RaftPeer leader = RaftTestUtil.waitForLeader(this).getPeer();
Preconditions.assertTrue(!excluded.contains(leader));
peers.remove(leader);
removedPeers.add(leader);
}
final List<RaftServer.Division> followers = getFollowers();
for (int i = 0, removed = 0; i < followers.size() &&
removed < (removeLeader ? number - 1 : number); i++) {
RaftPeer toRemove = followers.get(i).getPeer();
if (!excluded.contains(toRemove)) {
peers.remove(toRemove);
removedPeers.add(toRemove);
removed++;
}
}
final RaftPeer[] p = peers.toArray(RaftPeer.emptyArray());
group = RaftGroup.valueOf(group.getGroupId(), p);
return new PeerChanges(p, RaftPeer.emptyArray(), removedPeers.toArray(RaftPeer.emptyArray()));
}
public void killServer(RaftPeerId id) {
LOG.info("killServer " + id);
servers.get(id).close();
}
public String printServers() {
return printServers(null);
}
public String printServers(RaftGroupId groupId) {
final StringBuilder b = new StringBuilder("printing ");
if (groupId != null) {
b.append(groupId);
} else {
b.append("ALL groups");
}
getRaftServerProxyStream(groupId).forEach(s -> b.append("\n ").append(s));
return b.toString();
}
public String printAllLogs() {
StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
for (RaftServer.Division s : iterateDivisions()) {
b.append(" ");
b.append(s).append("\n");
final RaftLog log = s.getRaftLog();
if (log instanceof MemoryRaftLog) {
b.append(" ");
b.append(((MemoryRaftLog) log).getEntryString());
}
}
return b.toString();
}
public RaftServer.Division getLeaderAndSendFirstMessage(boolean ignoreException) throws IOException {
final RaftServer.Division leader = getLeader();
try(RaftClient client = createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
} catch (IOException e) {
if (!ignoreException) {
throw e;
}
}
return leader;
}
public IllegalStateException newIllegalStateExceptionForNoLeaders(RaftGroupId groupId) {
final String g = groupId == null? "": " for " + groupId;
return new IllegalStateException("No leader yet " + g + ": " + printServers(groupId));
}
public IllegalStateException newIllegalStateExceptionForMultipleLeaders(RaftGroupId groupId,
List<RaftServer.Division> leaders) {
final String g = groupId == null? "": " for " + groupId;
return new IllegalStateException("Found multiple leaders" + g
+ " at the same term (=" + leaders.get(0).getInfo().getCurrentTerm()
+ "), leaders.size() = " + leaders.size() + " > 1, leaders = " + leaders
+ ": " + printServers(groupId));
}
/**
* Get leader for the single group case.
* Do not use this method if this cluster has multiple groups.
*
* @return the unique leader with the highest term. Or, return null if there is no leader.
* @throws IllegalStateException if there are multiple leaders with the same highest term.
*/
public RaftServer.Division getLeader() {
return getLeader(getLeaders(null), null, leaders -> {
throw newIllegalStateExceptionForMultipleLeaders(null, leaders);
});
}
public RaftServer.Division getLeader(RaftGroupId groupId, Runnable handleNoLeaders,
Consumer<List<RaftServer.Division>> handleMultipleLeaders) {
return getLeader(getLeaders(groupId), handleNoLeaders, handleMultipleLeaders);
}
static RaftServer.Division getLeader(List<RaftServer.Division> leaders, Runnable handleNoLeaders,
Consumer<List<RaftServer.Division>> handleMultipleLeaders) {
if (leaders.isEmpty()) {
if (handleNoLeaders != null) {
handleNoLeaders.run();
}
return null;
} else if (leaders.size() > 1) {
if (handleMultipleLeaders != null) {
handleMultipleLeaders.accept(leaders);
}
return null;
} else {
return leaders.get(0);
}
}
/**
* @return the list of leaders with the highest term (i.e. leaders with a lower term are not included).
* from the given group.
*/
private List<RaftServer.Division> getLeaders(RaftGroupId groupId) {
final Stream<RaftServer.Division> serverAliveStream = getServerAliveStream(groupId);
final List<RaftServer.Division> leaders = new ArrayList<>();
serverAliveStream.filter(server -> server.getInfo().isLeader()).forEach(s -> {
if (leaders.isEmpty()) {
leaders.add(s);
} else {
final long leaderTerm = leaders.get(0).getInfo().getCurrentTerm();
final long term = s.getInfo().getCurrentTerm();
if (term >= leaderTerm) {
if (term > leaderTerm) {
leaders.clear();
}
leaders.add(s);
}
}
});
return leaders;
}
boolean isLeader(String leaderId) {
final RaftServer.Division leader = getLeader();
return leader != null && leader.getId().toString().equals(leaderId);
}
public List<RaftServer.Division> getFollowers() {
return getServerAliveStream()
.filter(server -> server.getInfo().isFollower())
.collect(Collectors.toList());
}
public List<RaftServer.Division> getListeners() {
return getServerAliveStream()
.filter(server -> server.getInfo().isListener())
.collect(Collectors.toList());
}
public int getNumServers() {
return servers.size();
}
public Iterable<RaftServer> getServers() {
return CollectionUtils.as(servers.values(), s -> s);
}
private Stream<RaftServerProxy> getRaftServerProxyStream(RaftGroupId groupId) {
return servers.values().stream()
.filter(s -> groupId == null || s.getGroupIds().contains(groupId));
}
public Iterable<RaftServer.Division> iterateDivisions() {
return CollectionUtils.as(getServers(), this::getDivision);
}
private Stream<RaftServer.Division> getServerStream(RaftGroupId groupId) {
final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
return groupId != null?
stream.map(s -> JavaUtils.callAsUnchecked(() -> s.getDivision(groupId)))
: stream.flatMap(s -> JavaUtils.callAsUnchecked(s::getImpls).stream());
}
public Stream<RaftServer.Division> getServerAliveStream() {
return getServerAliveStream(getGroupId());
}
private Stream<RaftServer.Division> getServerAliveStream(RaftGroupId groupId) {
return getServerStream(groupId).filter(server -> server.getInfo().isAlive());
}
private RetryPolicy getDefaultRetryPolicy() {
return RetryPolicies.retryForeverWithSleep(RETRY_INTERVAL_DEFAULT);
}
public RaftServerProxy getServer(RaftPeerId id) {
return servers.get(id);
}
public ServerFactory getServerFactory(RaftPeerId id) {
return servers.get(id).getFactory();
}
public RaftServer.Division getDivision(RaftPeerId id) {
return getDivision(servers.get(id));
}
public RaftServer.Division getDivision(RaftPeerId id, RaftGroupId groupId) {
return RaftServerTestUtil.getDivision(servers.get(id), groupId);
}
public RaftServer.Division getDivision(RaftServer server) {
return RaftServerTestUtil.getDivision(server, getGroupId());
}
public List<RaftPeer> getPeers() {
return toRaftPeers(getServers());
}
RaftPeer getPeer(RaftPeerId id, RaftGroup group) {
RaftPeer p = peers.get(id);
if (p != null) {
return p;
}
if (group != null) {
p = group.getPeer(id);
}
if (p == null) {
p = Optional.ofNullable(servers.get(id)).map(RaftServerProxy::getPeer).orElse(null);
}
if (p != null) {
peers.put(id, p);
}
return p;
}
public RaftGroup getGroup() {
return group;
}
public RaftClient createClient() {
return createClient(null, group);
}
public RaftClient createClient(RaftGroup g) {
return createClient(null, g);
}
public RaftClient createClient(RaftPeerId leaderId) {
return createClient(leaderId, group);
}
public RaftClient createClient(RetryPolicy retryPolicy) {
return createClient(null, group, retryPolicy);
}
public RaftClient createClient(RaftPeerId leaderId, RetryPolicy retryPolicy) {
return createClient(leaderId, group, retryPolicy);
}
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
return createClient(leaderId, group, getDefaultRetryPolicy());
}
public RaftClient createClient(RaftPeer primaryServer) {
return createClient(null, group, getDefaultRetryPolicy(), primaryServer);
}
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, RetryPolicy retryPolicy) {
return createClient(leaderId, group, retryPolicy, null);
}
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, RetryPolicy retryPolicy, RaftPeer primaryServer) {
RaftClient.Builder builder = RaftClient.newBuilder()
.setRaftGroup(group)
.setLeaderId(leaderId)
.setProperties(properties)
.setParameters(parameters)
.setPrimaryDataStreamServer(primaryServer)
.setRetryPolicy(retryPolicy);
return builder.build();
}
public RaftClientRequest newRaftClientRequest(
ClientId clientId, RaftPeerId leaderId, Message message) {
return newRaftClientRequest(clientId, leaderId, CallId.getDefault(), message);
}
public RaftClientRequest newRaftClientRequest(
ClientId clientId, RaftPeerId leaderId, long callId, Message message) {
return RaftClientRequest.newBuilder()
.setClientId(clientId)
.setLeaderId(leaderId)
.setGroupId(getGroupId())
.setCallId(callId)
.setMessage(message)
.setType(RaftClientRequest.writeRequestType())
.build();
}
public SetConfigurationRequest newSetConfigurationRequest(
ClientId clientId, RaftPeerId leaderId,
RaftPeer... peers) {
return new SetConfigurationRequest(clientId, leaderId, getGroupId(), CallId.getDefault(),
SetConfigurationRequest.Arguments.newBuilder().setServersInNewConf(peers).build());
}
public void setConfiguration(RaftPeer... peers) throws IOException {
try(RaftClient client = createClient()) {
LOG.info("Start changing the configuration: {}", Arrays.asList(peers));
final RaftClientReply reply = client.admin().setConfiguration(peers);
Preconditions.assertTrue(reply.isSuccess());
}
}
@Override
public void close() {
shutdown();
}
public void shutdown() {
LOG.info("************************************************************** ");
LOG.info("*** ");
LOG.info("*** Stopping " + JavaUtils.getClassSimpleName(getClass()));
LOG.info("*** ");
LOG.info("************************************************************** ");
LOG.info(printServers());
// TODO: classes like RaftLog may throw uncaught exception during shutdown (e.g. write after close)
ExitUtils.setTerminateOnUncaughtException(false);
final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), (t) ->
Daemon.newBuilder().setName("MiniRaftCluster-" + THREAD_COUNT.incrementAndGet()).setRunnable(t).build());
getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close)));
try {
executor.shutdown();
// just wait for a few seconds
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("shutdown interrupted", e);
Thread.currentThread().interrupt();
}
Optional.ofNullable(timer.get()).ifPresent(Timer::cancel);
ExitUtils.assertNotTerminated();
LOG.info("{} shutdown completed", JavaUtils.getClassSimpleName(getClass()));
// GC to ensure leak detection work.
try {
RaftTestUtil.gc();
} catch (InterruptedException e) {
LOG.info("gc interrupted.");
}
ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks();
}
/**
* Block all the incoming requests for the peer with leaderId. Also delay
* outgoing or incoming msg for all other peers.
*/
protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs)
throws InterruptedException;
/**
* Try to enforce the leader of the cluster.
* @param leaderId ID of the targeted leader server.
* @return true if server has been successfully enforced to the leader, false
* otherwise.
*/
public boolean tryEnforceLeader(String leaderId) throws InterruptedException {
// do nothing and see if the given id is already a leader.
if (isLeader(leaderId)) {
return true;
}
// Blocking all other server's RPC read process to make sure a read takes at
// least ELECTION_TIMEOUT_MIN. In this way when the target leader request a
// vote, all non-leader servers can grant the vote.
// Disable the target leader server RPC so that it can request a vote.
blockQueueAndSetDelay(leaderId,
RaftServerConfigKeys.Rpc.TIMEOUT_MIN_DEFAULT.toIntExact(TimeUnit.MILLISECONDS));
// Reopen queues so that the vote can make progress.
blockQueueAndSetDelay(leaderId, 0);
return isLeader(leaderId);
}
/** Block/unblock the requests sent from the given source. */
public abstract void setBlockRequestsFrom(String src, boolean block);
public RaftGroupId getGroupId() {
return group.getGroupId();
}
}