RATIS-2012. Client should not retry after close. (#1025)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 4be9fa3..7698780 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -119,16 +119,18 @@
         ioe = e;
       }
 
-      pending.incrementExceptionCount(ioe);
-      ClientRetryEvent event = new ClientRetryEvent(request, ioe, pending);
-      final RetryPolicy retryPolicy = client.getRetryPolicy();
-      final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
-      TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, action.getSleepTime());
-
-      if (!action.shouldRetry()) {
-        throw (IOException)client.noMoreRetries(event);
+      if (client.isClosed()) {
+        throw new AlreadyClosedException(this + " is closed.");
       }
 
+      final ClientRetryEvent event = pending.newClientRetryEvent(request, ioe);
+      final RetryPolicy retryPolicy = client.getRetryPolicy();
+      final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
+      if (!action.shouldRetry()) {
+        throw client.noMoreRetries(event);
+      }
+
+      final TimeDuration sleepTime = client.getEffectiveSleepTime(ioe, action.getSleepTime());
       try {
         sleepTime.sleep();
       } catch (InterruptedException e) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index a1aa586..34dc3be 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -149,10 +149,6 @@
     getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
   }
 
-  private void handleAsyncRetryFailure(ClientRetryEvent event) {
-    failAllAsyncRequests(event.getRequest(), client.noMoreRetries(event));
-  }
-
   CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, Message message, RaftPeerId server) {
     if (!type.is(TypeCase.WATCH) && !type.is(TypeCase.MESSAGESTREAM)) {
       Objects.requireNonNull(message, "message == null");
@@ -187,85 +183,68 @@
     if (pending == null) {
       return;
     }
-
-    final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
-    if (f.isDone()) {
+    if (pending.getReplyFuture().isDone()) {
       return;
     }
 
-    final RaftClientRequest request = pending.newRequestImpl();
+    final RaftClientRequest request = pending.newRequest();
     if (request == null) { // already done
-      LOG.debug("{} newRequestImpl returns null", pending);
+      LOG.debug("{} newRequest returns null", pending);
       return;
     }
 
-    final RetryPolicy retryPolicy = client.getRetryPolicy();
-    sendRequest(pending).exceptionally(e -> {
-      if (e instanceof CompletionException) {
-        e = JavaUtils.unwrapCompletionException(e);
-        scheduleWithTimeout(pending, request, retryPolicy, e);
-        return null;
-      }
-      f.completeExceptionally(e);
-      return null;
-    });
-  }
-
-  private void scheduleWithTimeout(PendingOrderedRequest pending,
-      RaftClientRequest request, RetryPolicy retryPolicy, Throwable e) {
-    final int attempt = pending.getAttemptCount();
-    final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
-    final TimeDuration sleepTime = client.getEffectiveSleepTime(e,
-        retryPolicy.handleAttemptFailure(event).getSleepTime());
-    LOG.debug("schedule* attempt #{} with sleep {} and policy {} for {}", attempt, sleepTime, retryPolicy, request);
-    scheduleWithTimeout(pending, sleepTime, getSlidingWindow(request));
-  }
-
-  private void scheduleWithTimeout(PendingOrderedRequest pending, TimeDuration sleepTime,
-      SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> slidingWindow) {
-    client.getScheduler().onTimeout(sleepTime,
-        () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
-        LOG, () -> "Failed* to retry " + pending);
-  }
-
-  private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest pending) {
-    final RetryPolicy retryPolicy = client.getRetryPolicy();
-    final RaftClientRequest request;
     if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
       pending.setFirstRequest();
     }
-    request = pending.newRequest();
     LOG.debug("{}: send* {}", client.getId(), request);
-    return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
+    client.getClientRpc().sendRequestAsync(request).thenAccept(reply -> {
       LOG.debug("{}: receive* {}", client.getId(), reply);
       Objects.requireNonNull(reply, "reply == null");
       client.handleReply(request, reply);
       getSlidingWindow(request).receiveReply(
           request.getSlidingWindowEntry().getSeqNum(), reply, this::sendRequestWithRetry);
-      return reply;
     }).exceptionally(e -> {
       LOG.error(client.getId() + ": Failed* " + request, e);
-      e = JavaUtils.unwrapCompletionException(e);
-      if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
-        pending.incrementExceptionCount(e);
-        final ClientRetryEvent event = new ClientRetryEvent(request, e, pending);
-        if (!retryPolicy.handleAttemptFailure(event).shouldRetry()) {
-          handleAsyncRetryFailure(event);
-        } else {
-          if (e instanceof NotLeaderException) {
-            NotLeaderException nle = (NotLeaderException)e;
-            client.handleNotLeaderException(request, nle, this::resetSlidingWindow);
-          } else {
-            client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow);
-          }
-        }
-        throw new CompletionException(e);
-      }
-      failAllAsyncRequests(request, e);
+      handleException(pending, request, e);
       return null;
     });
   }
 
