// blob: 8642b368d9c1d17df9677b60a8df33b903d8216f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.test;
import static java.lang.Integer.parseInt;
import static java.lang.String.format;
import static java.net.InetAddress.getLoopbackAddress;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReconfigTest extends ZKTestCase implements DataCallback {
// Logger shared by the static helpers and the test methods of this class.
private static final Logger LOG = LoggerFactory.getLogger(ReconfigTest.class);
// Quorum under test; assigned by each test method and torn down in tearDown() when non-null.
private QuorumUtil qu;
// Client handles as returned by createHandles(): indexed by server id, slot 0 is unused.
private ZooKeeper[] zkArr;
// Admin handles as returned by createAdminHandles(): indexed by server id, slot 0 is unused.
private ZooKeeperAdmin[] zkAdminArr;
/**
 * Prepares the process-wide settings the tests depend on: registers the
 * super-user digest (matching the "super:test" credentials that
 * {@link #createAdminHandles(QuorumUtil)} presents) and enables reconfig.
 */
@Before
public void setup() {
    // digest of the super user; the password is 'test'
    final String superDigest = "super:D/InIHSb7yEEbrWz8b9l71RjZJU=";
    System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", superDigest);
    QuorumPeerConfig.setReconfigEnabled(true);
}
/**
 * Releases everything a test created: closes all client and admin handles
 * and then tears the quorum down.
 *
 * <p>The quorum is torn down in a {@code finally} block so that an
 * exception thrown while closing a handle cannot leave the servers of this
 * test running while the next test starts.
 *
 * @throws Exception if closing a handle or tearing down the quorum fails
 */
@After
public void tearDown() throws Exception {
    try {
        closeAllHandles(zkArr, zkAdminArr);
    } finally {
        // a test may fail before it ever created its quorum
        if (qu != null) {
            qu.tearDown();
        }
    }
}
/**
 * Runs a reconfiguration through {@code zkAdmin} and checks the config that
 * comes back.
 *
 * <p>The call is attempted up to 30 times, one second apart, as long as it
 * fails with connection loss or "reconfig in progress"; when all attempts
 * fail that way the test is failed. Any other {@link KeeperException} is
 * propagated to the caller immediately.
 *
 * @param zkAdmin admin handle used to issue the reconfiguration
 * @param joiningServers server lines to add, may be {@code null}; each one
 *        must match a "server" line of the resulting config
 * @param leavingServers ids of servers to remove, may be {@code null}; no
 *        {@code server.<id>=} entry may remain in the resulting config
 * @param newMembers complete new membership, may be {@code null}
 * @param fromConfig config version passed through to the server as the
 *        condition for applying the change
 * @return the resulting configuration as text
 * @throws KeeperException if the reconfiguration fails for a reason that is
 *         not retried
 * @throws InterruptedException if interrupted while waiting between attempts
 */
public static String reconfig(
        ZooKeeperAdmin zkAdmin,
        List<String> joiningServers,
        List<String> leavingServers,
        List<String> newMembers,
        long fromConfig) throws KeeperException, InterruptedException {
    LOG.info("reconfig initiated by the test");
    byte[] rawConfig = null;
    String lastFailure = null;
    int attempt = 0;
    while (attempt < 30) {
        try {
            rawConfig = zkAdmin.reconfigure(joiningServers, leavingServers, newMembers, fromConfig, new Stat());
            lastFailure = null;
            break;
        } catch (KeeperException.ConnectionLossException e) {
            lastFailure = "client could not connect to reestablished quorum: giving up after 30+ seconds.";
        } catch (KeeperException.ReconfigInProgress e) {
            lastFailure = "reconfig still in progress: giving up after 30+ seconds.";
        }
        Thread.sleep(1000);
        attempt++;
    }
    if (lastFailure != null) {
        fail(lastFailure);
    }

    String configStr = new String(rawConfig);

    // keep only the "server..." lines of the returned config, in parsed form
    List<ServerConfigLine> currentServerConfigs = new ArrayList<>();
    for (String rawLine : configStr.split("\n")) {
        String line = rawLine.trim();
        if (line.startsWith("server")) {
            currentServerConfigs.add(new ServerConfigLine(line));
        }
    }

    if (joiningServers != null) {
        for (String joiner : joiningServers) {
            ServerConfigLine expected = new ServerConfigLine(joiner);
            String errorMessage = format("expected joiner config \"%s\" not found in current config:\n%s", joiner, configStr);
            boolean present = false;
            for (ServerConfigLine current : currentServerConfigs) {
                if (current.equals(expected)) {
                    present = true;
                    break;
                }
            }
            assertTrue(errorMessage, present);
        }
    }

    if (leavingServers != null) {
        for (String leaving : leavingServers) {
            String errorMessage = format("leaving server \"%s\" not removed from config: \n%s", leaving, configStr);
            boolean stillListed = configStr.contains(format("server.%s=", leaving));
            assertFalse(errorMessage, stillListed);
        }
    }
    return configStr;
}
/**
 * Reads the current configuration through {@code zk} and checks it against
 * the expected membership changes.
 *
 * <p>A write to {@code /dummy} is issued before the read so that the server
 * the client is connected to has caught up. On connection loss the whole
 * sequence is retried once per second, up to 30 attempts, after which the
 * test is failed.
 *
 * @param zk client handle to read the config through
 * @param joiningServers server lines that must appear verbatim in the
 *        config, may be {@code null}
 * @param leavingServers ids of servers that must no longer have a
 *        {@code server.<id>=} entry in the config, may be {@code null}
 * @return the configuration as text
 * @throws KeeperException on any ZooKeeper error other than connection loss
 * @throws InterruptedException if interrupted while waiting between attempts
 */
public static String testServerHasConfig(
        ZooKeeper zk,
        List<String> joiningServers,
        List<String> leavingServers) throws KeeperException, InterruptedException {
    boolean testNodeExists = false;
    byte[] config = null;
    for (int j = 0; j < 30; j++) {
        try {
            if (!testNodeExists) {
                createZNode(zk, "/dummy", "dummy");
                testNodeExists = true;
            }
            // Use setData instead of sync API to force a view update.
            // Check ZOOKEEPER-2137 for details.
            zk.setData("/dummy", "dummy".getBytes(), -1);
            config = zk.getConfig(false, new Stat());
            break;
        } catch (KeeperException.ConnectionLossException e) {
            if (j < 29) {
                Thread.sleep(1000);
            } else {
                // test fails if we still can't connect to the quorum after
                // 30 seconds.
                fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
            }
        }
    }

    String configStr = new String(config);
    if (joiningServers != null) {
        for (String joiner : joiningServers) {
            assertTrue("Config:<" + configStr + ">\n" + joiner, configStr.contains(joiner));
        }
    }
    if (leavingServers != null) {
        for (String leaving : leavingServers) {
            // Match the complete key "server.<id>=" (as reconfig() does); a bare
            // "server.<id>" prefix would also match longer ids, e.g. id 1
            // against "server.10=".
            String leavingKey = "server." + leaving + "=";
            assertFalse("Config:<" + configStr + ">\n" + leaving, configStr.contains(leavingKey));
        }
    }
    return configStr;
}
/**
 * Checks that a value written through one client can be read through
 * another one.
 *
 * <p>The writer updates {@code /test}; the reader then writes to
 * {@code /dummy} before reading {@code /test} back and comparing. Both
 * znodes are created on first use. On connection loss the sequence is
 * retried once per second, up to 30 attempts, after which the test is failed.
 *
 * @param writer client that writes {@code /test}
 * @param reader client that reads {@code /test} back
 * @throws KeeperException on any ZooKeeper error other than connection loss
 * @throws InterruptedException if interrupted while waiting between attempts
 */
public static void testNormalOperation(
        ZooKeeper writer,
        ZooKeeper reader) throws KeeperException, InterruptedException {
    final int maxAttempts = 30;
    boolean writerNodeReady = false;
    boolean readerNodeReady = false;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
        try {
            if (!writerNodeReady) {
                createZNode(writer, "/test", "test");
                writerNodeReady = true;
            }
            if (!readerNodeReady) {
                createZNode(reader, "/dummy", "dummy");
                readerNodeReady = true;
            }

            String payload = "test" + attempt;
            writer.setData("/test", payload.getBytes(), -1);
            // A write (rather than the sync API) forces the reader's view to
            // be updated; see ZOOKEEPER-2137.
            reader.setData("/dummy", "dummy".getBytes(), -1);
            byte[] readBack = reader.getData("/test", null, new Stat());
            assertEquals(payload, new String(readBack));
            return;
        } catch (KeeperException.ConnectionLossException e) {
            if (attempt == maxAttempts - 1) {
                // still no connection to the quorum after 30 seconds
                fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
            }
            Thread.sleep(1000);
        }
    }
}
/**
 * Creates a persistent, world-accessible znode and treats "already exists"
 * as success.
 *
 * @param zk client handle to create the znode through
 * @param path path of the znode
 * @param data initial content of the znode
 * @throws KeeperException on any ZooKeeper error other than the node already existing
 * @throws InterruptedException if the create call is interrupted
 */
