RATIS-235. RaftTestUtil.assertLogEntries should check index for async tests.
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 1c1a754..cb18caa 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -123,7 +123,7 @@
         LOG.trace("{}: wait {}ms", this, waitTimeMs);
         wait(waitTimeMs);
       } catch(InterruptedException ie) {
-        LOG.warn("Wait interrupted", ie);
+        LOG.warn(this + ": Wait interrupted by " + ie);
       }
     }
   }
@@ -242,7 +242,7 @@
         LOG.info("{} is stopped", GRpcLogAppender.this);
         return;
       }
-      RaftGrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer().getId(), t);
+      RaftGrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t);
 
       long callId = RaftGrpcUtil.getCallId(t);
       resetClient(pendingRequests.get(callId));
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index e6a3b57..30fcd91 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -61,9 +61,8 @@
   }
 
   @Test
-  public void testRequestTimeout()
-      throws IOException, InterruptedException, ExecutionException {
-    testRequestTimeout(false, getCluster(), LOG, getProperties());
+  public void testRequestTimeout() throws Exception {
+    testRequestTimeout(false, getCluster(), LOG);
   }
 
   @Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 5c5749f..2643073 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -153,6 +153,10 @@
     ExitUtils.disableSystemExit();
   }
 
+  public RaftProperties getProperties() {
+    return properties;
+  }
+
   public MiniRaftCluster initServers() {
     if (servers.isEmpty()) {
       putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index db466df..d8909de 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -57,22 +57,20 @@
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
-  private RaftProperties properties;
-
   public static final int NUM_SERVERS = 3;
 
   @Before
   public void setup() {
-    properties = new RaftProperties();
-    properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+    getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
     TimeDuration retryCacheExpiryDuration = TimeDuration.valueOf(5, TimeUnit.SECONDS);
-    RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheExpiryDuration);
+    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), retryCacheExpiryDuration);
   }
 
   @Test
   public void testAsyncConfiguration() throws IOException {
     LOG.info("Running testAsyncConfiguration");
+    final RaftProperties properties = new RaftProperties();
     RaftClient.Builder clientBuilder = RaftClient.newBuilder()
         .setRaftGroup(RaftGroup.emptyGroup())
         .setProperties(properties);
@@ -91,24 +89,17 @@
       RaftClientTestUtil.assertScheduler(client, numThreads);
       RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0);
     }
-
-    // reset to default for other tests.
-    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties,
-        RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT);
-    RaftClientConfigKeys.Async.setSchedulerThreads(properties,
-        RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT);
   }
 
   @Test
-  public void testAsyncRequestSemaphore()
-      throws InterruptedException, IOException {
+  public void testAsyncRequestSemaphore() throws Exception {
     LOG.info("Running testAsyncRequestSemaphore");
-    CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    final CLUSTER cluster = newCluster(NUM_SERVERS);
     Assert.assertNull(cluster.getLeader());
     cluster.start();
     waitForLeader(cluster);
 
-    int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(properties);
+    int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(getProperties());
     CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
     final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);
     final RaftClient client = cluster.createClient();
@@ -153,33 +144,31 @@
     cluster.shutdown();
   }
 
