| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ratis.grpc.server; |
| |
| import org.apache.ratis.conf.RaftProperties; |
| import org.apache.ratis.grpc.GrpcConfigKeys; |
| import org.apache.ratis.grpc.GrpcUtil; |
| import org.apache.ratis.grpc.metrics.GrpcServerMetrics; |
| import org.apache.ratis.metrics.Timekeeper; |
| import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult; |
| import org.apache.ratis.protocol.RaftPeerId; |
| import org.apache.ratis.retry.MultipleLinearRandomRetry; |
| import org.apache.ratis.retry.RetryPolicy; |
| import org.apache.ratis.server.RaftServer; |
| import org.apache.ratis.server.RaftServerConfigKeys; |
| import org.apache.ratis.server.leader.FollowerInfo; |
| import org.apache.ratis.server.leader.LeaderState; |
| import org.apache.ratis.server.leader.LogAppenderBase; |
| import org.apache.ratis.server.protocol.TermIndex; |
| import org.apache.ratis.server.raftlog.RaftLog; |
| import org.apache.ratis.server.util.ServerStringUtils; |
| import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; |
| import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; |
| import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; |
| import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto; |
| import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; |
| import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto; |
| import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto; |
| import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto; |
| import org.apache.ratis.statemachine.SnapshotInfo; |
| import org.apache.ratis.util.*; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.util.*; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| /** |
| * A new log appender implementation using grpc bi-directional stream API. |
| */ |
| public class GrpcLogAppender extends LogAppenderBase { |
| public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class); |
| |
| private enum BatchLogKey implements BatchLogger.Key { |
| RESET_CLIENT, |
| APPEND_LOG_RESPONSE_HANDLER_ON_ERROR |
| } |
| |
| private static final Comparator<Long> CALL_ID_COMPARATOR = (left, right) -> { |
| // calculate diff in order to take care the possibility of numerical overflow |
| final long diff = left - right; |
| return diff == 0? 0: diff > 0? 1: -1; |
| }; |
| |
| enum Event { |
| APPEND_ENTRIES_REPLY, |
| APPEND_ENTRIES_INCONSISTENCY_REPLY, |
| SNAPSHOT_REPLY, |
| COMPLETE, |
| TIMEOUT, |
| ERROR; |
| |
| boolean updateFirstReplyReceived(boolean firstReplyReceived) { |
| switch (this) { |
| case APPEND_ENTRIES_REPLY: |
| case APPEND_ENTRIES_INCONSISTENCY_REPLY: |
| case SNAPSHOT_REPLY: |
| case COMPLETE: |
| return true; |
| case ERROR: |
| return false; |
| case TIMEOUT: |
| return firstReplyReceived; |
| default: |
| throw new IllegalStateException("Unexpected event: " + this); |
| } |
| } |
| |
| boolean isError() { |
| switch (this) { |
| case APPEND_ENTRIES_INCONSISTENCY_REPLY: |
| case TIMEOUT: |
| case ERROR: |
| return true; |
| case APPEND_ENTRIES_REPLY: |
| case SNAPSHOT_REPLY: |
| case COMPLETE: |
| return false; |
| default: |
| throw new IllegalStateException("Unexpected event: " + this); |
| } |
| } |
| } |
| |
| static class ReplyState { |
| private boolean firstReplyReceived = false; |
| private int errorCount = 0; |
| |
| synchronized boolean isFirstReplyReceived() { |
| return firstReplyReceived; |
| } |
| |
| synchronized int getErrorCount() { |
| return errorCount; |
| } |
| |
| int process(AppendResult result) { |
| return process(result == AppendResult.INCONSISTENCY? Event.APPEND_ENTRIES_INCONSISTENCY_REPLY |
| : Event.APPEND_ENTRIES_REPLY); |
| } |
| |
| synchronized int process(Event event) { |
| firstReplyReceived = event.updateFirstReplyReceived(firstReplyReceived); |
| if (event.isError()) { |
| errorCount++; |
| } else { |
| errorCount = 0; |
| } |
| return errorCount; |
| } |
| } |
| |
| private final AtomicLong callId = new AtomicLong(); |
| |
| private final RequestMap pendingRequests = new RequestMap(); |
| private final int maxPendingRequestsNum; |
| private final boolean installSnapshotEnabled; |
| |
| private final TimeDuration requestTimeoutDuration; |
| private final TimeDuration installSnapshotStreamTimeout; |
| private final TimeDuration logMessageBatchDuration; |
| private final int maxOutstandingInstallSnapshots; |
| private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); |
| |
| private volatile StreamObservers appendLogRequestObserver; |
| private final boolean useSeparateHBChannel; |
| |
| private final GrpcServerMetrics grpcServerMetrics; |
| |
| private final AutoCloseableReadWriteLock lock; |
| private final StackTraceElement caller; |
| private final RetryPolicy errorRetryWaitPolicy; |
| private final ReplyState replyState = new ReplyState(); |
| |
| public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) { |
| super(server, leaderState, f); |
| |
| Preconditions.assertNotNull(getServerRpc(), "getServerRpc()"); |
| |
| final RaftProperties properties = server.getRaftServer().getProperties(); |
| this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties); |
| this.requestTimeoutDuration = RaftServerConfigKeys.Rpc.requestTimeout(properties); |
| this.maxOutstandingInstallSnapshots = GrpcConfigKeys.Server.installSnapshotRequestElementLimit(properties); |
| this.installSnapshotStreamTimeout = GrpcConfigKeys.Server.installSnapshotRequestTimeout(properties) |
| .multiply(maxOutstandingInstallSnapshots); |
| this.logMessageBatchDuration = GrpcConfigKeys.Server.logMessageBatchDuration(properties); |
| this.installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties); |
| this.useSeparateHBChannel = GrpcConfigKeys.Server.heartbeatChannel(properties); |
| |
| grpcServerMetrics = new GrpcServerMetrics(server.getMemberId().toString()); |
| grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize); |
| |
| lock = new AutoCloseableReadWriteLock(this); |
| caller = LOG.isTraceEnabled()? JavaUtils.getCallerStackTraceElement(): null; |
| errorRetryWaitPolicy = MultipleLinearRandomRetry.parseCommaSeparated( |
| RaftServerConfigKeys.Log.Appender.retryPolicy(properties)); |
| } |
| |
| @Override |
| public GrpcService getServerRpc() { |
| return (GrpcService)super.getServerRpc(); |
| } |
| |
| private GrpcServerProtocolClient getClient() throws IOException { |
| return getServerRpc().getProxies().getProxy(getFollowerId()); |
| } |
| |
| private void resetClient(AppendEntriesRequest request, Event event) { |
| try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { |
| getClient().resetConnectBackoff(); |
| if (appendLogRequestObserver != null) { |
| appendLogRequestObserver.stop(); |
| appendLogRequestObserver = null; |
| } |
| final int errorCount = replyState.process(event); |
| // clear the pending requests queue and reset the next index of follower |
| pendingRequests.clear(); |
| final FollowerInfo f = getFollower(); |
| final long nextIndex = 1 + Optional.ofNullable(request) |
| .map(AppendEntriesRequest::getPreviousLog) |
| .map(TermIndex::getIndex) |
| .orElseGet(f::getMatchIndex); |
| if (event.isError() && request == null) { |
| final long followerNextIndex = f.getNextIndex(); |
| BatchLogger.warn(BatchLogKey.RESET_CLIENT, f.getId() + "-" + followerNextIndex, suffix -> |
| LOG.warn("{}: Follower failed (request=null, errorCount={}); keep nextIndex ({}) unchanged and retry.{}", |
| this, errorCount, followerNextIndex, suffix), logMessageBatchDuration); |
| return; |
| } |
| if (request != null && request.isHeartbeat()) { |
| return; |
| } |
| getFollower().computeNextIndex(getNextIndexForError(nextIndex)); |
| } catch (IOException ie) { |
| LOG.warn(this + ": Failed to getClient for " + getFollowerId(), ie); |
| } |
| } |
| |
| private boolean isFollowerCommitBehindLastCommitIndex() { |
| return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex(); |
| } |
| |
| private boolean installSnapshot() { |
| if (installSnapshotEnabled) { |
| final SnapshotInfo snapshot = shouldInstallSnapshot(); |
| if (snapshot != null) { |
| installSnapshot(snapshot); |
| return true; |
| } |
| } else { |
| // check installSnapshotNotification |
| final TermIndex firstAvailable = shouldNotifyToInstallSnapshot(); |
| if (firstAvailable != null) { |
| notifyInstallSnapshot(firstAvailable); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public void run() throws IOException { |
| for(; isRunning(); mayWait()) { |
| //HB period is expired OR we have messages OR follower is behind with commit index |
| if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) { |
| final boolean installingSnapshot = installSnapshot(); |
| appendLog(installingSnapshot || haveTooManyPendingRequests()); |
| } |
| getLeaderState().checkHealth(getFollower()); |
| } |
| |
| Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObservers::onCompleted); |
| } |
| |
| public long getWaitTimeMs() { |
| if (haveTooManyPendingRequests()) { |
| return getHeartbeatWaitTimeMs(); // Should wait for a short time |
| } else if (shouldSendAppendEntries() && !isSlowFollower()) { |
| // For normal nodes, new entries should be sent ASAP |
| // however for slow followers (especially when the follower is down), |
| // keep sending without any wait time only ends up in high CPU load |
| return TimeDuration.max(getRemainingWaitTime(), TimeDuration.ZERO).toLong(TimeUnit.MILLISECONDS); |
| } |
| return getHeartbeatWaitTimeMs(); |
| } |
| |
| private boolean isSlowFollower() { |
| final TimeDuration elapsedTime = getFollower().getLastRpcResponseTime().elapsedTime(); |
| return elapsedTime.compareTo(getServer().properties().rpcSlownessTimeout()) > 0; |
| } |
| |
| private void mayWait() { |
| // use lastSend time instead of lastResponse time |
| try { |
| getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(), |
| TimeUnit.MILLISECONDS); |
| } catch (InterruptedException ie) { |
| LOG.warn(this + ": Wait interrupted by " + ie); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| private long errorWaitTimeMs() { |
| return errorRetryWaitPolicy.handleAttemptFailure(replyState::getErrorCount) |
| .getSleepTime().toLong(TimeUnit.MILLISECONDS); |
| } |
| |
| @Override |
| public CompletableFuture<LifeCycle.State> stopAsync() { |
| grpcServerMetrics.unregister(); |
| return super.stopAsync(); |
| } |
| |
| @Override |
| public boolean shouldSendAppendEntries() { |
| return appendLogRequestObserver == null || super.shouldSendAppendEntries(); |
| } |
| |
| @Override |
| public boolean hasPendingDataRequests() { |
| return pendingRequests.logRequestsSize() > 0; |
| } |
| |
| /** @return true iff either (1) queue is full, or (2) queue is non-empty and not received first response. */ |
| private boolean haveTooManyPendingRequests() { |
| final int size = pendingRequests.logRequestsSize(); |
| if (size == 0) { |
| return false; |
| } else if (size >= maxPendingRequestsNum) { |
| return true; |
| } else { |
| // queue is non-empty and non-full |
| return !replyState.isFirstReplyReceived(); |
| } |
| } |
| |
| static class StreamObservers { |
| private final CallStreamObserver<AppendEntriesRequestProto> appendLog; |
| private final CallStreamObserver<AppendEntriesRequestProto> heartbeat; |
| private final TimeDuration waitForReady; |
| private volatile boolean running = true; |
| |
| StreamObservers(GrpcServerProtocolClient client, AppendLogResponseHandler handler, boolean separateHeartbeat, |
| TimeDuration waitTimeMin) { |
| this.appendLog = client.appendEntries(handler, false); |
| this.heartbeat = separateHeartbeat? client.appendEntries(handler, true): null; |
| this.waitForReady = waitTimeMin.isPositive()? waitTimeMin: TimeDuration.ONE_MILLISECOND; |
| } |
| |
| void onNext(AppendEntriesRequestProto proto) |
| throws InterruptedIOException { |
| CallStreamObserver<AppendEntriesRequestProto> stream; |
| boolean isHeartBeat = heartbeat != null && proto.getEntriesCount() == 0; |
| if (isHeartBeat) { |
| stream = heartbeat; |
| } else { |
| stream = appendLog; |
| } |
| // stall for stream to be ready. |
| while (!stream.isReady() && running) { |
| sleep(waitForReady, isHeartBeat); |
| } |
| stream.onNext(proto); |
| } |
| |
| void stop() { |
| running = false; |
| } |
| |
| void onCompleted() { |
| appendLog.onCompleted(); |
| Optional.ofNullable(heartbeat).ifPresent(StreamObserver::onCompleted); |
| } |
| } |
| |
| @Override |
| public long getCallId() { |
| return callId.get(); |
| } |
| |
| @Override |
| public Comparator<Long> getCallIdComparator() { |
| return CALL_ID_COMPARATOR; |
| } |
| |
| private void appendLog(boolean heartbeat) throws IOException { |
| final AppendEntriesRequestProto pending; |
| final AppendEntriesRequest request; |
| try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { |
| // Prepare and send the append request. |
| // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock |
| pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat); |
| if (pending == null) { |
| return; |
| } |
| request = new AppendEntriesRequest(pending, getFollowerId(), grpcServerMetrics); |
| pendingRequests.put(request); |
| increaseNextIndex(pending); |
| if (appendLogRequestObserver == null) { |
| appendLogRequestObserver = new StreamObservers( |
| getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin()); |
| } |
| } |
| |
| final TimeDuration remaining = getRemainingWaitTime(); |
| if (remaining.isPositive()) { |
| sleep(remaining, heartbeat); |
| } |
| if (isRunning()) { |
| sendRequest(request, pending); |
| } |
| } |
| |
| private static void sleep(TimeDuration waitTime, boolean heartbeat) |
| throws InterruptedIOException { |
| try { |
| waitTime.sleep(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw IOUtils.toInterruptedIOException( |
| "Interrupted appendLog, heartbeat? " + heartbeat, e); |
| } |
| } |
| |
| private void sendRequest(AppendEntriesRequest request, |
| AppendEntriesRequestProto proto) throws InterruptedIOException { |
| CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST, |
| getServer().getId(), null, proto); |
| resetHeartbeatTrigger(); |
| |
| StreamObservers observers = appendLogRequestObserver; |
| if (observers != null) { |
| request.startRequestTimer(); |
| observers.onNext(proto); |
| getFollower().updateLastRpcSendTime(request.isHeartbeat()); |
| scheduler.onTimeout(requestTimeoutDuration, |
| () -> timeoutAppendRequest(request.getCallId(), request.isHeartbeat()), |
| LOG, () -> "Timeout check failed for append entry request: " + request); |
| } |
| } |
| |
| private void timeoutAppendRequest(long cid, boolean heartbeat) { |
| final AppendEntriesRequest pending = pendingRequests.handleTimeout(cid, heartbeat); |
| if (pending != null) { |
| final int errorCount = replyState.process(Event.TIMEOUT); |
| LOG.warn("{}: Timed out {}appendEntries, errorCount={}, request={}", |
| this, heartbeat ? "HEARTBEAT " : "", errorCount, pending); |
| grpcServerMetrics.onRequestTimeout(getFollowerId().toString(), heartbeat); |
| pending.stopRequestTimer(); |
| } |
| } |
| |
| private void increaseNextIndex(AppendEntriesRequestProto request) { |
| final int count = request.getEntriesCount(); |
| if (count > 0) { |
| getFollower().increaseNextIndex(request.getEntries(count - 1).getIndex() + 1); |
| } |
| } |
| |
| private void increaseNextIndex(final long installedSnapshotIndex, Object reason) { |
| final long newNextIndex = installedSnapshotIndex + 1; |
| LOG.info("{}: updateNextIndex {} for {}", this, newNextIndex, reason); |
| getFollower().updateNextIndex(newNextIndex); |
| } |
| |
| /** |
| * StreamObserver for handling responses from the follower |
| */ |
| private class AppendLogResponseHandler implements StreamObserver<AppendEntriesReplyProto> { |
| private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass()); |
| |
| /** |
| * After receiving a appendEntries reply, do the following: |
| * 1. If the reply is success, update the follower's match index and submit |
| * an event to leaderState |
| * 2. If the reply is NOT_LEADER, step down |
| * 3. If the reply is INCONSISTENCY, increase/ decrease the follower's next |
| * index based on the response |
| */ |
| @Override |
| public void onNext(AppendEntriesReplyProto reply) { |
| AppendEntriesRequest request = pendingRequests.remove(reply); |
| if (request != null) { |
| request.stopRequestTimer(); // Update completion time |
| getFollower().updateLastRespondedAppendEntriesSendTime(request.getSendTime()); |
| } |
| getFollower().updateLastRpcResponseTime(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("{}: received {} reply {}, request={}", |
| this, replyState.isFirstReplyReceived()? "a": "the first", |
| ServerStringUtils.toAppendEntriesReplyString(reply), request); |
| } |
| |
| try { |
| onNextImpl(request, reply); |
| } catch(Exception t) { |
| LOG.error("Failed onNext request=" + request |
| + ", reply=" + ServerStringUtils.toAppendEntriesReplyString(reply), t); |
| } |
| } |
| |
| private void onNextImpl(AppendEntriesRequest request, AppendEntriesReplyProto reply) { |
| final int errorCount = replyState.process(reply.getResult()); |
| |
| switch (reply.getResult()) { |
| case SUCCESS: |
| grpcServerMetrics.onRequestSuccess(getFollowerId().toString(), reply.getIsHearbeat()); |
| getLeaderState().onFollowerCommitIndex(getFollower(), reply.getFollowerCommit()); |
| if (getFollower().updateMatchIndex(reply.getMatchIndex())) { |
| getFollower().updateNextIndex(reply.getMatchIndex() + 1); |
| getLeaderState().onFollowerSuccessAppendEntries(getFollower()); |
| } |
| break; |
| case NOT_LEADER: |
| grpcServerMetrics.onRequestNotLeader(getFollowerId().toString()); |
| LOG.warn("{}: received {} reply with term {}", this, reply.getResult(), reply.getTerm()); |
| if (onFollowerTerm(reply.getTerm())) { |
| return; |
| } |
| break; |
| case INCONSISTENCY: |
| grpcServerMetrics.onRequestInconsistency(getFollowerId().toString()); |
| LOG.warn("{}: received {} reply with nextIndex {}, errorCount={}, request={}", |
| this, reply.getResult(), reply.getNextIndex(), errorCount, request); |
| final long requestFirstIndex = request != null? request.getFirstIndex(): RaftLog.INVALID_LOG_INDEX; |
| updateNextIndex(getNextIndexForInconsistency(requestFirstIndex, reply.getNextIndex())); |
| break; |
| default: |
| throw new IllegalStateException("Unexpected reply result: " + reply.getResult()); |
| } |
| getLeaderState().onAppendEntriesReply(GrpcLogAppender.this, reply); |
| notifyLogAppender(); |
| } |
| |
| /** |
| * for now we simply retry the first pending request |
| */ |
| @Override |
| public void onError(Throwable t) { |
| if (!isRunning()) { |
| LOG.info("{} is already stopped", GrpcLogAppender.this); |
| return; |
| } |
| BatchLogger.warn(BatchLogKey.APPEND_LOG_RESPONSE_HANDLER_ON_ERROR, AppendLogResponseHandler.this.name, |
| suffix -> GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries" + suffix, t), |
| logMessageBatchDuration, t instanceof StatusRuntimeException); |
| grpcServerMetrics.onRequestRetry(); // Update try counter |
| AppendEntriesRequest request = pendingRequests.remove(GrpcUtil.getCallId(t), GrpcUtil.isHeartbeat(t)); |
| resetClient(request, Event.ERROR); |
| } |
| |
| @Override |
| public void onCompleted() { |
| LOG.info("{}: follower responses appendEntries COMPLETED", this); |
| resetClient(null, Event.COMPLETE); |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |
| |
| private void updateNextIndex(long replyNextIndex) { |
| try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { |
| pendingRequests.clear(); |
| getFollower().setNextIndex(replyNextIndex); |
| } |
| } |
| |
| private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> { |
| private final String name = getFollower().getName() + "-" + JavaUtils.getClassSimpleName(getClass()); |
| private final Queue<Integer> pending; |
| private final AtomicBoolean done = new AtomicBoolean(false); |
| private final boolean isNotificationOnly; |
| |
| InstallSnapshotResponseHandler() { |
| this(false); |
| } |
| |
| InstallSnapshotResponseHandler(boolean notifyOnly) { |
| pending = new LinkedList<>(); |
| this.isNotificationOnly = notifyOnly; |
| } |
| |
| void addPending(InstallSnapshotRequestProto request) { |
| try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { |
| pending.offer(request.getSnapshotChunk().getRequestIndex()); |
| } |
| } |
| |
| void removePending(InstallSnapshotReplyProto reply) { |
| try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { |
| final Integer index = pending.poll(); |
| Objects.requireNonNull(index, "index == null"); |
| Preconditions.assertTrue(index == reply.getRequestIndex()); |
| } |
| } |
| |
| //compare follower's latest installed snapshot index with leader's start index |
| void onFollowerCatchup(long followerSnapshotIndex) { |
| final long leaderStartIndex = getRaftLog().getStartIndex(); |
| final long followerNextIndex = followerSnapshotIndex + 1; |
| if (followerNextIndex >= leaderStartIndex) { |
| LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}", |
| this, followerNextIndex); |
| notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex); |
| } |
| } |
| |
| void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex) { |
| getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, getFollower().getPeer()); |
| } |
| |
| boolean isDone() { |
| return done.get(); |
| } |
| |
| void close() { |
| done.set(true); |
| notifyLogAppender(); |
| } |
| |
| boolean hasAllResponse() { |
| try (AutoCloseableLock readLock = lock.readLock(caller, LOG::trace)) { |
| return pending.isEmpty(); |
| } |
| } |
| |
| @Override |
| public void onNext(InstallSnapshotReplyProto reply) { |
| if (LOG.isInfoEnabled()) { |
| LOG.info("{}: received {} reply {}", this, replyState.isFirstReplyReceived()? "a" : "the first", |
| ServerStringUtils.toInstallSnapshotReplyString(reply)); |
| } |
| |
| // update the last rpc time |
| getFollower().updateLastRpcResponseTime(); |
| replyState.process(Event.SNAPSHOT_REPLY); |
| |
| final long followerSnapshotIndex; |
| switch (reply.getResult()) { |
| case SUCCESS: |
| LOG.info("{}: Completed InstallSnapshot. Reply: {}", this, reply); |
| getFollower().setAttemptedToInstallSnapshot(); |
| removePending(reply); |
| break; |
| case IN_PROGRESS: |
| LOG.info("{}: InstallSnapshot in progress.", this); |
| removePending(reply); |
| break; |
| case ALREADY_INSTALLED: |
| followerSnapshotIndex = reply.getSnapshotIndex(); |
| LOG.info("{}: Follower snapshot is already at index {}.", this, followerSnapshotIndex); |
| getFollower().setSnapshotIndex(followerSnapshotIndex); |
| getFollower().setAttemptedToInstallSnapshot(); |
| getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex); |
| increaseNextIndex(followerSnapshotIndex, reply.getResult()); |
| removePending(reply); |
| break; |
| case NOT_LEADER: |
| onFollowerTerm(reply.getTerm()); |
| break; |
| case CONF_MISMATCH: |
| LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}", |
| this, RaftServerConfigKeys.Log.Appender.INSTALL_SNAPSHOT_ENABLED_KEY, |
| getServer().getId(), installSnapshotEnabled, getFollowerId(), !installSnapshotEnabled); |
| break; |
| case SNAPSHOT_INSTALLED: |
| followerSnapshotIndex = reply.getSnapshotIndex(); |
| LOG.info("{}: Follower installed snapshot at index {}", this, followerSnapshotIndex); |
| getFollower().setSnapshotIndex(followerSnapshotIndex); |
| getFollower().setAttemptedToInstallSnapshot(); |
| getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex); |
| increaseNextIndex(followerSnapshotIndex, reply.getResult()); |
| onFollowerCatchup(followerSnapshotIndex); |
| removePending(reply); |
| break; |
| case SNAPSHOT_UNAVAILABLE: |
| LOG.info("{}: Follower could not install snapshot as it is not available.", this); |
| getFollower().setAttemptedToInstallSnapshot(); |
| notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX); |
| removePending(reply); |
| break; |
| case UNRECOGNIZED: |
| LOG.error("Unrecognized the reply result {}: Leader is {}, follower is {}", |
| reply.getResult(), getServer().getId(), getFollowerId()); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| @Override |
| public void onError(Throwable t) { |
| if (!isRunning()) { |
| LOG.info("{} is stopped", GrpcLogAppender.this); |
| return; |
| } |
| GrpcUtil.warn(LOG, () -> this + ": Failed InstallSnapshot", t); |
| grpcServerMetrics.onRequestRetry(); // Update try counter |
| resetClient(null, Event.ERROR); |
| close(); |
| } |
| |
| @Override |
| public void onCompleted() { |
| if (!isNotificationOnly || LOG.isDebugEnabled()) { |
| LOG.info("{}: follower responded installSnapshot COMPLETED", this); |
| } |
| replyState.process(Event.COMPLETE); |
| close(); |
| } |
| |
| @Override |
| public String toString() { |
| return name; |
| } |
| } |
| |
| /** |
| * Send installSnapshot request to Follower with a snapshot. |
| * @param snapshot the snapshot to be sent to Follower |
| */ |
| private void installSnapshot(SnapshotInfo snapshot) { |
| LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower", |
| this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), snapshot); |
| |
| final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(); |
| StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null; |
| final String requestId = UUID.randomUUID().toString(); |
| try { |
| snapshotRequestObserver = getClient().installSnapshot( |
| getFollower().getName() + "-installSnapshot-" + requestId, |
| installSnapshotStreamTimeout, maxOutstandingInstallSnapshots, responseHandler); |
| for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) { |
| if (isRunning()) { |
| snapshotRequestObserver.onNext(request); |
| getFollower().updateLastRpcSendTime(false); |
| responseHandler.addPending(request); |
| } else { |
| break; |
| } |
| } |
| snapshotRequestObserver.onCompleted(); |
| grpcServerMetrics.onInstallSnapshot(); |
| } catch (Exception e) { |
| LOG.warn(this + ": failed to installSnapshot " + snapshot, e); |
| if (snapshotRequestObserver != null) { |
| snapshotRequestObserver.onError(e); |
| } |
| return; |
| } |
| |
| while (isRunning() && !responseHandler.isDone()) { |
| try { |
| getEventAwaitForSignal().await(); |
| } catch (InterruptedException ignored) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| if (responseHandler.hasAllResponse()) { |
| getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex()); |
| LOG.info("{}: installed snapshot {} successfully", this, snapshot); |
| } |
| } |
| |
| /** |
| * Send an installSnapshot notification request to the Follower. |
| * @param firstAvailable the first available log's index on the Leader |
| */ |
| private void notifyInstallSnapshot(TermIndex firstAvailable) { |
| LOG.info("{}: notifyInstallSnapshot with firstAvailable={}, followerNextIndex={}", |
| this, firstAvailable, getFollower().getNextIndex()); |
| |
| final InstallSnapshotResponseHandler responseHandler = new InstallSnapshotResponseHandler(true); |
| StreamObserver<InstallSnapshotRequestProto> snapshotRequestObserver = null; |
| // prepare and enqueue the notify install snapshot request. |
| final InstallSnapshotRequestProto request = newInstallSnapshotNotificationRequest(firstAvailable); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("{}: send {}", this, ServerStringUtils.toInstallSnapshotRequestString(request)); |
| } |
| try { |
| snapshotRequestObserver = getClient().installSnapshot(getFollower().getName() + "-notifyInstallSnapshot", |
| requestTimeoutDuration, 0, responseHandler); |
| |
| snapshotRequestObserver.onNext(request); |
| getFollower().updateLastRpcSendTime(false); |
| responseHandler.addPending(request); |
| snapshotRequestObserver.onCompleted(); |
| } catch (Exception e) { |
| GrpcUtil.warn(LOG, () -> this + ": Failed to notify follower to install snapshot.", e); |
| if (snapshotRequestObserver != null) { |
| snapshotRequestObserver.onError(e); |
| } |
| return; |
| } |
| |
| while (isRunning() && !responseHandler.isDone()) { |
| try { |
| getEventAwaitForSignal().await(); |
| } catch (InterruptedException ignored) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * Should the Leader notify the Follower to install the snapshot through |
| * its own State Machine. |
| * @return the first available log's start term index |
| */ |
| private TermIndex shouldNotifyToInstallSnapshot() { |
| final FollowerInfo follower = getFollower(); |
| final long leaderNextIndex = getRaftLog().getNextIndex(); |
| final boolean isFollowerBootstrapping = getLeaderState().isFollowerBootstrapping(follower); |
| final long leaderStartIndex = getRaftLog().getStartIndex(); |
| final TermIndex firstAvailable = Optional.ofNullable(getRaftLog().getTermIndex(leaderStartIndex)) |
| .orElseGet(() -> TermIndex.valueOf(getServer().getInfo().getCurrentTerm(), leaderNextIndex)); |
| if (isFollowerBootstrapping && !follower.hasAttemptedToInstallSnapshot()) { |
| // If the follower is bootstrapping and has not yet installed any snapshot from leader, then the follower should |
| // be notified to install a snapshot. Every follower should try to install at least one snapshot during |
| // bootstrapping, if available. |
| LOG.debug("{}: follower is bootstrapping, notify to install snapshot to {}.", this, firstAvailable); |
| return firstAvailable; |
| } |
| |
| final long followerNextIndex = follower.getNextIndex(); |
| if (followerNextIndex >= leaderNextIndex) { |
| return null; |
| } |
| |
| if (followerNextIndex < leaderStartIndex) { |
| // The Leader does not have the logs from the Follower's last log |
| // index onwards. And install snapshot is disabled. So the Follower |
| // should be notified to install the latest snapshot through its |
| // State Machine. |
| return firstAvailable; |
| } else if (leaderStartIndex == RaftLog.INVALID_LOG_INDEX) { |
| // Leader has no logs to check from, hence return next index. |
| return firstAvailable; |
| } |
| |
| return null; |
| } |
| |
| static class AppendEntriesRequest { |
| private final Timekeeper timer; |
| private volatile Timekeeper.Context timerContext; |
| |
| private final long callId; |
| private final TermIndex previousLog; |
| private final int entriesCount; |
| |
| private final TermIndex firstEntry; |
| private final TermIndex lastEntry; |
| |
| private volatile Timestamp sendTime; |
| |
| AppendEntriesRequest(AppendEntriesRequestProto proto, RaftPeerId followerId, GrpcServerMetrics grpcServerMetrics) { |
| this.callId = proto.getServerRequest().getCallId(); |
| this.previousLog = proto.hasPreviousLog()? TermIndex.valueOf(proto.getPreviousLog()): null; |
| this.entriesCount = proto.getEntriesCount(); |
| this.firstEntry = entriesCount > 0? TermIndex.valueOf(proto.getEntries(0)): null; |
| this.lastEntry = entriesCount > 0? TermIndex.valueOf(proto.getEntries(entriesCount - 1)): null; |
| |
| this.timer = grpcServerMetrics.getGrpcLogAppenderLatencyTimer(followerId.toString(), isHeartbeat()); |
| grpcServerMetrics.onRequestCreate(isHeartbeat()); |
| } |
| |
| long getCallId() { |
| return callId; |
| } |
| |
| TermIndex getPreviousLog() { |
| return previousLog; |
| } |
| |
| long getFirstIndex() { |
| return Optional.ofNullable(firstEntry).map(TermIndex::getIndex).orElse(RaftLog.INVALID_LOG_INDEX); |
| } |
| |
| Timestamp getSendTime() { |
| return sendTime; |
| } |
| |
| void startRequestTimer() { |
| timerContext = timer.time(); |
| sendTime = Timestamp.currentTime(); |
| } |
| |
| void stopRequestTimer() { |
| timerContext.stop(); |
| } |
| |
| boolean isHeartbeat() { |
| return entriesCount == 0; |
| } |
| |
| @Override |
| public String toString() { |
| final String entries = entriesCount == 0? "" |
| : entriesCount == 1? ",entry=" + firstEntry |
| : ",entries=" + firstEntry + "..." + lastEntry; |
| return JavaUtils.getClassSimpleName(getClass()) |
| + ":cid=" + callId |
| + ",entriesCount=" + entriesCount |
| + entries; |
| } |
| } |
| |
| static class RequestMap { |
| private final Map<Long, AppendEntriesRequest> logRequests = new ConcurrentHashMap<>(); |
| private final Map<Long, AppendEntriesRequest> heartbeats = new ConcurrentHashMap<>(); |
| |
| int logRequestsSize() { |
| return logRequests.size(); |
| } |
| |
| void clear() { |
| logRequests.clear(); |
| heartbeats.clear(); |
| } |
| |
| void put(AppendEntriesRequest request) { |
| if (request.isHeartbeat()) { |
| heartbeats.put(request.getCallId(), request); |
| } else { |
| logRequests.put(request.getCallId(), request); |
| } |
| } |
| |
| AppendEntriesRequest remove(AppendEntriesReplyProto reply) { |
| return remove(reply.getServerReply().getCallId(), reply.getIsHearbeat()); |
| } |
| |
| AppendEntriesRequest remove(long cid, boolean isHeartbeat) { |
| return isHeartbeat ? heartbeats.remove(cid): logRequests.remove(cid); |
| } |
| |
| public AppendEntriesRequest handleTimeout(long callId, boolean heartbeat) { |
| return heartbeat ? heartbeats.remove(callId) : logRequests.get(callId); |
| } |
| } |
| } |