blob: 26430736769ac623414d648e685469f98e926b15 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.storage.MemoryRaftLog;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
public abstract class MiniRaftCluster {
/** Logger used by this class. */
public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
/** Simple name of this class; it is the prefix of {@link #STATEMACHINE_CLASS_KEY}. */
public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
/** Property key looked up by {@link #getStateMachine4Test(RaftProperties)} to choose the state machine class. */
public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
/** Default passed to the property lookup of {@link #STATEMACHINE_CLASS_KEY}. */
public static final Class<? extends StateMachine> STATEMACHINE_CLASS_DEFAULT = BaseStateMachine.class;
/**
 * Creates clusters of a concrete {@link MiniRaftCluster} subtype.
 *
 * @param <CLUSTER> the type of cluster produced by this factory
 */
public static abstract class Factory<CLUSTER extends MiniRaftCluster> {
  /**
   * Mix-in that exposes a {@link Factory} together with a set of
   * {@link RaftProperties} and offers a shortcut for building a cluster.
   *
   * @param <CLUSTER> the type of cluster produced by the factory
   */
  public interface Get<CLUSTER extends MiniRaftCluster> {
    /**
     * Supplier of the properties returned by {@link #getProperties()}.
     * As an interface field it is implicitly {@code public static final},
     * i.e. a single supplier is shared by all implementers.
     */
    Supplier<RaftProperties> properties = JavaUtils.memoize(() -> new RaftProperties());

    /** @return the factory used by {@link #newCluster(int)}. */
    Factory<CLUSTER> getFactory();

    /** @return the properties obtained from {@link #properties}. */
    default RaftProperties getProperties() {
      return properties.get();
    }

    /**
     * Builds a cluster through {@link #getFactory()} using {@link #getProperties()}.
     *
     * @param numPeers the number of peers of the new cluster
     * @return the new cluster
     */
    default CLUSTER newCluster(int numPeers) throws IOException {
      final Factory<CLUSTER> factory = getFactory();
      return factory.newCluster(numPeers, getProperties());
    }
  }

  /**
   * Builds a cluster whose servers have the given ids.
   *
   * @param ids the server ids
   * @param prop the properties of the cluster
   * @return the new cluster
   */
  public abstract CLUSTER newCluster(String[] ids, RaftProperties prop);

  /**
   * Builds a cluster with {@code numServer} servers whose ids are produced by
   * {@link MiniRaftCluster#generateIds(int, int)} with base 0.
   *
   * @param numServer the number of servers
   * @param prop the properties of the cluster
   * @return the new cluster
   */
  public CLUSTER newCluster(int numServer, RaftProperties prop) {
    final String[] generatedIds = generateIds(numServer, 0);
    return newCluster(generatedIds, prop);
  }
}
/**
 * Base class of clusters that block requests through
 * {@link BlockRequestHandlingInjection}.
 */
public static abstract class RpcBase extends MiniRaftCluster {
  public RpcBase(String[] ids, RaftProperties properties, Parameters parameters) {
    super(ids, properties, parameters);
  }

  /** Blocks or unblocks the requestor {@code src} in the shared injection instance. */
  @Override
  public void setBlockRequestsFrom(String src, boolean block) {
    final BlockRequestHandlingInjection injection = BlockRequestHandlingInjection.getInstance();
    if (!block) {
      injection.unblockRequestor(src);
      return;
    }
    injection.blockRequestor(src);
  }

