blob: de221432bc5b1be48d466e3ee5ce2322279d0976 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.leader;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.AwaitForSignal;
import org.apache.ratis.util.DataQueue;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongUnaryOperator;
/**
* An abstract implementation of {@link LogAppender}.
*/
public abstract class LogAppenderBase implements LogAppender {
// Display name: follower name + "-" + simple name of the concrete class; returned by toString().
private final String name;
// The server division passed to the constructor.
private final RaftServer.Division server;
// The leader state passed to the constructor; used to build AppendEntries requests and to restart this appender.
private final LeaderState leaderState;
// Per-follower state (name, nextIndex, matchIndex, snapshotIndex, last RPC send time) read by this appender.
private final FollowerInfo follower;
// Staging queue used by nextAppendEntriesRequest(..); bounded by the configured byte and element limits.
// It is asserted to be empty at the start of each non-heartbeat request.
private final DataQueue<EntryWithData> buffer;
// Configured maximum snapshot chunk size; passed to InstallSnapshotRequests.
private final int snapshotChunkMaxSize;
// Backs start(), isRunning() and stopAsync().
private final LogAppenderDaemon daemon;
// Exposed through getEventAwaitForSignal(); created with this appender's name.
private final AwaitForSignal eventAwaitForSignal;
// True after triggerHeartbeat() until resetHeartbeatTrigger() is called.
private final AtomicBoolean heartbeatTrigger = new AtomicBoolean();
// Configured minimum wait time; the base value of getRemainingWaitTime().
private final TimeDuration waitTimeMin;
/**
 * Initializes the state shared by all {@link LogAppender} implementations.
 *
 * @param server the server division this appender belongs to; its properties supply the appender settings.
 * @param leaderState the leader state used to build requests and to restart this appender.
 * @param f the follower this appender is created for.
 */
protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
  this.follower = f;
  // The name is assigned first: toString() returns it, and "this" is handed to other objects below.
  this.name = f.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
  this.server = server;
  this.leaderState = leaderState;

  final RaftProperties conf = server.getRaftServer().getProperties();
  this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(conf).getSizeInt();
  this.buffer = new DataQueue<>(this,
      RaftServerConfigKeys.Log.Appender.bufferByteLimit(conf),
      RaftServerConfigKeys.Log.Appender.bufferElementLimit(conf),
      EntryWithData::getSerializedSize);
  this.daemon = new LogAppenderDaemon(this);
  this.eventAwaitForSignal = new AwaitForSignal(this.name);
  this.waitTimeMin = RaftServerConfigKeys.Log.Appender.waitTimeMin(conf);
}
/**
 * Requests a heartbeat. Only the call that flips the flag from false to true notifies the appender;
 * calls made while the flag is already set do nothing.
 */
@Override
public void triggerHeartbeat() {
  final boolean newlyTriggered = heartbeatTrigger.compareAndSet(false, true);
  if (!newlyTriggered) {
    return;
  }
  notifyLogAppender();
}
/**
* Clears the heartbeat flag set by {@link #triggerHeartbeat()}.
*/
protected void resetHeartbeatTrigger() {
heartbeatTrigger.set(false);
}
/**
 * @return true if a heartbeat has been triggered; otherwise, the result of the default implementation
 *         in {@link LogAppender}.
 */
@Override
public boolean shouldSendAppendEntries() {
  if (heartbeatTrigger.get()) {
    return true;
  }
  return LogAppender.super.shouldSendAppendEntries();
}
/**
 * @return 0 if a heartbeat has been triggered; otherwise, the wait time computed by the default
 *         implementation in {@link LogAppender}.
 */
@Override
public long getHeartbeatWaitTimeMs() {
  if (heartbeatTrigger.get()) {
    return 0L;
  }
  return LogAppender.super.getHeartbeatWaitTimeMs();
}
/** @return the {@link AwaitForSignal} created in the constructor with this appender's name. */
@Override
public AwaitForSignal getEventAwaitForSignal() {
return eventAwaitForSignal;
}
/** @return the server division passed to the constructor. */
@Override
public final RaftServer.Division getServer() {
return server;
}
/** @return the name built in the constructor: the follower name, "-", and the simple name of the concrete class. */
@Override
public String toString() {
return name;
}
/** Starts this appender by delegating to {@link LogAppenderDaemon#tryToStart()}. */
@Override
public void start() {
daemon.tryToStart();
}
/** @return the value of {@link LogAppenderDaemon#isWorking()} for the daemon of this appender. */
@Override
public boolean isRunning() {
return daemon.isWorking();
}
/**
* Stops this appender by delegating to {@link LogAppenderDaemon#tryToClose()}.
*
* @return the future returned by the daemon.
*/
@Override
public CompletableFuture<LifeCycle.State> stopAsync() {
return daemon.tryToClose();
}
/**
 * Asks the leader state to restart this appender.
 * When the server is not alive, only a warning is logged and nothing is restarted.
 */
