RATIS-2026. LogAppender to consume log entries with reference count (#1049)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 5f9c94e..e23f282 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -378,30 +378,39 @@
   }
 
   private void appendLog(boolean heartbeat) throws IOException {
-    final AppendEntriesRequestProto pending;
+    ReferenceCountedObject<AppendEntriesRequestProto> pending = null;
     final AppendEntriesRequest request;
     try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
       // Prepare and send the append request.
       // Note changes on follower's nextIndex and ops on pendingRequests should always be done under the write-lock
-      pending = newAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
+      pending = nextAppendEntriesRequest(callId.getAndIncrement(), heartbeat);
       if (pending == null) {
         return;
       }
-      request = new AppendEntriesRequest(pending, getFollowerId(), grpcServerMetrics);
+      request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics);
       pendingRequests.put(request);
-      increaseNextIndex(pending);
+      increaseNextIndex(pending.get());
       if (appendLogRequestObserver == null) {
         appendLogRequestObserver = new StreamObservers(
             getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin());
       }
+    } catch(Exception e) {
+      if (pending != null) {
+        pending.release();
+      }
+      throw e;
     }
 
-    final TimeDuration remaining = getRemainingWaitTime();
-    if (remaining.isPositive()) {
-      sleep(remaining, heartbeat);
-    }
-    if (isRunning()) {
-      sendRequest(request, pending);
+    try {
+      final TimeDuration remaining = getRemainingWaitTime();
+      if (remaining.isPositive()) {
+        sleep(remaining, heartbeat);
+      }
+      if (isRunning()) {
+        sendRequest(request, pending.get());
+      }
+    } finally {
+      pending.release();
     }
   }
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 36331e3..78f6130 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -125,7 +125,9 @@
    * @param heartbeat the returned request must be a heartbeat.
    *
    * @return a new {@link AppendEntriesRequestProto} object.
+   * @deprecated this is no longer a public API.
    */
+  @Deprecated
   AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException;
 
   /** @return a new {@link InstallSnapshotRequestProto} object. */
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index e4fbd66..ca785a4 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -79,10 +79,24 @@
   /**
    * @return null if the log entry is not found in this log;
    *         otherwise, return the {@link EntryWithData} corresponding to the given index.
+   * @deprecated use {@link #retainEntryWithData(long)}.
    */
+  @Deprecated
   EntryWithData getEntryWithData(long index) throws RaftLogIOException;
 
   /**
+   * @return null if the log entry is not found in this log;
+   *         otherwise, return a retained reference of the {@link EntryWithData} corresponding to the given index.
+   *         Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
+   *         after use.
+   */
+  default ReferenceCountedObject<EntryWithData> retainEntryWithData(long index) throws RaftLogIOException {
+    final ReferenceCountedObject<EntryWithData> wrap = ReferenceCountedObject.wrap(getEntryWithData(index));
+    wrap.retain();
+    return wrap;
+}
+
+  /**
    * @param startIndex the starting log index (inclusive)
    * @param endIndex the ending log index (exclusive)
    * @return null if entries are unavailable in this log;
@@ -172,6 +186,15 @@
    * containing both the log entry and the state machine data.
    */
   interface EntryWithData {
+    /** @return the index of this entry. */
+    default long getIndex() {
+      try {
+        return getEntry(TimeDuration.ONE_MINUTE).getIndex();
+      } catch (Exception e) {
+        throw new IllegalStateException("Failed to getIndex", e);
+      }
+    }
+
     /** @return the serialized size including both log entry and state machine data. */
     int getSerializedSize();
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 958cc6f..de22143 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -33,11 +33,14 @@
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -218,16 +221,35 @@
     };
   }
 
-
   @Override