private static void createZNode(
        ZooKeeper zk,
        String path,
        String data) throws KeeperException, InterruptedException {
    byte[] content = data.getBytes();
    try {
        zk.create(path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (KeeperException.NodeExistsException ignored) {
        // the node is already there, which is all the callers need
    }
}
/**
 * Finds the server that is currently leading.
 *
 * <p>Server ids run from 1 to {@code qu.ALL}; the scan stops at the last id
 * instead of running past it when no peer has a leader role at the moment.
 *
 * @param qu quorum to inspect
 * @return id of the first peer whose {@code leader} field is set
 * @throws AssertionError if no peer is leading
 */
private int getLeaderId(QuorumUtil qu) {
    for (int serverId = 1; serverId <= qu.ALL; serverId++) {
        if (qu.getPeer(serverId).peer.leader != null) {
            return serverId;
        }
    }
    throw new AssertionError("no leader found among servers 1.." + qu.ALL);
}
/**
 * Opens one client handle per server of the quorum.
 *
 * @param qu quorum whose servers to connect to
 * @return array indexed by server id (1 to {@code qu.ALL}); slot 0 is
 *         unused and stays {@code null}
 * @throws IOException if a handle cannot be created
 */
public static ZooKeeper[] createHandles(QuorumUtil qu) throws IOException {
    // one slot more than there are servers, so that a handle can be looked
    // up directly by server id; slot 0 keeps its default value of null
    ZooKeeper[] handles = new ZooKeeper[qu.ALL + 1];
    for (int serverId = 1; serverId <= qu.ALL; serverId++) {
        String connectString = "127.0.0.1:" + qu.getPeer(serverId).peer.getClientPort();
        handles[serverId] = new ZooKeeper(connectString, ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE);
    }
    return handles;
}
/**
 * Opens one admin handle per server of the quorum and authenticates each
 * of them with the "super:test" digest credentials.
 *
 * @param qu quorum whose servers to connect to
 * @return array indexed by server id (1 to {@code qu.ALL}); slot 0 is
 *         unused and stays {@code null}
 * @throws IOException if a handle cannot be created
 */
public static ZooKeeperAdmin[] createAdminHandles(QuorumUtil qu) throws IOException {
    // one slot more than there are servers, so that a handle can be looked
    // up directly by server id; slot 0 keeps its default value of null
    ZooKeeperAdmin[] adminHandles = new ZooKeeperAdmin[qu.ALL + 1];
    for (int serverId = 1; serverId <= qu.ALL; serverId++) {
        String connectString = "127.0.0.1:" + qu.getPeer(serverId).peer.getClientPort();
        ZooKeeperAdmin admin = new ZooKeeperAdmin(connectString, ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE);
        adminHandles[serverId] = admin;
        admin.addAuthInfo("digest", "super:test".getBytes());
    }
    return adminHandles;
}
/**
 * Closes every non-null handle of both arrays, client handles first.
 *
 * @param zkArr client handles, may be {@code null} and may contain {@code null} slots
 * @param zkAdminArr admin handles, may be {@code null} and may contain {@code null} slots
 * @throws InterruptedException if closing a handle is interrupted
 */
public static void closeAllHandles(ZooKeeper[] zkArr, ZooKeeperAdmin[] zkAdminArr) throws InterruptedException {
    if (zkArr != null) {
        for (int i = 0; i < zkArr.length; i++) {
            ZooKeeper handle = zkArr[i];
            if (handle == null) {
                continue;
            }
            handle.close();
        }
    }
    if (zkAdminArr != null) {
        for (int i = 0; i < zkAdminArr.length; i++) {
            ZooKeeperAdmin adminHandle = zkAdminArr[i];
            if (adminHandle == null) {
                continue;
            }
            adminHandle.close();
        }
    }
}
/**
 * Removes one server from a three-server ensemble and adds it back, twice:
 * the first round picks a follower, the second round picks the leader.
 * Each round also checks that a reconfig conditioned on a wrong config
 * version is rejected.
 */
@Test
public void testRemoveAddOne() throws Exception {
    qu = new QuorumUtil(1); // create 3 servers
    qu.disableJMXTest = true;
    qu.startAll();
    zkArr = createHandles(qu);
    zkAdminArr = createAdminHandles(qu);

    List<String> leavingServers = new ArrayList<>();
    List<String> joiningServers = new ArrayList<>();

    int leaderIndex = getLeaderId(qu);
    // round one removes a follower, round two removes the leader
    int leavingIndex = (leaderIndex == 1) ? 2 : 1;

    for (int round = 0; round < 2; round++) {
        // Two sets of handles are used: one connected to the leader and one
        // connected to the server following it in id order. When the leader
        // is the one being removed, zk1/zkAdmin1 are the handles connected
        // to it; otherwise they are the handles of the other server.
        boolean removingLeader = leavingIndex == leaderIndex;
        int nextToLeader = (leaderIndex % qu.ALL) + 1;
        int firstIndex = removingLeader ? leaderIndex : nextToLeader;
        int secondIndex = removingLeader ? nextToLeader : leaderIndex;
        ZooKeeper zk1 = zkArr[firstIndex];
        ZooKeeper zk2 = zkArr[secondIndex];
        ZooKeeperAdmin zkAdmin1 = zkAdminArr[firstIndex];
        ZooKeeperAdmin zkAdmin2 = zkAdminArr[secondIndex];

        leavingServers.add(Integer.toString(leavingIndex));

        // record the server's current addresses so it can be added back later
        QuorumPeer leavingPeer = qu.getPeer(leavingIndex).peer;
        String rejoinLine = "server." + leavingIndex
                + "=localhost:"
                + leavingPeer.getQuorumAddress().getAllPorts().get(0)
                + ":"
                + leavingPeer.getElectionAddress().getAllPorts().get(0)
                + ":participant;localhost:"
                + leavingPeer.getClientPort();
        joiningServers.add(rejoinLine);

        String configStr = reconfig(zkAdmin1, null, leavingServers, null, -1);
        testServerHasConfig(zk2, null, leavingServers);
        testNormalOperation(zk2, zk1);

        QuorumVerifier qv = qu.getPeer(1).peer.configFromString(configStr);
        long version = qv.getVersion();

        // a reconfig conditioned on the wrong version must be rejected
        try {
            reconfig(zkAdmin2, joiningServers, null, null, version + 1);
            fail("reconfig succeeded even though version condition was incorrect!");
        } catch (KeeperException.BadVersionException expected) {
            // this is the outcome the test is looking for
        }

        reconfig(zkAdmin2, joiningServers, null, null, version);
        testNormalOperation(zk1, zk2);
        testServerHasConfig(zk1, joiningServers, null);

        // the next round removes the current leader and adds it back
        leaderIndex = getLeaderId(qu);
        leavingIndex = leaderIndex;
        leavingServers.clear();
        joiningServers.clear();
    }
}
/**
 * Removes two servers (one of them the leader) from a five-server ensemble
 * and adds them back.
 * <ol>
 * <li>removes and adds back two servers (incl leader). One of the servers is added back as observer</li>
 * <li>tests that reconfig fails if quorum of new config is not up</li>
 * <li>tests that a server that's not up during reconfig learns the new config when it comes up</li>
 * </ol>
 * @throws Exception if a quorum, client or reconfig operation fails unexpectedly
 */
@Test
public void testRemoveAddTwo() throws Exception {
qu = new QuorumUtil(2); // create 5 servers
qu.disableJMXTest = true;
qu.startAll();
zkArr = createHandles(qu);
zkAdminArr = createAdminHandles(qu);
List<String> leavingServers = new ArrayList<String>();
List<String> joiningServers = new ArrayList<String>();
int leaderIndex = getLeaderId(qu);
// lets remove the leader and some other server
int leavingIndex1 = leaderIndex;
int leavingIndex2 = (leaderIndex == 1) ? 2 : 1;
// find the three servers that are staying: each loop picks the lowest id
// that is neither leaving nor already picked
int stayingIndex1 = 1, stayingIndex2 = 1, stayingIndex3 = 1;
while (stayingIndex1 == leavingIndex1 || stayingIndex1 == leavingIndex2) {
stayingIndex1++;
}
while (stayingIndex2 == leavingIndex1 || stayingIndex2 == leavingIndex2 || stayingIndex2 == stayingIndex1) {
stayingIndex2++;
}
while (stayingIndex3 == leavingIndex1
|| stayingIndex3 == leavingIndex2
|| stayingIndex3 == stayingIndex1
|| stayingIndex3 == stayingIndex2) {
stayingIndex3++;
}
leavingServers.add(Integer.toString(leavingIndex1));
leavingServers.add(Integer.toString(leavingIndex2));
// remember these servers so we can add them back later
joiningServers.add("server."
+ leavingIndex1
+ "=localhost:"
+ qu.getPeer(leavingIndex1).peer.getQuorumAddress().getAllPorts().get(0)
+ ":"
+ qu.getPeer(leavingIndex1).peer.getElectionAddress().getAllPorts().get(0)
+ ":participant;localhost:"
+ qu.getPeer(leavingIndex1).peer.getClientPort());
// this server will be added back as an observer
joiningServers.add("server."
+ leavingIndex2
+ "=localhost:"
+ qu.getPeer(leavingIndex2).peer.getQuorumAddress().getAllPorts().get(0)
+ ":"
+ qu.getPeer(leavingIndex2).peer.getElectionAddress().getAllPorts().get(0)
+ ":observer;localhost:"
+ qu.getPeer(leavingIndex2).peer.getClientPort());
// the two leaving servers are stopped before they are removed from the config
qu.shutdown(leavingIndex1);
qu.shutdown(leavingIndex2);
// 3 servers still up so this should work
reconfig(zkAdminArr[stayingIndex2], null, leavingServers, null, -1);
qu.shutdown(stayingIndex2);
// the following commands would not work in the original
// cluster of 5, but now that we've removed 2 servers
// we have a cluster of 3 servers and one of them is allowed to fail
testServerHasConfig(zkArr[stayingIndex1], null, leavingServers);
testServerHasConfig(zkArr[stayingIndex3], null, leavingServers);
testNormalOperation(zkArr[stayingIndex1], zkArr[stayingIndex3]);
// this is a test that a reconfig will only succeed
// if there is a quorum up in new config. Below there is no
// quorum so it should fail
// the sleep is necessary so that the leader figures out
// that the switched off servers are down
Thread.sleep(10000);
try {
reconfig(zkAdminArr[stayingIndex1], joiningServers, null, null, -1);
fail("reconfig completed successfully even though there is no quorum up in new config!");
} catch (KeeperException.NewConfigNoQuorum e) {
// expected: only two servers of the proposed config are running
}
// now start the third server so that new config has quorum
qu.restart(stayingIndex2);
reconfig(zkAdminArr[stayingIndex1], joiningServers, null, null, -1);
testNormalOperation(zkArr[stayingIndex2], zkArr[stayingIndex3]);
testServerHasConfig(zkArr[stayingIndex2], joiningServers, null);
// this server wasn't around during the configuration change
// we should check that it is able to connect, finds out
// about the change and becomes an observer.
qu.restart(leavingIndex2);
assertTrue(qu.getPeer(leavingIndex2).peer.getPeerState() == ServerState.OBSERVING);
testNormalOperation(zkArr[stayingIndex2], zkArr[leavingIndex2]);
testServerHasConfig(zkArr[leavingIndex2], joiningServers, null);
}
/**
 * Replaces the membership of a seven-server ensemble in one step: the new
 * config keeps servers 1 to 5 (4 and 5 as observers) and gives every one of
 * them new quorum and election ports while keeping its client port.
 */
@Test
public void testBulkReconfig() throws Exception {
    qu = new QuorumUtil(3); // create 7 servers
    qu.disableJMXTest = true;
    qu.startAll();
    zkArr = createHandles(qu);
    zkAdminArr = createAdminHandles(qu);

    // three participants, two observers, all quorum/election ports fresh
    List<String> newServers = new ArrayList<>();
    for (int serverId = 1; serverId <= 5; serverId++) {
        int quorumPort = PortAssignment.unique();
        int electionPort = PortAssignment.unique();
        String role = (serverId >= 4) ? "observer" : "participant";
        int clientPort = qu.getPeer(serverId).peer.getClientPort();
        newServers.add("server." + serverId + "=localhost:" + quorumPort
                + ":" + electionPort + ":" + role
                + ";localhost:" + clientPort);
    }

    for (int stopped : new int[] {3, 6, 7}) {
        qu.shutdown(stopped);
    }

    reconfig(zkAdminArr[1], null, null, newServers, -1);
    testNormalOperation(zkArr[1], zkArr[2]);

    for (int checked : new int[] {1, 2, 4, 5}) {
        testServerHasConfig(zkArr[checked], newServers, null);
    }

    // with both observers stopped as well, servers 1 and 2 must still serve
    qu.shutdown(5);
    qu.shutdown(4);
    testNormalOperation(zkArr[1], zkArr[2]);
}
/**
 * Removes a non-leader server through the asynchronous reconfigure call and
 * waits for the callback ({@link #processResult}) to report the result code.
 */
@Test
public void testRemoveOneAsynchronous() throws Exception {
    qu = new QuorumUtil(2);
    qu.disableJMXTest = true;
    qu.startAll();
    zkArr = createHandles(qu);
    zkAdminArr = createAdminHandles(qu);

    // remove server 5, unless it is the leader, in which case remove server 4
    String leavingId = (getLeaderId(qu) == 5) ? "4" : "5";
    List<String> leavingServers = new ArrayList<>();
    leavingServers.add(leavingId);

    // the callback appends its result code to this list and notifies on it
    List<Integer> results = new LinkedList<>();
    zkAdminArr[1].reconfigure(null, leavingServers, null, -1, this, results);
    synchronized (results) {
        while (results.isEmpty()) {
            results.wait();
        }
    }
    assertEquals(0, (int) results.get(0));

    testNormalOperation(zkArr[1], zkArr[2]);
    for (int serverId = 1; serverId <= 5; serverId++) {
        testServerHasConfig(zkArr[serverId], null, leavingServers);
    }
}
/**
 * Callback of the asynchronous reconfigure call: records the result code
 * in the list passed as context and wakes up whoever waits on that list.
 *
 * @param rc result code of the operation
 * @param path path the operation was issued for (not used)
 * @param ctx the {@code List<Integer>} that collects result codes; it also
 *        serves as the monitor the waiting test thread blocks on
 * @param data returned data (not used)
 * @param stat returned stat (not used)
 */
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
    // the context is always the result list handed to reconfigure() by the test
    @SuppressWarnings("unchecked")
    List<Integer> results = (List<Integer>) ctx;
    synchronized (results) {
        results.add(rc);
        results.notifyAll();
    }
}
/**
 * Switches a server between the participant and observer roles by
 * re-adding it with the same id and addresses but a different role.
 * Rounds one and two change a follower to observer and back; rounds three
 * and four do the same with the server that is leading at that point.
 */
@Test
public void testRoleChange() throws Exception {
    qu = new QuorumUtil(1); // create 3 servers
    qu.disableJMXTest = true;
    qu.startAll();
    zkArr = createHandles(qu);
    zkAdminArr = createAdminHandles(qu);

    // a role / port change is expressed as "adding" the server again with
    // the same id but the new role / port
    List<String> joiningServers = new ArrayList<>();

    int leaderIndex = getLeaderId(qu);
    // start with a follower; the leader takes over after the second round
    int changingIndex = (leaderIndex == 1) ? 2 : 1;
    // participant -> observer first, then observer -> participant, and so on
    String newRole = "observer";

    for (int round = 0; round < 4; round++) {
        // When the changing server is the leader the reconfig is issued
        // through a handle connected to it; otherwise through the handle of
        // the server following the leader in id order.
        boolean changingLeader = changingIndex == leaderIndex;
        int handleIndex = changingLeader ? leaderIndex : (leaderIndex % qu.ALL) + 1;
        ZooKeeper zk1 = zkArr[handleIndex];
        ZooKeeperAdmin zkAdmin1 = zkAdminArr[handleIndex];

        // the server's line exactly as it is now, except for the role
        QuorumPeer peerBefore = qu.getPeer(changingIndex).peer;
        String changedLine = "server."
                + changingIndex
                + "=localhost:"
                + peerBefore.getQuorumAddress().getAllPorts().get(0)
                + ":"
                + peerBefore.getElectionAddress().getAllPorts().get(0)
                + ":"
                + newRole
                + ";localhost:"
                + peerBefore.getClientPort();
        joiningServers.add(changedLine);

        reconfig(zkAdmin1, joiningServers, null, null, -1);
        testNormalOperation(zkArr[changingIndex], zk1);

        boolean becameObserver = newRole.equals("observer");
        if (becameObserver) {
            assertTrue(qu.getPeer(changingIndex).peer.observer != null
                    && qu.getPeer(changingIndex).peer.follower == null
                    && qu.getPeer(changingIndex).peer.leader == null);
            assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.OBSERVING);
        } else {
            assertTrue(qu.getPeer(changingIndex).peer.observer == null
                    && (qu.getPeer(changingIndex).peer.follower != null
                    || qu.getPeer(changingIndex).peer.leader != null));
            assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.FOLLOWING
                    || qu.getPeer(changingIndex).peer.getPeerState() == ServerState.LEADING);
        }

        joiningServers.clear();

        if (becameObserver) {
            // next round turns the same server back into a participant
            newRole = "participant";
        } else {
            // next round turns the current leader into an observer
            newRole = "observer";
            leaderIndex = getLeaderId(qu);
            changingIndex = leaderIndex;
        }
    }
}
/**
 * Changes ports of running servers through reconfig, in three steps:
 * <ol>
 * <li>a follower's client port: the already connected client must stay
 * connected, a new client must fail on the old port and succeed on the
 * new one;</li>
 * <li>the leader's quorum port;</li>
 * <li>the election port of all three servers, after which the leader is
 * shut down and the two remaining servers must keep working.</li>
 * </ol>
 */
