blob: c3ebcf08006275d3720cdb1d23cc44f8716c8de2 [file] [log] [blame]
package org.apache.helix;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
* test zookeeper basis
*/
public class TestZkBasis extends ZkUnitTestBase {
class ZkListener implements IZkDataListener, IZkChildListener {
String _parentPath = null;
String _dataDeletePath = null;
List<String> _currentChilds = Collections.emptyList(); // make sure it's set to null in
// #handleChildChange()
CountDownLatch _childChangeCountDown = new CountDownLatch(1);
CountDownLatch _dataDeleteCountDown = new CountDownLatch(1);
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) {
_parentPath = parentPath;
_currentChilds = currentChilds;
_childChangeCountDown.countDown();
}
@Override
public void handleDataChange(String dataPath, Object data) {
// To change body of implemented methods use File | Settings | File Templates.
}
@Override
public void handleDataDeleted(String dataPath) {
_dataDeletePath = dataPath;
_dataDeleteCountDown.countDown();
}
}
@Test
public void testZkSessionExpiry() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
ZkClient client = new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
String path = String.format("/%s", clusterName);
client.createEphemeral(path);
String oldSessionId = ZkTestHelper.getSessionId(client);
ZkTestHelper.expireSession(client);
String newSessionId = ZkTestHelper.getSessionId(client);
Assert.assertNotSame(newSessionId, oldSessionId);
Assert.assertFalse(client.exists(path), "Ephemeral znode should be gone after session expiry");
client.close();
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@Test
public void testCloseZkClient() {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
ZkClient client = new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
String path = String.format("/%s", clusterName);
client.createEphemeral(path);
client.close();
Assert.assertFalse(_gZkClient.exists(path),
"Ephemeral node: " + path + " should be removed after ZkClient#close()");
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
@Test
public void testCloseZkClientInZkClientEventThread() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
final CountDownLatch waitCallback = new CountDownLatch(1);
final ZkClient client = new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
String path = String.format("/%s", clusterName);
client.createEphemeral(path);
client.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) {
}
@Override
public void handleDataChange(String dataPath, Object data) {
client.close();
waitCallback.countDown();
}
});
client.writeData(path, new ZNRecord("test"));
waitCallback.await();
Assert.assertFalse(_gZkClient.exists(path), "Ephemeral node: " + path
+ " should be removed after ZkClient#close() in its own event-thread");
System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
}
/**
* test zk watchers are renewed automatically after session expiry
* zookeeper-client side keeps all registered watchers see ZooKeeper.WatchRegistration.register()
* after session expiry, all watchers are renewed
* if a path that has watches on it has been removed during session expiry,
* the watchers on that path will still get callbacks after session renewal, especially:
* a data-watch will get data-deleted callback
* a child-watch will get a child-change callback with current-child-list = null
* this can be used for cleanup watchers on the zookeeper-client side
*/
@Test
public void testWatchRenew() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;
final ZkClient client = new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
// make sure "/testName/test" doesn't exist
final String path = "/" + testName + "/test";
client.delete(path);
ZkListener listener = new ZkListener();
client.subscribeDataChanges(path, listener);
client.subscribeChildChanges(path, listener);
ZkTestHelper.expireSession(client);
boolean succeed = listener._childChangeCountDown.await(10, TimeUnit.SECONDS);
Assert.assertTrue(succeed,
"fail to wait on child-change count-down in 10 seconds after session-expiry");
Assert.assertEquals(listener._parentPath, path,
"fail to get child-change callback after session-expiry");
Assert.assertNull(listener._currentChilds,
"fail to get child-change callback with currentChilds=null after session expiry");
succeed = listener._dataDeleteCountDown.await(10, TimeUnit.SECONDS);
Assert.assertTrue(succeed,
"fail to wait on data-delete count-down in 10 seconds after session-expiry");
Assert.assertEquals(listener._dataDeletePath, path,
"fail to get data-delete callback after session-expiry");
client.close();
}
/**
* after calling zkclient#unsubscribeXXXListener()
* an already registered watch will not be removed from ZooKeeper#watchManager#XXXWatches
* immediately.
* the watch will get removed on the following conditions:
* 1) there is a set/delete on the listening path via the zkclient
* 2) session expiry on the zkclient (i.e. the watch will not be renewed after session expiry)
* @throws Exception
*/
@Test
public void testWatchRemove() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;
final ZkClient client = new ZkClient(ZK_ADDR, HelixZkClient.DEFAULT_SESSION_TIMEOUT,
HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
// make sure "/testName/test" doesn't exist
final String path = "/" + testName + "/test";
client.createPersistent(path, true);
ZkListener listener = new ZkListener();
client.subscribeDataChanges(path, listener);
client.subscribeChildChanges(path, listener);
// listener should be in both ZkClient#_dataListener and ZkClient#_childListener set
Map<String, Set<IZkDataListener>> dataListenerMap = ZkTestHelper.getZkDataListener(client);
Assert.assertEquals(dataListenerMap.size(), 1, "ZkClient#_dataListener should have 1 listener");
Set<IZkDataListener> dataListenerSet = dataListenerMap.get(path);
Assert.assertNotNull(dataListenerSet,
"ZkClient#_dataListener should have 1 listener on path: " + path);
Assert.assertEquals(dataListenerSet.size(), 1,
"ZkClient#_dataListener should have 1 listener on path: " + path);
Map<String, Set<IZkChildListener>> childListenerMap = ZkTestHelper.getZkChildListener(client);
Assert.assertEquals(childListenerMap.size(), 1,
"ZkClient#_childListener should have 1 listener");
Set<IZkChildListener> childListenerSet = childListenerMap.get(path);
Assert.assertNotNull(childListenerSet,
"ZkClient#_childListener should have 1 listener on path: " + path);
Assert.assertEquals(childListenerSet.size(), 1,
"ZkClient#_childListener should have 1 listener on path: " + path);
// watch should be in ZooKeeper#watchManager#XXXWatches
Map<String, List<String>> watchMap = ZkTestHelper.getZkWatch(client);
// System.out.println("watchMap1: " + watchMap);
List<String> dataWatch = watchMap.get("dataWatches");
Assert.assertNotNull(dataWatch,
"ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
Assert.assertEquals(dataWatch.size(), 1,
"ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
Assert.assertEquals(dataWatch.get(0), path,
"ZooKeeper#watchManager#dataWatches should have 1 data watch on path: " + path);
List<String> childWatch = watchMap.get("childWatches");
Assert.assertNotNull(childWatch,
"ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
Assert.assertEquals(childWatch.size(), 1,
"ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
Assert.assertEquals(childWatch.get(0), path,
"ZooKeeper#watchManager#childWatches should have 1 child watch on path: " + path);
client.unsubscribeDataChanges(path, listener);
client.unsubscribeChildChanges(path, listener);
// System.out.println("watchMap2: " + watchMap);
ZkTestHelper.expireSession(client);
// after session expiry, those watches should be removed
watchMap = ZkTestHelper.getZkWatch(client);
// System.out.println("watchMap3: " + watchMap);
dataWatch = watchMap.get("dataWatches");
Assert.assertTrue(dataWatch.isEmpty(), "ZooKeeper#watchManager#dataWatches should be empty");
childWatch = watchMap.get("childWatches");
Assert.assertTrue(childWatch.isEmpty(), "ZooKeeper#watchManager#childWatches should be empty");
client.close();
deleteCluster(testName);
}
}