-  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat)
+  public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) {
+    throw new UnsupportedOperationException("Use nextAppendEntriesRequest(" + callId + ", " + heartbeat +") instead.");
+  }
+
+/**
+ * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
+ * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
+ * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
+ *
+ * @param callId The call id of the returned request.
+ * @param heartbeat the returned request must be a heartbeat.
+ *
+ * @return a retained reference of {@link AppendEntriesRequestProto} object.
+ *         Since the returned reference is retained, the caller must call {@link ReferenceCountedObject#release()}}
+ *         after use.
+ */
+  protected ReferenceCountedObject<AppendEntriesRequestProto> nextAppendEntriesRequest(long callId, boolean heartbeat)
       throws RaftLogIOException {
     final long heartbeatWaitTimeMs = getHeartbeatWaitTimeMs();
     final TermIndex previous = getPrevious(follower.getNextIndex());
     if (heartbeatWaitTimeMs <= 0L || heartbeat) {
       // heartbeat
-      return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(),
-          hasPendingDataRequests()? null : previous, callId);
+      AppendEntriesRequestProto heartbeatRequest =
+          leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(),
+              hasPendingDataRequests() ? null : previous, callId);
+      ReferenceCountedObject<AppendEntriesRequestProto> ref = ReferenceCountedObject.wrap(heartbeatRequest);
+      ref.retain();
+      return ref;
     }
 
     Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
@@ -236,10 +258,14 @@
     final long leaderNext = getRaftLog().getNextIndex();
     final long followerNext = follower.getNextIndex();
     final long halfMs = heartbeatWaitTimeMs/2;