@Test
public void testPortChange() throws Exception {
    qu = new QuorumUtil(1); // create 3 servers
    qu.disableJMXTest = true;
    qu.startAll();
    zkArr = createHandles(qu);
    zkAdminArr = createAdminHandles(qu);

    List<String> joiningServers = new ArrayList<String>();

    int leaderIndex = getLeaderId(qu);
    int followerIndex = leaderIndex == 1 ? 2 : 1;

    // modify follower's client port
    int quorumPort = qu.getPeer(followerIndex).peer.getQuorumAddress().getAllPorts().get(0);
    int electionPort = qu.getPeer(followerIndex).peer.getElectionAddress().getAllPorts().get(0);
    int oldClientPort = qu.getPeer(followerIndex).peer.getClientPort();
    int newClientPort = PortAssignment.unique();
    joiningServers.add("server."
            + followerIndex
            + "=localhost:"
            + quorumPort
            + ":"
            + electionPort
            + ":participant;localhost:"
            + newClientPort);

    // create a /test znode and check that read/write works before
    // any reconfig is invoked
    testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);

    reconfig(zkAdminArr[followerIndex], joiningServers, null, null, -1);

    // the client that was connected before the change must stay usable
    try {
        for (int i = 0; i < 20; i++) {
            Thread.sleep(1000);
            zkArr[followerIndex].setData("/test", "teststr".getBytes(), -1);
        }
    } catch (KeeperException.ConnectionLossException e) {
        fail("Existing client disconnected when client port changed!");
    }

    // new handles pointed at the old client port must not get through
    zkArr[followerIndex].close();
    zkArr[followerIndex] = new ZooKeeper(
            "127.0.0.1:" + oldClientPort,
            ClientBase.CONNECTION_TIMEOUT,
            DummyWatcher.INSTANCE);

    zkAdminArr[followerIndex].close();
    zkAdminArr[followerIndex] = new ZooKeeperAdmin(
            "127.0.0.1:" + oldClientPort,
            ClientBase.CONNECTION_TIMEOUT,
            DummyWatcher.INSTANCE);
    zkAdminArr[followerIndex].addAuthInfo("digest", "super:test".getBytes());

    for (int i = 0; i < 10; i++) {
        try {
            Thread.sleep(1000);
            zkArr[followerIndex].setData("/test", "teststr".getBytes(), -1);
            fail("New client connected to old client port!");
        } catch (KeeperException.ConnectionLossException e) {
            // expected: nothing is serving the old client port any more
        }
    }

    // new handles pointed at the new client port must work
    zkArr[followerIndex].close();
    zkArr[followerIndex] = new ZooKeeper(
            "127.0.0.1:" + newClientPort,
            ClientBase.CONNECTION_TIMEOUT,
            DummyWatcher.INSTANCE);

    zkAdminArr[followerIndex].close();
    zkAdminArr[followerIndex] = new ZooKeeperAdmin(
            "127.0.0.1:" + newClientPort,
            ClientBase.CONNECTION_TIMEOUT,
            DummyWatcher.INSTANCE);
    zkAdminArr[followerIndex].addAuthInfo("digest", "super:test".getBytes());

    testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
    testServerHasConfig(zkArr[followerIndex], joiningServers, null);
    assertEquals(newClientPort, qu.getPeer(followerIndex).peer.getClientPort());

    joiningServers.clear();

    // change leader's leading port - should renounce leadership
    int newQuorumPort = PortAssignment.unique();
    joiningServers.add("server." + leaderIndex + "=localhost:"
            + newQuorumPort
            + ":"
            + qu.getPeer(leaderIndex).peer.getElectionAddress().getAllPorts().get(0)
            + ":participant;localhost:"
            + qu.getPeer(leaderIndex).peer.getClientPort());

    reconfig(zkAdminArr[leaderIndex], joiningServers, null, null, -1);

    testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);

    // expected value first, so that a failure reports the ports the right way round
    assertEquals(newQuorumPort, (int) qu.getPeer(leaderIndex).peer.getQuorumAddress().getAllPorts().get(0));

    joiningServers.clear();

    // change everyone's leader election port
    for (int i = 1; i <= 3; i++) {
        joiningServers.add("server." + i + "=localhost:"
                + qu.getPeer(i).peer.getQuorumAddress().getAllPorts().get(0)
                + ":"
                + PortAssignment.unique()
                + ":participant;localhost:"
                + qu.getPeer(i).peer.getClientPort());
    }

    reconfig(zkAdminArr[1], joiningServers, null, null, -1);

    leaderIndex = getLeaderId(qu);
    int follower1 = leaderIndex == 1 ? 2 : 1;
    int follower2 = 1;
    while (follower2 == leaderIndex || follower2 == follower1) {
        follower2++;
    }

    // lets kill the leader and see if a new one is elected
    qu.shutdown(getLeaderId(qu));

    testNormalOperation(zkArr[follower2], zkArr[follower1]);
    testServerHasConfig(zkArr[follower1], joiningServers, null);
    testServerHasConfig(zkArr[follower2], joiningServers, null);
}
/**
 * Runs the blocked-client-port scenario against a follower.
 */
