blob: 96fa384303a2d64726378ae694bee22168a4a296 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.imps;
import com.google.common.collect.Lists;
import org.apache.curator.ensemble.EnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.TestingZooKeeperServer;
import org.apache.curator.test.compatibility.CuratorTestBase;
import org.apache.curator.test.compatibility.Timing2;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class TestReconfiguration extends CuratorTestBase
{
private final Timing2 timing = new Timing2();
private TestingCluster cluster;
private EnsembleProvider ensembleProvider;
private static final String superUserPasswordDigest = "curator-test:zghsj3JfJqK7DbWf0RQ1BgbJH9w="; // ran from DigestAuthenticationProvider.generateDigest(superUserPassword);
private static final String superUserPassword = "curator-test";
@BeforeMethod
@Override
public void setup() throws Exception
{
super.setup();
QuorumPeerConfig.setReconfigEnabled(true);
System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", superUserPasswordDigest);
CloseableUtils.closeQuietly(server);
cluster = createAndStartCluster(3);
}
@AfterMethod
@Override
public void teardown() throws Exception
{
CloseableUtils.closeQuietly(cluster);
ensembleProvider = null;
System.clearProperty("zookeeper.DigestAuthenticationProvider.superDigest");
super.teardown();
}
@SuppressWarnings("ConstantConditions")
@Test(enabled = false)
public void testApiPermutations() throws Exception
{
// not an actual test. Specifies all possible API possibilities
Watcher watcher = null;
Stat stat = null;
CuratorFramework client = null;
client.getConfig().forEnsemble();
client.getConfig().inBackground().forEnsemble();
client.getConfig().usingWatcher(watcher).forEnsemble();
client.getConfig().usingWatcher(watcher).inBackground().forEnsemble();
client.getConfig().storingStatIn(stat).forEnsemble();
client.getConfig().storingStatIn(stat).inBackground().forEnsemble();
client.getConfig().storingStatIn(stat).usingWatcher(watcher).forEnsemble();
client.getConfig().storingStatIn(stat).usingWatcher(watcher).inBackground().forEnsemble();
// ---------
client.reconfig().leaving().forEnsemble();
client.reconfig().joining().forEnsemble();
client.reconfig().leaving().joining().forEnsemble();
client.reconfig().joining().leaving().forEnsemble();
client.reconfig().withNewMembers().forEnsemble();
client.reconfig().leaving().fromConfig(0).forEnsemble();
client.reconfig().joining().fromConfig(0).forEnsemble();
client.reconfig().leaving().joining().fromConfig(0).forEnsemble();
client.reconfig().joining().leaving().fromConfig(0).forEnsemble();
client.reconfig().withNewMembers().fromConfig(0).forEnsemble();
client.reconfig().leaving().storingStatIn(stat).forEnsemble();
client.reconfig().joining().storingStatIn(stat).forEnsemble();
client.reconfig().leaving().joining().storingStatIn(stat).forEnsemble();
client.reconfig().joining().leaving().storingStatIn(stat).forEnsemble();
client.reconfig().withNewMembers().storingStatIn(stat).forEnsemble();
client.reconfig().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().leaving().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().joining().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().withNewMembers().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().inBackground().leaving().forEnsemble();
client.reconfig().inBackground().joining().forEnsemble();
client.reconfig().inBackground().leaving().joining().forEnsemble();
client.reconfig().inBackground().joining().leaving().forEnsemble();
client.reconfig().inBackground().withNewMembers().forEnsemble();
client.reconfig().inBackground().leaving().fromConfig(0).forEnsemble();
client.reconfig().inBackground().joining().fromConfig(0).forEnsemble();
client.reconfig().inBackground().leaving().joining().fromConfig(0).forEnsemble();
client.reconfig().inBackground().joining().leaving().fromConfig(0).forEnsemble();
client.reconfig().inBackground().withNewMembers().fromConfig(0).forEnsemble();
client.reconfig().inBackground().leaving().storingStatIn(stat).forEnsemble();
client.reconfig().inBackground().joining().storingStatIn(stat).forEnsemble();
client.reconfig().inBackground().leaving().joining().storingStatIn(stat).forEnsemble();
client.reconfig().inBackground().joining().leaving().storingStatIn(stat).forEnsemble();
client.reconfig().inBackground().withNewMembers().storingStatIn(stat).forEnsemble();
client.reconfig().inBackground().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().inBackground().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().inBackground().leaving().joining().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().inBackground().joining().leaving().storingStatIn(stat).fromConfig(0).forEnsemble();
client.reconfig().inBackground().withNewMembers().storingStatIn(stat).fromConfig(0).forEnsemble();
}
@Test
public void testBasicGetConfig() throws Exception
{
try ( CuratorFramework client = newClient())
{
client.start();
byte[] configData = client.getConfig().forEnsemble();
QuorumVerifier quorumVerifier = toQuorumVerifier(configData);
System.out.println(quorumVerifier);
assertConfig(quorumVerifier, cluster.getInstances());
Assert.assertEquals(EnsembleTracker.configToConnectionString(quorumVerifier), ensembleProvider.getConnectionString());
}
}
@Test
public void testAddWithoutEnsembleTracker() throws Exception
{
final String initialClusterCS = cluster.getConnectString();
try ( CuratorFramework client = newClient(cluster.getConnectString(), false))
{
Assert.assertEquals(((CuratorFrameworkImpl) client).getEnsembleTracker(), null);
client.start();
QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
assertConfig(oldConfig, cluster.getInstances());
CountDownLatch latch = setChangeWaiter(client);
try ( TestingCluster newCluster = new TestingCluster(TestingCluster.makeSpecs(1, false)) )
{
newCluster.start();
client.reconfig().joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble();
Assert.assertTrue(timing.awaitLatch(latch));
byte[] newConfigData = client.getConfig().forEnsemble();
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
newInstances.addAll(newCluster.getInstances());
assertConfig(newConfig, newInstances);
Assert.assertEquals(ensembleProvider.getConnectionString(), initialClusterCS);
Assert.assertNotEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
Assert.assertEquals(client.getZookeeperClient().getCurrentConnectionString(), initialClusterCS);
final CountDownLatch reconnectLatch = new CountDownLatch(1);
client.getConnectionStateListenable().addListener(
(cfClient, newState) -> {
if (newState == ConnectionState.RECONNECTED) reconnectLatch.countDown();
}
);
client.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
Assert.assertTrue(reconnectLatch.await(2, TimeUnit.SECONDS));
Assert.assertEquals(client.getZookeeperClient().getCurrentConnectionString(), initialClusterCS);
Assert.assertEquals(ensembleProvider.getConnectionString(), initialClusterCS);
newConfigData = client.getConfig().forEnsemble();
Assert.assertNotEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
}
@Test
public void testAdd() throws Exception
{
try ( CuratorFramework client = newClient())
{
client.start();
QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
assertConfig(oldConfig, cluster.getInstances());
CountDownLatch latch = setChangeWaiter(client);
try ( TestingCluster newCluster = new TestingCluster(TestingCluster.makeSpecs(1, false)) )
{
newCluster.start();
client.reconfig().joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble();
Assert.assertTrue(timing.awaitLatch(latch));
byte[] newConfigData = client.getConfig().forEnsemble();
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
newInstances.addAll(newCluster.getInstances());
assertConfig(newConfig, newInstances);
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
}
@Test
public void testAddAsync() throws Exception
{
try ( CuratorFramework client = newClient())
{
client.start();
QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
assertConfig(oldConfig, cluster.getInstances());
CountDownLatch latch = setChangeWaiter(client);
try ( TestingCluster newCluster = new TestingCluster(TestingCluster.makeSpecs(1, false)) )
{
newCluster.start();
final CountDownLatch callbackLatch = new CountDownLatch(1);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getType() == CuratorEventType.RECONFIG )
{
callbackLatch.countDown();
}
}
};
client.reconfig().inBackground(callback).joining(toReconfigSpec(newCluster.getInstances())).fromConfig(oldConfig.getVersion()).forEnsemble();
Assert.assertTrue(timing.awaitLatch(callbackLatch));
Assert.assertTrue(timing.awaitLatch(latch));
byte[] newConfigData = client.getConfig().forEnsemble();
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
List<InstanceSpec> newInstances = Lists.newArrayList(cluster.getInstances());
newInstances.addAll(newCluster.getInstances());
assertConfig(newConfig, newInstances);
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
}
@Test
public void testAddAndRemove() throws Exception
{
try ( CuratorFramework client = newClient())
{
client.start();
QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
assertConfig(oldConfig, cluster.getInstances());
CountDownLatch latch = setChangeWaiter(client);
try ( TestingCluster newCluster = new TestingCluster(TestingCluster.makeSpecs(1, false)) )
{
newCluster.start();
Collection<InstanceSpec> oldInstances = cluster.getInstances();
InstanceSpec us = cluster.findConnectionInstance(client.getZookeeperClient().getZooKeeper());
InstanceSpec removeSpec = oldInstances.iterator().next();
if ( us.equals(removeSpec) ) {
Iterator<InstanceSpec> iterator = oldInstances.iterator();
iterator.next();
removeSpec = iterator.next();
}
Collection<InstanceSpec> instances = newCluster.getInstances();
client.reconfig().leaving(Integer.toString(removeSpec.getServerId())).joining(toReconfigSpec(instances)).fromConfig(oldConfig.getVersion()).forEnsemble();
Assert.assertTrue(timing.awaitLatch(latch));
byte[] newConfigData = client.getConfig().forEnsemble();
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
ArrayList<InstanceSpec> newInstances = Lists.newArrayList(oldInstances);
newInstances.addAll(instances);
newInstances.remove(removeSpec);
assertConfig(newConfig, newInstances);
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
}
@Test
public void testAddAndRemoveWithEmptyList() throws Exception
{
try ( CuratorFramework client = newClient())
{
client.start();
QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
assertConfig(oldConfig, cluster.getInstances());
CountDownLatch latch = setChangeWaiter(client);
Collection<InstanceSpec> oldInstances = cluster.getInstances();
client.reconfig().leaving(Collections.emptyList()).joining(Collections.emptyList()).fromConfig(oldConfig.getVersion()).forEnsemble();
Assert.assertTrue(timing.awaitLatch(latch));
byte[] newConfigData = client.getConfig().forEnsemble();
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
assertConfig(newConfig, oldInstances);
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
@Test
public void testNewMembersWithEmptyList() throws Exception
{
try ( CuratorFramework client = newClient())
{
client.start();
QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
assertConfig(oldConfig, cluster.getInstances());
CountDownLatch latch = setChangeWaiter(client);
Collection<InstanceSpec> oldInstances = cluster.getInstances();
client.reconfig().withNewMembers(Collections.emptyList()).fromConfig(oldConfig.getVersion()).forEnsemble();
Assert.assertTrue(timing.awaitLatch(latch));
byte[] newConfigData = client.getConfig().forEnsemble();
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
assertConfig(newConfig, oldInstances);
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
@Test(enabled = false) // it's what this test is inteded to do and it keeps failing - disable for now
public void testNewMembers() throws Exception
{
cluster.close();
cluster = null;
TestingCluster smallCluster = null;
TestingCluster localCluster = new TestingCluster(5);
try
{
List<TestingZooKeeperServer> servers = localCluster.getServers();
List<InstanceSpec> smallClusterInstances = Lists.newArrayList();
for ( int i = 0; i < 3; ++i ) // only start 3 of the 5
{
TestingZooKeeperServer server = servers.get(i);
server.start();
smallClusterInstances.add(server.getInstanceSpec());
}
smallCluster = new TestingCluster(smallClusterInstances);
try ( CuratorFramework client = newClient(smallCluster.getConnectString()))
{
client.start();
QuorumVerifier oldConfig = toQuorumVerifier(client.getConfig().forEnsemble());
Assert.assertEquals(oldConfig.getAllMembers().size(), 5);
assertConfig(oldConfig, localCluster.getInstances());
CountDownLatch latch = setChangeWaiter(client);
client.reconfig().withNewMembers(toReconfigSpec(smallClusterInstances)).forEnsemble();
Assert.assertTrue(timing.awaitLatch(latch));
byte[] newConfigData = client.getConfig().forEnsemble();
QuorumVerifier newConfig = toQuorumVerifier(newConfigData);
Assert.assertEquals(newConfig.getAllMembers().size(), 3);
assertConfig(newConfig, smallClusterInstances);
Assert.assertEquals(EnsembleTracker.configToConnectionString(newConfig), ensembleProvider.getConnectionString());
}
}
finally
{
CloseableUtils.closeQuietly(smallCluster);
CloseableUtils.closeQuietly(localCluster);
}
}
@Test
public void testConfigToConnectionStringIPv4Normal() throws Exception
{
String config = "server.1=10.1.2.3:2888:3888:participant;10.2.3.4:2181";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("10.2.3.4:2181", configString);
}
@Test
public void testConfigToConnectionStringIPv6Normal() throws Exception
{
String config = "server.1=[1010:0001:0002:0003:0004:0005:0006:0007]:2888:3888:participant;[2001:db8:85a3:0:0:8a2e:370:7334]:2181";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("2001:db8:85a3:0:0:8a2e:370:7334:2181", configString);
}
@Test
public void testConfigToConnectionStringIPv4NoClientAddr() throws Exception
{
String config = "server.1=10.1.2.3:2888:3888:participant;2181";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("10.1.2.3:2181", configString);
}
@Test
public void testConfigToConnectionStringIPv4WildcardClientAddr() throws Exception
{
String config = "server.1=10.1.2.3:2888:3888:participant;0.0.0.0:2181";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("10.1.2.3:2181", configString);
}
@Test
public void testConfigToConnectionStringNoClientAddrOrPort() throws Exception
{
String config = "server.1=10.1.2.3:2888:3888:participant";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("", configString);
}
@Test
public void testIPv6Wildcard1() throws Exception
{
String config = "server.1=[2001:db8:85a3:0:0:8a2e:370:7334]:2888:3888:participant;[::]:2181";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("2001:db8:85a3:0:0:8a2e:370:7334:2181", configString);
}
@Test
public void testIPv6Wildcard2() throws Exception
{
String config = "server.1=[1010:0001:0002:0003:0004:0005:0006:0007]:2888:3888:participant;[::0]:2181";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("1010:1:2:3:4:5:6:7:2181", configString);
}
@Test
public void testMixedIPv1() throws Exception
{
String config = "server.1=10.1.2.3:2888:3888:participant;[::]:2181";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("10.1.2.3:2181", configString);
}
@Test
public void testMixedIPv2() throws Exception
{
String config = "server.1=[2001:db8:85a3:0:0:8a2e:370:7334]:2888:3888:participant;127.0.0.1:2181";
String configString = EnsembleTracker.configToConnectionString(toQuorumVerifier(config.getBytes()));
Assert.assertEquals("127.0.0.1:2181", configString);
}
@Override
protected void createServer() throws Exception
{
// NOP
}
private CuratorFramework newClient()
{
return newClient(cluster.getConnectString(), true);
}
private CuratorFramework newClient(String connectionString) {
return newClient(connectionString, true);
}
private CuratorFramework newClient(String connectionString, boolean withEnsembleProvider)
{
final AtomicReference<String> connectString = new AtomicReference<>(connectionString);
ensembleProvider = new EnsembleProvider()
{
@Override
public void start() throws Exception
{
}
@Override
public boolean updateServerListEnabled()
{
return false;
}
@Override
public String getConnectionString()
{
return connectString.get();
}
@Override
public void close() throws IOException
{
}
@Override
public void setConnectionString(String connectionString)
{
connectString.set(connectionString);
}
};
return CuratorFrameworkFactory.builder()
.ensembleProvider(ensembleProvider)
.ensembleTracker(withEnsembleProvider)
.sessionTimeoutMs(timing.session())
.connectionTimeoutMs(timing.connection())
.authorization("digest", superUserPassword.getBytes())
.retryPolicy(new ExponentialBackoffRetry(timing.forSleepingABit().milliseconds(), 3))
.build();
}
private CountDownLatch setChangeWaiter(CuratorFramework client) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
if ( event.getType() == Event.EventType.NodeDataChanged )
{
latch.countDown();
}
}
};
client.getConfig().usingWatcher(watcher).forEnsemble();
return latch;
}
private void assertConfig(QuorumVerifier config, Collection<InstanceSpec> instances)
{
for ( InstanceSpec instance : instances )
{
QuorumPeer.QuorumServer quorumServer = config.getAllMembers().get((long)instance.getServerId());
Assert.assertNotNull(quorumServer, String.format("Looking for %s - found %s", instance.getServerId(), config.getAllMembers()));
Assert.assertEquals(quorumServer.clientAddr.getPort(), instance.getPort());
}
}
private List<String> toReconfigSpec(Collection<InstanceSpec> instances) throws Exception
{
String localhost = new InetSocketAddress((InetAddress)null, 0).getAddress().getHostAddress();
List<String> specs = Lists.newArrayList();
for ( InstanceSpec instance : instances ) {
specs.add("server." + instance.getServerId() + "=" + localhost + ":" + instance.getElectionPort() + ":" + instance.getQuorumPort() + ";" + instance.getPort());
}
return specs;
}
private static QuorumVerifier toQuorumVerifier(byte[] bytes) throws Exception
{
Assert.assertNotNull(bytes);
Properties properties = new Properties();
properties.load(new ByteArrayInputStream(bytes));
return new QuorumMaj(properties);
}
}