blob: a343f769004b1f2bdd33174bee43149c651370c9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
/** Cluster size used by the tests that create their cluster with {@code newCluster(NUM_SERVERS)}. */
public static final int NUM_SERVERS = 3;

static {
  // Raise the server-implementation and client loggers to DEBUG for these tests.
  LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
  LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
/**
 * Registers {@link SimpleStateMachine4Testing} as the {@link StateMachine} implementation
 * under {@code MiniRaftCluster.STATEMACHINE_CLASS_KEY} before every test.
 */
@Before
public void setup() {
  final RaftProperties testProperties = getProperties();
  testProperties.setClass(
      MiniRaftCluster.STATEMACHINE_CLASS_KEY,
      SimpleStateMachine4Testing.class,
      StateMachine.class);
}
/**
 * Checks that a client built from a {@link RaftProperties} instance reflects the async
 * scheduler-thread count and the maximum number of outstanding requests: first with
 * nothing configured (the defaults), then with explicitly configured values.
 */
@Test
public void testAsyncConfiguration() throws IOException {
  LOG.info("Running testAsyncConfiguration");
  final RaftProperties properties = new RaftProperties();
  final RaftClient.Builder clientBuilder = RaftClient.newBuilder()
      .setRaftGroup(RaftGroup.emptyGroup())
      .setProperties(properties);

  // Nothing has been set on the properties yet, so the defaults are expected.
  assertAsyncSettings(clientBuilder,
      RaftClientConfigKeys.Async.SCHEDULER_THREADS_DEFAULT,
      RaftClientConfigKeys.Async.MAX_OUTSTANDING_REQUESTS_DEFAULT);

  // The builder shares the same properties object, so a newly built client sees these values.
  final int customThreads = 200;
  final int customMaxOutstanding = 5;
  RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, customMaxOutstanding);
  RaftClientConfigKeys.Async.setSchedulerThreads(properties, customThreads);
  assertAsyncSettings(clientBuilder, customThreads, customMaxOutstanding);
}

/** Builds a client from the builder, asserts its scheduler and request semaphore, then closes it. */
private static void assertAsyncSettings(
    RaftClient.Builder clientBuilder, int numThreads, int maxOutstandingRequests) throws IOException {
  try (RaftClient client = clientBuilder.build()) {
    RaftClientTestUtil.assertScheduler(client, numThreads);
    RaftClientTestUtil.assertAsyncRequestSemaphore(client, maxOutstandingRequests, 0);
  }
}
/**
 * Verifies that the client's async request semaphore throttles senders.
 *
 * <p>While every state machine blocks transactions, the test sends as many messages as the
 * configured maximum of outstanding requests, then sends one more from a worker thread.
 * That extra send is expected to be stuck waiting for a permit until transactions are
 * unblocked, after which every request must complete.
 *
 * <p>The cluster, the client and the worker thread pool are released even when an
 * assertion fails.
 */
@Test
public void testAsyncRequestSemaphore() throws Exception {
  LOG.info("Running testAsyncRequestSemaphore");
  final CLUSTER cluster = newCluster(NUM_SERVERS);
  // Single worker thread used to issue the one request that is expected to block.
  final ExecutorService threadPool = Executors.newFixedThreadPool(1);
  try {
    Assert.assertNull(cluster.getLeader());
    cluster.start();
    waitForLeader(cluster);

    final int numMessages = RaftClientConfigKeys.Async.maxOutstandingRequests(getProperties());
    final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(numMessages + 1);
    final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);

    try (RaftClient client = cluster.createClient()) {
      //Set blockTransaction flag so that transaction blocks
      for (RaftServerProxy server : cluster.getServers()) {
        ((SimpleStateMachine4Testing) server.getStateMachine()).setBlockTransaction(true);
      }

      //Send numMessages which are blocked and do not release the client semaphore permits
      final AtomicInteger blockedRequestsCount = new AtomicInteger();
      for (int i = 0; i < numMessages; i++) {
        blockedRequestsCount.getAndIncrement();
        futures.add(client.sendAsync(messages[i]));
        blockedRequestsCount.decrementAndGet();
      }
      Assert.assertEquals(0, blockedRequestsCount.get());

      // The counter stays at 1 for as long as sendAsync(..) has not returned on the worker.
      // The inner future is flattened so that joining below also awaits the actual reply.
      final CompletableFuture<RaftClientReply> lastFuture = CompletableFuture.supplyAsync(() -> {
        blockedRequestsCount.incrementAndGet();
        final CompletableFuture<RaftClientReply> reply =
            client.sendAsync(new RaftTestUtil.SimpleMessage("n1"));
        blockedRequestsCount.decrementAndGet();
        return reply;
      }, threadPool).thenCompose(reply -> reply);
      futures.add(lastFuture);

      //Allow the last msg to be sent
      while (blockedRequestsCount.get() != 1) {
        Thread.sleep(1000);
      }
      Assert.assertEquals(1, blockedRequestsCount.get());
      //Since all semaphore permits are acquired the last message sent is in queue
      RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);

      //Unset the blockTransaction flag so that semaphore permits can be released
      for (RaftServerProxy server : cluster.getServers()) {
        ((SimpleStateMachine4Testing) server.getStateMachine()).setBlockTransaction(false);
      }

      for (CompletableFuture<RaftClientReply> future : futures) {
        future.join();
      }
      Assert.assertEquals(0, blockedRequestsCount.get());
    }
  } finally {
    try {
      // Interrupts the worker if a failure left it waiting for a permit.
      threadPool.shutdownNow();
    } finally {
      cluster.shutdown();
    }
  }
}
/**
 * Starts a fresh cluster, waits for a leader and delegates to
 * {@code RaftBasicTests.runTestBasicAppendEntries}; the cluster is always shut down.
 *
 * @param replication the replication level passed through to the shared test
 * @param killLeader passed through to the shared test; also selects a 5-server cluster
 *                   instead of a 3-server one
 */