+  private void handleException(PendingOrderedRequest pending, RaftClientRequest request, Throwable e) {
+    final RetryPolicy retryPolicy = client.getRetryPolicy();
+    if (client.isClosed()) {
+      failAllAsyncRequests(request, new AlreadyClosedException(client + " is closed."));
+      return;
+    }
+
+    e = JavaUtils.unwrapCompletionException(e);
+    if (!(e instanceof IOException) || e instanceof GroupMismatchException) {
+      // non-retryable exceptions
+      failAllAsyncRequests(request, e);
+      return;
+    }
+
+    final ClientRetryEvent event = pending.newClientRetryEvent(request, e);
+    final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
+    if (!action.shouldRetry()) {
+      failAllAsyncRequests(request, client.noMoreRetries(event));
+      return;
+    }
+
+    if (e instanceof NotLeaderException) {
+      client.handleNotLeaderException(request, (NotLeaderException) e, this::resetSlidingWindow);
+    } else {
+      client.handleIOException(request, (IOException) e, null, this::resetSlidingWindow);
+    }
+    final TimeDuration sleepTime = client.getEffectiveSleepTime(e, action.getSleepTime());
+    LOG.debug("schedule* retry with sleep {} for attempt #{} of {}, {}",
+        sleepTime, event.getAttemptCount(), request, retryPolicy);
+    final SlidingWindow.Client<PendingOrderedRequest, RaftClientReply> slidingWindow = getSlidingWindow(request);
+    client.getScheduler().onTimeout(sleepTime,
+        () -> slidingWindow.retry(pending, this::sendRequestWithRetry),
+        LOG, () -> "Failed* to retry " + pending);
+  }
+
   void assertRequestSemaphore(int expectedAvailablePermits, int expectedQueueLength) {
     Preconditions.assertSame(expectedAvailablePermits, requestSemaphore.availablePermits(), "availablePermits");
     Preconditions.assertSame(expectedQueueLength, requestSemaphore.getQueueLength(), "queueLength");
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index f423919..1b82709 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -44,11 +44,13 @@
 import org.apache.ratis.thirdparty.com.google.common.cache.Cache;
 import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
 import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
+import org.apache.ratis.util.Timestamp;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -65,6 +67,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -79,10 +82,10 @@
       .build();
 
   public abstract static class PendingClientRequest {
-    private final long creationTimeInMs = System.currentTimeMillis();
+    private final Timestamp creationTime = Timestamp.currentTime();
     private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
     private final AtomicInteger attemptCount = new AtomicInteger();
-    private final Map<Class<?>, Integer> exceptionCount = new ConcurrentHashMap<>();
+    private final Map<Class<?>, Integer> exceptionCounts = new ConcurrentHashMap<>();
 
     public abstract RaftClientRequest newRequestImpl();
 
@@ -101,19 +104,10 @@
       return attemptCount.get();
     }
 
-    int incrementExceptionCount(Throwable t) {
-      return t != null ? exceptionCount.compute(t.getClass(), (k, v) -> v != null ? v + 1 : 1) : 0;
-    }
-
-    public int getExceptionCount(Throwable t) {
-      return t != null ? Optional.ofNullable(exceptionCount.get(t.getClass())).orElse(0) : 0;
-    }
-
-    public boolean isRequestTimeout(TimeDuration timeout) {
-      if (timeout == null) {
-        return false;
-      }
-      return System.currentTimeMillis() - creationTimeInMs > timeout.toLong(TimeUnit.MILLISECONDS);
+    public ClientRetryEvent newClientRetryEvent(RaftClientRequest request, Throwable throwable) {
+      final int exceptionCount = throwable == null? 0
+          : exceptionCounts.compute(throwable.getClass(), (k, v) -> v == null? 1: v+1);
+      return new ClientRetryEvent(getAttemptCount(), request, exceptionCount, throwable, creationTime);
     }
   }
 
@@ -196,6 +190,8 @@
   private final ConcurrentMap<RaftPeerId, LeaderElectionManagementApi>
       leaderElectionManagement = new ConcurrentHashMap<>();
 
+  private final AtomicBoolean closed = new AtomicBoolean();
+
   @SuppressWarnings("checkstyle:ParameterNumber")
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId, RaftPeer primaryDataStreamServer,
       RaftClientRpc clientRpc, RetryPolicy retryPolicy, RaftProperties properties, Parameters parameters) {
@@ -346,11 +342,11 @@
     return dataStreamApi.get();
   }
 
