RATIS-1994. AsyncApi send() to support optional replication level. (#1006)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
index 483a222..e383281 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java
@@ -36,9 +36,15 @@
* For readonly messages, use {@link #sendReadOnly(Message)} instead.
*
* @param message The request message.
+ * @param replication The replication level to wait for.
* @return a future of the reply.
*/
- CompletableFuture<RaftClientReply> send(Message message);
+ CompletableFuture<RaftClientReply> send(Message message, ReplicationLevel replication);
+
+ /** The same as send(message, ReplicationLevel.MAJORITY). */
+ default CompletableFuture<RaftClientReply> send(Message message) {
+ return send(message, ReplicationLevel.MAJORITY);
+ }
/** The same as sendReadOnly(message, null). */
default CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
index 2f7069f..8547ce2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java
@@ -42,8 +42,8 @@
}
@Override
- public CompletableFuture<RaftClientReply> send(Message message) {
- return send(RaftClientRequest.writeRequestType(), message, null);
+ public CompletableFuture<RaftClientReply> send(Message message, ReplicationLevel replication) {
+ return send(RaftClientRequest.writeRequestType(replication), message, null);
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 9d853b4..ed41f1e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -17,23 +17,31 @@
*/
package org.apache.ratis.protocol;
-import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.proto.RaftProtos.DataStreamRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.ForwardRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.MessageStreamRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.proto.RaftProtos.ReadRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.proto.RaftProtos.StaleReadRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.WatchRequestTypeProto;
+import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.*;
-
/**
* Request from client to server
*/
public class RaftClientRequest extends RaftClientMessage {
private static final Type DATA_STREAM_DEFAULT = new Type(DataStreamRequestTypeProto.getDefaultInstance());
private static final Type FORWARD_DEFAULT = new Type(ForwardRequestTypeProto.getDefaultInstance());
- private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance());
private static final Type WATCH_DEFAULT = new Type(
WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
@@ -44,8 +52,26 @@
= new Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build());
private static final Type STALE_READ_DEFAULT = new Type(StaleReadRequestTypeProto.getDefaultInstance());
+ private static final Map<ReplicationLevel, Type> WRITE_REQUEST_TYPES;
+
+ static {
+ final EnumMap<ReplicationLevel, Type> map = new EnumMap<>(ReplicationLevel.class);
+ for(ReplicationLevel replication : ReplicationLevel.values()) {
+ if (replication == ReplicationLevel.UNRECOGNIZED) {
+ continue;
+ }
+ final WriteRequestTypeProto write = WriteRequestTypeProto.newBuilder().setReplication(replication).build();
+ map.put(replication, new Type(write));
+ }
+ WRITE_REQUEST_TYPES = Collections.unmodifiableMap(map);
+ }
+
+ public static Type writeRequestType(ReplicationLevel replication) {
+ return WRITE_REQUEST_TYPES.get(replication);
+ }
+
public static Type writeRequestType() {
- return WRITE_DEFAULT;
+ return writeRequestType(ReplicationLevel.MAJORITY);
}
public static Type dataStreamRequestType() {
@@ -88,10 +114,10 @@
return new Type(WatchRequestTypeProto.newBuilder().setIndex(index).setReplication(replication).build());
}
- /** The type of {@link RaftClientRequest} corresponding to {@link RaftClientRequestProto.TypeCase}. */
+ /** The type of {@link RaftClientRequest} corresponding to {@link TypeCase}. */
public static final class Type {
public static Type valueOf(WriteRequestTypeProto write) {
- return WRITE_DEFAULT;
+ return writeRequestType(write.getReplication());
}
public static Type valueOf(DataStreamRequestTypeProto dataStream) {
@@ -126,43 +152,43 @@
* Only the corresponding proto (must be non-null) is used.
* The other protos are ignored.
*/
- private final RaftClientRequestProto.TypeCase typeCase;
+ private final TypeCase typeCase;
private final Object proto;
- private Type(RaftClientRequestProto.TypeCase typeCase, Object proto) {
+ private Type(TypeCase typeCase, Object proto) {
this.typeCase = Objects.requireNonNull(typeCase, "typeCase == null");
this.proto = Objects.requireNonNull(proto, "proto == null");
}
private Type(WriteRequestTypeProto write) {
- this(WRITE, write);
+ this(TypeCase.WRITE, write);
}
private Type(DataStreamRequestTypeProto dataStream) {
- this(DATASTREAM, dataStream);
+ this(TypeCase.DATASTREAM, dataStream);
}
private Type(ForwardRequestTypeProto forward) {
- this(FORWARD, forward);
+ this(TypeCase.FORWARD, forward);
}
private Type(MessageStreamRequestTypeProto messageStream) {
- this(MESSAGESTREAM, messageStream);
+ this(TypeCase.MESSAGESTREAM, messageStream);
}
private Type(ReadRequestTypeProto read) {
- this(READ, read);
+ this(TypeCase.READ, read);
}
private Type(StaleReadRequestTypeProto staleRead) {
- this(STALEREAD, staleRead);
+ this(TypeCase.STALEREAD, staleRead);
}
private Type(WatchRequestTypeProto watch) {
- this(WATCH, watch);
+ this(TypeCase.WATCH, watch);
}
- public boolean is(RaftClientRequestProto.TypeCase t) {
+ public boolean is(TypeCase t) {
return getTypeCase() == t;
}
@@ -182,42 +208,46 @@
}
}
- public RaftClientRequestProto.TypeCase getTypeCase() {
+ public TypeCase getTypeCase() {
return typeCase;
}
+ private void assertType(TypeCase expected) {
+ Preconditions.assertSame(expected, getTypeCase(), "type");
+ }
+
public WriteRequestTypeProto getWrite() {
- Preconditions.assertTrue(is(WRITE));
+ assertType(TypeCase.WRITE);
return (WriteRequestTypeProto)proto;
}
public DataStreamRequestTypeProto getDataStream() {
- Preconditions.assertTrue(is(DATASTREAM));
+ assertType(TypeCase.DATASTREAM);
return (DataStreamRequestTypeProto)proto;
}
public ForwardRequestTypeProto getForward() {
- Preconditions.assertTrue(is(FORWARD));
+ assertType(TypeCase.FORWARD);
return (ForwardRequestTypeProto)proto;
}
public MessageStreamRequestTypeProto getMessageStream() {
- Preconditions.assertTrue(is(MESSAGESTREAM), () -> "proto = " + proto);
+ assertType(TypeCase.MESSAGESTREAM);
return (MessageStreamRequestTypeProto)proto;
}
public ReadRequestTypeProto getRead() {
- Preconditions.assertTrue(is(READ));
+ assertType(TypeCase.READ);
return (ReadRequestTypeProto)proto;
}
public StaleReadRequestTypeProto getStaleRead() {
- Preconditions.assertTrue(is(STALEREAD));
+ assertType(TypeCase.STALEREAD);
return (StaleReadRequestTypeProto)proto;
}
public WatchRequestTypeProto getWatch() {
- Preconditions.assertTrue(is(WATCH));
+ assertType(TypeCase.WATCH);
return (WatchRequestTypeProto)proto;
}
@@ -426,7 +456,7 @@
return type;
}
- public boolean is(RaftClientRequestProto.TypeCase typeCase) {
+ public boolean is(TypeCase typeCase) {
return getType().is(typeCase);
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index d8a1b62..586ec1b 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -281,6 +281,7 @@
}
message WriteRequestTypeProto {
+ ReplicationLevel replication = 1;
}
message MessageStreamRequestTypeProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index c6983e3..b278891 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -545,7 +545,7 @@
.thenApply(bytes -> RaftClientRequest.toWriteRequest(request, Message.valueOf(bytes)));
}
- CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest request) {
+ CompletableFuture<RaftClientReply> addWatchRequest(RaftClientRequest request) {
LOG.debug("{}: addWatchRequest {}", this, request);
return watchRequests.add(request)
.thenApply(logIndex -> server.newSuccessReply(request, logIndex))
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 73451bf..51067a8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -46,29 +46,8 @@
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.Timekeeper;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.CandidateInfoProto;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
-import org.apache.ratis.proto.RaftProtos.LeaderInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
-import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
-import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
-import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
-import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
-import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.ServerRpcProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
@@ -86,20 +65,7 @@
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.TransferLeadershipRequest;
-import org.apache.ratis.protocol.exceptions.GroupMismatchException;
-import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
-import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
-import org.apache.ratis.protocol.exceptions.RaftException;
-import org.apache.ratis.protocol.exceptions.ReadException;
-import org.apache.ratis.protocol.exceptions.ReadIndexException;
-import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
-import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
-import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
-import org.apache.ratis.protocol.exceptions.SetConfigurationException;
-import org.apache.ratis.protocol.exceptions.StaleReadException;
-import org.apache.ratis.protocol.exceptions.StateMachineException;
-import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.protocol.exceptions.*;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.DivisionProperties;
@@ -597,7 +563,17 @@
if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
setRole(RaftPeerRole.FOLLOWER, reason);
if (old == RaftPeerRole.LEADER) {
- role.shutdownLeaderState(false);
+ role.shutdownLeaderState(false)
+ .exceptionally(e -> {
+ if (e != null) {
+ if (!getInfo().isAlive()) {
+ LOG.info("Since server is not alive {}, safely ignore {}", this, e.toString());
+ return null;
+ }
+ }
+ throw new CompletionException("Failed to shutdownLeaderState: " + this, e);
+ })
+ .join();
state.setLeader(null, reason);
} else if (old == RaftPeerRole.CANDIDATE) {
role.shutdownLeaderElection();
@@ -883,7 +859,30 @@
}
leaderState.notifySenders();
}
- return pending.getFuture();
+
+ final CompletableFuture<RaftClientReply> future = pending.getFuture();
+ if (request.is(TypeCase.WRITE)) {
+ // check replication
+ final ReplicationLevel replication = request.getType().getWrite().getReplication();
+ if (replication != ReplicationLevel.MAJORITY) {
+ return future.thenCompose(reply -> waitForReplication(reply, replication));
+ }
+ }
+
+ return future;
+ }
+
+ /** Wait until the given replication requirement is satisfied. */
+ private CompletableFuture<RaftClientReply> waitForReplication(RaftClientReply reply, ReplicationLevel replication) {
+ final RaftClientRequest.Type type = RaftClientRequest.watchRequestType(reply.getLogIndex(), replication);
+ final RaftClientRequest watch = RaftClientRequest.newBuilder()
+ .setServerId(reply.getServerId())
+ .setClientId(reply.getClientId())
+ .setGroupId(reply.getRaftGroupId())
+ .setCallId(reply.getCallId())
+ .setType(type)
+ .build();
+ return watchAsync(watch).thenApply(r -> reply);
}
void stepDownOnJvmPause() {
@@ -1002,7 +1001,7 @@
}
return role.getLeaderState()
- .map(ls -> ls.addWatchReqeust(request))
+ .map(ls -> ls.addWatchRequest(request))
.orElseGet(() -> CompletableFuture.completedFuture(
newExceptionReply(request, generateNotLeaderException())));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 9f53b69..fe2bc96 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -20,6 +20,7 @@
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
@@ -87,7 +88,7 @@
final LeaderStateImpl leader = leaderState.getAndSet(null);
if (leader == null) {
if (!allowNull) {
- throw new NullPointerException("leaderState == null");
+ return JavaUtils.completeExceptionally(new NullPointerException("leaderState == null"));
}
return CompletableFuture.completedFuture(null);
} else {