ZkClient add recursive persist listener implementation (#2506)

ZkClient add recursive persist listener implementation
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index 7941deb..4398e8e 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -56,6 +56,7 @@
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.exception.ZkException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
@@ -332,15 +333,13 @@
     return true;
   }
 
-  // TODO: add impl and remove UnimplementedException
   @Override
   public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) {
-    try {
-      _zkClient.subscribePersistRecursiveWatcher(key, new ChildListenerAdapter(listener));
-    } catch (KeeperException.UnimplementedException e) {
-      LOG.error(e.getLocalizedMessage());
+    if (skipWatchingNonExistNode && exists(key) == null) {
+      return false;
     }
-    return false;
+    _zkClient.subscribePersistRecursiveListener(key, new ChildListenerAdapter(listener));
+    return true;
   }
 
   @Override
@@ -356,11 +355,7 @@
   // TODO: add impl and remove UnimplementedException
   @Override
   public void unsubscribeChildChanges(String key, ChildChangeListener listener) {
-    try{
-    _zkClient.unsubscribePersistRecursiveWatcher(key, new ChildListenerAdapter(listener));
-    } catch (KeeperException.UnimplementedException e) {
-      LOG.error(e.getLocalizedMessage());
-    }
+    _zkClient.unsubscribePersistRecursiveListener(key, new ChildListenerAdapter(listener));
   }
 
   @Override
diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
index e7d8856..19c1d64 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/adapter/ChildListenerAdapter.java
@@ -39,7 +39,7 @@
   private static ChildChangeListener.ChangeType convertType(Watcher.Event.EventType eventType) {
     switch (eventType) {
       case NodeCreated: return ChildChangeListener.ChangeType.ENTRY_CREATED;
-      case NodeChildrenChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE;
+      case NodeDataChanged: return ChildChangeListener.ChangeType.ENTRY_DATA_CHANGE;
       case NodeDeleted: return ChildChangeListener.ChangeType.ENTRY_DELETED;
       default: throw new IllegalArgumentException("EventType " + eventType + " is not supported.");
     }
diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
index cbefd21..69724c9 100644
--- a/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/impl/zk/TestZkMetaClient.java
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+import org.apache.helix.metaclient.api.ChildChangeListener;
 import org.apache.helix.metaclient.api.DataUpdater;
 import org.apache.helix.metaclient.api.MetaClientInterface;
 import org.apache.helix.metaclient.exception.MetaClientException;
@@ -55,6 +56,7 @@
 
   private static final String TRANSACTION_TEST_PARENT_PATH = "/transactionOpTestPath";
   private static final String TEST_INVALID_PATH = "/_invalid/a/b/c";
+  private static final int DEFAULT_LISTENER_WAIT_TIMEOUT = 5000;
 
   private final Object _syncObject = new Object();
 
@@ -316,7 +318,7 @@
         }
       }
       zkMetaClient.set(basePath + "_1", testData, -1);
-      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));
       Assert.assertTrue(dataExpected.get());
     }
   }
@@ -347,7 +349,7 @@
       Assert.assertEquals(watchers.get("persistentWatches").get(0), basePath);
       Assert.assertEquals(watchers.get("childWatches").size(), 0);
       Assert.assertEquals(watchers.get("dataWatches").size(), 0);
-      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));
 
       zkMetaClient.unsubscribeDirectChildChange(basePath, listener);
       // verify that no listener is registered on any path
@@ -362,7 +364,6 @@
   public void testDataChangeListener() throws Exception {
     final String basePath = "/TestZkMetaClient_testDataChangeListener";
     final int count = 200;
-    final int[] get_count = {0};
     try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
       zkMetaClient.connect();
       CountDownLatch countDownLatch = new CountDownLatch(count);
@@ -372,7 +373,6 @@
         public void handleDataChange(String key, Object data, ChangeType changeType)
             throws Exception {
           if(changeType == ENTRY_UPDATE) {
-            get_count[0]++;
             countDownLatch.countDown();
           }
         }
@@ -391,13 +391,84 @@
       for (int i=0; i<200; ++i) {
         zkMetaClient.set(basePath, "data7" + i, -1);
       }
-      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(countDownLatch.await(DEFAULT_LISTENER_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));
 
 
       zkMetaClient.unsubscribeDataChange(basePath, listener);
       // verify that no listener is registered on any path
       watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+    }