  /**
   * Looks up the port of the peer with the given id.
   *
   * @param id the id of the peer
   * @param group the group searched for the peer
   * @return the port of the address of the first peer in {@code group} whose
   *         id equals {@code id}; when there is no such peer, or its address
   *         is null, the port of a newly created local server address
   */
  public static int getPort(RaftPeerId id, RaftGroup group) {
    final String address = group.getPeers().stream()
        .filter(peer -> peer.getId().equals(id))
        .findFirst()
        .map(RaftPeer::getAddress)
        .orElse(null);
    final InetSocketAddress socketAddress;
    if (address == null) {
      socketAddress = NetUtils.createLocalServerAddress();
    } else {
      socketAddress = NetUtils.createSocketAddr(address);
    }
    return socketAddress.getPort();
  }
}
/**
 * Result of a peer addition/removal: the peers of the new configuration
 * together with the peers that were added and the peers that were removed.
 */
public static class PeerChanges {
  /** All the peers in the new configuration. */
  public final RaftPeer[] allPeersInNewConf;
  /** The peers that were added. */
  public final RaftPeer[] newPeers;
  /** The peers that were removed. */
  public final RaftPeer[] removedPeers;

  /**
   * @param all all the peers in the new configuration
   * @param newPeers the added peers
   * @param removed the removed peers
   */
  public PeerChanges(
      RaftPeer[] all,
      RaftPeer[] newPeers,
      RaftPeer[] removed) {
    this.removedPeers = removed;
    this.newPeers = newPeers;
    this.allPeersInNewConf = all;
  }
}
/**
 * Builds a group with a random group id and one peer per given id,
 * each peer being assigned a newly created local server address.
 *
 * @param ids the string ids of the peers
 * @return the new group
 */
public static RaftGroup initRaftGroup(Collection<String> ids) {
  final List<RaftPeer> peerList = new ArrayList<>(ids.size());
  for (String idString : ids) {
    peerList.add(new RaftPeer(RaftPeerId.valueOf(idString), NetUtils.createLocalServerAddress()));
  }
  final RaftPeer[] peers = peerList.toArray(new RaftPeer[0]);
  return new RaftGroup(RaftGroupId.randomId(), peers);
}
/**
 * @param id the id of a server
 * @return the directory {@code <root test dir>/<simple class name>/<id>}
 */
private File getStorageDir(RaftPeerId id) {
  final String path = String.join("/",
      String.valueOf(BaseTest.getRootTestDir()),
      getClass().getSimpleName(),
      String.valueOf(id));
  return new File(path);
}
/**
 * Generates the ids {@code "s<base>", "s<base+1>", ...}.
 *
 * @param numServers the number of ids to generate
 * @param base the number used in the first id
 * @return an array of {@code numServers} ids
 */
public static String[] generateIds(int numServers, int base) {
  final String[] generated = new String[numServers];
  Arrays.setAll(generated, index -> "s" + (index + base));
  return generated;
}
/** The current group of this cluster; reassigned when peers are added or removed. */
protected RaftGroup group;
/** A copy of the properties passed to the constructor. */
protected final RaftProperties properties;
/** The parameters passed to the constructor; also handed to the clients created by this cluster. */
protected final Parameters parameters;
/** The servers of this cluster, keyed by peer id. */
protected final Map<RaftPeerId, RaftServerProxy> servers = new ConcurrentHashMap<>();
/** Timer that repeatedly logs {@link #printServers()}; it is cancelled in {@link #shutdown()}. */
private final Timer timer;
protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
this.group = initRaftGroup(Arrays.asList(ids));
this.properties = new RaftProperties(properties);
this.parameters = parameters;
this.timer = JavaUtils.runRepeatedly(() -> LOG.info("TIMED-PRINT: " + printServers()),
10, 10, TimeUnit.SECONDS);
ExitUtils.disableSystemExit();
}
/** @return the properties of this cluster (the copy made by the constructor). */
public RaftProperties getProperties() {
  return this.properties;
}
/**
 * Creates (with formatting) a server for each peer of the group,
 * unless this cluster already has servers.
 *
 * @return this cluster
 */