@Test
public void testPortChangeToBlockedPortFollower() throws Exception {
    final boolean targetLeader = false;
    testPortChangeToBlockedPort(targetLeader);
}
/**
 * Runs the blocked-client-port scenario against the leader.
 */
@Test
public void testPortChangeToBlockedPortLeader() throws Exception {
    final boolean targetLeader = true;
    testPortChangeToBlockedPort(targetLeader);
}
/**
 * Reconfigures one server of a three-server ensemble to a client port that
 * this test itself keeps bound, checks that a client cannot use that port
 * and that the old port has been released, then moves the server back to
 * its old client port and checks that it works again.
 *
 * @param testLeader {@code true} to change the leader's client port,
 *        {@code false} to change a follower's; the reconfig calls are
 *        issued through the admin handle of the respective other server
 * @throws Exception if a quorum, client, socket or reconfig operation fails unexpectedly
 */
private void testPortChangeToBlockedPort(boolean testLeader) throws Exception {
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
zkArr = createHandles(qu);
zkAdminArr = createAdminHandles(qu);
List<String> joiningServers = new ArrayList<String>();
int leaderIndex = getLeaderId(qu);
int followerIndex = leaderIndex == 1 ? 2 : 1;
// serverIndex is the server whose port changes, reconfigIndex the one the reconfig is sent through
int serverIndex = testLeader ? leaderIndex : followerIndex;
int reconfigIndex = testLeader ? followerIndex : leaderIndex;
// modify server's client port
int quorumPort = qu.getPeer(serverIndex).peer.getQuorumAddress().getAllPorts().get(0);
int electionPort = qu.getPeer(serverIndex).peer.getElectionAddress().getAllPorts().get(0);
int oldClientPort = qu.getPeer(serverIndex).peer.getClientPort();
int newClientPort = PortAssignment.unique();
// occupy the new client port for the rest of the test so the server cannot bind it
try (ServerSocket ss = new ServerSocket()) {
ss.bind(new InetSocketAddress(getLoopbackAddress(), newClientPort));
joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort + ":" + electionPort + ":participant;localhost:" + newClientPort);
// create a /test znode and check that read/write works before
// any reconfig is invoked
testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
// Reconfigure
reconfig(zkAdminArr[reconfigIndex], joiningServers, null, null, -1);
Thread.sleep(1000);
// The changed server (leader or follower, depending on testLeader) is expected
// to have failed to switch to the blocked port
zkArr[serverIndex].close();
zkArr[serverIndex] = new ZooKeeper(
"127.0.0.1:" + newClientPort,
ClientBase.CONNECTION_TIMEOUT,
DummyWatcher.INSTANCE);
zkAdminArr[serverIndex].close();
// NOTE(review): unlike createAdminHandles(), no auth info is added to this handle,
// and it is not used again in this method - confirm that is intended
zkAdminArr[serverIndex] = new ZooKeeperAdmin(
"127.0.0.1:" + newClientPort,
ClientBase.CONNECTION_TIMEOUT,
DummyWatcher.INSTANCE);
try {
Thread.sleep(1000);
zkArr[serverIndex].setData("/test", "teststr".getBytes(), -1);
fail("New client connected to new client port!");
} catch (KeeperException.ConnectionLossException e) {
// Exception is expected
}
//The old port should be clear at this stage
// binding it succeeds only if it is free; the socket is closed again right away
try (ServerSocket ss2 = new ServerSocket()) {
ss2.bind(new InetSocketAddress(getLoopbackAddress(), oldClientPort));
}
// Move back to the old port
joiningServers.clear();
joiningServers.add("server." + serverIndex + "=localhost:" + quorumPort + ":" + electionPort
+ ":participant;localhost:" + oldClientPort);
reconfig(zkAdminArr[reconfigIndex], joiningServers, null, null, -1);
zkArr[serverIndex].close();
zkArr[serverIndex] = new ZooKeeper(
"127.0.0.1:" + oldClientPort,
ClientBase.CONNECTION_TIMEOUT,
DummyWatcher.INSTANCE);
testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
testServerHasConfig(zkArr[serverIndex], joiningServers, null);
assertEquals(oldClientPort, qu.getPeer(serverIndex).peer.getClientPort());
}
}
/**
 * Checks that a server line whose client part is only a port (no host)
 * parses to the wildcard address {@code 0.0.0.0} with that port.
 */
