/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import java.lang.{Boolean => JBoolean, String => JString}
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicReference

import org.apache.commons.io.FileUtils
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.util.ToolUtil
import org.apache.kylin.query.UdfManager
import org.apache.kylin.query.monitor.SparderContextCanary
import org.apache.kylin.spark.classloader.ClassLoaderUtils
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.memory.MonitorEnv
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.KylinSession._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasource.{KylinSourceStrategy, ShardFileStatusCache}
import org.apache.spark.sql.metrics.SparderMetricsListener
import org.apache.spark.util.Utils
import org.apache.spark.utils.YarnInfoFetcherUtils
// scalastyle:off
object SparderContext extends Logging {
@volatile
private var spark: SparkSession = _
@volatile
private var initializingThread: Thread = _
@volatile
var master_app_url: String = _
def getOriginalSparkSession: SparkSession = withClassLoad {
if (spark == null || spark.sparkContext.isStopped) {
logInfo("Init spark.")
initSpark()
}
spark
}
def getSparkSession: SparkSession = {
logInfo(s"Current thread ${Thread.currentThread().getId} create a SparkSession.")
SparderContextFacade.current().getFirst
}
def closeThreadSparkSession(): Unit = {
logInfo(s"Remove SparkSession from thread ${Thread.currentThread().getId}")
SparderContextFacade.remove()
}
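// A minimal usage sketch (hypothetical caller, not part of this object): a query
// thread obtains its session through the facade and releases it when the query ends.
//
// val spark = SparderContext.getSparkSession
// val rows = spark.sql("select 1").collect()
// SparderContext.closeThreadSparkSession()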
def setSparkSession(sparkSession: SparkSession): Unit = {
spark = sparkSession
UdfManager.create(sparkSession)
}
def setAPPMasterTrackURL(url: String): Unit = {
master_app_url = url
}
def appMasterTrackURL(): String = {
if (master_app_url != null)
master_app_url
else
""
}
def isSparkAvailable: Boolean = {
spark != null && !spark.sparkContext.isStopped
}
def restartSpark(): Unit = withClassLoad {
this.synchronized {
if (spark != null && !spark.sparkContext.isStopped) {
Utils.tryWithSafeFinally {
spark.stop()
} {
SparkContext.clearActiveContext
}
}
logInfo("Restart Spark")
init()
}
}
def stopSpark(): Unit = withClassLoad {
this.synchronized {
if (spark != null && !spark.sparkContext.isStopped) {
Utils.tryWithSafeFinally {
spark.stop()
} {
SparkContext.clearActiveContext
}
}
}
}
def init(): Unit = withClassLoad {
getOriginalSparkSession
}
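// Sketch of a typical startup sequence (assumed service bootstrap, not shown in
// this file): init() blocks until the shared SparkSession is ready.
//
// SparderContext.init()
// assert(SparderContext.isSparkAvailable)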
def getSparkConf(key: String): String = {
getSparkSession.sparkContext.conf.get(key)
}
def initSpark(): Unit = withClassLoad {
this.synchronized {
if (initializingThread == null && (spark == null || spark.sparkContext.isStopped)) {
initializingThread = new Thread(new Runnable {
override def run(): Unit = {
try {
val kylinConf: KylinConfig = KylinConfig.getInstanceFromEnv
val master = System.getProperty("spark.local") match {
case "true" =>
"local"
case _ =>
kylinConf.getSparderConfigOverrideWithSpecificName("spark.master")
}
logInfo("SparderContext deploy with spark master: " + master)
val sparkSession = SparkSession.builder
.master(master)
.config("spark.submit.deployMode", "client")
.appName(kylinConf.getSparderAppName)
.withExtensions { ext =>
ext.injectPlannerStrategy(_ => KylinSourceStrategy)
}
.enableHiveSupport()
.getOrCreateKylinSession()
if (kylinConf.isKylinMetricsReporterForQueryEnabled) {
val appStatusListener = new SparderMetricsListener()
sparkSession.sparkContext.addSparkListener(appStatusListener)
logInfo("Query metrics reporter is enabled, sparder metrics listener is added.")
}
spark = sparkSession
val appid = sparkSession.sparkContext.applicationId
// write application id to file 'sparkappid'
val kylinHomePath = KylinConfig.getKylinHomeAtBestEffort().getCanonicalPath
try {
val appidFile = Paths.get(kylinHomePath, "sparkappid").toFile
FileUtils.writeStringToFile(appidFile, appid)
logInfo("Spark application id is " + appid)
} catch {
case e: Exception =>
logError("Failed to generate spark application id[" + appid + "] file", e)
}
logInfo("Spark context started successfully with stack trace:")
logInfo(Thread.currentThread().getStackTrace.mkString("\n"))
logInfo(
"Class loader: " + Thread
.currentThread()
.getContextClassLoader
.toString)
initMonitorEnv()
master match {
case mode: String if mode.startsWith("local") =>
var hostName = ToolUtil.getHostName
if (hostName.equals("Unknown")) {
hostName = "localhost"
}
master_app_url = "http://" + hostName + ":" + sparkSession.sparkContext.getConf
.get("spark.ui.port", "4040")
case _ =>
master_app_url = YarnInfoFetcherUtils.getTrackingUrl(appid)
}
} catch {
case throwable: Throwable =>
logError("Error for initializing spark ", throwable)
} finally {
logInfo("Setting initializing Spark thread to null.")
initializingThread = null
}
}
})
logInfo("Initializing Spark thread starting.")
initializingThread.start()
}
if (initializingThread != null) {
logInfo("Initializing Spark, waiting for done.")
initializingThread.join()
}
if (System.getProperty("spark.local") ne "true") {
//monitor sparder
SparderContextCanary.init()
}
// init FileStatusCache
ShardFileStatusCache.getFileStatusCache(getOriginalSparkSession)
}
}
def registerListener(sc: SparkContext): Unit = {
val sparkListener = new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case _ => // Ignore
}
}
sc.addSparkListener(sparkListener)
}
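// The listener above ignores all events. A caller that needs custom events could
// register its own handler instead; a sketch with a hypothetical event type:
//
// sc.addSparkListener(new SparkListener {
//   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
//     case e: MyCustomEvent => logInfo(s"received $e") // MyCustomEvent is hypothetical
//     case _ => // ignore the rest
//   }
// })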
def initMonitorEnv(): Unit = {
val env = SparkEnv.get
val rpcEnv = env.rpcEnv
val sparkConf = new SparkConf
MonitorEnv.create(sparkConf, env.executorId, rpcEnv, null, isDriver = true)
logInfo("setup master endpoint finished." + "hostPort:" + rpcEnv.address.hostPort)
}
/**
* @param sqlText SQL text to validate
* @return the parsed logical plan
* @throws ParseException if the SQL text cannot be parsed
*/
@throws[ParseException]
def validateSql(sqlText: String): LogicalPlan = {
val logicalPlan: LogicalPlan = getSparkSession.sessionState.sqlParser.parsePlan(sqlText)
logicalPlan
}
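// Usage sketch: validateSql surfaces syntax errors before execution, so callers
// can catch ParseException and report it (hypothetical caller code).
//
// try {
//   SparderContext.validateSql("select * from t where")
// } catch {
//   case e: ParseException => logWarning("Invalid SQL: " + e.getMessage)
// }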
/**
* Runs the body with the Spark classloader as the thread context classloader,
* so that Spark is not affected by the caller's classloading environment.
*
* @param body the action that uses Spark
* @tparam T the return type of the body
* @return the result of the body
*/
def withClassLoad[T](body: => T): T = {
// The original context classloader is deliberately not restored here, so later
// Spark calls on this thread keep using the Spark classloader.
Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader)
body
}
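// Usage sketch (hypothetical caller): wrap any code path that touches Spark
// classes so they resolve through the Spark classloader.
//
// val count = SparderContext.withClassLoad {
//   SparderContext.getOriginalSparkSession.sql("select 1").count()
// }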
val _isAsyncQuery = new ThreadLocal[JBoolean]
val _separator = new ThreadLocal[JString]
val _df = new ThreadLocal[Dataset[Row]]
val _needCompute = new ThreadLocal[JBoolean] {
override protected def initialValue = false
}
// per-query counter, reset via cleanQueryInfo()
val _numScanFiles =
new ThreadLocal[java.lang.Long] {
override protected def initialValue = 0L
}
val _queryRef =
new ThreadLocal[AtomicReference[java.lang.Boolean]]
def accumulateScanFiles(numFiles: java.lang.Long): Unit = {
_numScanFiles.set(_numScanFiles.get() + numFiles)
}
def getNumScanFiles(): java.lang.Long = {
_numScanFiles.get()
}
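// Sketch of the per-thread scan-file accounting (hypothetical call sites): scan
// operators accumulate counts during a query; the collector reads and resets them.
//
// SparderContext.accumulateScanFiles(3L)
// SparderContext.accumulateScanFiles(2L)
// assert(SparderContext.getNumScanFiles() == 5L)
// SparderContext.cleanQueryInfo() // back to 0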
def setAsAsyncQuery(): Unit = {
_isAsyncQuery.set(true)
}
def isAsyncQuery: java.lang.Boolean =
if (_isAsyncQuery.get == null) false
else _isAsyncQuery.get
def setSeparator(separator: java.lang.String): Unit = {
_separator.set(separator)
}
def getSeparator: java.lang.String =
if (_separator.get == null) ","
else _separator.get
def getDF: Dataset[Row] = _df.get
def setDF(df: Dataset[Row]): Unit = {
_df.set(df)
}
def setResultRef(ref: AtomicReference[java.lang.Boolean]): Unit = {
_queryRef.set(ref)
}
def getResultRef: AtomicReference[java.lang.Boolean] = _queryRef.get
// clean it after query end
def clean(): Unit = {
_isAsyncQuery.set(null)
_separator.set(null)
_df.set(null)
_needCompute.set(null)
}
// clean it after collect
def cleanQueryInfo(): Unit = {
_numScanFiles.set(0L)
}
def needCompute(): JBoolean = {
// _needCompute actually stores the per-thread "skip" flag, hence the negation
!_needCompute.get()
}
def skipCompute(): Unit = {
_needCompute.set(true)
}
def cleanCompute(): Unit = {
_needCompute.set(false)
}
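// Sketch of the compute-skip flow (hypothetical caller): a dry-run marks the
// thread with skipCompute(), execution consults needCompute(), and cleanCompute()
// resets the flag for the next query.
//
// SparderContext.skipCompute()
// if (SparderContext.needCompute()) { /* run the real computation */ }
// SparderContext.cleanCompute()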
}