/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.server.quorum;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.apache.zookeeper.test.ClientBase.createEmptyTestDir;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.LineNumberReader;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import javax.security.sasl.SaslException;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.WriterAppender;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.BaseTestMetricsProvider;
import org.apache.zookeeper.metrics.impl.NullMetricsProvider;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Test;
/**
* Tests for QuorumPeerMain: starting, running, and shutting down ensemble (quorum) servers.
*/
public class QuorumPeerMainTest extends QuorumPeerTestBase {
/**
* Helper that verifies the ability to start a two-node cluster bound to the given address.
*/
public void testQuorumInternal(String addr) throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String server1 = String.format("server.1=%1$s:%2$s:%3$s;%4$s", addr, PortAssignment.unique(), PortAssignment.unique(), CLIENT_PORT_QP1);
String server2 = String.format("server.2=%1$s:%2$s:%3$s;%4$s", addr, PortAssignment.unique(), PortAssignment.unique(), CLIENT_PORT_QP2);
String quorumCfgSection = server1 + "\n" + server2;
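// For reference, each generated line follows the static config format
// server.<id>=<address>:<quorumPort>:<electionPort>;<clientPort>, so the
// section looks roughly like this (ports are assigned dynamically and will vary):
//   server.1=127.0.0.1:11221:11222;11223
//   server.2=127.0.0.1:11224:11225;11226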
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
q1.start();
q2.start();
assertTrue(
"waiting for server 1 being up",
ClientBase.waitForServerUp(addr + ":" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 being up",
ClientBase.waitForServerUp(addr + ":" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
QuorumPeer quorumPeer = q1.main.quorumPeer;
int tickTime = quorumPeer.getTickTime();
assertEquals(
"Default value of minimumSessionTimeOut is not considered",
tickTime * 2,
quorumPeer.getMinSessionTimeout());
assertEquals(
"Default value of maximumSessionTimeOut is not considered",
tickTime * 20,
quorumPeer.getMaxSessionTimeout());
ZooKeeper zk = new ZooKeeper(addr + ":" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk, States.CONNECTED);
zk.create("/foo_q1", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(new String(zk.getData("/foo_q1", null, null)), "foobar1");
zk.close();
zk = new ZooKeeper(addr + ":" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk, States.CONNECTED);
zk.create("/foo_q2", "foobar2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(new String(zk.getData("/foo_q2", null, null)), "foobar2");
zk.close();
q1.shutdown();
q2.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown(addr + ":" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 down",
ClientBase.waitForServerDown(addr + ":" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
}
/**
* Verify the ability to start a cluster.
*/
@Test
public void testQuorum() throws Exception {
testQuorumInternal("127.0.0.1");
}
/**
* Verify the ability to start a cluster using IPv6 ([::1]).
*/
@Test
public void testQuorumV6() throws Exception {
testQuorumInternal("[::1]");
}
/**
* Test early leader abandonment.
*/
@Test
public void testEarlyLeaderAbandonment() throws Exception {
ClientBase.setupTestEnv();
final int SERVER_COUNT = 3;
final int[] clientPorts = new int[SERVER_COUNT];
StringBuilder sb = new StringBuilder();
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + clientPorts[i] + "\n");
}
String quorumCfgSection = sb.toString();
MainThread[] mt = new MainThread[SERVER_COUNT];
ZooKeeper[] zk = new ZooKeeper[SERVER_COUNT];
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], quorumCfgSection);
mt[i].start();
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
}
waitForAll(zk, States.CONNECTED);
// we need to shut down and start back up to make sure that the create session isn't the first transaction, since
// that is rather innocuous.
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
}
waitForAll(zk, States.CONNECTING);
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].start();
// Recreate a client session since the previous session was not persisted.
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
}
waitForAll(zk, States.CONNECTED);
// OK, let's find the leader and kill everything else; we have a few
// seconds, so it should be plenty of time
int leader = -1;
Map<Long, Proposal> outstanding = null;
for (int i = 0; i < SERVER_COUNT; i++) {
if (mt[i].main.quorumPeer.leader == null) {
mt[i].shutdown();
} else {
leader = i;
outstanding = mt[leader].main.quorumPeer.leader.outstandingProposals;
}
}
try {
zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
fail("create /zk" + leader + " should have failed");
} catch (KeeperException e) {
}
// just make sure that we actually did get it in process at the
// leader
assertTrue(outstanding.size() == 1);
assertTrue(outstanding.values().iterator().next().request.getHdr().getType() == OpCode.create);
// make sure it has a chance to write it to disk
Thread.sleep(1000);
mt[leader].shutdown();
waitForAll(zk, States.CONNECTING);
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
mt[i].start();
}
}
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
// Recreate a client session since the previous session was not persisted.
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk[i], States.CONNECTED);
zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
mt[leader].start();
waitForAll(zk, States.CONNECTED);
// make sure everything is consistent
for (int i = 0; i < SERVER_COUNT; i++) {
for (int j = 0; j < SERVER_COUNT; j++) {
if (i == leader) {
assertTrue((j == leader ? ("Leader (" + leader + ")") : ("Follower " + j))
+ " should not have /zk"
+ i, zk[j].exists("/zk" + i, false) == null);
} else {
assertTrue((j == leader ? ("Leader (" + leader + ")") : ("Follower " + j))
+ " does not have /zk"
+ i, zk[j].exists("/zk" + i, false) != null);
}
}
}
for (int i = 0; i < SERVER_COUNT; i++) {
zk[i].close();
}
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
}
}
/**
* Test the case of server with highest zxid not present at leader election and joining later.
* This test case is for reproducing the issue and fixing the bug mentioned in ZOOKEEPER-1154
* and ZOOKEEPER-1156.
*/
@Test
public void testHighestZxidJoinLate() throws Exception {
numServers = 3;
servers = LaunchServers(numServers);
String path = "/hzxidtest";
int leader = servers.findLeader();
// make sure there is a leader
assertTrue("There should be a leader", leader >= 0);
int nonleader = (leader + 1) % numServers;
byte[] input = new byte[1];
input[0] = 1;
byte[] output;
// Create a couple of nodes
servers.zk[leader].create(path + leader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
servers.zk[leader].create(path + nonleader, input, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// make sure the updates were indeed committed; if not,
// the following statement will throw.
output = servers.zk[leader].getData(path + nonleader, false, null);
// Shut down everyone but the leader
for (int i = 0; i < numServers; i++) {
if (i != leader) {
servers.mt[i].shutdown();
}
}
input[0] = 2;
// Update the node on the leader
servers.zk[leader].setData(path + leader, input, -1, null, null);
// wait some time to let this get written to disk
Thread.sleep(500);
// shut the leader down
servers.mt[leader].shutdown();
System.gc();
waitForAll(servers.zk, States.CONNECTING);
// Start everyone but the leader
for (int i = 0; i < numServers; i++) {
if (i != leader) {
servers.mt[i].start();
}
}
// wait to connect to one of these
waitForOne(servers.zk[nonleader], States.CONNECTED);
// validate that the old value is there and not the new one
output = servers.zk[nonleader].getData(path + leader, false, null);
assertEquals("Expecting old value 1 since 2 isn't committed yet", output[0], 1);
// Do some other update, so we bump the maxCommittedZxid
// by setting the value to 2
servers.zk[nonleader].setData(path + nonleader, input, -1);
// start the old leader
servers.mt[leader].start();
// connect to it
waitForOne(servers.zk[leader], States.CONNECTED);
// make sure it doesn't have the new value that it alone had logged
output = servers.zk[leader].getData(path + leader, false, null);
assertEquals("Validating that the deposed leader has rolled back that change it had written", output[0], 1);
// make sure the leader has the subsequent changes that were made while it was offline
output = servers.zk[leader].getData(path + nonleader, false, null);
assertEquals("Validating that the deposed leader caught up on changes it missed", output[0], 2);
}
/**
* This test validates that if a quorum member determines that it is leader without the support of the rest of the
* quorum (the other members do not believe it to be the leader) it will stop attempting to lead and become a follower.
*
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testElectionFraud() throws IOException, InterruptedException {
// capture QuorumPeer logging
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.INFO);
Logger qlogger = Logger.getLogger(QuorumPeer.class);
qlogger.addAppender(appender);
numServers = 3;
// used for assertions later
boolean foundLeading = false;
boolean foundLooking = false;
boolean foundFollowing = false;
try {
// spin up a quorum; we use a small tickTime to make the test run faster
servers = LaunchServers(numServers, 500);
// find the leader
int trueLeader = servers.findLeader();
assertTrue("There should be a leader", trueLeader >= 0);
// find a follower
int falseLeader = (trueLeader + 1) % numServers;
assertTrue("All servers should join the quorum", servers.mt[falseLeader].main.quorumPeer.follower
!= null);
// to keep the quorum peer running and force it to go into the looking state, we kill leader election
servers.mt[falseLeader].main.quorumPeer.electionAlg.shutdown();
servers.mt[falseLeader].main.quorumPeer.follower.getSocket().close();
// wait for the falseLeader to disconnect
waitForOne(servers.zk[falseLeader], States.CONNECTING);
// convince falseLeader that it is the leader
servers.mt[falseLeader].main.quorumPeer.setPeerState(QuorumPeer.ServerState.LEADING);
// provide time for the falseLeader to realize no followers have connected
// (this is twice the timeout used in Leader#getEpochToPropose)
Thread.sleep(2 * servers.mt[falseLeader].main.quorumPeer.initLimit * servers.mt[falseLeader].main.quorumPeer.tickTime);
// Restart leader election
servers.mt[falseLeader].main.quorumPeer.startLeaderElection();
// The previous client connection to falseLeader likely closed, create a new one
servers.zk[falseLeader] = new ZooKeeper(
"127.0.0.1:" + servers.mt[falseLeader].getClientPort(),
ClientBase.CONNECTION_TIMEOUT,
this);
// Wait for falseLeader to rejoin the quorum
waitForOne(servers.zk[falseLeader], States.CONNECTED);
// and ensure trueLeader is still the leader
assertTrue(servers.mt[trueLeader].main.quorumPeer.leader != null);
// Look through the logs for output that indicates the falseLeader is LEADING, then LOOKING, then FOLLOWING
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
Pattern leading = Pattern.compile(".*myid=" + falseLeader + ".*LEADING.*");
Pattern looking = Pattern.compile(".*myid=" + falseLeader + ".*LOOKING.*");
Pattern following = Pattern.compile(".*myid=" + falseLeader + ".*FOLLOWING.*");
String line;
while ((line = r.readLine()) != null) {
if (!foundLeading) {
foundLeading = leading.matcher(line).matches();
} else if (!foundLooking) {
foundLooking = looking.matcher(line).matches();
} else if (following.matcher(line).matches()) {
foundFollowing = true;
break;
}
}
} finally {
qlogger.removeAppender(appender);
}
assertTrue("falseLeader never attempts to become leader", foundLeading);
assertTrue("falseLeader never gives up on leadership", foundLooking);
assertTrue("falseLeader never rejoins the quorum", foundFollowing);
}
/**
* Verify handling of bad quorum address
*/
@Test
public void testBadPeerAddressInQuorum() throws Exception {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.WARN);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2;
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
q1.start();
boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 30000);
assertFalse("Server never came up", isup);
q1.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
} finally {
qlogger.removeAppender(appender);
}
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean found = false;
Pattern p = Pattern.compile(".*None of the addresses .* are reachable for sid 2");
while ((line = r.readLine()) != null) {
found = p.matcher(line).matches();
if (found) {
break;
}
}
assertTrue("complains about host", found);
}
/**
* Verify handling of inconsistent peer type
*/
@Test
public void testInconsistentPeerType() throws Exception {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.INFO);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
// test the most likely situation only: the server is listed as an observer in
// the servers list, but there's no "peerType=observer" token in its config
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
final int CLIENT_PORT_QP3 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2
+ "\nserver.3=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":observer" + ";" + CLIENT_PORT_QP3;
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
MainThread q3 = new MainThread(3, CLIENT_PORT_QP3, quorumCfgSection);
q1.start();
q2.start();
q3.start();
assertTrue(
"waiting for server 1 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 3 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP3, CONNECTION_TIMEOUT));
q1.shutdown();
q2.shutdown();
q3.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 3 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP3, ClientBase.CONNECTION_TIMEOUT));
} finally {
qlogger.removeAppender(appender);
}
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean warningPresent = false;
boolean defaultedToObserver = false;
Pattern pWarn = Pattern.compile(".*Peer type from servers list.* doesn't match peerType.*");
Pattern pObserve = Pattern.compile(".*OBSERVING.*");
while ((line = r.readLine()) != null) {
if (pWarn.matcher(line).matches()) {
warningPresent = true;
}
if (pObserve.matcher(line).matches()) {
defaultedToObserver = true;
}
if (warningPresent && defaultedToObserver) {
break;
}
}
assertTrue("Should warn about inconsistent peer type", warningPresent && defaultedToObserver);
}
/**
* Verify that bad packets are handled properly
* at the quorum (election) port.
* @throws Exception
*/
@Test
public void testBadPackets() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
int electionPort1 = PortAssignment.unique();
int electionPort2 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + electionPort1 + ";" + CLIENT_PORT_QP1
+ "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + electionPort2 + ";" + CLIENT_PORT_QP2;
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
q1.start();
q2.start();
assertTrue(
"waiting for server 1 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
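// The next few lines hand-craft a bogus packet: a 4-byte buffer whose only
// content is a message-length prefix claiming a 1 GiB payload. Sending it to
// each election port checks that the election listener drops oversized
// packets without taking the quorum down.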
byte[] b = new byte[4];
int length = 1024 * 1024 * 1024;
ByteBuffer buff = ByteBuffer.wrap(b);
buff.putInt(length);
buff.position(0);
SocketChannel s = SocketChannel.open(new InetSocketAddress("127.0.0.1", electionPort1));
s.write(buff);
s.close();
buff.position(0);
s = SocketChannel.open(new InetSocketAddress("127.0.0.1", electionPort2));
s.write(buff);
s.close();
ZooKeeper zk = new ZooKeeper("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk, States.CONNECTED);
zk.create("/foo_q1", "foobar1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
assertEquals(new String(zk.getData("/foo_q1", null, null)), "foobar1");
zk.close();
q1.shutdown();
q2.shutdown();
}
/**
* Verify handling of quorum defaults:
* - the default electionAlg is fast leader election
*/
@Test
public void testQuorumDefaults() throws Exception {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.INFO);
appender.setImmediateFlush(true);
Logger zlogger = Logger.getLogger("org.apache.zookeeper");
zlogger.addAppender(appender);
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2;
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
q1.start();
q2.start();
assertTrue(
"waiting for server 1 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
q1.shutdown();
q2.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
} finally {
zlogger.removeAppender(appender);
}
os.close();
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean found = false;
Pattern p = Pattern.compile(".*FastLeaderElection.*");
while ((line = r.readLine()) != null) {
found = p.matcher(line).matches();
if (found) {
break;
}
}
assertTrue("fastleaderelection used", found);
}
/**
* Verifies that QuorumPeer shuts down promptly, even after leader-election notifications have timed out.
*/
@Test
public void testQuorumPeerExitTime() throws Exception {
long maxwait = 3000;
final int CLIENT_PORT_QP1 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + PortAssignment.unique();
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
q1.start();
// Let the notifications time out
Thread.sleep(30000);
long start = Time.currentElapsedTime();
q1.shutdown();
long end = Time.currentElapsedTime();
if ((end - start) > maxwait) {
fail("QuorumPeer took " + (end - start) + " to shutdown, expected " + maxwait);
}
}
/**
* Verifies that the server honors explicitly configured min/max session
* timeouts.
*/
@Test
public void testMinMaxSessionTimeOut() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+ "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique();
final int minSessionTimeOut = 10000;
final int maxSessionTimeOut = 15000;
final String configs = "maxSessionTimeout=" + maxSessionTimeOut + "\n"
+ "minSessionTimeout=" + minSessionTimeOut + "\n";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, configs);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, configs);
q1.start();
q2.start();
assertTrue(
"waiting for server 1 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
QuorumPeer quorumPeer = q1.main.quorumPeer;
assertEquals("minimumSessionTimeOut is not considered", minSessionTimeOut, quorumPeer.getMinSessionTimeout());
assertEquals("maximumSessionTimeOut is not considered", maxSessionTimeOut, quorumPeer.getMaxSessionTimeout());
}
/**
* Verifies that when the user configures only the minSessionTimeout limit,
* the server honors it and derives the max session timeout from the tick time.
*/
@Test
public void testWithOnlyMinSessionTimeout() throws Exception {
ClientBase.setupTestEnv();
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+ "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique();
final int minSessionTimeOut = 15000;
final String configs = "minSessionTimeout=" + minSessionTimeOut + "\n";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection, configs);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection, configs);
q1.start();
q2.start();
assertTrue(
"waiting for server 1 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, CONNECTION_TIMEOUT));
QuorumPeer quorumPeer = q1.main.quorumPeer;
final int maxSessionTimeOut = quorumPeer.tickTime * 20;
assertEquals("minimumSessionTimeOut is not considered", minSessionTimeOut, quorumPeer.getMinSessionTimeout());
assertEquals("maximumSessionTimeOut is wrong", maxSessionTimeOut, quorumPeer.getMaxSessionTimeout());
}
@Test
public void testFailedTxnAsPartOfQuorumLoss() throws Exception {
final int LEADER_TIMEOUT_MS = 10_000;
// 1. start up server and wait for leader election to finish
ClientBase.setupTestEnv();
final int SERVER_COUNT = 3;
servers = LaunchServers(SERVER_COUNT);
waitForAll(servers, States.CONNECTED);
// we need to shut down and start back up to make sure that the create session isn't the first transaction, since
// that is rather innocuous.
servers.shutDownAllServers();
waitForAll(servers, States.CONNECTING);
servers.restartAllServersAndClients(this);
waitForAll(servers, States.CONNECTED);
// 2. kill all followers
int leader = servers.findLeader();
Map<Long, Proposal> outstanding = servers.mt[leader].main.quorumPeer.leader.outstandingProposals;
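// Leader#outstandingProposals maps zxid -> Proposal for proposals that have
// not yet been committed by a quorum; the test inspects it later to confirm
// the doomed create request really reached the leader's proposal pipeline.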
// increase the tick time to delay the leader going to looking
int previousTick = servers.mt[leader].main.quorumPeer.tickTime;
servers.mt[leader].main.quorumPeer.tickTime = LEADER_TIMEOUT_MS;
// let the previous tick on the leader exhaust itself so the new tick time takes effect
Thread.sleep(previousTick);
LOG.warn("LEADER {}", leader);
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
servers.mt[i].shutdown();
}
}
// 3. start up the followers to form a new quorum
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
servers.mt[i].start();
}
}
// 4. wait one of the follower to be the new leader
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
// Recreate a client session since the previous session was not persisted.
servers.restartClient(i, this);
waitForOne(servers.zk[i], States.CONNECTED);
}
}
// 5. send a create request to the old leader and make sure it's synced to disk,
// which means it has been acked by the old leader itself
try {
servers.zk[leader].create("/zk" + leader, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
fail("create /zk" + leader + " should have failed");
} catch (KeeperException e) {
}
// just make sure that we actually did get it in process at the
// leader
// there can be extra sessionClose proposals
assertTrue(outstanding.size() > 0);
Proposal p = findProposalOfType(outstanding, OpCode.create);
LOG.info("Old leader id: {}. All proposals: {}", leader, outstanding);
assertNotNull("Old leader doesn't have 'create' proposal", p);
// make sure it has a chance to write it to disk
int sleepTime = 0;
Long longLeader = (long) leader;
while (!p.qvAcksetPairs.get(0).getAckset().contains(longLeader)) {
if (sleepTime > 2000) {
fail("Transaction not synced to disk within 2 seconds " + p.qvAcksetPairs.get(0).getAckset() + " expected " + leader);
}
Thread.sleep(100);
sleepTime += 100;
}
// 6. wait for the leader to quit due to not enough followers and come back up as a part of the new quorum
LOG.info("Waiting for leader {} to timeout followers", leader);
sleepTime = 0;
Follower f = servers.mt[leader].main.quorumPeer.follower;
while (f == null || !f.isRunning()) {
if (sleepTime > LEADER_TIMEOUT_MS * 2) {
fail("Took too long for old leader to time out "
+ servers.mt[leader].main.quorumPeer.getPeerState());
}
Thread.sleep(100);
sleepTime += 100;
f = servers.mt[leader].main.quorumPeer.follower;
}
int newLeader = servers.findLeader();
// make sure a different leader was elected
assertNotEquals(leader, newLeader);
// 7. restart the previous leader to force it to replay the edits and possibly come up in a bad state
servers.mt[leader].shutdown();
servers.mt[leader].start();
// old client session can expire, restart it
servers.restartClient(leader, this);
waitForAll(servers, States.CONNECTED);
// 8. make sure everything is consistent: the uncommitted /zk<leader> node
// must not exist on any server, including the previous leader
for (int i = 0; i < SERVER_COUNT; i++) {
assertNull(
"server " + i + " should not have /zk" + leader,
servers.zk[i].exists("/zk" + leader, false));
}
}
/**
* Verify that a node without the leader in its view will not attempt to connect to the leader.
*/
@Test
public void testLeaderOutOfView() throws Exception {
ClientBase.setupTestEnv();
int numServers = 3;
// used for assertions later
boolean foundLeading = false;
boolean foundFollowing = false;
// capture QuorumPeer logging
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.DEBUG);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
try {
Servers svrs = new Servers();
svrs.clientPorts = new int[numServers];
for (int i = 0; i < numServers; i++) {
svrs.clientPorts[i] = PortAssignment.unique();
}
String quorumCfgIncomplete = getUniquePortCfgForId(1) + "\n" + getUniquePortCfgForId(2);
String quorumCfgComplete = quorumCfgIncomplete + "\n" + getUniquePortCfgForId(3);
svrs.mt = new MainThread[3];
// Node 1 is started without the leader (3) in its config view
svrs.mt[0] = new MainThread(1, svrs.clientPorts[0], quorumCfgIncomplete);
for (int i = 1; i < numServers; i++) {
svrs.mt[i] = new MainThread(i + 1, svrs.clientPorts[i], quorumCfgComplete);
}
// Node 1 must be started first, before quorum is formed, to trigger the attempted invalid connection to 3
svrs.mt[0].start();
QuorumPeer quorumPeer1 = waitForQuorumPeer(svrs.mt[0], CONNECTION_TIMEOUT);
assertTrue(quorumPeer1.getPeerState() == QuorumPeer.ServerState.LOOKING);
// Node 3 started second to avoid 1 and 2 forming a quorum before 3 starts up
int highestServerIndex = numServers - 1;
svrs.mt[highestServerIndex].start();
QuorumPeer quorumPeer3 = waitForQuorumPeer(svrs.mt[highestServerIndex], CONNECTION_TIMEOUT);
assertTrue(quorumPeer3.getPeerState() == QuorumPeer.ServerState.LOOKING);
// Node 2 started last, kicks off leader election
for (int i = 1; i < highestServerIndex; i++) {
svrs.mt[i].start();
}
// Nodes 2 and 3 now form quorum and fully start. 1 attempts to vote for 3, fails, returns to LOOKING state
for (int i = 1; i < numServers; i++) {
assertTrue(
"waiting for server to start",
ClientBase.waitForServerUp("127.0.0.1:" + svrs.clientPorts[i], CONNECTION_TIMEOUT));
}
assertTrue(svrs.mt[0].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.LOOKING);
assertTrue(svrs.mt[highestServerIndex].getQuorumPeer().getPeerState()
== QuorumPeer.ServerState.LEADING);
for (int i = 1; i < highestServerIndex; i++) {
assertTrue(svrs.mt[i].getQuorumPeer().getPeerState() == QuorumPeer.ServerState.FOLLOWING);
}
// Look through the logs for output that indicates Node 1 is LEADING or FOLLOWING
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
Pattern leading = Pattern.compile(".*myid=1.*QuorumPeer.*LEADING.*");
Pattern following = Pattern.compile(".*myid=1.*QuorumPeer.*FOLLOWING.*");
String line;
while ((line = r.readLine()) != null && !foundLeading && !foundFollowing) {
foundLeading = leading.matcher(line).matches();
foundFollowing = following.matcher(line).matches();
}
} finally {
qlogger.removeAppender(appender);
}
assertFalse("Corrupt peer should never become leader", foundLeading);
assertFalse("Corrupt peer should not attempt connection to out of view leader", foundFollowing);
}
@Test
public void testDataDirAndDataLogDir() throws Exception {
File dataDir = createEmptyTestDir();
File dataLogDir = createEmptyTestDir();
// Arrange
try {
QuorumPeerConfig configMock = mock(QuorumPeerConfig.class);
when(configMock.getDataDir()).thenReturn(dataDir);
when(configMock.getDataLogDir()).thenReturn(dataLogDir);
when(configMock.getMetricsProviderClassName()).thenReturn(NullMetricsProvider.class.getName());
QuorumPeer qpMock = mock(QuorumPeer.class);
doCallRealMethod().when(qpMock).setTxnFactory(any(FileTxnSnapLog.class));
when(qpMock.getTxnFactory()).thenCallRealMethod();
InjectableQuorumPeerMain qpMain = new InjectableQuorumPeerMain(qpMock);
// Act
qpMain.runFromConfig(configMock);
// Assert
FileTxnSnapLog txnFactory = qpMain.getQuorumPeer().getTxnFactory();
assertEquals(Paths.get(dataLogDir.getAbsolutePath(), "version-2").toString(), txnFactory.getDataDir().getAbsolutePath());
assertEquals(Paths.get(dataDir.getAbsolutePath(), "version-2").toString(), txnFactory.getSnapDir().getAbsolutePath());
} finally {
FileUtils.deleteDirectory(dataDir);
FileUtils.deleteDirectory(dataLogDir);
}
}
private class InjectableQuorumPeerMain extends QuorumPeerMain {
QuorumPeer qp;
InjectableQuorumPeerMain(QuorumPeer qp) {
this.qp = qp;
}
@Override
protected QuorumPeer getQuorumPeer() {
return qp;
}
}
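/**
* Builds a WriterAppender that mirrors the root CONSOLE appender's layout into
* the supplied in-memory stream, so tests can grep the captured log output for
* expected state transitions and error messages.
*/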
private WriterAppender getConsoleAppender(ByteArrayOutputStream os, Level level) {
String loggingPattern = ((PatternLayout) Logger.getRootLogger().getAppender("CONSOLE").getLayout()).getConversionPattern();
WriterAppender appender = new WriterAppender(new PatternLayout(loggingPattern), os);
appender.setThreshold(level);
return appender;
}
private String getUniquePortCfgForId(int id) {
return String.format("server.%d=127.0.0.1:%d:%d", id, PortAssignment.unique(), PortAssignment.unique());
}
private QuorumPeer waitForQuorumPeer(MainThread mainThread, int timeout) throws TimeoutException {
long start = Time.currentElapsedTime();
while (true) {
QuorumPeer quorumPeer = mainThread.isAlive() ? mainThread.getQuorumPeer() : null;
if (quorumPeer != null) {
return quorumPeer;
}
if (Time.currentElapsedTime() > start + timeout) {
LOG.error("Timed out while waiting for QuorumPeer");
throw new TimeoutException();
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
// ignore
}
}
}
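/**
* Returns the first outstanding proposal whose request header matches the
* given op type (e.g. OpCode.create), or null if none is found.
*/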
private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
for (Proposal proposal : proposals.values()) {
if (proposal.request.getHdr().getType() == type) {
return proposal;
}
}
return null;
}
/**
* Currently, in SNAP sync, the leader starts queuing the
* proposals/commits and the NEWLEADER packet before sending
* the snapshot over the wire. So it's possible that the zxid
* associated with the snapshot is higher than all the
* packets queued before NEWLEADER.
*
* When the follower receives the snapshot, it applies all
* the txns queued before NEWLEADER, which may not cover all
* the txns up to the zxid in the snapshot. After that, it
* writes the snapshot out to disk with the zxid associated
* with the snapshot. If the server crashes after writing
* this out, then when loading the data from disk it will use
* the zxid of the snapshot file to sync with the leader, which
* can cause data inconsistency, because only part of the
* historical data was replayed during the previous sync.
*
* This test case simulates this scenario and makes sure there
* is no data inconsistency after the fix.
*/
@Test
public void testInconsistentDueToNewLeaderOrder() throws Exception {
// 1. set up an ensemble with 3 servers
final int ENSEMBLE_SERVERS = 3;
final int[] clientPorts = new int[ENSEMBLE_SERVERS];
StringBuilder sb = new StringBuilder();
String server;
for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+ ":participant;127.0.0.1:" + clientPorts[i];
sb.append(server + "\n");
}
String currentQuorumCfgSection = sb.toString();
// start servers
MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
ZooKeeper[] zk = new ZooKeeper[ENSEMBLE_SERVERS];
Context[] contexts = new Context[ENSEMBLE_SERVERS];
for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
final Context context = new Context();
contexts[i] = context;
mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
@Override
public TestQPMain getTestQPMain() {
return new CustomizedQPMain(context);
}
};
mt[i].start();
zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
}
waitForAll(zk, States.CONNECTED);
LOG.info("all servers started");
String nodePath = "/testInconsistentDueToNewLeader";
int leaderId = -1;
int followerA = -1;
for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
if (mt[i].main.quorumPeer.leader != null) {
leaderId = i;
} else if (followerA == -1) {
followerA = i;
}
}
LOG.info("shutdown follower {}", followerA);
mt[followerA].shutdown();
waitForOne(zk[followerA], States.CONNECTING);
try {
// 2. set force snapshot to be true
LOG.info("force snapshot sync");
System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
// 3. create a node
String initialValue = "1";
final ZooKeeper leaderZk = zk[leaderId];
leaderZk.create(nodePath, initialValue.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.info("created node {} with value {}", nodePath, initialValue);
CustomQuorumPeer leaderQuorumPeer = (CustomQuorumPeer) mt[leaderId].main.quorumPeer;
// 4. on the customized leader, catch the startForwarding call
// (without synchronized), set the node to a new value ("2"), then
// call super.startForwarding to generate the ongoing
// txn proposal and commit for that update
leaderQuorumPeer.setStartForwardingListener(new StartForwardingListener() {
@Override
public void start() {
if (!Boolean.getBoolean(LearnerHandler.FORCE_SNAP_SYNC)) {
return;
}
final String value = "2";
LOG.info("start forwarding, set {} to {}", nodePath, value);
// use async, otherwise it will block on the logLock in
// ZKDatabase and the setData request will time out
try {
leaderZk.setData(nodePath, value.getBytes(), -1, new AsyncCallback.StatCallback() {
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
}, null);
// wait for the setData txn being populated
Thread.sleep(1000);
} catch (Exception e) {
LOG.error("error when set {} to {}", nodePath, value, e);
}
}
});
// 5. on the customized leader, catch the beginSnapshot call in
// LearnerSyncThrottler, set the node to another value ("3"),
// and wait for it to hit the data tree
leaderQuorumPeer.setBeginSnapshotListener(new BeginSnapshotListener() {
@Override
public void start() {
String value = "3";
LOG.info("before sending snapshot, set {} to {}", nodePath, value);
try {
leaderZk.setData(nodePath, value.getBytes(), -1);
LOG.info("successfully set {} to {}", nodePath, value);
} catch (Exception e) {
LOG.error("error when set {} to {}, {}", nodePath, value, e);
}
}
});
// 6. exit follower A after taking snapshot
CustomQuorumPeer followerAQuorumPeer = ((CustomQuorumPeer) mt[followerA].main.quorumPeer);
LOG.info("set exit when ack new leader packet on {}", followerA);
contexts[followerA].exitWhenAckNewLeader = true;
CountDownLatch latch = new CountDownLatch(1);
final MainThread followerAMT = mt[followerA];
contexts[followerA].newLeaderAckCallback = new NewLeaderAckCallback() {
@Override
public void start() {
try {
latch.countDown();
followerAMT.shutdown();
} catch (Exception e) {
}
}
};
// 7. start follower A to do snapshot sync
LOG.info("starting follower {}", followerA);
mt[followerA].start();
assertTrue(latch.await(30, TimeUnit.SECONDS));
// 8. now we have invalid data on disk, let's load it and verify
LOG.info("disable exit when ack new leader packet on {}", followerA);
System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
contexts[followerA].exitWhenAckNewLeader = true;
contexts[followerA].newLeaderAckCallback = null;
LOG.info("restarting follower {}", followerA);
mt[followerA].start();
zk[followerA].close();
zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA], ClientBase.CONNECTION_TIMEOUT, this);
// 9. start follower A; after it's in broadcast state, make sure
// the node value is the same as what we have on the leader
waitForOne(zk[followerA], States.CONNECTED);
assertEquals(
new String(zk[followerA].getData(nodePath, null, null)),
new String(zk[leaderId].getData(nodePath, null, null)));
} finally {
System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
mt[i].shutdown();
zk[i].close();
}
}
}
/**
* Test that leader election finishes with 1 disloyal voter and without a
* majority of followers, expecting to see the quorum stabilize only
* after waiting for maxTimeToWaitForEpoch.
*/
@Test
public void testLeaderElectionWithDisloyalVoter() throws IOException {
testLeaderElection(5, 3, 1000, 10000);
}
/**
* Test that leader election finishes with 1 disloyal voter and a majority
* of followers, expecting to see the quorum stabilize immediately even
* though there is 1 disloyal voter.
*
* Set maxTimeToWaitForEpoch to 3s and maxTimeWaitForServerUp to
* 20s to confirm this.
*/
@Test
public void testLeaderElectionWithDisloyalVoter_stillHasMajority() throws IOException {
testLeaderElection(5, 5, 3000, 20000);
}
void testLeaderElection(int totalServers, int serversToStart, int maxTimeToWaitForEpoch, int maxTimeWaitForServerUp) throws IOException {
Leader.setMaxTimeToWaitForEpoch(maxTimeToWaitForEpoch);
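// Leader.setMaxTimeToWaitForEpoch bounds (in ms) how long a prospective leader
// waits for a quorum of epoch acks before giving up and going back to leader
// election; a small bound keeps the disloyal-voter scenarios from stalling the test.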
// set up config for an ensemble with given number of servers
servers = new Servers();
int ENSEMBLE_SERVERS = totalServers;
final int[] clientPorts = new int[ENSEMBLE_SERVERS];
StringBuilder sb = new StringBuilder();
String server;
for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
clientPorts[i] = PortAssignment.unique();
server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+ ":participant;127.0.0.1:" + clientPorts[i];
sb.append(server + "\n");
}
String currentQuorumCfgSection = sb.toString();
// start servers
int SERVERS_TO_START = serversToStart;
MainThread[] mt = new MainThread[SERVERS_TO_START];
Context[] contexts = new Context[SERVERS_TO_START];
servers.mt = mt;
numServers = SERVERS_TO_START;
for (int i = 0; i < SERVERS_TO_START; i++) {
// hook the 1st follower to quit following after leader election,
// simulating the behavior of changing its vote while LOOKING
final Context context = new Context();
if (i == 0) {
context.quitFollowing = true;
}
contexts[i] = context;
mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
@Override
public TestQPMain getTestQPMain() {
return new CustomizedQPMain(context);
}
};
mt[i].start();
}
// make sure the quorum can be formed within initLimit * tickTime
// the default setting is 10 * 4000 = 40000 ms
for (int i = 0; i < SERVERS_TO_START; i++) {
assertTrue(
"Server " + i + " should have joined quorum by now",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], maxTimeWaitForServerUp));
}
}
/**
* Verify that boot works when a MetricsProvider is configured.
*/
@Test
public void testMetricsProviderLifecycle() throws Exception {
ClientBase.setupTestEnv();
BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.reset();
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.WARN);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSectionServer = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\nserver.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2 + "\n";
// server 1 boots with a MetricsProvider
String quorumCfgSectionServer1 = quorumCfgSectionServer
+ "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.class.getName() + "\n";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSectionServer1);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSectionServer);
q1.start();
q2.start();
boolean isup1 = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 30000);
boolean isup2 = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, 30000);
assertTrue("Server 1 never came up", isup1);
assertTrue("Server 2 never came up", isup2);
q1.shutdown();
q2.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
} finally {
qlogger.removeAppender(appender);
}
assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.configureCalled.get());
assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.startCalled.get());
assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.getRootContextCalled.get());
assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.stopCalled.get());
}
/**
* Test verifies that configuration is passed to the MetricsProvider.
*/
@Test
public void testMetricsProviderConfiguration() throws Exception {
ClientBase.setupTestEnv();
BaseTestMetricsProvider.MetricsProviderWithConfiguration.httpPort.set(0);
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.WARN);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSectionServer = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 + "\n"
+ "server.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2 + "\n";
// server 1 boots with a MetricsProvider
String quorumCfgSectionServer1 = quorumCfgSectionServer
+ "metricsProvider.className="
+ BaseTestMetricsProvider.MetricsProviderWithConfiguration.class.getName()
+ "\n" + "metricsProvider.httpPort=1234";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSectionServer1);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSectionServer);
q1.start();
q2.start();
boolean isup1 = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 30000);
boolean isup2 = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, 30000);
assertTrue("Server 1 never came up", isup1);
assertTrue("Server 2 never came up", isup2);
q1.shutdown();
q2.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
} finally {
qlogger.removeAppender(appender);
}
assertEquals(1234, BaseTestMetricsProvider.MetricsProviderWithConfiguration.httpPort.get());
}
/**
* Verifies that the server is not affected by runtime errors in the MetricsProvider's stop().
*/
@Test
public void testFaultyMetricsProviderOnStop() throws Exception {
ClientBase.setupTestEnv();
BaseTestMetricsProvider.MetricsProviderCapturingLifecycle.reset();
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.WARN);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSectionServer = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1 + "\n"
+ "server.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2 + "\n";
// server 1 boots with a MetricsProvider
String quorumCfgSectionServer1 = quorumCfgSectionServer
+ "metricsProvider.className="
+ BaseTestMetricsProvider.MetricsProviderWithErrorInStop.class.getName()
+ "\n";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSectionServer1);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSectionServer);
q1.start();
q2.start();
boolean isup1 = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 30000);
boolean isup2 = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2, 30000);
assertTrue("Server 1 never came up", isup1);
assertTrue("Server 2 never came up", isup2);
q1.shutdown();
q2.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
assertTrue(
"waiting for server 2 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT));
} finally {
qlogger.removeAppender(appender);
}
assertTrue("metrics provider lifecycle error", BaseTestMetricsProvider.MetricsProviderWithErrorInStop.stopCalled.get());
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean found = false;
Pattern p = Pattern.compile(".*Error while stopping metrics.*");
while ((line = r.readLine()) != null) {
found = p.matcher(line).matches();
if (found) {
break;
}
}
assertTrue("complains about metrics provider", found);
}
/**
* Verify boot fails with a bad MetricsProvider
*/
@Test
public void testInvalidMetricsProvider() throws Exception {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.WARN);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\n" + "server.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\n" + "metricsProvider.className=BadClass\n";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
q1.start();
boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 5000);
assertFalse("Server never came up", isup);
q1.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
} finally {
qlogger.removeAppender(appender);
}
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean found = false;
Pattern p = Pattern.compile(".*BadClass.*");
while ((line = r.readLine()) != null) {
found = p.matcher(line).matches();
if (found) {
break;
}
}
assertTrue("complains about metrics provider", found);
}
/**
* Verify that boot fails with a MetricsProvider that fails to start.
*/
@Test
public void testFaultyMetricsProviderOnStart() throws Exception {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.WARN);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\n" + "server.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\n" + "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithErrorInStart.class.getName()
+ "\n";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
q1.start();
boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 5000);
assertFalse("Server never came up", isup);
q1.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
} finally {
qlogger.removeAppender(appender);
}
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean found = false;
Pattern p = Pattern.compile(".*MetricsProviderLifeCycleException.*");
while ((line = r.readLine()) != null) {
found = p.matcher(line).matches();
if (found) {
break;
}
}
assertTrue("complains about metrics provider MetricsProviderLifeCycleException", found);
}
/**
* Verify that boot fails with a MetricsProvider that fails to configure.
*/
@Test
public void testFaultyMetricsProviderOnConfigure() throws Exception {
ClientBase.setupTestEnv();
// setup the logger to capture all logs
ByteArrayOutputStream os = new ByteArrayOutputStream();
WriterAppender appender = getConsoleAppender(os, Level.WARN);
Logger qlogger = Logger.getLogger("org.apache.zookeeper.server.quorum");
qlogger.addAppender(appender);
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
String quorumCfgSection = "server.1=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\n" + "server.2=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ "\n" + "metricsProvider.className=" + BaseTestMetricsProvider.MetricsProviderWithErrorInConfigure.class.getName()
+ "\n";
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
q1.start();
boolean isup = ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP1, 5000);
assertFalse("Server never came up", isup);
q1.shutdown();
assertTrue(
"waiting for server 1 down",
ClientBase.waitForServerDown("127.0.0.1:" + CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT));
} finally {
qlogger.removeAppender(appender);
}
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean found = false;
Pattern p = Pattern.compile(".*MetricsProviderLifeCycleException.*");
while ((line = r.readLine()) != null) {
found = p.matcher(line).matches();
if (found) {
break;
}
}
assertTrue("complains about metrics provider MetricsProviderLifeCycleException", found);
}
static class Context {
boolean quitFollowing = false;
boolean exitWhenAckNewLeader = false;
NewLeaderAckCallback newLeaderAckCallback = null;
}
interface NewLeaderAckCallback {
void start();
}
interface StartForwardingListener {
void start();
}
interface BeginSnapshotListener {
void start();
}
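// CustomizedQPMain and CustomQuorumPeer below wire the Context hooks and
// listeners above into the follower and leader code paths, letting individual
// tests interfere with following, NEWLEADER acks, startForwarding, and
// snapshot throttling.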
static class CustomizedQPMain extends TestQPMain {
private Context context;
public CustomizedQPMain(Context context) {
this.context = context;
}
@Override
protected QuorumPeer getQuorumPeer() throws SaslException {
return new CustomQuorumPeer(context);
}
}
static class CustomQuorumPeer extends QuorumPeer {
private Context context;
private LearnerSyncThrottler throttler = null;
private StartForwardingListener startForwardingListener;
private BeginSnapshotListener beginSnapshotListener;
public CustomQuorumPeer(Context context) throws SaslException {
this.context = context;
}
public void setStartForwardingListener(
StartForwardingListener startForwardingListener) {
this.startForwardingListener = startForwardingListener;
}
public void setBeginSnapshotListener(
BeginSnapshotListener beginSnapshotListener) {
this.beginSnapshotListener = beginSnapshotListener;
}
@Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
@Override
void followLeader() throws InterruptedException {
if (context.quitFollowing) {
// reset the flag
context.quitFollowing = false;
LOG.info("Quit following");
return;
} else {
super.followLeader();
}
}
@Override
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
if (pp != null && pp.getType() == Leader.ACK && context.exitWhenAckNewLeader) {
if (context.newLeaderAckCallback != null) {
context.newLeaderAckCallback.start();
}
}
super.writePacket(pp, flush);
}
};
}
@Override
protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException, X509Exception {
return new Leader(this, new LeaderZooKeeperServer(logFactory, this, this.getZkDb())) {
@Override
public long startForwarding(LearnerHandler handler, long lastSeenZxid) {
if (startForwardingListener != null) {
startForwardingListener.start();
}
return super.startForwarding(handler, lastSeenZxid);
}
@Override
public LearnerSyncThrottler getLearnerSnapSyncThrottler() {
if (throttler == null) {
throttler = new LearnerSyncThrottler(getMaxConcurrentSnapSyncs(), LearnerSyncThrottler.SyncType.SNAP) {
@Override
public void beginSync(boolean essential) throws SyncThrottleException, InterruptedException {
if (beginSnapshotListener != null) {
beginSnapshotListener.start();
}
super.beginSync(essential);
}
};
}
return throttler;
}
};
}
}
}