// blob: cb93106926bd40198e185c98fc537c84ac606291 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper.test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.jmx.CommonNames;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumStats;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.server.quorum.flexible.QuorumHierarchical;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.junit.Assert;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReconfigTest extends ZKTestCase implements DataCallback{
private static final Logger LOG = LoggerFactory
.getLogger(ReconfigTest.class);
private QuorumUtil qu;
/**
 * Tears down the quorum used by the test that just ran, if the test got as
 * far as creating one.
 */
@After
public void tearDown() throws Exception {
    if (qu == null) {
        return;
    }
    qu.tearDown();
}
/**
 * Issues a reconfig request through {@code zk}, retrying once per second
 * (30 attempts in total) while the client reports connection loss, and
 * sanity-checks the configuration returned by the server.
 *
 * @param zk             client handle used to issue the request
 * @param joiningServers server specs expected to appear in the new config,
 *                       may be {@code null}
 * @param leavingServers ids of servers expected to be absent from the new
 *                       config, may be {@code null}
 * @param newMembers     full membership for a non-incremental reconfig,
 *                       may be {@code null}
 * @param fromConfig     config version the request is conditioned on
 * @return the new configuration as a string
 */
public static String reconfig(ZooKeeper zk, List<String> joiningServers,
        List<String> leavingServers, List<String> newMembers, long fromConfig)
        throws KeeperException, InterruptedException {
    byte[] rawConfig = null;
    int attempt = 0;
    while (attempt < 30) {
        try {
            rawConfig = zk.reconfig(joiningServers, leavingServers,
                    newMembers, fromConfig, new Stat());
            break;
        } catch (KeeperException.ConnectionLossException connectionLost) {
            if (attempt >= 29) {
                // last attempt: the quorum did not come back within ~30 seconds
                Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
            }
            Thread.sleep(1000);
        }
        attempt++;
    }

    final String newConfig = new String(rawConfig);
    if (joiningServers != null) {
        for (String joined : joiningServers) {
            Assert.assertTrue(newConfig.contains(joined));
        }
    }
    if (leavingServers != null) {
        for (String removedId : leavingServers) {
            String removedKey = "server.".concat(removedId);
            Assert.assertFalse(newConfig.contains(removedKey));
        }
    }
    return newConfig;
}
/**
 * Fetches the configuration as seen by the server {@code zk} is connected
 * to and checks which servers it does and does not mention. The read is
 * retried once per second (30 attempts in total) on connection loss.
 *
 * @param zk             client handle connected to the server under test
 * @param joiningServers server specs that must appear in the config, may be
 *                       {@code null}
 * @param leavingServers ids of servers that must not appear in the config,
 *                       may be {@code null}
 * @return the configuration read from the server, as a string
 */
public static String testServerHasConfig(ZooKeeper zk,
        List<String> joiningServers, List<String> leavingServers)
        throws KeeperException, InterruptedException {
    byte[] rawConfig = null;
    boolean dummyCreated = false;
    int attempt = 0;
    while (attempt < 30) {
        try {
            if (!dummyCreated) {
                createZNode(zk, "/dummy", "dummy");
                dummyCreated = true;
            }
            // A write is issued instead of a sync to force a view update;
            // see ZOOKEEPER-2137 for details.
            zk.setData("/dummy", "dummy".getBytes(), -1);
            rawConfig = zk.getConfig(false, new Stat());
            break;
        } catch (KeeperException.ConnectionLossException connectionLost) {
            if (attempt >= 29) {
                // last attempt: the quorum did not come back within ~30 seconds
                Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
            }
            Thread.sleep(1000);
        }
        attempt++;
    }

    final String seenConfig = new String(rawConfig);
    if (joiningServers != null) {
        for (String joined : joiningServers) {
            Assert.assertTrue(seenConfig.contains(joined));
        }
    }
    if (leavingServers != null) {
        for (String removedId : leavingServers) {
            String removedKey = "server.".concat(removedId);
            Assert.assertFalse(seenConfig.contains(removedKey));
        }
    }
    return seenConfig;
}
/**
 * Checks that a value written through {@code writer} can be read back
 * through {@code reader}. The whole sequence is retried once per second
 * (30 attempts in total) when a connection loss is reported.
 *
 * @param writer handle used to write {@code /test}
 * @param reader handle used to read {@code /test} back
 */
public static void testNormalOperation(ZooKeeper writer, ZooKeeper reader)
        throws KeeperException, InterruptedException {
    boolean writerNodeReady = false;
    boolean readerNodeReady = false;
    int attempt = 0;
    while (attempt < 30) {
        try {
            if (!writerNodeReady) {
                createZNode(writer, "/test", "test");
                writerNodeReady = true;
            }
            if (!readerNodeReady) {
                createZNode(reader, "/dummy", "dummy");
                readerNodeReady = true;
            }
            final String expected = "test" + attempt;
            writer.setData("/test", expected.getBytes(), -1);
            // A write is issued through the reader instead of a sync to
            // force a view update; see ZOOKEEPER-2137 for details.
            reader.setData("/dummy", "dummy".getBytes(), -1);
            final byte[] readBack = reader.getData("/test", null, new Stat());
            Assert.assertEquals(expected, new String(readBack));
            break;
        } catch (KeeperException.ConnectionLossException connectionLost) {
            if (attempt >= 29) {
                // last attempt: the quorum did not come back within ~30 seconds
                Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
            }
            Thread.sleep(1000);
        }
        attempt++;
    }
}
/**
 * Creates a persistent, world-accessible znode at {@code path} holding
 * {@code data}; an already existing node is silently accepted.
 */
