RATIS-2018. Zero-copy buffers are not released - 2nd chunk (#1032)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index 815b90d..4cca3a9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -117,7 +117,13 @@
 
       @Override
       public boolean release() {
-        return fromRefs.stream().map(ReferenceCountedObject::release).allMatch(r -> r);
+        boolean allReleased = true;
+        for (ReferenceCountedObject<T> ref : fromRefs) {
+          if (!ref.release()) {
+            allReleased = false;
+          }
+        }
+        return allReleased;
       }
     };
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 7adc7a7..22285a7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -167,6 +167,7 @@
                 + " will NEVER be processed; request = " + r);
         r.fail(e);
         replyMethod.accept(r);
+        r.release();
       }
       tail.clear();
 
@@ -455,19 +456,26 @@
     /** A request (or a retry) arrives (may be out-of-order except for the first request). */
     public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
       final long seqNum = request.getSeqNum();
+      final boolean accepted;
       if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
         nextToProcess = seqNum;
         requests.putNewRequest(request);
         LOG.debug("Received seq={} (first request), {}", seqNum, this);
+        accepted = true;
+      } else if (request.getSeqNum() < nextToProcess) {
+        LOG.debug("Received seq={} < nextToProcess {}, {}", seqNum, nextToProcess, this);
+        accepted = false;
       } else {
         final boolean isRetry = requests.putIfAbsent(request);
         LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
-        if (isRetry) {
-          return;
-        }
+        accepted = !isRetry;
       }
 
-      processRequestsFromHead(processingMethod);
+      if (accepted) {
+        processRequestsFromHead(processingMethod);
+      } else {
+        request.release();
+      }
     }
 
     private void processRequestsFromHead(Consumer<REQUEST> processingMethod) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 0baefa2..5a7f003 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -63,14 +63,8 @@
       Metadata.Key.of("heartbeat", Metadata.ASCII_STRING_MARSHALLER);
 
   static StatusRuntimeException wrapException(Throwable t) {
-    return wrapException(t, -1);
-  }
-
-  static StatusRuntimeException wrapException(Throwable t, long callId) {
     t = JavaUtils.unwrapCompletionException(t);
-    Metadata trailers = new StatusRuntimeExceptionMetadataBuilder(t)
-        .addCallId(callId)
-        .build();
+    Metadata trailers = new StatusRuntimeExceptionMetadataBuilder(t).build();
     return wrapException(t, trailers);
   }
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 0671a18..97b0118 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -29,6 +29,7 @@
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
@@ -317,14 +318,16 @@
     void processClientRequest(ReferenceCountedObject<RaftClientRequest> requestRef) {
       final RaftClientRequest request = requestRef.retain();
       final long callId = request.getCallId();
-
+      final SlidingWindowEntry slidingWindowEntry = request.getSlidingWindowEntry();
       final CompletableFuture<Void> f = processClientRequest(requestRef, reply -> {
         if (!reply.isSuccess()) {
-          LOG.info("Failed {}, reply={}", request, reply);
+          LOG.info("Failed request cid={}, {}, reply={}", callId, slidingWindowEntry, reply);
         }
         final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
         responseNext(proto);
-      }).whenComplete((r, e) -> requestRef.release());
+      });
+
+      requestRef.release();
 
       put(callId, f);
       f.thenAccept(dummy -> remove(callId));
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index ebe764f..9c426f3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -28,7 +28,6 @@
 import org.apache.ratis.server.util.ServerStringUtils;
 import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
 import org.apache.ratis.thirdparty.io.grpc.Status;
-import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
@@ -51,21 +50,28 @@
   public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);
 
   static class PendingServerRequest<REQUEST> {
-    private final REQUEST request;
+    private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
     private final CompletableFuture<Void> future = new CompletableFuture<>();
 
     PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef) {
-      this.request = requestRef.retain();
-      this.future.whenComplete((r, e) -> requestRef.release());
+      requestRef.retain();
+      this.requestRef = new AtomicReference<>(requestRef);
     }
 
     REQUEST getRequest() {
-      return request;
+      return Optional.ofNullable(requestRef.get())
+          .map(ReferenceCountedObject::get)
+          .orElse(null);
     }
 
     CompletableFuture<Void> getFuture() {
       return future;
     }
