blob: 133e69025f73704c3a7c22d7d576d1cba14030a1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.imps;
import org.apache.curator.ensemble.EnsembleListener;
import org.apache.curator.ensemble.dynamic.DynamicEnsembleProvider;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.ensemble.EnsembleTracker;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
public class TestReconfiguration
{
private TestingCluster cluster;
private DynamicEnsembleProvider dynamicEnsembleProvider;
private WaitOnDelegateListener waitOnDelegateListener;
private EnsembleTracker ensembleTracker;
private CuratorFramework client;
private String connectionString1to5;
private String connectionString2to5;
private String connectionString3to5;
@BeforeMethod
public void setup() throws Exception
{
cluster = new TestingCluster(5);
cluster.start();
connectionString1to5 = cluster.getConnectString();
connectionString2to5 = getConnectionString(cluster, 2, 3, 4, 5);
connectionString3to5 = getConnectionString(cluster, 3, 4, 5);
dynamicEnsembleProvider = new DynamicEnsembleProvider(connectionString1to5);
client = CuratorFrameworkFactory.builder()
.ensembleProvider(dynamicEnsembleProvider)
.retryPolicy(new RetryOneTime(1))
.build();
client.start();
client.blockUntilConnected();
//Wrap around the dynamic ensemble provider, so that we can wait until it has received the event.
waitOnDelegateListener = new WaitOnDelegateListener(dynamicEnsembleProvider);
ensembleTracker = new EnsembleTracker(client);
ensembleTracker.getListenable().addListener(waitOnDelegateListener);
ensembleTracker.start();
//Wait for the initial event.
waitOnDelegateListener.waitForEvent();
}
@AfterMethod
public void tearDown() throws IOException
{
CloseableUtils.closeQuietly(ensembleTracker);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(cluster);
}
@Test
public void testSyncIncremental() throws Exception
{
Stat stat = new Stat();
byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
Assert.assertNotNull(bytes);
QuorumVerifier qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 5);
String server1 = getServerString(qv, cluster, 1L);
String server2 = getServerString(qv, cluster, 2L);
//Remove Servers
bytes = client.reconfig().leaving("1").fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 4);
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
bytes = client.reconfig().leaving("2").fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 3);
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
//Add Servers
bytes = client.reconfig().joining("server.2=" + server2).fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 4);
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
bytes = client.reconfig().joining("server.1=" + server1).fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 5);
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
}
@Test
public void testAsyncIncremental() throws Exception
{
final AtomicReference<byte[]> bytes = new AtomicReference<>();
final BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
bytes.set(event.getData());
//We only need the latch on getConfig.
if ( event.getContext() != null )
{
((CountDownLatch)event.getContext()).countDown();
}
}
};
CountDownLatch latch = new CountDownLatch(1);
client.getConfig().inBackground(callback, latch).forEnsemble();
latch.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(bytes.get());
QuorumVerifier qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 5);
String server1 = getServerString(qv, cluster, 1L);
String server2 = getServerString(qv, cluster, 2L);
//Remove Servers
client.reconfig().inBackground(callback).leaving("1").fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 4);
client.reconfig().inBackground(callback, latch).leaving("2").fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 3);
//Add Servers
client.reconfig().inBackground(callback, latch).joining("server.2=" + server2).fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 4);
client.reconfig().inBackground(callback, latch).joining("server.1=" + server1).fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 5);
}
@Test
public void testSyncNonIncremental() throws Exception
{
Stat stat = new Stat();
byte[] bytes = client.getConfig().storingStatIn(stat).forEnsemble();
Assert.assertNotNull(bytes);
QuorumVerifier qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 5);
String server1 = getServerString(qv, cluster, 1L);
String server2 = getServerString(qv, cluster, 2L);
String server3 = getServerString(qv, cluster, 3L);
String server4 = getServerString(qv, cluster, 4L);
String server5 = getServerString(qv, cluster, 5L);
//Remove Servers
bytes = client.reconfig()
.adding("server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 4);
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
bytes = client.reconfig()
.adding("server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 3);
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
//Add Servers
bytes = client.reconfig()
.adding("server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 4);
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
bytes = client.reconfig()
.adding("server.1=" + server1,
"server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).storingStatIn(stat).forEnsemble();
qv = getQuorumVerifier(bytes);
Assert.assertEquals(qv.getAllMembers().size(), 5);
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
}
@Test
public void testAsyncNonIncremental() throws Exception
{
final AtomicReference<byte[]> bytes = new AtomicReference<>();
final BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
bytes.set(event.getData());
((CountDownLatch)event.getContext()).countDown();
}
};
CountDownLatch latch = new CountDownLatch(1);
client.getConfig().inBackground(callback, latch).forEnsemble();
latch.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(bytes.get());
QuorumVerifier qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 5);
String server1 = getServerString(qv, cluster, 1L);
String server2 = getServerString(qv, cluster, 2L);
String server3 = getServerString(qv, cluster, 3L);
String server4 = getServerString(qv, cluster, 4L);
String server5 = getServerString(qv, cluster, 5L);
//Remove Servers
client.reconfig().inBackground(callback, latch)
.adding("server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 4);
client.reconfig().inBackground(callback, latch)
.adding("server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString3to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 3);
//Add Servers
client.reconfig().inBackground(callback, latch)
.adding("server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString2to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 4);
client.reconfig().inBackground(callback, latch)
.adding("server.1=" + server1,
"server.2=" + server2,
"server.3=" + server3,
"server.4=" + server4,
"server.5=" + server5)
.fromConfig(qv.getVersion()).forEnsemble();
waitOnDelegateListener.waitForEvent();
Assert.assertEquals(dynamicEnsembleProvider.getConnectionString(), connectionString1to5);
qv = getQuorumVerifier(bytes.get());
Assert.assertEquals(qv.getAllMembers().size(), 5);
}
static QuorumVerifier getQuorumVerifier(byte[] bytes) throws Exception
{
Properties properties = new Properties();
properties.load(new StringReader(new String(bytes)));
return new QuorumMaj(properties);
}
static InstanceSpec getInstance(TestingCluster cluster, int id)
{
for ( InstanceSpec spec : cluster.getInstances() )
{
if ( spec.getServerId() == id )
{
return spec;
}
}
throw new IllegalStateException("InstanceSpec with id:" + id + " not found");
}
static String getServerString(QuorumVerifier qv, TestingCluster cluster, long id) throws Exception
{
String str = qv.getAllMembers().get(id).toString();
//check if connection string is already there.
if ( str.contains(";") )
{
return str;
}
else
{
return str + ";" + getInstance(cluster, (int)id).getConnectString();
}
}
static String getConnectionString(TestingCluster cluster, long... ids) throws Exception
{
StringBuilder sb = new StringBuilder();
Map<Long, InstanceSpec> specs = new HashMap<>();
for ( InstanceSpec spec : cluster.getInstances() )
{
specs.put((long)spec.getServerId(), spec);
}
for ( long id : ids )
{
if ( sb.length() != 0 )
{
sb.append(",");
}
sb.append(specs.get(id).getConnectString());
}
return sb.toString();
}
//Simple EnsembleListener that can wait until the delegate handles the event.
private static class WaitOnDelegateListener implements EnsembleListener
{
private CountDownLatch latch = new CountDownLatch(1);
private final EnsembleListener delegate;
private WaitOnDelegateListener(EnsembleListener delegate)
{
this.delegate = delegate;
}
@Override
public void connectionStringUpdated(String connectionString)
{
delegate.connectionStringUpdated(connectionString);
latch.countDown();
}
public void waitForEvent() throws InterruptedException, TimeoutException
{
if ( latch.await(5, TimeUnit.SECONDS) )
{
latch = new CountDownLatch(1);
}
else
{
throw new TimeoutException("Failed to receive event in time.");
}
}
}
}