private static void createZNode(ZooKeeper zk, String path, String data)
        throws KeeperException, InterruptedException {
    final byte[] payload = data.getBytes();
    try {
        zk.create(path, payload, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    } catch (KeeperException.NodeExistsException alreadyThere) {
        // the node is already present, which is all the callers need
    }
}
/**
 * Finds the server that currently acts as leader.
 *
 * Server ids are scanned in ascending order from 1 to {@code qu.ALL}; the
 * first peer whose {@code leader} field is set is reported.
 *
 * @param qu the quorum to inspect
 * @return the id of the leading server
 * @throws AssertionError if none of the servers is currently leading, so
 *         that the test fails with a clear message instead of running past
 *         the last server id
 */
private int getLeaderId(QuorumUtil qu) {
    for (int id = 1; id <= qu.ALL; id++) {
        if (qu.getPeer(id).peer.leader != null) {
            return id;
        }
    }
    throw new AssertionError("no server in the quorum is currently leading");
}
/**
 * Opens one client connection per server of the quorum.
 *
 * @param qu the running quorum
 * @return an array of size {@code qu.ALL + 1} in which slot {@code i} holds
 *         the handle connected to server {@code i}; slot 0 is unused and
 *         stays {@code null} so that handles can be indexed by server id
 */
public static ZooKeeper[] createHandles(QuorumUtil qu) throws IOException {
    ZooKeeper[] handles = new ZooKeeper[qu.ALL + 1];
    for (int serverId = 1; serverId <= qu.ALL; serverId++) {
        int clientPort = qu.getPeer(serverId).peer.getClientPort();
        String connectString = "127.0.0.1:" + clientPort;
        handles[serverId] = new ZooKeeper(connectString,
                ClientBase.CONNECTION_TIMEOUT, new Watcher() {
                    public void process(WatchedEvent event) {
                        // session events are of no interest to these tests
                    }
                });
    }
    return handles;
}
/**
 * Closes every non-null handle in {@code zkArr}, in array order.
 */
public static void closeAllHandles(ZooKeeper[] zkArr) throws InterruptedException {
    for (int idx = 0; idx < zkArr.length; idx++) {
        ZooKeeper handle = zkArr[idx];
        if (handle == null) {
            continue;
        }
        handle.close();
    }
}
/**
 * Removes a single server from a three-server ensemble and adds it back
 * again, first for a server that is not the leader and then for the
 * current leader. Each round also checks that a reconfig conditioned on a
 * wrong config version is rejected with a BadVersionException.
 */
@Test
public void testRemoveAddOne() throws Exception {
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
ZooKeeper[] zkArr = createHandles(qu);
List<String> leavingServers = new ArrayList<String>();
List<String> joiningServers = new ArrayList<String>();
int leaderIndex = getLeaderId(qu);
// during first iteration, leavingIndex will correspond to a follower
// during second iteration leavingIndex will be the index of the leader
int leavingIndex = (leaderIndex == 1) ? 2 : 1;
for (int i = 0; i < 2; i++) {
// some of the operations will be executed by a client connected to
// the removed server
// while others are invoked by a client connected to some other
// server.
// when we're removing the leader, zk1 will be the client connected
// to removed server; otherwise zk1 is connected to the server whose
// id follows the leader's (wrapping around) and zk2 to the leader
ZooKeeper zk1 = (leavingIndex == leaderIndex) ? zkArr[leaderIndex]
: zkArr[(leaderIndex % qu.ALL) + 1];
ZooKeeper zk2 = (leavingIndex == leaderIndex) ? zkArr[(leaderIndex % qu.ALL) + 1]
: zkArr[leaderIndex];
leavingServers.add(Integer.toString(leavingIndex));
// remember this server so we can add it back later
joiningServers.add("server."
+ leavingIndex
+ "=localhost:"
+ qu.getPeer(leavingIndex).peer.getQuorumAddress()
.getPort()
+ ":"
+ qu.getPeer(leavingIndex).peer.getElectionAddress()
.getPort() + ":participant;localhost:"
+ qu.getPeer(leavingIndex).peer.getClientPort());
// remove the server, then verify the new config and normal operation
String configStr = reconfig(zk1, null, leavingServers, null, -1);
testServerHasConfig(zk2, null, leavingServers);
testNormalOperation(zk2, zk1);
// parse the returned config to learn its version
QuorumVerifier qv = qu.getPeer(1).peer.configFromString(configStr);
long version = qv.getVersion();
// checks that conditioning on version works properly
try {
reconfig(zk2, joiningServers, null, null, version + 1);
Assert.fail("reconfig succeeded even though version condition was incorrect!");
} catch (KeeperException.BadVersionException e) {
// expected: the condition named a version that is not the current one
}
// add the server back, this time conditioned on the correct version
reconfig(zk2, joiningServers, null, null, version);
testNormalOperation(zk1, zk2);
testServerHasConfig(zk1, joiningServers, null);
// second iteration of the loop will remove the leader
// and add it back (as follower)
leavingIndex = leaderIndex = getLeaderId(qu);
leavingServers.clear();
joiningServers.clear();
}
closeAllHandles(zkArr);
}
/**
 * Works on a five-server ensemble and:
 * 1. removes and adds back two servers (including the leader); one of the
 *    two is added back as an observer
 * 2. tests that reconfig fails if a quorum of the new config is not up
 * 3. tests that a server that's not up during reconfig learns the new
 *    config when it comes up
 * @throws Exception
 */
@Test
public void testRemoveAddTwo() throws Exception {
qu = new QuorumUtil(2); // create 5 servers
qu.disableJMXTest = true;
qu.startAll();
ZooKeeper[] zkArr = createHandles(qu);
List<String> leavingServers = new ArrayList<String>();
List<String> joiningServers = new ArrayList<String>();
int leaderIndex = getLeaderId(qu);
// lets remove the leader and some other server
int leavingIndex1 = leaderIndex;
int leavingIndex2 = (leaderIndex == 1) ? 2 : 1;
// find the three servers that are staying: each stayingIndex is the
// smallest id not already taken by a leaving or earlier staying server
int stayingIndex1 = 1, stayingIndex2 = 1, stayingIndex3 = 1;
while (stayingIndex1 == leavingIndex1 || stayingIndex1 == leavingIndex2)
stayingIndex1++;
while (stayingIndex2 == leavingIndex1 || stayingIndex2 == leavingIndex2
|| stayingIndex2 == stayingIndex1)
stayingIndex2++;
while (stayingIndex3 == leavingIndex1 || stayingIndex3 == leavingIndex2
|| stayingIndex3 == stayingIndex1
|| stayingIndex3 == stayingIndex2)
stayingIndex3++;
leavingServers.add(Integer.toString(leavingIndex1));
leavingServers.add(Integer.toString(leavingIndex2));
// remember these servers so we can add them back later
joiningServers.add("server." + leavingIndex1 + "=localhost:"
+ qu.getPeer(leavingIndex1).peer.getQuorumAddress().getPort()
+ ":"
+ qu.getPeer(leavingIndex1).peer.getElectionAddress().getPort()
+ ":participant;localhost:"
+ qu.getPeer(leavingIndex1).peer.getClientPort());
// this server will be added back as an observer
joiningServers.add("server." + leavingIndex2 + "=localhost:"
+ qu.getPeer(leavingIndex2).peer.getQuorumAddress().getPort()
+ ":"
+ qu.getPeer(leavingIndex2).peer.getElectionAddress().getPort()
+ ":observer;localhost:"
+ qu.getPeer(leavingIndex2).peer.getClientPort());
// take both leaving servers down before they are removed from the config
qu.shutdown(leavingIndex1);
qu.shutdown(leavingIndex2);
// 3 servers still up so this should work
reconfig(zkArr[stayingIndex2], null, leavingServers, null, -1);
qu.shutdown(stayingIndex2);
// the following commands would not work in the original
// cluster of 5, but now that we've removed 2 servers
// we have a cluster of 3 servers and one of them is allowed to fail
testServerHasConfig(zkArr[stayingIndex1], null, leavingServers);
testServerHasConfig(zkArr[stayingIndex3], null, leavingServers);
testNormalOperation(zkArr[stayingIndex1], zkArr[stayingIndex3]);
// this is a test that a reconfig will only succeed
// if there is a quorum up in new config. Below there is no
// quorum so it should fail
// the sleep is necessary so that the leader figures out
// that the switched off servers are down
Thread.sleep(10000);
try {
reconfig(zkArr[stayingIndex1], joiningServers, null, null, -1);
Assert.fail("reconfig completed successfully even though there is no quorum up in new config!");
} catch (KeeperException.NewConfigNoQuorum e) {
// expected: only two servers of the proposed config are running
}
// now start the third server so that new config has quorum
qu.restart(stayingIndex2);
reconfig(zkArr[stayingIndex1], joiningServers, null, null, -1);
testNormalOperation(zkArr[stayingIndex2], zkArr[stayingIndex3]);
testServerHasConfig(zkArr[stayingIndex2], joiningServers, null);
// this server wasn't around during the configuration change
// we should check that it is able to connect, finds out
// about the change and becomes an observer.
qu.restart(leavingIndex2);
Assert.assertTrue(qu.getPeer(leavingIndex2).peer.getPeerState() == ServerState.OBSERVING);
testNormalOperation(zkArr[stayingIndex2], zkArr[leavingIndex2]);
testServerHasConfig(zkArr[leavingIndex2], joiningServers, null);
closeAllHandles(zkArr);
}
/**
 * Replaces the membership of a seven-server ensemble in a single
 * non-incremental reconfig: servers 1-3 become participants and servers 4-5
 * observers, all with freshly assigned quorum and election ports, while
 * servers 6 and 7 are left out.
 */
@Test
public void testBulkReconfig() throws Exception {
    qu = new QuorumUtil(3); // create 7 servers
    qu.disableJMXTest = true;
    qu.startAll();
    ZooKeeper[] zkArr = createHandles(qu);

    // build the new membership: client ports stay, every other port changes
    ArrayList<String> newServers = new ArrayList<String>();
    for (int id = 1; id <= 5; id++) {
        int quorumPort = PortAssignment.unique();
        int electionPort = PortAssignment.unique();
        String role;
        if (id == 4 || id == 5) {
            role = "observer";
        } else {
            role = "participant";
        }
        int clientPort = qu.getPeer(id).peer.getClientPort();
        newServers.add("server." + id + "=localhost:" + quorumPort
                + ":" + electionPort + ":" + role
                + ";localhost:" + clientPort);
    }

    for (int down : new int[] {3, 6, 7}) {
        qu.shutdown(down);
    }

    reconfig(zkArr[1], null, null, newServers, -1);
    testNormalOperation(zkArr[1], zkArr[2]);

    // every server that is still running must report the new membership
    for (int id : new int[] {1, 2, 4, 5}) {
        testServerHasConfig(zkArr[id], newServers, null);
    }

    // with both observers down, servers 1 and 2 must still make progress
    qu.shutdown(5);
    qu.shutdown(4);
    testNormalOperation(zkArr[1], zkArr[2]);

    closeAllHandles(zkArr);
}
/**
 * Removes one non-leader server from a five-server ensemble through the
 * asynchronous reconfig API and waits for the callback (this test instance)
 * to report the result code.
 */
@Test
public void testRemoveOneAsynchronous() throws Exception {
    qu = new QuorumUtil(2);
    qu.disableJMXTest = true;
    qu.startAll();
    ZooKeeper[] zkArr = createHandles(qu);

    // pick a server that is not the leader: 5, unless 5 is leading
    String nonLeader;
    if (getLeaderId(qu) == 5) {
        nonLeader = "4";
    } else {
        nonLeader = "5";
    }
    List<String> leavingServers = new ArrayList<String>();
    leavingServers.add(nonLeader);

    // the result list doubles as callback context and as the monitor on
    // which this thread waits for the callback
    LinkedList<Integer> results = new LinkedList<Integer>();
    zkArr[1].reconfig(null, leavingServers, null, -1, this, results);
    synchronized (results) {
        while (results.isEmpty()) {
            results.wait();
        }
    }
    int resultCode = results.get(0);
    Assert.assertEquals(0, resultCode);

    testNormalOperation(zkArr[1], zkArr[2]);
    int serverId = 1;
    while (serverId <= 5) {
        testServerHasConfig(zkArr[serverId], null, leavingServers);
        serverId++;
    }
    closeAllHandles(zkArr);
}
/**
 * Callback for asynchronous requests: appends the result code to the list
 * passed as context and wakes up every thread waiting on that list.
 *
 * @param ctx the {@code LinkedList<Integer>} handed to the asynchronous call
 */
@SuppressWarnings("unchecked")
public void processResult(int rc, String path, Object ctx, byte[] data,
        Stat stat) {
    synchronized (ctx) {
        LinkedList<Integer> collected = (LinkedList<Integer>) ctx;
        collected.add(rc);
        ctx.notifyAll();
    }
}
/**
 * Switches a server between the participant and observer roles in a
 * three-server ensemble: a follower is turned into an observer and back,
 * then the same is done with the current leader. After every change the
 * peer's role fields and reported state are checked.
 */
@Test
public void testRoleChange() throws Exception {
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
ZooKeeper[] zkArr = createHandles(qu);
// changing a server's role / port is done by "adding" it with the same
// id but different role / port
List<String> joiningServers = new ArrayList<String>();
int leaderIndex = getLeaderId(qu);
// during first and second iteration, changingIndex will correspond to a
// follower
// during third and fourth iteration changingIndex will be the index of
// the leader
int changingIndex = (leaderIndex == 1) ? 2 : 1;
// first convert participant to observer, then observer to participant,
// and so on
String newRole = "observer";
for (int i = 0; i < 4; i++) {
// the reconfig is issued through zk1 while the write in
// testNormalOperation goes through the client connected to the
// changing server.
// when we're changing the leader, zk1 is the client connected to it;
// otherwise zk1 is connected to the server whose id follows the
// leader's (wrapping around)
ZooKeeper zk1 = (changingIndex == leaderIndex) ? zkArr[leaderIndex]
: zkArr[(leaderIndex % qu.ALL) + 1];
// exactly as it is now, except for role change
joiningServers.add("server."
+ changingIndex
+ "=localhost:"
+ qu.getPeer(changingIndex).peer.getQuorumAddress()
.getPort()
+ ":"
+ qu.getPeer(changingIndex).peer.getElectionAddress()
.getPort() + ":" + newRole + ";localhost:"
+ qu.getPeer(changingIndex).peer.getClientPort());
reconfig(zk1, joiningServers, null, null, -1);
testNormalOperation(zkArr[changingIndex], zk1);
if (newRole.equals("observer")) {
// an observer has only its observer field set
Assert.assertTrue(qu.getPeer(changingIndex).peer.observer != null
&& qu.getPeer(changingIndex).peer.follower == null
&& qu.getPeer(changingIndex).peer.leader == null);
Assert.assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.OBSERVING);
} else {
// a participant is either following or leading, never observing
Assert.assertTrue(qu.getPeer(changingIndex).peer.observer == null
&& (qu.getPeer(changingIndex).peer.follower != null || qu
.getPeer(changingIndex).peer.leader != null));
Assert.assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.FOLLOWING
|| qu.getPeer(changingIndex).peer.getPeerState() == ServerState.LEADING);
}
joiningServers.clear();
// toggle the role for the next iteration
if (newRole.equals("observer")) {
newRole = "participant";
} else {
// lets change leader to observer
newRole = "observer";
leaderIndex = getLeaderId(qu);
changingIndex = leaderIndex;
}
}
closeAllHandles(zkArr);
}
/**
 * Changes ports of servers in a three-server ensemble through reconfig:
 * first a follower's client port, then the leader's quorum port, and
 * finally the election port of every server.
 */
