Remove race condition from FluoAdminImpl.numWorkers (#1090)

The pattern of checking if a ZNode exists and then getting it's
children contains a race condition. If the ZNode is deleted after
the existence check but before getting the children then a
NoNodeException will be thrown instead of returning 0.

Instead we skip the existence check and just return 0 when a
NoNodeException is thrown. This will avoid the race condition.

Fixes #1089
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index c705792..2930838 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -527,13 +527,13 @@
   public static int numWorkers(CuratorFramework curator) {
     int numWorkers = 0;
     try {
-      if (curator.checkExists().forPath(ZookeeperPath.FINDERS) != null) {
-        for (String path : curator.getChildren().forPath(ZookeeperPath.FINDERS)) {
-          if (path.startsWith(PartitionManager.ZK_FINDER_PREFIX)) {
-            numWorkers++;
-          }
+      for (String path : curator.getChildren().forPath(ZookeeperPath.FINDERS)) {
+        if (path.startsWith(PartitionManager.ZK_FINDER_PREFIX)) {
+          numWorkers++;
         }
       }
+    } catch (KeeperException.NoNodeException e) {
+      return 0;
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 6c34be3..cea68a2 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -37,12 +37,14 @@
 import org.apache.fluo.api.config.FluoConfiguration;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.exceptions.FluoException;
+import org.apache.fluo.api.service.FluoWorker;
 import org.apache.fluo.core.client.FluoAdminImpl;
 import org.apache.fluo.core.client.FluoClientImpl;
 import org.apache.fluo.core.impl.Environment;
 import org.apache.fluo.core.oracle.OracleServer;
 import org.apache.fluo.core.util.AccumuloUtil;
 import org.apache.fluo.core.util.CuratorUtil;
+import org.apache.fluo.core.worker.FluoWorkerImpl;
 import org.apache.fluo.integration.ITBaseImpl;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
@@ -314,4 +316,38 @@
       assertFalse(admin.oracleExists());
     }
   }
+
+  @Test
+  public void testNumWorkers() {
+    try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
+      assertEquals(0, admin.numWorkers());
+
+      FluoWorker fluoWorker = new FluoWorkerImpl(config);
+      fluoWorker.start();
+      assertEquals(1, admin.numWorkers());
+
+      FluoWorker fluoWorker2 = new FluoWorkerImpl(config);
+      fluoWorker2.start();
+      assertEquals(2, admin.numWorkers());
+
+      fluoWorker2.stop();
+      assertEquals(1, admin.numWorkers());
+
+      fluoWorker.stop();
+      assertEquals(0, admin.numWorkers());
+    }
+  }
+
+
+  @Test
+  public void testNumWorkersWithMissingWorkerPath() throws Exception {
+    try (CuratorFramework curator = CuratorUtil.newAppCurator(config);
+        FluoAdminImpl admin = new FluoAdminImpl(config)) {
+      curator.start();
+      if (curator.checkExists().forPath(ZookeeperPath.FINDERS) != null) {
+        curator.delete().forPath(ZookeeperPath.FINDERS);
+      }
+      assertEquals(0, admin.numWorkers());
+    }
+  }
 }