blob: f03e2d883d2ff1181c3ba797e3d6b5c758ff0bd3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import org.apache.ratis.proto.RaftProtos.ServerRpcProto;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.ratis.server.impl.ServerProtoUtils.toInstallSnapshotReplyProto;
import static org.apache.ratis.server.impl.ServerProtoUtils.toServerRpcProto;
import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
class SnapshotInstallationHandler {
static final Logger LOG = LoggerFactory.getLogger(SnapshotInstallationHandler.class);
static final TermIndex INVALID_TERM_INDEX = TermIndex.valueOf(0, INVALID_LOG_INDEX);
private final RaftServerImpl server;
private final ServerState state;
private final boolean installSnapshotEnabled;
private final AtomicLong inProgressInstallSnapshotIndex = new AtomicLong(INVALID_LOG_INDEX);
private final AtomicReference<TermIndex> installedSnapshotTermIndex =
new AtomicReference<>(INVALID_TERM_INDEX);
private final AtomicBoolean isSnapshotNull = new AtomicBoolean();
private final AtomicLong installedIndex = new AtomicLong(INVALID_LOG_INDEX);
SnapshotInstallationHandler(RaftServerImpl server, RaftProperties properties) {
this.server = server;
this.state = server.getState();
this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
}
RaftGroupMemberId getMemberId() {
return state.getMemberId();
}
long getInstalledIndex() {
return installedIndex.getAndSet(INVALID_LOG_INDEX);
}
long getInProgressInstallSnapshotIndex() {
return inProgressInstallSnapshotIndex.get();
}
InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
if (LOG.isInfoEnabled()) {
LOG.info("{}: receive installSnapshot: {}", getMemberId(),
ServerStringUtils.toInstallSnapshotRequestString(request));
}
final InstallSnapshotReplyProto reply;
try {
reply = installSnapshotImpl(request);
} catch (Exception e) {
LOG.error("{}: installSnapshot failed", getMemberId(), e);
throw e;
}
if (LOG.isInfoEnabled()) {
LOG.info("{}: reply installSnapshot: {}", getMemberId(),
ServerStringUtils.toInstallSnapshotReplyString(reply));
}
return reply;
}
private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProto request) throws IOException {
final RaftRpcRequestProto r = request.getServerRequest();
final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
final RaftGroupId leaderGroupId = ProtoUtils.toRaftGroupId(r.getRaftGroupId());
CodeInjectionForTesting.execute(RaftServerImpl.INSTALL_SNAPSHOT, server.getId(), leaderId, request);
server.assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
ServerImplUtils.assertGroup(getMemberId(), leaderId, leaderGroupId);
InstallSnapshotReplyProto reply = null;
// Check if install snapshot from Leader is enabled
if (installSnapshotEnabled) {
// Leader has sent InstallSnapshot request with SnapshotInfo. Install the snapshot.
if (request.hasSnapshotChunk()) {
reply = checkAndInstallSnapshot(request, leaderId);
}
} else {
// Leader has only sent a notification to install snapshot. Inform State Machine to install snapshot.
if (request.hasNotification()) {
reply = notifyStateMachineToInstallSnapshot(request, leaderId);
}
}
if (reply != null) {
if (request.hasLastRaftConfigurationLogEntryProto()) {
// Set the configuration included in the snapshot
final LogEntryProto proto = request.getLastRaftConfigurationLogEntryProto();
if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
LOG.info("{}: set new configuration {} from snapshot", getMemberId(), proto);
state.setRaftConf(proto);
state.writeRaftConfiguration(proto);
server.getStateMachine().event().notifyConfigurationChanged(
proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry());
}
}
return reply;
}
// There is a mismatch between configurations on leader and follower.
final InstallSnapshotReplyProto failedReply = toInstallSnapshotReplyProto(
leaderId, getMemberId(), state.getCurrentTerm(), InstallSnapshotResult.CONF_MISMATCH);
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
getMemberId(), RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY,
leaderId, request.hasSnapshotChunk(), server.getId(), installSnapshotEnabled);
return failedReply;
}
private InstallSnapshotReplyProto checkAndInstallSnapshot(InstallSnapshotRequestProto request,
RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final InstallSnapshotRequestProto.SnapshotChunkProto snapshotChunkRequest = request.getSnapshotChunk();
final TermIndex lastIncluded = TermIndex.valueOf(snapshotChunkRequest.getTermIndex());
final long lastIncludedIndex = lastIncluded.getIndex();
synchronized (server) {
final boolean recognized = state.recognizeLeader(RaftServerProtocol.Op.INSTALL_SNAPSHOT, leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.NOT_LEADER);
}
server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_START);
try {
// Check and append the snapshot chunk. We simply put this in lock
// considering a follower peer requiring a snapshot installation does not
// have a lot of requests
Preconditions.assertTrue(state.getLog().getLastCommittedIndex() < lastIncludedIndex,
"%s log's commit index is %s, last included index in snapshot is %s",
getMemberId(), state.getLog().getLastCommittedIndex(), lastIncludedIndex);
//TODO: We should only update State with installed snapshot once the request is done.
state.installSnapshot(request);
// update the committed index
// re-load the state machine if this is the last chunk
if (snapshotChunkRequest.getDone()) {
state.reloadStateMachine(lastIncluded);
}
} finally {
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_COMPLETE);
}
}
if (snapshotChunkRequest.getDone()) {
LOG.info("{}: successfully install the entire snapshot-{}", getMemberId(), lastIncludedIndex);
}
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, snapshotChunkRequest.getRequestIndex(), InstallSnapshotResult.SUCCESS);
}
private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
InstallSnapshotRequestProto request, RaftPeerId leaderId) throws IOException {
final long currentTerm;
final long leaderTerm = request.getLeaderTerm();
final TermIndex firstAvailableLogTermIndex = TermIndex.valueOf(
request.getNotification().getFirstAvailableTermIndex());
final long firstAvailableLogIndex = firstAvailableLogTermIndex.getIndex();
synchronized (server) {
final boolean recognized = state.recognizeLeader("notifyInstallSnapshot", leaderId, leaderTerm);
currentTerm = state.getCurrentTerm();
if (!recognized) {
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.NOT_LEADER);
}
server.changeToFollowerAndPersistMetadata(leaderTerm, true, "installSnapshot");
state.setLeader(leaderId, "installSnapshot");
server.updateLastRpcTime(FollowerState.UpdateType.INSTALL_SNAPSHOT_NOTIFICATION);
if (inProgressInstallSnapshotIndex.compareAndSet(INVALID_LOG_INDEX, firstAvailableLogIndex)) {
LOG.info("{}: Received notification to install snapshot at index {}", getMemberId(), firstAvailableLogIndex);
// Check if snapshot index is already at par or ahead of the first
// available log index of the Leader.
final long snapshotIndex = state.getLog().getSnapshotIndex();
if (snapshotIndex != INVALID_LOG_INDEX && snapshotIndex + 1 >= firstAvailableLogIndex &&
firstAvailableLogIndex > INVALID_LOG_INDEX) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
}
final RaftPeerProto leaderProto;
if (!request.hasLastRaftConfigurationLogEntryProto()) {
leaderProto = null;
} else {
leaderProto = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry().getPeersList()
.stream()
.filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId))
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Leader " + leaderId
+ " not found from the last configuration LogEntryProto, request = " + request));
}
// For the cases where RaftConf is empty on newly started peer with empty peer list,
// we retrieve leader info from installSnapShotRequestProto.
final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null?
server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto));
// This is the first installSnapshot notify request for this term and
// index. Notify the state machine to install the snapshot.
LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);
// If notifyInstallSnapshotFromLeader future is done asynchronously, the main thread will go through the
// downside part. As the time consumed by user-defined statemachine is uncontrollable(e.g. the RocksDB
// checkpoint could be constantly increasing, the transmission will always exceed one boundary), we expect that
// once snapshot installed, follower could work ASAP. For the rest of time, server can respond snapshot
// installation in progress.
// There is another appendLog thread appending raft entries, which returns inconsistency entries with
// nextIndex and commitIndex to the leader when install snapshot in progress. The nextIndex on follower side
// is updated when state.reloadStateMachine. We shall keep this index upgraded synchronously with main thread,
// otherwise leader could get this follower's latest nextIndex from appendEntries instead of after
// acknowledging the SNAPSHOT_INSTALLED.
server.getStateMachine().followerEvent().notifyInstallSnapshotFromLeader(proto, firstAvailableLogTermIndex)
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
getMemberId(), exception.getMessage());
inProgressInstallSnapshotIndex.compareAndSet(firstAvailableLogIndex, INVALID_LOG_INDEX);
return;
}
if (reply != null) {
LOG.info("{}: StateMachine successfully installed snapshot index {}. Reloading the StateMachine.",
getMemberId(), reply.getIndex());
installedSnapshotTermIndex.set(reply);
} else {
isSnapshotNull.set(true);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine could not install snapshot as it is not available", this);
}
}
});
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine is processing Snapshot Installation Request.", getMemberId());
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("{}: StateMachine is already installing a snapshot.", getMemberId());
}
}
final long inProgressInstallSnapshotIndexValue = getInProgressInstallSnapshotIndex();
Preconditions.assertTrue(inProgressInstallSnapshotIndexValue <= firstAvailableLogIndex
&& inProgressInstallSnapshotIndexValue > INVALID_LOG_INDEX,
"inProgressInstallSnapshotRequest: %s is not eligible, firstAvailableLogIndex: %s",
getInProgressInstallSnapshotIndex(), firstAvailableLogIndex);
// If the snapshot is null or unavailable, return SNAPSHOT_UNAVAILABLE.
if (isSnapshotNull.compareAndSet(true, false)) {
LOG.info("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
}
// If a snapshot has been installed, return SNAPSHOT_INSTALLED with the installed snapshot index and reset
// installedSnapshotIndex to (0,-1).
final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex
.getAndSet(INVALID_TERM_INDEX);
if (latestInstalledSnapshotTermIndex.getIndex() > INVALID_LOG_INDEX) {
server.getStateMachine().pause();
state.reloadStateMachine(latestInstalledSnapshotTermIndex);
LOG.info("{}: InstallSnapshot notification result: {}, at index: {}", getMemberId(),
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex);
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex();
server.getStateMachine().event().notifySnapshotInstalled(
InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
installedIndex.set(latestInstalledIndex);
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex());
}
// Otherwise, Snapshot installation is in progress.
if (LOG.isDebugEnabled()) {
LOG.debug("{}: InstallSnapshot notification result: {}", getMemberId(),
InstallSnapshotResult.IN_PROGRESS);
}
return toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.IN_PROGRESS);
}
}
private RoleInfoProto getRoleInfoProto(RaftPeer leader) {
final RoleInfo role = server.getRole();
final Optional<FollowerState> fs = role.getFollowerState();
final ServerRpcProto leaderInfo = toServerRpcProto(leader,
fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
final FollowerInfoProto.Builder followerInfo = FollowerInfoProto.newBuilder()
.setLeaderInfo(leaderInfo)
.setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0));
return RoleInfoProto.newBuilder()
.setSelf(server.getPeer().getRaftPeerProto())
.setRole(role.getCurrentRole())
.setRoleElapsedTimeMs(role.getRoleElapsedTimeMs())
.setFollowerInfo(followerInfo)
.build();
}
}