@Test
public void testPortChange() throws Exception {
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
ZooKeeper[] zkArr = createHandles(qu);
List<String> joiningServers = new ArrayList<String>();
int leaderIndex = getLeaderId(qu);
int followerIndex = leaderIndex == 1 ? 2 : 1;
// modify follower's client port
int quorumPort = qu.getPeer(followerIndex).peer.getQuorumAddress().getPort();
int electionPort = qu.getPeer(followerIndex).peer.getElectionAddress().getPort();
int oldClientPort = qu.getPeer(followerIndex).peer.getClientPort();
int newClientPort = PortAssignment.unique();
joiningServers.add("server." + followerIndex + "=localhost:" + quorumPort
+ ":" + electionPort + ":participant;localhost:" + newClientPort);
// create a /test znode and check that read/write works before
// any reconfig is invoked
testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
reconfig(zkArr[followerIndex], joiningServers, null, null, -1);
// the already established session must survive the client port change:
// keep writing through it, once per second, for 20 seconds
try {
for (int i=0; i < 20; i++) {
Thread.sleep(1000);
zkArr[followerIndex].setData("/test", "teststr".getBytes(), -1);
}
} catch (KeeperException.ConnectionLossException e) {
Assert.fail("Existing client disconnected when client port changed!");
}
// a fresh client pointed at the old client port must not get through:
// each of the 10 write attempts is expected to end in connection loss
zkArr[followerIndex].close();
zkArr[followerIndex] = new ZooKeeper("127.0.0.1:"
+ oldClientPort,
ClientBase.CONNECTION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {}});
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
zkArr[followerIndex].setData("/test", "teststr".getBytes(), -1);
Assert.fail("New client connected to old client port!");
} catch (KeeperException.ConnectionLossException e) {
// expected: nothing is listening on the old client port any more
}
}
// a fresh client pointed at the new client port must work normally
zkArr[followerIndex].close();
zkArr[followerIndex] = new ZooKeeper("127.0.0.1:"
+ newClientPort,
ClientBase.CONNECTION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {}});
testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
testServerHasConfig(zkArr[followerIndex], joiningServers, null);
Assert.assertEquals(newClientPort, qu.getPeer(followerIndex).peer.getClientPort());
joiningServers.clear();
// change leader's leading port - should renounce leadership
int newQuorumPort = PortAssignment.unique();
joiningServers.add("server." + leaderIndex + "=localhost:"
+ newQuorumPort
+ ":"
+ qu.getPeer(leaderIndex).peer.getElectionAddress().getPort()
+ ":participant;localhost:"
+ qu.getPeer(leaderIndex).peer.getClientPort());
reconfig(zkArr[leaderIndex], joiningServers, null, null, -1);
testNormalOperation(zkArr[followerIndex], zkArr[leaderIndex]);
Assert.assertTrue(qu.getPeer(leaderIndex).peer.getQuorumAddress()
.getPort() == newQuorumPort);
Assert.assertTrue(getLeaderId(qu) != leaderIndex); // the leader changed
joiningServers.clear();
// change everyone's leader election port
for (int i = 1; i <= 3; i++) {
joiningServers.add("server." + i + "=localhost:"
+ qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
+ PortAssignment.unique() + ":participant;localhost:"
+ qu.getPeer(i).peer.getClientPort());
}
reconfig(zkArr[1], joiningServers, null, null, -1);
// pick the two servers that are not the current leader
leaderIndex = getLeaderId(qu);
int follower1 = leaderIndex == 1 ? 2 : 1;
int follower2 = 1;
while (follower2 == leaderIndex || follower2 == follower1)
follower2++;
// lets kill the leader and see if a new one is elected
qu.shutdown(getLeaderId(qu));
testNormalOperation(zkArr[follower2], zkArr[follower1]);
testServerHasConfig(zkArr[follower1], joiningServers, null);
testServerHasConfig(zkArr[follower2], joiningServers, null);
closeAllHandles(zkArr);
}
/**
 * Checks that a server spec which gives only a client port (no client
 * host) is parsed with the wildcard address "0.0.0.0" as client host.
 */
