[LIVY-732][SERVER] A common zookeeper wrapper utility

## What changes were proposed in this pull request?

Currently, the ZooKeeper utilities are mixed into ZooKeeperStateStore. To use them, an instance of ZooKeeperStateStore has to be created, which is awkward for callers that do not need a state store.

This PR aims to achieve the following goal:

1.  Extract the ZooKeeper utilities from ZooKeeperStateStore into a common wrapper, so that they can also support features such as distributed locks and service discovery.

## How was this patch tested?

Existing unit tests (UT) and integration tests (IT).

Author: runzhiwang <runzhiwang@tencent.com>

Closes #267 from runzhiwang/zk-wrapper-utility.
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 1fe6047..456bec7 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -107,6 +107,8 @@
 # Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to
 # configure the state store.
 # livy.server.recovery.mode = off
+# ZooKeeper address used for HA and the state store, e.g. host1:port1,host2:port2
+# livy.server.zookeeper.url =
 
 # Where Livy should store state to for recovery. Possible values:
 # <empty>: Default. State store disabled.
@@ -117,8 +119,26 @@
 # For filesystem state store, the path of the state store directory. Please don't use a filesystem
 # that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
 # For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
+# If livy.server.recovery.state-store is zookeeper, this config is for back-compatibility,
+# so if both this config and livy.server.zookeeper.url exist,
+# livy uses livy.server.zookeeper.url first.
 # livy.server.recovery.state-store.url =
 
+# The policy of curator connecting to zookeeper.
+# For example, m, n means retry m times and the interval of retry is n milliseconds.
+# Please use the new config: livy.server.zk.retry-policy.
+# Keep this config for back-compatibility.
+# If both this config and livy.server.zk.retry-policy exist,
+# livy uses livy.server.zk.retry-policy first.
+# livy.server.recovery.zk-state-store.retry-policy = 5,100
+
+# The policy of curator connecting to zookeeper.
+# For example, m, n means retry m times and the interval of retry is n milliseconds
+# livy.server.zk.retry-policy =
+
+# The ZooKeeper node (key prefix) under which session data is stored.
+# livy.server.recovery.zk-state-store.key-prefix = livy
+
 # If Livy can't find the yarn app within this time, consider it lost.
 # livy.server.yarn.app-lookup-timeout = 120s
 # When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 683bccc..1b06fde 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -185,6 +185,10 @@
    * configure the state store.
    */
   val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off")
+
+  // ZooKeeper address used for HA and the state store, e.g. host1:port1,host2:port2
+  val ZOOKEEPER_URL = Entry("livy.server.zookeeper.url", null)
+
   /**
    * Where Livy should store state to for recovery. Possible values:
    * <empty>: Default. State store disabled.
@@ -196,8 +200,32 @@
    * For filesystem state store, the path of the state store directory. Please don't use a
    * filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
    * For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
+   * If livy.server.recovery.state-store is zookeeper, this config is for back-compatibility,
+   * so if both this config and livy.server.zookeeper.url exist,
+   * livy uses livy.server.zookeeper.url first.
    */
-  val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")
+  val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", null)
+
+  /**
+    * The policy of curator connecting to zookeeper.
+    * For example, m, n means retry m times and the interval of retry is n milliseconds.
+    * Please use the new config: livy.server.zk.retry-policy.
+    * Keep this config for back-compatibility.
+    * If both this config and livy.server.zk.retry-policy exist,
+    * livy uses livy.server.zk.retry-policy first.
+    */
+  val RECOVERY_ZK_STATE_STORE_RETRY_POLICY =
+    Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100")
+
+  /**
+   * The retry policy Curator uses when connecting to ZooKeeper, written as "m,n":
+   * retry m times with an interval of n milliseconds between retries.
+   */
+  val ZK_RETRY_POLICY = Entry("livy.server.zk.retry-policy", null)
+
+  // The dir in zookeeper to store the data about session.
+  val RECOVERY_ZK_STATE_STORE_KEY_PREFIX =
+    Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")
 
   // Livy will cache the max no of logs specified. 0 means don't cache the logs.
   val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
