RATIS-2013. OrderedAsync retry results an IllegalStateException in GrpcClientProtocolService. (#1026)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ec16763..f423919 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -87,8 +87,10 @@
public abstract RaftClientRequest newRequestImpl();
final RaftClientRequest newRequest() {
- attemptCount.incrementAndGet();
- return newRequestImpl();
+ final int attempt = attemptCount.incrementAndGet();
+ final RaftClientRequest request = newRequestImpl();
+ LOG.debug("attempt #{}, newRequest {}", attempt, request);
+ return request;
}
CompletableFuture<RaftClientReply> getReplyFuture() {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 316604d..732e3d8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -122,6 +122,13 @@
return requests.values().iterator();
}
+ /** @return true iff the request already exists. */
+ boolean putIfAbsent(REQUEST request) {
+ final long seqNum = request.getSeqNum();
+ final REQUEST previous = requests.putIfAbsent(seqNum, request);
+ return previous != null;
+ }
+
void putNewRequest(REQUEST request) {
final long seqNum = request.getSeqNum();
CollectionUtils.putNew(seqNum, request, requests, () -> getName() + ":requests");
@@ -443,11 +450,16 @@
final long seqNum = request.getSeqNum();
if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
nextToProcess = seqNum;
- LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this);
+ requests.putNewRequest(request);
+ LOG.debug("Received seq={} (first request), {}", seqNum, this);
} else {
- LOG.debug("{}: got seq={} in {}", requests.getName(), seqNum, this);
+ final boolean isRetry = requests.putIfAbsent(request);
+ LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
+ if (isRetry) {
+ return;
+ }
}
- requests.putNewRequest(request);
+
processRequestsFromHead(processingMethod);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 7eba6a8..264db89 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -178,12 +178,14 @@
}
void runTestExceptionRetryAttempts(MiniRaftClusterWithGrpc cluster) throws Exception {
- final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final int retryCount = 5;
+ final RetryPolicy timeoutPolicy = MultipleLinearRandomRetry.parseCommaSeparated("1ms, " + retryCount);
final ExceptionDependentRetry policy = ExceptionDependentRetry.newBuilder()
- .setExceptionToPolicy(TimeoutIOException.class, MultipleLinearRandomRetry.parseCommaSeparated("1ms, 5"))
+ .setExceptionToPolicy(TimeoutIOException.class, timeoutPolicy)
.setDefaultPolicy(RetryPolicies.retryForeverNoSleep())
.build();
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
// create a client with the exception dependent policy
try (final RaftClient client = cluster.createClient(policy)) {
client.async().send(new RaftTestUtil.SimpleMessage("1")).get();
@@ -196,7 +198,8 @@
Assert.fail("Test should have failed.");
} catch (ExecutionException e) {
RaftRetryFailureException rrfe = (RaftRetryFailureException) e.getCause();
- Assert.assertEquals(16, rrfe.getAttemptCount());
+ final int expectedCount = 1 + retryCount; // new request attempt + retry attempts
+ Assert.assertEquals(expectedCount, rrfe.getAttemptCount());
} finally {
SimpleStateMachine4Testing.get(leader).unblockWriteStateMachineData();
cluster.shutdown();