/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark

import java.io.File
import java.lang.reflect.{InvocationTargetException, UndeclaredThrowableException}
import java.net.URI

import scala.annotation.tailrec
import scala.collection.Map
import scala.util.Try
import scala.util.matching.Regex

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util._
import org.slf4j.Logger

import yaooqinn.kyuubi.Logging

/**
* Wrapper for [[Utils]] and [[SparkHadoopUtil]]
*/
object KyuubiSparkUtil extends Logging {
// PREFIXES
val SPARK_PREFIX = "spark."
private val YARN_PREFIX = "yarn."
private val HADOOP_PREFIX = "hadoop."
val SPARK_HADOOP_PREFIX: String = SPARK_PREFIX + HADOOP_PREFIX
private val SPARK_YARN_PREFIX: String = SPARK_PREFIX + YARN_PREFIX
private val DRIVER_PREFIX = "driver."
private val AM_PREFIX = SPARK_PREFIX + YARN_PREFIX + "am."
private val UI_PREFIX = "ui."
private val SQL_PREFIX = "sql."
private val HIVE_PREFIX = "hive."
private val METASTORE_PREFIX = "metastore."
// ENVIRONMENTS
val SPARK_HOME: String = System.getenv("SPARK_HOME")
val SPARK_JARS_DIR: String = SPARK_HOME + File.separator + "jars"
// YARN
val KEYTAB: String = SPARK_YARN_PREFIX + "keytab"
val PRINCIPAL: String = SPARK_YARN_PREFIX + "principal"
val MAX_APP_ATTEMPTS: String = SPARK_YARN_PREFIX + "maxAppAttempts"
val SPARK_YARN_JARS: String = SPARK_YARN_PREFIX + "jars"
val ACCESS_NNS: String = SPARK_YARN_PREFIX + "access.namenodes"
val ACCESS_FSS: String = SPARK_YARN_PREFIX + "access.hadoopFileSystems"
val STAGING_DIR: String = SPARK_YARN_PREFIX + "stagingDir"
val SPARK_YARN_DIST_JARS: String = SPARK_YARN_PREFIX + "dist.jars"
// DRIVER
val DRIVER_BIND_ADDR: String = SPARK_PREFIX + DRIVER_PREFIX + "bindAddress"
val DRIVER_MEM: String = SPARK_PREFIX + DRIVER_PREFIX + "memory"
val DRIVER_MEM_OVERHEAD: String = SPARK_PREFIX + YARN_PREFIX + DRIVER_PREFIX + "memoryOverhead"
val DRIVER_CORES: String = SPARK_PREFIX + DRIVER_PREFIX + "cores"
val DRIVER_EXTRA_JAVA_OPTIONS: String = SPARK_PREFIX + DRIVER_PREFIX + "extraJavaOptions"
val GC_INTERVAL: String = SPARK_PREFIX + "cleaner.periodicGC.interval"
val GC_INTERVAL_DEFAULT: String = "3min"
val AM_EXTRA_JAVA_OPTIONS: String = AM_PREFIX + "extraJavaOptions"
val SPARK_UI_PORT: String = SPARK_PREFIX + UI_PREFIX + "port"
val SPARK_UI_PORT_DEFAULT = "0"
val MULTIPLE_CONTEXTS: String = SPARK_PREFIX + DRIVER_PREFIX + "allowMultipleContexts"
val MULTIPLE_CONTEXTS_DEFAULT = "true"
// Shuffle
val PREFER_DIRECTBUF: String = SPARK_PREFIX + "shuffle." + "io.preferDirectBufs"
val PREFER_DIRECTBUF_DEFAULT = "false"
// Spark SQL
val CATALOG_IMPL: String = SPARK_PREFIX + SQL_PREFIX + "catalogImplementation"
val CATALOG_IMPL_DEFAULT = "hive"
val CONVERT_METASTORE_ORC: String =
SPARK_PREFIX + SQL_PREFIX + HIVE_PREFIX + "convertMetastoreOrc"
val ORC_IMPL: String = SPARK_PREFIX + SQL_PREFIX + "orc.impl"
val ORC_IMPL_DEFAULT = "native"
val ORC_VECTORIZED_READER_ENABLED: String =
SPARK_PREFIX + SQL_PREFIX + "orc.enableVectorizedReader"
val DEPLOY_MODE: String = SPARK_PREFIX + "submit.deployMode"
val DEPLOY_MODE_DEFAULT = "client"
// Hive
val METASTORE_JARS: String = SPARK_PREFIX + SQL_PREFIX + HIVE_PREFIX + METASTORE_PREFIX + "jars"
val METASTORE_KEYTAB: String =
SPARK_HADOOP_PREFIX + HIVE_PREFIX + METASTORE_PREFIX + "kerberos.keytab.file"
val METASTORE_PRINCIPAL: String =
SPARK_HADOOP_PREFIX + HIVE_PREFIX + METASTORE_PREFIX + "kerberos.principal"
val SPARK_LOCAL_DIR: String = SPARK_PREFIX + "local.dir"
val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r
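// For example, the Hive JDBC client typically passes session variables as configuration keys
// like "set:hivevar:region"; HIVE_VAR_PREFIX extracts the variable name:
//   "set:hivevar:region" match { case HIVE_VAR_PREFIX(name) => name } // "region"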
val USE_DB = "use:database"
val QUEUE: String = SPARK_PREFIX + YARN_PREFIX + "queue"
val DEPRECATED_QUEUE = "mapred.job.queue.name"
val HDFS_CLIENT_CACHE: String = SPARK_HADOOP_PREFIX + "fs.hdfs.impl.disable.cache"
val HDFS_CLIENT_CACHE_DEFAULT = "true"
val FILE_CLIENT_CACHE: String = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache"
val FILE_CLIENT_CACHE_DEFAULT = "true"
val RDD_PAR_LISTING: String = SPARK_PREFIX + "rdd.parallelListingThreshold"
// Runtime Spark Version
val SPARK_VERSION: String = org.apache.spark.SPARK_VERSION
def addShutdownHook(f: => Unit): Unit = {
ShutdownHookManager.addShutdownHook(() => f)
}
def initDaemon(log: Logger): Unit = {
Utils.initDaemon(log)
}
def getJobGroupIDKey: String = SparkContext.SPARK_JOB_GROUP_ID
def exceptionString(e: Throwable): String = {
Utils.exceptionString(e)
}
def getCurrentUserName: String = {
UserGroupInformation.getCurrentUser.getShortUserName
}
def getContextOrSparkClassLoader: ClassLoader = {
Utils.getContextOrSparkClassLoader
}
def createTempDir(
root: String = System.getProperty("java.io.tmpdir"),
namePrefix: String = "spark"): File = {
Utils.createTempDir(root, namePrefix)
}
def newConfiguration(conf: SparkConf): Configuration = {
SparkHadoopUtil.get.newConfiguration(conf)
}
def localHostName(): String = Utils.localHostName()
// org.apache.spark.util.VersionUtils: Utilities for working with Spark version strings
def majorVersion(sparkVersion: String): Int = VersionUtils.majorVersion(sparkVersion)
def minorVersion(sparkVersion: String): Int = VersionUtils.minorVersion(sparkVersion)
/**
* Check whether the runtime version of Spark is >= the specified one.
*/
def equalOrHigherThan(version: String): Boolean = {
val tMajor = majorVersion(SPARK_VERSION)
val tMinor = minorVersion(SPARK_VERSION)
val sMajor = majorVersion(version)
val sMinor = minorVersion(version)
tMajor > sMajor || (tMajor == sMajor && tMinor >= sMinor)
}
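// For example, on a Spark 2.4.x runtime, equalOrHigherThan("2.3") and equalOrHigherThan("2.4")
// return true, while equalOrHigherThan("2.5") returns false.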
/**
* Convert a time parameter such as (50s, 100ms, or 250us) to milliseconds for internal use. If
* no suffix is provided, the passed number is assumed to be in ms.
*/
def timeStringAsMs(str: String): Long = {
Utils.timeStringAsMs(str)
}
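// For example, timeStringAsMs("50s") == 50000L, timeStringAsMs("100ms") == 100L, and
// timeStringAsMs("250") == 250L, since a bare number is interpreted as milliseconds.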
/**
* Return a well-formed URI for the file described by a user input string.
*
* If the supplied path does not contain a scheme, or is a relative path, it will be
* converted into an absolute path with a file:// scheme.
*/
def resolveURI(path: String): URI = {
Utils.resolveURI(path)
}
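// For example, resolveURI("/tmp/kyuubi") yields file:/tmp/kyuubi, while
// resolveURI("hdfs://nn:8020/tmp") keeps its existing hdfs scheme untouched.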
/**
* Get the path of a temporary directory. Spark's local directories can be configured through
* multiple settings, which are used with the following precedence:
*
* - If called from inside of a YARN container, this will return a directory chosen by YARN.
* - If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
* - Otherwise, if the spark.local.dir is set, this will return a directory from it.
* - Otherwise, this will return java.io.tmpdir.
*
* Some of these configuration options might be lists of multiple paths, but this method will
* always return a single directory.
*/
def getLocalDir(conf: SparkConf): String = {
Utils.getLocalDir(conf)
}
/**
* Split a string of potentially quoted arguments from the command line the way that a shell
* would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
* then it would be parsed as three arguments: 'a', 'b c' and 'd'.
*/
def splitCommandString(s: String): Seq[String] = {
Utils.splitCommandString(s)
}
/**
* Replaces all the {{APP_ID}} occurrences with the App Id.
*/
def substituteAppId(opt: String, appId: String): String = {
opt.replace("{{APP_ID}}", appId)
}
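// For example, substituteAppId("-Dkyuubi.app={{APP_ID}}", "application_1_0001")
// returns "-Dkyuubi.app=application_1_0001" (the option name here is only illustrative).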
/**
* Escapes a string for inclusion in a command line executed by Yarn. Yarn runs each command
* through a generated shell script, e.g. `bash -c "command arg1 arg2"`, so plain quoting does
* not work. The argument is enclosed in single quotes and key characters are escaped.
*
* @param arg A single argument.
*
* @return Argument quoted for execution via Yarn's generated shell script.
*/
def escapeForShell(arg: String): String = {
val escaped = new StringBuilder("'")
arg.foreach {
case '$' => escaped.append("\\$")
case '"' => escaped.append("\\\"")
case '\'' => escaped.append("'\\''")
case c => escaped.append(c)
}
escaped.append("'").toString()
}
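// For example, escapeForShell("""pwd is $PWD""") returns "'pwd is \$PWD'"; an embedded single
// quote becomes '\'' so the generated script closes, escapes, and reopens the quoted argument.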
/** Load properties present in the given file. */
def getPropertiesFromFile(filename: String): Map[String, String] = {
Utils.getPropertiesFromFile(filename)
}
def setActiveSparkContext(sc: SparkContext): Unit = {
info(s"Application ${sc.applicationId} has been activated")
SparkContext.setActiveContext(sc, allowMultipleContexts = true)
}
/**
* Create a child-first class loader pointing at the Kyuubi jar, set it as the current
* thread's context class loader, and return it.
*/
def getAndSetKyuubiFirstClassLoader: MutableURLClassLoader = {
val url = this.getClass.getProtectionDomain.getCodeSource.getLocation
val loader = new ChildFirstURLClassLoader(
Array(url),
Thread.currentThread.getContextClassLoader)
Thread.currentThread.setContextClassLoader(loader)
info(s"Kyuubi first classloader is set to $url")
loader
}
/** Determines whether the provided class is loadable in the current thread. */
def classIsLoadable(clazz: String): Boolean = {
Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
}
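// For example, classIsLoadable("org.apache.spark.SparkContext") is true in any Spark
// application, while classIsLoadable("no.such.Clazz") returns false instead of throwing.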
/**
* Generate proper configurations before the server starts
* @param conf the default [[SparkConf]]
*/
def setupCommonConfig(conf: SparkConf): Unit = {
conf.setAppName("Kyuubi Server")
// avoid max port retries reached
conf.set(SPARK_UI_PORT, SPARK_UI_PORT_DEFAULT)
conf.set(MULTIPLE_CONTEXTS, MULTIPLE_CONTEXTS_DEFAULT)
conf.set(CATALOG_IMPL, CATALOG_IMPL_DEFAULT)
// For the server itself the deploy mode could be either client or cluster,
// but the [[SparkContext]] instantiated later must run in client mode
conf.set(DEPLOY_MODE, DEPLOY_MODE_DEFAULT)
// The delegation token store implementation. Set to MemoryTokenStore always.
conf.set("spark.hadoop.hive.cluster.delegation.token.store.class",
"org.apache.hadoop.hive.thrift.MemoryTokenStore")
conf.getOption(METASTORE_JARS) match {
case None | Some("builtin") =>
case _ =>
conf.set(METASTORE_JARS, "builtin")
info(s"Kyuubi prefer $METASTORE_JARS to be builtin ones")
}
// Set missing Kyuubi configs to SparkConf
KyuubiConf.getAllDefaults.foreach(kv => conf.setIfMissing(kv._1, kv._2))
conf.setIfMissing(PREFER_DIRECTBUF, PREFER_DIRECTBUF_DEFAULT)
conf.setIfMissing(SPARK_LOCAL_DIR, conf.get(KyuubiConf.BACKEND_SESSION_LOCAL_DIR.key))
conf.setIfMissing(GC_INTERVAL, GC_INTERVAL_DEFAULT)
if (UserGroupInformation.isSecurityEnabled) {
// By default, Spark SQL does not use its internal ORC implementation to read/write Hive ORC
// tables, but uses the Hive SerDe, which causes token expiration issues while getting file
// splits. So we force CONVERT_METASTORE_ORC to true when using Kyuubi with kerberized
// Hadoop clusters.
// see https://github.com/yaooqinn/kyuubi/issues/153
conf.set(CONVERT_METASTORE_ORC, "true")
} else {
// If the cluster is not secured, we respect the user's choice when explicitly set, and
// otherwise default to true for better performance.
conf.setIfMissing(CONVERT_METASTORE_ORC, "true")
}
conf.setIfMissing(ORC_IMPL, ORC_IMPL_DEFAULT)
conf.setIfMissing(ORC_VECTORIZED_READER_ENABLED, "true")
if (UserGroupInformation.isSecurityEnabled) {
conf.setIfMissing(HDFS_CLIENT_CACHE, HDFS_CLIENT_CACHE_DEFAULT)
conf.setIfMissing(FILE_CLIENT_CACHE, FILE_CLIENT_CACHE_DEFAULT)
// If you are using Kyuubi against kerberized HDFS, you may run into HDFS_DELEGATION_TOKEN
// expiration in some particular sql queries. This exception is usually caught in
// HadoopRDD.getPartitions, where the JobConf has no Credentials because it is generated from
// a plain Configuration, and the UGI.getCurrentUser contains only the oldest tokens, which
// are destined to expire. The reason seems to be that UnionRDD lists its sub RDDs in parallel
// using a ForkJoinPool, which points to another calling context. Turning off parallel listing
// seems to resolve this issue.
conf.setIfMissing(RDD_PAR_LISTING, Int.MaxValue.toString)
}
val kyuubiJar = Option(System.getenv("KYUUBI_JAR")).getOrElse("")
val distJars = conf.getOption(SPARK_YARN_DIST_JARS) match {
case Some(jars) if jars.nonEmpty =>
if (kyuubiJar.nonEmpty) jars + "," + kyuubiJar else jars
case _ => kyuubiJar
}
conf.set(SPARK_YARN_DIST_JARS, distJars)
}
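// A minimal usage sketch (the surrounding bootstrap code is assumed, not part of this object):
//   val conf = new SparkConf(loadDefaults = true)
//   KyuubiSparkUtil.setupCommonConfig(conf)
// i.e. the server normalizes a single SparkConf up front, before any SparkContext is created.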
@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause != null) findCause(e.getCause) else e
case e: InvocationTargetException =>
if (e.getCause != null) findCause(e.getCause) else e
case e: Throwable => e
}
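// For example, an exception thrown by a target method invoked through
// java.lang.reflect.Method#invoke arrives wrapped in an InvocationTargetException;
// findCause unwraps it back to the original Throwable so callers can match on the real error.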
}