void runTestBasicAppendEntriesAsync(ReplicationLevel replication, boolean killLeader) throws Exception {
  final int numServers;
  if (killLeader) {
    numServers = 5;
  } else {
    numServers = 3;
  }

  final CLUSTER cluster = newCluster(numServers);
  try {
    cluster.start();
    waitForLeader(cluster);
    RaftBasicTests.runTestBasicAppendEntries(
        true, replication, killLeader, 1000, cluster, LOG);
  } finally {
    cluster.shutdown();
  }
}
/** Runs the async append-entries scenario with MAJORITY replication and {@code killLeader = false}. */
@Test
public void testBasicAppendEntriesAsync() throws Exception {
  final boolean killLeader = false;
  runTestBasicAppendEntriesAsync(ReplicationLevel.MAJORITY, killLeader);
}
/** Runs the async append-entries scenario with MAJORITY replication and {@code killLeader = true}. */
@Test
public void testBasicAppendEntriesAsyncKillLeader() throws Exception {
  final boolean killLeader = true;
  runTestBasicAppendEntriesAsync(ReplicationLevel.MAJORITY, killLeader);
}
/** Runs the async append-entries scenario with ALL replication and {@code killLeader = false}. */
@Test
public void testBasicAppendEntriesAsyncWithAllReplication() throws Exception {
  final boolean killLeader = false;
  runTestBasicAppendEntriesAsync(ReplicationLevel.ALL, killLeader);
}
/**
 * Starts a cluster, waits for a leader and runs {@code RaftBasicTests.testWithLoad}
 * against it. The cluster is shut down even when the load test throws, matching
 * {@code runTestBasicAppendEntriesAsync}.
 */
@Test
public void testWithLoadAsync() throws Exception {
  LOG.info("Running testWithLoadAsync");
  final CLUSTER cluster = newCluster(NUM_SERVERS);
  try {
    cluster.start();
    waitForLeader(cluster);
    RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
  } finally {
    // Do not leak running servers into later tests when the load test fails.
    cluster.shutdown();
  }
}
/**
 * Exercises {@code sendStaleReadAsync} against the follower that reports the highest
 * commit index in the last write reply.
 *
 * <p>After writing a batch of messages, the test first expects a stale read for a query
 * beyond the written range to fail with a {@link StateMachineException} caused by an
 * {@link IndexOutOfBoundsException}. It then checks, for every query below the follower's
 * commit index, that the stale read returns the same content as a regular read-only request.
 */
@Test
public void testStaleReadAsync() throws Exception {
  final int messageCount = 10;
  final CLUSTER cluster = newCluster(NUM_SERVERS);
  try (RaftClient client = cluster.createClient()) {
    cluster.start();
    RaftTestUtil.waitForLeader(cluster);

    // submit some messages
    final List<CompletableFuture<RaftClientReply>> writes = new ArrayList<>();
    for (int i = 0; i < messageCount; i++) {
      final String content = "m" + i;
      LOG.info("sendAsync " + content);
      writes.add(client.sendAsync(new RaftTestUtil.SimpleMessage(content)));
    }
    Assert.assertEquals(messageCount, writes.size());

    // wait for every write and keep the reply of the last one
    RaftClientReply lastWriteReply = null;
    for (CompletableFuture<RaftClientReply> write : writes) {
      lastWriteReply = write.join();
      Assert.assertTrue(lastWriteReply.isSuccess());
    }

    // Use a follower with the max commit index
    final RaftPeerId leader = lastWriteReply.getServerId();
    LOG.info("leader = " + leader);
    final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
    LOG.info("commitInfos = " + commitInfos);
    final CommitInfoProto followerCommitInfo = commitInfos.stream()
        .filter(info -> !RaftPeerId.valueOf(info.getServer().getId()).equals(leader))
        .max(Comparator.comparingLong(CommitInfoProto::getCommitIndex))
        .get();
    final RaftPeerId follower = RaftPeerId.valueOf(followerCommitInfo.getServer().getId());
    LOG.info("max follower = " + follower);
    final long followerCommitIndex = followerCommitInfo.getCommitIndex();

    // test a failure case
    testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
        () -> client.sendStaleReadAsync(
            new RaftTestUtil.SimpleMessage("" + (messageCount + 1)),
            followerCommitIndex, follower),
        StateMachineException.class, IndexOutOfBoundsException.class);

    // test sendStaleReadAsync
    final List<CompletableFuture<RaftClientReply>> comparisons = new ArrayList<>();
    for (int i = 1; i < followerCommitIndex; i++) {
      final int query = i;
      LOG.info("sendStaleReadAsync, query=" + query);
      final Message message = new RaftTestUtil.SimpleMessage("" + query);

      final CompletableFuture<RaftClientReply> readFuture = client.sendReadOnlyAsync(message);
      final CompletableFuture<RaftClientReply> staleReadFuture =
          client.sendStaleReadAsync(message, followerCommitIndex, follower);

      final CompletableFuture<ByteString> expectedContent =
          readFuture.thenApply(RaftAsyncTests::getMessageContent);
      final CompletableFuture<ByteString> staleContent =
          staleReadFuture.thenApply(RaftAsyncTests::getMessageContent);
      comparisons.add(expectedContent.thenCombine(staleContent, (expected, computed) -> {
        logAndCompareContents(query, expected, computed);
        return null;
      }));
    }
    JavaUtils.allOf(comparisons).join();
  } finally {
    cluster.shutdown();
  }
}