public MiniRaftCluster initServers() {
  if (!servers.isEmpty()) {
    return this;
  }
  final Iterable<RaftPeerId> peerIds = CollectionUtils.as(group.getPeers(), RaftPeer::getId);
  putNewServers(peerIds, true);
  return this;
}
/** Same as {@link #putNewServer(RaftPeerId, RaftGroup, boolean)} with the current group. */
private RaftServerProxy putNewServer(RaftPeerId id, boolean format) {
  final RaftGroup currentGroup = this.group;
  return putNewServer(id, currentGroup, format);
}
/**
 * Creates a new server and registers it under the given id.
 * It is asserted that no server was previously registered under that id.
 *
 * @param id the id of the new server
 * @param group the group of the new server
 * @param format whether to delete the storage directory first
 * @return the new server
 */
public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) {
  final RaftServerProxy created = newRaftServer(id, group, format);
  final RaftServerProxy previous = servers.put(id, created);
  Preconditions.assertTrue(previous == null);
  return created;
}
/**
 * Creates and registers a new server for each of the given ids, in iteration order.
 *
 * @return the new servers
 */
private Collection<RaftServerProxy> putNewServers(
    Iterable<RaftPeerId> peers, boolean format) {
  final List<RaftServerProxy> created = new ArrayList<>();
  for (RaftPeerId peerId : peers) {
    created.add(putNewServer(peerId, format));
  }
  return created;
}
/** Logs a banner, makes sure the servers are initialized and then starts every server. */
public void start() {
  final String clusterName = getClass().getSimpleName();
  LOG.info(".............................................................. ");
  LOG.info("... ");
  LOG.info("... Starting " + clusterName);
  LOG.info("... ");
  LOG.info(".............................................................. ");

  initServers();
  for (RaftServerProxy server : servers.values()) {
    server.start();
  }
}
/**
 * Restarts a server: closes the server registered under the given id,
 * replaces it with a newly created server and starts the new one.
 *
 * @param newId the id of the server to restart
 * @param format whether to delete the storage directory of the server first
 */
public void restartServer(RaftPeerId newId, boolean format) throws IOException {
  killServer(newId);
  servers.remove(newId);
  final RaftServerProxy replacement = putNewServer(newId, format);
  startServer(replacement, true);
}
/**
 * Shuts the cluster down, replaces every server with a new one
 * having the same id and starts the cluster again.
 *
 * @param format whether to delete the storage directories first
 */
public void restart(boolean format) throws IOException {
  shutdown();

  // remember the ids before the map is emptied
  final List<RaftPeerId> previousIds = new ArrayList<>(servers.keySet());
  servers.clear();
  putNewServers(previousIds, format);

  start();
}
/** @return the max rpc timeout configured in the properties, in milliseconds. */
public int getMaxTimeout() {
  final int timeoutMaxMs =
      RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS);
  return timeoutMaxMs;
}
/**
 * Creates a server whose storage directory is {@link #getStorageDir(RaftPeerId)}
 * and whose state machine comes from {@link #getStateMachine4Test(RaftProperties)}.
 *
 * @param id the id of the new server
 * @param group the group of the new server
 * @param format whether to delete the storage directory first
 * @return the new server
 * @throws RuntimeException wrapping any {@link IOException} thrown on the way
 */
private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group,
    boolean format) {
  final File storageDir = getStorageDir(id);
  try {
    if (format) {
      FileUtils.deleteFully(storageDir);
      LOG.info("Formatted directory {}", storageDir);
    }
    // the server gets its own copy of the properties with the storage dir set
    final RaftProperties serverProperties = new RaftProperties(properties);
    RaftServerConfigKeys.setStorageDir(serverProperties, storageDir);
    // note that the state machine is created from the cluster properties
    final StateMachine sm = getStateMachine4Test(properties);
    return newRaftServer(id, sm, group, serverProperties);
  } catch (IOException ioe) {
    throw new RuntimeException(ioe);
  }
}
/**
 * Creates a server; implemented by the concrete cluster types.
 *
 * @param id the id of the new server
 * @param stateMachine the state machine of the new server
 * @param group the group of the new server
 * @param properties the properties of the new server
 * @return the new server
 */
