blob: 73544c8900dcc3f4aff87a0684e18dc9e15d0910 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.livy
import java.lang.{Boolean => JBoolean, Long => JLong}
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.livy.client.common.ClientConf
import org.apache.livy.client.common.ClientConf.ConfEntry
import org.apache.livy.client.common.ClientConf.DeprecatedConf
object LivyConf {
case class Entry(override val key: String, override val dflt: AnyRef) extends ConfEntry
object Entry {
def apply(key: String, dflt: Boolean): Entry = Entry(key, dflt: JBoolean)
def apply(key: String, dflt: Int): Entry = Entry(key, dflt: Integer)
def apply(key: String, dflt: Long): Entry = Entry(key, dflt: JLong)
val TEST_MODE = ClientConf.TEST_MODE
val SPARK_HOME = Entry("livy.server.spark-home", null)
val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local")
val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null)
// Two configurations to specify Spark and related Scala version. These are internal
// configurations will be set by LivyServer and used in session creation. It is not required to
// set usually unless running with unofficial Spark + Scala versions
// (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11)
val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scala-version", null)
val LIVY_SPARK_VERSION = Entry("livy.spark.version", null)
val SESSION_STAGING_DIR = Entry("livy.session.staging-dir", null)
val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024)
val LOCAL_FS_WHITELIST = Entry("livy.file.local-dir-whitelist", null)
val ENABLE_HIVE_CONTEXT = Entry("livy.repl.enable-hive-context", false)
val ENVIRONMENT = Entry("livy.environment", "production")
val SERVER_HOST = Entry("", "")
val SERVER_PORT = Entry("livy.server.port", 8998)
val SERVER_BASE_PATH = Entry("livy.ui.basePath", "")
val UI_ENABLED = Entry("livy.ui.enabled", true)
val REQUEST_HEADER_SIZE = Entry("livy.server.request-header.size", 131072)
val RESPONSE_HEADER_SIZE = Entry("livy.server.response-header.size", 131072)
val CSRF_PROTECTION = Entry("livy.server.csrf-protection.enabled", false)
val IMPERSONATION_ENABLED = Entry("livy.impersonation.enabled", false)
val SUPERUSERS = Entry("livy.superusers", null)
val ACCESS_CONTROL_ENABLED = Entry("livy.server.access-control.enabled", false)
// Allowed users to access Livy, by default any user is allowed to access Livy. If user want to
// limit who could access Livy, user should list all the permitted users with comma
// separated.
val ACCESS_CONTROL_ALLOWED_USERS = Entry("livy.server.access-control.allowed-users", "*")
val ACCESS_CONTROL_MODIFY_USERS = Entry("livy.server.access-control.modify-users", null)
val ACCESS_CONTROL_VIEW_USERS = Entry("livy.server.access-control.view-users", null)
val SSL_KEYSTORE = Entry("livy.keystore", null)
val SSL_KEYSTORE_PASSWORD = Entry("livy.keystore.password", null)
val SSL_KEY_PASSWORD = Entry("livy.key-password", null)
val AUTH_TYPE = Entry("livy.server.auth.type", null)
val AUTH_KERBEROS_PRINCIPAL = Entry("livy.server.auth.kerberos.principal", null)
val AUTH_KERBEROS_KEYTAB = Entry("livy.server.auth.kerberos.keytab", null)
val HEARTBEAT_WATCHDOG_INTERVAL = Entry("livy.server.heartbeat-watchdog.interval", "1m")
val LAUNCH_KERBEROS_PRINCIPAL = Entry("livy.server.launch.kerberos.principal", null)
val LAUNCH_KERBEROS_KEYTAB = Entry("livy.server.launch.kerberos.keytab", null)
val LAUNCH_KERBEROS_REFRESH_INTERVAL = Entry("livy.server.launch.kerberos.refresh-interval", "1h")
val KINIT_FAIL_THRESHOLD = Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5)
val THRIFT_INCR_COLLECT_ENABLED = Entry("livy.server.thrift.incrementalCollect", false)
val THRIFT_SESSION_CREATION_TIMEOUT = Entry("livy.server.thrift.session.creationTimeout", "10m")
val THRIFT_SERVER_JAR_LOCATION = Entry("livy.server.thrift.jarLocation", null)
* Recovery mode of Livy. Possible values:
* off: Default. Turn off recovery. Every time Livy shuts down, it stops and forgets all sessions.
* recovery: Livy persists session info to the state store. When Livy restarts, it recovers
* previous sessions from the state store.
* Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to
* configure the state store.
val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off")
* Where Livy should store state to for recovery. Possible values:
* <empty>: Default. State store disabled.
* filesystem: Store state on a file system.
* zookeeper: Store state in a Zookeeper instance.
val RECOVERY_STATE_STORE = Entry("livy.server.recovery.state-store", null)
* For filesystem state store, the path of the state store directory. Please don't use a
* filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
* For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")
// Livy will cache the max no of logs specified. 0 means don't cache the logs.
val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)
// If Livy can't find the yarn app within this time, consider it lost.
val YARN_APP_LOOKUP_TIMEOUT = Entry("", "120s")
// How often Livy polls YARN to refresh YARN app state.
val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s")
// Days to keep Livy server request logs.
val REQUEST_LOG_RETAIN_DAYS = Entry("livy.server.request-log-retain.days", 5)
// REPL related jars separated with comma.
val REPL_JARS = Entry("livy.repl.jars", null)
// RSC related jars separated with comma.
val RSC_JARS = Entry("livy.rsc.jars", null)
// How long to check livy session leakage
val YARN_APP_LEAKAGE_CHECK_TIMEOUT = Entry("", "600s")
// how often to check livy session leakage
// Whether session timeout should be checked, by default it will be checked, which means inactive
// session will be stopped after "livy.server.session.timeout"
val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)
// How long will an inactive session be gc-ed.
val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h")
// How long a finished session state will be kept in memory
val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s")
// Max creating session in livyServer
val SESSION_MAX_CREATION = Entry("livy.server.session.max-creation", 100)
val SPARK_MASTER = "spark.master"
val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
val SPARK_JARS = "spark.jars"
val SPARK_FILES = "spark.files"
val SPARK_ARCHIVES = "spark.yarn.dist.archives"
val SPARK_PY_FILES = "spark.submit.pyFiles"
* These are Spark configurations that contain lists of files that the user can add to
* their jobs in one way or another. Livy needs to pre-process these to make sure the
* user can read them (in case they reference local files), and to provide correct URIs
* to Spark based on the Livy config.
* The configuration allows adding new configurations in case we either forget something in
* the hardcoded list, or new versions of Spark add new configs.
val SPARK_FILE_LISTS = Entry("livy.spark.file-list-configs", null)
case class DepConf(
override val key: String,
override val version: String,
override val deprecationMessage: String = "")
extends DeprecatedConf
private val configsWithAlternatives: Map[String, DeprecatedConf] = Map[String, DepConf](
LIVY_SPARK_DEPLOY_MODE.key -> DepConf("livy.spark.deployMode", "0.4"),
LIVY_SPARK_SCALA_VERSION.key -> DepConf("livy.spark.scalaVersion", "0.4"),
ENABLE_HIVE_CONTEXT.key -> DepConf("livy.repl.enableHiveContext", "0.4"),
CSRF_PROTECTION.key -> DepConf("livy.server.csrf_protection.enabled", "0.4"),
ACCESS_CONTROL_ENABLED.key -> DepConf("livy.server.access_control.enabled", "0.4"),
AUTH_KERBEROS_NAME_RULES.key -> DepConf("livy.server.auth.kerberos.name_rules", "0.4"),
DepConf("livy.server.launch.kerberos.refresh_interval", "0.4"),
KINIT_FAIL_THRESHOLD.key -> DepConf("livy.server.launch.kerberos.kinit_fail_threshold", "0.4"),
DepConf("", "0.4"),
DepConf("", "0.4")
private val deprecatedConfigs: Map[String, DeprecatedConf] = {
val configs: Seq[DepConf] = Seq(
DepConf("livy.server.access_control.users", "0.4")
Map( { cfg => (cfg.key -> cfg) }: _*)
* @param loadDefaults whether to also load values from the Java system properties
class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
import LivyConf._
lazy val hadoopConf = new Configuration()
lazy val localFsWhitelist = configToSeq(LOCAL_FS_WHITELIST).map { path =>
// Make sure the path ends with a single separator.
path.stripSuffix("/") + "/"
lazy val sparkFileLists = HARDCODED_SPARK_FILE_LISTS ++ configToSeq(SPARK_FILE_LISTS)
* Create a LivyConf that loads defaults from the system properties and the classpath.
* @return
def this() = this(true)
if (loadDefaults) {
def loadFromFile(name: String): LivyConf = {
/** Return true if spark master starts with yarn. */
def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")
/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)
/** Return the location of the spark home directory */
def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))
/** Return the spark master Livy sessions should use. */
def sparkMaster(): String = get(LIVY_SPARK_MASTER)
/** Return the path to the spark-submit executable. */
def sparkSubmit(): String = {
sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
private val configDir: Option[File] = {
.orElse(sys.env.get("LIVY_HOME").map(path => s"$path${File.separator}conf"))
.map(new File(_))
private def getConfigFile(name: String): Option[File] = { File(_, name)).filter(_.exists())
private def loadFromMap(map: Iterable[(String, String)]): Unit = {
map.foreach { case (k, v) =>
if (k.startsWith("livy.")) {
set(k, v)
def configToSeq(entry: LivyConf.Entry): Seq[String] = {
Option(get(entry)).map(_.split("[, ]+").toSeq).getOrElse(Nil)
override def getConfigsWithAlternatives: JMap[String, DeprecatedConf] = {
override def getDeprecatedConfigs: JMap[String, DeprecatedConf] = {