/** Logs the decoded content of the regular read and asserts the stale read returned the same bytes. */
private void logAndCompareContents(int query, ByteString expected, ByteString computed) {
  try {
    LOG.info("query " + query + " returns "
        + LogEntryProto.parseFrom(expected).getSmLogEntry().getData().toStringUtf8());
  } catch (InvalidProtocolBufferException e) {
    throw new CompletionException(e);
  }
  Assert.assertEquals("log entry mismatch for query=" + query, expected, computed);
}
/**
 * Asserts that the given reply reports success and returns the content of its message.
 *
 * @param reply the reply to inspect
 * @return the content of {@code reply.getMessage()}
 */
static ByteString getMessageContent(RaftClientReply reply) {
  final boolean success = reply.isSuccess();
  Assert.assertTrue(success);
  final Message message = reply.getMessage();
  return message.getContent();
}
/**
 * Runs {@code RaftBasicTests.testRequestTimeout} on a cluster created with a 5-second
 * retry-cache expiry time.
 *
 * <p>The cluster is shut down and the previous expiry time is restored in {@code finally}
 * blocks, so a failing run does not leak servers or the modified property into other tests.
 */
@Test
public void testRequestTimeout() throws Exception {
  final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
  RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS));
  try {
    // The cluster is created after the property change, as in the original ordering.
    final CLUSTER cluster = newCluster(NUM_SERVERS);
    try {
      cluster.start();
      RaftBasicTests.testRequestTimeout(true, cluster, LOG);
    } finally {
      cluster.shutdown();
    }
  } finally {
    //reset for the other tests
    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
  }
}
/**
 * Verifies that an async write does not complete while the non-leader servers block append
 * requests, and completes once appends are unblocked.
 *
 * <p>The cluster is created with a 20-second retry-cache expiry time. Cluster shutdown and
 * restoration of the previous expiry time happen in {@code finally} blocks so that a
 * failure does not affect other tests. Elapsed time is measured with the monotonic
 * {@link System#nanoTime()} clock.
 */
@Test
public void testAppendEntriesTimeout()
    throws IOException, InterruptedException, ExecutionException {
  LOG.info("Running testAppendEntriesTimeout");
  final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
  RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
  try {
    final CLUSTER cluster = newCluster(NUM_SERVERS);
    try {
      cluster.start();
      waitForLeader(cluster);
      final long startNanos = System.nanoTime();
      final long waitTime = 5000;
      try (final RaftClient client = cluster.createClient()) {
        // block append requests
        cluster.getServerAliveStream().forEach(raftServer -> {
          try {
            if (!raftServer.isLeader()) {
              ((SimpleStateMachine4Testing) raftServer.getStateMachine()).setBlockAppend(true);
            }
          } catch (InterruptedException e) {
            LOG.error("Interrupted while blocking append", e);
            // Keep the interrupt visible to the rest of the test.
            Thread.currentThread().interrupt();
          }
        });
        final CompletableFuture<RaftClientReply> replyFuture =
            client.sendAsync(new RaftTestUtil.SimpleMessage("abc"));
        Thread.sleep(waitTime);
        // replyFuture should not be completed until append request is unblocked.
        Assert.assertFalse(replyFuture.isDone());
        // unblock append request.
        cluster.getServerAliveStream().forEach(raftServer -> {
          try {
            ((SimpleStateMachine4Testing) raftServer.getStateMachine()).setBlockAppend(false);
          } catch (InterruptedException e) {
            LOG.error("Interrupted while unblocking append", e);
            // Keep the interrupt visible to the rest of the test.
            Thread.currentThread().interrupt();
          }
        });
        replyFuture.get();
        // The reply can only arrive after the sleep above, so at least waitTime has elapsed.
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        Assert.assertTrue(elapsedMillis >= waitTime);
      }
    } finally {
      cluster.shutdown();
    }
  } finally {
    //reset for the other tests
    RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
  }
}
}