blob: 74c19a15e8e79c8a8c7242b2425b740759412c33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.storage.MemoryRaftLog;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReflectionUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
public abstract class MiniRaftCluster implements Closeable {
/** Logger shared by this class and by the nested types declared in it. */
public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
/** Simple name of this class; used as the prefix of the property keys declared here. */
public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
/** Property key naming the {@link StateMachine} class instantiated for the servers of a cluster. */
public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
/** Fallback registry: returns a new {@link BaseStateMachine} for every group id. */
private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine();
/** Sleep interval of the retry-forever policy used by clients created without an explicit policy. */
private static final TimeDuration RETRY_INTERVAL_DEFAULT =
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
/**
 * Creates {@link MiniRaftCluster} instances of one concrete type and keeps at most one
 * reusable, already started instance.
 *
 * @param <CLUSTER> the concrete cluster type produced by this factory.
 */
public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
  /**
   * Mix-in for test classes: supplies a {@link Factory} and offers helpers that
   * run a test case against a new or a reused cluster.
   */
  public interface Get<CLUSTER extends MiniRaftCluster> {
    /**
     * Properties obtained through {@code JavaUtils.memoize}.
     * Being an interface field, it is implicitly static and therefore shared by all implementors.
     */
    Supplier<RaftProperties> properties = JavaUtils.memoize(RaftProperties::new);

    /** @return the factory used to create the clusters. */
    Factory<CLUSTER> getFactory();

    /** @return the properties passed to the factory when creating clusters. */
    default RaftProperties getProperties() {
      return properties.get();
    }

    /** @return a new (not started) cluster with the given number of peers. */
    default CLUSTER newCluster(int numPeers) {
      return getFactory().newCluster(numPeers, getProperties());
    }

    /** Same as {@link #runWithNewCluster(int, boolean, CheckedConsumer)} with the cluster started. */
    default void runWithNewCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
      runWithNewCluster(numServers, true, testCase);
    }

    /**
     * Creates a new cluster, optionally starts it, runs the test case and finally shuts the cluster down.
     *
     * @param numServers   the number of servers in the new cluster.
     * @param startCluster whether to start the cluster before running the test case.
     * @param testCase     the test body.
     * @throws Exception whatever the start or the test case throws; a failure of the final
     *                   shutdown is attached to it as a suppressed exception instead of replacing it.
     */
    default void runWithNewCluster(int numServers, boolean startCluster, CheckedConsumer<CLUSTER, Exception> testCase)
        throws Exception {
      final StackTraceElement caller = JavaUtils.getCallerStackTraceElement();
      LOG.info("Running " + caller.getMethodName());
      final CLUSTER cluster = newCluster(numServers);
      Throwable failure = null;
      try {
        if (startCluster) {
          cluster.start();
        }
        testCase.accept(cluster);
      } catch(Throwable t) {
        failure = t;
        LOG.info(cluster.printServers());
        LOG.error("Failed " + caller, t);
        throw t;
      } finally {
        try {
          cluster.shutdown();
        } catch (RuntimeException | Error e) {
          if (failure == null) {
            throw e;
          }
          // Keep the test failure as the primary exception; do not let the shutdown failure mask it.
          failure.addSuppressed(e);
        }
      }
    }

    /**
     * Runs the test case against the reusable cluster of the factory.
     * The cluster is not shut down afterwards.
     *
     * @param numServers the number of servers, used only if the reusable cluster has to be created.
     * @param testCase   the test body.
     * @throws Exception whatever the cluster creation or the test case throws.
     */
    default void runWithSameCluster(int numServers, CheckedConsumer<CLUSTER, Exception> testCase) throws Exception {
      final StackTraceElement caller = JavaUtils.getCallerStackTraceElement();
      LOG.info("Running " + caller.getMethodName());
      CLUSTER cluster = null;
      try {
        cluster = getFactory().reuseCluster(numServers, getProperties());
        testCase.accept(cluster);
      } catch(Throwable t) {
        if (cluster != null) {
          LOG.info(cluster.printServers());
        }
        LOG.error("Failed " + caller, t);
        throw t;
      }
    }
  }

  /** The cluster handed out by {@link #reuseCluster(int, RaftProperties)}; null until one is created. */
  private final AtomicReference<CLUSTER> reusableCluster = new AtomicReference<>();

  /**
   * Returns the reusable cluster, creating and starting it on first use.
   * A JVM shutdown hook is registered to shut the created cluster down.
   *
   * If starting the new cluster fails, the cached reference is cleared again so that
   * later callers do not receive a cluster that never started.
   *
   * @throws IOException if the new cluster fails to start.
   */
  private CLUSTER reuseCluster(int numServers, RaftProperties prop) throws IOException {
    for(;;) {
      final CLUSTER cached = reusableCluster.get();
      if (cached != null) {
        return cached;
      }

      final CLUSTER created = newCluster(numServers, prop);
      if (reusableCluster.compareAndSet(null, created)) {
        try {
          created.start();
        } catch (IOException | RuntimeException e) {
          // Do not keep a cluster that failed to start; release whatever it managed to start.
          reusableCluster.compareAndSet(created, null);
          try {
            created.shutdown();
          } catch (RuntimeException shutdownFailure) {
            e.addSuppressed(shutdownFailure);
          }
          throw e;
        }
        Runtime.getRuntime().addShutdownHook(new Thread(created::shutdown));
        return created;
      }
      // Lost the race against another thread: loop and pick up its cluster.
    }
  }

  /** @return a new cluster whose servers have the given ids. */
  public abstract CLUSTER newCluster(
      String[] ids, RaftProperties prop);

  /** @return a new cluster with {@code numServer} servers, using ids generated from base 0. */
  public CLUSTER newCluster(int numServer, RaftProperties prop) {
    return newCluster(generateIds(numServer, 0), prop);
  }
}
/** Base class of the clusters that block requests through {@link BlockRequestHandlingInjection}. */
public static abstract class RpcBase extends MiniRaftCluster {
  public RpcBase(String[] ids, RaftProperties properties, Parameters parameters) {
    super(ids, properties, parameters);
  }