@Test
public void testUnspecifiedClientAddress() throws Exception {
    int quorumPort = PortAssignment.unique();
    int electionPort = PortAssignment.unique();
    int clientPort = PortAssignment.unique();

    String server = "server.0=localhost:" + quorumPort + ":" + electionPort + ";" + clientPort;
    QuorumServer qs = new QuorumServer(0, server);

    // expected value first, so that a failure message reports the values the right way round
    assertEquals("0.0.0.0", qs.clientAddr.getHostString());
    assertEquals(clientPort, qs.clientAddr.getPort());
}
/**
 * Switches a seven-server ensemble to a hierarchical quorum system over
 * servers 1 to 5 and later to a plain majority system over servers 1 to 3,
 * checking after each switch which quorum verifier the peers report and
 * that the expected subset of servers can still serve requests.
 */
@Test
public void testQuorumSystemChange() throws Exception {
qu = new QuorumUtil(3); // create 7 servers
qu.disableJMXTest = true;
qu.startAll();
zkArr = createHandles(qu);
zkAdminArr = createAdminHandles(qu);
ArrayList<String> members = new ArrayList<>();
// two groups; servers 1 and 2 get weight 0, servers 3, 4 and 5 get weight 1
members.add("group.1=3:4:5");
members.add("group.2=1:2");
members.add("weight.1=0");
members.add("weight.2=0");
members.add("weight.3=1");
members.add("weight.4=1");
members.add("weight.5=1");
// servers 1 to 5 keep their current quorum, election and client ports
for (int i = 1; i <= 5; i++) {
members.add("server." + i + "=127.0.0.1:"
+ qu.getPeer(i).peer.getQuorumAddress().getAllPorts().get(0)
+ ":"
+ qu.getPeer(i).peer.getElectionAddress().getAllPorts().get(0)
+ ";"
+ "127.0.0.1:"
+ qu.getPeer(i).peer.getClientPort());
}
reconfig(zkAdminArr[1], null, null, members, -1);
// this should flush the config to servers 2, 3, 4 and 5
testNormalOperation(zkArr[2], zkArr[3]);
testNormalOperation(zkArr[4], zkArr[5]);
for (int i = 1; i <= 5; i++) {
if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumHierarchical)) {
fail("peer " + i + " doesn't think the quorum system is Hieararchical!");
}
}
qu.shutdown(1);
qu.shutdown(2);
qu.shutdown(3);
qu.shutdown(7);
qu.shutdown(6);
// servers 4 and 5 should be able to work independently
// (presumably because they hold two of the three non-zero weights - confirm)
testNormalOperation(zkArr[4], zkArr[5]);
qu.restart(1);
qu.restart(2);
members.clear();
// new membership: servers 1, 2 and 3 only, with no groups or weights
// (server 3 is still shut down at this point)
for (int i = 1; i <= 3; i++) {
members.add("server." + i + "=127.0.0.1:"
+ qu.getPeer(i).peer.getQuorumAddress().getAllPorts().get(0)
+ ":"
+ qu.getPeer(i).peer.getElectionAddress().getAllPorts().get(0)
+ ";"
+ "127.0.0.1:"
+ qu.getPeer(i).peer.getClientPort());
}
reconfig(zkAdminArr[1], null, null, members, -1);
// flush the config to server 2
testNormalOperation(zkArr[1], zkArr[2]);
qu.shutdown(4);
qu.shutdown(5);
// servers 1 and 2 should be able to work independently
testNormalOperation(zkArr[1], zkArr[2]);
for (int i = 1; i <= 2; i++) {
if (!(qu.getPeer(i).peer.getQuorumVerifier() instanceof QuorumMaj)) {
fail("peer " + i + " doesn't think the quorum system is a majority quorum system!");
}
}
}
/**
 * Checks that every server of a freshly started three-server ensemble
 * reports an initial config whose version is {@code 0x100000000}.
 */