-  Throwable noMoreRetries(ClientRetryEvent event) {
+  IOException noMoreRetries(ClientRetryEvent event) {
     final int attemptCount = event.getAttemptCount();
     final Throwable throwable = event.getCause();
     if (attemptCount == 1 && throwable != null) {
-      return throwable;
+      return IOUtils.asIOException(throwable);
     }
     return new RaftRetryFailureException(event.getRequest(), attemptCount, retryPolicy, throwable);
   }
@@ -418,8 +414,7 @@
 
   void handleIOException(RaftClientRequest request, IOException ioe,
       RaftPeerId newLeader, Consumer<RaftClientRequest> handler) {
-    LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
-        clientId, newLeader, request, ioe);
+    LOG.debug("{}: suggested new leader: {}. Failed {}", clientId, newLeader, request, ioe);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Stack trace", new Throwable("TRACE"));
     }
@@ -456,8 +451,17 @@
     return clientRpc;
   }
 
+  boolean isClosed() {
+    return closed.get();
+  }
+
   @Override
   public void close() throws IOException {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
+    LOG.debug("close {}", getId());
     clientRpc.close();
     if (dataStreamApi.isInitialized()) {
       dataStreamApi.get().close();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 84b817b..eccda4d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -22,6 +22,7 @@
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -89,11 +90,14 @@
         }
 
         final Throwable cause = replyException != null ? replyException : e;
-        pending.incrementExceptionCount(cause);
-        final ClientRetryEvent event = new ClientRetryEvent(request, cause, pending);
+        if (client.isClosed()) {
+          f.completeExceptionally(new AlreadyClosedException(client + " is closed"));
+          return;
+        }
+
+        final ClientRetryEvent event = pending.newClientRetryEvent(request, cause);
         RetryPolicy retryPolicy = client.getRetryPolicy();
         final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
-        TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
         if (!action.shouldRetry()) {
           f.completeExceptionally(client.noMoreRetries(event));
           return;
@@ -124,7 +128,9 @@
           }
         }
 
-        LOG.debug("schedule retry for attempt #{}, policy={}, request={}", attemptCount, retryPolicy, request);
+        final TimeDuration sleepTime = client.getEffectiveSleepTime(cause, action.getSleepTime());
+        LOG.debug("schedule~ attempt #{} with sleep {} and policy {} for {}",
+            attemptCount, sleepTime, retryPolicy, request);
         client.getScheduler().onTimeout(sleepTime,
             () -> sendRequestWithRetry(pending, client), LOG, () -> clientId + ": Failed~ to retry " + request);
       } catch (Exception ex) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
index f0c38ef..c6a8beb 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/retry/ClientRetryEvent.java
@@ -17,12 +17,11 @@
  */
 package org.apache.ratis.client.retry;
 