-    for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; ) {
-      if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
+    final Map<Long, ReferenceCountedObject<EntryWithData>> offered = new HashMap<>();
+    for (long next = followerNext; leaderNext > next && getHeartbeatWaitTimeMs() - halfMs > 0; next++) {
+      final ReferenceCountedObject<EntryWithData> entryWithData = getRaftLog().retainEntryWithData(next);
+      if (!buffer.offer(entryWithData.get())) {
+        entryWithData.release();
         break;
       }
+      offered.put(next, entryWithData);
     }
     if (buffer.isEmpty()) {
       return null;
@@ -248,9 +274,15 @@
     final List<LogEntryProto> protos = buffer.pollList(getHeartbeatWaitTimeMs(), EntryWithData::getEntry,
         (entry, time, exception) -> LOG.warn("Failed to get " + entry
             + " in " + time.toString(TimeUnit.MILLISECONDS, 3), exception));
+    for (EntryWithData entry : buffer) {
+      // Release remaining entries.
+      offered.remove(entry.getIndex()).release();
+    }
     buffer.clear();
     assertProtos(protos, followerNext, previous, snapshotIndex);
-    return leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
+    AppendEntriesRequestProto appendEntriesProto =
+        leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
+    return ReferenceCountedObject.delegateFrom(offered.values(), appendEntriesProto);
   }
 
   private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 21ef70d..432a419 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -26,6 +26,7 @@
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.util.ReferenceCountedObject;
 import org.apache.ratis.util.Timestamp;
 
 import java.io.IOException;
@@ -58,11 +59,15 @@
       throws InterruptedException, InterruptedIOException, RaftLogIOException {
     int retry = 0;
 
-    AppendEntriesRequestProto request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
+    ReferenceCountedObject<AppendEntriesRequestProto> request = nextAppendEntriesRequest(
+        CallId.getAndIncrement(), false);
     while (isRunning()) { // keep retrying for IOException
       try {
-        if (request == null || request.getEntriesCount() == 0) {
-          request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
+        if (request == null || request.get().getEntriesCount() == 0) {
+          if (request != null) {
+            request.release();
+          }
+          request = nextAppendEntriesRequest(CallId.getAndIncrement(), false);
         }
 
         if (request == null) {
@@ -73,14 +78,8 @@
           return null;
         }
 
-        resetHeartbeatTrigger();
-        final Timestamp sendTime = Timestamp.currentTime();
-        getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
-        final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
-        getFollower().updateLastRpcResponseTime();
-        getFollower().updateLastRespondedAppendEntriesSendTime(sendTime);
-
-        getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
+        AppendEntriesReplyProto r = sendAppendEntries(request.get());
+        request.release();
         return r;
       } catch (InterruptedIOException | RaftLogIOException e) {
         throw e;
@@ -98,6 +97,18 @@
     return null;
   }
 
+  private AppendEntriesReplyProto sendAppendEntries(AppendEntriesRequestProto request) throws IOException {
+    resetHeartbeatTrigger();
+    final Timestamp sendTime = Timestamp.currentTime();
+    getFollower().updateLastRpcSendTime(request.getEntriesCount() == 0);
+    final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
+    getFollower().updateLastRpcResponseTime();
+    getFollower().updateLastRespondedAppendEntriesSendTime(sendTime);
+
+    getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
+    return r;
+  }
+
   private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
     String requestId = UUID.randomUUID().toString();
     InstallSnapshotReplyProto reply = null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
index 0a9a1c9..284776d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java
@@ -428,23 +428,28 @@
 
     private ByteString checkStateMachineData(ByteString data) {
       if (data == null) {
-        throw new IllegalStateException("State machine data is null for log entry " + logEntry);
+        throw new IllegalStateException("State machine data is null for log entry " + this);
       }
       return data;
     }
 
     @Override
+    public long getIndex() {
+      return logEntry.getIndex();
+    }
+
+    @Override
     public int getSerializedSize() {
       return LogProtoUtils.getSerializedSize(logEntry);
     }
 
     @Override
     public LogEntryProto getEntry(TimeDuration timeout) throws RaftLogIOException, TimeoutException {
-      LogEntryProto entryProto;
       if (future == null) {
         return logEntry;
       }
 
+      final LogEntryProto entryProto;
       try {
         entryProto = future.thenApply(data -> LogProtoUtils.addStateMachineData(data, logEntry))
             .get(timeout.getDuration(), timeout.getUnit());
@@ -457,14 +462,14 @@
         if (e instanceof InterruptedException) {
           Thread.currentThread().interrupt();
         }
-        final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(logEntry);
+        final String err = getName() + ": Failed readStateMachineData for " + this;
         LOG.error(err, e);
         throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
       }
       // by this time we have already read the state machine data,
       // so the log entry data should be set now
       if (LogProtoUtils.isStateMachineDataEmpty(entryProto)) {
-        final String err = getName() + ": State machine data not set for " + toLogEntryString(logEntry);
+        final String err = getName() + ": State machine data not set for " + this;
         LOG.error(err);
         throw new RaftLogIOException(err);
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
index feedaee..55036fa 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java
@@ -135,14 +135,14 @@
   }
 
   @Override
-  public EntryWithData getEntryWithData(long index) {
-    // TODO. The reference counted object should be passed to LogAppender RATIS-2026.
-    ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
-    try {
-      return newEntryWithData(ref.get(), null);
-    } finally {
-      ref.release();
-    }
+  public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
+    throw new UnsupportedOperationException("Use retainEntryWithData(" + index + ") instead.");
+  }
+
+  @Override
+  public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index) {
+    final ReferenceCountedObject<LogEntryProto> ref = retainLog(index);
+    return ref.delegate(newEntryWithData(ref.get(), null));
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index bb0793a..b7dd326 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -319,21 +319,19 @@
 
   @Override
   public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
+    throw new UnsupportedOperationException("Use retainEntryWithData(" + index + ") instead.");
+  }
+
+  @Override
+  public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index) throws RaftLogIOException {
     final ReferenceCountedObject<LogEntryProto> entryRef = retainLog(index);
     if (entryRef == null) {
       throw new RaftLogIOException("Log entry not found: index = " + index);
     }
-    try {
-      // TODO. The reference counted object should be passed to LogAppender RATIS-2026.
-      return getEntryWithData(entryRef.get());
-    } finally {
-      entryRef.release();
-    }
-  }
 
-  private EntryWithData getEntryWithData(LogEntryProto entry) throws RaftLogIOException {
+    final LogEntryProto entry = entryRef.get();
     if (!LogProtoUtils.isStateMachineDataEmpty(entry)) {
-      return newEntryWithData(entry, null);
+      return entryRef.delegate(newEntryWithData(entry, null));
     }
 
     try {
@@ -344,7 +342,7 @@
           throw new CompletionException("Failed to read state machine data for log entry " + entry, ex);
         });
       }
-      return newEntryWithData(entry, future);
+      return entryRef.delegate(newEntryWithData(entry, future));
     } catch (Exception e) {
       final String err = getName() + ": Failed readStateMachineData for " +
           LogProtoUtils.toLogEntryString(entry);