@Test
public void testInitialConfigHasPositiveVersion() throws Exception {
    qu = new QuorumUtil(1); // create 3 servers
    qu.disableJMXTest = true;
    qu.startAll();
    zkArr = createHandles(qu);

    testNormalOperation(zkArr[1], zkArr[2]);

    // visit every server of the quorum rather than a hard-coded count
    for (int serverId = 1; serverId <= qu.ALL; serverId++) {
        String configStr = testServerHasConfig(zkArr[serverId], null, null);
        QuorumVerifier qv = qu.getPeer(serverId).peer.configFromString(configStr);
        long version = qv.getVersion();
        // assertEquals reports the actual version on failure, which
        // assertTrue(version == ...) would not
        assertEquals(0x100000000L, version);
    }
}
/**
 * Tests verifies the jmx attributes of local and remote peer bean - remove
 * one quorum peer and again adding it back.
 *
 * <p>Server 1 is the peer that leaves and rejoins; servers 2 and 3 are the
 * ones whose remote peer beans for server 1 are checked before the removal,
 * expected to be gone after it, and checked again after the re-addition.
 */
@Test
public void testJMXBeanAfterRemoveAddOne() throws Exception {
    qu = new QuorumUtil(1); // create 3 servers
    qu.disableJMXTest = true;
    qu.startAll();
    zkArr = createHandles(qu);
    zkAdminArr = createAdminHandles(qu);

    List<String> leavingServers = new ArrayList<>();
    List<String> joiningServers = new ArrayList<>();

    // assert remotePeerBean.1 of ReplicatedServer_2
    int leavingIndex = 1;
    int replica2 = 2;
    QuorumPeer peer2 = qu.getPeer(replica2).peer;
    QuorumServer leavingQS2 = peer2.getView().get(Long.valueOf(leavingIndex));
    String remotePeerBean2 = MBeanRegistry.DOMAIN
            + ":name0=ReplicatedServer_id"
            + replica2
            + ",name1=replica."
            + leavingIndex;
    assertRemotePeerMXBeanAttributes(leavingQS2, remotePeerBean2);

    // assert remotePeerBean.1 of ReplicatedServer_3
    int replica3 = 3;
    QuorumPeer peer3 = qu.getPeer(replica3).peer;
    QuorumServer leavingQS3 = peer3.getView().get(Long.valueOf(leavingIndex));
    String remotePeerBean3 = MBeanRegistry.DOMAIN
            + ":name0=ReplicatedServer_id"
            + replica3
            + ",name1=replica."
            + leavingIndex;
    assertRemotePeerMXBeanAttributes(leavingQS3, remotePeerBean3);

    // both reconfig calls go through the admin handle of the leaving server
    ZooKeeperAdmin zkAdmin = zkAdminArr[leavingIndex];

    leavingServers.add(Integer.toString(leavingIndex));

    // remember this server so we can add it back later
    joiningServers.add("server." + leavingIndex + "=127.0.0.1:"
            + qu.getPeer(leavingIndex).peer.getQuorumAddress().getAllPorts().get(0)
            + ":"
            + qu.getPeer(leavingIndex).peer.getElectionAddress().getAllPorts().get(0)
            + ":participant;127.0.0.1:"
            + qu.getPeer(leavingIndex).peer.getClientPort());

    // Remove ReplicatedServer_1 from the ensemble
    reconfig(zkAdmin, null, leavingServers, null, -1);

    // localPeerBean.1 of ReplicatedServer_1
    QuorumPeer removedPeer = qu.getPeer(leavingIndex).peer;
    String localPeerBean = MBeanRegistry.DOMAIN
            + ":name0=ReplicatedServer_id"
            + leavingIndex
            + ",name1=replica."
            + leavingIndex;
    assertLocalPeerMXBeanAttributes(removedPeer, localPeerBean, false);

    // remotePeerBean.1 shouldn't exists in ReplicatedServer_2
    JMXEnv.ensureNone(remotePeerBean2);
    // remotePeerBean.1 shouldn't exists in ReplicatedServer_3
    JMXEnv.ensureNone(remotePeerBean3);

    // Add ReplicatedServer_1 back to the ensemble
    reconfig(zkAdmin, joiningServers, null, null, -1);

    // localPeerBean.1 of ReplicatedServer_1
    assertLocalPeerMXBeanAttributes(removedPeer, localPeerBean, true);

    // assert remotePeerBean.1 of ReplicatedServer_2
    leavingQS2 = peer2.getView().get(Long.valueOf(leavingIndex));
    assertRemotePeerMXBeanAttributes(leavingQS2, remotePeerBean2);

    // assert remotePeerBean.1 of ReplicatedServer_3
    leavingQS3 = peer3.getView().get(Long.valueOf(leavingIndex));
    assertRemotePeerMXBeanAttributes(leavingQS3, remotePeerBean3);
}
/**
 * Verifies the JMX attributes of the local and remote peer beans after a
 * participant has been reconfigured into the observer role.
 */