protected abstract RaftServerProxy newRaftServer(
RaftPeerId id, StateMachine stateMachine, RaftGroup group,
RaftProperties properties) throws IOException;
/**
 * Instantiates the state machine class configured by
 * {@link #STATEMACHINE_CLASS_KEY} (default {@link #STATEMACHINE_CLASS_DEFAULT}).
 * The no-argument construction is tried first; if it fails, the construction
 * taking a {@link RaftProperties} is tried.
 *
 * @param properties the properties to read the class from, also used as the
 *                   constructor argument in the second attempt
 * @return the new state machine
 * @throws RuntimeException the failure of the first attempt, with the failure
 *                          of the second attempt added as suppressed
 */
static StateMachine getStateMachine4Test(RaftProperties properties) {
  final Class<? extends StateMachine> smClass = properties.getClass(
      STATEMACHINE_CLASS_KEY,
      STATEMACHINE_CLASS_DEFAULT,
      StateMachine.class);

  try {
    return ReflectionUtils.newInstance(smClass);
  } catch (RuntimeException noArgFailure) {
    try {
      final Class<?>[] argClasses = {RaftProperties.class};
      return ReflectionUtils.newInstance(smClass, argClasses, properties);
    } catch (RuntimeException withPropertiesFailure) {
      noArgFailure.addSuppressed(withPropertiesFailure);
    }
    throw noArgFailure;
  }
}
/**
 * Converts the given servers to peers, see {@link #toRaftPeer(RaftServerProxy)}.
 *
 * @param servers the servers to convert
 * @return the peers, in the iteration order of {@code servers}
 */
public static List<RaftPeer> toRaftPeers(
    Collection<RaftServerProxy> servers) {
  final List<RaftPeer> peers = new ArrayList<>();
  for (RaftServerProxy server : servers) {
    peers.add(toRaftPeer(server));
  }
  return peers;
}
/** Converts the proxy of the given server, see {@link #toRaftPeer(RaftServerProxy)}. */
public static RaftPeer toRaftPeer(RaftServerImpl s) {
  final RaftServerProxy proxy = s.getProxy();
  return toRaftPeer(proxy);
}
/**
 * @param s a server
 * @return a peer with the id of the server and the socket address of its rpc service
 */
public static RaftPeer toRaftPeer(RaftServerProxy s) {
  return new RaftPeer(
      s.getId(),
      s.getServerRpc().getInetSocketAddress());
}
/**
 * Adds {@code number} new peers whose ids are generated using the current
 * number of servers as the base, see {@link #generateIds(int, int)}.
 *
 * @param number the number of peers to add
 * @param startNewPeer whether the new servers are left running
 * @return the resulting peer changes
 */
public PeerChanges addNewPeers(int number, boolean startNewPeer)
    throws IOException {
  final String[] newIds = generateIds(number, servers.size());
  return addNewPeers(newIds, startNewPeer);
}
/**
 * Creates (with formatting) and starts a new server for each of the given
 * ids, and then replaces the group of this cluster with a group consisting
 * of the new peers followed by the existing peers.
 *
 * @param ids the ids of the new peers
 * @param startNewPeer whether the new servers are left running; when false,
 *                     they are started and then closed
 * @return the peer changes: all peers in the new group, the new peers, and
 *         an empty array of removed peers
 */
public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) {
  LOG.info("Add new peers {}", Arrays.asList(ids));

  // create and add new RaftServers
  final Collection<RaftServerProxy> newServers = putNewServers(
      CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true);
  newServers.forEach(s -> startServer(s, true));
  if (!startNewPeer) {
    // start and then close, in order to bind the port
    newServers.forEach(p -> p.close());
  }

  final List<RaftPeer> newPeers = toRaftPeers(newServers);
  final RaftPeer[] np = newPeers.toArray(new RaftPeer[0]);

  // Combine in an explicitly mutable list: the list returned by
  // toRaftPeers is not guaranteed to support addAll.
  final List<RaftPeer> allPeers = new ArrayList<>(newPeers);
  allPeers.addAll(group.getPeers());
  final RaftPeer[] p = allPeers.toArray(new RaftPeer[0]);

  group = new RaftGroup(group.getGroupId(), p);
  return new PeerChanges(p, np, new RaftPeer[0]);
}
/**
 * Starts the given server when {@code startService} is true; otherwise does nothing.
 *
 * @param server the server to start
 * @param startService whether to actually start the server
 */