  /** Registers or unregisters {@code src} as a blocked requestor. */
  @Override
  public void setBlockRequestsFrom(String src, boolean block) {
    if (!block) {
      BlockRequestHandlingInjection.getInstance().unblockRequestor(src);
      return;
    }
    BlockRequestHandlingInjection.getInstance().blockRequestor(src);
  }

  /**
   * Looks up the port of the given peer.
   *
   * @param id the peer id.
   * @param g  the group to search; when null, the peers recorded by this cluster are searched.
   * @return the port of the peer address, or the port of a newly created local server address
   *         when the peer or its address is unknown.
   */
  protected int getPort(RaftPeerId id, RaftGroup g) {
    final RaftPeer peer;
    if (g == null) {
      peer = peers.get(id);
    } else {
      peer = g.getPeer(id);
    }

    String address = null;
    if (peer != null) {
      address = peer.getAddress();
    }

    final InetSocketAddress socketAddress;
    if (address == null) {
      socketAddress = NetUtils.createLocalServerAddress();
    } else {
      socketAddress = NetUtils.createSocketAddr(address);
    }
    return socketAddress.getPort();
  }
}
/** Result of a planned membership change: the new configuration and what differs from the old one. */
public static class PeerChanges {
  /** All the peers of the new configuration. */
  public final RaftPeer[] allPeersInNewConf;
  /** The peers added by the change. */
  public final RaftPeer[] newPeers;
  /** The peers removed by the change. */
  public final RaftPeer[] removedPeers;

  /**
   * @param all      all the peers of the new configuration.
   * @param newPeers the added peers.
   * @param removed  the removed peers.
   */
  public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
    this.removedPeers = removed;
    this.newPeers = newPeers;
    this.allPeersInNewConf = all;
  }
}
/**
 * Builds a group with a random group id and one peer per given id,
 * each peer being assigned a newly created local server address.
 *
 * @param ids the peer ids, in string form.
 * @return the new group.
 */
public static RaftGroup initRaftGroup(Collection<String> ids) {
  final List<RaftPeer> peerList = new ArrayList<>(ids.size());
  for (String id : ids) {
    final RaftPeerId peerId = RaftPeerId.valueOf(id);
    peerList.add(new RaftPeer(peerId, NetUtils.createLocalServerAddress()));
  }
  final RaftPeer[] peers = peerList.toArray(new RaftPeer[0]);
  return RaftGroup.valueOf(RaftGroupId.randomId(), peers);
}
/**
 * Root storage directory of this cluster: a child of the test root directory named after
 * the concrete class plus a random hex suffix. Obtained through {@code JavaUtils.memoize}.
 */
private final Supplier<File> rootTestDir = JavaUtils.memoize(() -> {
  return new File(BaseTest.getRootTestDir(),
      getClass().getSimpleName() + Integer.toHexString(ThreadLocalRandom.current().nextInt()));
});

/** @return the storage directory of the given server: a child of the root directory named by the id. */
private File getStorageDir(RaftPeerId id) {
  final File root = rootTestDir.get();
  return new File(root, id.toString());
}
/**
 * Generates the ids "s{base}", "s{base+1}", ... for the given number of servers.
 *
 * @param numServers the number of ids to generate.
 * @param base       the index of the first id.
 * @return the generated ids, in increasing index order.
 */
public static String[] generateIds(int numServers, int base) {
  final String[] generated = new String[numServers];
  Arrays.setAll(generated, offset -> "s" + (offset + base));
  return generated;
}
/**
 * Parses the numeric index out of an id: everything after the first character is read as an int.
 *
 * @param id an id such as those produced by {@link #generateIds(int, int)}.
 * @return the index encoded in the id.
 */
