RATIS-2007. Zero-copy buffers are not released (#1027)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
index 1985bbe..222ccff 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientAsynchronousProtocol.java
@@ -46,11 +46,11 @@
ReferenceCountedObject<RaftClientRequest> requestRef) {
try {
// for backward compatibility
- return submitClientRequestAsync(requestRef.retain())
- .whenComplete((r, e) -> requestRef.release());
+ return submitClientRequestAsync(requestRef.retain());
} catch (Exception e) {
- requestRef.release();
return JavaUtils.completeExceptionally(e);
+ } finally {
+ requestRef.release();
}
}
}
\ No newline at end of file
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 732e3d8..7adc7a7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -52,6 +52,9 @@
boolean hasReply();
void fail(Throwable e);
+
+ default void release() {
+ }
}
interface ClientSideRequest<REPLY> extends Request<REPLY> {
@@ -170,8 +173,12 @@
putNewRequest(end);
}
- void clear() {
+ void clear(long nextToProcess) {
LOG.debug("close {}", this);
+ final SortedMap<Long, REQUEST> tail = requests.tailMap(nextToProcess);
+ for (REQUEST r : tail.values()) {
+ r.release();
+ }
requests.clear();
}
@@ -469,6 +476,7 @@
return;
} else if (r.getSeqNum() == nextToProcess) {
processingMethod.accept(r);
+ r.release();
nextToProcess++;
}
}
@@ -514,7 +522,7 @@
@Override
public void close() {
- requests.clear();
+ requests.clear(nextToProcess);
}
}
}
\ No newline at end of file
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 67e75d6..0671a18 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -97,6 +97,13 @@
}
@Override
+ public void release() {
+ if (requestRef != null) {
+ requestRef.release();
+ }
+ }
+
+ @Override
public long getSeqNum() {
return request != null? request.getSlidingWindowEntry().getSeqNum(): Long.MAX_VALUE;
}
@@ -363,7 +370,6 @@
final long seq = pending.getSeqNum();
processClientRequest(pending.getRequestRef(),
reply -> slidingWindow.receiveReply(seq, reply, this::sendReply));
- pending.getRequestRef().release();
}
@Override
@@ -378,7 +384,6 @@
final RaftGroupId requestGroupId = request.getRaftGroupId();
// use the group id in the first request as the group id of this observer
final RaftGroupId updated = groupId.updateAndGet(g -> g != null ? g : requestGroupId);
- final PendingOrderedRequest pending = new PendingOrderedRequest(requestRef);
if (!requestGroupId.equals(updated)) {
final GroupMismatchException exception = new GroupMismatchException(getId()
@@ -387,7 +392,13 @@
responseError(exception, () -> "processClientRequest (Group mismatched) for " + request);
return;
}
- slidingWindow.receivedRequest(pending, this::processClientRequest);
+ final PendingOrderedRequest pending = new PendingOrderedRequest(requestRef);
+ try {
+ slidingWindow.receivedRequest(pending, this::processClientRequest);
+ } catch (Exception e) {
+ pending.release();
+ throw e;
+ }
} finally {
requestRef.release();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8ad8354..396243b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -887,15 +887,19 @@
return CompletableFuture.completedFuture(reply);
}
- final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType());
- final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time);
- return replyFuture(requestRef).whenComplete((clientReply, exception) -> {
+ try {
+ RaftClientRequest.Type type = request.getType();
+ final Timekeeper timer = raftServerMetrics.getClientRequestTimer(type);
+ final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time);
+ return replyFuture(requestRef).whenComplete((clientReply, exception) -> {
+ timerContext.ifPresent(Timekeeper.Context::stop);
+ if (exception != null || clientReply.getException() != null) {
+ raftServerMetrics.incFailedRequestCount(type);
+ }
+ });
+ } finally {
requestRef.release();
- timerContext.ifPresent(Timekeeper.Context::stop);
- if (exception != null || clientReply.getException() != null) {
- raftServerMetrics.incFailedRequestCount(request.getType());
- }
- });
+ }
}
private CompletableFuture<RaftClientReply> replyFuture(ReferenceCountedObject<RaftClientRequest> requestRef) {
@@ -1479,12 +1483,12 @@
preAppendEntriesAsync(requestorId, ProtoUtils.toRaftGroupId(request.getRaftGroupId()), r.getLeaderTerm(),
previous, r.getLeaderCommit(), r.getInitializing(), entries);
return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous, r.getLeaderCommit(),
- request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries, requestRef)
- .whenComplete((reply, e) -> requestRef.release());
+ request.getCallId(), r.getInitializing(), r.getCommitInfosList(), entries, requestRef);
} catch(Exception t) {
LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
- requestRef.release();
throw t;
+ } finally {
+ requestRef.release();
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 0750d2c..68da350 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -245,6 +245,7 @@
if (ti.equals(key.getTermIndex())) {
toReturn.set(entry);
}
+ entryRef.release();
});
loadingTimes.incrementAndGet();
return Objects.requireNonNull(toReturn.get());