blob: ef1f1219452fc197df2d1345394b567837f7309f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.security.sasl.SaslException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.ClientBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Test cases for race conditions that show up when a complete (multi-server)
 * ZooKeeper ensemble is running.
 */
public class RaceConditionTest extends QuorumPeerTestBase {

    protected static final Logger LOG = LoggerFactory.getLogger(RaceConditionTest.class);
    private static final int SERVER_COUNT = 3;

    /** Servers started by the current test; stopped in {@link #tearDown()}. May contain null slots. */
    private MainThread[] mt;

    /**
     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2380.
     * Deadlock while shutting down the ZooKeeper.
     */
    @Test
    @Timeout(value = 30)
    public void testRaceConditionBetweenLeaderAndAckRequestProcessor() throws Exception {
        mt = startQuorum();
        // get leader
        QuorumPeer leader = getLeader(mt);
        // Assert before dereferencing so that a missing leader is reported as an
        // assertion failure rather than a NullPointerException.
        assertNotNull(leader, "Leader should not be null");
        long oldLeaderCurrentEpoch = leader.getCurrentEpoch();
        // Make both followers drop their connection on the next PING, so the leader
        // loses its majority and has to take part in a new leader election.
        shutdownFollowers(mt);
        /*
         * Verify that there is no deadlock in the following ways:
         * 1) If the leader is in LOOKING or FOLLOWING state, we are sure there is no deadlock.
         * 2) If the leader is in LEADING state, then we have to check that this LEADING state
         *    is the result of a new leader election, not the old LEADING state.
         */
        boolean leaderStateChanged = ClientBase.waitForServerState(
                leader, 15000, QuorumStats.Provider.LOOKING_STATE, QuorumStats.Provider.FOLLOWING_STATE);
        // Wait for the old leader to start completely
        assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + leader.getClientPort(), CONNECTION_TIMEOUT),
                "Failed to bring up the old leader server");
        assertTrue(leaderStateChanged || (leader.getCurrentEpoch() > oldLeaderCurrentEpoch),
                "Leader failed to transition to new state. Current state is " + leader.getServerState());
    }

    /**
     * Stops every server that was started by the test, including those started by a
     * {@link #startQuorum()} call that failed part-way through.
     */
    @AfterEach
    public void tearDown() {
        // stop all servers
        if (mt == null) {
            return;
        }
        boolean interrupted = false;
        for (MainThread server : mt) {
            if (server == null) {
                // Slot never filled, e.g. startQuorum() failed before creating it.
                continue;
            }
            try {
                // With the defect, the leader hangs here as well; with the fix it does not.
                server.shutdown();
            } catch (InterruptedException e) {
                LOG.warn("Quorum Peer interrupted while shutting it down", e);
                interrupted = true;
            }
        }
        if (interrupted) {
            // Restore the interrupt status only after every server had a chance to stop,
            // so that one interruption does not abort the remaining shutdowns.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts {@link #SERVER_COUNT} participants on fresh local ports and waits for
     * each of them to accept client connections.
     *
     * <p>The returned array is also stored in {@link #mt} before any server is
     * started. That way {@link #tearDown()} can clean up even if this method fails.
     *
     * @return the started servers, indexed by server id
     * @throws IOException if a server cannot be created
     */
    private MainThread[] startQuorum() throws IOException {
        final int[] clientPorts = new int[SERVER_COUNT];
        StringBuilder quorumCfg = new StringBuilder();
        for (int i = 0; i < SERVER_COUNT; i++) {
            clientPorts[i] = PortAssignment.unique();
            quorumCfg.append("server.").append(i)
                    .append("=127.0.0.1:").append(PortAssignment.unique())
                    .append(':').append(PortAssignment.unique())
                    .append(":participant;127.0.0.1:").append(clientPorts[i])
                    .append('\n');
        }
        String currentQuorumCfgSection = quorumCfg.toString();
        MainThread[] servers = new MainThread[SERVER_COUNT];
        // Publish early: if a constructor throws or a server never comes up, tearDown()
        // still stops the servers that were already started.
        mt = servers;
        // start all the servers
        for (int i = 0; i < SERVER_COUNT; i++) {
            servers[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
                @Override
                public TestQPMain getTestQPMain() {
                    return new MockTestQPMain();
                }
            };
            servers[i].start();
        }
        // ensure all servers started
        for (int i = 0; i < SERVER_COUNT; i++) {
            assertTrue(
                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT),
                    "waiting for server " + i + " being up");
        }
        return servers;
    }

    /**
     * Returns the peer that currently reports {@link ServerState#LEADING}, or
     * {@code null} if none does. Servers are scanned from the highest index down.
     */
    private QuorumPeer getLeader(MainThread[] mt) {
        for (int i = mt.length - 1; i >= 0; i--) {
            QuorumPeer quorumPeer = mt[i].getQuorumPeer();
            if (quorumPeer != null && ServerState.LEADING == quorumPeer.getPeerState()) {
                return quorumPeer;
            }
        }
        return null;
    }

    /**
     * Makes every peer currently in {@link ServerState#FOLLOWING} fail on the next
     * PING it receives (see {@link CustomQuorumPeer#setStopPing(boolean)}). The
     * followers are not shut down; they lose their connection to the leader.
     */
    private void shutdownFollowers(MainThread[] mt) {
        for (int i = 0; i < mt.length; i++) {
            CustomQuorumPeer quorumPeer = (CustomQuorumPeer) mt[i].getQuorumPeer();
            if (quorumPeer != null && ServerState.FOLLOWING == quorumPeer.getPeerState()) {
                quorumPeer.setStopPing(true);
            }
        }
    }

    /**
     * Quorum peer with test hooks:
     * <ul>
     *   <li>its followers can be told to throw on PING packets, and</li>
     *   <li>its leader uses a request-processor chain with a mocked sync processor.</li>
     * </ul>
     */
    private static class CustomQuorumPeer extends QuorumPeer {

        // Written by the test thread and read by the follower's packet-processing
        // thread, hence volatile for cross-thread visibility.
        private volatile boolean stopPing;

        public CustomQuorumPeer() throws SaslException {
        }

        public void setStopPing(boolean stopPing) {
            this.stopPing = stopPing;
        }

        @Override
        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
                @Override
                protected void processPacket(QuorumPacket qp) throws Exception {
                    if (stopPing && qp.getType() == Leader.PING) {
                        LOG.info("Follower skipped ping");
                        // Simulate a network failure instead of answering the PING.
                        throw new SocketException("Socket time out while sending the ping response");
                    }
                    super.processPacket(qp);
                }
            };
        }

        @Override
        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
            LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, this, this.getZkDb()) {
                /**
                 * Overridden only to provide a place to inject MockSyncRequestProcessor,
                 * by way of MockProposalRequestProcessor. The chain built here is
                 * LeaderRequestProcessor -> PrepRequestProcessor -> MockProposalRequestProcessor
                 * -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor.
                 */
                @Override
                protected void setupRequestProcessors() {
                    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
                    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor, getLeader());
                    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false, getZooKeeperServerListener());
                    commitProcessor.start();
                    ProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(this, commitProcessor);
                    proposalProcessor.initialize();
                    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
                    prepRequestProcessor.start();
                    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
                }
            };
            return new Leader(this, zk);
        }
    }

    /**
     * Sync processor that enqueues an extra request while it is being shut down. This
     * reproduces the conditions of ZOOKEEPER-2380.
     */
    private static class MockSyncRequestProcessor extends SyncRequestProcessor {

        public MockSyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
            super(zks, nextProcessor);
        }

        @Override
        public void shutdown() {
            // Add a request so that SyncRequestProcessor has something to process while
            // we are in the shutdown flow.
            Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete,
                    ByteBuffer.wrap("/deadLockIssue".getBytes(StandardCharsets.UTF_8)), null);
            processRequest(request);
            super.shutdown();
        }
    }

    /**
     * Proposal processor whose {@code syncProcessor} field is replaced by a
     * {@link MockSyncRequestProcessor} that forwards to an AckRequestProcessor.
     */
    private static class MockProposalRequestProcessor extends ProposalRequestProcessor {

        public MockProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
            super(zks, nextProcessor);
            // The only purpose here is to inject the mocked SyncRequestProcessor.
            AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
            syncProcessor = new MockSyncRequestProcessor(zks, ackProcessor);
        }
    }

    /** QuorumPeerMain variant that creates {@link CustomQuorumPeer} instances. */
    private static class MockTestQPMain extends TestQPMain {

        @Override
        protected QuorumPeer getQuorumPeer() throws SaslException {
            return new CustomQuorumPeer();
        }
    }
}