public static int getIdIndex(String id) {
  final String digits = id.substring(1);
  return Integer.parseInt(digits);
}
/** The group of this cluster; reassigned when peers are added or removed. */
protected RaftGroup group;
/** Private copy of the properties given to the constructor. */
protected final RaftProperties properties;
/** Parameters given to the constructor; passed to the clients built by this cluster. */
protected final Parameters parameters;
/** The server proxies created by this cluster, keyed by peer id. */
protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>();
/** The peer recorded for each server put into this cluster, keyed by peer id. */
protected final Map<RaftPeerId, RaftPeer> peers = new ConcurrentHashMap<>();
/** Optional registry override; when null, the registry is derived from the properties. */
private volatile StateMachine.Registry stateMachineRegistry = null;
/** Timer of the periodic server printout installed by start(). */
private final AtomicReference<Timer> timer = new AtomicReference<>();
protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
this.group = initRaftGroup(Arrays.asList(ids));
LOG.info("new {} with {}", getClass().getSimpleName(), group);
this.properties = new RaftProperties(properties);
this.parameters = parameters;
ExitUtils.disableSystemExit();
}
/** @return the properties of this cluster (the copy made by the constructor). */
public RaftProperties getProperties() {
  return this.properties;
}
/**
 * Creates, with formatted storage, one server per peer of the group, unless servers already exist.
 *
 * @return this cluster.
 */
public MiniRaftCluster initServers() {
  LOG.info("servers = " + servers);
  if (!servers.isEmpty()) {
    return this;
  }
  putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
  return this;
}
/**
 * Creates a server and records it, together with its peer, under the given id.
 * Asserts that no server was recorded under that id before.
 *
 * @param id     the id of the new server.
 * @param group  the group passed to the new server.
 * @param format whether to delete the storage directory of the server first.
 * @return the new server proxy.
 */
public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) {
  final RaftServerProxy created = newRaftServer(id, group, format);
  final RaftServerProxy previous = servers.put(id, created);
  Preconditions.assertTrue(previous == null);
  peers.put(id, toRaftPeer(created));
  return created;
}
/**
 * Creates and records one server of the cluster group per given id, in iteration order.
 *
 * @return the new server proxies.
 */
private Collection<RaftServerProxy> putNewServers(
    Iterable<RaftPeerId> peers, boolean format) {
  final List<RaftServerProxy> created = new ArrayList<>();
  for (RaftPeerId id : peers) {
    created.add(putNewServer(id, group, format));
  }
  return created;
}
/**
 * Initializes the servers if needed, starts all of them and, unless a timer is already installed,
 * schedules a printout of the servers with a delay and a period of 10 seconds.
 *
 * @throws IOException if a server fails to start.
 */
public void start() throws IOException {
  LOG.info(".............................................................. ");
  LOG.info("... ");
  LOG.info("... Starting " + getClass().getSimpleName());
  LOG.info("... ");
  LOG.info(".............................................................. ");

  initServers();
  startServers(servers.values());

  this.timer.updateAndGet(existing -> {
    if (existing != null) {
      return existing;
    }
    return JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT: " + printServers()), 10, 10, TimeUnit.SECONDS);
  });
}
/**
 * Restarts a server using the current group of this cluster.
 *
 * @see #restartServer(RaftPeerId, RaftGroup, boolean)
 */
public RaftServerImpl restartServer(RaftPeerId newId, boolean format) throws IOException {
  return restartServer(newId, this.group, format);
}

/**
 * Closes the server recorded under the given id, replaces it by a new server with the same id
 * and starts the new one.
 *
 * @param newId  the id of the server to restart.
 * @param group  the group passed to the new server; may be null.
 * @param format whether to delete the storage directory of the server first.
 * @return the server implementation for the group, or null when {@code group} is null.
 * @throws IOException if the new server fails to start.
 */
public RaftServerImpl restartServer(RaftPeerId newId, RaftGroup group, boolean format) throws IOException {
  killServer(newId);
  servers.remove(newId);

  final RaftServerProxy restarted = putNewServer(newId, group, format);
  restarted.start();
  if (group == null) {
    return null;
  }
  return restarted.getImpl(group.getGroupId());
}
/**
 * Shuts the cluster down, re-creates a server for every previously recorded id and starts the cluster again.
 *
 * @param format whether to delete the storage directories of the servers first.
 * @throws IOException if a server fails to start.
 */
public void restart(boolean format) throws IOException {
  shutdown();

  // Remember the ids before forgetting the old proxies.
  final List<RaftPeerId> previousIds = new ArrayList<>(servers.keySet());
  servers.clear();
  putNewServers(previousIds, format);

  start();
}
/** @return the RPC timeout-max value read from the properties of this cluster. */
public TimeDuration getTimeoutMax() {
  final TimeDuration timeoutMax = RaftServerConfigKeys.Rpc.timeoutMax(properties);
  return timeoutMax;
}
/**
 * Creates a server whose properties are a copy of the cluster properties
 * with the storage directory set to the directory of the given id.
 *
 * @param format whether to delete the storage directory first.
 * @throws RuntimeException wrapping any {@link IOException} raised during the creation.
 */
