RATIS-2013. OrderedAsync retry results an IllegalStateException in GrpcClientProtocolService. (#1026)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ec16763..f423919 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -87,8 +87,10 @@
     public abstract RaftClientRequest newRequestImpl();
 
     final RaftClientRequest newRequest() {
-      attemptCount.incrementAndGet();
-      return newRequestImpl();
+      final int attempt = attemptCount.incrementAndGet();
+      final RaftClientRequest request = newRequestImpl();
+      LOG.debug("attempt #{}, newRequest {}", attempt, request);
+      return request;
     }
 
     CompletableFuture<RaftClientReply> getReplyFuture() {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index 316604d..732e3d8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -122,6 +122,13 @@
       return requests.values().iterator();
     }
 
+    /** @return true iff the request already exists. */
+    boolean putIfAbsent(REQUEST request) {
+      final long seqNum = request.getSeqNum();
+      final REQUEST previous = requests.putIfAbsent(seqNum, request);
+      return previous != null;
+    }
+
     void putNewRequest(REQUEST request) {
       final long seqNum = request.getSeqNum();
       CollectionUtils.putNew(seqNum, request, requests, () -> getName() + ":requests");
@@ -443,11 +450,16 @@
       final long seqNum = request.getSeqNum();
       if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
         nextToProcess = seqNum;
-        LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this);
+        requests.putNewRequest(request);
+        LOG.debug("Received seq={} (first request), {}", seqNum, this);
       } else {
-        LOG.debug("{}: got seq={} in {}", requests.getName(), seqNum, this);
+        final boolean isRetry = requests.putIfAbsent(request);
+        LOG.debug("Received seq={}, isRetry? {}, {}", seqNum, isRetry, this);
+        if (isRetry) {
+          return;
+        }
       }
-      requests.putNewRequest(request);
+
       processRequestsFromHead(processingMethod);
     }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 7eba6a8..264db89 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -178,12 +178,14 @@
   }
 
   void runTestExceptionRetryAttempts(MiniRaftClusterWithGrpc cluster) throws Exception {
-    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final int retryCount = 5;
+    final RetryPolicy timeoutPolicy = MultipleLinearRandomRetry.parseCommaSeparated("1ms, " + retryCount);
     final ExceptionDependentRetry policy = ExceptionDependentRetry.newBuilder()
-        .setExceptionToPolicy(TimeoutIOException.class, MultipleLinearRandomRetry.parseCommaSeparated("1ms, 5"))
+        .setExceptionToPolicy(TimeoutIOException.class, timeoutPolicy)
         .setDefaultPolicy(RetryPolicies.retryForeverNoSleep())
         .build();
 
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
     // create a client with the exception dependent policy
     try (final RaftClient client = cluster.createClient(policy)) {
       client.async().send(new RaftTestUtil.SimpleMessage("1")).get();
@@ -196,7 +198,8 @@
       Assert.fail("Test should have failed.");
     } catch (ExecutionException e) {
       RaftRetryFailureException rrfe = (RaftRetryFailureException) e.getCause();
-      Assert.assertEquals(16, rrfe.getAttemptCount());
+      final int expectedCount = 1 + retryCount; // new request attempt + retry attempts
+      Assert.assertEquals(expectedCount, rrfe.getAttemptCount());
     } finally {
       SimpleStateMachine4Testing.get(leader).unblockWriteStateMachineData();
       cluster.shutdown();