@Test
public void testUnspecifiedClientAddress() throws Exception {
    int[] ports = new int[3];
    // An indexed loop is required here: assigning to the variable of an
    // enhanced for loop would leave the array filled with zeros.
    for (int i = 0; i < ports.length; i++) {
        ports[i] = PortAssignment.unique();
    }
    // quorum port : election port ; client port (no client host)
    String server = "server.0=localhost:" + ports[0] + ":" + ports[1] + ";" + ports[2];
    QuorumServer qs = new QuorumServer(0, server);
    Assert.assertEquals("0.0.0.0", qs.clientAddr.getHostString());
    Assert.assertEquals(ports[2], qs.clientAddr.getPort());
}
/**
 * Switches a seven-server ensemble to a hierarchical quorum system over
 * servers 1-5 and then to a plain majority quorum system over servers 1-3,
 * checking after each step which quorum verifier the peers use and that the
 * expected subsets of servers can operate on their own.
 */
@Test
public void testQuorumSystemChange() throws Exception {
qu = new QuorumUtil(3); // create 7 servers
qu.disableJMXTest = true;
qu.startAll();
ZooKeeper[] zkArr = createHandles(qu);
ArrayList<String> members = new ArrayList<String>();
// two groups; servers 1 and 2 get weight 0, servers 3, 4 and 5 weight 1
members.add("group.1=3:4:5");
members.add("group.2=1:2");
members.add("weight.1=0");
members.add("weight.2=0");
members.add("weight.3=1");
members.add("weight.4=1");
members.add("weight.5=1");
// servers 6 and 7 are not part of the new membership
for (int i = 1; i <= 5; i++) {
members.add("server." + i + "=127.0.0.1:"
+ qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
+ qu.getPeer(i).peer.getElectionAddress().getPort() + ";"
+ "127.0.0.1:" + qu.getPeer(i).peer.getClientPort());
}
reconfig(zkArr[1], null, null, members, -1);
// this should flush the config to servers 2, 3, 4 and 5
testNormalOperation(zkArr[2], zkArr[3]);
testNormalOperation(zkArr[4], zkArr[5]);
for (int i = 1; i <= 5; i++) {
if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumHierarchical))
Assert.fail("peer " + i
+ " doesn't think the quorum system is Hieararchical!");
}
// leave only servers 4 and 5 running
qu.shutdown(1);
qu.shutdown(2);
qu.shutdown(3);
qu.shutdown(7);
qu.shutdown(6);
// servers 4 and 5 should be able to work independently
testNormalOperation(zkArr[4], zkArr[5]);
qu.restart(1);
qu.restart(2);
// new membership: servers 1-3 without groups or weights; note that
// server 3 is listed although it has not been restarted
members.clear();
for (int i = 1; i <= 3; i++) {
members.add("server." + i + "=127.0.0.1:"
+ qu.getPeer(i).peer.getQuorumAddress().getPort() + ":"
+ qu.getPeer(i).peer.getElectionAddress().getPort() + ";"
+ "127.0.0.1:" + qu.getPeer(i).peer.getClientPort());
}
reconfig(zkArr[1], null, null, members, -1);
// flush the config to server 2
testNormalOperation(zkArr[1], zkArr[2]);
qu.shutdown(4);
qu.shutdown(5);
// servers 1 and 2 should be able to work independently
testNormalOperation(zkArr[1], zkArr[2]);
for (int i = 1; i <= 2; i++) {
if (!(qu.getPeer(i).peer.quorumVerifier instanceof QuorumMaj))
Assert.fail("peer "
+ i
+ " doesn't think the quorum system is a majority quorum system!");
}
closeAllHandles(zkArr);
}
/**
 * Checks that every server of a freshly started three-server ensemble
 * reports an initial configuration whose version is 0x100000000.
 */
