RATIS-235. RaftTestUtil.assertLogEntries should check index for async tests.
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 1c1a754..cb18caa 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -123,7 +123,7 @@
LOG.trace("{}: wait {}ms", this, waitTimeMs);
wait(waitTimeMs);
} catch(InterruptedException ie) {
- LOG.warn("Wait interrupted", ie);
+ LOG.warn(this + ": Wait interrupted by " + ie);
}
}
}
@@ -242,7 +242,7 @@
LOG.info("{} is stopped", GRpcLogAppender.this);
return;
}
- RaftGrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer().getId(), t);
+ RaftGrpcUtil.warn(LOG, () -> server.getId() + ": Failed appendEntries to " + follower.getPeer(), t);
long callId = RaftGrpcUtil.getCallId(t);
resetClient(pendingRequests.get(callId));
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index e6a3b57..30fcd91 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -61,9 +61,8 @@
}
@Test
- public void testRequestTimeout()
- throws IOException, InterruptedException, ExecutionException {
- testRequestTimeout(false, getCluster(), LOG, getProperties());
+ public void testRequestTimeout() throws Exception {
+ testRequestTimeout(false, getCluster(), LOG);
}
@Test
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 5c5749f..2643073 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -153,6 +153,10 @@
ExitUtils.disableSystemExit();
}
+ public RaftProperties getProperties() {
+ return properties;
+ }
+
public MiniRaftCluster initServers() {
if (servers.isEmpty()) {
putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index db466df..d8909de 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -57,22 +57,20 @@
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
- private RaftProperties properties;
-
public static final int NUM_SERVERS = 3;
@Before
public void setup() {
- properties = new RaftProperties();
- properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
TimeDuration retryCacheExpiryDuration = TimeDuration.valueOf(5, TimeUnit.SECONDS);
- RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheExpiryDuration);
+ RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), retryCacheExpiryDuration);
}
@Test
public void testAsyncConfiguration() throws IOException {
LOG.info("Running testAsyncConfiguration");
+ final RaftProperties properties = new RaftProperties();
RaftClient.Builder clientBuilder = RaftClient.newBuilder()
.setRaftGroup(RaftGroup.emptyGroup())
.setProperties(properties);
@@ -91,24 +89,17 @@
RaftClientTestUtil.assertScheduler(client, numThreads);
RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0);
}
-
- // reset to default for other tests.
- RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties,
- RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT);
- RaftClientConfigKeys.Async.setSchedulerThreads(properties,
- RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT);
}
@Test
- public void testAsyncRequestSemaphore()
- throws InterruptedException, IOException {
+ public void testAsyncRequestSemaphore() throws Exception {
LOG.info("Running testAsyncRequestSemaphore");
- CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+ final CLUSTER cluster = newCluster(NUM_SERVERS);
Assert.assertNull(cluster.getLeader());
cluster.start();
waitForLeader(cluster);
- int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(properties);
+ int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(getProperties());
CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);
final RaftClient client = cluster.createClient();
@@ -153,33 +144,31 @@
cluster.shutdown();
}
+ void runTestBasicAppendEntriesAsync(ReplicationLevel replication) throws Exception {
+ final CLUSTER cluster = newCluster(NUM_SERVERS);
+ try {
+ cluster.start();
+ waitForLeader(cluster);
+ RaftBasicTests.runTestBasicAppendEntries(true, replication, 1000, cluster, LOG);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
@Test
public void testBasicAppendEntriesAsync() throws Exception {
- LOG.info("Running testBasicAppendEntriesAsync");
- RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
- final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
- cluster.start();
- waitForLeader(cluster);
- RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.MAJORITY, 1000, cluster, LOG);
- cluster.shutdown();
+ runTestBasicAppendEntriesAsync(ReplicationLevel.MAJORITY);
}
@Test
public void testBasicAppendEntriesAsyncWithAllReplication() throws Exception {
- LOG.info("Running testBasicAppendEntriesAsync");
- RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
- final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
- cluster.start();
- waitForLeader(cluster);
- RaftBasicTests.runTestBasicAppendEntries(true, ReplicationLevel.ALL, 1000, cluster, LOG);
- cluster.shutdown();
+ runTestBasicAppendEntriesAsync(ReplicationLevel.ALL);
}
@Test
public void testWithLoadAsync() throws Exception {
LOG.info("Running testWithLoadAsync");
- RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
- final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+ final CLUSTER cluster = newCluster(NUM_SERVERS);
cluster.start();
waitForLeader(cluster);
RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
@@ -189,7 +178,7 @@
@Test
public void testStaleReadAsync() throws Exception {
final int numMesssages = 10;
- final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+ final CLUSTER cluster = newCluster(NUM_SERVERS);
try (RaftClient client = cluster.createClient()) {
cluster.start();
@@ -263,11 +252,10 @@
}
@Test
- public void testRequestTimeout()
- throws IOException, InterruptedException, ExecutionException {
- final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+ public void testRequestTimeout() throws Exception {
+ final CLUSTER cluster = newCluster(NUM_SERVERS);
cluster.start();
- RaftBasicTests.testRequestTimeout(true, cluster, LOG, properties);
+ RaftBasicTests.testRequestTimeout(true, cluster, LOG);
cluster.shutdown();
}
@@ -275,9 +263,9 @@
public void testAppendEntriesTimeout()
throws IOException, InterruptedException, ExecutionException {
LOG.info("Running testAppendEntriesTimeout");
- TimeDuration retryCacheExpiryDuration = TimeDuration.valueOf(20, TimeUnit.SECONDS);
- RaftServerConfigKeys.RetryCache.setExpiryTime(properties, retryCacheExpiryDuration);
- final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+ final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
+ RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
+ final CLUSTER cluster = newCluster(NUM_SERVERS);
cluster.start();
waitForLeader(cluster);
long time = System.currentTimeMillis();
@@ -309,5 +297,8 @@
Assert.assertTrue(System.currentTimeMillis() - time > waitTime);
}
cluster.shutdown();
+
+ //reset for the other tests
+ RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 4deeef5..64b3c7e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -18,7 +18,7 @@
package org.apache.ratis;
import org.apache.log4j.Level;
-import org.apache.ratis.RaftTestUtil.*;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
@@ -28,12 +28,16 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.junit.After;
import org.junit.Assert;
@@ -46,7 +50,6 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,12 +58,15 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.ratis.RaftTestUtil.*;
+import static org.apache.ratis.RaftTestUtil.logEntriesContains;
+import static org.apache.ratis.RaftTestUtil.sendMessageInNewThread;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
import static org.junit.Assert.assertTrue;
public abstract class RaftBasicTests extends BaseTest {
{
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+ LogUtils.setLogLevel(RaftServerTestUtil.getStateMachineUpdaterLog(), Level.DEBUG);
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
RaftServerConfigKeys.RetryCache.setExpiryTime(properties, TimeDuration
.valueOf(5, TimeUnit.SECONDS));
@@ -100,30 +106,29 @@
runTestBasicAppendEntries(false, ReplicationLevel.ALL, 10, getCluster(), LOG);
}
+ static void killAndRestartServer(RaftPeerId id, long killSleepMs, long restartSleepMs, MiniRaftCluster cluster, Logger LOG) {
+ try {
+ Thread.sleep(killSleepMs);
+ cluster.killServer(id);
+ Thread.sleep(restartSleepMs);
+ LOG.info("restart server: " + id);
+ cluster.restartServer(id, false);
+ } catch (Exception e) {
+ ExitUtils.terminate(-1, "Failed to kill/restart server: " + id, e, LOG);
+ }
+ }
+
static void runTestBasicAppendEntries(
boolean async, ReplicationLevel replication, int numMessages, MiniRaftCluster cluster, Logger LOG) throws Exception {
- LOG.info("runTestBasicAppendEntries: async? " + async + ", numMessages=" + numMessages);
+ LOG.info("runTestBasicAppendEntries: async? {}, replication={}, numMessages={}",
+ async, replication, numMessages);
for (RaftServer s : cluster.getServers()) {
cluster.restartServer(s.getId(), false);
}
RaftServerImpl leader = waitForLeader(cluster);
final long term = leader.getState().getCurrentTerm();
- final RaftPeerId killed = cluster.getFollowers().get(0).getId();
- cluster.killServer(killed);
-
- if (replication == ReplicationLevel.ALL) {
- new Thread(() -> {
- try {
- Thread.sleep(3000);
- LOG.info("restart server: " + killed.toString());
- cluster.restartServer(killed, false);
- } catch (Exception e) {
- LOG.info("cannot restart server: " + killed.toString());
- e.printStackTrace();
- }
- }).start();
- }
+ new Thread(() -> killAndRestartServer(cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG)).start();
LOG.info(cluster.printServers());
@@ -144,7 +149,8 @@
}
});
} else {
- client.send(message, replication);
+ final RaftClientReply reply = client.send(message, replication);
+ Preconditions.assertTrue(reply.isSuccess());
}
}
if (async) {
@@ -157,8 +163,12 @@
}
LOG.info(cluster.printAllLogs());
- cluster.getServerAliveStream().map(s -> s.getState().getLog())
- .forEach(log -> RaftTestUtil.assertLogEntries(log, async, term, messages));
+ for(RaftServerProxy server : cluster.getServers()) {
+ final RaftServerImpl impl = server.getImpl();
+ if (impl.isAlive() || replication == ReplicationLevel.ALL) {
+ RaftTestUtil.assertLogEntries(impl, term, messages);
+ }
+ }
}
@@ -199,7 +209,7 @@
Assert.assertEquals(followerToSendLog.getId(), newLeaderId);
cluster.getServerAliveStream().map(s -> s.getState().getLog())
- .forEach(log -> RaftTestUtil.assertLogEntries(log, false, term, messages));
+ .forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
LOG.info("terminating testOldLeaderCommit test");
}
@@ -401,8 +411,7 @@
}
}
- public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG,
- RaftProperties properties) throws InterruptedException, IOException, ExecutionException {
+ public static void testRequestTimeout(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
LOG.info("Running testRequestTimeout");
waitForLeader(cluster);
long time = System.currentTimeMillis();
@@ -427,7 +436,7 @@
// when the retry cache entry is invalidated.
// The duration for which the client waits should be more than the retryCacheExpiryDuration.
TimeDuration duration = TimeDuration.valueOf(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
- TimeDuration retryCacheExpiryDuration = RaftServerConfigKeys.RetryCache.expiryTime(properties);
+ TimeDuration retryCacheExpiryDuration = RaftServerConfigKeys.RetryCache.expiryTime(cluster.getProperties());
Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index c55445a..53a051a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -27,6 +27,7 @@
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
@@ -40,7 +41,11 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;
import java.util.function.Predicate;
@@ -165,9 +170,18 @@
}
}
- static void assertLogEntries(RaftLog log, boolean async, long expectedTerm,
- SimpleMessage... expectedMessages) {
+ static void assertLogEntries(RaftServerImpl server, long expectedTerm, SimpleMessage... expectedMessages) {
+ LOG.info("checking raft log for " + server.getId());
+ final RaftLog log = server.getState().getLog();
+ try {
+ RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
+ } catch (AssertionError e) {
+ LOG.error(server.getId() + ": Unexpected raft log", e);
+ throw e;
+ }
+ }
+ static void assertLogEntries(RaftLog log, long expectedTerm, SimpleMessage... expectedMessages) {
final TermIndex[] termIndices = log.getEntries(1, Long.MAX_VALUE);
final List<LogEntryProto> entries = new ArrayList<>(expectedMessages.length);
for (TermIndex ti : termIndices) {
@@ -179,16 +193,25 @@
}
if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.SMLOGENTRY) {
+ LOG.info(ServerProtoUtils.toString(e) + ", " + e.getSmLogEntry().toString().trim().replace("\n", ", "));
entries.add(e);
} else if (e.getLogEntryBodyCase() == LogEntryProto.LogEntryBodyCase.NOOP) {
LOG.info("Found " + LogEntryProto.LogEntryBodyCase.NOOP + " at " + ti
+ ", ignoring it.");
} else {
- throw new AssertionError(
- "Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + ti);
+ throw new AssertionError("Unexpected LogEntryBodyCase " + e.getLogEntryBodyCase() + " at " + ti
+ + ": " + ServerProtoUtils.toString(e));
}
}
+ try {
+ assertLogEntries(entries, expectedTerm, expectedMessages);
+ } catch(Throwable t) {
+ throw new AssertionError("entries: " + entries, t);
+ }
+ }
+
+ static void assertLogEntries(List<LogEntryProto> entries, long expectedTerm, SimpleMessage... expectedMessages) {
long logIndex = 0;
Assert.assertEquals(expectedMessages.length, entries.size());
for (int i = 0; i < expectedMessages.length; i++) {
@@ -197,9 +220,7 @@
if (e.getTerm() > expectedTerm) {
expectedTerm = e.getTerm();
}
- if (!async) {
- Assert.assertTrue(e.getIndex() > logIndex);
- }
+ Assert.assertTrue(e.getIndex() > logIndex);
logIndex = e.getIndex();
Assert.assertArrayEquals(expectedMessages[i].getContent().toByteArray(),
e.getSmLogEntry().getData().toByteArray());
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 47d8c9d..f84576d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -94,4 +94,8 @@
public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
return server.getLeaderState().getLogAppenders();
}
+
+ public static Logger getStateMachineUpdaterLog() {
+ return StateMachineUpdater.LOG;
+ }
}