private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, boolean format) {
  LOG.info("newRaftServer: {}, {}, format? {}", id, group, format);
  try {
    final File storageDir = getStorageDir(id);
    if (format) {
      FileUtils.deleteFully(storageDir);
      LOG.info("Formatted directory {}", storageDir);
    }

    final RaftProperties serverProperties = new RaftProperties(properties);
    RaftServerConfigKeys.setStorageDirs(serverProperties, Collections.singletonList(storageDir));
    // The registry is looked up with the cluster properties, not with the per-server copy.
    final StateMachine.Registry registry = getStateMachineRegistry(properties);
    return newRaftServer(id, registry, group, serverProperties);
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
}
/**
 * Creates the server proxy for the given peer; implemented by the concrete cluster types.
 *
 * @param id the id of the new server.
 * @param stateMachineRegistry the registry providing the state machines of the server.
 * @param group the group passed to the new server; callers in this class may pass null.
 * @param properties the per-server properties; the caller in this class has set the storage directory in it.
 * @return the new server proxy.
 * @throws IOException if the server cannot be created.
 */
protected abstract RaftServerProxy newRaftServer(
RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group,
RaftProperties properties) throws IOException;
/**
 * Overrides the registry used for the servers created afterwards.
 *
 * @param stateMachineRegistry the registry to use; null restores the lookup from the properties.
 */
public void setStateMachineRegistry(StateMachine.Registry stateMachineRegistry) {
  this.stateMachineRegistry = stateMachineRegistry;
}
/**
 * Selects the state machine registry, in this order: the registry set explicitly on this cluster;
 * a registry instantiating the class configured under {@link #STATEMACHINE_CLASS_KEY};
 * the default registry.
 *
 * The class-based registry first tries {@code ReflectionUtils.newInstance(smClass)} and then the
 * variant taking a {@link RaftProperties} argument; if both fail, the first failure is thrown
 * with the second one attached as suppressed.
 *
 * @param properties the properties to read the class from and to pass to the state machine.
 * @return the registry to use.
 */
StateMachine.Registry getStateMachineRegistry(RaftProperties properties) {
  if (stateMachineRegistry != null) {
    return stateMachineRegistry;
  }

  final Class<? extends StateMachine> smClass = properties.getClass(
      STATEMACHINE_CLASS_KEY, null, StateMachine.class);
  if (smClass == null) {
    return STATEMACHINE_REGISTRY_DEFAULT;
  }

  return gid -> {
    try {
      return ReflectionUtils.newInstance(smClass);
    } catch(RuntimeException first) {
      try {
        final Class<?>[] argClasses = {RaftProperties.class};
        return ReflectionUtils.newInstance(smClass, argClasses, properties);
      } catch(RuntimeException second) {
        first.addSuppressed(second);
      }
      throw first;
    }
  };
}
/** @return the peers of the given servers, in iteration order. */
public static List<RaftPeer> toRaftPeers(
    Collection<RaftServerProxy> servers) {
  final List<RaftPeer> converted = new ArrayList<>();
  for (RaftServerProxy proxy : servers) {
    converted.add(toRaftPeer(proxy));
  }
  return converted;
}
/** @return the peer of the proxy owning the given server implementation. */
public static RaftPeer toRaftPeer(RaftServerImpl s) {
  final RaftServerProxy owner = s.getProxy();
  return toRaftPeer(owner);
}

/** @return a peer made of the id of the server and the socket address of its server RPC. */
public static RaftPeer toRaftPeer(RaftServerProxy s) {
  final RaftPeerId peerId = s.getId();
  return new RaftPeer(peerId, s.getServerRpc().getInetSocketAddress());
}
/**
 * Adds the given number of new peers, with ids generated from the current number of servers.
 *
 * @see #addNewPeers(String[], boolean)
 */
public PeerChanges addNewPeers(int number, boolean startNewPeer)
    throws IOException {
  final String[] generatedIds = generateIds(number, servers.size());
  return addNewPeers(generatedIds, startNewPeer);
}
/**
 * Creates servers for the given ids, starts them and adds their peers to the group of this cluster.
 *
 * @param ids          the ids of the new servers.
 * @param startNewPeer whether the new servers are left running; when false they are
 *                     started and then closed again, in order to bind the port.
 * @return the changes: all the peers of the new configuration (new peers first),
 *         the new peers, and no removed peers.
 * @throws IOException if a new server fails to start.
 */
public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException {
  LOG.info("Add new peers {}", Arrays.asList(ids));

  // create and add new RaftServers
  final Collection<RaftServerProxy> newServers = putNewServers(
      CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true);
  startServers(newServers);
  if (!startNewPeer) {
    // start and then close, in order to bind the port
    newServers.forEach(p -> p.close());
  }

  final List<RaftPeer> newPeers = toRaftPeers(newServers);
  final RaftPeer[] np = newPeers.toArray(RaftPeer.emptyArray());

  // Merge into a list owned by this method instead of mutating the list returned by
  // toRaftPeers(..), whose mutability is not part of its contract.
  final List<RaftPeer> allPeers = new ArrayList<>(newPeers);
  allPeers.addAll(group.getPeers());
  final RaftPeer[] p = allPeers.toArray(RaftPeer.emptyArray());

  group = RaftGroup.valueOf(group.getGroupId(), p);
  return new PeerChanges(p, np, RaftPeer.emptyArray());
}
/**
 * Starts the given servers one after the other, in iteration order.
 *
 * @throws IOException as soon as one server fails to start; the remaining ones are not started.
 */
static void startServers(Iterable<? extends RaftServer> servers) throws IOException {
  for (final RaftServer server : servers) {
    server.start();
  }
}
/**
 * Prepares the peer list for removing some peers from the configuration and updates
 * the group of this cluster accordingly. No server is stopped here.
 *
 * @param number       the number of peers to remove, the leader included when it is removed.
 * @param removeLeader whether the current leader is among the removed peers.
 * @param excluded     the peers that must not be removed; must not contain the leader
 *                     when {@code removeLeader} is true.
 * @return the changes: the remaining peers, no new peers, and the removed peers.
 * @throws InterruptedException if interrupted while waiting for a leader.
 */
public PeerChanges removePeers(int number, boolean removeLeader,
    Collection<RaftPeer> excluded) throws InterruptedException {
  final Collection<RaftPeer> remaining = new ArrayList<>(group.getPeers());
  final List<RaftPeer> removedPeers = new ArrayList<>(number);

  if (removeLeader) {
    final RaftPeer leader = toRaftPeer(RaftTestUtil.waitForLeader(this));
    Preconditions.assertTrue(!excluded.contains(leader));
    remaining.remove(leader);
    removedPeers.add(leader);
  }

  // The leader, when removed, counts towards the requested number.
  final int followerQuota = removeLeader ? number - 1 : number;
  int removedFollowers = 0;
  for (RaftServerImpl follower : getFollowers()) {
    if (removedFollowers >= followerQuota) {
      break;
    }
    final RaftPeer candidate = toRaftPeer(follower);
    if (excluded.contains(candidate)) {
      continue;
    }
    remaining.remove(candidate);
    removedPeers.add(candidate);
    removedFollowers++;
  }

  final RaftPeer[] newConf = remaining.toArray(RaftPeer.emptyArray());
  group = RaftGroup.valueOf(group.getGroupId(), newConf);
  return new PeerChanges(newConf, RaftPeer.emptyArray(), removedPeers.toArray(RaftPeer.emptyArray()));
}
/**
 * Closes the server recorded under the given id; the server stays recorded in this cluster.
 *
 * @param id the id of a server of this cluster.
 */
public void killServer(RaftPeerId id) {
  LOG.info("killServer " + id);
  final RaftServerProxy target = servers.get(id);
  target.close();
}
/** @return a printout of all the servers, whatever their groups. */
public String printServers() {
  return printServers(null);
}

/**
 * Builds a multi-line printout: a header naming the group, then one line per server.
 *
 * @param groupId the group whose servers are printed; null for all the servers.
 * @return the printout.
 */
public String printServers(RaftGroupId groupId) {
  final String target = groupId == null ? "ALL groups" : String.valueOf(groupId);
  final StringBuilder text = new StringBuilder("printing ").append(target);
  getRaftServerProxyStream(groupId).forEach(proxy -> text.append("\n ").append(proxy));
  return text.toString();
}
/**
 * Builds a printout with the number of servers and, for each server implementation of the
 * cluster group, the server itself followed by its log entries when the log is a {@link MemoryRaftLog}.
 *
 * @return the printout.
 */
public String printAllLogs() {
  final StringBuilder out = new StringBuilder("\n#servers = " + servers.size() + "\n");
  for (RaftServerImpl impl : iterateServerImpls()) {
    out.append(" ").append(impl).append("\n");

    final RaftLog raftLog = impl.getState().getLog();
    if (!(raftLog instanceof MemoryRaftLog)) {
      continue;
    }
    out.append(" ");
    out.append(((MemoryRaftLog) raftLog).getEntryString());
  }
  return out.toString();
}
/**
 * Looks up the leader and sends it a first message through a temporary client.
 *
 * @param ignoreException whether an {@link IOException} raised while sending (or while closing
 *                        the client) is ignored instead of rethrown.
 * @return the leader found before sending the message.
 * @throws IOException if sending fails and {@code ignoreException} is false.
 */
public RaftServerImpl getLeaderAndSendFirstMessage(boolean ignoreException) throws IOException {
  final RaftServerImpl currentLeader = getLeader();
  try (RaftClient firstClient = createClient(currentLeader.getId())) {
    firstClient.send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
  } catch (IOException ioe) {
    if (ignoreException) {
      return currentLeader;
    }
    throw ioe;
  }
  return currentLeader;
}
/**
 * Creates the exception reporting that no leader was found.
 * The message reads "No leader yet: ..." or "No leader yet for {groupId}: ...", followed by
 * the printout of the servers.
 *
 * @param groupId the group without a leader; null when no particular group is meant.
 * @return the new exception (not thrown here).
 */
IllegalStateException newIllegalStateExceptionForNoLeaders(RaftGroupId groupId) {
  final String g = groupId == null? "": " for " + groupId;
  // No trailing space after "yet": the suffix brings its own leading space, and the
  // colon must follow directly when there is no suffix.
  return new IllegalStateException("No leader yet" + g + ": " + printServers(groupId));
}
/**
 * Creates the exception reporting that several leaders were found at the same term.
 * The reported term is the current term of the first given leader.
 *
 * @param groupId the group concerned; null when no particular group is meant.
 * @param leaders the leaders found; must not be empty.
 * @return the new exception (not thrown here).
 */
IllegalStateException newIllegalStateExceptionForMultipleLeaders(RaftGroupId groupId, List<RaftServerImpl> leaders) {
  final String suffix = groupId == null ? "" : " for " + groupId;
  final long sharedTerm = leaders.get(0).getState().getCurrentTerm();
  final String message = "Found multiple leaders" + suffix
      + " at the same term (=" + sharedTerm
      + "), leaders.size() = " + leaders.size() + " > 1, leaders = " + leaders
      + ": " + printServers(groupId);
  return new IllegalStateException(message);
}
/**
 * Gets the leader in the single-group case.
 * Do not use this method if this cluster has multiple groups.
 *
 * @return the unique leader with the highest term, or null if there is no leader.
 * @throws IllegalStateException if several leaders share the same highest term.
 */
public RaftServerImpl getLeader() {
  final List<RaftServerImpl> candidates = getLeaders(null);
  return getLeader(candidates, null, leaders -> {
    throw newIllegalStateExceptionForMultipleLeaders(null, leaders);
  });
}

/**
 * Gets the leader of the given group, delegating the "no leader" and
 * "multiple leaders" cases to the given handlers.
 *
 * @param groupId               the group to search; null for all the groups.
 * @param handleNoLeaders       run when no leader is found; may be null.
 * @param handleMultipleLeaders given the leaders when several are found; may be null.
 * @return the unique leader, or null when a handler case applies and the handler returns normally.
 */
RaftServerImpl getLeader(RaftGroupId groupId, Runnable handleNoLeaders,
    Consumer<List<RaftServerImpl>> handleMultipleLeaders) {
  final List<RaftServerImpl> candidates = getLeaders(groupId);
  return getLeader(candidates, handleNoLeaders, handleMultipleLeaders);
}
/**
 * Picks the leader out of a list of candidates.
 *
 * @param leaders               the candidate leaders.
 * @param handleNoLeaders       run when the list is empty; may be null.
 * @param handleMultipleLeaders given the list when it has more than one element; may be null.
 * @return the only element of the list; null in the two other cases, provided the handler returns normally.
 */
static RaftServerImpl getLeader(List<RaftServerImpl> leaders, Runnable handleNoLeaders,
    Consumer<List<RaftServerImpl>> handleMultipleLeaders) {
  if (leaders.isEmpty()) {
    if (handleNoLeaders != null) {
      handleNoLeaders.run();
    }
    return null;
  }
  if (leaders.size() == 1) {
    return leaders.get(0);
  }
  // More than one candidate.
  if (handleMultipleLeaders != null) {
    handleMultipleLeaders.accept(leaders);
  }
  return null;
}
/**
 * Collects the alive servers of the given group that are leaders at the highest term seen;
 * leaders with a lower term are not included.
 *
 * @param groupId the group to search; null for all the groups.
 * @return the leaders with the highest term; empty if there is no leader.
 */
private List<RaftServerImpl> getLeaders(RaftGroupId groupId) {
  final List<RaftServerImpl> highest = new ArrayList<>();
  final Stream<RaftServerImpl> alive = getServerAliveStream(groupId);
  alive.filter(RaftServerImpl::isLeader).forEach(candidate -> {
    if (!highest.isEmpty()) {
      final long highestTerm = highest.get(0).getState().getCurrentTerm();
      final long candidateTerm = candidate.getState().getCurrentTerm();
      if (candidateTerm < highestTerm) {
        // A leader of an older term: skip it.
        return;
      }
      if (candidateTerm > highestTerm) {
        // A newer term supersedes everything collected so far.
        highest.clear();
      }
    }
    highest.add(candidate);
  });
  return highest;
}
/** @return true if there is a leader and the string form of its id equals {@code leaderId}. */
boolean isLeader(String leaderId) {
  final RaftServerImpl current = getLeader();
  if (current == null) {
    return false;
  }
  return current.getId().toString().equals(leaderId);
}
/** @return the alive servers of the cluster group that are followers. */
public List<RaftServerImpl> getFollowers() {
  final Stream<RaftServerImpl> alive = getServerAliveStream();
  return alive.filter(RaftServerImpl::isFollower).collect(Collectors.toList());
}
/** @return the values view of the server map of this cluster. */
public Collection<RaftServerProxy> getServers() {
  final Collection<RaftServerProxy> proxies = servers.values();
  return proxies;
}
/** @return the servers containing the given group, or all the servers when {@code groupId} is null. */
private Stream<RaftServerProxy> getRaftServerProxyStream(RaftGroupId groupId) {
  final Stream<RaftServerProxy> all = getServers().stream();
  return all.filter(proxy -> groupId == null || proxy.containsGroup(groupId));
}
/** @return a view of the servers of this cluster as their implementations for the cluster group. */
public Iterable<RaftServerImpl> iterateServerImpls() {
  final Collection<RaftServerProxy> proxies = getServers();
  return CollectionUtils.as(proxies, this::getRaftServerImpl);
}
/**
 * @param groupId the group to select; null for all the groups.
 * @return the implementation for the given group of every server containing it,
 *         or all the implementations of all the servers when {@code groupId} is null.
 */
private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
  final Stream<RaftServerProxy> proxies = getRaftServerProxyStream(groupId);
  if (groupId == null) {
    return proxies.flatMap(s -> RaftServerTestUtil.getRaftServerImpls(s).stream());
  }
  return proxies
      .filter(s -> s.containsGroup(groupId))
      .map(s -> RaftServerTestUtil.getRaftServerImpl(s, groupId));
}
/** @return the alive server implementations of the cluster group. */
public Stream<RaftServerImpl> getServerAliveStream() {
  final RaftGroupId currentGroupId = getGroupId();
  return getServerAliveStream(currentGroupId);
}

