blob: 931bb6f15f96645eba618ec79c46815421dba160 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zookeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.NoWatcherException;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.WatcherType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Verifies removing watches using ZooKeeper client apis
*/
@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(ZKParameterized.RunnerFactory.class)
public class RemoveWatchesTest extends ClientBase {
private static final Logger LOG = LoggerFactory
.getLogger(RemoveWatchesTest.class);
private ZooKeeper zk1 = null;
private ZooKeeper zk2 = null;
@Override
public void setUp() throws Exception {
super.setUp();
zk1 = createClient();
zk2 = createClient();
}
@Override
public void tearDown() throws Exception {
if (zk1 != null)
zk1.close();
if (zk2 != null)
zk2.close();
super.tearDown();
}
private final boolean useAsync;
public RemoveWatchesTest(boolean useAsync) {
this.useAsync = useAsync;
}
@Parameters
public static Collection<Object[]> configs() {
return Arrays.asList(new Object[][] { { false }, { true }, });
}
private void removeWatches(ZooKeeper zk, String path, Watcher watcher,
WatcherType watcherType, boolean local, KeeperException.Code rc)
throws InterruptedException, KeeperException {
LOG.info(
"Sending removeWatches req using zk {} path: {} type: {} watcher: {} ",
new Object[] { zk, path, watcherType, watcher });
if (useAsync) {
MyCallback c1 = new MyCallback(rc.intValue(), path);
zk.removeWatches(path, watcher, watcherType, local, c1, null);
Assert.assertTrue("Didn't succeeds removeWatch operation",
c1.matches());
if (KeeperException.Code.OK.intValue() != c1.rc) {
KeeperException ke = KeeperException
.create(KeeperException.Code.get(c1.rc));
throw ke;
}
} else {
zk.removeWatches(path, watcher, watcherType, local);
}
}
private void removeAllWatches(ZooKeeper zk, String path,
WatcherType watcherType, boolean local, KeeperException.Code rc)
throws InterruptedException, KeeperException {
LOG.info("Sending removeWatches req using zk {} path: {} type: {} ",
new Object[] { zk, path, watcherType });
if (useAsync) {
MyCallback c1 = new MyCallback(rc.intValue(), path);
zk.removeAllWatches(path, watcherType, local, c1, null);
Assert.assertTrue("Didn't succeeds removeWatch operation",
c1.matches());
if (KeeperException.Code.OK.intValue() != c1.rc) {
KeeperException ke = KeeperException
.create(KeeperException.Code.get(c1.rc));
throw ke;
}
} else {
zk.removeAllWatches(path, watcherType, local);
}
}
/**
* Test verifies removal of single watcher when there is server connection
*/
@Test(timeout = 90000)
public void testRemoveSingleWatcher() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk1.create("/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
MyWatcher w1 = new MyWatcher("/node1", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
MyWatcher w2 = new MyWatcher("/node2", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node2", w2));
removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK);
Assert.assertEquals("Didn't find data watcher", 1,
zk2.getDataWatches().size());
Assert.assertEquals("Didn't find data watcher", "/node2",
zk2.getDataWatches().get(0));
removeWatches(zk2, "/node2", w2, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove data watcher", w2.matches());
// closing session should remove ephemeral nodes and trigger data
// watches if any
if (zk1 != null) {
zk1.close();
zk1 = null;
}
List<EventType> events = w1.getEventsAfterWatchRemoval();
Assert.assertFalse(
"Shouldn't get NodeDeletedEvent after watch removal",
events.contains(EventType.NodeDeleted));
Assert.assertEquals(
"Shouldn't get NodeDeletedEvent after watch removal", 0,
events.size());
}
/**
* Test verifies removal of multiple data watchers when there is server
* connection
*/
@Test(timeout = 90000)
public void testMultipleDataWatchers() throws IOException,
InterruptedException, KeeperException {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
MyWatcher w1 = new MyWatcher("/node1", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
MyWatcher w2 = new MyWatcher("/node1", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w2));
removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK);
Assert.assertEquals("Didn't find data watcher", 1,
zk2.getDataWatches().size());
Assert.assertEquals("Didn't find data watcher", "/node1",
zk2.getDataWatches().get(0));
removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove data watcher", w2.matches());
// closing session should remove ephemeral nodes and trigger data
// watches if any
if (zk1 != null) {
zk1.close();
zk1 = null;
}
List<EventType> events = w2.getEventsAfterWatchRemoval();
Assert.assertEquals(
"Shouldn't get NodeDeletedEvent after watch removal", 0,
events.size());
}
/**
* Test verifies removal of multiple child watchers when there is server
* connection
*/
@Test(timeout = 90000)
public void testMultipleChildWatchers() throws IOException,
InterruptedException, KeeperException {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 1);
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w1);
MyWatcher w2 = new MyWatcher("/node1", 1);
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
zk2.getChildren("/node1", w2);
removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK);
Assert.assertTrue("Didn't remove child watcher", w2.matches());
Assert.assertEquals("Didn't find child watcher", 1, zk2
.getChildWatches().size());
removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove child watcher", w1.matches());
// create child to see NodeChildren notification
zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// waiting for child watchers to be notified
int count = 30;
while (count > 0) {
if (w1.getEventsAfterWatchRemoval().size() > 0) {
break;
}
count--;
Thread.sleep(100);
}
// watcher2
List<EventType> events = w2.getEventsAfterWatchRemoval();
Assert.assertEquals("Shouldn't get NodeChildrenChanged event", 0,
events.size());
}
/**
* Test verifies null watcher with WatcherType.Any - remove all the watchers
* data, child, exists
*/
@Test(timeout = 90000)
public void testRemoveAllWatchers() throws IOException,
InterruptedException, KeeperException {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 2);
MyWatcher w2 = new MyWatcher("/node1", 2);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w2));
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w1);
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
zk2.getChildren("/node1", w2);
removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK);
zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
Assert.assertTrue("Didn't remove data watcher", w1.matches());
Assert.assertTrue("Didn't remove child watcher", w2.matches());
}
/**
* Test verifies null watcher with WatcherType.Data - remove all data
* watchers. Child watchers shouldn't be removed
*/
@Test(timeout = 90000)
public void testRemoveAllDataWatchers() throws IOException,
InterruptedException, KeeperException {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 1);
MyWatcher w2 = new MyWatcher("/node1", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w2));
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w1);
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
zk2.getChildren("/node1", w2);
removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK);
removeWatches(zk2, "/node1", w2, WatcherType.Data, false, Code.OK);
zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
Assert.assertTrue("Didn't remove data watcher", w1.matches());
Assert.assertTrue("Didn't remove data watcher", w2.matches());
// waiting for child watchers to be notified
int count = 10;
while (count > 0) {
if (w1.getEventsAfterWatchRemoval().size() > 0
&& w2.getEventsAfterWatchRemoval().size() > 0) {
break;
}
count--;
Thread.sleep(1000);
}
// watcher1
List<EventType> events = w1.getEventsAfterWatchRemoval();
Assert.assertEquals("Didn't get NodeChildrenChanged event", 1,
events.size());
Assert.assertTrue("Didn't get NodeChildrenChanged event",
events.contains(EventType.NodeChildrenChanged));
// watcher2
events = w2.getEventsAfterWatchRemoval();
Assert.assertEquals("Didn't get NodeChildrenChanged event", 1,
events.size());
Assert.assertTrue("Didn't get NodeChildrenChanged event",
events.contains(EventType.NodeChildrenChanged));
}
/**
* Test verifies null watcher with WatcherType.Children - remove all child
* watchers. Data watchers shouldn't be removed
*/
@Test(timeout = 90000)
public void testRemoveAllChildWatchers() throws IOException,
InterruptedException, KeeperException {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 1);
MyWatcher w2 = new MyWatcher("/node1", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w2));
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w1);
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
zk2.getChildren("/node1", w2);
removeWatches(zk2, "/node1", w1, WatcherType.Children, false, Code.OK);
removeWatches(zk2, "/node1", w2, WatcherType.Children, false, Code.OK);
zk1.setData("/node1", "test".getBytes(), -1);
Assert.assertTrue("Didn't remove child watcher", w1.matches());
Assert.assertTrue("Didn't remove child watcher", w2.matches());
// waiting for child watchers to be notified
int count = 10;
while (count > 0) {
if (w1.getEventsAfterWatchRemoval().size() > 0
&& w2.getEventsAfterWatchRemoval().size() > 0) {
break;
}
count--;
Thread.sleep(1000);
}
// watcher1
List<EventType> events = w1.getEventsAfterWatchRemoval();
Assert.assertEquals("Didn't get NodeDataChanged event", 1,
events.size());
Assert.assertTrue("Didn't get NodeDataChanged event",
events.contains(EventType.NodeDataChanged));
// watcher2
events = w2.getEventsAfterWatchRemoval();
Assert.assertEquals("Didn't get NodeDataChanged event", 1,
events.size());
Assert.assertTrue("Didn't get NodeDataChanged event",
events.contains(EventType.NodeDataChanged));
}
/**
* Test verifies given watcher doesn't exists!
*/
@Test(timeout = 90000)
public void testNoWatcherException() throws IOException,
InterruptedException, KeeperException {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 2);
MyWatcher w2 = new MyWatcher("/node1", 2);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNull("Didn't set data watches", zk2.exists("/node2", w2));
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w1);
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
zk2.getChildren("/node1", w2);
// New Watcher which will be used for removal
MyWatcher w3 = new MyWatcher("/node1", 2);
try {
removeWatches(zk2, "/node1", w3, WatcherType.Any, false,
Code.NOWATCHER);
Assert.fail("Should throw exception as given watcher doesn't exists");
} catch (KeeperException.NoWatcherException nwe) {
// expected
}
try {
removeWatches(zk2, "/node1", w3, WatcherType.Children, false,
Code.NOWATCHER);
Assert.fail("Should throw exception as given watcher doesn't exists");
} catch (KeeperException.NoWatcherException nwe) {
// expected
}
try {
removeWatches(zk2, "/node1", w3, WatcherType.Data, false,
Code.NOWATCHER);
Assert.fail("Should throw exception as given watcher doesn't exists");
} catch (KeeperException.NoWatcherException nwe) {
// expected
}
try {
removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false,
Code.NOWATCHER);
Assert.fail("Should throw exception as given watcher doesn't exists");
} catch (KeeperException.NoWatcherException nwe) {
// expected
}
}
/**
* Test verifies WatcherType.Any - removes only the configured data watcher
* function
*/
@Test(timeout = 90000)
public void testRemoveAnyDataWatcher() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 1);
MyWatcher w2 = new MyWatcher("/node1", 2);
// Add multiple data watches
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w2));
// Add child watch
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
zk2.getChildren("/node1", w2);
removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove data watcher", w1.matches());
Assert.assertEquals("Didn't find child watcher", 1, zk2
.getChildWatches().size());
Assert.assertEquals("Didn't find data watcher", 1, zk2
.getDataWatches().size());
removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove child watcher", w2.matches());
}
/**
* Test verifies WatcherType.Any - removes only the configured child watcher
* function
*/
@Test(timeout = 90000)
public void testRemoveAnyChildWatcher() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 2);
MyWatcher w2 = new MyWatcher("/node1", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
// Add multiple child watches
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w2);
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
zk2.getChildren("/node1", w1);
removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove child watcher", w2.matches());
Assert.assertEquals("Didn't find child watcher", 1, zk2
.getChildWatches().size());
Assert.assertEquals("Didn't find data watcher", 1, zk2
.getDataWatches().size());
removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove watchers", w1.matches());
}
/**
* Test verifies when there is no server connection. Remove watches when
* local=true, otw should retain it
*/
@Test(timeout = 90000)
public void testRemoveWatcherWhenNoConnection() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 2);
MyWatcher w2 = new MyWatcher("/node1", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
// Add multiple child watches
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w1);
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w2);
stopServer();
removeWatches(zk2, "/node1", w2, WatcherType.Any, true, Code.OK);
Assert.assertTrue("Didn't remove child watcher", w2.matches());
Assert.assertFalse("Shouldn't remove data watcher", w1.matches());
try {
removeWatches(zk2, "/node1", w1, WatcherType.Any, false,
Code.CONNECTIONLOSS);
Assert.fail("Should throw exception as last watch removal requires server connection");
} catch (KeeperException.ConnectionLossException nwe) {
// expected
}
Assert.assertFalse("Shouldn't remove data watcher", w1.matches());
// when local=true, here if connection not available, simply removes
// from local session
removeWatches(zk2, "/node1", w1, WatcherType.Any, true, Code.OK);
Assert.assertTrue("Didn't remove data watcher", w1.matches());
}
/**
* Test verifies many pre-node watchers. Also, verifies internal
* datastructure 'watchManager.existWatches'
*/
@Test(timeout = 90000)
public void testManyPreNodeWatchers() throws Exception {
int count = 50;
List<MyWatcher> wList = new ArrayList<MyWatcher>(count);
MyWatcher w;
String path = "/node";
// Exists watcher
for (int i = 0; i < count; i++) {
final String nodePath = path + i;
w = new MyWatcher(nodePath, 1);
wList.add(w);
LOG.info("Adding pre node watcher {} on path {}", new Object[] { w,
nodePath });
zk1.exists(nodePath, w);
}
Assert.assertEquals("Failed to add watchers!", count, zk1
.getExistWatches().size());
for (int i = 0; i < count; i++) {
final MyWatcher watcher = wList.get(i);
removeWatches(zk1, path + i, watcher, WatcherType.Data, false,
Code.OK);
Assert.assertTrue("Didn't remove data watcher", watcher.matches());
}
Assert.assertEquals("Didn't remove watch references!", 0, zk1
.getExistWatches().size());
}
/**
* Test verifies many child watchers. Also, verifies internal datastructure
* 'watchManager.childWatches'
*/
@Test(timeout = 90000)
public void testManyChildWatchers() throws Exception {
int count = 50;
List<MyWatcher> wList = new ArrayList<MyWatcher>(count);
MyWatcher w;
String path = "/node";
// Child watcher
for (int i = 0; i < count; i++) {
String nodePath = path + i;
zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
nodePath += "/";
}
for (int i = 0; i < count; i++) {
String nodePath = path + i;
w = new MyWatcher(path + i, 1);
wList.add(w);
LOG.info("Adding child watcher {} on path {}", new Object[] { w,
nodePath });
zk1.getChildren(nodePath, w);
nodePath += "/";
}
Assert.assertEquals("Failed to add watchers!", count, zk1
.getChildWatches().size());
for (int i = 0; i < count; i++) {
final MyWatcher watcher = wList.get(i);
removeWatches(zk1, path + i, watcher, WatcherType.Children, false,
Code.OK);
Assert.assertTrue("Didn't remove child watcher", watcher.matches());
}
Assert.assertEquals("Didn't remove watch references!", 0, zk1
.getChildWatches().size());
}
/**
* Test verifies many data watchers. Also, verifies internal datastructure
* 'watchManager.dataWatches'
*/
@Test(timeout = 90000)
public void testManyDataWatchers() throws Exception {
int count = 50;
List<MyWatcher> wList = new ArrayList<MyWatcher>(count);
MyWatcher w;
String path = "/node";
// Data watcher
for (int i = 0; i < count; i++) {
String nodePath = path + i;
w = new MyWatcher(path + i, 1);
wList.add(w);
zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
LOG.info("Adding data watcher {} on path {}", new Object[] { w,
nodePath });
zk1.getData(nodePath, w, null);
nodePath += "/";
}
Assert.assertEquals("Failed to add watchers!", count, zk1
.getDataWatches().size());
for (int i = 0; i < count; i++) {
final MyWatcher watcher = wList.get(i);
removeWatches(zk1, path + i, watcher, WatcherType.Data, false,
Code.OK);
Assert.assertTrue("Didn't remove data watcher", watcher.matches());
}
Assert.assertEquals("Didn't remove watch references!", 0, zk1
.getDataWatches().size());
}
/**
* Test verifies removal of many watchers locally when no connection and
* WatcherType#Any. Also, verifies internal watchManager datastructures
*/
@Test(timeout = 90000)
public void testManyWatchersWhenNoConnection() throws Exception {
int count = 3;
List<MyWatcher> wList = new ArrayList<MyWatcher>(count);
MyWatcher w;
String path = "/node";
// Child watcher
for (int i = 0; i < count; i++) {
String nodePath = path + i;
zk1.create(nodePath, null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
nodePath += "/";
}
for (int i = 0; i < count; i++) {
String nodePath = path + i;
w = new MyWatcher(path + i, 2);
wList.add(w);
LOG.info("Adding child watcher {} on path {}", new Object[] { w,
nodePath });
zk1.getChildren(nodePath, w);
nodePath += "/";
}
Assert.assertEquals("Failed to add watchers!", count, zk1
.getChildWatches().size());
// Data watcher
for (int i = 0; i < count; i++) {
String nodePath = path + i;
w = wList.get(i);
LOG.info("Adding data watcher {} on path {}", new Object[] { w,
nodePath });
zk1.getData(nodePath, w, null);
nodePath += "/";
}
Assert.assertEquals("Failed to add watchers!", count, zk1
.getDataWatches().size());
stopServer();
for (int i = 0; i < count; i++) {
final MyWatcher watcher = wList.get(i);
removeWatches(zk1, path + i, watcher, WatcherType.Any, true,
Code.OK);
Assert.assertTrue("Didn't remove watcher", watcher.matches());
}
Assert.assertEquals("Didn't remove watch references!", 0, zk1
.getChildWatches().size());
Assert.assertEquals("Didn't remove watch references!", 0, zk1
.getDataWatches().size());
}
/**
* Test verifies removing watcher having namespace
*/
@Test(timeout = 90000)
public void testChRootRemoveWatcher() throws Exception {
// creating the subtree for chRoot clients.
String chRoot = "/appsX";
zk1.create("/appsX", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
if (zk1 != null) {
zk1.close();
}
if (zk2 != null) {
zk2.close();
}
// Creating chRoot client.
zk1 = createClient(this.hostPort + chRoot);
zk2 = createClient(this.hostPort + chRoot);
LOG.info("Creating child znode /node1 using chRoot client");
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
MyWatcher w1 = new MyWatcher("/node1", 2);
MyWatcher w2 = new MyWatcher("/node1", 1);
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
// Add multiple child watches
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
zk2.getChildren("/node1", w2);
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
zk2.getChildren("/node1", w1);
removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove child watcher", w1.matches());
Assert.assertEquals("Didn't find child watcher", 1, zk2
.getChildWatches().size());
removeWatches(zk2, "/node1", w2, WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove child watcher", w2.matches());
}
/**
* Verify that if a given watcher doesn't exist, the server properly
* returns an error code for it.
*
* In our Java client implementation, we check that a given watch exists at
* two points:
*
* 1) before submitting the RemoveWatches request
* 2) after a successful server response, when the watcher needs to be
* removed
*
* Since this can be racy (i.e. a watch can fire while a RemoveWatches
* request is in-flight), we need to verify that the watch was actually
* removed (i.e. from ZKDatabase and DataTree) and return NOWATCHER if
* needed.
*
* Also, other implementations might not do a client side check before
* submitting a RemoveWatches request. If we don't do a server side check,
* we would just return ZOK even if no watch was removed.
*
*/
@Test(timeout = 90000)
public void testNoWatcherServerException()
throws InterruptedException, IOException, TimeoutException {
CountdownWatcher watcher = new CountdownWatcher();
MyZooKeeper zk = new MyZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
boolean nw = false;
watcher.waitForConnected(CONNECTION_TIMEOUT);
try {
zk.removeWatches("/nowatchhere", watcher, WatcherType.Data, false);
} catch (KeeperException nwe) {
if (nwe.code().intValue() == Code.NOWATCHER.intValue()) {
nw = true;
}
}
Assert.assertTrue("Server didn't return NOWATCHER",
zk.getRemoveWatchesRC() == Code.NOWATCHER.intValue());
Assert.assertTrue("NoWatcherException didn't happen", nw);
}
/**
* Test verifies given watcher doesn't exists!
*/
@Test(timeout = 90000)
public void testRemoveAllNoWatcherException() throws IOException,
InterruptedException, KeeperException {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
try {
removeAllWatches(zk2, "/node1", WatcherType.Any, false,
Code.NOWATCHER);
Assert.fail("Should throw exception as given watcher doesn't exists");
} catch (KeeperException.NoWatcherException nwe) {
// expected
}
}
/**
* Test verifies null watcher
*/
@Test(timeout = 30000)
public void testNullWatcherReference() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
try {
if (useAsync) {
zk1.removeWatches("/node1", null, WatcherType.Data, false,
null, null);
} else {
zk1.removeWatches("/node1", null, WatcherType.Data, false);
}
Assert.fail("Must throw IllegalArgumentException as watcher is null!");
} catch (IllegalArgumentException iae) {
// expected
}
}
/**
* Test verifies WatcherType.Data - removes only the configured data watcher
* function
*/
@Test(timeout = 90000)
public void testRemoveWhenMultipleDataWatchesOnAPath() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
final CountDownLatch dataWatchCount = new CountDownLatch(1);
final CountDownLatch rmWatchCount = new CountDownLatch(1);
Watcher w1 = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.DataWatchRemoved) {
rmWatchCount.countDown();
}
}
};
Watcher w2 = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeDataChanged) {
dataWatchCount.countDown();
}
}
};
// Add multiple data watches
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w2));
removeWatches(zk2, "/node1", w1, WatcherType.Data, false, Code.OK);
Assert.assertTrue("Didn't remove data watcher",
rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
zk1.setData("/node1", "test".getBytes(), -1);
LOG.info("Waiting for data watchers to be notified");
Assert.assertTrue("Didn't get data watch notification!",
dataWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
}
/**
* Test verifies WatcherType.Children - removes only the configured child
* watcher function
*/
@Test(timeout = 90000)
public void testRemoveWhenMultipleChildWatchesOnAPath() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
final CountDownLatch childWatchCount = new CountDownLatch(1);
final CountDownLatch rmWatchCount = new CountDownLatch(1);
Watcher w1 = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.ChildWatchRemoved) {
rmWatchCount.countDown();
}
}
};
Watcher w2 = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeChildrenChanged) {
childWatchCount.countDown();
}
}
};
// Add multiple child watches
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertEquals("Didn't set child watches", 0,
zk2.getChildren("/node1", w1).size());
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertEquals("Didn't set child watches", 0,
zk2.getChildren("/node1", w2).size());
removeWatches(zk2, "/node1", w1, WatcherType.Children, false, Code.OK);
Assert.assertTrue("Didn't remove child watcher",
rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
LOG.info("Waiting for child watchers to be notified");
Assert.assertTrue("Didn't get child watch notification!",
childWatchCount
.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
}
/**
* Test verifies WatcherType.Data - removes only the configured data watcher
* function
*/
@Test(timeout = 90000)
public void testRemoveAllDataWatchesOnAPath() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
final CountDownLatch dWatchCount = new CountDownLatch(2);
final CountDownLatch rmWatchCount = new CountDownLatch(2);
Watcher w1 = new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case DataWatchRemoved:
rmWatchCount.countDown();
break;
case NodeDataChanged:
dWatchCount.countDown();
break;
default:
break;
}
}
};
Watcher w2 = new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case DataWatchRemoved:
rmWatchCount.countDown();
break;
case NodeDataChanged:
dWatchCount.countDown();
break;
default:
break;
}
}
};
// Add multiple data watches
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w2));
Assert.assertTrue("Server session is not a watcher",
isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Data));
removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK);
Assert.assertTrue("Didn't remove data watcher",
rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
Assert.assertFalse("Server session is still a watcher after removal",
isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Data));
}
/**
* Test verifies WatcherType.Children - removes only the configured child
* watcher function
*/
@Test(timeout = 90000)
public void testRemoveAllChildWatchesOnAPath() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
final CountDownLatch cWatchCount = new CountDownLatch(2);
final CountDownLatch rmWatchCount = new CountDownLatch(2);
Watcher w1 = new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case ChildWatchRemoved:
rmWatchCount.countDown();
break;
case NodeChildrenChanged:
cWatchCount.countDown();
break;
default:
break;
}
}
};
Watcher w2 = new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case ChildWatchRemoved:
rmWatchCount.countDown();
break;
case NodeChildrenChanged:
cWatchCount.countDown();
break;
default:
break;
}
}
};
// Add multiple child watches
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertEquals("Didn't set child watches", 0,
zk2.getChildren("/node1", w1).size());
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertEquals("Didn't set child watches", 0,
zk2.getChildren("/node1", w2).size());
Assert.assertTrue("Server session is not a watcher",
isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Children));
removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK);
Assert.assertTrue("Didn't remove child watcher",
rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
Assert.assertFalse("Server session is still a watcher after removal",
isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Children));
}
/**
* Test verifies WatcherType.Any - removes all the configured child,data
* watcher functions
*/
@Test(timeout = 90000)
public void testRemoveAllWatchesOnAPath() throws Exception {
zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
final CountDownLatch watchCount = new CountDownLatch(2);
final CountDownLatch rmWatchCount = new CountDownLatch(4);
Watcher w1 = new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case ChildWatchRemoved:
case DataWatchRemoved:
rmWatchCount.countDown();
break;
case NodeChildrenChanged:
case NodeDataChanged:
watchCount.countDown();
break;
default:
break;
}
}
};
Watcher w2 = new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case ChildWatchRemoved:
case DataWatchRemoved:
rmWatchCount.countDown();
break;
case NodeChildrenChanged:
case NodeDataChanged:
watchCount.countDown();
break;
default:
break;
}
}
};
// Add multiple child watches
LOG.info("Adding child watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertEquals("Didn't set child watches", 0,
zk2.getChildren("/node1", w1).size());
LOG.info("Adding child watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertEquals("Didn't set child watches", 0,
zk2.getChildren("/node1", w2).size());
// Add multiple data watches
LOG.info("Adding data watcher {} on path {}", new Object[] { w1,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w1));
LOG.info("Adding data watcher {} on path {}", new Object[] { w2,
"/node1" });
Assert.assertNotNull("Didn't set data watches",
zk2.exists("/node1", w2));
Assert.assertTrue("Server session is not a watcher",
isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Data));
removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK);
Assert.assertTrue("Didn't remove data watcher",
rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS));
Assert.assertFalse("Server session is still a watcher after removal",
isServerSessionWatcher(zk2.getSessionId(), "/node1",
WatcherType.Data));
Assert.assertEquals("Received watch notification after removal!", 2,
watchCount.getCount());
}
/* a mocked ZK class that doesn't do client-side verification
* before/after calling removeWatches */
private class MyZooKeeper extends ZooKeeper {
class MyWatchManager extends ZKWatchManager {
public MyWatchManager(boolean disableAutoWatchReset) {
super(disableAutoWatchReset);
}
public int lastrc;
/* Pretend that any watcher exists */
void containsWatcher(String path, Watcher watcher,
WatcherType watcherType) throws NoWatcherException {
}
/* save the return error code by the server */
protected boolean removeWatches(
Map<String, Set<Watcher>> pathVsWatcher,
Watcher watcher, String path, boolean local, int rc,
Set<Watcher> removedWatchers) throws KeeperException {
lastrc = rc;
return false;
}
}
public MyZooKeeper(String hp, int timeout, Watcher watcher)
throws IOException {
super(hp, timeout, watcher, false);
}
private MyWatchManager myWatchManager;
protected ZKWatchManager defaultWatchManager() {
myWatchManager = new MyWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
return myWatchManager;
}
public int getRemoveWatchesRC() {
return myWatchManager.lastrc;
}
}
private class MyWatcher implements Watcher {
private final String path;
private String eventPath;
private CountDownLatch latch;
private List<EventType> eventsAfterWatchRemoval = new ArrayList<EventType>();
MyWatcher(String path, int count) {
this.path = path;
latch = new CountDownLatch(count);
}
public void process(WatchedEvent event) {
LOG.debug("Event path : {}, eventPath : {}"
+ new Object[] { path, event.getPath() });
this.eventPath = event.getPath();
// notifies watcher removal
if (latch.getCount() == 0) {
if (event.getType() != EventType.None) {
eventsAfterWatchRemoval.add(event.getType());
}
}
if (event.getType() == EventType.ChildWatchRemoved
|| event.getType() == EventType.DataWatchRemoved) {
latch.countDown();
}
}
/**
* Returns true if the watcher was triggered. Try to avoid using this
* method with assertFalse statements. A false return depends on a timed
* out wait on a latch, which makes tests run long.
*
* @return true if the watcher was triggered, false otherwise
* @throws InterruptedException if interrupted while waiting on latch
*/
public boolean matches() throws InterruptedException {
if (!latch.await(CONNECTION_TIMEOUT/5, TimeUnit.MILLISECONDS)) {
LOG.error("Failed waiting to remove the watches");
return false;
}
LOG.debug("Client path : {} eventPath : {}", new Object[] { path,
eventPath });
return path.equals(eventPath);
}
public List<EventType> getEventsAfterWatchRemoval() {
return eventsAfterWatchRemoval;
}
}
private class MyCallback implements AsyncCallback.VoidCallback {
private final String path;
private final int rc;
private String eventPath;
int eventRc;
private CountDownLatch latch = new CountDownLatch(1);
public MyCallback(int rc, String path) {
this.rc = rc;
this.path = path;
}
@Override
public void processResult(int rc, String eventPath, Object ctx) {
System.out.println("latch:" + path + " " + eventPath);
this.eventPath = eventPath;
this.eventRc = rc;
this.latch.countDown();
}
/**
* Returns true if the callback was triggered. Try to avoid using this
* method with assertFalse statements. A false return depends on a timed
* out wait on a latch, which makes tests run long.
*
* @return true if the watcher was triggered, false otherwise
* @throws InterruptedException if interrupted while waiting on latch
*/
public boolean matches() throws InterruptedException {
if (!latch.await(CONNECTION_TIMEOUT/5, TimeUnit.MILLISECONDS)) {
return false;
}
return path.equals(eventPath) && rc == eventRc;
}
}
/**
* Checks if a session is registered with the server as a watcher.
*
* @param long sessionId the session ID to check
* @param path the path to check for watchers
* @param type the type of watcher
* @return true if the client session is a watcher on path for the type
*/
private boolean isServerSessionWatcher(long sessionId, String path,
WatcherType type) {
Set<ServerCnxn> cnxns = new HashSet<>();
CollectionUtils.addAll(cnxns, serverFactory.getConnections().iterator());
for (ServerCnxn cnxn : cnxns) {
if (cnxn.getSessionId() == sessionId) {
return getServer(serverFactory).getZKDatabase().getDataTree()
.containsWatcher(path, type, cnxn);
}
}
return false;
}
}