-import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
 
 /** An {@link RetryPolicy.Event} specific to client request failure. */
 public class ClientRetryEvent implements RetryPolicy.Event {
@@ -30,23 +29,15 @@
   private final int causeCount;
   private final RaftClientRequest request;
   private final Throwable cause;
-  private PendingClientRequest pending;
+  private final Timestamp pendingRequestCreationTime;
 
-  @VisibleForTesting
-  public ClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) {
-    this(attemptCount, request, attemptCount, cause);
-  }
-
-  public ClientRetryEvent(RaftClientRequest request, Throwable t, PendingClientRequest pending) {
-    this(pending.getAttemptCount(), request, pending.getExceptionCount(t), t);
-    this.pending = pending;
-  }
-
-  private ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause) {
+  public ClientRetryEvent(int attemptCount, RaftClientRequest request, int causeCount, Throwable cause,
+      Timestamp pendingRequestCreationTime) {
     this.attemptCount = attemptCount;
     this.causeCount = causeCount;
     this.request = request;
     this.cause = cause;
+    this.pendingRequestCreationTime = pendingRequestCreationTime;
   }
 
   @Override
@@ -69,7 +60,7 @@
   }
 
   boolean isRequestTimeout(TimeDuration timeout) {
-    return pending != null && pending.isRequestTimeout(timeout);
+    return timeout != null && pendingRequestCreationTime.elapsedTime().compareTo(timeout) >= 0;
   }
 
   @Override
@@ -77,6 +68,7 @@
     return JavaUtils.getClassSimpleName(getClass())
         + ":attempt=" + attemptCount
         + ",request=" + request
-        + ",cause=" + cause;
+        + ",cause=" + cause
+        + ",causeCount=" + causeCount;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 105ecbf..0ce0595 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -36,6 +36,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /** A map from peer id to peer and its proxy. */
 public class PeerProxyMap<PROXY extends Closeable> implements RaftPeer.Add, Closeable {
@@ -65,7 +66,7 @@
               throw new AlreadyClosedException(name + " is already " + current);
             }
             lifeCycle.startAndTransition(
-                () -> proxy = createProxy.apply(peer), IOException.class);
+                () -> proxy = createProxyImpl(peer), IOException.class);
           }
         }
       }
@@ -92,6 +93,7 @@
   private final Object resetLock = new Object();
 
   private final CheckedFunction<RaftPeer, PROXY, IOException> createProxy;
+  private final AtomicBoolean closed = new AtomicBoolean();
 
   public PeerProxyMap(String name, CheckedFunction<RaftPeer, PROXY, IOException> createProxy) {
     this.name = name;
@@ -102,6 +104,13 @@
     return name;
   }
 
+  private PROXY createProxyImpl(RaftPeer peer) throws IOException {
+    if (closed.get()) {
+      throw new AlreadyClosedException(name + ": Failed to create proxy for " + peer);
+    }
+    return createProxy.apply(peer);
+  }
+
   public PROXY getProxy(RaftPeerId id) throws IOException {
     Objects.requireNonNull(id, "id == null");
     PeerAndProxy p = peers.get(id);
@@ -161,6 +170,10 @@
 
   @Override
   public void close() {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
     final List<IOException> exceptions = Collections.synchronizedList(new ArrayList<>());
     ConcurrentUtils.parallelForEachAsync(peers.values(),
         pp -> pp.setNullProxyAndClose().map(proxy -> closeProxy(proxy, pp)).ifPresent(exceptions::add),
@@ -180,7 +193,7 @@
 
   private IOException closeProxy(PROXY proxy, PeerAndProxy pp) {
     try {
-      LOG.debug("{}: Closing proxy for peer {}", name, pp);
+      LOG.debug("{}: Closing proxy {} {} for peer {}", name, proxy.getClass().getSimpleName(), proxy, pp);
       proxy.close();
       return null;
     } catch (IOException e) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 260f601..71c5c5e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -372,7 +372,7 @@
   @Test
   public void testStateMachineMetrics() throws Exception {
     runWithNewCluster(NUM_SERVERS, cluster ->
-        RaftBasicTests.testStateMachineMetrics(true, cluster, LOG));
+        RaftBasicTests.runTestStateMachineMetrics(true, cluster));
   }
 
   @Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 47c9b0e..4ff9681 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -168,12 +168,10 @@
 
     final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
     for(RaftServer.Division impl: divisions) {
-        JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages),
-            50, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG);
+      RaftTestUtil.assertLogEntries(impl, term, messages, 50, LOG);
     }
   }
 
