blob: 426c370891a5dfed07e45e85b213cedba02b55b2 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.iotdb.cluster.server.member;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
import org.apache.iotdb.cluster.client.sync.SyncClientPool;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
import org.apache.iotdb.cluster.exception.LogExecutionException;
import org.apache.iotdb.cluster.exception.UnknownLogTypeException;
import org.apache.iotdb.cluster.log.CommitLogCallback;
import org.apache.iotdb.cluster.log.CommitLogTask;
import org.apache.iotdb.cluster.log.HardState;
import org.apache.iotdb.cluster.log.Log;
import org.apache.iotdb.cluster.log.LogDispatcher;
import org.apache.iotdb.cluster.log.LogDispatcher.SendLogRequest;
import org.apache.iotdb.cluster.log.LogParser;
import org.apache.iotdb.cluster.log.catchup.CatchUpTask;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
import org.apache.iotdb.cluster.log.manage.RaftLogManager;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.IOUtils;
import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
* RaftMember process the common raft logic like leader election, log appending, catch-up and so on.
@SuppressWarnings("java:S3077") // reference volatile is enough
public abstract class RaftMember {
private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
public static final boolean USE_LOG_DISPATCHER = false;
private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
private static final String MSG_FORWARD_ERROR =
"{}: encountered an error when forwarding {} to" + " {}";
private static final String MSG_NO_LEADER_COMMIT_INDEX =
"{}: Cannot request commit index from {}";
private static final String MSG_NO_LEADER_IN_SYNC = "{}: No leader is found when synchronizing";
public static final String MSG_LOG_IS_ACCEPTED = "{}: log {} is accepted";
* when there is no leader, wait for waitLeaderTimeMs before return a NoLeader response to the
* client.
private static long waitLeaderTimeMs = 60 * 1000L;
* when the leader of this node changes, the condition will be notified so other threads that wait
* on this may be woken.
private final Object waitLeaderCondition = new Object();
/** the lock is to make sure that only one thread can apply snapshot at the same time */
private final Object snapshotApplyLock = new Object();
protected Node thisNode = ClusterConstant.EMPTY_NODE;
/** the nodes that belong to the same raft group as thisNode. */
protected List<Node> allNodes;
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
/** the name of the member, to distinguish several members in the logs. */
String name;
/** to choose nodes to send request of joining cluster randomly. */
Random random = new Random();
/** when the node is a leader, this map is used to track log progress of each follower. */
Map<Node, Peer> peerMap;
* the current term of the node, this object also works as lock of some transactions of the member
* like elections.
AtomicLong term = new AtomicLong(0);
volatile NodeCharacter character = NodeCharacter.ELECTOR;
AtomicReference<Node> leader = new AtomicReference<>(ClusterConstant.EMPTY_NODE);
* the node that thisNode has voted for in this round of election, which prevents a node voting
* twice in a single election.
volatile Node voteFor;
* when this node is a follower, this records the unix-epoch timestamp when the last heartbeat
* arrived, and is reported in the timed member report to show how long the leader has been
* offline.
volatile long lastHeartbeatReceivedTime;
/** the raft logs are all stored and maintained in the log manager */
RaftLogManager logManager;
* the single thread pool that runs the heartbeat thread, which send heartbeats to the follower
* when this node is a leader, or start elections when this node is an elector.
ExecutorService heartBeatService;
* if set to true, the node will reject all log appends when the header of a group is removed from
* the cluster, the members of the group should no longer accept writes, but they still can be
* candidates for weak consistency reads and provide snapshots for the new data holders
volatile boolean readOnly = false;
* lastLogIndex when generating the previous member report, to show the log ingestion rate of the
* member by comparing it with the current last log index.
long lastReportedLogIndex;
/** the thread pool that runs catch-up tasks */
private ExecutorService catchUpService;
* lastCatchUpResponseTime records when is the latest response of each node's catch-up. There
* should be only one catch-up task for each node to avoid duplication, but the task may time out
* or the task may corrupt unexpectedly, and in that case, the next catch up should be enabled. So
* if we find a catch-up task that does not respond for long, we will start a new one instead of
* waiting for the previous one to finish.
private Map<Node, Long> lastCatchUpResponseTime = new ConcurrentHashMap<>();
* the pool that provides reusable Thrift clients to connect to other RaftMembers and execute RPC
* requests. It will be initialized according to the implementation of the subclasses
private AsyncClientPool asyncClientPool;
private AsyncClientPool asyncSendLogClientPool;
private SyncClientPool syncClientPool;
private AsyncClientPool asyncHeartbeatClientPool;
private SyncClientPool syncHeartbeatClientPool;
* when the commit progress is updated by a heartbeat, this object is notified so that we may know
* if this node is up-to-date with the leader, and whether the given consistency is reached
private Object syncLock = new Object();
* when this node sends logs to the followers, the send is performed in parallel in this pool, so
* that a slow or unavailable node will not block other nodes.
private ExecutorService appendLogThreadPool;
* when using sync server, this thread pool is used to convert serial operations (like sending
* heartbeats and asking for votes) into paralleled ones, so the process will not be blocked by
* one slow node.
private ExecutorService serialToParallelPool;
/** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
private ExecutorService commitLogPool;
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
* which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
private LogDispatcher logDispatcher;
protected RaftMember() {}
protected RaftMember(
String name,
AsyncClientPool asyncPool,
SyncClientPool syncPool,
AsyncClientPool asyncHeartbeatPool,
SyncClientPool syncHeartbeatPool) { = name;
this.asyncClientPool = asyncPool;
this.syncClientPool = syncPool;
this.asyncHeartbeatClientPool = asyncHeartbeatPool;
this.syncHeartbeatClientPool = syncHeartbeatPool;
this.asyncSendLogClientPool = asyncClientPool;
protected RaftMember(
String name,
AsyncClientPool asyncPool,
SyncClientPool syncPool,
AsyncClientPool asyncHeartbeatPool,
SyncClientPool syncHeartbeatPool,
AsyncClientPool asyncSendLogClientPool) { = name;
this.asyncClientPool = asyncPool;
this.syncClientPool = syncPool;
this.asyncHeartbeatClientPool = asyncHeartbeatPool;
this.syncHeartbeatClientPool = syncHeartbeatPool;
this.asyncSendLogClientPool = asyncSendLogClientPool;
* Start the heartbeat thread and the catch-up thread pool. Calling the method twice does not
* induce side effects.
public void start() {
if (heartBeatService != null) {
startBackGroundThreads();"{} started", name);
void startBackGroundThreads() {
heartBeatService =
r -> new Thread(r, name + "-HeartbeatThread@" + System.currentTimeMillis()));
catchUpService =
new ThreadFactoryBuilder().setNameFormat(getName() + "-CatchUpThread%d").build());
appendLogThreadPool =
Runtime.getRuntime().availableProcessors() * 10,
new ThreadFactoryBuilder().setNameFormat(getName() + "-AppendLog%d").build());
if (!config.isUseAsyncServer()) {
serialToParallelPool =
new ThreadPoolExecutor(
Math.max(allNodes.size(), Runtime.getRuntime().availableProcessors()),
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setNameFormat(getName() + "-SerialToParallel%d").build());
commitLogPool =
new ThreadFactoryBuilder().setNameFormat(getName() + "-CommitLog%d").build());
public String getName() {
return name;
public RaftLogManager getLogManager() {
return logManager;
public void setLogManager(RaftLogManager logManager) {
if (this.logManager != null) {
this.logManager = logManager;
* Stop the heartbeat thread and the catch-up thread pool. Calling the method twice does not
* induce side effects.
public void stop() {
if (heartBeatService == null) {
try {
heartBeatService.awaitTermination(10, TimeUnit.SECONDS);
catchUpService.awaitTermination(10, TimeUnit.SECONDS);
appendLogThreadPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
"Unexpected interruption when waiting for heartBeatService and catchUpService "
+ "to end",
if (serialToParallelPool != null) {
try {
serialToParallelPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Unexpected interruption when waiting for asyncThreadPool to end", e);
if (commitLogPool != null) {
try {
commitLogPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Unexpected interruption when waiting for commitLogPool to end", e);
catchUpService = null;
heartBeatService = null;
appendLogThreadPool = null;"Member {} stopped", name);
public void closeLogManager() {
if (logManager != null) {
* Process the HeartBeatRequest from the leader. If the term of the leader is smaller than the
* local term, reject the request by telling it the newest term. Else if the local logs are
* consistent with the leader's, commit them. Else help the leader find the last matched log. Also
* update the leadership, heartbeat timer and term of the local node.
public HeartBeatResponse processHeartbeatRequest(HeartBeatRequest request) {
logger.trace("{} received a heartbeat", name);
synchronized (term) {
long thisTerm = term.get();
long leaderTerm = request.getTerm();
HeartBeatResponse response = new HeartBeatResponse();
if (leaderTerm < thisTerm) {
// a leader with a term lower than this node is invalid, send it the local term to inform
// it to resign
if (logger.isTraceEnabled()) {
logger.trace("{} received a heartbeat from a stale leader {}", name, request.getLeader());
} else {
// try updating local term
stepDown(leaderTerm, true);
if (character != NodeCharacter.FOLLOWER) {
// interrupt current election
// the heartbeat comes from a valid leader, process it with the sub-class logic
processValidHeartbeatReq(request, response);
// tell the leader who I am in case of catch-up
synchronized (logManager) {
// tell the leader the local log progress so it may decide whether to perform a catch up
tryUpdateCommitIndex(leaderTerm, request.getCommitLogIndex(), request.getCommitLogTerm());
if (logger.isTraceEnabled()) {
logger.trace("{} received heartbeat from a valid leader {}", name, request.getLeader());
return response;
private void tryUpdateCommitIndex(long leaderTerm, long commitIndex, long commitTerm) {
if (leaderTerm >= term.get() && logManager.getCommitLogIndex() < commitIndex) {
// there are more local logs that can be committed, commit them in a ThreadPool so the
// heartbeat response will not be blocked
CommitLogTask commitLogTask = new CommitLogTask(logManager, commitIndex, commitTerm);
commitLogTask.registerCallback(new CommitLogCallback(this));
// if the log is not consistent, the commitment will be blocked until the leader makes the
// node catch up
if (commitLogPool != null && !commitLogPool.isShutdown()) {
"{}: Inconsistent log found, leaderCommit: {}-{}, localCommit: {}-{}, "
+ "localLast: {}-{}",
* Process an ElectionRequest. If the request comes from the last leader, accept it. Else decide
* whether to accept by examining the log status of the elector.
public long processElectionRequest(ElectionRequest electionRequest) {
synchronized (term) {
long currentTerm = term.get();
long response =
checkElectorTerm(currentTerm, electionRequest.getTerm(), electionRequest.getElector());
if (response != Response.RESPONSE_AGREE) {
return response;
// compare the log progress of the elector with this node
response = checkElectorLogProgress(electionRequest);
"{} sending response {} to the elector {}", name, response, electionRequest.getElector());
return response;
private long checkElectorTerm(long currentTerm, long electorTerm, Node elector) {
if (electorTerm < currentTerm) {
// the elector has a smaller term thus the request is invalid
"{} sending localTerm {} to the elector {} because it's term {} is smaller.",
return currentTerm;
if (currentTerm == electorTerm && voteFor != null && !Objects.equals(voteFor, elector)) {
// this node has voted in this round, but not for the elector, as one node cannot vote
// twice, reject the request
"{} sending rejection to the elector {} because member already has voted {} in this term {}.",
return Response.RESPONSE_REJECT;
if (electorTerm > currentTerm) {
// the elector has a larger term, this node should update its term first
"{} received an election from elector {} which has bigger term {} than localTerm {}, raftMember should step down first and then continue to decide whether to grant it's vote by log status.",
stepDown(electorTerm, false);
return Response.RESPONSE_AGREE;
* Process an AppendEntryRequest. First check the term of the leader, then parse the log and
* finally see if we can find a position to append the log.
public long appendEntry(AppendEntryRequest request) throws UnknownLogTypeException {
logger.debug("{} received an AppendEntryRequest: {}", name, request);
// the term checked here is that of the leader, not that of the log
long checkResult = checkRequestTerm(request.term, request.leader);
if (checkResult != Response.RESPONSE_AGREE) {
return checkResult;
long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
int logByteSize = request.getEntry().length;
Log log = LogParser.getINSTANCE().parse(request.entry);
long result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, log);
logger.debug("{} AppendEntryRequest of {} completed with result {}", name, log, result);
return result;
/** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
public long appendEntries(AppendEntriesRequest request) throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
// the term checked here is that of the leader, not that of the log
long checkResult = checkRequestTerm(request.term, request.leader);
if (checkResult != Response.RESPONSE_AGREE) {
return checkResult;
long response;
List<Log> logs = new ArrayList<>();
int logByteSize = 0;
long startTime = Timer.Statistic.RAFT_RECEIVER_LOG_PARSE.getOperationStartTime();
for (ByteBuffer buffer : request.getEntries()) {
Log log;
logByteSize = buffer.limit() - buffer.position();
try {
log = LogParser.getINSTANCE().parse(buffer);
} catch (BufferUnderflowException e) {
throw e;
response = appendEntries(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
if (logger.isDebugEnabled()) {
"{} AppendEntriesRequest of log size {} completed with result {}",
return response;
* Get an asynchronous heartbeat thrift client to the given node.
* @return an asynchronous thrift client or null if the caller tries to connect the local node.
public AsyncClient getAsyncHeartbeatClient(Node node) {
return getAsyncClient(node, asyncHeartbeatClientPool, false);
* NOTICE: client.putBack() must be called after use.
* @return the heartbeat client for the node
public Client getSyncHeartbeatClient(Node node) {
return getSyncClient(syncHeartbeatClientPool, node, false);
public void sendLogAsync(
Log log,
AtomicInteger voteCounter,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
Peer peer) {
AsyncClient client = getSendLogAsyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
try {
client.appendEntry(request, handler);
logger.debug("{} sending a log to {}: {}", name, node, log);
} catch (Exception e) {
logger.warn("{} cannot append log to node {}", name, node, e);
private Client getSyncClient(SyncClientPool pool, Node node, boolean activatedOnly) {
if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
return null;
return pool.getClient(node, activatedOnly);
public NodeCharacter getCharacter() {
return character;
public void setCharacter(NodeCharacter character) {
if (!Objects.equals(character, this.character)) {"{} has become a {}", name, character);
this.character = character;
public long getLastHeartbeatReceivedTime() {
return lastHeartbeatReceivedTime;
public void setLastHeartbeatReceivedTime(long lastHeartbeatReceivedTime) {
this.lastHeartbeatReceivedTime = lastHeartbeatReceivedTime;
public Node getLeader() {
return leader.get();
public void setLeader(Node leader) {
if (!Objects.equals(leader, this.leader.get())) {
if (ClusterConstant.EMPTY_NODE.equals(leader) || leader == null) {"{} has been set to null in term {}", getName(), term.get());
} else if (!Objects.equals(leader, this.thisNode)) {"{} has become a follower of {} in term {}", getName(), leader, term.get());
synchronized (waitLeaderCondition) {
if (leader == null) {
} else {
if (!ClusterConstant.EMPTY_NODE.equals(this.leader.get())) {
public Collection<Node> getAllNodes() {
return allNodes;
public void setAllNodes(List<Node> allNodes) {
this.allNodes = allNodes;
public Map<Node, Long> getLastCatchUpResponseTime() {
return lastCatchUpResponseTime;
/** Sub-classes will add their own process of HeartBeatResponse in this method. */
public void processValidHeartbeatResp(HeartBeatResponse response, Node receiver) {}
/** The actions performed when the node wins in an election (becoming a leader). */
public void onElectionWins() {}
* Update the followers' log by sending logs whose index >= followerLastMatchedLogIndex to the
* follower. If some of the required logs are removed, also send the snapshot. <br>
* notice that if a part of data is in the snapshot, then it is not in the logs.
public void catchUp(Node follower, long lastLogIdx) {
// for one follower, there is at most one ongoing catch-up, so the same data will not be sent
// twice to the node
synchronized (catchUpService) {
// check if the last catch-up is still ongoing and does not time out yet
Long lastCatchupResp = lastCatchUpResponseTime.get(follower);
if (lastCatchupResp != null
&& System.currentTimeMillis() - lastCatchupResp < config.getCatchUpTimeoutMS()) {
logger.debug("{}: last catch up of {} is ongoing", name, follower);
} else {
// record the start of the catch-up
lastCatchUpResponseTime.put(follower, System.currentTimeMillis());
}"{}: Start to make {} catch up", name, follower);
if (!catchUpService.isShutdown()) {
Future<?> future =
catchUpService.submit(new CatchUpTask(follower, peerMap.get(follower), this, lastLogIdx));
() -> {
try {
} catch (InterruptedException e) {
} catch (ExecutionException e) {
logger.error("{}: Catch up task exits with unexpected exception", name, e);
* If the node is not a leader, the request will be sent to the leader or reports an error if
* there is no leader. Otherwise execute the plan locally (whether to send it to followers depends
* on the type of the plan).
public TSStatus executeNonQueryPlan(ExecutNonQueryReq request)
throws IOException, IllegalPathException {
// process the plan locally
PhysicalPlan plan = PhysicalPlan.Factory.create(request.planBytes);
TSStatus answer = executeNonQueryPlan(plan);
logger.debug("{}: Received a plan {}, executed answer: {}", name, plan, answer);
return answer;
* Execute a non-query plan. Subclass may have their individual implements.
* @param plan a non-query plan.
* @return A TSStatus indicating the execution result.
abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
* according to the consistency configuration, decide whether to execute syncLeader or not and
* throws exception when failed. Note that the write request will always try to sync leader
public void syncLeaderWithConsistencyCheck(boolean isWriteRequest)
throws CheckConsistencyException {
if (isWriteRequest) {
syncLeader(new StrongCheckConsistency());
} else {
switch (config.getConsistencyLevel()) {
syncLeader(new StrongCheckConsistency());
// if leaderCommitId bigger than localAppliedId a value,
syncLeader(new MidCheckConsistency());
// do nothing
// this should not happen in theory
throw new CheckConsistencyException(
"unknown consistency=" + config.getConsistencyLevel().name());
/** call back after syncLeader */
public interface CheckConsistency {
* deal leaderCommitId and localAppliedId after syncLeader
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
* @throws CheckConsistencyException maybe throw CheckConsistencyException, which is defined in
* implements.
void postCheckConsistency(long leaderCommitId, long localAppliedId)
throws CheckConsistencyException;
public static class MidCheckConsistency implements CheckConsistency {
* if leaderCommitId - localAppliedId > MaxReadLogLag, will throw
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
* @throws CheckConsistencyException
public void postCheckConsistency(long leaderCommitId, long localAppliedId)
throws CheckConsistencyException {
if (leaderCommitId == Long.MAX_VALUE
|| leaderCommitId == Long.MIN_VALUE
|| leaderCommitId - localAppliedId
> ClusterDescriptor.getInstance().getConfig().getMaxReadLogLag()) {
throw CheckConsistencyException.CHECK_MID_CONSISTENCY_EXCEPTION;
public static class StrongCheckConsistency implements CheckConsistency {
* if leaderCommitId > localAppliedId, will throw CHECK_STRONG_CONSISTENCY_EXCEPTION
* @param leaderCommitId leader commit id
* @param localAppliedId local applied id
* @throws CheckConsistencyException
public void postCheckConsistency(long leaderCommitId, long localAppliedId)
throws CheckConsistencyException {
if (leaderCommitId > localAppliedId
|| leaderCommitId == Long.MAX_VALUE
|| leaderCommitId == Long.MIN_VALUE) {
* Request and check the leader's commitId to see whether this node has caught up. If not, wait
* until this node catches up.
* @param checkConsistency check after syncleader
* @return true if the node has caught up, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
* value after timeout
public boolean syncLeader(CheckConsistency checkConsistency) throws CheckConsistencyException {
if (character == NodeCharacter.LEADER) {
return true;
if (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
// the leader has not been elected, we must assume the node falls behind
logger.warn(MSG_NO_LEADER_IN_SYNC, name);
return false;
if (character == NodeCharacter.LEADER) {
return true;
logger.debug("{}: try synchronizing with the leader {}", name, leader.get());
return waitUntilCatchUp(checkConsistency);
/** Wait until the leader of this node becomes known or time out. */
public void waitLeader() {
long startTime = System.currentTimeMillis();
while (leader.get() == null || ClusterConstant.EMPTY_NODE.equals(leader.get())) {
synchronized (waitLeaderCondition) {
try {
} catch (InterruptedException e) {
logger.error("Unexpected interruption when waiting for a leader", e);
long consumedTime = System.currentTimeMillis() - startTime;
if (consumedTime >= getWaitLeaderTimeMs()) {
logger.warn("{}: leader is still offline after {}ms", name, consumedTime);
logger.debug("{}: current leader is {}", name, leader.get());
* Request the leader's commit index and wait until the local commit index becomes not less than
* it.
* @return true if this node has caught up before timeout, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
* value after timeout
protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
long leaderCommitId = Long.MIN_VALUE;
RequestCommitIndexResponse response;
try {
response = config.isUseAsyncServer() ? requestCommitIdAsync() : requestCommitIdSync();
leaderCommitId = response.getCommitLogIndex();
response.getTerm(), response.getCommitLogIndex(), response.getCommitLogTerm());
return syncLocalApply(leaderCommitId);
} catch (TException e) {
logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
} catch (InterruptedException e) {
logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
} finally {
if (checkConsistency != null) {
leaderCommitId, logManager.getMaxHaveAppliedCommitIndex());
return false;
* sync local applyId to leader commitId
* @param leaderCommitId leader commit id
* @return true if leaderCommitId <= localAppliedId
private boolean syncLocalApply(long leaderCommitId) {
long startTime = System.currentTimeMillis();
long waitedTime = 0;
long localAppliedId = 0;
while (waitedTime < RaftServer.getSyncLeaderMaxWaitMs()) {
try {
localAppliedId = logManager.getMaxHaveAppliedCommitIndex();
logger.debug("{}: synchronizing commitIndex {}/{}", name, localAppliedId, leaderCommitId);
if (leaderCommitId <= localAppliedId) {
// this node has caught up
if (logger.isDebugEnabled()) {
waitedTime = System.currentTimeMillis() - startTime;
logger.debug("{}: synchronized with the leader after {}ms", name, waitedTime);
return true;
// wait for next heartbeat to catch up
// the local node will not perform a commit here according to the leaderCommitId because
// the node may have some inconsistent logs with the leader
waitedTime = System.currentTimeMillis() - startTime;
synchronized (syncLock) {
} catch (InterruptedException e) {
logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e);
logger.warn("{}: Failed to synchronize with the leader after {}ms", name, waitedTime);
return false;
* Create a log for "plan" and append it locally and to all followers. Only the group leader can
* call this method. Will commit the log locally and send it to followers
* @return OK if over half of the followers accept the log or null if the leadership is lost
* during the appending
TSStatus processPlanLocally(PhysicalPlan plan) {
return processPlanLocallyV2(plan);
logger.debug("{}: Processing plan {}", name, plan);
if (readOnly) {
return StatusUtils.NODE_READ_ONLY;
long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime();
PhysicalPlanLog log = new PhysicalPlanLog();
// assign term and index to the new log and append it
synchronized (logManager) {
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
// appendLogInGroup will serialize log, and set log size, and we will use the size after it
try {
if (appendLogInGroup(log)) {
return StatusUtils.OK;
} catch (LogExecutionException e) {
return handleLogExecutionException(log, e);
return StatusUtils.TIME_OUT;
private TSStatus processPlanLocallyV2(PhysicalPlan plan) {
logger.debug("{}: Processing plan {}", name, plan);
if (readOnly) {
return StatusUtils.NODE_READ_ONLY;
PhysicalPlanLog log = new PhysicalPlanLog();
// assign term and index to the new log and append it
SendLogRequest sendLogRequest;
long startTime =
synchronized (logManager) {
log.setCurrLogIndex(logManager.getLastLogIndex() + 1);
startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG_V2.getOperationStartTime();
// logDispatcher will serialize log, and set log size, and we will use the size after it
startTime = Statistic.RAFT_SENDER_BUILD_LOG_REQUEST.getOperationStartTime();
sendLogRequest = buildSendLogRequest(log);
startTime = Statistic.RAFT_SENDER_OFFER_LOG.getOperationStartTime();
try {
AppendLogResult appendLogResult =
switch (appendLogResult) {
case OK:
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
return StatusUtils.OK;
case TIME_OUT:
logger.debug("{}: log {} timed out...", name, log);
// abort the appending, the new leader will fix the local logs by catch-up
} catch (LogExecutionException e) {
return handleLogExecutionException(log, e);
return StatusUtils.TIME_OUT;
public SendLogRequest buildSendLogRequest(Log log) {
AtomicInteger voteCounter = new AtomicInteger(allNodes.size() / 2);
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(term.get());
long startTime = Statistic.RAFT_SENDER_BUILD_APPEND_REQUEST.getOperationStartTime();
AppendEntryRequest appendEntryRequest = buildAppendEntryRequest(log, false);
return new SendLogRequest(log, voteCounter, leaderShipStale, newLeaderTerm, appendEntryRequest);
* The maximum time to wait if there is no leader in the group, after which a
* LeadNotFoundException will be thrown.
static long getWaitLeaderTimeMs() {
return waitLeaderTimeMs;
static void setWaitLeaderTimeMs(long waitLeaderTimeMs) {
RaftMember.waitLeaderTimeMs = waitLeaderTimeMs;
@SuppressWarnings("java:S2274") // enable timeout
protected RequestCommitIndexResponse requestCommitIdAsync()
throws TException, InterruptedException {
// use Long.MAX_VALUE to indicate a timeout
RequestCommitIndexResponse response =
new RequestCommitIndexResponse(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE);
AtomicReference<RequestCommitIndexResponse> commitIdResult = new AtomicReference<>(response);
AsyncClient client = getAsyncClient(leader.get());
if (client == null) {
// cannot connect to the leader
logger.warn(MSG_NO_LEADER_IN_SYNC, name);
return commitIdResult.get();
synchronized (commitIdResult) {
client.requestCommitIndex(getHeader(), new GenericHandler<>(leader.get(), commitIdResult));
return commitIdResult.get();
private RequestCommitIndexResponse requestCommitIdSync() throws TException {
Client client = getSyncClient(leader.get());
RequestCommitIndexResponse response;
if (client == null) {
// cannot connect to the leader
logger.warn(MSG_NO_LEADER_IN_SYNC, name);
// use Long.MAX_VALUE to indicate a timeouts
response = new RequestCommitIndexResponse(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE);
return response;
try {
response = client.requestCommitIndex(getHeader());
} catch (TException e) {
throw e;
} finally {
return response;
* Tell the requester the current commit index if the local node is the leader of the group headed
* by header. Or forward it to the leader. Otherwise report an error.
* @return Long.MIN_VALUE if the node is not a leader, or the commitIndex
public long getCommitIndex() {
if (character == NodeCharacter.LEADER) {
return logManager.getCommitLogIndex();
} else {
return Long.MIN_VALUE;
public void setReadOnly() {
synchronized (logManager) {
readOnly = true;
public boolean isReadOnly() {
return readOnly;
public void initPeerMap() {
peerMap = new ConcurrentHashMap<>();
for (Node entry : allNodes) {
peerMap.computeIfAbsent(entry, k -> new Peer(logManager.getLastLogIndex()));
public Map<Node, Peer> getPeerMap() {
return peerMap;
/** @return true if there is a log whose index is "index" and term is "term", false otherwise */
public boolean matchLog(long index, long term) {
boolean matched = logManager.matchTerm(term, index);
logger.debug("Log {}-{} matched: {}", index, term, matched);
return matched;
public ExecutorService getSerialToParallelPool() {
return serialToParallelPool;
public Object getSyncLock() {
return syncLock;
/** Sub-classes will add their own process of HeartBeatRequest in this method. */
void processValidHeartbeatReq(HeartBeatRequest request, HeartBeatResponse response) {}
* Verify the validity of an ElectionRequest, and make itself a follower of the elector if the
* request is valid.
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
* smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
long checkElectorLogProgress(ElectionRequest electionRequest) {
long thatTerm = electionRequest.getTerm();
long thatLastLogIndex = electionRequest.getLastLogIndex();
long thatLastLogTerm = electionRequest.getLastLogTerm();
// check the log progress of the elector
long resp = checkLogProgress(thatLastLogIndex, thatLastLogTerm);
if (resp == Response.RESPONSE_AGREE) {
"{} accepted an election request, term:{}/{}, logIndex:{}/{}, logTerm:{}/{}",
lastHeartbeatReceivedTime = System.currentTimeMillis();
updateHardState(thatTerm, getVoteFor());
} else {
"{} rejected an election request, term:{}/{}, logIndex:{}/{}, logTerm:{}/{}",
return resp;
* Reject the election if the lastLogTerm of the candidate equals to the voter's but its
* lastLogIndex is smaller than the voter's Otherwise accept the election.
* @return Response.RESPONSE_AGREE if the elector is valid or the local term if the elector has a
* smaller term or Response.RESPONSE_LOG_MISMATCH if the elector has older logs.
long checkLogProgress(long lastLogIndex, long lastLogTerm) {
long response;
synchronized (logManager) {
if (logManager.isLogUpToDate(lastLogTerm, lastLogIndex)) {
response = Response.RESPONSE_AGREE;
} else {
response = Response.RESPONSE_LOG_MISMATCH;
return response;
* Forward a non-query plan to a node using the default client.
* @param plan a non-query plan
* @param node cannot be the local node
* @param header must be set for data group communication, set to null for meta group
* communication
* @return a TSStatus indicating if the forwarding is successful.
public TSStatus forwardPlan(PhysicalPlan plan, Node node, Node header) {
if (node == null || node.equals(thisNode)) {
logger.debug("{}: plan {} has no where to be forwarded", name, plan);
return StatusUtils.NO_LEADER;
logger.debug("{}: Forward {} to node {}", name, plan, node);
TSStatus status;
if (config.isUseAsyncServer()) {
status = forwardPlanAsync(plan, node, header);
} else {
status = forwardPlanSync(plan, node, header);
if (status.getCode() == TSStatusCode.NO_CONNECTION.getStatusCode()
&& (header == null || header.equals(getHeader()))
&& (leader.get() != null)
&& leader.get().equals(node)) {
// leader is down, trigger a new election by resetting heartbeat
lastHeartbeatReceivedTime = -1;
return status;
* Forward a non-query plan to "receiver" using "client".
* @param plan a non-query plan
* @param header to determine which DataGroupMember of "receiver" will process the request.
* @return a TSStatus indicating if the forwarding is successful.
private TSStatus forwardPlanAsync(PhysicalPlan plan, Node receiver, Node header) {
AsyncClient client = getAsyncClient(receiver);
if (client == null) {
logger.debug("{}: can not get client for node={}", name, receiver);
return StatusUtils.NO_CONNECTION
.setMessage(String.format("%s cannot be reached", receiver));
return forwardPlanAsync(plan, receiver, header, client);
public TSStatus forwardPlanAsync(
PhysicalPlan plan, Node receiver, Node header, AsyncClient client) {
try {
TSStatus tsStatus = SyncClientAdaptor.executeNonQuery(client, plan, header, receiver);
if (tsStatus == null) {
tsStatus = StatusUtils.TIME_OUT;
logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
return tsStatus;
} catch (IOException | TException e) {
logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
} catch (InterruptedException e) {
logger.warn("{}: forward {} to {} interrupted", name, plan, receiver);
return StatusUtils.TIME_OUT;
private TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node header) {
Client client = getSyncClient(receiver);
if (client == null) {
logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
return StatusUtils.TIME_OUT;
return forwardPlanSync(plan, receiver, header, client);
public TSStatus forwardPlanSync(PhysicalPlan plan, Node receiver, Node header, Client client) {
try {
ExecutNonQueryReq req = new ExecutNonQueryReq();
if (header != null) {
TSStatus tsStatus = client.executeNonQueryPlan(req);
if (tsStatus == null) {
tsStatus = StatusUtils.TIME_OUT;
logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
return tsStatus;
} catch (IOException e) {
logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
} catch (TException e) {
TSStatus status;
if (e.getCause() instanceof SocketTimeoutException) {
status = StatusUtils.TIME_OUT;
logger.warn(MSG_FORWARD_TIMEOUT, name, plan, receiver);
} else {
logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
status = StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
// the connection may be broken, close it to avoid it being reused
return status;
} finally {
* Get an asynchronous thrift client of the given node.
* @return an asynchronous thrift client or null if the caller tries to connect the local node or
* the node cannot be reached.
public AsyncClient getAsyncClient(Node node) {
return getAsyncClient(node, asyncClientPool, true);
public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
return getAsyncClient(node, asyncClientPool, activatedOnly);
public AsyncClient getSendLogAsyncClient(Node node) {
return getAsyncClient(node, asyncSendLogClientPool, true);
private AsyncClient getAsyncClient(Node node, AsyncClientPool pool, boolean activatedOnly) {
if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
return null;
try {
return pool.getClient(node, activatedOnly);
} catch (IOException e) {
logger.warn("{} cannot connect to node {}", name, node, e);
return null;
* NOTICE: client.putBack() must be called after use. the caller needs to check to see if the
* return value is null
* @param node the node to connect
* @return the client if node is available, otherwise null
public Client getSyncClient(Node node) {
return getSyncClient(syncClientPool, node, true);
public Client getSyncClient(Node node, boolean activatedOnly) {
return getSyncClient(syncClientPool, node, activatedOnly);
public AtomicLong getTerm() {
return term;
private synchronized LogDispatcher getLogDispatcher() {
if (logDispatcher == null) {
logDispatcher = new LogDispatcher(this);
return logDispatcher;
* wait until "voteCounter" counts down to zero, which means the quorum has received the log, or
* one follower tells the node that it is no longer a valid leader, or a timeout is triggered.
@SuppressWarnings({"java:S2445"}) // safe synchronized
private AppendLogResult waitAppendResult(
AtomicInteger voteCounter, AtomicBoolean leaderShipStale, AtomicLong newLeaderTerm) {
// wait for the followers to vote
long startTime = Timer.Statistic.RAFT_SENDER_VOTE_COUNTER.getOperationStartTime();
synchronized (voteCounter) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
while (voteCounter.get() > 0
&& alreadyWait < RaftServer.getWriteOperationTimeoutMS()
&& voteCounter.get() != Integer.MAX_VALUE) {
try {
} catch (InterruptedException e) {
logger.warn("Unexpected interruption when sending a log", e);
alreadyWait = System.currentTimeMillis() - waitStart;
// a node has a larger term than the local node, so this node is no longer a valid leader
if (leaderShipStale.get()) {
stepDown(newLeaderTerm.get(), false);
return AppendLogResult.LEADERSHIP_STALE;
// the node knows it is no long the leader from other requests
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
// cannot get enough agreements within a certain amount of time
if (voteCounter.get() > 0) {
return AppendLogResult.TIME_OUT;
// voteCounter has counted down to zero
return AppendLogResult.OK;
void commitLog(Log log) throws LogExecutionException {
long startTime =
synchronized (logManager) {
startTime = Statistic.RAFT_SENDER_COMMIT_LOG_IN_MANAGER.getOperationStartTime();
// when using async applier, the log here may not be applied. To return the execution
// result, we must wait until the log is applied.
startTime = Statistic.RAFT_SENDER_COMMIT_WAIT_LOG_APPLY.getOperationStartTime();
synchronized (log) {
while (!log.isApplied()) {
// wait until the log is applied
try {
} catch (InterruptedException e) {
throw new LogExecutionException(e);
if (log.getException() != null) {
throw new LogExecutionException(log.getException());
private TSStatus handleLogExecutionException(PhysicalPlanLog log, LogExecutionException e) {
Throwable cause = IOUtils.getRootCause(e);
if (cause instanceof BatchProcessException) {
return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) cause).getFailingStatus()));
TSStatus tsStatus =
StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, cause.getMessage());
if (cause instanceof RuntimeException) {
logger.error("RuntimeException during executing {}", log, cause);
if (cause instanceof IoTDBException) {
tsStatus.setCode(((IoTDBException) cause).getErrorCode());
if (!(cause instanceof PathNotExistException)
&& !(cause instanceof StorageGroupNotSetException)
&& !(cause instanceof PathAlreadyExistException)
&& !(cause instanceof StorageGroupAlreadySetException)) {
logger.debug("{} cannot be executed because ", log, cause);
return tsStatus;
AppendEntryRequest buildAppendEntryRequest(Log log, boolean serializeNow) {
AppendEntryRequest request = new AppendEntryRequest();
if (serializeNow) {
ByteBuffer byteBuffer = log.serialize();
// don't need lock because even if it's larger than the commitIndex when appending this log to
// logManager, the follower can handle the larger commitIndex with no effect
request.setPrevLogIndex(log.getCurrLogIndex() - 1);
try {
request.setPrevLogTerm(logManager.getTerm(log.getCurrLogIndex() - 1));
} catch (Exception e) {
logger.error("getTerm failed for newly append entries", e);
if (getHeader() != null) {
// data groups use header to find a particular DataGroupMember
return request;
* If "newTerm" is larger than the local term, give up the leadership, become a follower and reset
* heartbeat timer.
* @param fromLeader true if the request is from a leader, false if the request is from an
* elector.
public void stepDown(long newTerm, boolean fromLeader) {
synchronized (term) {
long currTerm = term.get();
// confirm that the heartbeat of the new leader hasn't come
if (currTerm < newTerm) {"{} has update it's term to {}", getName(), newTerm);
updateHardState(newTerm, getVoteFor());
if (fromLeader) {
// only when the request is from a leader should we update lastHeartbeatReceivedTime,
// otherwise the node may be stuck in FOLLOWER state by a stale node.
lastHeartbeatReceivedTime = System.currentTimeMillis();
public Node getThisNode() {
return thisNode;
public void setThisNode(Node thisNode) {
this.thisNode = thisNode;
/** @return the header of the data raft group or null if this is in a meta group. */
public Node getHeader() {
return null;
public void updateHardState(long currentTerm, Node voteFor) {
HardState state = logManager.getHardState();
public Node getVoteFor() {
return voteFor;
public void setVoteFor(Node voteFor) {
if (!Objects.equals(voteFor, this.voteFor)) {"{} has update it's voteFor to {}", getName(), voteFor);
this.voteFor = voteFor;
* Append a log to all followers in the group until half of them accept the log or the leadership
* is lost.
* @return true if the log is accepted by the quorum of the group, false otherwise
boolean appendLogInGroup(Log log) throws LogExecutionException {
if (allNodes.size() == 1) {
// single node group, no followers
long startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
return true;
int retryTime = 0;
while (true) {
long startTime = Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.getOperationStartTime();
logger.debug("{}: Send log {} to other nodes, retry times: {}", name, log, retryTime);
if (character != NodeCharacter.LEADER) {
logger.debug("Has lose leadership, so need not to send log");
return false;
AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2);
switch (result) {
case OK:
startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
logger.debug(MSG_LOG_IS_ACCEPTED, name, log);
return true;
case TIME_OUT:
logger.debug("{}: log {} timed out, retrying...", name, log);
try {
} catch (InterruptedException e) {
if (retryTime > 5) {
return false;
// abort the appending, the new leader will fix the local logs by catch-up
return false;
* Send the given log to all the followers and decide the result by how many followers return a
* success.
* @param requiredQuorum the number of votes needed to make the log valid, when requiredQuorum <=
* 0, half of the cluster size will be used.
* @return an AppendLogResult
private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) {
if (requiredQuorum <= 0) {
// use half of the members' size as the quorum
return sendLogToFollowers(log, new AtomicInteger(allNodes.size() / 2));
} else {
// make sure quorum does not exceed the number of members - 1
return sendLogToFollowers(
log, new AtomicInteger(Math.min(requiredQuorum, allNodes.size() - 1)));
* Send the log to each follower. Every time a follower returns a success, "voteCounter" is
* decreased by 1 and when it counts to 0, return an OK. If any follower returns a higher term
* than the local term, retire from leader and return a LEADERSHIP_STALE. If "voteCounter" is
* still positive after a certain time, return TIME_OUT.
* @param voteCounter a decreasing vote counter
* @return an AppendLogResult indicating a success or a failure and why
private AppendLogResult sendLogToFollowers(Log log, AtomicInteger voteCounter) {
if (allNodes.size() == 1) {
// single node group, does not need the agreement of others
return AppendLogResult.OK;
logger.debug("{} sending a log to followers: {}", name, log);
// if a follower has larger term than this node, leaderShipStale will be set to true and
// newLeaderTerm will store the follower's term
AtomicBoolean leaderShipStale = new AtomicBoolean(false);
AtomicLong newLeaderTerm = new AtomicLong(term.get());
AppendEntryRequest request = buildAppendEntryRequest(log, true);
try {
if (allNodes.size() > 2) {
// if there are more than one followers, send the requests in parallel so that one slow
// follower will not be blocked
for (Node node : allNodes) {
() ->
log, voteCounter, node, leaderShipStale, newLeaderTerm, request));
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
} else {
// there is only one member, send to it within this thread to reduce thread switching
// overhead
for (Node node : allNodes) {
sendLogToFollower(log, voteCounter, node, leaderShipStale, newLeaderTerm, request);
if (character != NodeCharacter.LEADER) {
return AppendLogResult.LEADERSHIP_STALE;
} catch (ConcurrentModificationException e) {
// retry if allNodes has changed
return AppendLogResult.TIME_OUT;
return waitAppendResult(voteCounter, leaderShipStale, newLeaderTerm);
/** Send "log" to "node". */
public void sendLogToFollower(
Log log,
AtomicInteger voteCounter,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request) {
if (node.equals(thisNode)) {
* if the peer's log progress is too stale, wait until it catches up, otherwise, there may be
* too many waiting requests on the peer's side.
long startTime = Timer.Statistic.RAFT_SENDER_WAIT_FOR_PREV_LOG.getOperationStartTime();
Peer peer = peerMap.computeIfAbsent(node, k -> new Peer(logManager.getLastLogIndex()));
if (!waitForPrevLog(peer, log)) {
logger.warn("{}: node {} timed out when appending {}", name, node, log);
if (character != NodeCharacter.LEADER) {
if (config.isUseAsyncServer()) {
sendLogAsync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
} else {
sendLogSync(log, voteCounter, node, leaderShipStale, newLeaderTerm, request, peer);
* wait until the difference of log index between the matched log of peer and the given log become
* no bigger than maxLogDiff.
@SuppressWarnings("java:S2445") // safe synchronized
public boolean waitForPrevLog(Peer peer, Log log) {
final int maxLogDiff = config.getMaxNumOfLogsInMem();
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
// if the peer falls behind too much, wait until it catches up, otherwise there may be too
// many client threads in the peer
while (peer.getMatchIndex() < log.getCurrLogIndex() - maxLogDiff
&& character == NodeCharacter.LEADER
&& alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
synchronized (peer) {
try {
} catch (InterruptedException e) {
logger.warn("Waiting for peer to catch up interrupted");
return false;
alreadyWait = System.currentTimeMillis() - waitStart;
return alreadyWait <= RaftServer.getWriteOperationTimeoutMS();
private void sendLogSync(
Log log,
AtomicInteger voteCounter,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
AppendEntryRequest request,
Peer peer) {
Client client = getSyncClient(node);
if (client != null) {
AppendNodeEntryHandler handler =
getAppendNodeEntryHandler(log, voteCounter, node, leaderShipStale, newLeaderTerm, peer);
try {
logger.debug("{} sending a log to {}: {}", name, node, log);
long result = client.appendEntry(request);
} catch (TException e) {
} catch (Exception e) {
} finally {
public AppendNodeEntryHandler getAppendNodeEntryHandler(
Log log,
AtomicInteger voteCounter,
Node node,
AtomicBoolean leaderShipStale,
AtomicLong newLeaderTerm,
Peer peer) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
return handler;
public void setAppendLogThreadPool(ExecutorService appendLogThreadPool) {
this.appendLogThreadPool = appendLogThreadPool;
public Object getSnapshotApplyLock() {
return snapshotApplyLock;
* Find the local previous log of "log". If such log is found, discard all local logs behind it
* and append "log" to it. Otherwise report a log mismatch.
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
* .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
private long appendEntry(long prevLogIndex, long prevLogTerm, long leaderCommit, Log log) {
long resp = checkPrevLogIndex(prevLogIndex);
if (resp != Response.RESPONSE_AGREE) {
return resp;
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
long success;
synchronized (logManager) {
success = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, log);
if (success != -1) {
logger.debug("{} append a new log {}", name, log);
resp = Response.RESPONSE_AGREE;
} else {
// the incoming log points to an illegal position, reject it
return resp;
/** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
Object logUpdateCondition = logManager.getLogUpdateCondition(prevLogIndex);
long lastLogIndex = logManager.getLastLogIndex();
while (lastLogIndex < prevLogIndex && alreadyWait <= RaftServer.getWriteOperationTimeoutMS()) {
try {
// each time new logs are appended, this will be notified
synchronized (logUpdateCondition) {
lastLogIndex = logManager.getLastLogIndex();
if (lastLogIndex >= prevLogIndex) {
return true;
} catch (InterruptedException e) {
return false;
alreadyWait = System.currentTimeMillis() - waitStart;
return alreadyWait <= RaftServer.getWriteOperationTimeoutMS();
private long checkPrevLogIndex(long prevLogIndex) {
long lastLogIndex = logManager.getLastLogIndex();
long startTime = Timer.Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
// there are logs missing between the incoming log and the local last log, and such logs
// did not come within a timeout, report a mismatch to the sender and it shall fix this
// through catch-up
Timer.Statistic.RAFT_RECEIVER_INDEX_DIFF.add(prevLogIndex - lastLogIndex);
return Response.RESPONSE_AGREE;
* Find the local previous log of "log". If such log is found, discard all local logs behind it
* and append "log" to it. Otherwise report a log mismatch.
* @param logs append logs
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
* .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
private long appendEntries(
long prevLogIndex, long prevLogTerm, long leaderCommit, List<Log> logs) {
"{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
if (logs.isEmpty()) {
return Response.RESPONSE_AGREE;
long resp = checkPrevLogIndex(prevLogIndex);
if (resp != Response.RESPONSE_AGREE) {
return resp;
synchronized (logManager) {
long startTime = Timer.Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit, logs);
if (resp != -1) {
if (logger.isDebugEnabled()) {
logger.debug("{} append a new log list {}, commit to {}", name, logs, leaderCommit);
resp = Response.RESPONSE_AGREE;
} else {
// the incoming log points to an illegal position, reject it
return resp;
* Check the term of the AppendEntryRequest. The term checked is the term of the leader, not the
* term of the log. A new leader can still send logs of old leaders.
* @return -1 if the check is passed, >0 otherwise
private long checkRequestTerm(long leaderTerm, Node leader) {
long localTerm;
synchronized (term) {
// if the request comes before the heartbeat arrives, the local term may be smaller than the
// leader term
localTerm = term.get();
if (leaderTerm < localTerm) {
"{} rejected the AppendEntriesRequest for term: {}/{}", name, leaderTerm, localTerm);
return localTerm;
} else {
if (leaderTerm > localTerm) {
stepDown(leaderTerm, true);
} else {
lastHeartbeatReceivedTime = System.currentTimeMillis();
if (character != NodeCharacter.FOLLOWER) {
logger.debug("{} accepted the AppendEntryRequest for term: {}", name, localTerm);
return Response.RESPONSE_AGREE;
enum AppendLogResult {