/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineplugin.spark.executor
import java.io.{BufferedReader, File}
import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.engineconn.computation.executor.creation.ComputationExecutorManager
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import com.webank.wedatasphere.linkis.engineconn.computation.executor.rs.RsOutputStream
import com.webank.wedatasphere.linkis.engineplugin.spark.common.{Kind, SparkScala}
import com.webank.wedatasphere.linkis.engineplugin.spark.config.SparkConfiguration
import com.webank.wedatasphere.linkis.engineplugin.spark.entity.SparkEngineSession
import com.webank.wedatasphere.linkis.engineplugin.spark.exception.{ApplicationAlreadyStoppedException, ExecuteError, SparkSessionNullException}
import com.webank.wedatasphere.linkis.engineplugin.spark.utils.EngineUtils
import com.webank.wedatasphere.linkis.governance.common.paser.ScalaCodeParser
import com.webank.wedatasphere.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, IncompleteExecuteResponse, SuccessExecuteResponse}
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetWriter
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.spark.repl.SparkILoop
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.util.SparkUtils
import org.apache.spark.{SparkConf, SparkContext}
import _root_.scala.tools.nsc.GenericRunnerSettings
import scala.tools.nsc.interpreter.{IMain, JPrintWriter, NamedParam, Results, SimpleReader, StdReplTags, isReplPower, replProps}
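/**
 * Executes user-submitted Scala code in a long-lived SparkILoop REPL built on the
 * EngineConn's SparkSession. Lifecycle as implemented below: init() lazily creates
 * and initializes the ILoop, runCode() points the REPL output stream at the current
 * EngineExecutionContext and binds the Spark session on first use, and executeLine()
 * interprets the code and maps the REPL result to an ExecuteResponse.
 */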
class SparkScalaExecutor(sparkEngineSession: SparkEngineSession, id: Long) extends SparkEngineConnExecutor(sparkEngineSession.sparkContext, id) {
private val sparkContext: SparkContext = sparkEngineSession.sparkContext
private val _sqlContext: SQLContext = sparkEngineSession.sqlContext
private val sparkSession: SparkSession = sparkEngineSession.sparkSession
private val sparkConf: SparkConf = sparkContext.getConf
private var sparkILoop: SparkILoop = _
private var bindFlag: Boolean = false
private val engineExecutionContextFactory: EngineExecutionContextFactory = new EngineExecutionContextFactory
private val lineOutputStream = new RsOutputStream
private val jOut = new JPrintWriter(lineOutputStream, true)
private val jobGroup = new java.lang.StringBuilder
private var executeCount = 0
var sparkILoopInited = false
private val outputDir = sparkEngineSession.outputDir
protected implicit val executor = Utils.newCachedExecutionContext(5, "Spark-Scala-REPL-Thread-", true)
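  /**
   * Double-checked locking ensures the SparkILoop is created and initialized exactly
   * once even under concurrent init() calls, then waits until the interpreter is ready.
   */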
override def init(): Unit = {
System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
setCodeParser(new ScalaCodeParser)
if (sparkILoop == null) {
synchronized {
if (sparkILoop == null) createSparkILoop
}
}
if (sparkILoop != null) {
if (!sparkILoopInited) {
sparkILoop synchronized {
if (!sparkILoopInited) {
            Utils.tryCatch {
              initSparkILoop
            } { t =>
              logger.error("init failed: ", t)
              null
            }
            // TODO When an exception is thrown here, should we shut down? Needs more thought.
            sparkILoopInited = true
          }
        }
      }
    } else {
      throw new SparkSessionNullException(40006, "sparkILoop is null")
    }
Utils.waitUntil(() => sparkILoopInited && sparkILoop.intp != null, SparkConfiguration.SPARK_LOOP_INIT_TIME.getValue.toDuration)
super.init()
}
override protected def getKind: Kind = SparkScala()
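  /**
   * Runs one code block: switches the thread context classloader to the REPL's so
   * interpreted classes resolve, rebinds the output stream if the execution context
   * changed, lazily binds the Spark session, and delegates to executeLine.
   */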
override protected def runCode(executor: SparkEngineConnExecutor, code: String, engineExecutionContext: EngineExecutionContext, jobGroup: String): ExecuteResponse = {
    // reset before append so the bound jobGroup always holds only the current job group
    this.jobGroup.setLength(0)
    this.jobGroup.append(jobGroup)
if (null != sparkILoop.intp && null != sparkILoop.intp.classLoader) {
Thread.currentThread().setContextClassLoader(sparkILoop.intp.classLoader)
}
if (engineExecutionContext != this.engineExecutionContextFactory.getEngineExecutionContext) {
lineOutputStream.reset(engineExecutionContext)
}
lazyLoadILoop
lineOutputStream.ready()
    if (sparkILoopInited) {
this.engineExecutionContextFactory.setEngineExecutionContext(engineExecutionContext)
}
var res: ExecuteResponse = null
    Utils.tryCatch {
      res = executeLine(code, engineExecutionContext)
    } {
      case e: Exception =>
        sparkContext.clearJobGroup()
        error("Interpreter exception", e)
        // _state = Idle()
        return ErrorExecuteResponse("Interpreter exception", e)
    }
res match {
case SuccessExecuteResponse() =>
case IncompleteExecuteResponse(_) =>
case _ =>
sparkContext.clearJobGroup()
return res
}
res
}
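  /**
   * Interprets a single snippet inside scala.Console.withOut, so everything the
   * snippet prints is captured by lineOutputStream and forwarded to the job's
   * stdout/result set. An illustrative snippet a user might submit, relying on the
   * bindings installed by bindSparkSession below:
   * {{{
   *   val df = spark.sql("select 1 as id")
   *   show(df) // rendered through SQLSession.showDF
   * }}}
   */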
def executeLine(code: String, engineExecutionContext: EngineExecutionContext): ExecuteResponse = synchronized {
    if (sparkContext.isStopped) {
      error("Spark application has already stopped, please restart it.")
      throw new ApplicationAlreadyStoppedException(40004, "Spark application has already stopped, please restart it.")
}
executeCount += 1
val originalOut = System.out
val result = scala.Console.withOut(lineOutputStream) {
Utils.tryCatch(sparkILoop.interpret(code)){ t =>
error("task error info:", t)
val msg = ExceptionUtils.getRootCauseMessage(t)
if (msg.contains("OutOfMemoryError")) {
error("engine oom now to set status to shutdown")
ComputationExecutorManager.getInstance.getReportExecutor.tryShutdown()
}
engineExecutionContext.appendStdout("task error info: " + msg)
Results.Error
} match {
case Results.Success =>
lineOutputStream.flush()
engineExecutionContext.appendStdout("scala> " + code)
val outStr = lineOutputStream.toString()
        if (outStr.nonEmpty) {
val output = Utils.tryQuietly(ResultSetWriter.getRecordByRes(outStr, SparkConfiguration.SPARK_CONSOLE_OUTPUT_NUM.getValue))
val res = if (output != null) output.map(x => x.toString).toList.mkString("\n") else ""
if (res.length > 0) {
engineExecutionContext.appendStdout(res)
}
}
SuccessExecuteResponse()
case Results.Incomplete =>
//error("incomplete code.")
IncompleteExecuteResponse(null)
case Results.Error =>
lineOutputStream.flush()
val output = lineOutputStream.toString
IOUtils.closeQuietly(lineOutputStream)
var errorMsg: String = null
if (StringUtils.isNotBlank(output)) {
errorMsg = Utils.tryCatch(EngineUtils.getResultStrByDolphinTextContent(output))(t => t.getMessage)
error("Execute code error for "+ errorMsg)
} else {
error("No error message is captured, please see the detailed log")
}
ErrorExecuteResponse(errorMsg, ExecuteError(40005, "execute sparkScala failed!"))
}
}
    // restore the JVM stdout in case the interpreted code replaced it via System.setOut
System.setOut(originalOut)
result
}
private def createSparkILoop = {
info("outputDir====> " + outputDir)
sparkILoop = Utils.tryCatch{
new SparkILoop(None, jOut)
}{
t => logger.error("create ILoop failed", t)
null
}
}
  private def lazyLoadILoop = { // lazily bind the Spark session on first execution
if(!bindFlag) {
bindSparkSession
}
}
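  /**
   * Builds the REPL settings: user code is compiled class-based into outputDir
   * (-Yrepl-class-based / -Yrepl-outdir), with a classpath assembled from spark.jars
   * (unioned with spark.yarn.dist.jars when running on YARN) plus the driver's own
   * classpath jars.
   */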
private def initSparkILoop = {
val settings = new GenericRunnerSettings(error(_))
val sparkJars = sparkConf.getOption("spark.jars")
val jars = if (sparkConf.get("spark.master").contains("yarn")) {
val yarnJars = sparkConf.getOption("spark.yarn.dist.jars")
SparkUtils.unionFileLists(sparkJars, yarnJars).toSeq
} else {
sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
}
    val classpathJars = System.getProperty("java.class.path").split(File.pathSeparator).filter(_.endsWith(".jar"))
//.filterNot(f=> f.contains("spark-") || f.contains("datanucleus"))
val classpath = jars.mkString(File.pathSeparator) + File.pathSeparator +
classpathJars.mkString(File.pathSeparator)
debug("Spark shell add jars: " + classpath)
settings.processArguments(List("-Yrepl-class-based",
"-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-classpath", classpath), true)
settings.usejavacp.value = true
settings.embeddedDefaults(Thread.currentThread().getContextClassLoader())
sparkILoop.settings = settings
sparkILoop.createInterpreter()
val in0 = getField(sparkILoop, "scala$tools$nsc$interpreter$ILoop$$in0").asInstanceOf[Option[BufferedReader]]
val reader = in0.fold(sparkILoop.chooseReader(settings))(r => SimpleReader(r,
jOut, interactive = true))
sparkILoop.in = reader
sparkILoop.initializeSynchronous()
SparkScalaExecutor.loopPostInit(sparkILoop)
warn("spark repl has been finished, now stop it.")
/*Future {
sparkILoop.process(settings)
warn("spark repl has been finished, now stop it.")
}*/
}
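  /** Reflectively reads a field from obj, e.g. ILoop's name-mangled in0 reader. */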
protected def getField(obj: Object, name: String): Object = {
val field = obj.getClass.getField(name)
field.setAccessible(true)
field.get(obj)
}
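  /**
   * Waits for the interpreter to finish initializing, then binds sc, spark and
   * sqlContext (plus the jobGroup and the EngineExecutionContextFactory) into the
   * REPL and pre-imports the helpers user code can call directly, e.g.:
   * {{{
   *   show(spark.sql("select 1"))  // table result set via SQLSession.showDF
   *   showHtml("<b>hello</b>")     // HTML result set via SQLSession.showHTML
   * }}}
   */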
def bindSparkSession = {
require(sparkContext != null)
require(sparkSession != null)
require(_sqlContext != null)
    // Wait up to 10 seconds for the interpreter to finish initializing.
    val startTime = System.currentTimeMillis()
    Utils.waitUntil(() => sparkILoop.intp != null && sparkILoop.intp.isInitializeComplete, SparkConfiguration.SPARK_LANGUAGE_REPL_INIT_TIME.getValue.toDuration)
    warn(s"Waited ${System.currentTimeMillis() - startTime} ms for sparkILoop to initialize.")
sparkILoop.beSilentDuring {
sparkILoop.command(":silent")
sparkILoop.bind("sc", "org.apache.spark.SparkContext", sparkContext, List("""@transient"""))
sparkILoop.bind("spark", "org.apache.spark.sql.SparkSession", sparkSession, List("""@transient"""))
sparkILoop.bind("sqlContext", "org.apache.spark.sql.SQLContext", _sqlContext, List("""@transient"""))
sparkILoop.bind("engineExecutionContextFactory", "com.webank.wedatasphere.linkis.engineplugin.spark.executor.EngineExecutionContextFactory", engineExecutionContextFactory)
sparkILoop.bind("jobGroup", "java.lang.StringBuilder", jobGroup)
sparkILoop.interpret("import org.apache.spark.SparkContext")
sparkILoop.interpret("import org.apache.spark.SparkContext._")
sparkILoop.interpret("import org.apache.spark.sql.SparkSession")
sparkILoop.interpret("import org.apache.spark.sql.SQLContext")
sparkILoop.interpret("import org.apache.spark.sql.DataFrame")
sparkILoop.interpret("import com.webank.wedatasphere.linkis.engineplugin.spark.executor.SQLSession.showDF")
sparkILoop.interpret("import com.webank.wedatasphere.linkis.engineplugin.spark.executor.SQLSession.showHTML")
sparkILoop.interpret("import sqlContext.sql")
sparkILoop.interpret("import sqlContext._")
sparkILoop.interpret("import spark.implicits._")
sparkILoop.interpret("import spark.sql")
sparkILoop.interpret("import org.apache.spark.sql.functions._")
sparkILoop.interpret("import com.webank.wedatasphere.linkis.engineplugin.spark.executor.EngineExecutionContextFactory")
sparkILoop.interpret("def showAlias(df: DataFrame, alias:String): Unit = showDF(sparkContext, jobGroup.toString, df, alias,10000, engineExecutionContextFactory.getEngineExecutionContext)")
sparkILoop.interpret("def show(df: DataFrame): Unit = showDF(sparkContext, jobGroup.toString, df,\"\",10000, engineExecutionContextFactory.getEngineExecutionContext)")
sparkILoop.interpret("def showHtml(content: Any): Unit = showHTML(sparkContext, jobGroup.toString, content, engineExecutionContextFactory.getEngineExecutionContext)")
sparkILoop.interpret("import org.apache.spark.sql.execution.datasources.csv._")
sparkILoop.interpret("import org.apache.spark.sql.UDFRegistration")
sparkILoop.interpret("val udf = UDF")
sparkILoop.interpret("implicit def toUDFMethod(udf: UDF.type): UDFRegistration = sqlContext.udf")
sparkILoop.interpret("implicit val sparkSession = spark")
bindFlag = true
warn(s"Finished to init sparkILoop cost ${System.currentTimeMillis() - startTime}.")
}
}
  def getOption(key: String): Option[String] = {
    // Note: currently ignores `key` and always returns the configured REPL class directory.
    val value = SparkConfiguration.SPARK_REPL_CLASSDIR.getValue
    Some(value)
  }
override protected def getExecutorIdPreFix: String = "SparkScalaExecutor_"
}
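/**
 * Mutable holder that is itself bound into the REPL (see bindSparkSession) so that
 * interpreted code, e.g. the show/showHtml helpers, can reach the
 * EngineExecutionContext of the job currently being executed.
 */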
class EngineExecutionContextFactory {
private var engineExecutionContext: EngineExecutionContext = _
def setEngineExecutionContext(engineExecutionContext: EngineExecutionContext): Unit = this.engineExecutionContext = engineExecutionContext
  def getEngineExecutionContext: EngineExecutionContext = this.engineExecutionContext
}
object SparkScalaExecutor {
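  /**
   * Replays the private ILoop.loopPostInit steps via reflection: binds $intp, runs
   * any configured auto-run code, sets the context classloader, and enables TAB
   * completion (SI-7418). Reflection is needed because these members are not
   * accessible on ILoop.
   */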
private def loopPostInit(sparkILoop: SparkILoop): Unit = {
import StdReplTags._
import scala.reflect.{classTag, io}
val intp = sparkILoop.intp
val power = sparkILoop.power
val in = sparkILoop.in
    def loopPostInit(): Unit = {
// Bind intp somewhere out of the regular namespace where
// we can get at it in generated code.
intp.quietBind(NamedParam[IMain]("$intp", intp)(tagOfIMain, classTag[IMain]))
// Auto-run code via some setting.
(replProps.replAutorunCode.option
flatMap (f => io.File(f).safeSlurp())
foreach (intp quietRun _)
)
// classloader and power mode setup
intp.setContextClassLoader()
if (isReplPower) {
replProps.power setValue true
unleashAndSetPhase()
asyncMessage(power.banner)
}
// SI-7418 Now, and only now, can we enable TAB completion.
in.postInit()
}
def unleashAndSetPhase() = if (isReplPower) {
power.unleash()
intp beSilentDuring phaseCommand("typer") // Set the phase to "typer"
}
def phaseCommand(name: String): Results.Result = {
callMethod(
sparkILoop,
"scala$tools$nsc$interpreter$ILoop$$phaseCommand",
Array(classOf[String]),
Array(name)).asInstanceOf[Results.Result]
}
def asyncMessage(msg: String): Unit = {
callMethod(
sparkILoop, "asyncMessage", Array(classOf[String]), Array(msg))
}
loopPostInit()
}
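  /** Reflectively invokes a (possibly non-public) method on obj. */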
def callMethod(obj: Object, name: String,
parameterTypes: Array[Class[_]],
parameters: Array[Object]): Object = {
val method = obj.getClass.getMethod(name, parameterTypes: _ *)
method.setAccessible(true)
method.invoke(obj, parameters: _ *)
}
}