blob: a71399614e4c8917ccfd646df55dcfc6f1e92428 [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.helix.HelixException;
import org.apache.helix.TestHelper;
import org.apache.helix.ZkTestHelper;
import org.apache.helix.ZkUnitTestBase;
import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
import org.apache.helix.monitoring.mbeans.ZkClientMonitor;
import org.apache.helix.monitoring.mbeans.ZkClientPathMonitor;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
public class TestRawZkClient extends ZkUnitTestBase {
// Monitor type handed to ZkClient.Builder#setMonitorType and used when building MBean names.
private final String TEST_TAG = "test_monitor";
// ZK path under which testZkClientMonitor creates its test node.
private final String TEST_ROOT = "/my_cluster/IDEALSTATES";
// Client shared by the tests; created in beforeClass() and closed in afterClass().
private ZkClient _zkClient;
/*
 * Creates the ZkClient shared by the tests of this class, connected to ZK_ADDR.
 */
@BeforeClass
public void beforeClass() {
  this._zkClient = new ZkClient(ZK_ADDR);
}
/*
 * Removes the znodes created by the tests of this class and closes the shared ZkClient.
 */
@AfterClass
public void afterClass() {
  try {
    _zkClient.deleteRecursively("/tmp");
    // "/my_cluster" is the parent of TEST_ROOT, so this recursive delete also removes TEST_ROOT
    // together with any child a failed test may have left underneath it. A separate,
    // non-recursive delete of TEST_ROOT is therefore not needed.
    _zkClient.deleteRecursively("/my_cluster");
  } finally {
    // Release the connection even when the cleanup above throws.
    _zkClient.close();
  }
}
/*
 * Verifies getStat(): it returns null for a missing node, equal stats for two reads of an
 * unchanged node, and a different stat once the node's data has been written.
 */
@Test()
void testGetStat() {
  String path = "/tmp/getStatTest";
  _zkClient.deleteRecursively(path);

  // The node was just removed, so there is no stat to return.
  Stat stat = _zkClient.getStat(path);
  AssertJUnit.assertNull(stat);

  _zkClient.createPersistent(path, true);
  stat = _zkClient.getStat(path);
  AssertJUnit.assertNotNull(stat);

  // Nothing changed between the two reads, so both stats must be equal by value.
  Stat newStat = _zkClient.getStat(path);
  AssertJUnit.assertEquals(stat, newStat);

  _zkClient.writeData(path, "Test");
  newStat = _zkClient.getStat(path);
  // Compare by value. assertNotSame only checks object identity, so it would pass for any two
  // distinct Stat instances even if the write had not changed the stat at all.
  AssertJUnit.assertFalse("Stat should change after the node data is written",
      stat.equals(newStat));
}
/*
 * Tests subscribing and unsubscribing state changes for helix's IZkStateListener.
 * The listener count reported by the client is checked after every single step.
 */
@Test
public void testSubscribeStateChanges() {
  final int initialCount = _zkClient.numberOfListeners();
  final List<IZkStateListener> subscribed = new ArrayList<>();

  // Register several distinct listener instances; each one must be counted separately, which
  // exercises the listener's hashcode handling.
  for (int round = 1; round <= 3; round++) {
    final IZkStateListener stateListener = new IZkStateListener() {
      @Override
      public void handleStateChanged(KeeperState state) {
        System.out.println("Handle new state: " + state);
      }

      @Override
      public void handleNewSession(final String sessionId) {
        System.out.println("Handle new session: " + sessionId);
      }

      @Override
      public void handleSessionEstablishmentError(Throwable error) {
        System.out.println("Handle session establishment error: " + error);
      }
    };

    _zkClient.subscribeStateChanges(stateListener);
    Assert.assertEquals(_zkClient.numberOfListeners(), initialCount + round);
    subscribed.add(stateListener);
  }

  // Remove them one by one; the count must drop by exactly one each time.
  int expectedCount = initialCount + subscribed.size();
  for (IZkStateListener stateListener : subscribed) {
    _zkClient.unsubscribeStateChanges(stateListener);
    expectedCount--;
    Assert.assertEquals(_zkClient.numberOfListeners(), expectedCount);
  }
}
/*
 * Tests session expiry for the helix's IZkStateListener: after each forced expiry the client
 * must report a session id different from the one that was expired.
 */
@Test
void testSessionExpiry()
    throws Exception {
  long previousSessionId = _zkClient.getSessionId();

  // Expire the session several times in a row; every round must yield a fresh session id.
  int round = 0;
  while (round < 3) {
    ZkTestHelper.expireSession(_zkClient);

    final long currentSessionId = _zkClient.getSessionId();
    final boolean sessionChanged = currentSessionId != previousSessionId;
    Assert.assertTrue(sessionChanged, "New session id should not equal to expired session id.");

    previousSessionId = currentSessionId;
    round++;
  }
}
/*
 * Tests state changes subscription for the deprecated (I0Itec style) IZkStateListener.
 * Kept as a backward compatibility check.
 *
 * TODO: remove this test when getting rid of I0Itec.
 */
@Test
public void testSubscribeStateChangesForI0ItecIZkStateListener() {
  final int initialCount = _zkClient.numberOfListeners();
  final List<org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener> subscribed =
      new ArrayList<>();

  // Register several distinct listener instances; each one must be counted separately, which
  // exercises the listener's hashcode handling.
  for (int round = 1; round <= 3; round++) {
    final org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener stateListener =
        new org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener() {
          @Override
          public void handleStateChanged(KeeperState state) {
            System.out.println("Handle new state: " + state);
          }

          @Override
          public void handleNewSession() {
            System.out.println("Handle new session: ");
          }

          @Override
          public void handleSessionEstablishmentError(Throwable error) {
            System.out.println("Handle session establishment error: " + error);
          }
        };
    final int expectedCount = initialCount + round;

    _zkClient.subscribeStateChanges(stateListener);
    Assert.assertEquals(_zkClient.numberOfListeners(), expectedCount);

    // Subscribing the very same instance a second time must not add another entry.
    _zkClient.subscribeStateChanges(stateListener);
    Assert.assertEquals(_zkClient.numberOfListeners(), expectedCount);

    subscribed.add(stateListener);
  }

  // Remove them one by one; the count must drop by exactly one each time.
  int remaining = initialCount + subscribed.size();
  for (org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener stateListener : subscribed) {
    _zkClient.unsubscribeStateChanges(stateListener);
    remaining--;
    Assert.assertEquals(_zkClient.numberOfListeners(), remaining);
  }
}
/*
 * Tests session expiry for the deprecated (I0Itec style) IZkStateListener.
 * This is a test for backward compatibility.
 *
 * The test opens a second ZooKeeper handle on the client's current session id and password,
 * closes that handle, and then checks that the client ends up with a different session id.
 *
 * TODO: remove this test when getting rid of I0Itec.
 */
@Test
public void testSessionExpiryForI0IItecZkStateListener()
    throws Exception {
  org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener =
      new org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener() {
        @Override
        public void handleStateChanged(KeeperState state) {
          System.out.println("In Old connection New state " + state);
        }

        @Override
        public void handleNewSession() {
          System.out.println("In Old connection New session");
        }

        @Override
        public void handleSessionEstablishmentError(Throwable var1) {
        }
      };

  _zkClient.subscribeStateChanges(listener);
  try {
    ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
    ZooKeeper zookeeper = connection.getZookeeper();
    long oldSessionId = zookeeper.getSessionId();
    System.out.println("old sessionId= " + oldSessionId);

    Watcher watcher = event -> System.out.println("In New connection In process event:" + event);
    ZooKeeper newZookeeper =
        new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(), watcher,
            zookeeper.getSessionId(), zookeeper.getSessionPasswd());
    try {
      Thread.sleep(3000);
      System.out.println("New sessionId= " + newZookeeper.getSessionId());
      Thread.sleep(3000);
    } finally {
      // Close the duplicate handle even if the waits above are interrupted.
      newZookeeper.close();
    }

    // Give the client time to notice the closed session and establish a new one.
    Thread.sleep(10000);
    connection = ((ZkConnection) _zkClient.getConnection());
    zookeeper = connection.getZookeeper();
    long newSessionId = zookeeper.getSessionId();
    System.out.println("After session expiry sessionId= " + newSessionId);

    // Without this check the test could never fail on a missing session renewal.
    Assert.assertTrue(newSessionId != oldSessionId,
        "Session id should change after the duplicated session is closed.");
  } finally {
    // Do not leave the listener registered on the shared client when the test fails.
    _zkClient.unsubscribeStateChanges(listener);
  }
}
/*
 * Verifies the JMX attributes exposed for a monitored ZkClient: the client level counters
 * (data change events, callbacks, outstanding requests) and the per-path read/write counters
 * registered under the "Root" and "IdealStates" path monitors.
 *
 * The monitored client (zkClient) performs the operations being measured; the shared _zkClient
 * is used for setup, for triggering a data change, and for cleanup.
 */
@Test
public void testZkClientMonitor()
    throws Exception {
  final String TEST_KEY = "testZkClientMonitor";
  ZkClient.Builder builder = new ZkClient.Builder();
  builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
      .setMonitorRootPathOnly(false);
  ZkClient zkClient = builder.build();

  String TEST_DATA = "testData";
  String TEST_NODE = "/test_zkclient_monitor";
  String TEST_PATH = TEST_ROOT + TEST_NODE;

  try {
    final long TEST_DATA_SIZE = zkClient.serialize(TEST_DATA, TEST_PATH).length;

    if (_zkClient.exists(TEST_PATH)) {
      _zkClient.delete(TEST_PATH);
    }
    if (!_zkClient.exists(TEST_ROOT)) {
      _zkClient.createPersistent(TEST_ROOT, true);
    }

    MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();

    ObjectName name = MBeanRegistrar
        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);
    ObjectName rootname = MBeanRegistrar
        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
            "Root");
    ObjectName idealStatename = MBeanRegistrar
        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY, ZkClientPathMonitor.MONITOR_PATH,
            "IdealStates");
    Assert.assertTrue(beanServer.isRegistered(rootname));
    Assert.assertTrue(beanServer.isRegistered(idealStatename));

    Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(name, "StateChangeEventCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackCounter"), 0);

    // Test exists
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max"), 0);
    zkClient.exists(TEST_ROOT);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter") >= 0);
    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadLatencyGauge.Max") >= 0);

    // Test create
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter"),
        0);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max"), 0);
    zkClient.create(TEST_PATH, TEST_DATA, CreateMode.PERSISTENT);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 1);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
        TEST_DATA_SIZE);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 1);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
        TEST_DATA_SIZE);
    long origWriteTotalLatencyCounter =
        (long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter");
    Assert.assertTrue(origWriteTotalLatencyCounter >= 0);
    Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteLatencyGauge.Max") >= 0);
    long origIdealStatesWriteTotalLatencyCounter =
        (long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter");
    Assert.assertTrue(origIdealStatesWriteTotalLatencyCounter >= 0);
    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteLatencyGauge.Max") >= 0);

    // Test read
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 1);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 0);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"), 0);
    long origReadTotalLatencyCounter =
        (long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter");
    long origIdealStatesReadTotalLatencyCounter =
        (long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter");
    Assert.assertEquals(origIdealStatesReadTotalLatencyCounter, 0);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max"), 0);
    zkClient.readData(TEST_PATH, new Stat());
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 2);
    Assert
        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 1);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
        TEST_DATA_SIZE);
    Assert.assertTrue((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter")
        >= origReadTotalLatencyCounter);
    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadTotalLatencyCounter")
        >= origIdealStatesReadTotalLatencyCounter);
    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "ReadLatencyGauge.Max") >= 0);
    zkClient.getChildren(TEST_PATH);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 3);
    Assert
        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 2);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
        TEST_DATA_SIZE);
    zkClient.getStat(TEST_PATH);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 4);
    Assert
        .assertEquals((long) beanServer.getAttribute(rootname, "ReadBytesCounter"), TEST_DATA_SIZE);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadCounter"), 3);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "ReadBytesCounter"),
        TEST_DATA_SIZE);
    zkClient.readDataAndStat(TEST_PATH, new Stat(), true);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 5);

    ZkAsyncCallbacks.ExistsCallbackHandler callbackHandler =
        new ZkAsyncCallbacks.ExistsCallbackHandler();
    zkClient.asyncExists(TEST_PATH, callbackHandler);
    callbackHandler.waitForSuccess();
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 6);

    // Test write
    zkClient.writeData(TEST_PATH, TEST_DATA);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteCounter"), 2);
    Assert.assertEquals((long) beanServer.getAttribute(rootname, "WriteBytesCounter"),
        TEST_DATA_SIZE * 2);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteCounter"), 2);
    Assert.assertEquals((long) beanServer.getAttribute(idealStatename, "WriteBytesCounter"),
        TEST_DATA_SIZE * 2);
    Assert.assertTrue((long) beanServer.getAttribute(rootname, "WriteTotalLatencyCounter")
        >= origWriteTotalLatencyCounter);
    Assert.assertTrue((long) beanServer.getAttribute(idealStatename, "WriteTotalLatencyCounter")
        >= origIdealStatesWriteTotalLatencyCounter);

    // Test data change count
    final Lock lock = new ReentrantLock();
    final Condition callbackFinish = lock.newCondition();
    zkClient.subscribeDataChanges(TEST_PATH, new IZkDataListener() {
      @Override
      public void handleDataChange(String dataPath, Object data) {
        callbackLock();
      }

      @Override
      public void handleDataDeleted(String dataPath) {
        callbackLock();
      }

      private void callbackLock() {
        lock.lock();
        try {
          callbackFinish.signal();
        } finally {
          lock.unlock();
        }
      }
    });

    // The lock is taken before each trigger so a callback cannot signal before this thread is
    // waiting in await(). It is held across both rounds and released in the finally block, so
    // the listener callback fired by the cleanup delete below is not left blocked on it.
    lock.lock();
    try {
      _zkClient.writeData(TEST_PATH, "Test");
      Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
      Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
      Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);
      Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackCounter"), 1);
      Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackHandledCounter"), 1);
      Assert.assertEquals((long) beanServer.getAttribute(name, "PendingCallbackGauge"), 0);

      // Simulate a delayed callback
      int waitTime = 10;
      Thread.sleep(waitTime);
      zkClient.process(new WatchedEvent(Watcher.Event.EventType.NodeDataChanged, null, TEST_PATH));
      Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
      Assert.assertTrue(
          (long) beanServer.getAttribute(rootname, "DataPropagationLatencyGauge.Max") >= waitTime);
    } finally {
      lock.unlock();
    }
  } finally {
    // Remove the test node and close the monitored client on failure paths as well.
    try {
      _zkClient.delete(TEST_PATH);
    } finally {
      zkClient.close();
    }
  }
}
/*
 * Verifies the "OutstandingRequestGauge" attribute: it is 0 while idle, becomes 1 while a read
 * is pending against a stopped zk server, and returns to 0 once the server is back.
 * A dedicated zk server on a random port is used so the shared server is not disturbed.
 */
