[LIVY-732][SERVER] A common zookeeper wrapper utility
## What changes were proposed in this pull request?
Currently, the ZooKeeper utilities are mixed into ZooKeeperStateStore. To use them, an instance of ZooKeeperStateStore has to be created, which is awkward.
This PR aims to achieve the following goal:
1. Extract the ZooKeeper utilities from ZooKeeperStateStore into a common wrapper so that they can also support features such as distributed locks, service discovery and so on.
## How was this patch tested?
Existing unit tests (UT) and integration tests (IT).
Author: runzhiwang <runzhiwang@tencent.com>
Closes #267 from runzhiwang/zk-wrapper-utility.
diff --git a/conf/livy.conf.template b/conf/livy.conf.template
index 1fe6047..456bec7 100644
--- a/conf/livy.conf.template
+++ b/conf/livy.conf.template
@@ -107,6 +107,8 @@
# Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to
# configure the state store.
# livy.server.recovery.mode = off
+# Zookeeper address used for HA and state store. e.g. host1:port1, host2:port2
+# livy.server.zookeeper.url =
# Where Livy should store state to for recovery. Possible values:
# <empty>: Default. State store disabled.
@@ -117,8 +119,26 @@
# For filesystem state store, the path of the state store directory. Please don't use a filesystem
# that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
+# If livy.server.recovery.state-store is zookeeper, this config is for back-compatibility,
+# so if both this config and livy.server.zookeeper.url exist,
+# livy uses livy.server.zookeeper.url first.
# livy.server.recovery.state-store.url =
+# The policy of curator connecting to zookeeper.
+# For example, m, n means retry m times and the interval of retry is n milliseconds.
+# Please use the new config: livy.server.zk.retry-policy.
+# Keep this config for back-compatibility.
+# If both this config and livy.server.zk.retry-policy exist,
+# livy uses livy.server.zk.retry-policy first.
+# livy.server.recovery.zk-state-store.retry-policy = 5,100
+
+# The policy of curator connecting to zookeeper.
+# For example, m, n means retry m times and the interval of retry is n milliseconds
+# livy.server.zk.retry-policy =
+
+# The dir in zk to store the data about session.
+# livy.server.recovery.zk-state-store.key-prefix = livy
+
# If Livy can't find the yarn app within this time, consider it lost.
# livy.server.yarn.app-lookup-timeout = 120s
# When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala
index 683bccc..1b06fde 100644
--- a/server/src/main/scala/org/apache/livy/LivyConf.scala
+++ b/server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -185,6 +185,10 @@
* configure the state store.
*/
val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off")
+
+ // Zookeeper address used for HA and state store. e.g. host1:port1, host2:port2
+ val ZOOKEEPER_URL = Entry("livy.server.zookeeper.url", null)
+
/**
* Where Livy should store state to for recovery. Possible values:
* <empty>: Default. State store disabled.
@@ -196,8 +200,32 @@
* For filesystem state store, the path of the state store directory. Please don't use a
* filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
* For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
+ * If livy.server.recovery.state-store is zookeeper, this config is for back-compatibility,
+ * so if both this config and livy.server.zookeeper.url exist,
+ * livy uses livy.server.zookeeper.url first.
*/
- val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")
+ val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", null)
+
+ /**
+ * The policy of curator connecting to zookeeper.
+ * For example, m, n means retry m times and the interval of retry is n milliseconds.
+ * Please use the new config: livy.server.zk.retry-policy.
+ * Keep this config for back-compatibility.
+ * If both this config and livy.server.zk.retry-policy exist,
+ * livy uses livy.server.zk.retry-policy first.
+ */
+ val RECOVERY_ZK_STATE_STORE_RETRY_POLICY =
+ Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100")
+
+ /**
+ * The policy of curator connecting to zookeeper.
+ * For example, m, n means retry m times and the interval of retry is n milliseconds
+ */
+ val ZK_RETRY_POLICY = Entry("livy.server.zk.retry-policy", null)
+
+ // The dir in zookeeper to store the data about session.
+ val RECOVERY_ZK_STATE_STORE_KEY_PREFIX =
+ Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")
// Livy will cache the max no of logs specified. 0 means don't cache the logs.
val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)
diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
index b40a20e..3e715bd 100644
--- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala
+++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -39,7 +39,7 @@
import org.apache.livy.server.auth.LdapAuthenticationHandlerImpl
import org.apache.livy.server.batch.BatchSessionServlet
import org.apache.livy.server.interactive.InteractiveSessionServlet
-import org.apache.livy.server.recovery.{SessionStore, StateStore}
+import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManager}
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
@@ -60,6 +60,8 @@
private var accessManager: AccessManager = _
private var _thriftServerFactory: Option[ThriftServerFactory] = None
+ private var zkManager: Option[ZooKeeperManager] = None
+
private var ugi: UserGroupInformation = _
def start(): Unit = {
@@ -146,7 +148,12 @@
Future { SparkYarnApp.yarnClient }
}
- StateStore.init(livyConf)
+ if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
+ zkManager = Some(new ZooKeeperManager(livyConf))
+ zkManager.foreach(_.start())
+ }
+
+ StateStore.init(livyConf, zkManager)
val sessionStore = new SessionStore(livyConf)
val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)
val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore)
@@ -323,6 +330,7 @@
Runtime.getRuntime().addShutdownHook(new Thread("Livy Server Shutdown") {
override def run(): Unit = {
info("Shutting down Livy server.")
+ zkManager.foreach(_.stop())
server.stop()
_thriftServerFactory.foreach(_.stop())
}
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
index ff5185b..826a2fb 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/FileSystemStateStore.scala
@@ -44,7 +44,8 @@
private val fsUri = {
val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
- require(!fsPath.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
+ require(fsPath != null && !fsPath.isEmpty,
+ s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
new URI(fsPath)
}
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
index a6c3275..2e454db 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/StateStore.scala
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.livy.{LivyConf, Logging}
+import org.apache.livy.server.recovery.ZooKeeperManager
import org.apache.livy.sessions.SessionKindModule
import org.apache.livy.sessions.SessionManager._
@@ -78,11 +79,17 @@
object StateStore extends Logging {
private[this] var stateStore: Option[StateStore] = None
- def init(livyConf: LivyConf): Unit = synchronized {
+ def init(livyConf: LivyConf, zkManager: Option[ZooKeeperManager] = None): Unit = synchronized {
if (stateStore.isEmpty) {
val fileStateStoreClassTag = pickStateStore(livyConf)
- stateStore = Option(fileStateStoreClassTag.getDeclaredConstructor(classOf[LivyConf])
- .newInstance(livyConf).asInstanceOf[StateStore])
+ if (fileStateStoreClassTag == classOf[ZooKeeperStateStore]) {
+ stateStore = Option(fileStateStoreClassTag.
+ getDeclaredConstructor(classOf[LivyConf], classOf[ZooKeeperManager])
+ .newInstance(livyConf, zkManager.get).asInstanceOf[StateStore])
+ } else {
+ stateStore = Option(fileStateStoreClassTag.getDeclaredConstructor(classOf[LivyConf])
+ .newInstance(livyConf).asInstanceOf[StateStore])
+ }
info(s"Using ${stateStore.get.getClass.getSimpleName} for recovery.")
}
}
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala
new file mode 100644
index 0000000..2d49cbb
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.server.recovery
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.curator.framework.api.UnhandledErrorListener
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.RetryNTimes
+import org.apache.zookeeper.KeeperException.NoNodeException
+
+import org.apache.livy.LivyConf
+import org.apache.livy.Logging
+import org.apache.livy.utils.LivyUncaughtException
+
+class ZooKeeperManager(
+ livyConf: LivyConf,
+ mockCuratorClient: Option[CuratorFramework] = None)
+ extends JsonMapper with Logging {
+
+ def this(livyConf: LivyConf) {
+ this(livyConf, None)
+ }
+
+ private val zkAddress = Option(livyConf.get(LivyConf.ZOOKEEPER_URL)).
+ orElse(Option(livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL))).
+ map(_.trim).orNull
+
+ require(zkAddress != null && !zkAddress.isEmpty,
+ s"Please config ${LivyConf.ZOOKEEPER_URL.key}.")
+
+ private val retryValue = Option(livyConf.get(LivyConf.ZK_RETRY_POLICY)).
+ orElse(Option(livyConf.get(LivyConf.RECOVERY_ZK_STATE_STORE_RETRY_POLICY))).
+ map(_.trim).orNull
+
+ require(retryValue != null && !retryValue.isEmpty,
+ s"Please config ${LivyConf.ZK_RETRY_POLICY.key}.")
+
+  // Matches "m, n" (both integers): m = max retry count, n = sleep in ms between retries.
+  private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
+  private[recovery] val retryPolicy = retryValue match {
+    case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
+    case _ => throw new IllegalArgumentException(
+      s"${LivyConf.ZK_RETRY_POLICY.key} contains bad value: $retryValue. " +
+      "Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
+  }
+
+ private val curatorClient = mockCuratorClient.getOrElse {
+ CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
+ }
+
+ curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
+ def unhandledError(message: String, e: Throwable): Unit = {
+ error(s"Fatal Zookeeper error: ${message}.", e)
+ throw new LivyUncaughtException(e.getMessage)
+ }
+ })
+
+ def start(): Unit = {
+ curatorClient.start()
+ }
+
+ def stop(): Unit = {
+ curatorClient.close()
+ }
+
+ // TODO Make sure ZK path has proper secure permissions so that other users cannot read its
+ // contents.
+ def set(key: String, value: Object): Unit = {
+ val data = serializeToBytes(value)
+ if (curatorClient.checkExists().forPath(key) == null) {
+ curatorClient.create().creatingParentsIfNeeded().forPath(key, data)
+ } else {
+ curatorClient.setData().forPath(key, data)
+ }
+ }
+
+ def get[T: ClassTag](key: String): Option[T] = {
+ if (curatorClient.checkExists().forPath(key) == null) {
+ None
+ } else {
+ Option(deserialize[T](curatorClient.getData().forPath(key)))
+ }
+ }
+
+ def getChildren(key: String): Seq[String] = {
+ if (curatorClient.checkExists().forPath(key) == null) {
+ Seq.empty[String]
+ } else {
+ curatorClient.getChildren.forPath(key).asScala
+ }
+ }
+
+ def remove(key: String): Unit = {
+ try {
+ curatorClient.delete().guaranteed().forPath(key)
+ } catch {
+ case _: NoNodeException => warn(s"Fail to remove non-existed zookeeper node: ${key}")
+ }
+ }
+}
diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
index 1b93b7a..ceb2258 100644
--- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
+++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperStateStore.scala
@@ -16,103 +16,35 @@
*/
package org.apache.livy.server.recovery
-import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import scala.util.Try
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.framework.api.UnhandledErrorListener
-import org.apache.curator.retry.RetryNTimes
-import org.apache.zookeeper.KeeperException.NoNodeException
-
-import org.apache.livy.{LivyConf, Logging}
-import org.apache.livy.LivyConf.Entry
-
-object ZooKeeperStateStore {
- val ZK_KEY_PREFIX_CONF = Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")
- val ZK_RETRY_CONF = Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100")
-}
+import org.apache.livy.LivyConf
class ZooKeeperStateStore(
livyConf: LivyConf,
- mockCuratorClient: Option[CuratorFramework] = None) // For testing
- extends StateStore(livyConf) with Logging {
+ zkManager: ZooKeeperManager)
+ extends StateStore(livyConf) {
- import ZooKeeperStateStore._
-
- // Constructor defined for StateStore factory to new this class using reflection.
- def this(livyConf: LivyConf) {
- this(livyConf, None)
- }
-
- private val zkAddress = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
- require(!zkAddress.isEmpty, s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
- private val zkKeyPrefix = livyConf.get(ZK_KEY_PREFIX_CONF)
- private val retryValue = livyConf.get(ZK_RETRY_CONF)
- // a regex to match patterns like "m, n" where m and n both are integer values
- private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
- private[recovery] val retryPolicy = retryValue match {
- case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
- case _ => throw new IllegalArgumentException(
- s"$ZK_KEY_PREFIX_CONF contains bad value: $retryValue. " +
- "Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
- }
-
- private val curatorClient = mockCuratorClient.getOrElse {
- CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
- }
-
- Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
- override def run(): Unit = {
- curatorClient.close()
- }
- }))
-
- curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
- def unhandledError(message: String, e: Throwable): Unit = {
- error(s"Fatal Zookeeper error. Shutting down Livy server.")
- System.exit(1)
- }
- })
- curatorClient.start()
- // TODO Make sure ZK path has proper secure permissions so that other users cannot read its
- // contents.
+ private val zkKeyPrefix = livyConf.get(LivyConf.RECOVERY_ZK_STATE_STORE_KEY_PREFIX)
+ private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"
override def set(key: String, value: Object): Unit = {
- val prefixedKey = prefixKey(key)
- val data = serializeToBytes(value)
- if (curatorClient.checkExists().forPath(prefixedKey) == null) {
- curatorClient.create().creatingParentsIfNeeded().forPath(prefixedKey, data)
- } else {
- curatorClient.setData().forPath(prefixedKey, data)
- }
+ zkManager.set(prefixKey(key), value)
}
override def get[T: ClassTag](key: String): Option[T] = {
- val prefixedKey = prefixKey(key)
- if (curatorClient.checkExists().forPath(prefixedKey) == null) {
- None
- } else {
- Option(deserialize[T](curatorClient.getData().forPath(prefixedKey)))
- }
+ zkManager.get(prefixKey(key))
}
override def getChildren(key: String): Seq[String] = {
- val prefixedKey = prefixKey(key)
- if (curatorClient.checkExists().forPath(prefixedKey) == null) {
- Seq.empty[String]
- } else {
- curatorClient.getChildren.forPath(prefixedKey).asScala
- }
+ zkManager.getChildren(prefixKey(key))
}
override def remove(key: String): Unit = {
- try {
- curatorClient.delete().guaranteed().forPath(prefixKey(key))
- } catch {
- case _: NoNodeException => warn(s"Fail to remove non-existed zookeeper node: ${key}")
- }
+ zkManager.remove(prefixKey(key))
}
- private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"
+ def getZooKeeperManager(): ZooKeeperManager = {
+ zkManager
+ }
}
diff --git a/server/src/main/scala/org/apache/livy/utils/LivyUncaughtException.java b/server/src/main/scala/org/apache/livy/utils/LivyUncaughtException.java
new file mode 100644
index 0000000..89f6523
--- /dev/null
+++ b/server/src/main/scala/org/apache/livy/utils/LivyUncaughtException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.livy.utils;
+
+public class LivyUncaughtException extends Exception {
+ public LivyUncaughtException(String remoteStackTrace) {
+ super(remoteStackTrace);
+ }
+}
diff --git a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
index 88e530f..3ad7912 100644
--- a/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/recovery/ZooKeeperStateStoreSpec.scala
@@ -34,6 +34,7 @@
describe("ZooKeeperStateStore") {
case class TestFixture(stateStore: ZooKeeperStateStore, curatorClient: CuratorFramework)
val conf = new LivyConf()
+ conf.set(LivyConf.RECOVERY_STATE_STORE, "zookeeper")
conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
val key = "key"
val prefixedKey = s"/livy/$key"
@@ -42,7 +43,9 @@
val curatorClient = mock[CuratorFramework]
when(curatorClient.getUnhandledErrorListenable())
.thenReturn(mock[Listenable[UnhandledErrorListener]])
- val stateStore = new ZooKeeperStateStore(conf, Some(curatorClient))
+ val zkManager = new ZooKeeperManager(conf, Some(curatorClient))
+ zkManager.start()
+ val stateStore = new ZooKeeperStateStore(conf, zkManager)
testBody(TestFixture(stateStore, curatorClient))
}
@@ -57,11 +60,11 @@
it("should throw on bad config") {
withMock { f =>
val conf = new LivyConf()
- intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) }
+ intercept[IllegalArgumentException] { new ZooKeeperManager(conf) }
conf.set(LivyConf.RECOVERY_STATE_STORE_URL, "host")
- conf.set(ZooKeeperStateStore.ZK_RETRY_CONF, "bad")
- intercept[IllegalArgumentException] { new ZooKeeperStateStore(conf) }
+ conf.set(LivyConf.ZK_RETRY_POLICY, "bad")
+ intercept[IllegalArgumentException] { new ZooKeeperManager(conf) }
}
}
@@ -96,12 +99,12 @@
}
it("get should retrieve retry policy configs") {
- conf.set(org.apache.livy.server.recovery.ZooKeeperStateStore.ZK_RETRY_CONF, "11,77")
+ conf.set(LivyConf.ZK_RETRY_POLICY, "11,77")
withMock { f =>
mockExistsBuilder(f.curatorClient, true)
- f.stateStore.retryPolicy should not be null
- f.stateStore.retryPolicy.getN shouldBe 11
+ f.stateStore.getZooKeeperManager().retryPolicy should not be null
+ f.stateStore.getZooKeeperManager().retryPolicy.getN shouldBe 11
}
}