/** @return the alive server implementations of the given group; of all the groups when {@code groupId} is null. */
private Stream<RaftServerImpl> getServerAliveStream(RaftGroupId groupId) {
  final Stream<RaftServerImpl> impls = getServerStream(groupId);
  return impls.filter(RaftServerImpl::isAlive);
}
/** @return a retry-forever policy sleeping {@link #RETRY_INTERVAL_DEFAULT} between attempts. */
private RetryPolicy getDefaultRetryPolicy() {
  final RetryPolicy policy = RetryPolicies.retryForeverWithSleep(RETRY_INTERVAL_DEFAULT);
  return policy;
}
/** @return the server recorded under the given id, or null if there is none. */
public RaftServerProxy getServer(RaftPeerId id) {
  final RaftServerProxy proxy = servers.get(id);
  return proxy;
}
/** @return the implementation, for the cluster group, of the server recorded under the given id. */
public RaftServerImpl getRaftServerImpl(RaftPeerId id) {
  final RaftServerProxy proxy = servers.get(id);
  return getRaftServerImpl(proxy);
}

/** @return the implementation of the given server for the cluster group. */
public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
  final RaftGroupId currentGroupId = getGroupId();
  return RaftServerTestUtil.getRaftServerImpl(proxy, currentGroupId);
}
/** @return the peers of all the servers of this cluster. */
public List<RaftPeer> getPeers() {
  final Collection<RaftServerProxy> proxies = getServers();
  return toRaftPeers(proxies);
}
/** @return the current group of this cluster. */
public RaftGroup getGroup() {
  return this.group;
}
/** @return a client for the cluster group, without a preset leader. */
public RaftClient createClient() {
  return createClient(null, this.group);
}