+  }
+
+  @Test
+  public void testChildChangeListener() throws Exception {
+    final String basePath = "/TestZkMetaClient_testChildChangeListener";
+    final int count = 100;
+    try (ZkMetaClient<String> zkMetaClient = createZkMetaClient()) {
+      zkMetaClient.connect();
+      CountDownLatch countDownLatch = new CountDownLatch(count*4);
+      ChildChangeListener listener = new ChildChangeListener() {
+
+        @Override
+        public void handleChildChange(String changedPath, ChangeType changeType) throws Exception {
+          countDownLatch.countDown();
+
+        }
+      };
+      zkMetaClient.create(basePath, "");
+      Assert.assertTrue(
+          zkMetaClient.subscribeChildChanges(basePath, listener, false)
+      );
+
+      DataChangeListener dummyDataListener = new DataChangeListener() {
+        @Override
+        public void handleDataChange(String key, Object data, ChangeType changeType)
+            throws Exception {
+        }
+      };
+      try {
+        zkMetaClient.subscribeDataChange(basePath, dummyDataListener, false);
+        Assert.fail("subscribeDataChange should throw exception");
+      } catch (UnsupportedOperationException ex) {
+        // we are expecting a UnsupportedOperationException, continue with test.
+      }
+
+      DirectChildChangeListener dummyCldListener = new DirectChildChangeListener() {
+        @Override
+        public void handleDirectChildChange(String key) throws Exception {
+
+        }
+      };
+      try {
+        zkMetaClient.subscribeDirectChildChange(basePath, dummyCldListener, false);
+      } catch ( Exception ex) {
+        Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
+      }
+
+      // Verify no one time watcher is registered. Only one persist listener is registered.
+      Map<String, List<String>> watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentRecursiveWatches").size(), 1);
+      Assert.assertEquals(watchers.get("persistentRecursiveWatches").get(0), basePath);
+      Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
+      Assert.assertEquals(watchers.get("childWatches").size(), 0);
+      Assert.assertEquals(watchers.get("dataWatches").size(), 0);
+
+      for (int i=0; i<count; ++i) {
+        zkMetaClient.set(basePath, "data7" + i, -1);
+        zkMetaClient.create(basePath+"/c1_" +i , "datat");
+        zkMetaClient.create(basePath+"/c1_" +i + "/c2", "datat");
+        zkMetaClient.delete(basePath+"/c1_" +i + "/c2");
+      }
+      Assert.assertTrue(countDownLatch.await(5000, TimeUnit.MILLISECONDS));
+
+      zkMetaClient.unsubscribeChildChanges(basePath, listener);
+      // verify that no listener is registered on any path
       watchers = TestUtil.getZkWatch(zkMetaClient.getZkClient());
+      Assert.assertEquals(watchers.get("persistentRecursiveWatches").size(), 0);
       Assert.assertEquals(watchers.get("persistentWatches").size(), 0);
       Assert.assertEquals(watchers.get("childWatches").size(), 0);
       Assert.assertEquals(watchers.get("dataWatches").size(), 0);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 13dd510..064f6b4 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -343,17 +343,54 @@
    * Subscribe RecursivePersistListener for a particular path. User can only subscribe when
    * `_usePersistWatcher` is set to true and there is no pre-existing watcher on the path.
    */
-  // TODO: Add impl and remove exception
-  public boolean subscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void subscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    ManipulateListener addListener = () -> {
+      if (hasChildOrDataListeners(path)) {
+        throw new UnsupportedOperationException(
+            "Can not subscribe PersistRecursiveWatcher. There is an existing listener on " + path);
+      }
+      // subscribe a PERSISTENT_RECURSIVE listener on path. It throws exception if not successful
+      retryUntilConnected(() -> {
+        getConnection().addWatch(path, ZkClient.this, AddWatchMode.PERSISTENT_RECURSIVE);
+        return null;
+      });
+
+      _zkPathRecursiveWatcherTrie.addRecursiveListener(path, recursivePersistListener);
+    };
+
+    executeWithInPersistListenerMutex(addListener);
   }
 
-  public boolean unsubscribePersistRecursiveWatcher(String path,
-      RecursivePersistListener recursivePersistListener)
-      throws KeeperException.UnimplementedException {
-    throw new KeeperException.UnimplementedException();
+  public void unsubscribePersistRecursiveListener(String path,
+      RecursivePersistListener recursivePersistListener) {
+    if (!_usePersistWatcher) {
+      throw new UnsupportedOperationException(
+          "Can not subscribe PersistRecursiveWatcher. Persist listener is not enabled.");
+    }
+
+    // unsubscribe from ZK if this is the only recursive persist listener on this path.
+    ManipulateListener removeListeners = () -> {
+      _zkPathRecursiveWatcherTrie.removeRecursiveListener(path, recursivePersistListener);
+      if (_zkPathRecursiveWatcherTrie.hasListenerOnPath(path)) {
+        return;
+      }
+      try {
+        // We are not checking if there is a persist listener registered on the path because
+        // we do not allow subscribe a persist listener on the same path of persist recursive
+        // listener as of now.
+        getConnection().removeWatches(path, this, WatcherType.Any);
+      } catch (KeeperException.NoWatcherException e) {
+        LOG.warn("Persist watcher is already removed on path: {}", path);
+      }
+    };
+    executeWithInPersistListenerMutex(removeListeners);
+
   }
 
   private boolean isPrefetchEnabled(IZkDataListener dataListener) {
@@ -1372,7 +1409,7 @@
    * are deleted before the last page is fetched. The upstream caller should be able to handle this.
    */
   public List<String> getChildren(String path) {
-    return getChildren(path, (!_usePersistWatcher) && hasListeners(path));
+    return getChildren(path, (!_usePersistWatcher) && hasChildOrDataListeners(path));
   }
 
   protected List<String> getChildren(final String path, final boolean watch) {
@@ -1437,7 +1474,7 @@
   }
 
   public boolean exists(final String path) {
-    return exists(path, hasListeners(path));
+    return exists(path, hasChildOrDataListeners(path));
   }
 
   protected boolean exists(final String path, final boolean watch) {
@@ -1724,7 +1761,7 @@
     }
   }
 
-  private boolean hasListeners(String path) {
+  private boolean hasChildOrDataListeners(String path) {
     Set<IZkDataListenerEntry> dataListeners = _dataListener.get(path);
     if (dataListeners != null && dataListeners.size() > 0) {
       return true;
@@ -1807,6 +1844,23 @@
         fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime),
             pathExists, event.getType());
       }
+
+      // fire change event for persist recursive listener
+      if (_usePersistWatcher) {
+        Set<RecursivePersistListener> recListeners =
+            _zkPathRecursiveWatcherTrie.getAllRecursiveListeners(path);
+        if (!recListeners.isEmpty()) {
+          for (final RecursivePersistListener listener : recListeners) {
+            _eventThread.send(
+                new ZkEventThread.ZkEvent("Data of " + path + " changed sent to " + listener) {
+                  @Override
+                  public void run() throws Exception {
+                    listener.handleZNodeChange(path, event.getType());
+                  }
+                });
+          }
+        }
+      }
     }
   }
 
