blob: 6e8fcbbbabcfa4833b28c07cb30d4d986e654bd3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.leader;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.BaseClassForTests;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test cases designed after CURATOR-45.
*/
@Tag("master")
public class TestLeaderSelectorEdges extends BaseClassForTests {
private final Logger log = LoggerFactory.getLogger(getClass());
@BeforeAll
public static void setCNXFactory() {
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, ChaosMonkeyCnxnFactory.class.getName());
}
@AfterAll
public static void resetCNXFactory() {
System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
}
/**
* Create a LeaderSelector but close the connection right after the "lock" znode
* has been created.
*/
@Test
public void flappingTest() throws Exception {
final CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryNTimes(1, 500))
.sessionTimeoutMs(30000)
.build();
final TestLeaderSelectorListener listener = new TestLeaderSelectorListener();
LeaderSelector leaderSelector1 = new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener);
LeaderSelector leaderSelector2 = null;
client.start();
try {
client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
leaderSelector1.start();
// At this point the ChaosMonkeyZookeeperServer must close the connection
// right after the lock znode is created.
assertTrue(listener.reconnected.await(10, TimeUnit.SECONDS), "Connection has not been lost");
// Check that leadership has failed
assertEquals(listener.takeLeadership.getCount(), 1);
// Wait FailedDelete
Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
// Check that there is no znode
final int children = client.getChildren()
.forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE)
.size();
assertEquals(
children, 0, "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
// Check that a new LeaderSelector can be started
leaderSelector2 = new LeaderSelector(client, ChaosMonkeyCnxnFactory.CHAOS_ZNODE, listener);
leaderSelector2.start();
assertTrue(listener.takeLeadership.await(1, TimeUnit.SECONDS));
} finally {
try {
leaderSelector1.close();
} catch (IllegalStateException e) {
fail(e.getMessage());
}
try {
if (leaderSelector2 != null) {
leaderSelector2.close();
}
} catch (IllegalStateException e) {
fail(e.getMessage());
}
client.close();
}
}
private class TestLeaderSelectorListener implements LeaderSelectorListener {
final CountDownLatch takeLeadership = new CountDownLatch(1);
final CountDownLatch reconnected = new CountDownLatch(1);
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
log.info("-->takeLeadership({})", client.toString());
takeLeadership.countDown();
log.info("<--takeLeadership({})", client.toString());
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.RECONNECTED) {
reconnected.countDown();
}
}
}
/**
* Create a protected node in background with a retry policy
*/
@Test
public void createProtectedNodeInBackgroundTest() throws Exception {
final CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryNTimes(2, 100))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(60000)
.build();
final CountDownLatch latch = new CountDownLatch(1);
client.start();
try {
client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
client.create()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
log.info("Receive event {}", event.toString());
if (event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue()) {
latch.countDown();
}
}
})
.forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-");
assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called");
// Wait for the znode to be deleted
Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
// Check that there is no znode
final int children = client.getChildren()
.forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE)
.size();
assertEquals(
children, 0, "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
} finally {
client.close();
}
}
/**
* Same test as above but without a retry policy
*/
@Test
public void createProtectedNodeInBackgroundTestNoRetry() throws Exception {
final CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryNTimes(0, 0))
.connectionTimeoutMs(1000)
.sessionTimeoutMs(60000)
.build();
final CountDownLatch latch = new CountDownLatch(1);
client.start();
try {
client.create().forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE);
client.create()
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
log.info("Receive event {}", event.toString());
if (event.getResultCode() == KeeperException.Code.CONNECTIONLOSS.intValue()) {
latch.countDown();
}
}
})
.forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE_PREFIX + "foo-");
assertTrue(latch.await(30, TimeUnit.SECONDS), "Callback has not been called");
// Wait for the znode to be deleted
Thread.sleep(ChaosMonkeyCnxnFactory.LOCKOUT_DURATION_MS * 2);
// Check that there is no znode
final int children = client.getChildren()
.forPath(ChaosMonkeyCnxnFactory.CHAOS_ZNODE)
.size();
assertEquals(
children, 0, "Still " + children + " znodes under " + ChaosMonkeyCnxnFactory.CHAOS_ZNODE + " lock");
} finally {
client.close();
}
}
}