@Test
public void testInitialConfigHasPositiveVersion() throws Exception {
    qu = new QuorumUtil(1); // create 3 servers
    qu.disableJMXTest = true;
    qu.startAll();
    ZooKeeper[] zkArr = createHandles(qu);
    try {
        testNormalOperation(zkArr[1], zkArr[2]);
        for (int i = 1; i < 4; i++) {
            String configStr = testServerHasConfig(zkArr[i], null, null);
            QuorumVerifier qv = qu.getPeer(i).peer.configFromString(configStr);
            // assertEquals reports both values when the versions differ
            Assert.assertEquals(0x100000000L, qv.getVersion());
        }
    } finally {
        // release the client handles even when an assertion fails
        closeAllHandles(zkArr);
    }
}
/**
 * Verifies the JMX attributes of the local and remote peer beans while
 * server 1 is removed from a three-server ensemble and then added back:
 * the remote peer beans for server 1 registered under servers 2 and 3 must
 * disappear on removal and reappear with matching attributes afterwards.
 */
@Test
public void testJMXBeanAfterRemoveAddOne() throws Exception {
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
ZooKeeper[] zkArr = createHandles(qu);
List<String> leavingServers = new ArrayList<String>();
List<String> joiningServers = new ArrayList<String>();
// assert remotePeerBean.1 of ReplicatedServer_2
int leavingIndex = 1;
int replica2 = 2;
QuorumPeer peer2 = qu.getPeer(replica2).peer;
QuorumServer leavingQS2 = peer2.getView().get(new Long(leavingIndex));
String remotePeerBean2 = CommonNames.DOMAIN
+ ":name0=ReplicatedServer_id" + replica2 + ",name1=replica."
+ leavingIndex;
assertRemotePeerMXBeanAttributes(leavingQS2, remotePeerBean2);
// assert remotePeerBean.1 of ReplicatedServer_3
int replica3 = 3;
QuorumPeer peer3 = qu.getPeer(replica3).peer;
QuorumServer leavingQS3 = peer3.getView().get(new Long(leavingIndex));
String remotePeerBean3 = CommonNames.DOMAIN
+ ":name0=ReplicatedServer_id" + replica3 + ",name1=replica."
+ leavingIndex;
assertRemotePeerMXBeanAttributes(leavingQS3, remotePeerBean3);
// both reconfigs below are issued through the client connected to the
// server that is being removed and re-added
ZooKeeper zk = zkArr[leavingIndex];
leavingServers.add(Integer.toString(leavingIndex));
// remember this server so we can add it back later
joiningServers.add("server." + leavingIndex + "=127.0.0.1:"
+ qu.getPeer(leavingIndex).peer.getQuorumAddress().getPort()
+ ":"
+ qu.getPeer(leavingIndex).peer.getElectionAddress().getPort()
+ ":participant;127.0.0.1:"
+ qu.getPeer(leavingIndex).peer.getClientPort());
// Remove ReplicatedServer_1 from the ensemble
reconfig(zk, null, leavingServers, null, -1);
// localPeerBean.1 of ReplicatedServer_1 must report PartOfEnsemble=false
QuorumPeer removedPeer = qu.getPeer(leavingIndex).peer;
String localPeerBean = CommonNames.DOMAIN
+ ":name0=ReplicatedServer_id" + leavingIndex
+ ",name1=replica." + leavingIndex;
assertLocalPeerMXBeanAttributes(removedPeer, localPeerBean, false);
// remotePeerBean.1 shouldn't exist in ReplicatedServer_2
JMXEnv.ensureNone(remotePeerBean2);
// remotePeerBean.1 shouldn't exist in ReplicatedServer_3
JMXEnv.ensureNone(remotePeerBean3);
// Add ReplicatedServer_1 back to the ensemble
reconfig(zk, joiningServers, null, null, -1);
// localPeerBean.1 of ReplicatedServer_1 must report PartOfEnsemble=true
assertLocalPeerMXBeanAttributes(removedPeer, localPeerBean, true);
// assert remotePeerBean.1 of ReplicatedServer_2
leavingQS2 = peer2.getView().get(new Long(leavingIndex));
assertRemotePeerMXBeanAttributes(leavingQS2, remotePeerBean2);
// assert remotePeerBean.1 of ReplicatedServer_3
leavingQS3 = peer3.getView().get(new Long(leavingIndex));
assertRemotePeerMXBeanAttributes(leavingQS3, remotePeerBean3);
closeAllHandles(zkArr);
}
/**
 * Verifies the JMX attributes of the local and remote peer beans after
 * server 1 of a three-server ensemble is changed from participant to
 * observer.
 */