@Test
public void testJMXBeanAfterRoleChange() throws Exception {
    qu = new QuorumUtil(1); // create 3 servers
    qu.disableJMXTest = true;
    qu.startAll();
    zkArr = createHandles(qu);
    zkAdminArr = createAdminHandles(qu);

    // a role / port change is expressed as "adding" the server again with
    // the same id but a different role / port
    List<String> serversToJoin = new ArrayList<>();

    final int changingIndex = 1;
    final Long changingSid = Long.valueOf(changingIndex);
    // every bean name in this test shares the same prefix and replica suffix
    final String beanNamePrefix = MBeanRegistry.DOMAIN + ":name0=ReplicatedServer_id";
    final String replicaSuffix = ",name1=replica." + changingIndex;

    // remotePeerBean.1 as seen from ReplicatedServer_2
    final int replica2 = 2;
    QuorumPeer peer2 = qu.getPeer(replica2).peer;
    String remotePeerBean2 = beanNamePrefix + replica2 + replicaSuffix;
    assertRemotePeerMXBeanAttributes(peer2.getView().get(changingSid), remotePeerBean2);

    // remotePeerBean.1 as seen from ReplicatedServer_3
    final int replica3 = 3;
    QuorumPeer peer3 = qu.getPeer(replica3).peer;
    String remotePeerBean3 = beanNamePrefix + replica3 + replicaSuffix;
    assertRemotePeerMXBeanAttributes(peer3.getView().get(changingSid), remotePeerBean3);

    final String targetRole = "observer";
    ZooKeeper zk = zkArr[changingIndex];
    ZooKeeperAdmin zkAdmin = zkAdminArr[changingIndex];

    // the server line exactly as it is now, except for the role
    QuorumPeer changingPeer = qu.getPeer(changingIndex).peer;
    String roleChangeLine = new StringBuilder("server.")
        .append(changingIndex)
        .append("=127.0.0.1:")
        .append(changingPeer.getQuorumAddress().getAllPorts().get(0))
        .append(":")
        .append(changingPeer.getElectionAddress().getAllPorts().get(0))
        .append(":")
        .append(targetRole)
        .append(";127.0.0.1:")
        .append(changingPeer.getClientPort())
        .toString();
    serversToJoin.add(roleChangeLine);

    reconfig(zkAdmin, serversToJoin, null, null, -1);
    testNormalOperation(zkArr[changingIndex], zk);

    // the reconfigured server must now run as an observer only
    QuorumPeer qp = qu.getPeer(changingIndex).peer;
    assertTrue(qp.observer != null && qp.follower == null && qp.leader == null);
    assertTrue(qp.getPeerState() == ServerState.OBSERVING);

    // localPeerBean.1 of ReplicatedServer_1
    String localPeerBeanName = beanNamePrefix + changingIndex + replicaSuffix;
    assertLocalPeerMXBeanAttributes(qp, localPeerBeanName, true);

    // remotePeerBean.1 of ReplicatedServer_2, checked against the current view
    assertRemotePeerMXBeanAttributes(peer2.getView().get(changingSid), remotePeerBean2);

    // remotePeerBean.1 of ReplicatedServer_3, checked against the current view
    assertRemotePeerMXBeanAttributes(peer3.getView().get(changingSid), remotePeerBean3);
}
/**
 * Asserts that the attributes exposed by a local peer MX bean match the
 * state of the given quorum peer.
 *
 * <p>Each expected value is read from {@code qp} and compared with the value
 * returned by {@code JMXEnv.ensureBeanAttribute} for the matching attribute
 * name: LearnerType, ClientAddress, ElectionAddress, PartOfEnsemble,
 * ConfigVersion and QuorumSystemInfo.
 *
 * @param qp the peer whose state provides the expected values
 * @param beanName the name of the local peer bean to inspect
 * @param isPartOfEnsemble the expected value of the PartOfEnsemble attribute
 * @throws Exception if reading a bean attribute fails
 */
private void assertLocalPeerMXBeanAttributes(
    QuorumPeer qp,
    String beanName,
    Boolean isPartOfEnsemble) throws Exception {
    assertEquals(
        "Mismatches LearnerType!",
        qp.getLearnerType().name(),
        JMXEnv.ensureBeanAttribute(beanName, "LearnerType"));
    assertEquals(
        "Mismatches ClientAddress!",
        qp.getClientAddress().getHostString() + ":" + qp.getClientAddress().getPort(),
        JMXEnv.ensureBeanAttribute(beanName, "ClientAddress"));
    // The failure message used to say "LearnerType" here (copy/paste slip),
    // which pointed at the wrong attribute when this assertion failed.
    assertEquals(
        "Mismatches ElectionAddress!",
        qp.getElectionAddress().getOne().getHostString() + ":" + qp.getElectionAddress().getOne().getPort(),
        JMXEnv.ensureBeanAttribute(beanName, "ElectionAddress"));
    assertEquals(
        "Mismatches PartOfEnsemble!",
        isPartOfEnsemble,
        JMXEnv.ensureBeanAttribute(beanName, "PartOfEnsemble"));
    assertEquals(
        "Mismatches ConfigVersion!",
        qp.getQuorumVerifier().getVersion(),
        JMXEnv.ensureBeanAttribute(beanName, "ConfigVersion"));
    assertEquals(
        "Mismatches QuorumSystemInfo!",
        qp.getQuorumVerifier().toString(),
        JMXEnv.ensureBeanAttribute(beanName, "QuorumSystemInfo"));
}
/**
 * Reads a string attribute from the given bean and, when the value contains
 * a colon, passes it through {@link #getNumericalAddrPort(String)}. A value
 * without a colon is returned unchanged.
 *
 * @param beanName the name of the bean to read from
 * @param attribute the name of the attribute to read
 * @return the attribute value, normalized when it contains a colon
 * @throws Exception if the attribute cannot be read or the host cannot be resolved
 */
