blob: 9cf0d9b35c2ff2ad8edf762535aacce1679676fc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis;
import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.server.storage.RaftLogIOException;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.SizeInBytes;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
/**
 * Transport-independent tests of the failures reported back to a Raft client:
 * {@link NotLeaderException}, {@link GroupMismatchException},
 * {@link StaleReadException} and the failure raised for a log entry that does
 * not fit into the log appender buffer.
 *
 * Concrete subclasses supply the cluster implementation through
 * {@link MiniRaftCluster.Factory.Get}.
 *
 * @param <CLUSTER> the {@link MiniRaftCluster} implementation under test.
 */
public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
    extends BaseTest
    implements MiniRaftCluster.Factory.Get<CLUSTER> {
  static {
    // Verbose logging for the components exercised by these tests.
    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
    LogUtils.setLogLevel(RaftLog.LOG, Level.DEBUG);
    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
  }

  /** Number of peers each test cluster is started with. */
  static final int NUM_PEERS = 3;

  {
    // Keep the appender buffer small (4KB) so that the 8192-byte message sent by
    // runTestLogAppenderBufferCapacity(..) is larger than the configured limit.
    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(getProperties(), SizeInBytes.valueOf("4KB"));
  }

  /**
   * Runs {@link #runTestHandleNotLeaderException(boolean, MiniRaftCluster)}
   * on a new cluster without killing the new leader.
   */
  @Test
  public void testHandleNotLeaderException() throws Exception {
    runWithNewCluster(NUM_PEERS, cluster -> runTestHandleNotLeaderException(false, cluster));
  }

  /**
   * Runs {@link #runTestHandleNotLeaderException(boolean, MiniRaftCluster)}
   * on a new cluster and additionally kills the new leader after the leader change;
   * intended to cover handling of both IOException and NotLeaderException.
   */
  @Test
  public void testHandleNotLeaderAndIOException() throws Exception {
    runWithNewCluster(NUM_PEERS, cluster -> runTestHandleNotLeaderException(true, cluster));
  }

  /**
   * Sends a message through the current leader, forces a leader change, checks that
   * a request sent directly to the old leader is answered with a
   * {@link NotLeaderException} suggesting the new leader, and finally checks that
   * the original client can still send a message successfully.
   *
   * @param killNewLeader whether the newly elected leader is killed before
   *                      the old leader is queried.
   * @param cluster the cluster to run against.
   */
  void runTestHandleNotLeaderException(boolean killNewLeader, CLUSTER cluster) throws Exception {
    final RaftPeerId oldLeader = RaftTestUtil.waitForLeader(cluster).getId();
    try (final RaftClient client = cluster.createClient(oldLeader)) {
      sendMessage("m1", client);

      // enforce leader change
      final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);

      if (killNewLeader) {
        // kill the new leader
        cluster.killServer(newLeader);
      }

      // Use the raw rpc so that the request goes to the old leader directly;
      // the check is retried through JavaUtils.attempt(..).
      final RaftClientRpc rpc = client.getClientRpc();
      JavaUtils.attempt(() -> assertNotLeaderException(newLeader, "m2", oldLeader, rpc, cluster),
          10, ONE_SECOND, "assertNotLeaderException", LOG);

      sendMessage("m3", client);
    }
  }

  /**
   * Sends a request carrying {@code messageId} to {@code server} through {@code rpc}
   * using a random client id, and verifies that the reply carries
   * a {@link NotLeaderException} whose suggested leader is {@code expectedSuggestedLeader}.
   *
   * NOTE(review): a successful reply trips a JUnit assumption (not an assertion);
   * confirm that reporting this case as an assumption violation is intended.
   *
   * @param expectedSuggestedLeader the peer expected to be suggested as the leader.
   * @param messageId the content of the message to send.
   * @param server the peer the request is addressed to.
   * @param rpc the client rpc used to send the request.
   * @param cluster the cluster used to build the request.
   * @return the (unsuccessful) reply received from {@code server}.
   * @throws IOException if sending the request fails.
   */
  RaftClientReply assertNotLeaderException(RaftPeerId expectedSuggestedLeader,
      String messageId, RaftPeerId server, RaftClientRpc rpc, CLUSTER cluster) throws IOException {
    final SimpleMessage message = new SimpleMessage(messageId);
    final RaftClientReply reply = rpc.sendRequest(cluster.newRaftClientRequest(ClientId.randomId(), server, message));
    Assert.assertNotNull("No reply received from " + server, reply);
    Assume.assumeFalse(reply.isSuccess());
    final NotLeaderException nle = reply.getNotLeaderException();
    Objects.requireNonNull(nle, "reply.getNotLeaderException() == null");
    Assert.assertEquals("Unexpected suggested leader",
        expectedSuggestedLeader, nle.getSuggestedLeader().getId());
    return reply;
  }

  /**
   * Sends {@code message} through {@code client} and asserts that the reply is successful.
   *
   * @param message the content of the message to send.
   * @param client the client used for sending.
   * @throws IOException if sending fails.
   */
  static void sendMessage(String message, RaftClient client) throws IOException {
    final RaftClientReply reply = client.send(new SimpleMessage(message));
    Assert.assertTrue("Failed to send \"" + message + "\"", reply.isSuccess());
  }

  /** Runs {@link #runTestNotLeaderExceptionWithReconf(MiniRaftCluster)} on a new cluster. */
  @Test
  public void testNotLeaderExceptionWithReconf() throws Exception {
    runWithNewCluster(NUM_PEERS, this::runTestNotLeaderExceptionWithReconf);
  }

  /**
   * Forces a leader change, adds two peers through a configuration change sent to
   * the new leader, and then checks that the {@link NotLeaderException} returned by
   * the old leader lists exactly the peers of the cluster.
   *
   * @param cluster the cluster to run against.
   */
  void runTestNotLeaderExceptionWithReconf(CLUSTER cluster) throws Exception {
    final RaftPeerId oldLeader = RaftTestUtil.waitForLeader(cluster).getId();
    try (final RaftClient client = cluster.createClient(oldLeader)) {
      // enforce leader change
      final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);

      // add two more peers
      final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{"ss1", "ss2"}, true);

      // trigger setConfiguration
      LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf));
      try (final RaftClient c2 = cluster.createClient(newLeader)) {
        final RaftClientReply confReply = c2.setConfiguration(change.allPeersInNewConf);
        Assert.assertTrue("setConfiguration failed", confReply.isSuccess());
      }
      LOG.info(cluster.printServers());

      // it is possible that the remote peer's rpc server is not ready. need retry
      final RaftClientRpc rpc = client.getClientRpc();
      final RaftClientReply reply = JavaUtils.attempt(
          () -> assertNotLeaderException(newLeader, "m1", oldLeader, rpc, cluster),
          10, ONE_SECOND, "assertNotLeaderException", LOG);

      // The peers in the exception must match the peers of the cluster:
      // same count, and each reported peer is known to the cluster.
      final Collection<RaftPeer> peers = cluster.getPeers();
      final RaftPeer[] peersFromReply = reply.getNotLeaderException().getPeers();
      Assert.assertEquals("Unexpected number of peers in " + Arrays.asList(peersFromReply),
          peers.size(), peersFromReply.length);
      for (RaftPeer p : peersFromReply) {
        Assert.assertTrue("Unknown peer " + p + " in the reply", peers.contains(p));
      }

      sendMessage("m2", client);
    }
  }

  /** Runs {@link #runTestGroupMismatchException(MiniRaftCluster)} using runWithSameCluster. */
  @Test
  public void testGroupMismatchException() throws Exception {
    runWithSameCluster(NUM_PEERS, this::runTestGroupMismatchException);
  }

  /**
   * Builds a group with the same peers as the cluster group but a random group id,
   * and expects client calls made with that group to fail with
   * {@link GroupMismatchException}.
   *
   * @param cluster the cluster to run against.
   */
  void runTestGroupMismatchException(CLUSTER cluster) throws Exception {
    final RaftGroup clusterGroup = cluster.getGroup();
    Assert.assertEquals(NUM_PEERS, clusterGroup.getPeers().size());

    final RaftGroup anotherGroup = RaftGroup.valueOf(RaftGroupId.randomId(), clusterGroup.getPeers());
    Assert.assertNotEquals(clusterGroup.getGroupId(), anotherGroup.getGroupId());

    // Create client using another group
    try (RaftClient client = cluster.createClient(anotherGroup)) {
      testFailureCase("send(..) with client group being different from the server group",
          () -> client.send(Message.EMPTY),
          GroupMismatchException.class);

      testFailureCase("sendReadOnly(..) with client group being different from the server group",
          () -> client.sendReadOnly(Message.EMPTY),
          GroupMismatchException.class);

      testFailureCase("setConfiguration(..) with client group being different from the server group",
          () -> client.setConfiguration(RaftPeer.emptyArray()),
          GroupMismatchException.class);

      // The remove request is addressed to the first peer of the cluster group.
      testFailureCase("groupRemove(..) with another group id",
          () -> client.groupRemove(anotherGroup.getGroupId(), false, clusterGroup.getPeers().iterator().next().getId()),
          GroupMismatchException.class);
    }
  }

  /** Runs {@link #runTestStaleReadException(MiniRaftCluster)} using runWithSameCluster. */
  @Test
  public void testStaleReadException() throws Exception {
    runWithSameCluster(NUM_PEERS, this::runTestStaleReadException);
  }

  /**
   * Sends a stale read with a very large index (1,000,000,000) to a follower and
   * expects it to fail with {@link StateMachineException} / {@link StaleReadException}.
   *
   * @param cluster the cluster to run against.
   */
  void runTestStaleReadException(CLUSTER cluster) throws Exception {
    RaftTestUtil.waitForLeader(cluster);
    try (RaftClient client = cluster.createClient()) {
      // Any follower will do; take the first one.
      final RaftPeerId follower = cluster.getFollowers().iterator().next().getId();
      testFailureCase("sendStaleRead(..) with a large commit index",
          () -> client.sendStaleRead(Message.EMPTY, 1_000_000_000L, follower),
          StateMachineException.class, StaleReadException.class);
    }
  }

  /** Runs {@link #runTestLogAppenderBufferCapacity(MiniRaftCluster)} using runWithSameCluster. */
  @Test
  public void testLogAppenderBufferCapacity() throws Exception {
    runWithSameCluster(NUM_PEERS, this::runTestLogAppenderBufferCapacity);
  }

  /**
   * Sends a message built from 8192 bytes to the leader, while the appender buffer
   * byte limit of this test is set to 4KB, and expects the send to fail with
   * {@link StateMachineException} / {@link RaftLogIOException}.
   *
   * @param cluster the cluster to run against.
   */
  void runTestLogAppenderBufferCapacity(CLUSTER cluster) throws Exception {
    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
    final byte[] bytes = new byte[8192];
    Arrays.fill(bytes, (byte) 1);
    // Decode with an explicit charset so that the payload does not depend on
    // the platform default encoding; each 0x01 byte maps to one character.
    final SimpleMessage msg = new SimpleMessage(new String(bytes, StandardCharsets.UTF_8));
    try (RaftClient client = cluster.createClient(leaderId)) {
      testFailureCase("testLogAppenderBufferCapacity",
          () -> client.send(msg),
          StateMachineException.class, RaftLogIOException.class);
    }
  }
}