@Test
public void testJMXBeanAfterRoleChange() throws Exception {
qu = new QuorumUtil(1); // create 3 servers
qu.disableJMXTest = true;
qu.startAll();
ZooKeeper[] zkArr = createHandles(qu);
// changing a server's role / port is done by "adding" it with the same
// id but different role / port
List<String> joiningServers = new ArrayList<String>();
// assert remotePeerBean.1 of ReplicatedServer_2
int changingIndex = 1;
int replica2 = 2;
QuorumPeer peer2 = qu.getPeer(replica2).peer;
QuorumServer changingQS2 = peer2.getView().get(new Long(changingIndex));
String remotePeerBean2 = CommonNames.DOMAIN
+ ":name0=ReplicatedServer_id" + replica2 + ",name1=replica."
+ changingIndex;
assertRemotePeerMXBeanAttributes(changingQS2, remotePeerBean2);
// assert remotePeerBean.1 of ReplicatedServer_3
int replica3 = 3;
QuorumPeer peer3 = qu.getPeer(replica3).peer;
QuorumServer changingQS3 = peer3.getView().get(new Long(changingIndex));
String remotePeerBean3 = CommonNames.DOMAIN
+ ":name0=ReplicatedServer_id" + replica3 + ",name1=replica."
+ changingIndex;
assertRemotePeerMXBeanAttributes(changingQS3, remotePeerBean3);
String newRole = "observer";
// the reconfig is issued through the client connected to the changing
// server itself
ZooKeeper zk = zkArr[changingIndex];
// exactly as it is now, except for role change
joiningServers.add("server." + changingIndex + "=127.0.0.1:"
+ qu.getPeer(changingIndex).peer.getQuorumAddress().getPort()
+ ":"
+ qu.getPeer(changingIndex).peer.getElectionAddress().getPort()
+ ":" + newRole + ";127.0.0.1:"
+ qu.getPeer(changingIndex).peer.getClientPort());
reconfig(zk, joiningServers, null, null, -1);
// note: writer and reader are the same handle here
testNormalOperation(zkArr[changingIndex], zk);
// an observer has only its observer field set
Assert.assertTrue(qu.getPeer(changingIndex).peer.observer != null
&& qu.getPeer(changingIndex).peer.follower == null
&& qu.getPeer(changingIndex).peer.leader == null);
Assert.assertTrue(qu.getPeer(changingIndex).peer.getPeerState() == ServerState.OBSERVING);
QuorumPeer qp = qu.getPeer(changingIndex).peer;
String localPeerBeanName = CommonNames.DOMAIN
+ ":name0=ReplicatedServer_id" + changingIndex
+ ",name1=replica." + changingIndex;
// localPeerBean.1 of ReplicatedServer_1
assertLocalPeerMXBeanAttributes(qp, localPeerBeanName, true);
// assert remotePeerBean.1 of ReplicatedServer_2
changingQS2 = peer2.getView().get(new Long(changingIndex));
assertRemotePeerMXBeanAttributes(changingQS2, remotePeerBean2);
// assert remotePeerBean.1 of ReplicatedServer_3
changingQS3 = peer3.getView().get(new Long(changingIndex));
assertRemotePeerMXBeanAttributes(changingQS3, remotePeerBean3);
closeAllHandles(zkArr);
}
/**
 * Asserts that the attributes of the local peer JMX bean match the values
 * reported by the peer itself.
 *
 * @param qp               the peer whose values are the expected ones
 * @param beanName         name of the JMX bean to inspect
 * @param isPartOfEnsemble expected value of the PartOfEnsemble attribute
 */