index b40a20e..3e715bd 100644
--- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala
+++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -39,7 +39,7 @@
 import org.apache.livy.server.auth.LdapAuthenticationHandlerImpl
 import org.apache.livy.server.batch.BatchSessionServlet
 import org.apache.livy.server.interactive.InteractiveSessionServlet
-import org.apache.livy.server.recovery.{SessionStore, StateStore}
+import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManager}
 import org.apache.livy.server.ui.UIServlet
 import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
 import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
@@ -60,6 +60,8 @@
   private var accessManager: AccessManager = _
   private var _thriftServerFactory: Option[ThriftServerFactory] = None
 
+  private var zkManager: Option[ZooKeeperManager] = None
+
   private var ugi: UserGroupInformation = _
 
   def start(): Unit = {
@@ -146,7 +148,12 @@
       Future { SparkYarnApp.yarnClient }
     }
 
-    StateStore.init(livyConf)
+    if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
+      zkManager = Some(new ZooKeeperManager(livyConf))
+      zkManager.foreach(_.start())
+    }
+
+    StateStore.init(livyConf, zkManager)
     val sessionStore = new SessionStore(livyConf)
     val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)
     val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore)
@@ -323,6 +330,7 @@
     Runtime.getRuntime().addShutdownHook(new Thread("Livy Server Shutdown") {
       override def run(): Unit = {
         info("Shutting down Livy server.")
+        zkManager.foreach(_.stop())
         server.stop()
         _thriftServerFactory.foreach(_.stop())
       }
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
index ff5185b..826a2fb 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
@@ -44,7 +44,8 @@
 
   private val fsUri = {
     val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
-    require(!fsPath.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
+    require(fsPath != null && !fsPath.isEmpty,
+      s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
     new URI(fsPath)
   }
 
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
index a6c3275..2e454db 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
 import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.server.recovery.ZooKeeperManager
 import org.apache.livy.sessions.SessionKindModule
 import org.apache.livy.sessions.SessionManager._
 
@@ -78,11 +79,17 @@
 object StateStore extends Logging {
   private[this] var stateStore: Option[StateStore] = None
 
-  def init(livyConf: LivyConf): Unit = synchronized {
+  def init(livyConf: LivyConf, zkManager: Option[ZooKeeperManager] = None): Unit = synchronized {
     if (stateStore.isEmpty) {
       val fileStateStoreClassTag = pickStateStore(livyConf)
-      stateStore = Option(fileStateStoreClassTag.getDeclaredConstructor(classOf[LivyConf])
-        .newInstance(livyConf).asInstanceOf[StateStore])
+      if (fileStateStoreClassTag == classOf[ZooKeeperStateStore]) {
+        val manager = zkManager.getOrElse(
+          throw new IllegalArgumentException("ZooKeeperStateStore requires a ZooKeeperManager."))
+        stateStore = Some(new ZooKeeperStateStore(livyConf, manager))
+      } else {
+        stateStore = Option(fileStateStoreClassTag.getDeclaredConstructor(classOf[LivyConf])
+          .newInstance(livyConf).asInstanceOf[StateStore])
+      }
       info(s"Using ${stateStore.get.getClass.getSimpleName} for recovery.")
     }
   }
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala
new file mode 100644
index 0000000..2d49cbb
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.curator.framework.api.UnhandledErrorListener
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.RetryNTimes
+import org.apache.zookeeper.KeeperException.NoNodeException
+
+import org.apache.livy.LivyConf
+import org.apache.livy.Logging
+import org.apache.livy.utils.LivyUncaughtException
+
+class ZooKeeperManager(
+    livyConf: LivyConf,
+    mockCuratorClient: Option[CuratorFramework] = None)
+  extends JsonMapper with Logging {
+
+  def this(livyConf: LivyConf) {
+    this(livyConf, None)
+  }
+
+  private val zkAddress = Option(livyConf.get(LivyConf.ZOOKEEPER_URL)).
+    orElse(Option(livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL))).
+    map(_.trim).orNull
+
+  require(zkAddress != null && !zkAddress.isEmpty,
+    s"Please config ${LivyConf.ZOOKEEPER_URL.key}.")
+
+  private val retryValue = Option(livyConf.get(LivyConf.ZK_RETRY_POLICY)).
+    orElse(Option(livyConf.get(LivyConf.RECOVERY_ZK_STATE_STORE_RETRY_POLICY))).
+    map(_.trim).orNull
+
+  require(retryValue != null && !retryValue.isEmpty,
+    s"Please config ${LivyConf.ZK_RETRY_POLICY.key}.")
+
+  // Matches "m,n" (whitespace allowed): m = max retry count, n = sleep in ms between retries.
+  private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
+  private[recovery] val retryPolicy = retryValue match {
+    case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
+    case _ => throw new IllegalArgumentException(
+      s"${LivyConf.ZK_RETRY_POLICY.key} contains bad value: $retryValue. " +
+        "Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
+  }
+
+  private val curatorClient = mockCuratorClient.getOrElse {
+    CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
+  }
+
+  curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
+    def unhandledError(message: String, e: Throwable): Unit = {
+      error(s"Fatal Zookeeper error: ${message}.", e)
+      throw new LivyUncaughtException(e.getMessage)
+    }
+  })
+
+  def start(): Unit = {
+    curatorClient.start()
+  }
+
+  def stop(): Unit = {
+    curatorClient.close()
+  }
+
+  // TODO Make sure ZK path has proper secure permissions so that other users cannot read its
+  // contents.
+  def set(key: String, value: Object): Unit = {
+    val data = serializeToBytes(value)
+    if (curatorClient.checkExists().forPath(key) == null) {
+      curatorClient.create().creatingParentsIfNeeded().forPath(key, data)
+    } else {
+      curatorClient.setData().forPath(key, data)
+    }
+  }
+
+  def get[T: ClassTag](key: String): Option[T] = {
+    if (curatorClient.checkExists().forPath(key) == null) {
+      None
+    } else {
+      Option(deserialize[T](curatorClient.getData().forPath(key)))
+    }
+  }
+
+  def getChildren(key: String): Seq[String] = {
+    if (curatorClient.checkExists().forPath(key) == null) {
+      Seq.empty[String]
+    } else {
+      curatorClient.getChildren.forPath(key).asScala
+    }
+  }
+
+  def remove(key: String): Unit = {
+    try {
+      curatorClient.delete().guaranteed().forPath(key)
+    } catch {
+      case _: NoNodeException => warn(s"Fail to remove non-existed zookeeper node: ${key}")
+    }
+  }
+}
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
index 1b93b7a..ceb2258 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
@@ -16,103 +16,35 @@
  */
 package org.apache.livy.server.recovery
 
-import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
-import scala.util.Try
 
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.framework.api.UnhandledErrorListener
-import org.apache.curator.retry.RetryNTimes
-import org.apache.zookeeper.KeeperException.NoNodeException
-
-import org.apache.livy.{LivyConf, Logging}
-import org.apache.livy.LivyConf.Entry
-
-object ZooKeeperStateStore {
-  val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")
-  val ZK_RETRY_CONF = Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100")
-}
+import org.apache.livy.LivyConf
 
 class ZooKeeperStateStore(
     livyConf: LivyConf,
-    mockCuratorClient: Option[CuratorFramework] = None) // For testing
-  extends StateStore(livyConf) with Logging {
+    zkManager: ZooKeeperManager)
+  extends StateStore(livyConf) {
 
-  import ZooKeeperStateStore._
-
-  // Constructor defined for StateStore factory to new this class using reflection.
-  def this(livyConf: LivyConf) {
-    this(livyConf, None)
-  }
-
-  private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
-  require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
-  private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF)
-  private val retryValue = livyConf.get(ZK_RETRY_CONF)
-  // a regex to match patterns like "m, n" where m and n both are integer values
-  private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
-  private[recovery] val retryPolicy = retryValue match {
-    case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
-    case _ => throw new IllegalArgumentException(
-      s"$ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " +
-        "Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
-  }
-
-  private val curatorClient = mockCuratorClient.getOrElse {
-    CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
-  }
-
-  Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
-    override def run(): Unit = {
-      curatorClient.close()
-    }
-  }))
-
-  curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
-    def unhandledError(message: String, e: Throwable): Unit = {
-      error(s"Fatal Zookeeper error. Shutting down Livy server.")
-      System.exit(1)
-    }
-  })
-  curatorClient.start()
-  // TODO Make sure ZK path has proper secure permissions so that other users cannot read its
-  // contents.
+  private val zkKeyPrefix = livyConf.get(LivyConf.RECOVERY_ZK_STATE_STORE_KEY_PREFIX)
+  private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"
 
   override def set(key: String, value: Object): Unit = {
-    val prefixedKey = prefixKey(key)
-    val data = serializeToBytes(value)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data)
-    } else {
-      curatorClient.setData().forPath(prefixedKey, data)
-    }
+    zkManager.set(prefixKey(key), value)
   }
 
   override def get[T: ClassTag](key: String): Option[T] = {
-    val prefixedKey = prefixKey(key)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      None
-    } else {
-      Option(deserialize[T](curatorClient.getData().forPath(prefixedKey)))
-    }
+    zkManager.get(prefixKey(key))
   }
 
   override def getChildren(key: String): Seq[String] = {
-    val prefixedKey = prefixKey(key)
-    if (curatorClient.checkExists().forPath(prefixedKey) == null) {
-      Seq.empty[String]
-    } else {
-      curatorClient.getChildren.forPath(prefixedKey).asScala
-    }
+    zkManager.getChildren(prefixKey(key))
   }
 
   override def remove(key: String): Unit = {
-    try {
-      curatorClient.delete().guaranteed().forPath(prefixKey(key))
-    } catch {
-      case _: NoNodeException => warn(s"Fail to remove non-existed zookeeper node: ${key}")
-    }
+    zkManager.remove(prefixKey(key))
   }
 