String getAddrPortFromBean(String beanName, String attribute) throws Exception {
    final String rawValue = (String) JMXEnv.ensureBeanAttribute(beanName, attribute);
    return rawValue.contains(":") ? getNumericalAddrPort(rawValue) : rawValue;
}
/**
 * Converts a {@code host:port} string into {@code numericAddress:port} by
 * resolving the host part with {@link InetAddress#getByName(String)}.
 *
 * <p>The port is taken to be everything after the LAST colon, so IPv6
 * literals, bare ({@code 0:0:0:0:0:0:0:1:2181}) or bracketed
 * ({@code [0:0:0:0:0:0:0:1]:2181}), are handled as well as IPv4 addresses
 * and host names.
 *
 * @param name the address in {@code host:port} form
 * @return the textual IP address of the host followed by {@code :port}
 * @throws UnknownHostException if the host part cannot be resolved
 * @throws IllegalArgumentException if {@code name} contains no colon
 */
String getNumericalAddrPort(String name) throws UnknownHostException {
    // Split on the last colon: an IPv6 host contains colons itself, so the
    // first colon is not a reliable host/port separator.
    final int portSeparator = name.lastIndexOf(':');
    if (portSeparator < 0) {
        throw new IllegalArgumentException("Expected <host>:<port> but got: " + name);
    }
    final String host = name.substring(0, portSeparator);
    final String port = name.substring(portSeparator + 1);
    return InetAddress.getByName(host).getHostAddress() + ":" + port;
}
/**
 * Asserts that the attributes exposed by a remote peer MX bean match the
 * given quorum server entry.
 *
 * <p>The learner type is compared directly. The client, election and quorum
 * addresses are compared after both the expected and the actual value have
 * been passed through the numeric address normalization helpers.
 *
 * @param qs the quorum server entry providing the expected values
 * @param beanName the name of the remote peer bean to inspect
 * @throws Exception if a bean attribute cannot be read or a host cannot be resolved
 */
private void assertRemotePeerMXBeanAttributes(QuorumServer qs, String beanName) throws Exception {
    final String expectedLearnerType = qs.type.name();
    assertEquals("Mismatches LearnerType!", expectedLearnerType, JMXEnv.ensureBeanAttribute(beanName, "LearnerType"));

    final String expectedClient = getNumericalAddrPort(
        qs.clientAddr.getHostString() + ":" + qs.clientAddr.getPort());
    final String actualClient = getAddrPortFromBean(beanName, "ClientAddress");
    assertEquals("Mismatches ClientAddress!", expectedClient, actualClient);

    final String expectedElection = getNumericalAddrPort(
        qs.electionAddr.getOne().getHostString() + ":" + qs.electionAddr.getOne().getPort());
    final String actualElection = getAddrPortFromBean(beanName, "ElectionAddress");
    assertEquals("Mismatches ElectionAddress!", expectedElection, actualElection);

    final String expectedQuorum = getNumericalAddrPort(
        qs.addr.getOne().getHostString() + ":" + qs.addr.getOne().getPort());
    final String actualQuorum = getAddrPortFromBean(beanName, "QuorumAddress");
    assertEquals("Mismatches QuorumAddress!", expectedQuorum, actualQuorum);
}
/**
 * A helper class to parse / compare server address config lines.
 * Example: server.1=127.0.0.1:11228:11231|127.0.0.1:11230:11229:participant;0.0.0.0:11227
 *
 * <p>Only the server id, the client port and the quorum / election ports per
 * host take part in the comparison. Anything after the second port of a
 * server address (such as the role) and the client host are ignored, and the
 * order of the {@code |}-separated addresses does not matter.
 */
private static class ServerConfigLine {
    private final int serverId;
    // null when the line has no ";client" section
    private final Integer clientPort;
    // hostName -> <quorumPort1, quorumPort2>
    private final Map<String, Set<Integer>> quorumPorts = new HashMap<>();
    // hostName -> <electionPort1, electionPort2>
    private final Map<String, Set<Integer>> electionPorts = new HashMap<>();

    /**
     * Parses one config line.
     *
     * @param configLine a line like {@code server.<id>=<addresses>[;<client>]}
     * @throws NumberFormatException if the id or a port is not a number
     */
    private ServerConfigLine(String configLine) {
        String[] parts = configLine.trim().split("=");
        serverId = parseInt(parts[0].split("\\.")[1]);
        String[] serverConfig = parts[1].split(";");
        if (serverConfig.length > 1) {
            clientPort = parseClientPort(serverConfig[1]);
        } else {
            clientPort = null;
        }
        for (String addr : serverConfig[0].split("\\|")) {
            // addr like: 127.0.0.1:11230:11229:participant or [0:0:0:0:0:0:0:1]:11346:11347
            String serverHost;
            String[] ports;
            if (addr.contains("[")) {
                // bracketed IPv6 host: skip "]:" to reach the first port
                serverHost = addr.substring(1, addr.indexOf("]"));
                ports = addr.substring(addr.indexOf("]") + 2).split(":");
            } else {
                serverHost = addr.substring(0, addr.indexOf(":"));
                ports = addr.substring(addr.indexOf(":") + 1).split(":");
            }
            quorumPorts.computeIfAbsent(serverHost, k -> new HashSet<>()).add(parseInt(ports[0]));
            if (ports.length > 1) {
                electionPorts.computeIfAbsent(serverHost, k -> new HashSet<>()).add(parseInt(ports[1]));
            }
        }
    }

    /**
     * Extracts the port from the client section, which is either
     * {@code port} or {@code host:port}. The port is the text after the last
     * colon, so an IPv6 client host such as {@code [0:0:0:0:0:0:0:1]:11345}
     * no longer yields a fragment of the address.
     */
    private static int parseClientPort(String clientConfig) {
        // lastIndexOf returns -1 when there is no colon, so the whole string is parsed
        return parseInt(clientConfig.substring(clientConfig.lastIndexOf(':') + 1));
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        ServerConfigLine that = (ServerConfigLine) o;
        return serverId == that.serverId
            && Objects.equals(clientPort, that.clientPort)
            && quorumPorts.equals(that.quorumPorts)
            && electionPorts.equals(that.electionPorts);
    }

    @Override
    public int hashCode() {
        return Objects.hash(serverId, clientPort, quorumPorts, electionPorts);
    }
}
}