+
+    void release() {
+      Optional.ofNullable(requestRef.getAndSet(null))
+          .ifPresent(ReferenceCountedObject::release);
+    }
   }
 
   abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> {
@@ -108,20 +114,21 @@
 
     abstract long getCallId(REQUEST request);
 
+    boolean isHeartbeat(REQUEST request) {
+      return false;
+    }
+
     abstract String requestToString(REQUEST request);
 
     abstract String replyToString(REPLY reply);
 
     abstract boolean replyInOrder(REQUEST request);
 
-    StatusRuntimeException wrapException(Throwable e, REQUEST request) {
-      return GrpcUtil.wrapException(e, getCallId(request));
-    }
-
-    private void handleError(Throwable e, REQUEST request) {
-      GrpcUtil.warn(LOG, () -> getId() + ": Failed " + op + " request " + requestToString(request), e);
+    private synchronized void handleError(Throwable e, long callId, boolean isHeartbeat) {
+      GrpcUtil.warn(LOG, () -> getId() + ": Failed " + op + " request cid=" + callId + ", isHeartbeat? "
+          + isHeartbeat, e);
       if (isClosed.compareAndSet(false, true)) {
-        responseObserver.onError(wrapException(e, request));
+        responseObserver.onError(GrpcUtil.wrapException(e, callId, isHeartbeat));
       }
     }
 
@@ -151,21 +158,22 @@
         try {
           composeRequest(process(requestRef).thenApply(this::handleReply));
         } catch (Exception e) {
-          handleError(e, request);
+          handleError(e, getCallId(request), isHeartbeat(request));
           release(request);
         }
         return;
       }
 
       final PendingServerRequest<REQUEST> current = new PendingServerRequest<>(requestRef);
-      final PendingServerRequest<REQUEST> previous = previousOnNext.getAndSet(current);
-      final CompletableFuture<Void> previousFuture = Optional.ofNullable(previous)
-          .map(PendingServerRequest::getFuture)
+      final long callId = getCallId(current.getRequest());
+      final boolean isHeartbeat = isHeartbeat(current.getRequest());
+      final Optional<PendingServerRequest<REQUEST>> previous = Optional.ofNullable(previousOnNext.getAndSet(current));
+      final CompletableFuture<Void> previousFuture = previous.map(PendingServerRequest::getFuture)
           .orElse(CompletableFuture.completedFuture(null));
       try {
         final CompletableFuture<REPLY> f = process(requestRef).exceptionally(e -> {
           // Handle cases, such as RaftServer is paused
-          handleError(e, request);
+          handleError(e, callId, isHeartbeat);
           current.getFuture().completeExceptionally(e);
           return null;
         }).thenCombine(previousFuture, (reply, v) -> {
@@ -175,8 +183,14 @@
         });
         composeRequest(f);
       } catch (Exception e) {
-        handleError(e, request);
+        handleError(e, callId, isHeartbeat);
         current.getFuture().completeExceptionally(e);
+      } finally {
+        previous.ifPresent(PendingServerRequest::release);
+        if (isClosed.get()) {
+          // Some requests may come after onCompleted or onError, ensure they're released.
+          releaseLast();
+        }
       }
     }
 
@@ -188,8 +202,10 @@
           LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply);
           responseObserver.onCompleted();
         });
+        releaseLast();
       }
     }
+
     @Override
     public void onError(Throwable t) {
       GrpcUtil.warn(LOG, () -> getId() + ": "+ op + " onError, lastRequest: " + getPreviousRequestString(), t);
@@ -198,8 +214,13 @@
         if (status != null && status.getCode() != Status.Code.CANCELLED) {
           responseObserver.onCompleted();
         }
+        releaseLast();
       }
     }
+
+    private void releaseLast() {
+      Optional.ofNullable(previousOnNext.get()).ifPresent(PendingServerRequest::release);
+    }
   }
 
   private final Supplier<RaftPeerId> idSupplier;