@@ -1873,7 +1927,7 @@
           public void run() throws Exception {
             if (!pathStatRecord.pathChecked()) {
               Stat stat = null;
-              if (_usePersistWatcher || !pathExists || !hasListeners(path)) {
+              if (_usePersistWatcher || !pathExists || !hasChildOrDataListeners(path)) {
                 // will not install listener using exists call
                 stat = getStat(path, false);
               } else {
@@ -2178,7 +2232,7 @@
 
   @SuppressWarnings("unchecked")
   public <T extends Object> T readData(String path, Stat stat) {
-    return (T) readData(path, stat, hasListeners(path));
+    return (T) readData(path, stat, hasChildOrDataListeners(path));
   }
 
   @SuppressWarnings("unchecked")
@@ -3030,8 +3084,16 @@
     void run() throws KeeperException, InterruptedException;
   }
 
+  // Add a persist listener on the path.
+  // Throws UnsupportedOperationException if there is already a recursive persist listener on the
+  // path because it will overwrite that recursive persist listener.
   private void addPersistListener(String path, Object listener) {
     ManipulateListener addListeners = () -> {
+      if (_zkPathRecursiveWatcherTrie.hasListenerOnPath(path)) {
+        throw new UnsupportedOperationException(
+            "Can not subscribe PersistListener when there is an recursive listener on path: "
+                + path);
+      }
       if (listener instanceof IZkChildListener) {
         addChildListener(path, (IZkChildListener) listener);
       } else if (listener instanceof IZkDataListener) {
@@ -3044,6 +3106,7 @@
 
   // TODO: Consider create an empty interface and let the two listeners interface extend that
   // interface for code clean.
+  // This function removes persist child or data listener.
   private void removePersistListener(String path, Object listener) {
 
     ManipulateListener removeListeners = () -> {
@@ -3053,8 +3116,9 @@
         } else if (listener instanceof IZkDataListener) {
           removeDataListener(path, (IZkDataListener) listener);
         }
-        if (!hasListeners(path)) {
-          // TODO: update hasListeners logic when recursive persist listener is added
+        if (!hasChildOrDataListeners(path)) {
+          // This will also remove persist recursive watcher on ZK. However, there should not be an
+          // persist recursive watcher installed in the first place.
           getConnection().removeWatches(path, this, WatcherType.Any);
         }
       } catch (KeeperException.NoWatcherException e) {
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
index 9adcb3a..41a9dd1 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/util/ZkPathRecursiveWatcherTrie.java
@@ -164,7 +164,6 @@
         result.addAll(cur.getRecursiveListeners());
       }
     }
-
     return result;
   }
 
@@ -225,6 +224,28 @@
   }
 
   /**
+   * Return if there is listener on a particular path
+   * @param path
+   * @return
+   */
+  public boolean hasListenerOnPath(String path) {
+    Objects.requireNonNull(path, "Path cannot be null");
+
+    final List<String> pathComponents = split(path);
+    TrieNode cur;
+    synchronized (this) {
+      cur = _rootNode;
+      for (final String element : pathComponents) {
+        cur = cur.getChild(element);
+        if (cur == null) {
+          break;
+        }
+      }
+    }
+    return cur != null && !cur.getRecursiveListeners().isEmpty();
+  }
+
+  /**
    * Clear all nodes in the trie.
    */
   public synchronized void clear() {
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
index b841bcd..76f5352 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/zkclient/TestZkClientPersistWatcher.java
@@ -116,39 +116,62 @@
     org.apache.helix.zookeeper.impl.client.ZkClient.Builder builder =
         new org.apache.helix.zookeeper.impl.client.ZkClient.Builder();
     builder.setZkServer(ZkTestBase.ZK_ADDR).setMonitorRootPathOnly(false)
-        .setUsePersistWatcher(false);
+        .setUsePersistWatcher(true);
     org.apache.helix.zookeeper.impl.client.ZkClient zkClient = builder.build();
     zkClient.setZkSerializer(new BasicZkSerializer(new SerializableSerializer()));
     int count = 100;
     final AtomicInteger[] event_count = {new AtomicInteger(0)};
     final AtomicInteger[] event_count2 = {new AtomicInteger(0)};
-    CountDownLatch countDownLatch1 = new CountDownLatch(count);
-    CountDownLatch countDownLatch2 = new CountDownLatch(count/2);
-    String path = "/base/testZkClientChildChange";
+    // for each iteration, we will edit a node, create a child, create a grand child, and
+    // delete child. Expect 4 event per iteration. -> total event should be count*4
+    CountDownLatch countDownLatch1 = new CountDownLatch(count*4);
+    CountDownLatch countDownLatch2 = new CountDownLatch(count);
+    String path = "/testZkClientPersistRecursiveChange";
     RecursivePersistListener rcListener = new RecursivePersistListener() {
       @Override
       public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType)
           throws Exception {
         countDownLatch1.countDown();
         event_count[0].incrementAndGet() ;
-        System.out.println("rcListener count " + event_count[0]);
       }
     };
+    zkClient.create(path, "datat", CreateMode.PERSISTENT);
+    zkClient.subscribePersistRecursiveListener(path, rcListener);
+    for (int i=0; i<count; ++i) {
+      zkClient.writeData(path, "data7" + i, -1);
+      zkClient.create(path+"/c1_" +i , "datat", CreateMode.PERSISTENT);
+      zkClient.create(path+"/c1_" +i + "/c2", "datat", CreateMode.PERSISTENT);
+      zkClient.delete(path+"/c1_" +i + "/c2");
+    }
+    Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS));
+
+    // subscribe a persist child watch, it should throw exception
     IZkChildListener childListener2 = new IZkChildListener() {
       @Override
       public void handleChildChange(String parentPath, List<String> currentChilds)
           throws Exception {
         countDownLatch2.countDown();
         event_count2[0].incrementAndGet();
-        System.out.println("childListener2 count " + event_count2[0]);
       }
     };
     try {
-      zkClient.subscribePersistRecursiveWatcher(path, rcListener);
-    } catch (KeeperException.UnimplementedException e) {
-      e.printStackTrace();
+      zkClient.subscribeChildChanges(path, childListener2, false);
+    } catch ( Exception ex) {
+      Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
     }
 
+    // unsubscribe recursive persist watcher, and subscribe persist watcher should success.
+    zkClient.unsubscribePersistRecursiveListener(path, rcListener);
+    zkClient.subscribeChildChanges(path, childListener2, false);
+    // we should only get 100 event since only 100 direct child change.
+    for (int i=0; i<count; ++i) {
+      zkClient.writeData(path, "data7" + i, -1);
+      zkClient.create(path+"/c2_" +i , "datat", CreateMode.PERSISTENT);
+      zkClient.create(path+"/c2_" +i + "/c3", "datat", CreateMode.PERSISTENT);
+      zkClient.delete(path+"/c2_" +i + "/c3");
+    }
+    Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS));
+
     zkClient.close();
   }