ZkClient add recursive persist listener implementation (#2506)
ZkClient add recursive persist listener implementation
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 7941deb..4398e8e 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -56,6 +56,7 @@
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
@@ -332,15 +333,13 @@
return true;
}
- // TODO: add impl and remove UnimplementedException
@Override
public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) {
- try {
- _zkClient.subscribePersistRecursiveWatcher(key, new ChildListenerAdapter(listener));
- } catch (KeeperException.UnimplementedException e) {
- LOG.error(e.getLocalizedMessage());
+ if (skipWatchingNonExistNode && exists(key) == null) {
+ return false;
}
- return false;
+ _zkClient.subscribePersistRecursiveListener(key, new ChildListenerAdapter(listener));
+ return true;
}
@Override
@@ -356,11 +355,7 @@
// TODO: add impl and remove UnimplementedException
@Override
public void unsubscribeChildChanges(String key, ChildChangeListener listener) {
- try{
- _zkClient.unsubscribePersistRecursiveWatcher(key, new ChildListenerAdapter(listener));
- } catch (KeeperException.UnimplementedException e) {
- LOG.error(e.getLocalizedMessage());
- }
+ _zkClient.unsubscribePersistRecursiveListener(key, new ChildListenerAdapter(listener));
}
@Override
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
index e7d8856..19c1d64 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
@@ -39,7 +39,7 @@
private static ChildChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
switch (eventType) {
case NodeCreated: return ChildChangeListener.ChangeType.ENTRY_CREATED;
- case NodeChildrenChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE;
+ case NodeDataChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE;
case NodeDeleted: return ChildChangeListener.ChangeType.ENTRY_DELETED;
default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
}
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index cbefd21..69724c9 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -19,6 +19,7 @@
* under the License.
*/
+import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.exception.MetaClientException;
@@ -55,6 +56,7 @@
private static final String TRANSACTION_TEST_PARENT_PATH = "/transactionOpTestPath";
private static final String TEST_INVALID_PATH = "/_invalid/a/b/c";
+ private static final int DEFAULT_LISTENER_WAIT_TIMEOUT = 5000;
private final Object _syncObject = new Object();
@@ -316,7 +318,7 @@
}
}
zkMetaClient.set(basePath + "_1", testData, -1);
- Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));
Assert.assertTrue(dataExpected.get());
}
}
@@ -347,7 +349,7 @@
Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath);
Assert.assertEquals(watchers.get("childWatches").size(), 0);
Assert.assertEquals(watchers.get("dataWatches").size(), 0);
- Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));
zkMetaClient.unsubscribeDirectChildChange(basePath, listener);
// verify that no listener is registered on any path
@@ -362,7 +364,6 @@
public void testDataChangeListener() throws Exception {
final String basePath = "/TestZkMetaClient_testDataChangeListener";
final int count = 200;
- final int[] get_count = {0};
try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
zkMetaClient.connect();
CountDownLatch countDownLatch = new CountDownLatch(count);
@@ -372,7 +373,6 @@
public void handleDataChange(String key, Object data, ChangeType changeType)
throws Exception {
if(changeType == ENTRY_UPDATE) {
- get_count[0]++;
countDownLatch.countDown();
}
}
@@ -391,13 +391,84 @@
for (int i=0; i<200; ++i) {
zkMetaClient.set(basePath, "data7" + i, -1);
}
- Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+ Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));
zkMetaClient.unsubscribeDataChange(basePath, listener);
// verify that no listener is registered on any path
watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+ Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+ Assert.assertEquals(watchers.get("childWatches").size(), 0);
+ Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+ }
+ }
+
+ @Test
+ public void testChildChangeListener() throws Exception {
+ final String basePath = "/TestZkMetaClient_testChildChangeListener";
+ final int count = 100;
+ try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+ zkMetaClient.connect();
+ CountDownLatch countDownLatch = new CountDownLatch(count*4);
+ ChildChangeListener listener = new ChildChangeListener() {
+
+ @Override
+ public void handleChildChange(String changedPath, ChangeType changeType) throws Exception {
+ countDownLatch.countDown();
+
+ }
+ };
+ zkMetaClient.create(basePath, "");
+ Assert.assertTrue(
+ zkMetaClient.subscribeChildChanges(basePath, listener, false)
+ );
+
+ DataChangeListener dummyDataListener = new DataChangeListener() {
+ @Override
+ public void handleDataChange(String key, Object data, ChangeType changeType)
+ throws Exception {
+ }
+ };
+ try {
+ zkMetaClient.subscribeDataChange(basePath, dummyDataListener, false);
+ Assert.fail("subscribeDataChange should throw exception");
+ } catch (UnsupportedOperationException ex) {
+ // we are expecting a UnsupportedOperationException, continue with test.
+ }
+
+ DirectChildChangeListener dummyCldListener = new DirectChildChangeListener() {
+ @Override
+ public void handleDirectChildChange(String key) throws Exception {
+
+ }
+ };
+ try {
+ zkMetaClient.subscribeDirectChildChange(basePath, dummyCldListener, false);
+ } catch ( Exception ex) {
+ Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
+ }
+
+ // Verify no one time watcher is registered. Only one persist listener is registered.
+ Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+ Assert.assertEquals(watchers.get("persistentRecursiveWatches").size(), 1);
+ Assert.assertEquals(watchers.get("persistentRecursiveWatches").get(0), basePath);
+ Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+ Assert.assertEquals(watchers.get("childWatches").size(), 0);
+ Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+ for (int i=0; i<count; ++i) {
+ zkMetaClient.set(basePath, "data7" + i, -1);
+ zkMetaClient.create(basePath+"/c1_" +i , "datat");
+ zkMetaClient.create(basePath+"/c1_" +i + "/c2", "datat");
+ zkMetaClient.delete(basePath+"/c1_" +i + "/c2");
+ }
+ Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+
+ zkMetaClient.unsubscribeChildChanges(basePath, listener);
+ // verify that no listener is registered on any path
watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+ Assert.assertEquals(watchers.get("persistentRecursiveWatches").size(), 0);
Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
Assert.assertEquals(watchers.get("childWatches").size(), 0);
Assert.assertEquals(watchers.get("dataWatches").size(), 0);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 13dd510..064f6b4 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -343,17 +343,54 @@
* Subscribe RecursivePersistListener for a particular path. User can only subscribe when
* `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
*/
- // TODO: Add impl and remove exception
- public boolean subscribePersistRecursiveWatcher(String path,
- RecursivePersistListener recursivePersistListener)
- throws KeeperException.UnimplementedException {
- throw new KeeperException.UnimplementedException();
+ public void subscribePersistRecursiveListener(String path,
+ RecursivePersistListener recursivePersistListener) {
+ if (!_usePersistWatcher) {
+ throw new UnsupportedOperationException(
+ "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+ }
+
+ ManipulateListener addListener = () -> {
+ if (hasChildOrDataListeners(path)) {
+ throw new UnsupportedOperationException(
+ "Can not subscribe PersistRecursiveWatcher. There is an existing listener on " + path);
+ }
+ // subscribe a PERSISTENT_RECURSIVE listener on path. It throws exception if not successful
+ retryUntilConnected(() -> {
+ getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT_RECURSIVE);
+ return null;
+ });
+
+ _zkPathRecursiveWatcherTrie.addRecursiveListener(path, recursivePersistListener);
+ };
+
+ executeWithInPersistListenerMutex(addListener);
}
- public boolean unsubscribePersistRecursiveWatcher(String path,
- RecursivePersistListener recursivePersistListener)
- throws KeeperException.UnimplementedException {
- throw new KeeperException.UnimplementedException();
+ public void unsubscribePersistRecursiveListener(String path,
+ RecursivePersistListener recursivePersistListener) {
+ if (!_usePersistWatcher) {
+ throw new UnsupportedOperationException(
+ "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+ }
+
+ // unsubscribe from ZK if this is the only recursive persist listener on this path.
+ ManipulateListener removeListeners = () -> {
+ _zkPathRecursiveWatcherTrie.removeRecursiveListener(path, recursivePersistListener);
+ if (_zkPathRecursiveWatcherTrie.hasListenerOnPath(path)) {
+ return;
+ }
+ try {
+ // We are not checking if there is a persist listener registered on the path because
+ // we do not allow subscribe a persist listener on the same path of persist recursive
+ // listener as of now.
+ getConnection().removeWatches(path, this, WatcherType.Any);
+ } catch (KeeperException.NoWatcherException e) {
+ LOG.warn("Persist watcher is already removed on path: {}", path);
+ }
+ };
+ executeWithInPersistListenerMutex(removeListeners);
+
}
private boolean isPrefetchEnabled(IZkDataListener dataListener) {
@@ -1372,7 +1409,7 @@
* are deleted before the last page is fetched. The upstream caller should be able to handle this.
*/
public List<String> getChildren(String path) {
- return getChildren(path, (!_usePersistWatcher) && hasListeners(path));
+ return getChildren(path, (!_usePersistWatcher) && hasChildOrDataListeners(path));
}
protected List<String> getChildren(final String path, final boolean watch) {
@@ -1437,7 +1474,7 @@
}
public boolean exists(final String path) {
- return exists(path, hasListeners(path));
+ return exists(path, hasChildOrDataListeners(path));
}
protected boolean exists(final String path, final boolean watch) {
@@ -1724,7 +1761,7 @@
}
}
- private boolean hasListeners(String path) {
+ private boolean hasChildOrDataListeners(String path) {
Set<IZkDataListenerEntry> dataListeners = _dataListener.get(path);
if (dataListeners != null && dataListeners.size() > 0) {
return true;
@@ -1807,6 +1844,23 @@
fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
pathExists, event.getType());
}
+
+ // fire change event for persist recursive listener
+ if (_usePersistWatcher) {
+ Set<RecursivePersistListener> recListeners =
+ _zkPathRecursiveWatcherTrie.getAllRecursiveListeners(path);
+ if (!recListeners.isEmpty()) {
+ for (final RecursivePersistListener listener : recListeners) {
+ _eventThread.send(
+ new ZkEventThread.ZkEvent("Data of " + path + " changed sent to " + listener) {
+ @Override
+ public void run() throws Exception {
+ listener.handleZNodeChange(path, event.getType());
+ }
+ });
+ }
+ }
+ }
}
}
@@ -1873,7 +1927,7 @@
public void run() throws Exception {
if (!pathStatRecord.pathChecked()) {
Stat stat = null;
- if (_usePersistWatcher || !pathExists || !hasListeners(path)) {
+ if (_usePersistWatcher || !pathExists || !hasChildOrDataListeners(path)) {
// will not install listener using exists call
stat = getStat(path, false);
} else {
@@ -2178,7 +2232,7 @@
@SuppressWarnings("unchecked")
public <T extends Object> T readData(String path, Stat stat) {
- return (T) readData(path, stat, hasListeners(path));
+ return (T) readData(path, stat, hasChildOrDataListeners(path));
}
@SuppressWarnings("unchecked")
@@ -3030,8 +3084,16 @@
void run() throws KeeperException, InterruptedException;
}
+ // Add a persist listener on the path.
+ // Throws UnsupportedOperationException if there is already a recursive persist listener on the
+ // path because it will overwrite that recursive persist listener.
private void addPersistListener(String path, Object listener) {
ManipulateListener addListeners = () -> {
+ if (_zkPathRecursiveWatcherTrie.hasListenerOnPath(path)) {
+ throw new UnsupportedOperationException(
+ "Can not subscribe PersistListener when there is an recursive listener on path: "
+ + path);
+ }
if (listener instanceof IZkChildListener) {
addChildListener(path, (IZkChildListener) listener);
} else if (listener instanceof IZkDataListener) {
@@ -3044,6 +3106,7 @@
// TODO: Consider create an empty interface and let the two listeners interface extend that
// interface for code clean.
+ // This function removes persist child or data listener.
private void removePersistListener(String path, Object listener) {
ManipulateListener removeListeners = () -> {
@@ -3053,8 +3116,9 @@
} else if (listener instanceof IZkDataListener) {
removeDataListener(path, (IZkDataListener) listener);
}
- if (!hasListeners(path)) {
- // TODO: update hasListeners logic when recursive persist listener is added
+ if (!hasChildOrDataListeners(path)) {
+ // This will also remove persist recursive watcher on ZK. However, there should not be an
+ // persist recursive watcher installed in the first place.
getConnection().removeWatches(path, this, WatcherType.Any);
}
} catch (KeeperException.NoWatcherException e) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
index 9adcb3a..41a9dd1 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
@@ -164,7 +164,6 @@
result.addAll(cur.getRecursiveListeners());
}
}
-
return result;
}
@@ -225,6 +224,28 @@
}
/**
+ * Return if there is listener on a particular path
+ * @param path
+ * @return
+ */
+ public boolean hasListenerOnPath(String path) {
+ Objects.requireNonNull(path, "Path cannot be null");
+
+ final List<String> pathComponents = split(path);
+ TrieNode cur;
+ synchronized (this) {
+ cur = _rootNode;
+ for (final String element : pathComponents) {
+ cur = cur.getChild(element);
+ if (cur == null) {
+ break;
+ }
+ }
+ }
+ return cur != null && !cur.getRecursiveListeners().isEmpty();
+ }
+
+ /**
* Clear all nodes in the trie.
*/
public synchronized void clear() {
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
index b841bcd..76f5352 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
@@ -116,39 +116,62 @@
org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
- .setUsePersistWatcher(false);
+ .setUsePersistWatcher(true);
org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
int count = 100;
final AtomicInteger[] event_count = {new AtomicInteger(0)};
final AtomicInteger[] event_count2 = {new AtomicInteger(0)};
- CountDownLatch countDownLatch1 = new CountDownLatch(count);
- CountDownLatch countDownLatch2 = new CountDownLatch(count/2);
- String path = "/base/testZkClientChildChange";
+ // for each iteration, we will edit a node, create a child, create a grand child, and
+ // delete child. Expect 4 event per iteration. -> total event should be count*4
+ CountDownLatch countDownLatch1 = new CountDownLatch(count*4);
+ CountDownLatch countDownLatch2 = new CountDownLatch(count);
+ String path = "/testZkClientPersistRecursiveChange";
RecursivePersistListener rcListener = new RecursivePersistListener() {
@Override
public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType)
throws Exception {
countDownLatch1.countDown();
event_count[0].incrementAndGet() ;
- System.out.println("rcListener count " + event_count[0]);
}
};
+ zkClient.create(path, "datat", CreateMode.PERSISTENT);
+ zkClient.subscribePersistRecursiveListener(path, rcListener);
+ for (int i=0; i<count; ++i) {
+ zkClient.writeData(path, "data7" + i, -1);
+ zkClient.create(path+"/c1_" +i , "datat", CreateMode.PERSISTENT);
+ zkClient.create(path+"/c1_" +i + "/c2", "datat", CreateMode.PERSISTENT);
+ zkClient.delete(path+"/c1_" +i + "/c2");
+ }
+ Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS));
+
+ // subscribe a persist child watch, it should throw exception
IZkChildListener childListener2 = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds)
throws Exception {
countDownLatch2.countDown();
event_count2[0].incrementAndGet();
- System.out.println("childListener2 count " + event_count2[0]);
}
};
try {
- zkClient.subscribePersistRecursiveWatcher(path, rcListener);
- } catch (KeeperException.UnimplementedException e) {
- e.printStackTrace();
+ zkClient.subscribeChildChanges(path, childListener2, false);
+ } catch ( Exception ex) {
+ Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
}
+ // unsubscribe recursive persist watcher, and subscribe persist watcher should success.
+ zkClient.unsubscribePersistRecursiveListener(path, rcListener);
+ zkClient.subscribeChildChanges(path, childListener2, false);
+ // we should only get 100 event since only 100 direct child change.
+ for (int i=0; i<count; ++i) {
+ zkClient.writeData(path, "data7" + i, -1);
+ zkClient.create(path+"/c2_" +i , "datat", CreateMode.PERSISTENT);
+ zkClient.create(path+"/c2_" +i + "/c3", "datat", CreateMode.PERSISTENT);
+ zkClient.delete(path+"/c2_" +i + "/c3");
+ }
+ Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS));
+
zkClient.close();
}