RATIS-2007. Zero-copy buffers are not released (#1027)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index 1985bbe..222ccff 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -46,11 +46,11 @@
       ReferenceCountedObject<RaftClientRequest> requestRef) {
     try {
       // for backward compatibility
-      return submitClientRequestAsync(requestRef.retain())
-          .whenComplete((r, e) -> requestRef.release());
+      return submitClientRequestAsync(requestRef.retain());
     } catch (Exception e) {
-      requestRef.release();
       return JavaUtils.completeExceptionally(e);
+    } finally {
+      requestRef.release();
     }
   }
 }
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 732e3d8..7adc7a7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -52,6 +52,9 @@
     boolean hasReply();
 
     void fail(Throwable e);
+
+    default void release() {
+    }
   }
 
   interface ClientSideRequest<REPLY> extends Request<REPLY> {
@@ -170,8 +173,12 @@
       putNewRequest(end);
     }
 
-    void clear() {
+    void clear(long nextToProcess) {
       LOG.debug("close {}", this);
+      final SortedMap<Long, REQUEST> tail = requests.tailMap(nextToProcess);
+      for (REQUEST r : tail.values()) {
+        r.release();
+      }
       requests.clear();
     }
 
@@ -469,6 +476,7 @@
           return;
         } else if (r.getSeqNum() == nextToProcess) {
           processingMethod.accept(r);
+          r.release();
           nextToProcess++;
         }
       }
@@ -514,7 +522,7 @@
 
     @Override
     public void close() {
-      requests.clear();
+      requests.clear(nextToProcess);
     }
   }
 }
\ No newline at end of file
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 67e75d6..0671a18 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -97,6 +97,13 @@
     }
 
     @Override
+    public void release() {
+      if (requestRef != null) {
+        requestRef.release();
+      }
+    }
+
+    @Override
     public long getSeqNum() {
       return request != null? request.getSlidingWindowEntry().getSeqNum(): Long.MAX_VALUE;
     }
@@ -363,7 +370,6 @@
       final long seq = pending.getSeqNum();
       processClientRequest(pending.getRequestRef(),
           reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
-      pending.getRequestRef().release();
     }
 
     @Override
@@ -378,7 +384,6 @@
         final RaftGroupId requestGroupId = request.getRaftGroupId();
         // use the group id in the first request as the group id of this observer
         final RaftGroupId updated = groupId.updateAndGet(g -> g != null ? g : requestGroupId);
-        final PendingOrderedRequest pending = new PendingOrderedRequest(requestRef);
 
         if (!requestGroupId.equals(updated)) {
           final GroupMismatchException exception = new GroupMismatchException(getId()
@@ -387,7 +392,13 @@
           responseError(exception, () -> "processClientRequest (Group mismatched) for " + request);
           return;
         }
-        slidingWindow.receivedRequest(pending, this::processClientRequest);
+        final PendingOrderedRequest pending = new PendingOrderedRequest(requestRef);
+        try {
+          slidingWindow.receivedRequest(pending, this::processClientRequest);
+        } catch (Exception e) {
+          pending.release();
+          throw e;
+        }
       } finally {
         requestRef.release();
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8ad8354..396243b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -887,15 +887,19 @@
       return CompletableFuture.completedFuture(reply);
     }
 
-    final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType());
-    final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time);
-    return replyFuture(requestRef).whenComplete((clientReply, exception) -> {
+    try {
+      RaftClientRequest.Type type = request.getType();
+      final Timekeeper timer = raftServerMetrics.getClientRequestTimer(type);
+      final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time);
+      return replyFuture(requestRef).whenComplete((clientReply, exception) -> {
+        timerContext.ifPresent(Timekeeper.Context::stop);
+        if (exception != null || clientReply.getException() != null) {
+          raftServerMetrics.incFailedRequestCount(type);
+        }
+      });
+    } finally {
       requestRef.release();
-      timerContext.ifPresent(Timekeeper.Context::stop);
-      if (exception != null || clientReply.getException() != null) {
-        raftServerMetrics.incFailedRequestCount(request.getType());
-      }
-    });
+    }
   }
 
   private CompletableFuture<RaftClientReply> replyFuture(ReferenceCountedObject<RaftClientRequest> requestRef) {
@@ -1479,12 +1483,12 @@
       preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
           previous, r.getLeaderCommit(), r.getInitializing(), entries);
       return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
-          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries, requestRef)
-          .whenComplete((reply, e) -> requestRef.release());
+          request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries, requestRef);
     } catch(Exception t) {
       LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
-      requestRef.release();
       throw t;
+    } finally {
+      requestRef.release();
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 0750d2c..68da350 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -245,6 +245,7 @@
         if (ti.equals(key.getTermIndex())) {
           toReturn.set(entry);
         }
+        entryRef.release();
       });
       loadingTimes.incrementAndGet();
       return Objects.requireNonNull(toReturn.get());