private void assertLocalPeerMXBeanAttributes(QuorumPeer qp,
        String beanName, Boolean isPartOfEnsemble) throws Exception {
    Assert.assertEquals("Mismatches LearnerType!", qp.getLearnerType()
            .name(), JMXEnv.ensureBeanAttribute(beanName, "LearnerType"));
    Assert.assertEquals("Mismatches ClientAddress!",
            qp.getClientAddress().getHostString() + ":" + qp.getClientAddress().getPort(),
            JMXEnv.ensureBeanAttribute(beanName, "ClientAddress"));
    // the failure message names the attribute that is actually compared
    Assert.assertEquals("Mismatches ElectionAddress!",
            qp.getElectionAddress().getHostString() + ":" + qp.getElectionAddress().getPort(),
            JMXEnv.ensureBeanAttribute(beanName, "ElectionAddress"));
    Assert.assertEquals("Mismatches PartOfEnsemble!", isPartOfEnsemble,
            JMXEnv.ensureBeanAttribute(beanName, "PartOfEnsemble"));
    Assert.assertEquals("Mismatches ConfigVersion!", qp.getQuorumVerifier()
            .getVersion(), JMXEnv.ensureBeanAttribute(beanName,
            "ConfigVersion"));
    Assert.assertEquals("Mismatches QuorumSystemInfo!", qp
            .getQuorumVerifier().toString(), JMXEnv.ensureBeanAttribute(
            beanName, "QuorumSystemInfo"));
}
/**
 * Reads a string attribute from a JMX bean. A value that contains a ':' is
 * treated as an address/port pair and returned with its host part resolved
 * to a numerical address; any other value is returned unchanged.
 */
