Use LeaderLatch to determine if oracle exists (#1088)

* Use LeaderLatch to determine if oracle exists

The old implementation of oracleExists checks to see if there are
more than 1 ZNodes under the ZookeeperPath.ORACLE_SERVER path. This
relies on the LeaderLatch implementation and uses a path given to
a Curator recipe, which is advised against by the Curator docs.
See https://cwiki.apache.org/confluence/display/CURATOR/TN7.

The new implementation utilizes LeaderLatch to count the number
of participants to see if there's more than 0.
diff --git a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
index 68ba8ba..c705792 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/client/FluoAdminImpl.java
@@ -335,7 +335,6 @@
       CuratorFramework curator = getAppCurator();
       ObserverUtil.initialize(curator, config);
 
-
       sharedProps.store(baos, "Shared java props");
 
       CuratorUtil.putData(curator, ZookeeperPath.CONFIG_SHARED, baos.toByteArray(),
@@ -505,32 +504,26 @@
   }
 
   public static boolean oracleExists(CuratorFramework curator) {
-    try {
-      return curator.checkExists().forPath(ZookeeperPath.ORACLE_SERVER) != null
-          && !curator.getChildren().forPath(ZookeeperPath.ORACLE_SERVER).isEmpty();
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    return numOracles(curator) > 0;
   }
 
   public boolean oracleExists() {
     return oracleExists(getAppCurator());
   }
 
-  public int numOracles() {
-    CuratorFramework curator = getAppCurator();
-    if (oracleExists(curator)) {
-      try {
-        LeaderLatch leaderLatch = new LeaderLatch(curator, ZookeeperPath.ORACLE_SERVER);
-        return leaderLatch.getParticipants().size();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    } else {
-      return 0;
+  private static int numOracles(CuratorFramework curator) {
+    try {
+      LeaderLatch leaderLatch = new LeaderLatch(curator, ZookeeperPath.ORACLE_SERVER);
+      return leaderLatch.getParticipants().size();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
     }
   }
 
+  public int numOracles() {
+    return numOracles(getAppCurator());
+  }
+
   public static int numWorkers(CuratorFramework curator) {
     int numWorkers = 0;
     try {
diff --git a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
index 2d58c81..6c34be3 100644
--- a/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
+++ b/modules/integration-tests/src/main/java/org/apache/fluo/integration/client/FluoAdminImplIT.java
@@ -24,6 +24,7 @@
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.fluo.accumulo.util.ColumnConstants;
+import org.apache.fluo.accumulo.util.ZookeeperPath;
 import org.apache.fluo.accumulo.util.ZookeeperUtil;
 import org.apache.fluo.api.client.FluoAdmin;
 import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
@@ -49,6 +50,8 @@
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -286,4 +289,29 @@
       Assert.assertEquals(0, admin.numOracles());
     }
   }
+
+  @Test
+  public void testNumOraclesWithMissingOraclePath() throws Exception {
+    oserver.stop();
+    try (CuratorFramework curator = CuratorUtil.newAppCurator(config);
+        FluoAdminImpl admin = new FluoAdminImpl(config)) {
+      curator.start();
+      oserver.awaitLeaderElection(3, TimeUnit.SECONDS);
+      curator.delete().forPath(ZookeeperPath.ORACLE_SERVER);
+
+      assertEquals(0, admin.numOracles());
+    }
+  }
+
+  @Test
+  public void testOracleExists() throws Exception {
+    try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
+      assertTrue(admin.oracleExists());
+
+      oserver.stop();
+      oserver.awaitLeaderElection(3, TimeUnit.SECONDS);
+
+      assertFalse(admin.oracleExists());
+    }
+  }
 }
diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
index e0b9cf9..e0bd83f 100644
--- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
+++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoOutputFormat.java
@@ -43,8 +43,7 @@
   private static String PROPS_CONF_KEY = FluoOutputFormat.class.getName() + ".props";
 
   @Override
-  public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException {
-  }
+  public void checkOutputSpecs(JobContext arg0) throws IOException, InterruptedException {}
 
   @Override
   public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)