@Test(dependsOnMethods = "testZkClientMonitor")
void testPendingRequestGauge()
    throws Exception {
  final String TEST_KEY = "testPendingRequestGauge";

  final MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
  final ObjectName name = MBeanRegistrar
      .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
          TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);

  final int zkPort = TestHelper.getRandomPort();
  final String zkAddr = String.format("localhost:%d", zkPort);
  final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
  final ExecutorService executorService = Executors.newSingleThreadExecutor();

  try {
    ZkClient.Builder builder = new ZkClient.Builder();
    builder.setZkServer(zkAddr).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
        .setMonitorRootPathOnly(true);
    final ZkClient zkClient = builder.build();

    try {
      zkServer.shutdown();
      zkClient.waitForKeeperState(KeeperState.Disconnected, 5000, TimeUnit.MILLISECONDS);
      Assert.assertFalse(zkClient.waitUntilConnected(0, TimeUnit.MILLISECONDS));

      Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);

      // Request a read in a separate thread. This will be a pending request
      executorService.submit(() -> {
        zkClient.exists(TEST_ROOT);
      });
      Assert.assertTrue(TestHelper
          .verify(() -> (long) beanServer.getAttribute(name, "OutstandingRequestGauge") == 1,
              1000));

      zkServer.start();
      Assert.assertTrue(zkClient.waitUntilConnected(5000, TimeUnit.MILLISECONDS));
      Assert.assertTrue(TestHelper
          .verify(() -> (long) beanServer.getAttribute(name, "OutstandingRequestGauge") == 0,
              2000));
    } finally {
      // Close the client on failure paths too, not only after all assertions passed.
      zkClient.close();
    }
  } finally {
    // Stop the worker thread so it does not outlive the test, then stop the private server.
    executorService.shutdownNow();
    zkServer.shutdown();
  }
}
/*
 * Checks that an ephemeral node can be created when the expected session id passed to
 * createEphemeral() is the client's current session, and that the created node is owned by
 * that session.
 */
