RATIS-2018. Zero-copy buffers are not released - 2nd chunk (#1032)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index 815b90d..4cca3a9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -117,7 +117,13 @@
@Override
public boolean release() {
- return fromRefs.stream().map(ReferenceCountedObject::release).allMatch(r -> r);
+ boolean allReleased = true;
+ for (ReferenceCountedObject<T> ref : fromRefs) {
+ if (!ref.release()) {
+ allReleased = false;
+ }
+ }
+ return allReleased;
}
};
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 7adc7a7..22285a7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -167,6 +167,7 @@
+ " will NEVER be processed; request = " + r);
r.fail(e);
replyMethod.accept(r);
+ r.release();
}
tail.clear();
@@ -455,19 +456,26 @@
/** A request (or a retry) arrives (may be out-of-order except for the first request). */
public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
final long seqNum = request.getSeqNum();
+ final boolean accepted;
if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
nextToProcess = seqNum;
requests.putNewRequest(request);
LOG.debug("Received seq={} (first request), {}", seqNum, this);
+ accepted = true;
+ } else if (request.getSeqNum() < nextToProcess) {
+ LOG.debug("Received seq={} < nextToProcess {}, {}", seqNum, nextToProcess, this);
+ accepted = false;
} else {
final boolean isRetry = requests.putIfAbsent(request);
LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
- if (isRetry) {
- return;
- }
+ accepted = !isRetry;
}
- processRequestsFromHead(processingMethod);
+ if (accepted) {
+ processRequestsFromHead(processingMethod);
+ } else {
+ request.release();
+ }
}
private void processRequestsFromHead(Consumer<REQUEST> processingMethod) {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 0baefa2..5a7f003 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -63,14 +63,8 @@
Metadata.Key.of("heartbeat", Metadata.ASCII_STRING_MARSHALLER);
static StatusRuntimeException wrapException(Throwable t) {
- return wrapException(t, -1);
- }
-
- static StatusRuntimeException wrapException(Throwable t, long callId) {
t = JavaUtils.unwrapCompletionException(t);
- Metadata trailers = new StatusRuntimeExceptionMetadataBuilder(t)
- .addCallId(callId)
- .build();
+ Metadata trailers = new StatusRuntimeExceptionMetadataBuilder(t).build();
return wrapException(t, trailers);
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
index 0671a18..97b0118 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java
@@ -29,6 +29,7 @@
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
@@ -317,14 +318,16 @@
void processClientRequest(ReferenceCountedObject<RaftClientRequest> requestRef) {
final RaftClientRequest request = requestRef.retain();
final long callId = request.getCallId();
-
+ final SlidingWindowEntry slidingWindowEntry = request.getSlidingWindowEntry();
final CompletableFuture<Void> f = processClientRequest(requestRef, reply -> {
if (!reply.isSuccess()) {
- LOG.info("Failed {}, reply={}", request, reply);
+ LOG.info("Failed request cid={}, {}, reply={}", callId, slidingWindowEntry, reply);
}
final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
responseNext(proto);
- }).whenComplete((r, e) -> requestRef.release());
+ });
+
+ requestRef.release();
put(callId, f);
f.thenAccept(dummy -> remove(callId));
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
index ebe764f..9c426f3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolService.java
@@ -28,7 +28,6 @@
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition;
import org.apache.ratis.thirdparty.io.grpc.Status;
-import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceImplBase;
@@ -51,21 +50,28 @@
public static final Logger LOG = LoggerFactory.getLogger(GrpcServerProtocolService.class);
static class PendingServerRequest<REQUEST> {
- private final REQUEST request;
+ private final AtomicReference<ReferenceCountedObject<REQUEST>> requestRef;
private final CompletableFuture<Void> future = new CompletableFuture<>();
PendingServerRequest(ReferenceCountedObject<REQUEST> requestRef) {
- this.request = requestRef.retain();
- this.future.whenComplete((r, e) -> requestRef.release());
+ requestRef.retain();
+ this.requestRef = new AtomicReference<>(requestRef);
}
REQUEST getRequest() {
- return request;
+ return Optional.ofNullable(requestRef.get())
+ .map(ReferenceCountedObject::get)
+ .orElse(null);
}
CompletableFuture<Void> getFuture() {
return future;
}
+
+ void release() {
+ Optional.ofNullable(requestRef.getAndSet(null))
+ .ifPresent(ReferenceCountedObject::release);
+ }
}
abstract class ServerRequestStreamObserver<REQUEST, REPLY> implements StreamObserver<REQUEST> {
@@ -108,20 +114,21 @@
abstract long getCallId(REQUEST request);
+ boolean isHeartbeat(REQUEST request) {
+ return false;
+ }
+
abstract String requestToString(REQUEST request);
abstract String replyToString(REPLY reply);
abstract boolean replyInOrder(REQUEST request);
- StatusRuntimeException wrapException(Throwable e, REQUEST request) {
- return GrpcUtil.wrapException(e, getCallId(request));
- }
-
- private void handleError(Throwable e, REQUEST request) {
- GrpcUtil.warn(LOG, () -> getId() + ": Failed " + op + " request " + requestToString(request), e);
+ private synchronized void handleError(Throwable e, long callId, boolean isHeartbeat) {
+ GrpcUtil.warn(LOG, () -> getId() + ": Failed " + op + " request cid=" + callId + ", isHeartbeat? "
+ + isHeartbeat, e);
if (isClosed.compareAndSet(false, true)) {
- responseObserver.onError(wrapException(e, request));
+ responseObserver.onError(GrpcUtil.wrapException(e, callId, isHeartbeat));
}
}
@@ -151,21 +158,22 @@
try {
composeRequest(process(requestRef).thenApply(this::handleReply));
} catch (Exception e) {
- handleError(e, request);
+ handleError(e, getCallId(request), isHeartbeat(request));
release(request);
}
return;
}
final PendingServerRequest<REQUEST> current = new PendingServerRequest<>(requestRef);
- final PendingServerRequest<REQUEST> previous = previousOnNext.getAndSet(current);
- final CompletableFuture<Void> previousFuture = Optional.ofNullable(previous)
- .map(PendingServerRequest::getFuture)
+ final long callId = getCallId(current.getRequest());
+ final boolean isHeartbeat = isHeartbeat(current.getRequest());
+ final Optional<PendingServerRequest<REQUEST>> previous = Optional.ofNullable(previousOnNext.getAndSet(current));
+ final CompletableFuture<Void> previousFuture = previous.map(PendingServerRequest::getFuture)
.orElse(CompletableFuture.completedFuture(null));
try {
final CompletableFuture<REPLY> f = process(requestRef).exceptionally(e -> {
// Handle cases, such as RaftServer is paused
- handleError(e, request);
+ handleError(e, callId, isHeartbeat);
current.getFuture().completeExceptionally(e);
return null;
}).thenCombine(previousFuture, (reply, v) -> {
@@ -175,8 +183,14 @@
});
composeRequest(f);
} catch (Exception e) {
- handleError(e, request);
+ handleError(e, callId, isHeartbeat);
current.getFuture().completeExceptionally(e);
+ } finally {
+ previous.ifPresent(PendingServerRequest::release);
+ if (isClosed.get()) {
+ // Some requests may come after onCompleted or onError, ensure they're released.
+ releaseLast();
+ }
}
}
@@ -188,8 +202,10 @@
LOG.info("{}: Completed {}, lastReply: {}", getId(), op, reply);
responseObserver.onCompleted();
});
+ releaseLast();
}
}
+
@Override
public void onError(Throwable t) {
GrpcUtil.warn(LOG, () -> getId() + ": "+ op + " onError, lastRequest: " + getPreviousRequestString(), t);
@@ -198,8 +214,13 @@
if (status != null && status.getCode() != Status.Code.CANCELLED) {
responseObserver.onCompleted();
}
+ releaseLast();
}
}
+
+ private void releaseLast() {
+ Optional.ofNullable(previousOnNext.get()).ifPresent(PendingServerRequest::release);
+ }
}
private final Supplier<RaftPeerId> idSupplier;
@@ -289,6 +310,11 @@
}
@Override
+ boolean isHeartbeat(AppendEntriesRequestProto request) {
+ return request.getEntriesCount() == 0;
+ }
+
+ @Override
String requestToString(AppendEntriesRequestProto request) {
return ServerStringUtils.toAppendEntriesRequestString(request);
}
@@ -302,11 +328,6 @@
boolean replyInOrder(AppendEntriesRequestProto request) {
return request.getEntriesCount() != 0;
}
-
- @Override
- StatusRuntimeException wrapException(Throwable e, AppendEntriesRequestProto request) {
- return GrpcUtil.wrapException(e, getCallId(request), request.getEntriesCount() == 0);
- }
};
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
index 1244e72..035e0a8 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/RaftServerAsynchronousProtocol.java
@@ -50,11 +50,9 @@
ReferenceCountedObject<AppendEntriesRequestProto> requestRef) throws IOException {
// Default implementation for backward compatibility.
try {
- return appendEntriesAsync(requestRef.retain())
- .whenComplete((r, e) -> requestRef.release());
- } catch (Exception e) {
+ return appendEntriesAsync(requestRef.retain());
+ } finally {
requestRef.release();
- throw e;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8f6c927..c47db14 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1076,7 +1076,13 @@
return f.thenApply(r -> null);
}
// the message stream has ended and the request become a WRITE request
- return replyFuture(f.join());
+ ReferenceCountedObject<RaftClientRequest> joinedRequest = f.join();
+ try {
+ return replyFuture(joinedRequest);
+ } finally {
+ // Released pending streaming requests.
+ joinedRequest.release();
+ }
}
return role.getLeaderState()
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 1d08316..d222482 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -42,6 +42,7 @@
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* In-memory RaftLog Cache. Currently we provide a simple implementation that
@@ -350,12 +351,10 @@
TruncationSegments purge(long index) {
try (AutoCloseableLock writeLock = writeLock()) {
int segmentIndex = binarySearch(index);
- List<SegmentFileInfo> list = new ArrayList<>();
+ List<LogSegment> list = new LinkedList<>();
if (segmentIndex == -segments.size() - 1) {
- for (LogSegment ls : segments) {
- list.add(SegmentFileInfo.newClosedSegmentFileInfo(ls));
- }
+ list.addAll(segments);
segments.clear();
sizeInBytes = 0;
} else if (segmentIndex >= 0) {
@@ -368,13 +367,16 @@
for (int i = 0; i <= startIndex; i++) {
LogSegment segment = segments.remove(0); // must remove the first segment to avoid gaps.
sizeInBytes -= segment.getTotalFileSize();
- list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
+ list.add(segment);
}
} else {
throw new IllegalStateException("Unexpected gap in segments: binarySearch(" + index + ") returns "
+ segmentIndex + ", segments=" + segments);
}
- return list.isEmpty() ? null : new TruncationSegments(null, list);
+ list.forEach(LogSegment::evictCache);
+ List<SegmentFileInfo> toDelete = list.stream().map(SegmentFileInfo::newClosedSegmentFileInfo)
+ .collect(Collectors.toList());
+ return list.isEmpty() ? null : new TruncationSegments(null, toDelete);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
index 07073be..7c40ec2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java
@@ -42,6 +42,7 @@
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
@@ -247,13 +248,24 @@
LogEntryProto entry = Objects.requireNonNull(trx.getLogEntry());
LOG.info("applyTransaction for log index {}", entry.getIndex());
- put(entry);
+ // TODO: Logs kept in StateMachine's cache may be corrupted. Copy for now to have the test pass.
+ // Use ReferenceCount per RATIS-1997.
+ LogEntryProto copied = copy(entry);
+ put(copied);
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK");
return collecting.collect(Collecting.Type.APPLY_TRANSACTION, m);
}
+ private LogEntryProto copy(LogEntryProto log) {
+ try {
+ return LogEntryProto.parseFrom(log.toByteString());
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalStateException("Error copying log entry", e);
+ }
+ }
+
@Override
public long takeSnapshot() {
final TermIndex termIndex = getLastAppliedTermIndex();