void restart() {
  if (server.getInfo().isAlive()) {
    leaderState.restart(this);
    return;
  }
  LOG.warn("Failed to restart {}: server {} is not alive", this, server.getMemberId());
}
/** @return the minimum wait time read from the configuration in the constructor. */
protected TimeDuration getWaitTimeMin() {
return waitTimeMin;
}
/**
 * @return the minimum wait time minus the time elapsed since the last RPC was sent to the follower.
 *         The result is not clamped here, so it may be negative.
 */
protected TimeDuration getRemainingWaitTime() {
  final TimeDuration sinceLastSend = follower.getLastRpcSendTime().elapsedTime();
  return waitTimeMin.add(sinceLastSend.negate());
}
/** @return the follower information passed to the constructor. */
@Override
public final FollowerInfo getFollower() {
return follower;
}
/** @return the leader state passed to the constructor. */
@Override
public final LeaderState getLeaderState() {
return leaderState;
}
/**
* This base implementation always returns false; the method is not final, so subclasses may override it.
* When it returns true, a heartbeat built by {@link #nextAppendEntriesRequest(long, boolean)}
* carries no previous term-index.
*
* @return false in this base class.
*/
public boolean hasPendingDataRequests() {
return false;
}
/**
 * Finds the term-index of the entry immediately before the given index.
 * The log is consulted first; if it has no such entry, the latest snapshot is used
 * when its index is exactly {@code nextIndex - 1}.
 *
 * @param nextIndex the index whose predecessor is wanted.
 * @return the previous term-index, or null if {@code nextIndex} is the least valid log index
 *         or neither the log nor the latest snapshot provides it.
 */
private TermIndex getPrevious(long nextIndex) {
  if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
    return null;
  }

  final long prevIndex = nextIndex - 1;
  final TermIndex fromLog = getRaftLog().getTermIndex(prevIndex);
  if (fromLog != null) {
    return fromLog;
  }

  final SnapshotInfo latest = server.getStateMachine().getLatestSnapshot();
  if (latest == null) {
    return null;
  }
  final TermIndex fromSnapshot = latest.getTermIndex();
  return fromSnapshot.getIndex() == prevIndex ? fromSnapshot : null;
}
/**
 * Chooses the next index to use after the follower reported an inconsistency.
 *
 * @param requestFirstIndex the index of the first entry in the request being answered.
 * @param replyNextIndex the next index suggested in the reply.
 * @return the next index to use.
 */
protected long getNextIndexForInconsistency(long requestFirstIndex, long replyNextIndex) {
  final long afterMatch = getFollower().getMatchIndex() + 1;

  // Ideally, nextIndex should be set to a value greater than matchIndex.
  // However, the same first entry must not be resent due to some special cases (e.g. the log is empty).
  // Otherwise, the follower will reply INCONSISTENCY again.
  final boolean useAfterMatch = afterMatch > replyNextIndex && afterMatch != requestFirstIndex;
  final long candidate = useAfterMatch ? afterMatch : replyNextIndex;

  // Avoid resending the same first entry: step back by one, unless already at the least valid index.
  final boolean sameFirstEntry = candidate == requestFirstIndex && candidate > RaftLog.LEAST_VALID_LOG_INDEX;
  return sameFirstEntry ? candidate - 1 : candidate;
}
/**
 * Builds the operator that updates the follower's nextIndex after an error.
 * The operator never returns a value less than {@code matchIndex + 1}; apart from that,
 * a positive old value is decreased to {@code min(old - 1, newNextIndex)}
 * and a non-positive old value is left unchanged.
 *
 * @param newNextIndex the upper bound for the decreased index.
 * @return an operator mapping the old nextIndex to the new nextIndex.
 */
protected LongUnaryOperator getNextIndexForError(long newNextIndex) {
  return oldNextIndex -> {
    final long afterMatch = getFollower().getMatchIndex() + 1;
    final boolean positive = oldNextIndex > 0L;
    final long decreased = positive ? Math.min(oldNextIndex - 1, newNextIndex) : oldNextIndex;

    if (afterMatch > decreased) {
      if (afterMatch > newNextIndex) {
        LOG.info("Set nextIndex to matchIndex + 1 (= " + afterMatch + ")");
      }
      return afterMatch;
    }
    if (!positive) {
      return oldNextIndex; // no change.
    }
    LOG.info("Decrease nextIndex to " + decreased);
    return decreased;
  };
}
/**
 * Not supported by this class; use {@link #nextAppendEntriesRequest(long, boolean)} instead.
 *
 * @throws UnsupportedOperationException always.
 */
@Override
public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) {
  final String message = "Use nextAppendEntriesRequest(" + callId + ", " + heartbeat +") instead.";
  throw new UnsupportedOperationException(message);
}
/**
 * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
 * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
 * When there are zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
 *
 * @param callId The call id of the returned request.
 * @param heartbeat the returned request must be a heartbeat.
 *
 * @return a retained reference of {@link AppendEntriesRequestProto} object,
 *         or null if this is not a heartbeat and no entry could be buffered.
 *         Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}
 *         after use.
 * @throws RaftLogIOException propagated from the underlying log access.
 *         In such case, all the entries retained by this call are released and the buffer is cleared.
 */
protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesRequest(long callId, boolean heartbeat)
    throws RaftLogIOException {
  final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
  // Read nextIndex exactly once so that "previous" and the first entry index below always agree,
  // even if the follower's nextIndex is updated while this request is being built.
  final long followerNext = follower.getNextIndex();
  final TermIndex previous = getPrevious(followerNext);
  if (heartbeatWaitTimeMs <= 0L || heartbeat) {
    // heartbeat: no entries; previous is omitted when there are pending data requests.
    final AppendEntriesRequestProto heartbeatRequest =
        leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(),
            hasPendingDataRequests() ? null : previous, callId);
    final ReferenceCountedObject<AppendEntriesRequestProto> ref = ReferenceCountedObject.wrap(heartbeatRequest);
    ref.retain();
    return ref;
  }

  Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");

  final long snapshotIndex = follower.getSnapshotIndex();
  final long leaderNext = getRaftLog().getNextIndex();
  final long halfMs = heartbeatWaitTimeMs/2;
  // Retained references keyed by log index; each must either be released here or handed to the caller.
  final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new HashMap<>();
  try {
    // Buffer entries until the leader log is exhausted, the buffer is full,
    // or half of the heartbeat wait time has been used.
    for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
      final ReferenceCountedObject<EntryWithData> entryWithData = getRaftLog().retainEntryWithData(next);
      if (!buffer.offer(entryWithData.get())) {
        entryWithData.release();
        break;
      }
      offered.put(next, entryWithData);
    }
    if (buffer.isEmpty()) {
      return null;
    }

    final List<LogEntryProto> protos = buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
        (entry, time, exception) -> LOG.warn("Failed to get " + entry
            + " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
    for (EntryWithData entry : buffer) {
      // Release remaining entries.
      offered.remove(entry.getIndex()).release();
    }
    buffer.clear();
    assertProtos(protos, followerNext, previous, snapshotIndex);
    final AppendEntriesRequestProto appendEntriesProto =
        leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
    // From here on, the returned reference owns the entries remaining in "offered".
    return ReferenceCountedObject.delegateFrom(offered.values(), appendEntriesProto);
  } catch (RaftLogIOException | RuntimeException e) {
    // The caller never receives a reference: release everything still retained and
    // leave the buffer empty so that the precondition above holds for the next call.
    offered.values().forEach(ReferenceCountedObject::release);
    offered.clear();
    buffer.clear();
    throw e;
  }
}
/**
 * Sanity-checks the entries about to be sent.
 * The first entry must be at {@code nextIndex}. Unless the first entry is at the least valid log index
 * or {@code nextIndex} is exactly one greater than the snapshot index,
 * {@code previous} must exist and be at {@code firstIndex - 1}.
 * An empty list passes trivially.
 *
 * @param protos the entries to be sent.
 * @param nextIndex the follower's next index used to build the request.
 * @param previous the term-index preceding the first entry; may be null.
 * @param snapshotIndex the follower's snapshot index.
 */
private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
  if (protos.isEmpty()) {
    return;
  }

  final long firstIndex = protos.get(0).getIndex();
  Preconditions.assertTrue(firstIndex == nextIndex,
      () -> follower.getName() + ": firstIndex = " + firstIndex + " != nextIndex = " + nextIndex);

  // When nextIndex is 1 greater than the snapshotIndex, the existence of previous is not checked.
  final boolean atLogStart = firstIndex <= RaftLog.LEAST_VALID_LOG_INDEX;
  final boolean rightAfterSnapshot = nextIndex == snapshotIndex + 1;
  if (atLogStart || rightAfterSnapshot) {
    return;
  }

  Objects.requireNonNull(previous,
      () -> follower.getName() + ": Previous TermIndex not found for firstIndex = " + firstIndex);
  Preconditions.assertTrue(previous.getIndex() == firstIndex - 1,
      () -> follower.getName() + ": Previous = " + previous + " but firstIndex = " + firstIndex);
}
/**
* Builds an install-snapshot notification request for the follower of this appender.
* The request is built while holding the monitor of the server object.
*
* @param firstAvailableLogTermIndex the term-index to put in the request; its index must be non-negative.
* @return the request built by {@link LeaderProtoUtils}.
*/
@Override
public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
// Reject a negative index before building the request.
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() >= 0);
// NOTE(review): presumably synchronized so that the server state is read consistently -- confirm.
synchronized (server) {
return LeaderProtoUtils.toInstallSnapshotRequestProto(server, getFollowerId(), firstAvailableLogTermIndex);
}
}
/**
* Creates the install-snapshot requests for sending the given snapshot to the follower of this appender.
*
* @param requestId the request id passed to {@link InstallSnapshotRequests}.
* @param snapshot the snapshot to be sent.
* @return a new {@link InstallSnapshotRequests} using the configured maximum snapshot chunk size.
*/
@Override
public Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot) {
return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
}
}