RATIS-2026. LogAppender to consume log entries with reference count (#1049)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 5f9c94e..e23f282 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -378,30 +378,39 @@
}
private void appendLog(boolean heartbeat) throws IOException {
- final AppendEntriesRequestProto pending;
+ ReferenceCountedObject<AppendEntriesRequestProto> pending = null;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// Prepare and send the append request.
// Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
- pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
+ pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
if (pending == null) {
return;
}
- request = new AppendEntriesRequest(pending, getFollowerId(), grpcServerMetrics);
+ request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics);
pendingRequests.put(request);
- increaseNextIndex(pending);
+ increaseNextIndex(pending.get());
if (appendLogRequestObserver == null) {
appendLogRequestObserver = new StreamObservers(
getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin());
}
+ } catch(Exception e) {
+ if (pending != null) {
+ pending.release();
+ }
+ throw e;
}
- final TimeDuration remaining = getRemainingWaitTime();
- if (remaining.isPositive()) {
- sleep(remaining, heartbeat);
- }
- if (isRunning()) {
- sendRequest(request, pending);
+ try {
+ final TimeDuration remaining = getRemainingWaitTime();
+ if (remaining.isPositive()) {
+ sleep(remaining, heartbeat);
+ }
+ if (isRunning()) {
+ sendRequest(request, pending.get());
+ }
+ } finally {
+ pending.release();
}
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 36331e3..78f6130 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -125,7 +125,9 @@
* @param heartbeat the returned request must be a heartbeat.
*
* @return a new {@link AppendEntriesRequestProto} object.
+ * @deprecated this is no longer a public API.
*/
+ @Deprecated
AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException;
/** @return a new {@link InstallSnapshotRequestProto} object. */
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index e4fbd66..ca785a4 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -79,10 +79,24 @@
/**
* @return null if the log entry is not found in this log;
* otherwise, return the {@link EntryWithData} corresponding to the given index.
+ * @deprecated use {@link #retainEntryWithData(long)}.
*/
+ @Deprecated
EntryWithData getEntryWithData(long index) throws RaftLogIOException;
/**
+ * @return null if the log entry is not found in this log;
+ * otherwise, return a retained reference of the {@link EntryWithData} corresponding to the given index.
+ * Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
+ * after use.
+ */
+ default ReferenceCountedObject<EntryWithData> retainEntryWithData(long index) throws RaftLogIOException {
+ final ReferenceCountedObject<EntryWithData> wrap = ReferenceCountedObject.wrap(getEntryWithData(index));
+ wrap.retain();
+ return wrap;
+}
+
+ /**
* @param startIndex the starting log index (inclusive)
* @param endIndex the ending log index (exclusive)
* @return null if entries are unavailable in this log;
@@ -172,6 +186,15 @@
* containing both the log entry and the state machine data.
*/
interface EntryWithData {
+ /** @return the index of this entry. */
+ default long getIndex() {
+ try {
+ return getEntry(TimeDuration.ONE_MINUTE).getIndex();
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to getIndex", e);
+ }
+ }
+
/** @return the serialized size including both log entry and state machine data. */
int getSerializedSize();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 958cc6f..de22143 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -33,11 +33,14 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -218,16 +221,35 @@
};
}
-
@Override
- public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat)
+ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) {
+ throw new UnsupportedOperationException("Use nextAppendEntriesRequest(" + callId + ", " + heartbeat +") instead.");
+ }
+
+/**
+ * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
+ * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
+ * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
+ *
+ * @param callId The call id of the returned request.
+ * @param heartbeat the returned request must be a heartbeat.
+ *
+ * @return a retained reference of {@link AppendEntriesRequestProto} object.
+ * Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
+ * after use.
+ */
+ protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesRequest(long callId, boolean heartbeat)
throws RaftLogIOException {
final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
final TermIndex previous = getPrevious(follower.getNextIndex());
if (heartbeatWaitTimeMs <= 0L || heartbeat) {
// heartbeat
- return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(),
- hasPendingDataRequests()? null : previous, callId);
+ AppendEntriesRequestProto heartbeatRequest =
+ leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(),
+ hasPendingDataRequests() ? null : previous, callId);
+ ReferenceCountedObject<AppendEntriesRequestProto> ref = ReferenceCountedObject.wrap(heartbeatRequest);
+ ref.retain();
+ return ref;
}
Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
@@ -236,10 +258,14 @@
final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
final long halfMs = heartbeatWaitTimeMs/2;
- for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; ) {
- if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
+ final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new HashMap<>();
+ for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
+ final ReferenceCountedObject<EntryWithData> entryWithData = getRaftLog().retainEntryWithData(next);
+ if (!buffer.offer(entryWithData.get())) {
+ entryWithData.release();
break;
}
+ offered.put(next, entryWithData);
}
if (buffer.isEmpty()) {
return null;
@@ -248,9 +274,15 @@
final List<LogEntryProto> protos = buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
(entry, time, exception) -> LOG.warn("Failed to get " + entry
+ " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
+ for (EntryWithData entry : buffer) {
+ // Release remaining entries.
+ offered.remove(entry.getIndex()).release();
+ }
buffer.clear();
assertProtos(protos, followerNext, previous, snapshotIndex);
- return leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
+ AppendEntriesRequestProto appendEntriesProto =
+ leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
+ return ReferenceCountedObject.delegateFrom(offered.values(), appendEntriesProto);
}
private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 21ef70d..432a419 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -26,6 +26,7 @@
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.Timestamp;
import java.io.IOException;
@@ -58,11 +59,15 @@
throws InterruptedException, InterruptedIOException, RaftLogIOException {
int retry = 0;
- AppendEntriesRequestProto request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
+ ReferenceCountedObject<AppendEntriesRequestProto> request = nextAppendEntriesRequest(
+ CallId.getAndIncrement(), false);
while (isRunning()) { // keep retrying for IOException
try {
- if (request == null || request.getEntriesCount() == 0) {
- request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
+ if (request == null || request.get().getEntriesCount() == 0) {
+ if (request != null) {
+ request.release();
+ }
+ request = nextAppendEntriesRequest(CallId.getAndIncrement(), false);
}
if (request == null) {
@@ -73,14 +78,8 @@
return null;
}
- resetHeartbeatTrigger();
- final Timestamp sendTime = Timestamp.currentTime();
- getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
- final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
- getFollower().updateLastRpcResponseTime();
- getFollower().updateLastRespondedAppendEntriesSendTime(sendTime);
-
- getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
+ AppendEntriesReplyProto r = sendAppendEntries(request.get());
+ request.release();
return r;
} catch (InterruptedIOException | RaftLogIOException e) {
throw e;
@@ -98,6 +97,18 @@
return null;
}
+ private AppendEntriesReplyProto sendAppendEntries(AppendEntriesRequestProto request) throws IOException {
+ resetHeartbeatTrigger();
+ final Timestamp sendTime = Timestamp.currentTime();
+ getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
+ final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
+ getFollower().updateLastRpcResponseTime();
+ getFollower().updateLastRespondedAppendEntriesSendTime(sendTime);
+
+ getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
+ return r;
+ }
+
private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
String requestId = UUID.randomUUID().toString();
InstallSnapshotReplyProto reply = null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 0a9a1c9..284776d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -428,23 +428,28 @@
private ByteString checkStateMachineData(ByteString data) {
if (data == null) {
- throw new IllegalStateException("State machine data is null for log entry " + logEntry);
+ throw new IllegalStateException("State machine data is null for log entry " + this);
}
return data;
}
@Override
+ public long getIndex() {
+ return logEntry.getIndex();
+ }
+
+ @Override
public int getSerializedSize() {
return LogProtoUtils.getSerializedSize(logEntry);
}
@Override
public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException {
- LogEntryProto entryProto;
if (future == null) {
return logEntry;
}
+ final LogEntryProto entryProto;
try {
entryProto = future.thenApply(data -> LogProtoUtils.addStateMachineData(data, logEntry))
.get(timeout.getDuration(), timeout.getUnit());
@@ -457,14 +462,14 @@
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(logEntry);
+ final String err = getName() + ": Failed readStateMachineData for " + this;
LOG.error(err, e);
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
}
// by this time we have already read the state machine data,
// so the log entry data should be set now
if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
- final String err = getName() + ": State machine data not set for " + toLogEntryString(logEntry);
+ final String err = getName() + ": State machine data not set for " + this;
LOG.error(err);
throw new RaftLogIOException(err);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index feedaee..55036fa 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -135,14 +135,14 @@
}
@Override
- public EntryWithData getEntryWithData(long index) {
- // TODO. The reference counted object should be passed to LogAppender RATIS-2026.
- ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
- try {
- return newEntryWithData(ref.get(), null);
- } finally {
- ref.release();
- }
+ public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
+ throw new UnsupportedOperationException("Use retainEntryWithData(" + index + ") instead.");
+ }
+
+ @Override
+ public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index) {
+ final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
+ return ref.delegate(newEntryWithData(ref.get(), null));
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index bb0793a..b7dd326 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -319,21 +319,19 @@
@Override
public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
+ throw new UnsupportedOperationException("Use retainEntryWithData(" + index + ") instead.");
+ }
+
+ @Override
+ public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index) throws RaftLogIOException {
final ReferenceCountedObject<LogEntryProto> entryRef = retainLog(index);
if (entryRef == null) {
throw new RaftLogIOException("Log entry not found: index = " + index);
}
- try {
- // TODO. The reference counted object should be passed to LogAppender RATIS-2026.
- return getEntryWithData(entryRef.get());
- } finally {
- entryRef.release();
- }
- }
- private EntryWithData getEntryWithData(LogEntryProto entry) throws RaftLogIOException {
+ final LogEntryProto entry = entryRef.get();
if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
- return newEntryWithData(entry, null);
+ return entryRef.delegate(newEntryWithData(entry, null));
}
try {
@@ -344,7 +342,7 @@
throw new CompletionException("Failed to read state machine data for log entry " + entry, ex);
});
}
- return newEntryWithData(entry, future);
+ return entryRef.delegate(newEntryWithData(entry, future));
} catch (Exception e) {
final String err = getName() + ": Failed readStateMachineData for " +
LogProtoUtils.toLogEntryString(entry);