RATIS-511. Fail the requests the sliding window when a raft client hits GroupMismatchException. Contributed by Tsz Wo Nicholas Sze.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index c489dff..f73e541 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -107,7 +107,7 @@
}
@Override
- public void fail(Exception e) {
+ public void fail(Throwable e) {
getReplyFuture().completeExceptionally(e);
}
@@ -373,7 +373,8 @@
}
return null;
}
- throw new CompletionException(e);
+ failAllAsyncRequests(request, e);
+ return null;
});
}
@@ -385,7 +386,11 @@
private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
final RaftRetryFailureException rfe = newRaftRetryFailureException(request, attemptCount, retryPolicy);
- getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), rfe);
+ failAllAsyncRequests(request, rfe);
+ }
+
+ private void failAllAsyncRequests(RaftClientRequest request, Throwable t) {
+ getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), t);
}
private RaftClientReply sendRequest(RaftClientRequest request)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index d0a1a52..8d5c1b6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -48,7 +48,7 @@
interface ClientSideRequest<REPLY> extends Request<REPLY> {
void setFirstRequest();
- void fail(Exception e);
+ void fail(Throwable e);
}
interface ServerSideRequest<REPLY> extends Request<REPLY> {
@@ -181,7 +181,7 @@
/** Is the first request replied? */
private boolean firstReplied;
/** The exception, if there is any. */
- private Exception exception;
+ private Throwable exception;
public Client(Object name) {
this.requests = new RequestMap<REQUEST, REPLY>(name) {
@@ -324,7 +324,7 @@
}
/** Fail all requests starting from the given seqNum. */
- public synchronized void fail(final long startingSeqNum, Exception e) {
+ public synchronized void fail(final long startingSeqNum, Throwable e) {
exception = e;
boolean handled = false;
@@ -345,7 +345,7 @@
}
}
- private void alreadyClosed(REQUEST request, Exception e) {
+ private void alreadyClosed(REQUEST request, Throwable e) {
request.fail(new AlreadyClosedException(SlidingWindow.class.getSimpleName() + "$" + getClass().getSimpleName()
+ " " + requests.getName() + " is closed.", e));
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
new file mode 100644
index 0000000..f48e989
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+public abstract class RaftAsyncExceptionTests<CLUSTER extends MiniRaftCluster>
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+ @Test
+ public void testGroupMismatchException() throws Exception {
+ runWithNewCluster(1, this::runTestGroupMismatchException);
+ }
+
+ private void runTestGroupMismatchException(CLUSTER cluster) throws Exception {
+ // send a message to make sure the cluster is working
+ try(RaftClient client = cluster.createClient()) {
+ final RaftClientReply reply = client.sendAsync(new SimpleMessage("first")).get();
+ Assert.assertTrue(reply.isSuccess());
+ }
+
+ // create another group
+ final RaftGroup clusterGroup = cluster.getGroup();
+ final RaftGroup anotherGroup = RaftGroup.valueOf(RaftGroupId.randomId(), clusterGroup.getPeers());
+ Assert.assertNotEquals(clusterGroup.getGroupId(), anotherGroup.getGroupId());
+
+ // create another client using another group
+ final SimpleMessage[] messages = SimpleMessage.create(5);
+ try(RaftClient client = cluster.createClient(anotherGroup)) {
+ // send a few messages
+ final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
+ for(SimpleMessage m : messages) {
+ futures.add(client.sendAsync(m));
+ }
+ Assert.assertEquals(messages.length, futures.size());
+
+ // check replies
+ final Iterator<CompletableFuture<RaftClientReply>> i = futures.iterator();
+ testFailureCase("First reply is GroupMismatchException",
+ () -> i.next().get(),
+ ExecutionException.class, GroupMismatchException.class);
+ for(; i.hasNext(); ) {
+ testFailureCase("Following replies are AlreadyClosedException caused by GroupMismatchException",
+ () -> i.next().get(),
+ ExecutionException.class, AlreadyClosedException.class, GroupMismatchException.class);
+ }
+ }
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncExceptionWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncExceptionWithGrpc.java
new file mode 100644
index 0000000..2ec10cd
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftAsyncExceptionWithGrpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.RaftAsyncExceptionTests;
+
+public class TestRaftAsyncExceptionWithGrpc
+ extends RaftAsyncExceptionTests<MiniRaftClusterWithGrpc>
+ implements MiniRaftClusterWithGrpc.FactoryGet {
+}