@Test
public void testCreateEphemeralWithValidSession()
    throws Exception {
  final String testClass = TestHelper.getTestClassName();
  final String testMethod = TestHelper.getTestMethodName();
  final String cluster = testClass + "_" + testMethod;

  TestHelper.setupCluster(cluster, ZK_ADDR, 12918, // participant port
      "localhost", // participant name prefix
      "TestDB", // resource name prefix
      1, // resources
      10, // partitions per resource
      5, // number of nodes
      3, // replicas
      "MasterSlave", true); // do rebalance

  final String sessionHex = ZKUtil.toHexSessionId(_zkClient.getSessionId());
  final String ephemeralPath = "/" + testMethod;
  final String payload = "Hello Helix";

  // The node must not exist before the test creates it.
  Assert.assertFalse(_zkClient.exists(ephemeralPath));

  // Make sure the client is connected before attempting the creation.
  final boolean connected = TestHelper
      .verify(() -> _zkClient.getConnection().getZookeeperState().isConnected(), 1000L);
  Assert.assertTrue(connected);

  try {
    // Create the ephemeral node, expecting it to be created in the recorded session.
    _zkClient.createEphemeral(ephemeralPath, payload, sessionHex);
  } catch (Exception ex) {
    Assert.fail("Failed to create ephemeral node.", ex);
  }

  // Read the node back and check content and ownership.
  final Stat nodeStat = new Stat();
  final String storedData = _zkClient.readData(ephemeralPath, nodeStat, true);

  Assert.assertNotNull(storedData, "Failed to create ephemeral node: " + ephemeralPath);
  Assert.assertEquals(storedData, payload, "Data is not correct.");

  final boolean hasOwner = nodeStat.getEphemeralOwner() != 0L;
  Assert.assertTrue(hasOwner,
      "Ephemeral owner should NOT be zero because the node is an ephemeral node.");

  final String ownerHex = ZKUtil.toHexSessionId(nodeStat.getEphemeralOwner());
  Assert.assertEquals(ownerHex, sessionHex, "Ephemeral node is created by an unexpected session");

  // Clean up explicitly; otherwise the ephemeral node would stay until its owner session expires.
  _zkClient.delete(ephemeralPath);
}
/*
 * Checks that creating an ephemeral node fails when the expected zk session does not match the
 * client's actual zk session.
 * Steps:
 * 1. Record the session id of the zk client
 * 2. Expire that session, so that the client gets a new one
 * 3. Try to create an ephemeral node, passing the recorded (expired) session as expected session
 * 4. Expect ZkSessionMismatchedException and verify that no node was created
 */