-
   @Test
   public void testOldLeaderCommit() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestOldLeaderCommit);
@@ -218,7 +216,7 @@
 
     cluster.getServerAliveStream()
         .map(RaftServer.Division::getRaftLog)
-        .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
+        .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages, System.out::println));
   }
 
   @Test
@@ -453,8 +451,12 @@
     }
   }
 
-  public static void testStateMachineMetrics(boolean async,
-      MiniRaftCluster cluster, Logger LOG) throws Exception {
+  @Test
+  public void testStateMachineMetrics() throws Exception {
+    runWithNewCluster(NUM_SERVERS, cluster -> runTestStateMachineMetrics(false, cluster));
+  }
+
+  static void runTestStateMachineMetrics(boolean async, MiniRaftCluster cluster) throws Exception {
     RaftServer.Division leader = waitForLeader(cluster);
     try (final RaftClient client = cluster.createClient()) {
       Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index fa41887..41a4311 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -249,19 +249,16 @@
     }
   }
 
-  static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage... expectedMessages) {
-    LOG.info("checking raft log for {}", server.getMemberId());
-    final RaftLog log = server.getRaftLog();
-    try {
-      RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
-    } catch (AssertionError e) {
-      LOG.error("Unexpected raft log in {}", server.getMemberId(), e);
-      throw e;
-    }
+  static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage[] expectedMessages,
+      int numAttempts, Logger log) throws Exception {
+    final String name = server.getId() + " assertLogEntries";
+    final Function<Integer, Consumer<String>> print = i -> i < numAttempts? s -> {}: System.out::println;
+    JavaUtils.attempt(i -> assertLogEntries(server.getRaftLog(), expectedTerm, expectedMessages, print.apply(i)),
+        numAttempts, TimeDuration.ONE_SECOND, () -> name, log);
   }
 
   static Iterable<LogEntryProto> getLogEntryProtos(RaftLog log) {
-    return CollectionUtils.as(log.getEntries(0, Long.MAX_VALUE), ti -> {
+    return CollectionUtils.as(log.getEntries(0, log.getLastEntryTermIndex().getIndex() + 1), ti -> {
       try {
         return log.get(ti.getIndex());
       } catch (IOException exception) {
@@ -270,17 +267,17 @@
     });
   }
 
-  static List<LogEntryProto> getStateMachineLogEntries(RaftLog log) {
+  static List<LogEntryProto> getStateMachineLogEntries(RaftLog log, Consumer<String> print) {
     final List<LogEntryProto> entries = new ArrayList<>();
     for (LogEntryProto e : getLogEntryProtos(log)) {
       final String s = LogProtoUtils.toLogEntryString(e);
       if (e.hasStateMachineLogEntry()) {
-        LOG.info(s + ", " + e.getStateMachineLogEntry().toString().trim().replace("\n", ", "));
+        print.accept(entries.size() + ") " + s);
         entries.add(e);
       } else if (e.hasConfigurationEntry()) {
-        LOG.info("Found {}, ignoring it.", s);
+        print.accept("Ignoring " + s);
       } else if (e.hasMetadataEntry()) {
-        LOG.info("Found {}, ignoring it.", s);
+        print.accept("Ignoring " + s);
       } else {
         throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + s);
       }
@@ -288,13 +285,14 @@
     return entries;
   }
 
-  static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) {
-    final List<LogEntryProto> entries = getStateMachineLogEntries(log);
+  static Void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage[] expectedMessages, Consumer<String> print) {
+    final List<LogEntryProto> entries = getStateMachineLogEntries(log, print);
     try {
       assertLogEntries(entries, expectedTerm, expectedMessages);
     } catch(Exception t) {
       throw new AssertionError("entries: " + entries, t);
     }
+    return null;
   }
 
   static void assertLogEntries(List<LogEntryProto> entries, long expectedTerm, SimpleMessage... expectedMessages) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
new file mode 100644
index 0000000..80c5774
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/RaftLogTruncateTests.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.impl.OrderedAsync;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
+import org.apache.ratis.util.Slf4jUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.event.Level;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+public abstract class RaftLogTruncateTests<CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+  public static final int NUM_SERVERS = 5;
+  final TimeDuration MIN_TIMEOUT = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+
+  static SimpleMessage[] arraycopy(SimpleMessage[] src1, SimpleMessage[] src2) {
+    final SimpleMessage[] dst = new SimpleMessage[src1.length + src2.length];
+    System.arraycopy(src1, 0, dst, 0, src1.length);
+    System.arraycopy(src2, 0, dst, src1.length, src2.length);
+    return dst;
+  }
+
+  {
+    Slf4jUtils.setLogLevel(OrderedAsync.LOG, Level.ERROR);
+    Slf4jUtils.setLogLevel(RaftServerConfigKeys.LOG, Level.ERROR);
+    Slf4jUtils.setLogLevel(RaftClientConfigKeys.LOG, Level.ERROR);
+
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
+
+    // set a long rpc timeout so, when the leader does not have the majority, it won't step down fast.
+    RaftServerConfigKeys.Rpc.setTimeoutMin(p, MIN_TIMEOUT);
+    RaftServerConfigKeys.Rpc.setTimeoutMax(p, MIN_TIMEOUT.multiply(2));
+    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMin(p, TimeDuration.ONE_SECOND);
+    RaftServerConfigKeys.Rpc.setFirstElectionTimeoutMax(p, TimeDuration.ONE_SECOND.multiply(2));
+  }
+
+  @Override
+  public int getGlobalTimeoutSeconds() {
+    return 200;
+  }
+
+  @Test
+  public void testLogTruncate() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestLogTruncate);
+  }
+
+  void runTestLogTruncate(MiniRaftCluster cluster) throws Exception {
+    final RaftServer.Division oldLeader = waitForLeader(cluster);
+    final List<RaftServer.Division> oldFollowers = cluster.getFollowers();
+    final List<RaftPeerId> killedPeers = new ArrayList<>();
+    final List<RaftPeerId> remainingPeers = new ArrayList<>();
+
+    final int majorityIndex = NUM_SERVERS / 2 + 1;
+    Assert.assertEquals(NUM_SERVERS - 1, oldFollowers.size());
+    Assert.assertTrue(majorityIndex < oldFollowers.size());
+
+    for (int i = 0; i < majorityIndex; i++) {
+      killedPeers.add(oldFollowers.get(i).getId());
+    }
+    remainingPeers.add(oldLeader.getId());
+    for (int i = majorityIndex; i < oldFollowers.size(); i++) {
+      remainingPeers.add(oldFollowers.get(i).getId());
+    }
+
+    try {
+      runTestLogTruncate(cluster, oldLeader, killedPeers, remainingPeers);
+    } catch (Throwable e) {
+      LOG.info("killedPeers   : {}", killedPeers);
+      LOG.info("remainingPeers: {}", remainingPeers);
+      throw e;
+    }
+  }
+
+  void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader,
+      List<RaftPeerId> killedPeers, List<RaftPeerId> remainingPeers) throws Exception {
+    final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<>());
+    final long oldLeaderTerm = oldLeader.getInfo().getCurrentTerm();
+    LOG.info("oldLeader: {}, term={}", oldLeader.getId(), oldLeaderTerm);
+
+    final SimpleMessage[] firstBatch = SimpleMessage.create(5, "first");
+    final SimpleMessage[] secondBatch = SimpleMessage.create(4, "second");
+
+    try (final RaftClient client = cluster.createClient(oldLeader.getId())) {
+      // send some messages
+      for (SimpleMessage batch : firstBatch) {
+        final RaftClientReply reply = client.io().send(batch);
+        Assert.assertTrue(reply.isSuccess());
+      }
+      for (RaftServer.Division f : cluster.getFollowers()) {
+        assertLogEntries(f, oldLeaderTerm, firstBatch);
+      }
+
+      // kill a majority of followers
+      LOG.info("Before killServer {}: {}", killedPeers, cluster.printServers());
+      for (RaftPeerId f : killedPeers) {
+        cluster.killServer(f);
+      }
+      LOG.info("After killServer {}: {}", killedPeers, cluster.printServers());
+
+      // send more messages, but they won't be committed due to not enough followers
+      final SimpleMessage[] messagesToBeTruncated = SimpleMessage.create(3, "messagesToBeTruncated");
+      final AtomicBoolean done = new AtomicBoolean();
+      for (SimpleMessage message : messagesToBeTruncated) {
+        client.async().send(message).whenComplete((r, e) -> {
+          if (!done.get()) {
+            exceptions.add(new IllegalStateException(message + " is completed: reply=" + r, e));
+          }
+        });
+      }
+
+      // check log messages
+      final SimpleMessage[] expectedMessages = arraycopy(firstBatch, messagesToBeTruncated);
+      for (RaftPeerId f : remainingPeers) {
+        assertLogEntries(cluster.getDivision(f), oldLeaderTerm, expectedMessages);
+      }
+      done.set(true);
+      LOG.info("done");
+    }
+
+    // kill the remaining servers
+    LOG.info("Before killServer {}: {}", remainingPeers, cluster.printServers());
+    for (RaftPeerId f : remainingPeers) {
+      cluster.killServer(f);
+    }
+    LOG.info("After killServer {}: {}", remainingPeers, cluster.printServers());
+
+    // restart the earlier followers
+    for (RaftPeerId f : killedPeers) {
+      cluster.restartServer(f, false);
+    }
+
+    // The new leader should be one of the earlier followers
+    final RaftServer.Division newLeader = waitForLeader(cluster);
+    LOG.info("After restartServer {}: {}", killedPeers, cluster.printServers());
+    final long newLeaderTerm = newLeader.getInfo().getCurrentTerm();
+
+    final SegmentedRaftLog newLeaderLog = (SegmentedRaftLog) newLeader.getRaftLog();
+    LOG.info("newLeader: {}, term {}, last={}", newLeader.getId(), newLeaderTerm,
+        newLeaderLog.getLastEntryTermIndex());
+    Assert.assertTrue(killedPeers.contains(newLeader.getId()));
+
+    // restart the remaining servers
+    for (RaftPeerId f : remainingPeers) {
+      cluster.restartServer(f, false);
+    }
+
+    // check RaftLog truncate
+    for (RaftPeerId f : remainingPeers) {
+      assertLogEntries(cluster.getDivision(f), oldLeaderTerm, firstBatch);
+    }
+
+    try (final RaftClient client = cluster.createClient(newLeader.getId())) {
+      // send more messages
+      for (SimpleMessage batch : secondBatch) {
+        final RaftClientReply reply = client.io().send(batch);
+        Assert.assertTrue(reply.isSuccess());
+      }
+    }
+
+    // check log messages -- it should be truncated and then append the new messages
+    final SimpleMessage[] expectedMessages = arraycopy(firstBatch, secondBatch);
+    for (RaftPeerId f : killedPeers) {
+      assertLogEntries(cluster.getDivision(f), oldLeaderTerm, expectedMessages);
+    }
+
+    if (!exceptions.isEmpty()) {
+      LOG.info("{} exceptions", exceptions.size());
+      for(int i = 0 ; i < exceptions.size(); i++) {
+        LOG.info("exception {})", i, exceptions.get(i));
+      }
+      Assert.fail();
+    }
+  }
+
+  private void assertLogEntries(RaftServer.Division server, long term, SimpleMessage[] expectedMessages)
+      throws Exception {
+    RaftTestUtil.assertLogEntries(server, term, expectedMessages, 30, LOG);
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java
new file mode 100644
index 0000000..dc28463
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftLogTruncateWithGrpc.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.RaftLogTruncateTests;
+
+public class TestRaftLogTruncateWithGrpc extends RaftLogTruncateTests<MiniRaftClusterWithGrpc>
+    implements MiniRaftClusterWithGrpc.FactoryGet {
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index bc0061f..046453d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -74,12 +74,6 @@
   }
 
   @Test
-  public void testStateMachineMetrics() throws Exception {
-    runWithNewCluster(NUM_SERVERS, cluster ->
-        testStateMachineMetrics(false, cluster, LOG));
-  }
-
-  @Test
   public void testUpdateViaHeartbeat() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat);
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 264db89..36e6dfb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -154,7 +154,7 @@
       long sleepTime) {
     for (int i = 0; i < retries + 1; i++) {
       RetryPolicy.Action action = exceptionDependentRetry
-          .handleAttemptFailure(new ClientRetryEvent(i, null, exception));
+          .handleAttemptFailure(TestRetryPolicy.newClientRetryEvent(i, null, exception));
 
       final boolean expected = i < retries && i < maxAttempts;
       Assert.assertEquals(expected, action.shouldRetry());
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
index d69cd1a..1b9536b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestRetryPolicy.java
@@ -33,6 +33,7 @@
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -70,6 +71,10 @@
     }
   }
 