/** @return a client for the given group, without a preset leader. */
public RaftClient createClient(RaftGroup g) {
  final RaftPeerId noLeader = null;
  return createClient(noLeader, g);
}
/** @return a client for the cluster group whose leader id is preset to the current leader. */
public RaftClient createClientWithLeader() {
  final RaftPeerId leaderId = getLeader().getId();
  return createClient(leaderId, group);
}
/** @return a client for the cluster group whose leader id is preset to the first follower found. */
public RaftClient createClientWithFollower() {
  final RaftPeerId followerId = getFollowers().get(0).getId();
  return createClient(followerId, group);
}
/** @return a client for the cluster group with the given preset leader id. */
public RaftClient createClient(RaftPeerId leaderId) {
  return createClient(leaderId, this.group);
}

/** @return a client for the cluster group with the given preset leader id and retry policy. */
public RaftClient createClient(RaftPeerId leaderId, RetryPolicy retryPolicy) {
  final ClientId noClientId = null;
  return createClient(leaderId, this.group, noClientId, retryPolicy);
}

/** @return a client for the given group with the given preset leader id and the default retry policy. */
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
  final ClientId noClientId = null;
  return createClient(leaderId, group, noClientId, getDefaultRetryPolicy());
}

/** @return a client with the given client id, group and preset leader id, using the default retry policy. */
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
    ClientId clientId) {
  final RetryPolicy defaultPolicy = getDefaultRetryPolicy();
  return createClient(leaderId, group, clientId, defaultPolicy);
}
/**
 * Builds a client with the properties and parameters of this cluster.
 *
 * @param leaderId    the preset leader id; other methods of this class pass null.
 * @param group       the group of the client.
 * @param clientId    the client id; other methods of this class pass null.
 * @param retryPolicy the retry policy of the client.
 * @return the new client.
 */
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group,
    ClientId clientId, RetryPolicy retryPolicy) {
  return RaftClient.newBuilder()
      .setClientId(clientId)
      .setRaftGroup(group)
      .setLeaderId(leaderId)
      .setProperties(properties)
      .setParameters(parameters)
      .setRetryPolicy(retryPolicy)
      .build();
}
/** @return a write request for the cluster group carrying {@code DEFAULT_CALLID}. */
public RaftClientRequest newRaftClientRequest(
    ClientId clientId, RaftPeerId leaderId, Message message) {
  final long callId = DEFAULT_CALLID;
  return newRaftClientRequest(clientId, leaderId, callId, message);
}

