// blob: 0c91427e57290fe1d21dc0d459cc5f59ecef82d1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.leader;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Comparator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * The default implementation of {@link LogAppender}
 * using {@link org.apache.ratis.server.protocol.RaftServerProtocol}.
 * <p>
 * Every RPC is issued synchronously from {@link #run()}: a request is sent,
 * its reply is awaited and handled, and only then is the next request built.
 */
class LogAppenderDefault extends LogAppenderBase {
  LogAppenderDefault(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
    super(server, leaderState, f);
  }

  /** @return the value reported by {@link CallId#get()}. */
  @Override
  public long getCallId() {
    return CallId.get();
  }

  /** @return the comparator supplied by {@link CallId#getComparator()}. */
  @Override
  public Comparator<Long> getCallIdComparator() {
    return CallId.getComparator();
  }

  /**
   * Send an appendEntries RPC, retrying for as long as this appender is running
   * and the failure is an {@link IOException} other than the two rethrown below.
   *
   * @return the follower's reply, or {@code null} when there is no request to send
   *         or this appender is no longer running.
   * @throws InterruptedIOException rethrown immediately, without retrying.
   * @throws RaftLogIOException rethrown immediately, without retrying.
   * @throws InterruptedException propagated from the calls made here
   *         (presumably the sleep between retries).
   */
  private AppendEntriesReplyProto sendAppendEntriesWithRetries()
      throws InterruptedException, InterruptedIOException, RaftLogIOException {
    int retry = 0;
    AppendEntriesRequestProto request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
    while (isRunning()) { // keep retrying for IOException
      try {
        // A missing or entry-less request is rebuilt on every iteration;
        // a request that carries entries is reused as-is across retries.
        if (request == null || request.getEntriesCount() == 0) {
          request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
        }

        if (request == null) {
          LOG.trace("{} no entries to send now, wait ...", this);
          return null;
        } else if (!isRunning()) {
          LOG.info("{} is stopped. Skip appendEntries.", this);
          return null;
        }

        // The flag passed here is true exactly when the request carries no entries.
        getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
        final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
        getFollower().updateLastRpcResponseTime();

        getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
        return r;
      } catch (InterruptedIOException | RaftLogIOException e) {
        throw e;
      } catch (IOException ioe) {
        // TODO should have more detailed retry policy here.
        // Count each failure exactly once: the increment lives in the condition only,
        // so the warning is emitted on the 1st, 11th, 21st, ... failure and the
        // logged value is the number of failures seen so far.
        if (retry++ % 10 == 0) { // to reduce the number of messages
          LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, retry, ioe);
        }
        handleException(ioe);
      }
      if (isRunning()) {
        getServer().properties().rpcSleepTime().sleep();
      }
    }
    return null;
  }

  /**
   * Send the given snapshot to the follower, one request at a time,
   * under a freshly generated random request id.
   *
   * @param snapshot the snapshot to install on the follower.
   * @return the first reply whose server reply is unsuccessful; otherwise the reply to the
   *         last request; or {@code null} if no request was sent or sending failed with an
   *         exception other than {@link InterruptedIOException}.
   * @throws InterruptedIOException rethrown immediately.
   */
  private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
    String requestId = UUID.randomUUID().toString();
    InstallSnapshotReplyProto reply = null;
    try {
      for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
        getFollower().updateLastRpcSendTime(false);
        reply = getServerRpc().installSnapshot(request);
        getFollower().updateLastRpcResponseTime();

        if (!reply.getServerReply().getSuccess()) {
          // Stop at the first unsuccessful reply and let the caller inspect it.
          return reply;
        }
      }
    } catch (InterruptedIOException iioe) {
      throw iioe;
    } catch (Exception e) {
      LOG.warn("{}: Failed to installSnapshot {}: {}", this, snapshot, e);
      handleException(e);
      return null;
    }

    if (reply != null) {
      // Reached only when every request was sent and none was reported unsuccessful.
      getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
      LOG.info("{}: installSnapshot {} successfully", this, snapshot);
      getServer().getRaftServerMetrics().onSnapshotInstalled();
    }
    return reply;
  }

  /**
   * The appender loop. While running, each iteration either installs a snapshot
   * (when {@code shouldInstallSnapshot()} returns one) or sends appendEntries,
   * then waits for a signal or the heartbeat timeout if there is nothing more to append,
   * and finally asks the leader state to check the follower's health.
   */
  @Override
  public void run() throws InterruptedException, IOException {
    while (isRunning()) {
      if (shouldSendAppendEntries()) {
        final SnapshotInfo snapshot = shouldInstallSnapshot();
        if (snapshot != null) {
          LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
              this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), snapshot);

          final InstallSnapshotReplyProto r = installSnapshot(snapshot);
          if (r != null) {
            switch (r.getResult()) {
              case NOT_LEADER:
                onFollowerTerm(r.getTerm());
                break;
              case SUCCESS:
              case SNAPSHOT_UNAVAILABLE:
              case ALREADY_INSTALLED:
                getFollower().setAttemptedToInstallSnapshot();
                break;
              default:
                break;
            }
          }
          // otherwise if r is null, retry the snapshot installation
        } else {
          final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
          if (r != null) {
            handleReply(r);
          }
        }
      }
      if (isRunning() && !hasAppendEntries()) {
        getEventAwaitForSignal().await(getHeartbeatWaitTimeMs(), TimeUnit.MILLISECONDS);
      }
      getLeaderState().checkHealth(getFollower());
    }
  }

  /**
   * Update the follower state according to an appendEntries reply.
   * A {@code null} reply is ignored.
   *
   * @param reply the reply received from the follower.
   * @throws IllegalStateException if a SUCCESS reply carries a nextIndex
   *         smaller than the follower's current nextIndex.
   * @throws IllegalArgumentException if the result is not one of the handled values.
   */
  private void handleReply(AppendEntriesReplyProto reply) throws IllegalArgumentException {
    if (reply == null) {
      return;
    }

    switch (reply.getResult()) {
      case SUCCESS: {
        final long oldNextIndex = getFollower().getNextIndex();
        final long nextIndex = reply.getNextIndex();
        if (nextIndex < oldNextIndex) {
          throw new IllegalStateException("nextIndex=" + nextIndex
              + " < oldNextIndex=" + oldNextIndex
              + ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply));
        }

        // Equal indices mean nothing advanced, so there is nothing to update.
        if (nextIndex > oldNextIndex) {
          getFollower().updateMatchIndex(nextIndex - 1);
          getFollower().increaseNextIndex(nextIndex);
          getLeaderState().onFollowerSuccessAppendEntries(getFollower());
        }
        break;
      }
      case NOT_LEADER:
        // check if should step down
        onFollowerTerm(reply.getTerm());
        break;
      case INCONSISTENCY:
        getFollower().decreaseNextIndex(reply.getNextIndex());
        break;
      case UNRECOGNIZED:
        LOG.warn("{}: received {}", this, reply.getResult());
        break;
      default:
        throw new IllegalArgumentException("Unable to process result " + reply.getResult());
    }
  }

  /** Trace-log the exception and pass it to the server RPC together with the follower id. */
  private void handleException(Exception e) {
    LOG.trace("TRACE", e);
    getServerRpc().handleException(getFollowerId(), e, false);
  }
}