+  static ClientRetryEvent newClientRetryEvent(int attemptCount, RaftClientRequest request, Throwable cause) {
+    return new ClientRetryEvent(attemptCount, request, attemptCount, cause, Timestamp.currentTime());
+  }
+
   @Test
   public void testRequestTypeDependentRetry() {
     final RequestTypeDependentRetryPolicy.Builder b = RequestTypeDependentRetryPolicy.newBuilder();
@@ -88,7 +93,7 @@
         RaftClientRequest.watchRequestType(1, ReplicationLevel.MAJORITY));
     for(int i = 1; i < 2*n; i++) {
       { //write
-        final ClientRetryEvent event = new ClientRetryEvent(i, writeRequest, null);
+        final ClientRetryEvent event = newClientRetryEvent(i, writeRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
 
         final boolean expected = i < n;
@@ -101,21 +106,21 @@
       }
 
       { //read and stale read are using default
-        final ClientRetryEvent event = new ClientRetryEvent(i, readRequest, null);
+        final ClientRetryEvent event = newClientRetryEvent(i, readRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertTrue(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
 
       {
-        final ClientRetryEvent event = new ClientRetryEvent(i, staleReadRequest, null);
+        final ClientRetryEvent event = newClientRetryEvent(i, staleReadRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertTrue(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
       }
 
       { //watch has no retry
-        final ClientRetryEvent event = new ClientRetryEvent(i, watchRequest, null);
+        final ClientRetryEvent event = newClientRetryEvent(i, watchRequest, null);
         final RetryPolicy.Action action = policy.handleAttemptFailure(event);
         Assert.assertFalse(action.shouldRetry());
         Assert.assertEquals(0L, action.getSleepTime().getDuration());
@@ -148,7 +153,7 @@
     };
 
     for (RaftClientRequest request : requests) {
-      final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending);
+      final ClientRetryEvent event = pending.newClientRetryEvent(request, new Exception());
       final RetryPolicy.Action action = policy.handleAttemptFailure(event);
       Assert.assertTrue(action.shouldRetry());
       Assert.assertEquals(0L, action.getSleepTime().getDuration());
@@ -156,7 +161,7 @@
 
     timeout.sleep();
     for (RaftClientRequest request : requests) {
-      final ClientRetryEvent event = new ClientRetryEvent(request, new Exception(), pending);
+      final ClientRetryEvent event = pending.newClientRetryEvent(request, new Exception());
       final RetryPolicy.Action action = policy.handleAttemptFailure(event);
       Assert.assertFalse(action.shouldRetry());
     }
@@ -218,8 +223,7 @@
    */
   private void checkEvent(int exceptionAttemptCount, RetryPolicy retryPolicy, RaftClientRequest raftClientRequest,
       Throwable exception, Pair exceptionPolicyPair) {
-    final ClientRetryEvent event =
-        new ClientRetryEvent(exceptionAttemptCount, raftClientRequest, exception);
+    final ClientRetryEvent event = newClientRetryEvent(exceptionAttemptCount, raftClientRequest, exception);
     final RetryPolicy.Action action = retryPolicy.handleAttemptFailure(event);
 
     final boolean expected = exceptionAttemptCount < exceptionPolicyPair.retries;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index 73ff1eb..2f3edf7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -261,7 +261,7 @@
     final RaftPeerId leaderId = leader.getId();
     ids.add(leaderId);
 
-    RaftTestUtil.getStateMachineLogEntries(leaderLog);
+    RaftTestUtil.getStateMachineLogEntries(leaderLog, LOG::info);
 
     // check that the last metadata entry is written to the log
     JavaUtils.attempt(() -> assertLastLogEntry(leader), 20, HUNDRED_MILLIS, "leader last metadata entry", LOG);