+  void runTestBasicAppendEntriesAsync(ReplicationLevel replication) throws Exception {
+    final CLUSTER cluster = newCluster(NUM_SERVERS);
+    try {
+      cluster.start();
+      waitForLeader(cluster);
+      RaftBasicTests.runTestBasicAppendEntries(true, replication, 1000, cluster, LOG);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testBasicAppendEntriesAsync() throws Exception {
-    LOG.info("Running testBasicAppendEntriesAsync");
-    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
-    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
-    cluster.start();
-    waitForLeader(cluster);
-    RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.MAJORITY, 1000, cluster, LOG);
-    cluster.shutdown();
+    runTestBasicAppendEntriesAsync(ReplicationLevel.MAJORITY);
   }
 
   @Test
   public void testBasicAppendEntriesAsyncWithAllReplication() throws Exception {
-    LOG.info("Running testBasicAppendEntriesAsync");
-    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
-    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
-    cluster.start();
-    waitForLeader(cluster);
-    RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.ALL, 1000, cluster, LOG);
-    cluster.shutdown();
+    runTestBasicAppendEntriesAsync(ReplicationLevel.ALL);
   }
 
   @Test
   public void testWithLoadAsync() throws Exception {
     LOG.info("Running testWithLoadAsync");
-    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
-    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    final CLUSTER cluster = newCluster(NUM_SERVERS);
     cluster.start();
     waitForLeader(cluster);
     RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
@@ -189,7 +178,7 @@
   @Test
   public void testStaleReadAsync() throws Exception {
     final int numMesssages = 10;
-    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    final CLUSTER cluster = newCluster(NUM_SERVERS);
 
     try (RaftClient client = cluster.createClient()) {
       cluster.start();
@@ -263,11 +252,10 @@
   }
 
   @Test
-  public void testRequestTimeout()
-      throws IOException, InterruptedException, ExecutionException {
-    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+  public void testRequestTimeout() throws Exception {
+    final CLUSTER cluster = newCluster(NUM_SERVERS);
     cluster.start();
-    RaftBasicTests.testRequestTimeout(true, cluster, LOG, properties);
+    RaftBasicTests.testRequestTimeout(true, cluster, LOG);
     cluster.shutdown();
   }
 
@@ -275,9 +263,9 @@
   public void testAppendEntriesTimeout()
       throws IOException, InterruptedException, ExecutionException {
     LOG.info("Running testAppendEntriesTimeout");
-    TimeDuration retryCacheExpiryDuration = TimeDuration.valueOf(20, TimeUnit.SECONDS);
-    RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheExpiryDuration);
-    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+    final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
+    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
+    final CLUSTER cluster = newCluster(NUM_SERVERS);
     cluster.start();
     waitForLeader(cluster);
     long time = System.currentTimeMillis();
@@ -309,5 +297,8 @@
       Assert.assertTrue(System.currentTimeMillis() - time > waitTime);
     }
     cluster.shutdown();
+
+    //reset for the other tests
+    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 4deeef5..64b3c7e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -18,7 +18,7 @@
 package org.apache.ratis;
 
 import org.apache.log4j.Level;
-import org.apache.ratis.RaftTestUtil.*;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
@@ -28,12 +28,16 @@
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.RetryCacheTestUtil;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.After;
 import org.junit.Assert;
@@ -46,7 +50,6 @@
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -55,12 +58,15 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.ratis.RaftTestUtil.*;
+import static org.apache.ratis.RaftTestUtil.logEntriesContains;
+import static org.apache.ratis.RaftTestUtil.sendMessageInNewThread;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
 import static org.junit.Assert.assertTrue;
 
 public abstract class RaftBasicTests extends BaseTest {
   {
     LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG);
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
     RaftServerConfigKeys.RetryCache.setExpiryTime(properties, TimeDuration
         .valueOf(5, TimeUnit.SECONDS));
@@ -100,30 +106,29 @@
     runTestBasicAppendEntries(false, ReplicationLevel.ALL, 10, getCluster(), LOG);
   }
 
+  static void killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) {
+    try {
+      Thread.sleep(killSleepMs);
+      cluster.killServer(id);
+      Thread.sleep(restartSleepMs);
+      LOG.info("restart server: " + id);
+      cluster.restartServer(id, false);
+    } catch (Exception e) {
+      ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG);
+    }
+  }
+
   static void runTestBasicAppendEntries(
       boolean async, ReplicationLevel replication, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
-    LOG.info("runTestBasicAppendEntries: async? " + async + ", numMessages=" + numMessages);
+    LOG.info("runTestBasicAppendEntries: async? {}, replication={}, numMessages={}",
+        async, replication, numMessages);
     for (RaftServer s : cluster.getServers()) {
       cluster.restartServer(s.getId(), false);
     }
     RaftServerImpl leader = waitForLeader(cluster);
     final long term = leader.getState().getCurrentTerm();
 
-    final RaftPeerId killed = cluster.getFollowers().get(0).getId();
-    cluster.killServer(killed);
-
-    if (replication == ReplicationLevel.ALL) {
-      new Thread(() -> {
-        try {
-          Thread.sleep(3000);
-          LOG.info("restart server: " + killed.toString());
-          cluster.restartServer(killed, false);
-        } catch (Exception e) {
-          LOG.info("cannot restart server: " + killed.toString());
-          e.printStackTrace();
-        }
-      }).start();
-    }
+    new Thread(() -> killAndRestartServer(cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG)).start();
 
     LOG.info(cluster.printServers());
 
@@ -144,7 +149,8 @@
             }
           });
         } else {
-          client.send(message, replication);
+          final RaftClientReply reply = client.send(message, replication);
+          Preconditions.assertTrue(reply.isSuccess());
         }
       }
       if (async) {
@@ -157,8 +163,12 @@
     }
     LOG.info(cluster.printAllLogs());
 