protected void startServer(RaftServer server, boolean startService) {
  if (!startService) {
    return;
  }
  server.start();
}
/** Starts the server registered under the given id. */
public void startServer(RaftPeerId id) {
  final RaftServerProxy server = getServer(id);
  startServer(server, true);
}
/**
 * Prepares the peer list for removing some peers from the configuration
 * and replaces the group of this cluster with the remaining peers.
 * No server is stopped by this method.
 *
 * @param number the number of peers to remove, including the leader when
 *               {@code removeLeader} is true
 * @param removeLeader whether the current leader is removed
 * @param excluded followers that must not be removed
 * @return the peer changes: the remaining peers, an empty array of new
 *         peers, and the removed peers
 */
public PeerChanges removePeers(int number, boolean removeLeader,
    Collection<RaftPeer> excluded) {
  final Collection<RaftPeer> remaining = new ArrayList<>(group.getPeers());
  final List<RaftPeer> removedPeers = new ArrayList<>(number);

  if (removeLeader) {
    final RaftPeer leader = toRaftPeer(getLeader());
    assert !excluded.contains(leader);
    remaining.remove(leader);
    removedPeers.add(leader);
  }

  // the leader, if removed, counts towards the requested number
  final int followersToRemove = removeLeader ? number - 1 : number;
  int removedFollowers = 0;
  for (RaftServerImpl follower : getFollowers()) {
    if (removedFollowers >= followersToRemove) {
      break;
    }
    final RaftPeer candidate = toRaftPeer(follower);
    if (excluded.contains(candidate)) {
      continue;
    }
    remaining.remove(candidate);
    removedPeers.add(candidate);
    removedFollowers++;
  }

  final RaftPeer[] remainingArray = remaining.toArray(new RaftPeer[0]);
  group = new RaftGroup(group.getGroupId(), remainingArray);
  return new PeerChanges(remainingArray, new RaftPeer[0],
      removedPeers.toArray(new RaftPeer[0]));
}
/**
 * Closes the server registered under the given id.
 * The server remains registered in this cluster.
 *
 * @param id the id of the server to close
 * @throws NullPointerException if no server is registered under {@code id}
 */
public void killServer(RaftPeerId id) {
  LOG.info("killServer " + id);
  // Fail with a descriptive message instead of a bare NullPointerException.
  final RaftServerProxy server = Objects.requireNonNull(servers.get(id),
      () -> "Failed to kill server: no server found for id " + id
          + " in " + servers.keySet());
  server.close();
}
/** @return the print-out of all servers, see {@link #printServers(RaftGroupId)}. */
public String printServers() {
  final RaftGroupId allGroups = null;
  return printServers(allGroups);
}
/**
 * Builds a multi-line description of the servers of this cluster.
 *
 * @param groupId when non-null, only the servers whose group id equals it
 *                are listed (a server failing with an {@link IOException}
 *                during the lookup is left out); when null, all servers are listed
 * @return a header line followed by one line per listed server
 */
public String printServers(RaftGroupId groupId) {
  final String header = groupId == null ? "ALL groups" : String.valueOf(groupId);
  final StringBuilder b = new StringBuilder("printing ").append(header);

  for (RaftServerProxy server : getServers()) {
    boolean selected = groupId == null;
    if (!selected) {
      try {
        selected = groupId.equals(server.getImpl().getGroupId());
      } catch (IOException ignored) {
        // the group of this server cannot be determined; leave it out
        selected = false;
      }
    }
    if (selected) {
      b.append("\n ").append(server);
    }
  }
  return b.toString();
}
/**
 * @return the number of servers followed by one entry per server; for a
 *         server using a {@link MemoryRaftLog}, the log entries are included
 */