@Test
public void testCreateEphemeralWithMismatchedSession()
    throws Exception {
  final String testClass = TestHelper.getTestClassName();
  final String testMethod = TestHelper.getTestMethodName();
  final String cluster = testClass + "_" + testMethod;

  TestHelper.setupCluster(cluster, ZK_ADDR, 12918, // participant port
      "localhost", // participant name prefix
      "TestDB", // resource name prefix
      1, // resources
      10, // partitions per resource
      5, // number of nodes
      3, // replicas
      "MasterSlave", true); // do rebalance

  final long expiredSessionId = _zkClient.getSessionId();
  final String expiredSessionHex = ZKUtil.toHexSessionId(expiredSessionId);
  final String ephemeralPath = "/" + testMethod;

  // The node must not exist before the test starts.
  Assert.assertFalse(_zkClient.exists(ephemeralPath));

  // Force the recorded session to expire.
  ZkTestHelper.expireSession(_zkClient);

  // Wait until the client reports a session id that differs from the expired one.
  final boolean gotNewSession = TestHelper.verify(() -> {
    try {
      return _zkClient.getSessionId() != expiredSessionId;
    } catch (HelixException ex) {
      return false;
    }
  }, 1000L);
  Assert.assertTrue(gotNewSession);

  try {
    // The expected session is the expired one, so this creation must be rejected.
    _zkClient.createEphemeral(ephemeralPath, "Hello Helix", expiredSessionHex);
    Assert.fail("Ephemeral node should not be created by the expired session.");
  } catch (ZkSessionMismatchedException expected) {
    // This is the outcome the test is looking for: the sessions do not match.
  } catch (Exception unexpected) {
    Assert.fail("Should not have thrown exception: " + unexpected);
  }

  // Nothing must have been created.
  Assert.assertFalse(_zkClient.exists(ephemeralPath));
}
/*
 * Tests that when trying to create an ephemeral node, if connection to zk service is lost,
 * ConnectionLossException will be thrown in retryUntilConnected and ephemeral creation will fail.
 * Because ConnectionLossException is not thrown in createEphemeral(), operation retry timeout is
 * set to 3 seconds and then ZkTimeoutException is thrown. And retry cause message is checked to
 * see if ConnectionLossException was thrown before retry.
 *
 * The shared zk server is stopped for the duration of the test and is always restarted at the
 * end, so that later tests can still use it.
 */
