blob: ef5e1a7ed0a76180be50036bea4a6be3e3c8f76d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.leader;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.AwaitForSignal;
import org.apache.ratis.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Comparator;
/**
* A {@link LogAppender} is for the leader to send appendEntries to a particular follower.
*/
public interface LogAppender {
Logger LOG = LoggerFactory.getLogger(LogAppender.class);
Class<? extends LogAppender> DEFAULT_CLASS = ReflectionUtils.getClass(
LogAppender.class.getName() + "Default", LogAppender.class);
/** Create the default {@link LogAppender}. */
static LogAppender newLogAppenderDefault(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
final Class<?>[] argClasses = {RaftServer.Division.class, LeaderState.class, FollowerInfo.class};
return ReflectionUtils.newInstance(DEFAULT_CLASS, argClasses, server, leaderState, f);
}
/** @return the server. */
RaftServer.Division getServer();
/** The same as getServer().getRaftServer().getServerRpc(). */
default RaftServerRpc getServerRpc() {
return getServer().getRaftServer().getServerRpc();
}
/** The same as getServer().getRaftLog(). */
default RaftLog getRaftLog() {
return getServer().getRaftLog();
}
/** Start this {@link LogAppender}. */
void start();
/** Is this {@link LogAppender} running? */
boolean isRunning();
/** Stop this {@link LogAppender}. */
void stop();
/** @return the leader state. */
LeaderState getLeaderState();
/** @return the follower information for this {@link LogAppender}. */
FollowerInfo getFollower();
/** The same as getFollower().getPeer().getId(). */
default RaftPeerId getFollowerId() {
return getFollower().getPeer().getId();
}
/** @return the call id for the next {@link AppendEntriesRequestProto}. */
long getCallId();
/** @return the a {@link Comparator} for comparing call ids. */
Comparator<Long> getCallIdComparator();
/**
* Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
* The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
* When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
*
* @param callId The call id of the returned request.
* @param heartbeat the returned request must be a heartbeat.
*
* @return a new {@link AppendEntriesRequestProto} object.
*/
AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException;
/** @return a new {@link InstallSnapshotRequestProto} object. */
InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex);
/** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */
Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);
/**
* Should this {@link LogAppender} send a snapshot to the follower?
*
* @return the snapshot if it should install a snapshot; otherwise, return null.
*/
default SnapshotInfo shouldInstallSnapshot() {
// we should install snapshot if the follower needs to catch up and:
// 1. there is no local log entry but there is snapshot
// 2. or the follower's next index is smaller than the log start index
// 3. or the follower is bootstrapping and has not installed any snapshot yet
final FollowerInfo follower = getFollower();
final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower);
final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) {
if (snapshot == null) {
// Leader cannot send null snapshot to follower. Hence, acknowledge InstallSnapshot attempt (even though it
// was not attempted) so that follower can come out of staging state after appending log entries.
follower.setAttemptedToInstallSnapshot();
} else {
return snapshot;
}
}
final long followerNextIndex = getFollower().getNextIndex();
if (followerNextIndex < getRaftLog().getNextIndex()) {
final long logStartIndex = getRaftLog().getStartIndex();
if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
return snapshot;
}
}
return null;
}
/** Define how this {@link LogAppender} should run. */
void run() throws InterruptedException, IOException;
/**
* Get the {@link AwaitForSignal} for events, which can be:
* (1) new log entries available,
* (2) log indices changed, or
* (3) a snapshot installation completed.
*/
AwaitForSignal getEventAwaitForSignal();
/** The same as getEventAwaitForSignal().signal(). */
default void notifyLogAppender() {
getEventAwaitForSignal().signal();
}
/** Should the leader send appendEntries RPC to the follower? */
default boolean shouldSendAppendEntries() {
return hasAppendEntries() || getHeartbeatWaitTimeMs() <= 0;
}
/** Does it have outstanding appendEntries? */
default boolean hasAppendEntries() {
return getFollower().getNextIndex() < getRaftLog().getNextIndex();
}
/** @return the wait time in milliseconds to send the next heartbeat. */
default long getHeartbeatWaitTimeMs() {
final int min = getServer().properties().minRpcTimeoutMs();
// time remaining to send a heartbeat
final long heartbeatRemainingTimeMs = min/2 - getFollower().getLastRpcResponseTime().elapsedTimeMs();
// avoid sending heartbeat too frequently
final long noHeartbeatTimeMs = min/4 - getFollower().getLastHeartbeatSendTime().elapsedTimeMs();
return Math.max(heartbeatRemainingTimeMs, noHeartbeatTimeMs);
}
/** Handle the event that the follower has replied a term. */
default boolean onFollowerTerm(long followerTerm) {
synchronized (getServer()) {
return isRunning() && getLeaderState().onFollowerTerm(getFollower(), followerTerm);
}
}
}