-    cluster.getServerAliveStream().map(s -> s.getState().getLog())
-        .forEach(log -> RaftTestUtil.assertLogEntries(log, async, term, messages));
+    for(RaftServerProxy server : cluster.getServers()) {
+      final RaftServerImpl impl = server.getImpl();
+      if (impl.isAlive() || replication == ReplicationLevel.ALL) {
+        RaftTestUtil.assertLogEntries(impl, term, messages);
+      }
+    }
   }
 
 
@@ -199,7 +209,7 @@
     Assert.assertEquals(followerToSendLog.getId(), newLeaderId);
 
     cluster.getServerAliveStream().map(s -> s.getState().getLog())
-        .forEach(log -> RaftTestUtil.assertLogEntries(log, false, term, messages));
+        .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
     LOG.info("terminating testOldLeaderCommit test");
   }
 
@@ -401,8 +411,7 @@
     }
   }
 
-  public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG,
-      RaftProperties properties) throws InterruptedException, IOException, ExecutionException {
+  public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
     LOG.info("Running testRequestTimeout");
     waitForLeader(cluster);
     long time = System.currentTimeMillis();
@@ -427,7 +436,7 @@
       // when the retry cache entry is invalidated.
       // The duration for which the client waits should be more than the retryCacheExpiryDuration.
       TimeDuration duration = TimeDuration.valueOf(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
-      TimeDuration retryCacheExpiryDuration = RaftServerConfigKeys.RetryCache.expiryTime(properties);
+      TimeDuration retryCacheExpiryDuration = RaftServerConfigKeys.RetryCache.expiryTime(cluster.getProperties());
       Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
     }
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index c55445a..53a051a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -27,6 +27,7 @@
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -40,7 +41,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
 import java.util.function.BooleanSupplier;
 import java.util.function.IntSupplier;
 import java.util.function.Predicate;
@@ -165,9 +170,18 @@
     }
   }
 
-  static void assertLogEntries(RaftLog log, boolean async, long expectedTerm,
-      SimpleMessage... expectedMessages) {
+  static void assertLogEntries(RaftServerImpl server, long expectedTerm, SimpleMessage... expectedMessages) {
+    LOG.info("checking raft log for " + server.getId());
+    final RaftLog log = server.getState().getLog();
+    try {
+      RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
+    } catch (AssertionError e) {
+      LOG.error(server.getId() + ": Unexpected raft log", e);
+      throw e;
+    }
+  }
 
+  static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) {
     final TermIndex[] termIndices = log.getEntries(1, Long.MAX_VALUE);
     final List<LogEntryProto> entries = new ArrayList<>(expectedMessages.length);
     for (TermIndex ti : termIndices) {
@@ -179,16 +193,25 @@
       }
 
       if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.SMLOGENTRY) {
+        LOG.info(ServerProtoUtils.toString(e) + ", " + e.getSmLogEntry().toString().trim().replace("\n", ", "));
         entries.add(e);
       } else if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.NOOP) {
         LOG.info("Found " + LogEntryProto.LogEntryBodyCase.NOOP + " at " + ti
             + ", ignoring it.");
       } else {
-        throw new AssertionError(
-            "Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + ti);
+        throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + ti
+            + ": " + ServerProtoUtils.toString(e));
       }
     }
 
+    try {
+      assertLogEntries(entries, expectedTerm, expectedMessages);
+    } catch(Throwable t) {
+      throw new AssertionError("entries: " + entries, t);
+    }
+  }
+
+  static void assertLogEntries(List<LogEntryProto> entries, long expectedTerm, SimpleMessage... expectedMessages) {
     long logIndex = 0;
     Assert.assertEquals(expectedMessages.length, entries.size());
     for (int i = 0; i < expectedMessages.length; i++) {
@@ -197,9 +220,7 @@
       if (e.getTerm() > expectedTerm) {
         expectedTerm = e.getTerm();
       }
-      if (!async) {
-        Assert.assertTrue(e.getIndex() > logIndex);
-      }
+      Assert.assertTrue(e.getIndex() > logIndex);
       logIndex = e.getIndex();
       Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
           e.getSmLogEntry().getData().toByteArray());
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 47d8c9d..f84576d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -94,4 +94,8 @@
   public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
     return server.getLeaderState().getLogAppenders();
   }
+
+  public static Logger getStateMachineUpdaterLog() {
+    return StateMachineUpdater.LOG;
+  }
 }