@Test(timeOut = 5 * 60 * 1000L)
public void testConnectionLossWhileCreateEphemeral()
    throws Exception {
  final String methodName = TestHelper.getTestMethodName();

  final ZkClient zkClient =
      new ZkClient.Builder().setZkServer(ZK_ADDR).setOperationRetryTimeout(3000L) // 3 seconds
          .build();

  final String expectedSessionId = ZKUtil.toHexSessionId(zkClient.getSessionId());
  final String path = "/" + methodName;
  final String data = "data";

  Assert.assertFalse(zkClient.exists(path));

  // Shutdown zk server so zk operations will fail due to disconnection.
  TestHelper.stopZkServer(_zkServer);

  try {
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    final AtomicBoolean running = new AtomicBoolean(true);

    final Thread creationThread = new Thread(() -> {
      while (running.get()) {
        try {
          // Create ephemeral node in the expected session id.
          zkClient.createEphemeral(path, data, expectedSessionId);
        } catch (ZkTimeoutException e) {
          // Verify ConnectionLossException was thrown before retry and timeout.
          // A missing message is treated as "not the expected cause" instead of killing the
          // thread with a NullPointerException.
          final String message = e.getMessage();
          if (message != null && message
              .endsWith("Retry was caused by " + KeeperException.Code.CONNECTIONLOSS)) {
            running.set(false);
          }
        }
      }
      countDownLatch.countDown();
    });

    creationThread.start();

    final boolean creationThreadTerminated = countDownLatch.await(10L, TimeUnit.SECONDS);
    if (!creationThreadTerminated) {
      running.set(false);
      creationThread.join(5000L);
      Assert.fail("Failed to receive a ConnectionLossException after zookeeper has shutdown.");
    }
  } finally {
    try {
      zkClient.close();
    } finally {
      // Recover zk server even if closing the client fails.
      _zkServer.start();
    }
  }
}
/*
 * Tests that an ephemeral node creation which loses its connection to the zk service keeps
 * retrying until the connection is back and then creates the node successfully.
 * Simulated steps:
 * 1. Shut down the zk server
 * 2. Start a thread that creates an ephemeral node in the original zk session
 * 3. The creation loses connection and keeps retrying
 * 4. Restart the zk server so the zk session is recovered
 * 5. The zk client reconnects and the ephemeral node gets created
 */