String getAddrPortFromBean(String beanName, String attribute) throws Exception {
    Object raw = JMXEnv.ensureBeanAttribute(beanName, attribute);
    String value = (String) raw;
    return value.contains(":") ? getNumericalAddrPort(value) : value;
}
/**
 * Rewrites a "host:port" string so that the host part is a numerical IP
 * address.
 *
 * The string is split at its last ':' so that hosts which themselves
 * contain colons (IPv6 literals) are kept intact.
 *
 * @param name address in the form "host:port"
 * @return the resolved numerical address followed by ":" and the port
 * @throws UnknownHostException if the host part cannot be resolved
 * @throws IllegalArgumentException if {@code name} contains no ':'
 */
String getNumericalAddrPort(String name) throws UnknownHostException {
    int separator = name.lastIndexOf(':');
    if (separator < 0) {
        throw new IllegalArgumentException("expected host:port but got '" + name + "'");
    }
    String host = name.substring(0, separator);
    String port = name.substring(separator + 1);
    return InetAddress.getByName(host).getHostAddress() + ":" + port;
}
/**
 * Asserts that the attributes of a remote peer JMX bean match the given
 * server description. Addresses are compared in numerical form on both
 * sides.
 *
 * @param qs       server description holding the expected values
 * @param beanName name of the JMX bean to inspect
 */
private void assertRemotePeerMXBeanAttributes(QuorumServer qs,
        String beanName) throws Exception {
    String expectedType = qs.type.name();
    Assert.assertEquals("Mismatches LearnerType!", expectedType,
            JMXEnv.ensureBeanAttribute(beanName, "LearnerType"));

    String expectedClient = getNumericalAddrPort(
            qs.clientAddr.getHostString() + ":" + qs.clientAddr.getPort());
    String actualClient = getAddrPortFromBean(beanName, "ClientAddress");
    Assert.assertEquals("Mismatches ClientAddress!", expectedClient, actualClient);

    String expectedElection = getNumericalAddrPort(
            qs.electionAddr.getHostString() + ":" + qs.electionAddr.getPort());
    String actualElection = getAddrPortFromBean(beanName, "ElectionAddress");
    Assert.assertEquals("Mismatches ElectionAddress!", expectedElection, actualElection);

    String expectedQuorum = getNumericalAddrPort(
            qs.addr.getHostString() + ":" + qs.addr.getPort());
    String actualQuorum = getAddrPortFromBean(beanName, "QuorumAddress");
    Assert.assertEquals("Mismatches QuorumAddress!", expectedQuorum, actualQuorum);
}
}