public String printAllLogs() {
  final StringBuilder out = new StringBuilder("\n#servers = " + servers.size() + "\n");
  for (RaftServerImpl server : iterateServerImpls()) {
    out.append(" ").append(server).append("\n");

    final RaftLog log = server.getState().getLog();
    if (!(log instanceof MemoryRaftLog)) {
      continue;
    }
    out.append(" ").append(((MemoryRaftLog) log).getEntryString());
  }
  return out.toString();
}
/** Same as {@link #getLeaderAndSendFirstMessage(boolean)} without ignoring exceptions. */
public RaftServerImpl getLeaderAndSendFirstMessage() throws IOException {
  final boolean ignoreException = false;
  return getLeaderAndSendFirstMessage(ignoreException);
}
/**
 * Gets the current leader and sends it a first message through a temporary client.
 *
 * @param ignoreException whether an {@link IOException} from the client is
 *                        swallowed instead of being rethrown
 * @return the leader
 */
public RaftServerImpl getLeaderAndSendFirstMessage(boolean ignoreException) throws IOException {
  final RaftServerImpl leader = getLeader();
  try (RaftClient client = createClient(leader.getId())) {
    final RaftTestUtil.SimpleMessage firstMessage =
        new RaftTestUtil.SimpleMessage("first msg to make leader ready");
    client.send(firstMessage);
  } catch (IOException ioe) {
    if (ignoreException) {
      // the caller asked for failures to be ignored
      return leader;
    }
    throw ioe;
  }
  return leader;
}
/** @return the leader among all alive servers, see {@link #getLeader(RaftGroupId)}. */
public RaftServerImpl getLeader() {
  final RaftGroupId anyGroup = null;
  return getLeader(anyGroup);
}
/**
 * @param groupId when non-null, only the alive servers of that group are considered
 * @return the leader among the considered servers, or null if there is none
 */
public RaftServerImpl getLeader(RaftGroupId groupId) {
  final Stream<RaftServerImpl> alive = getServerAliveStream();
  final Stream<RaftServerImpl> candidates = groupId == null
      ? alive
      : alive.filter(s -> groupId.equals(s.getGroupId()));
  return getLeader(candidates);
}
/**
 * Finds the leader in the given stream. Among the servers reporting to be
 * a leader, only those with the highest current term are kept.
 *
 * @param serverAliveStream the servers to search
 * @return the leader, or null if no server reports to be a leader
 * @throws IllegalStateException if more than one leader has the highest term
 */
static RaftServerImpl getLeader(Stream<RaftServerImpl> serverAliveStream) {
  final List<RaftServerImpl> leaders = new ArrayList<>();
  serverAliveStream.filter(RaftServerImpl::isLeader).forEach(candidate -> {
    if (!leaders.isEmpty()) {
      final long leaderTerm = leaders.get(0).getState().getCurrentTerm();
      final long term = candidate.getState().getCurrentTerm();
      if (term < leaderTerm) {
        // a leader of an older term; skip it
        return;
      }
      if (term > leaderTerm) {
        // the leaders found so far belong to an older term
        leaders.clear();
      }
    }
    leaders.add(candidate);
  });

  if (leaders.size() > 1) {
    throw new IllegalStateException(leaders
        + ", leaders.size() = " + leaders.size() + " > 1");
  }
  return leaders.isEmpty() ? null : leaders.get(0);
}
/**
 * @param leaderId the string form of a server id
 * @return true iff there is a current leader and the string form of its id equals {@code leaderId}
 */
