blob: 36667eeae05f44e3a5e99bc190fb744fade3a81e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.grpc;
import org.apache.log4j.Level;
import org.apache.ratis.LogAppenderTests;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
public class TestLogAppenderWithGrpc
extends LogAppenderTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
{
Log4jUtils.setLogLevel(FollowerInfo.LOG, Level.DEBUG);
}
@Test
public void testPendingLimits() throws IOException, InterruptedException {
int maxAppends = 10;
RaftProperties properties = new RaftProperties();
properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
GrpcConfigKeys.Server.setLeaderOutstandingAppendsMax(properties, maxAppends);
RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties, 1);
MiniRaftClusterWithGrpc cluster = getFactory().newCluster(3, properties);
cluster.start();
// client and leader setup
try (final RaftClient client = cluster.createClient(cluster.getGroup())) {
client.io().send(new RaftTestUtil.SimpleMessage("m"));
final RaftServer.Division leader = waitForLeader(cluster);
long initialNextIndex = RaftServerTestUtil.getNextIndex(leader);
for (RaftServer.Division server : cluster.getFollowers()) {
// block the appends in the follower
SimpleStateMachine4Testing.get(server).blockWriteStateMachineData();
}
Collection<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(maxAppends * 2);
for (int i = 0; i < maxAppends * 2; i++) {
futures.add(client.async().send(new RaftTestUtil.SimpleMessage("m")));
}
FIVE_SECONDS.sleep();
for (long nextIndex : leader.getInfo().getFollowerNextIndices()) {
// Verify nextIndex does not progress due to pendingRequests limit
Assert.assertEquals(initialNextIndex + maxAppends, nextIndex);
}
ONE_SECOND.sleep();
for (RaftServer.Division server : cluster.getFollowers()) {
// unblock the appends in the follower
SimpleStateMachine4Testing.get(server).unblockWriteStateMachineData();
}
JavaUtils.allOf(futures).join();
cluster.shutdown();
}
}
@Test
public void testRestartLogAppender() throws Exception {
runWithNewCluster(2, this::runTestRestartLogAppender);
}
private void runTestRestartLogAppender(MiniRaftClusterWithGrpc cluster) throws Exception {
final RaftServer.Division leader = waitForLeader(cluster);
int messageCount = 0;
// Send some messages
try(RaftClient client = cluster.createClient(leader.getId())) {
for(int i = 0; i < 10; i++) {
final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + ++messageCount));
Assert.assertTrue(reply.isSuccess());
}
}
// assert INCONSISTENCY counter == 0
final GrpcServerMetrics leaderMetrics = new GrpcServerMetrics(leader.getMemberId().toString());
final String counter = String.format(GrpcServerMetrics.RATIS_GRPC_METRICS_LOG_APPENDER_INCONSISTENCY,
cluster.getFollowers().iterator().next().getMemberId().getPeerId());
Assert.assertEquals(0L, leaderMetrics.getRegistry().counter(counter).getCount());
// restart LogAppender
RaftServerTestUtil.restartLogAppenders(leader);
// Send some more messages
try(RaftClient client = cluster.createClient(leader.getId())) {
for(int i = 0; i < 10; i++) {
final RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + ++messageCount));
Assert.assertTrue(reply.isSuccess());
}
}
final RaftServer.Division newLeader = waitForLeader(cluster);
if (leader == newLeader) {
final GrpcServerMetrics newleaderMetrics = new GrpcServerMetrics(leader.getMemberId().toString());
// assert INCONSISTENCY counter >= 1
// If old LogAppender die before new LogAppender start, INCONSISTENCY equal to 1,
// else INCONSISTENCY greater than 1
Assert.assertTrue(newleaderMetrics.getRegistry().counter(counter).getCount() >= 1L);
}
}
}