RATIS-1994. AsyncApi send() to support optional replication level. (#1006)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 483a222..e383281 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -36,9 +36,15 @@
    * For readonly messages, use {@link #sendReadOnly(Message)} instead.
    *
    * @param message The request message.
+   * @param replication The replication level to wait for.
    * @return a future of the reply.
    */
-  CompletableFuture<RaftClientReply> send(Message message);
+  CompletableFuture<RaftClientReply> send(Message message, ReplicationLevel replication);
+
+  /** The same as {@link #send(Message, ReplicationLevel)} with {@link ReplicationLevel#MAJORITY}. */
+  default CompletableFuture<RaftClientReply> send(Message message) {
+    return send(message, ReplicationLevel.MAJORITY);
+  }
 
   /** The same as sendReadOnly(message, null). */
   default CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 2f7069f..8547ce2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -42,8 +42,8 @@
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> send(Message message) {
-    return send(RaftClientRequest.writeRequestType(), message, null);
+  public CompletableFuture<RaftClientReply> send(Message message, ReplicationLevel replication) {
+    return send(RaftClientRequest.writeRequestType(replication), message, null);
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 9d853b4..ed41f1e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -17,23 +17,31 @@
  */
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.proto.RaftProtos.DataStreamRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.ForwardRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.MessageStreamRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.proto.RaftProtos.StaleReadRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 
 import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
-import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.*;
-
 /**
  * Request from client to server
  */
 public class RaftClientRequest extends RaftClientMessage {
   private static final Type DATA_STREAM_DEFAULT = new Type(DataStreamRequestTypeProto.getDefaultInstance());
   private static final Type FORWARD_DEFAULT = new Type(ForwardRequestTypeProto.getDefaultInstance());
-  private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance());
   private static final Type WATCH_DEFAULT = new Type(
       WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
 
@@ -44,8 +52,40 @@
       = new Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build());
   private static final Type STALE_READ_DEFAULT = new Type(StaleReadRequestTypeProto.getDefaultInstance());
 
+  /** The cached write request types, one per recognized {@link ReplicationLevel}. */
+  private static final Map<ReplicationLevel, Type> WRITE_REQUEST_TYPES;
+
+  static {
+    final EnumMap<ReplicationLevel, Type> map = new EnumMap<>(ReplicationLevel.class);
+    for(ReplicationLevel replication : ReplicationLevel.values()) {
+      // UNRECOGNIZED is deliberately excluded: no write request type is cached for it.
+      if (replication == ReplicationLevel.UNRECOGNIZED) {
+        continue;
+      }
+      final WriteRequestTypeProto write = WriteRequestTypeProto.newBuilder().setReplication(replication).build();
+      map.put(replication, new Type(write));
+    }
+    WRITE_REQUEST_TYPES = Collections.unmodifiableMap(map);
+  }
+
+  /**
+   * Get the cached write request type of the given replication level.
+   *
+   * @param replication the replication level to wait for.
+   * @return the write request type, never null.
+   * @throws IllegalArgumentException if the replication level is null or unrecognized.
+   */
+  public static Type writeRequestType(ReplicationLevel replication) {
+    final Type type = WRITE_REQUEST_TYPES.get(replication);
+    if (type == null) {
+      // Fail fast rather than returning a null type that would fail later, far from the cause.
+      throw new IllegalArgumentException("Unsupported replication level: " + replication);
+    }
+    return type;
+  }
+
   public static Type writeRequestType() {
-    return WRITE_DEFAULT;
+    return writeRequestType(ReplicationLevel.MAJORITY);
   }
 
   public static Type dataStreamRequestType() {
@@ -88,10 +114,10 @@
     return new Type(WatchRequestTypeProto.newBuilder().setIndex(index).setReplication(replication).build());
   }
 
-  /** The type of {@link RaftClientRequest} corresponding to {@link RaftClientRequestProto.TypeCase}. */
+  /** The type of {@link RaftClientRequest} corresponding to {@link TypeCase}. */
   public static final class Type {
     public static Type valueOf(WriteRequestTypeProto write) {
-      return WRITE_DEFAULT;
+      return writeRequestType(write.getReplication());
     }
 
     public static Type valueOf(DataStreamRequestTypeProto dataStream) {
@@ -126,43 +152,43 @@
      * Only the corresponding proto (must be non-null) is used.
      * The other protos are ignored.
      */
-    private final RaftClientRequestProto.TypeCase typeCase;
+    private final TypeCase typeCase;
     private final Object proto;
 
-    private Type(RaftClientRequestProto.TypeCase typeCase, Object proto) {
+    private Type(TypeCase typeCase, Object proto) {
       this.typeCase = Objects.requireNonNull(typeCase, "typeCase == null");
       this.proto = Objects.requireNonNull(proto, "proto == null");
     }
 
     private Type(WriteRequestTypeProto write) {
-      this(WRITE, write);
+      this(TypeCase.WRITE, write);
     }
 
     private Type(DataStreamRequestTypeProto dataStream) {
-      this(DATASTREAM, dataStream);
+      this(TypeCase.DATASTREAM, dataStream);
     }
 
     private Type(ForwardRequestTypeProto forward) {
-      this(FORWARD, forward);
+      this(TypeCase.FORWARD, forward);
     }
 
     private Type(MessageStreamRequestTypeProto messageStream) {
-      this(MESSAGESTREAM, messageStream);
+      this(TypeCase.MESSAGESTREAM, messageStream);
     }
 
     private Type(ReadRequestTypeProto read) {
-      this(READ, read);
+      this(TypeCase.READ, read);
     }
 
     private Type(StaleReadRequestTypeProto staleRead) {
-      this(STALEREAD, staleRead);
+      this(TypeCase.STALEREAD, staleRead);
     }
 
     private Type(WatchRequestTypeProto watch) {
-      this(WATCH, watch);
+      this(TypeCase.WATCH, watch);
     }
 
-    public boolean is(RaftClientRequestProto.TypeCase t) {
+    public boolean is(TypeCase t) {
       return getTypeCase() == t;
     }
 
@@ -182,42 +208,46 @@
       }
     }
 
-    public RaftClientRequestProto.TypeCase getTypeCase() {
+    public TypeCase getTypeCase() {
       return typeCase;
     }
 
+    private void assertType(TypeCase expected) {
+      Preconditions.assertSame(expected, getTypeCase(), "type");
+    }
+
     public WriteRequestTypeProto getWrite() {
-      Preconditions.assertTrue(is(WRITE));
+      assertType(TypeCase.WRITE);
       return (WriteRequestTypeProto)proto;
     }
 
     public DataStreamRequestTypeProto getDataStream() {
-      Preconditions.assertTrue(is(DATASTREAM));
+      assertType(TypeCase.DATASTREAM);
       return (DataStreamRequestTypeProto)proto;
     }
 
     public ForwardRequestTypeProto getForward() {
-      Preconditions.assertTrue(is(FORWARD));
+      assertType(TypeCase.FORWARD);
       return (ForwardRequestTypeProto)proto;
     }
 
     public MessageStreamRequestTypeProto getMessageStream() {
-      Preconditions.assertTrue(is(MESSAGESTREAM), () -> "proto = " + proto);
+      assertType(TypeCase.MESSAGESTREAM);
       return (MessageStreamRequestTypeProto)proto;
     }
 
     public ReadRequestTypeProto getRead() {
-      Preconditions.assertTrue(is(READ));
+      assertType(TypeCase.READ);
       return (ReadRequestTypeProto)proto;
     }
 
     public StaleReadRequestTypeProto getStaleRead() {
-      Preconditions.assertTrue(is(STALEREAD));
+      assertType(TypeCase.STALEREAD);
       return (StaleReadRequestTypeProto)proto;
     }
 
     public WatchRequestTypeProto getWatch() {
-      Preconditions.assertTrue(is(WATCH));
+      assertType(TypeCase.WATCH);
       return (WatchRequestTypeProto)proto;
     }
 
@@ -426,7 +456,7 @@
     return type;
   }
 
-  public boolean is(RaftClientRequestProto.TypeCase typeCase) {
+  public boolean is(TypeCase typeCase) {
     return getType().is(typeCase);
   }
 
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index d8a1b62..586ec1b 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -281,6 +281,7 @@
 }
 
 message WriteRequestTypeProto {
+  ReplicationLevel replication = 1;
 }
 
 message MessageStreamRequestTypeProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index c6983e3..b278891 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -545,7 +545,7 @@
         .thenApply(bytes -> RaftClientRequest.toWriteRequest(request, Message.valueOf(bytes)));
   }
 
-  CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest request) {
+  CompletableFuture<RaftClientReply> addWatchRequest(RaftClientRequest request) {
     LOG.debug("{}: addWatchRequest {}", this, request);
     return watchRequests.add(request)
         .thenApply(logIndex -> server.newSuccessReply(request, logIndex))
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 73451bf..51067a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -46,29 +46,8 @@
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.metrics.Timekeeper;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.CandidateInfoProto;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
-import org.apache.ratis.proto.RaftProtos.LeaderInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
-import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
-import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
-import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
-import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.ServerRpcProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.GroupInfoReply;
 import org.apache.ratis.protocol.GroupInfoRequest;
@@ -86,20 +65,7 @@
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.protocol.SnapshotManagementRequest;
 import org.apache.ratis.protocol.TransferLeadershipRequest;
-import org.apache.ratis.protocol.exceptions.GroupMismatchException;
-import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
-import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
-import org.apache.ratis.protocol.exceptions.RaftException;
-import org.apache.ratis.protocol.exceptions.ReadException;
-import org.apache.ratis.protocol.exceptions.ReadIndexException;
-import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
-import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
-import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
-import org.apache.ratis.protocol.exceptions.SetConfigurationException;
-import org.apache.ratis.protocol.exceptions.StaleReadException;
-import org.apache.ratis.protocol.exceptions.StateMachineException;
-import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.protocol.exceptions.*;
 import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.DivisionProperties;
@@ -597,7 +563,17 @@
     if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
       setRole(RaftPeerRole.FOLLOWER, reason);
       if (old == RaftPeerRole.LEADER) {
-        role.shutdownLeaderState(false);
+        // Block until the leader state is shut down since the leader is cleared right after.
+        role.shutdownLeaderState(false)
+            .exceptionally(e -> {
+              // exceptionally(..) never passes a null throwable, so no null check is needed.
+              if (!getInfo().isAlive()) {
+                LOG.info("Since server is not alive {}, safely ignore {}", this, e.toString());
+                return null;
+              }
+              throw new CompletionException("Failed to shutdownLeaderState: " + this, e);
+            })
+            .join();
         state.setLeader(null, reason);
       } else if (old == RaftPeerRole.CANDIDATE) {
         role.shutdownLeaderElection();
@@ -883,7 +859,42 @@
       }
       leaderState.notifySenders();
     }
-    return pending.getFuture();
+
+    final CompletableFuture<RaftClientReply> future = pending.getFuture();
+    if (request.is(TypeCase.WRITE)) {
+      // a replication level other than MAJORITY requires an additional wait after the reply is ready
+      final ReplicationLevel replication = request.getType().getWrite().getReplication();
+      if (replication != ReplicationLevel.MAJORITY) {
+        return future.thenCompose(reply -> waitForReplication(reply, replication));
+      }
+    }
+
+    return future;
+  }
+
+  /**
+   * Wait until the given replication requirement is satisfied.
+   *
+   * @param reply the reply of the write request.
+   * @param replication the replication level to wait for.
+   * @return a future of the given reply if both the write and the watch succeeded;
+   *         otherwise, a future of the failed reply.
+   */
+  private CompletableFuture<RaftClientReply> waitForReplication(RaftClientReply reply, ReplicationLevel replication) {
+    if (!reply.isSuccess()) {
+      // The write itself failed; return the failure as is without starting a watch.
+      return CompletableFuture.completedFuture(reply);
+    }
+    final RaftClientRequest.Type type = RaftClientRequest.watchRequestType(reply.getLogIndex(), replication);
+    final RaftClientRequest watch = RaftClientRequest.newBuilder()
+        .setServerId(reply.getServerId())
+        .setClientId(reply.getClientId())
+        .setGroupId(reply.getRaftGroupId())
+        .setCallId(reply.getCallId())
+        .setType(type)
+        .build();
+    // Do not mask a failed watch (e.g. not leader) with the successful write reply.
+    return watchAsync(watch).thenApply(watchReply -> watchReply.isSuccess() ? reply : watchReply);
   }
 
   void stepDownOnJvmPause() {
@@ -1002,7 +1001,7 @@
     }
 
     return role.getLeaderState()
-        .map(ls -> ls.addWatchReqeust(request))
+        .map(ls -> ls.addWatchRequest(request))
         .orElseGet(() -> CompletableFuture.completedFuture(
             newExceptionReply(request, generateNotLeaderException())));
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 9f53b69..fe2bc96 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -20,6 +20,7 @@
 
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.Timestamp;
 import org.slf4j.Logger;
@@ -87,7 +88,7 @@
     final LeaderStateImpl leader = leaderState.getAndSet(null);
     if (leader == null) {
       if (!allowNull) {
-        throw new NullPointerException("leaderState == null");
+        return JavaUtils.completeExceptionally(new NullPointerException("leaderState == null"));
       }
       return CompletableFuture.completedFuture(null);
     } else {