boolean isLeader(String leaderId) throws InterruptedException {
  final RaftServerImpl leader = getLeader();
  if (leader == null) {
    return false;
  }
  return leader.getId().toString().equals(leaderId);
}
/** @return the alive servers that report to be a follower. */
public List<RaftServerImpl> getFollowers() {
  final Stream<RaftServerImpl> followers =
      getServerAliveStream().filter(RaftServerImpl::isFollower);
  return followers.collect(Collectors.toList());
}
/** @return the values view of the server map of this cluster. */
public Collection<RaftServerProxy> getServers() {
  return this.servers.values();
}
/** @return the servers of this cluster, each converted by {@code RaftTestUtil.getImplAsUnchecked}. */
public Iterable<RaftServerImpl> iterateServerImpls() {
  final Collection<RaftServerProxy> proxies = getServers();
  return CollectionUtils.as(proxies, RaftTestUtil::getImplAsUnchecked);
}
/**
 * @param servers the servers to convert
 * @return a stream of the given servers, each converted by {@code RaftTestUtil.getImplAsUnchecked}
 */
public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) {
  final Stream<RaftServerProxy> proxies = servers.stream();
  return proxies.map(RaftTestUtil::getImplAsUnchecked);
}
/** @return a stream of all the servers of this cluster, see {@link #getServerStream(Collection)}. */
public Stream<RaftServerImpl> getServerStream() {
  final Collection<RaftServerProxy> all = getServers();
  return getServerStream(all);
}
/** @return a stream of the servers of this cluster that report to be alive. */
public Stream<RaftServerImpl> getServerAliveStream() {
  final Stream<RaftServerImpl> all = getServerStream(getServers());
  return all.filter(RaftServerImpl::isAlive);
}
/**
 * @param id the id of a server
 * @return the server registered under the given id, or null if there is none
 */
public RaftServerProxy getServer(RaftPeerId id) {
  return this.servers.get(id);
}
/** @return the servers of this cluster converted to peers, see {@link #toRaftPeers(Collection)}. */
public List<RaftPeer> getPeers() {
  final Collection<RaftServerProxy> all = getServers();
  return toRaftPeers(all);
}
/** @return the current group of this cluster. */
public RaftGroup getGroup() {
  return this.group;
}
/** @return a new client of the current group with a null leader id. */
public RaftClient createClient() {
  final RaftPeerId noLeader = null;
  return createClient(noLeader, group);
}
/**
 * @param g the group of the client
 * @return a new client of the given group with a null leader id
 */
public RaftClient createClient(RaftGroup g) {
  final RaftPeerId noLeader = null;
  return createClient(noLeader, g);
}
/** @return a new client of the current group whose leader id is the id of the current leader. */
public RaftClient createClientWithLeader() {
  final RaftServerImpl leader = getLeader();
  return createClient(leader.getId(), group);
}
/** @return a new client of the current group whose leader id is the id of the first follower. */
public RaftClient createClientWithFollower() {
  final RaftServerImpl firstFollower = getFollowers().get(0);
  return createClient(firstFollower.getId(), group);
}
/**
 * @param leaderId the leader id given to the client
 * @return a new client of the current group
 */
public RaftClient createClient(RaftPeerId leaderId) {
  final RaftGroup currentGroup = this.group;
  return createClient(leaderId, currentGroup);
}
/**
 * Builds a client using the properties and the parameters of this cluster.
 *
 * @param leaderId the leader id given to the client
 * @param group the group of the client
 * @return the new client
 */
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group) {
  return RaftClient.newBuilder()
      .setRaftGroup(group)
      .setLeaderId(leaderId)
      .setProperties(this.properties)
      .setParameters(this.parameters)
      .build();
}
/**
 * Same as {@link #newRaftClientRequest(ClientId, RaftPeerId, long, long, Message)}
 * with the default call id and the default sequence number.
 */
public RaftClientRequest newRaftClientRequest(
    ClientId clientId, RaftPeerId leaderId, Message message) {
  final long callId = DEFAULT_CALLID;
  final long seqNum = DEFAULT_SEQNUM;
  return newRaftClientRequest(clientId, leaderId, callId, seqNum, message);
}
/**
 * Creates a write request with replication level MAJORITY for the group of this cluster.
 *
 * @param clientId the id of the requesting client
 * @param leaderId the id of the server the request is addressed to
 * @param callId the call id of the request
 * @param seqNum the sequence number of the request
 * @param message the message of the request
 * @return the new request
 */