@Test(timeOut = 5 * 60 * 1000L)
public void testRetryUntilConnectedAfterConnectionLoss()
    throws Exception {
  final String testMethod = TestHelper.getTestMethodName();
  final String sessionHex = ZKUtil.toHexSessionId(_zkClient.getSessionId());
  final String ephemeralPath = "/" + testMethod;
  final String payload = "data";

  Assert.assertFalse(_zkClient.exists(ephemeralPath));

  // With the server down, zk operations fail because of the disconnection.
  TestHelper.stopZkServer(_zkServer);

  final CountDownLatch creationDone = new CountDownLatch(1);
  final AtomicBoolean keepTrying = new AtomicBoolean(true);

  final Runnable creationTask = () -> {
    while (keepTrying.get()) {
      // Attempt the creation in the session recorded above.
      System.out.println("Trying to create ephemeral node...");
      _zkClient.createEphemeral(ephemeralPath, payload, sessionHex);
      System.out.println("Ephemeral node created.");
      keepTrying.set(false);
    }
    creationDone.countDown();
  };
  final Thread worker = new Thread(creationTask);
  worker.start();

  // Leave the worker retrying against the stopped server for a while.
  System.out.println("Keep creation thread retrying to connect for 10 seconds...");
  TimeUnit.SECONDS.sleep(10);

  System.out.println("Restarting zk server...");
  _zkServer.start();

  // The worker should now be able to finish the creation.
  final boolean finishedInTime = creationDone.await(10, TimeUnit.SECONDS);
  if (!finishedInTime) {
    keepTrying.set(false);
    worker.join(5000L);
    Assert.fail("Failed to reconnect to zk server and create ephemeral node"
        + " after zk server is recovered.");
  }

  // Read the node back and check content and ownership.
  final Stat nodeStat = new Stat();
  final String storedData = _zkClient.readData(ephemeralPath, nodeStat, true);

  Assert.assertNotNull(storedData, "Failed to create ephemeral node: " + ephemeralPath);
  Assert.assertEquals(storedData, payload, "Data is not correct.");

  final boolean hasOwner = nodeStat.getEphemeralOwner() != 0L;
  Assert.assertTrue(hasOwner,
      "Ephemeral owner should NOT be zero because the node is an ephemeral node.");

  final String ownerHex = ZKUtil.toHexSessionId(nodeStat.getEphemeralOwner());
  Assert.assertEquals(ownerHex, sessionHex, "Ephemeral node is created by an unexpected session");

  // Clean up explicitly; otherwise the ephemeral node would stay until its owner session expires.
  _zkClient.delete(ephemeralPath);
}
/*
 * Tests waitForEstablishedSession(): it returns a non-zero session id while the zk server is
 * up, and throws ZkTimeoutException when the server is down.
 *
 * The shared zk server is stopped during the test. The client is closed and the server is
 * restarted in a finally block, so a failing assertion does not leave the server down for
 * later tests.
 */
@Test
public void testWaitForEstablishedSession() {
  ZkClient zkClient = new ZkClient(ZK_ADDR);
  // Tracks whether the shared server was stopped; starting a server that is still running
  // must be avoided.
  boolean serverStopped = false;

  try {
    Assert.assertTrue(zkClient.waitForEstablishedSession(1, TimeUnit.SECONDS) != 0L);

    TestHelper.stopZkServer(_zkServer);
    serverStopped = true;
    Assert.assertTrue(zkClient.waitForKeeperState(KeeperState.Disconnected, 1, TimeUnit.SECONDS));

    try {
      zkClient.waitForEstablishedSession(3, TimeUnit.SECONDS);
      Assert.fail("Connecting to zk server should time out and ZkTimeoutException is expected.");
    } catch (ZkTimeoutException expected) {
      // Because zk server is shutdown, zkClient should not connect to zk server and a
      // ZkTimeoutException should be thrown.
    }
  } finally {
    try {
      zkClient.close();
    } finally {
      if (serverStopped) {
        // Recover zk server for later tests.
        _zkServer.start();
      }
    }
  }
}
}