-  private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"
+  def getZooKeeperManager(): ZooKeeperManager = {
+    zkManager
+  }
 }
diff --git a/server/src/main/scala/org/apache/livy/utils/LivyUncaughtException.java b/server/src/main/scala/org/apache/livy/utils/LivyUncaughtException.java
new file mode 100644
index 0000000..89f6523
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/utils/LivyUncaughtException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.utils;
+
+public class LivyUncaughtException extends Exception {
+  public LivyUncaughtException(String remoteStackTrace) {
+    super(remoteStackTrace);
+  }
+}
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
index 88e530f..3ad7912 100644
--- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
@@ -34,6 +34,7 @@
   describe("ZooKeeperStateStore") {
     case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework)
     val conf = new LivyConf()
+    conf.set(LivyConf.RECOVERY_STATE_STORE, "zookeeper")
     conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
     val key = "key"
     val prefixedKey = s"/livy/$key"
@@ -42,7 +43,9 @@
       val curatorClient = mock[CuratorFramework]
       when(curatorClient.getUnhandledErrorListenable())
         .thenReturn(mock[Listenable[UnhandledErrorListener]])
-      val stateStore = new ZooKeeperStateStore(conf, Some(curatorClient))
+      val zkManager = new ZooKeeperManager(conf, Some(curatorClient))
+      zkManager.start()
+      val stateStore = new ZooKeeperStateStore(conf, zkManager)
       testBody(TestFixture(stateStore, curatorClient))
     }
 
@@ -57,11 +60,11 @@
     it("should throw on bad config") {
       withMock { f =>
         val conf = new LivyConf()
-        intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) }
+        intercept[IllegalArgumentException] { new ZooKeeperManager(conf) }
 
         conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
-        conf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "bad")
-        intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) }
+        conf.set(LivyConf.ZK_RETRY_POLICY, "bad")
+        intercept[IllegalArgumentException] { new ZooKeeperManager(conf) }
       }
     }
 
@@ -96,12 +99,12 @@
     }
 
     it("get should retrieve retry policy configs") {
-      conf.set(org.apache.livy.server.recovery.ZooKeeperStateStore.ZK_RETRY_CONF, "11,77")
+      conf.set(LivyConf.ZK_RETRY_POLICY, "11,77")
         withMock { f =>
         mockExistsBuilder(f.curatorClient, true)
 
-        f.stateStore.retryPolicy should not be null
-        f.stateStore.retryPolicy.getN shouldBe 11
+        f.stateStore.getZooKeeperManager().retryPolicy should not be null
+        f.stateStore.getZooKeeperManager().retryPolicy.getN shouldBe 11
       }
     }