public RaftClientRequest newRaftClientRequest(
    ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) {
  final RaftGroupId groupId = getGroupId();
  return new RaftClientRequest(
      clientId, leaderId, groupId, callId, seqNum, message,
      RaftClientRequest.writeRequestType(ReplicationLevel.MAJORITY));
}
/**
 * Creates a set-configuration request, with the default call id, for the group of this cluster.
 *
 * @param clientId the id of the requesting client
 * @param leaderId the id of the server the request is addressed to
 * @param peers the peers of the requested configuration
 * @return the new request
 */
public SetConfigurationRequest newSetConfigurationRequest(
    ClientId clientId, RaftPeerId leaderId,
    RaftPeer... peers) {
  final RaftGroupId groupId = getGroupId();
  return new SetConfigurationRequest(clientId, leaderId, groupId, DEFAULT_CALLID, peers);
}
/**
 * Passes a set-configuration request, created with a random client id,
 * directly to the current leader.
 *
 * @param peers the peers of the requested configuration
 */
public void setConfiguration(RaftPeer... peers) throws IOException {
  final RaftServerImpl leader = getLeader();
  final ClientId clientId = ClientId.randomId();
  final SetConfigurationRequest request =
      newSetConfigurationRequest(clientId, leader.getId(), peers);
  LOG.info("Start changing the configuration: {}", request);
  leader.setConfiguration(request);
}
/**
 * Logs a banner, cancels the print-out timer, closes every alive server
 * and finally asserts through {@link ExitUtils} that no termination happened.
 */
public void shutdown() {
  final String clusterName = getClass().getSimpleName();
  LOG.info("************************************************************** ");
  LOG.info("*** ");
  LOG.info("*** Stopping " + clusterName);
  LOG.info("*** ");
  LOG.info("************************************************************** ");

  timer.cancel();
  getServerAliveStream().forEach(impl -> impl.getProxy().close());

  ExitUtils.assertNotTerminated();
}
/**
 * Block all the incoming requests for the peer with leaderId. Also delay
 * outgoing or incoming messages for all other peers.
 *
 * @param leaderId the string id of the targeted server
 * @param delayMs the delay to set, in milliseconds; {@link #tryEnforceLeader(String)}
 *                passes 0 afterwards to reopen the queues
 */
protected abstract void blockQueueAndSetDelay(String leaderId, int delayMs)
throws InterruptedException;
/**
 * Try to enforce the leader of the cluster.
 *
 * @param leaderId ID of the targeted leader server.
 * @return true if the server is the leader when this method returns
 *         (either it already was, or it has been enforced); false otherwise.
 */
public boolean tryEnforceLeader(String leaderId) throws InterruptedException {
  // Nothing to do when the given id is already the leader.
  if (isLeader(leaderId)) {
    return true;
  }

  // Block the RPC read process of all the other servers so that a read takes
  // at least ELECTION_TIMEOUT_MIN; then, when the target server requests a
  // vote, all the non-leader servers can grant it. The RPC of the target
  // server is disabled so that it can request a vote.
  final int minTimeoutMs =
      RaftServerConfigKeys.Rpc.TIMEOUT_MIN_DEFAULT.toInt(TimeUnit.MILLISECONDS);
  blockQueueAndSetDelay(leaderId, minTimeoutMs);

  // Reopen the queues so that the vote can make progress.
  blockQueueAndSetDelay(leaderId, 0);

  return isLeader(leaderId);
}
/**
 * Block/unblock the requests sent from the given source.
 *
 * @param src the source whose requests are blocked or unblocked
 * @param block true to block; false to unblock
 */
public abstract void setBlockRequestsFrom(String src, boolean block);
/** @return the id of the current group of this cluster. */
public RaftGroupId getGroupId() {
  final RaftGroup currentGroup = this.group;
  return currentGroup.getGroupId();
}
}