@@ -289,6 +310,11 @@
       }
 
       @Override
+      boolean isHeartbeat(AppendEntriesRequestProto request) {
+        return request.getEntriesCount() == 0;
+      }
+
+      @Override
       String requestToString(AppendEntriesRequestProto request) {
         return ServerStringUtils.toAppendEntriesRequestString(request);
       }
@@ -302,11 +328,6 @@
       boolean replyInOrder(AppendEntriesRequestProto request) {
         return request.getEntriesCount() != 0;
       }
-
-      @Override
-      StatusRuntimeException wrapException(Throwable e, AppendEntriesRequestProto request) {
-        return GrpcUtil.wrapException(e, getCallId(request), request.getEntriesCount() == 0);
-      }
     };
   }
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
index 1244e72..035e0a8 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
@@ -50,11 +50,9 @@
       ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
     // Default implementation for backward compatibility.
     try {
-      return appendEntriesAsync(requestRef.retain())
-          .whenComplete((r, e) -> requestRef.release());
-    } catch (Exception e) {
+      return appendEntriesAsync(requestRef.retain());
+    } finally {
       requestRef.release();
-      throw e;
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8f6c927..c47db14 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1076,7 +1076,13 @@
         return f.thenApply(r -> null);
       }
       // the message stream has ended and the request become a WRITE request
-      return replyFuture(f.join());
+      ReferenceCountedObject<RaftClientRequest> joinedRequest = f.join();
+      try {
+        return replyFuture(joinedRequest);
+      } finally {
+        // Released pending streaming requests.
+        joinedRequest.release();
+      }
     }
 
     return role.getLeaderState()
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 1d08316..d222482 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -42,6 +42,7 @@
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 /**
  * In-memory RaftLog Cache. Currently we provide a simple implementation that
@@ -350,12 +351,10 @@
     TruncationSegments purge(long index) {
       try (AutoCloseableLock writeLock = writeLock()) {
         int segmentIndex = binarySearch(index);
-        List<SegmentFileInfo> list = new ArrayList<>();
+        List<LogSegment> list = new LinkedList<>();
 
         if (segmentIndex == -segments.size() - 1) {
-          for (LogSegment ls : segments) {
-            list.add(SegmentFileInfo.newClosedSegmentFileInfo(ls));
-          }
+          list.addAll(segments);
           segments.clear();
           sizeInBytes = 0;
         } else if (segmentIndex >= 0) {
@@ -368,13 +367,16 @@
           for (int i = 0; i <= startIndex; i++) {
             LogSegment segment = segments.remove(0); // must remove the first segment to avoid gaps.
             sizeInBytes -= segment.getTotalFileSize();
-            list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
+            list.add(segment);
           }
         } else {
           throw new IllegalStateException("Unexpected gap in segments: binarySearch(" + index + ") returns "
                   + segmentIndex + ", segments=" + segments);
         }
-        return list.isEmpty() ? null : new TruncationSegments(null, list);
+        list.forEach(LogSegment::evictCache);
+        List<SegmentFileInfo> toDelete = list.stream().map(SegmentFileInfo::newClosedSegmentFileInfo)
+            .collect(Collectors.toList());
+        return list.isEmpty() ? null : new TruncationSegments(null, toDelete);
       }
     }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 07073be..7c40ec2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -42,6 +42,7 @@
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
@@ -247,13 +248,24 @@
     LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
     LOG.info("applyTransaction for log index {}", entry.getIndex());
 
-    put(entry);
+    // TODO: Logs kept in StateMachine's cache may be corrupted. Copy for now to have the test pass.
+    // Use ReferenceCount per RATIS-1997.
+    LogEntryProto copied = copy(entry);
+    put(copied);
     updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
 
     final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK");
     return collecting.collect(Collecting.Type.APPLY_TRANSACTION, m);
   }
 
+  private LogEntryProto copy(LogEntryProto log) {
+    try {
+      return LogEntryProto.parseFrom(log.toByteString());
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalStateException("Error copying log entry", e);
+    }
+  }
+
   @Override
   public long takeSnapshot() {
     final TermIndex termIndex = getLastAppliedTermIndex();