/**
 * @param clientId the id of the requesting client.
 * @param leaderId the id of the server the request is addressed to.
 * @param callId   the call id of the request.
 * @param message  the message of the request.
 * @return a write request for the cluster group.
 */
public RaftClientRequest newRaftClientRequest(
    ClientId clientId, RaftPeerId leaderId, long callId, Message message) {
  final RaftGroupId currentGroupId = getGroupId();
  return new RaftClientRequest(clientId, leaderId, currentGroupId,
      callId, message, RaftClientRequest.writeRequestType(), null);
}
/**
 * @param clientId the id of the requesting client.
 * @param leaderId the id of the server the request is addressed to.
 * @param peers    the peers of the requested configuration.
 * @return a set-configuration request for the cluster group carrying {@code DEFAULT_CALLID}.
 */
public SetConfigurationRequest newSetConfigurationRequest(
    ClientId clientId, RaftPeerId leaderId,
    RaftPeer... peers) {
  final RaftGroupId currentGroupId = getGroupId();
  return new SetConfigurationRequest(clientId, leaderId, currentGroupId,
      DEFAULT_CALLID, peers);
}
/**
 * Changes the configuration to the given peers through a temporary client
 * and asserts that the reply is successful.
 *
 * @param peers the peers of the new configuration.
 * @throws IOException if the request (or closing the client) fails.
 */
public void setConfiguration(RaftPeer... peers) throws IOException {
  try (RaftClient confClient = createClient()) {
    LOG.info("Start changing the configuration: {}", Arrays.asList(peers));
    final boolean succeeded = confClient.setConfiguration(peers).isSuccess();
    Preconditions.assertTrue(succeeded);
  }
}
/** Same as {@link #shutdown()}; lets the cluster be used in try-with-resources. */
@Override
public void close() {
  this.shutdown();
}
/**
 * Closes all the servers in parallel, waiting at most 5 seconds for them, and cancels the
 * periodic printout timer.
 *
 * The timer reference is cleared as well, so that a later {@link #start()} (as done by
 * {@link #restart(boolean)}) schedules a new timer instead of keeping the cancelled one.
 * Shutting down a cluster without any server is allowed.
 * If the calling thread is interrupted while waiting, its interrupt status is restored.
 */
public void shutdown() {
  LOG.info("************************************************************** ");
  LOG.info("*** ");
  LOG.info("*** Stopping " + getClass().getSimpleName());
  LOG.info("*** ");
  LOG.info("************************************************************** ");
  LOG.info(printServers());

  // TODO: classes like RaftLog may throw uncaught exception during shutdown (e.g. write after close)
  ExitUtils.setTerminateOnUncaughtException(false);

  // newFixedThreadPool rejects a pool size of 0, which would happen for a cluster without servers.
  final int poolSize = Math.max(1, servers.size());
  final ExecutorService executor = Executors.newFixedThreadPool(poolSize, Daemon::new);
  getServers().forEach(proxy -> executor.submit(proxy::close));
  try {
    executor.shutdown();
    // just wait for a few seconds
    executor.awaitTermination(5, TimeUnit.SECONDS);
  } catch(InterruptedException e) {
    LOG.warn("shutdown interrupted", e);
    // Preserve the interruption for the caller.
    Thread.currentThread().interrupt();
  }

  // Cancel AND forget the timer: a cancelled Timer left in the reference would prevent
  // start() from ever installing a new one.
  Optional.ofNullable(timer.getAndSet(null)).ifPresent(Timer::cancel);
  ExitUtils.assertNotTerminated();
  LOG.info(getClass().getSimpleName() + " shutdown completed");
}
/**
* Block all the incoming requests for the peer with leaderId. Also delay
* outgoing or incoming messages for all other peers.
*
* @param leaderId the id, in string form, of the peer whose incoming requests are blocked.
* @param delayMs the delay in milliseconds; {@link #tryEnforceLeader(String)} passes 0
*                in order to reopen the queues.
* @throws InterruptedException if the calling thread is interrupted.
*/
protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs)
throws InterruptedException;
/**
 * Tries to make the given server the leader of the cluster.
 *
 * @param leaderId the id of the targeted leader server.
 * @return true if the server is the leader on return (including when it already was), false otherwise.
 * @throws InterruptedException if the calling thread is interrupted.
 */
public boolean tryEnforceLeader(String leaderId) throws InterruptedException {
  if (!isLeader(leaderId)) {
    // Block the incoming requests of the target and delay the other servers by the default
    // minimum election timeout, so that the target can request a vote and the other,
    // non-leader servers can grant it.
    final int delayMs = RaftServerConfigKeys.Rpc.TIMEOUT_MIN_DEFAULT.toIntExact(TimeUnit.MILLISECONDS);
    blockQueueAndSetDelay(leaderId, delayMs);

    // Reopen queues so that the vote can make progress.
    blockQueueAndSetDelay(leaderId, 0);

    return isLeader(leaderId);
  }
  // Nothing to do: the given id is already the leader.
  return true;
}
/**
 * Block/unblock the requests sent from the given source.
 *
 * @param src the source of the requests, in string form.
 * @param block true to block the requests; false to unblock them.
 */
public abstract void setBlockRequestsFrom(String src, boolean block);
/** @return the id of the current group of this cluster. */
public RaftGroupId getGroupId() {
  final RaftGroup current = this.group;
  return current.getGroupId();
}
}