blob: 83b7f18f6cf59c61d18a6e7c232ee9a2a8f8b780 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engine.execute
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engine.conf.EngineConfiguration
import com.webank.wedatasphere.linkis.engine.extension.EnginePreExecuteHook
import com.webank.wedatasphere.linkis.resourcemanager.Resource
import com.webank.wedatasphere.linkis.scheduler.executer._
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.exception.ExceptionUtils
import scala.collection.mutable.ArrayBuffer
/**
* Created by enjoyyin on 2018/9/17.
*/
abstract class EngineExecutor(outputPrintLimit: Int, isSupportParallelism: Boolean) extends AbstractExecutor(0) with Logging {
private val resultSetFactory = ResultSetFactory.getInstance
private var codeParser: Option[CodeParser] = None
private var resultSetListener: Option[ResultSetListener] = None
private var logListener: Option[JobLogListener] = None
private var jobProgressListener: Option[JobProgressListener] = None
private var engineInitialized = false
private var executedNum = 0
private var succeedNum = 0
private val enginePreExecuteHooks:Array[EnginePreExecuteHook] = {
val hooks = new ArrayBuffer[EnginePreExecuteHook]()
EngineConfiguration.ENGINE_PRE_EXECUTE_HOOK_CLASSES.getValue.split(",") foreach {
hookStr => Utils.tryCatch{
val clazz = Class.forName(hookStr.trim)
val obj = clazz.newInstance()
obj match {
case hook:EnginePreExecuteHook => hooks += hook
case _ => logger.warn(s"obj is not a engineHook obj is ${obj.getClass}")
}
}{
case e:Exception => logger.error(s"failed to load class ${hookStr}")
}
}
hooks.toArray
}
def setCodeParser(codeParser: CodeParser) = this.codeParser = Some(codeParser)
def setResultSetListener(resultSetListener: ResultSetListener) = this.resultSetListener = Some(resultSetListener)
def getResultSetListener = resultSetListener
def setLogListener(logListener: JobLogListener) = this.logListener = Some(logListener)
def getLogListener = logListener
def setJobProgressListener(jobProgressListener: JobProgressListener) = this.jobProgressListener = Some(jobProgressListener)
def getJobProgressListener = jobProgressListener
def init(): Unit = {}
final def ready(): Unit = {
transition(ExecutorState.Idle)
if(!engineInitialized) {
engineInitialized = true
}
}
def isEngineInitialized = engineInitialized
def getExecutedNum = executedNum
def getSucceedNum = succeedNum
def getFailedNum = executedNum - succeedNum
def getName: String
def getActualUsedResources: Resource
def getDefaultResultSetType: String = resultSetFactory.getResultSetType(0)
protected def createEngineExecutorContext(executeRequest: ExecuteRequest): EngineExecutorContext = {
val engineExecutorContext = new EngineExecutorContext(this)
executeRequest match {
case job: JobExecuteRequest => engineExecutorContext.setJobId(job.jobId)
case _ =>
}
executeRequest match {
case store: StorePathExecuteRequest => engineExecutorContext.setStorePath(store.storePath)
case _ =>
}
engineExecutorContext
}
protected def incompleteSplitter = "\n"
protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse
protected def executeCompletely(engineExecutorContext: EngineExecutorContext, code: String, completedLine: String): ExecuteResponse
override def execute(executeRequest: ExecuteRequest): ExecuteResponse = {
if (StringUtils.isEmpty(executeRequest.code)){
return IncompleteExecuteResponse("Your code is incomplete, it may be that only comments are selected for execution(您的代码不完整,可能是仅仅选中了注释进行执行)")
}
executedNum += 1
def ensureOp[A](f: => A): A = if(!this.engineInitialized) f
else if(isSupportParallelism) whenAvailable(f) else ensureIdle(f)
ensureOp {
val engineExecutorContext = createEngineExecutorContext(executeRequest)
var hookedCode = executeRequest.code;
Utils.tryCatch{
enginePreExecuteHooks foreach {
hook => logger.info(s"${hook.hookName} begins to do a hook")
hookedCode = hook.callPreExecuteHook(engineExecutorContext, executeRequest, hookedCode)
logger.info(s"${hook.hookName} ends to do a hook")
}
}{
case e:Throwable => logger.info("failed to do with hook", e)
}
info(s"hooked after code:$hookedCode")
var response: ExecuteResponse = null
val incomplete = new StringBuilder
val codes = Utils.tryCatch(codeParser.map(_.parse(hookedCode, engineExecutorContext)).getOrElse(Array(hookedCode))){
e => info("Your code will be submitted in overall mode")
Array(hookedCode)
}
engineExecutorContext.setTotalParagraph(codes.length)
codes.indices.foreach { index =>
if(engineExecutorContext.isKilled) return ErrorExecuteResponse("Job is killed by user!", null)
val code = codes(index)
engineExecutorContext.setCurrentParagraph(index + 1)
response = Utils.tryCatch(if(incomplete.nonEmpty) executeCompletely(engineExecutorContext, code, incomplete.toString)
else executeLine(engineExecutorContext, code)){ t =>
ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(t), t)
}
incomplete ++= code
//engineExecutorContext.appendStdout(getName + ">> " + incomplete.toString().trim + " complete ")
response match {
case e: ErrorExecuteResponse =>
error(s"execute code failed!", e.t)
return response
case SuccessExecuteResponse() =>
engineExecutorContext.appendStdout("\n")
incomplete.setLength(0)
case e: OutputExecuteResponse =>
incomplete.setLength(0)
val output = if(StringUtils.isNotEmpty(e.getOutput) && e.getOutput.length > outputPrintLimit)
e.getOutput.substring(0, outputPrintLimit) else e.getOutput
engineExecutorContext.appendStdout(output)
if(StringUtils.isNotBlank(e.getOutput)) engineExecutorContext.sendResultSet(e)
case _: IncompleteExecuteResponse =>
incomplete ++= incompleteSplitter
}
}
Utils.tryCatch(engineExecutorContext.close()){ t =>
response = ErrorExecuteResponse("send resultSet to entrance failed!", t)
}
response match {
case _: OutputExecuteResponse =>
succeedNum += 1
SuccessExecuteResponse()
case s: SuccessExecuteResponse =>
succeedNum += 1
s
case _ => response
}
}
}
private[engine] def tryShutdown(): Unit = this.ensureAvailable(transition(ExecutorState.ShuttingDown))
private[engine] def tryDead(): Unit = this.whenState(ExecutorState.ShuttingDown, transition(ExecutorState.Dead))
override protected def callback(): Unit = {}
// class EngineExecutorContext {
// private var jobId: Option[String] = None
// private val aliasNum = new AtomicInteger(0)
// protected var storePath: Option[String] = None
//
// def sendResultSet(resultSetWriter: ResultSetWriter[_ <: MetaData, _ <: Record]): Unit = {
// val fileName = new File(resultSetWriter.toFSPath.getPath).getName
// val index = if(fileName.indexOf(".") < 0) fileName.length else fileName.indexOf(".")
// val alias = if(fileName.startsWith("_")) fileName.substring(1, index) else fileName.substring(0, fileName.indexOf("_"))
// resultSetWriter.flush()
// Utils.tryFinally(sendResultSet(resultSetWriter.toString(), alias))(IOUtils.closeQuietly(resultSetWriter))
// }
//
// def sendResultSet(output: String): Unit = sendResultSet(output, "_" + aliasNum.getAndIncrement())
//
// private def sendResultSet(output: String, alias: String): Unit =
// if(resultSetFactory.isResultSetPath(output))
// resultSetListener.foreach(l => jobId.foreach(l.onResultSetCreated(_, output, alias)))
// else if(resultSetFactory.isResultSet(output))
// resultSetListener.foreach(l => jobId.foreach(l.onResultSetCreated(_, output, alias)))
// else throw new EngineErrorException(50050, "unknown resultSet: " + output)
//
// def setJobId(jobId: String) = this.jobId = Option(jobId)
// def getJobId = jobId
// def setStorePath(storePath: String) = this.storePath = Option(storePath)
//
// def sendResultSet(outputExecuteResponse: OutputExecuteResponse): Unit = outputExecuteResponse match {
// case AliasOutputExecuteResponse(alias, output) => sendResultSet(output, alias)
// case output: OutputExecuteResponse => sendResultSet(output.getOutput, "_" + aliasNum.getAndIncrement())
// }
//
// protected def getDefaultStorePath: String = {
// val path = ENGINE_RESULT_SET_STORE_PATH.getValue
// (if(path.endsWith("/")) path else path + "/") + "user" + "/" +
// DateFormatUtils.format(System.currentTimeMillis(), "yyyyMMdd") + "/" + Sender.getThisModuleInstance.getApplicationName +
// "/" + Sender.getThisModuleInstance.getInstance + "/" + System.nanoTime
// }
//
// def createDefaultResultSetWriter(): ResultSetWriter[_ <: MetaData, _ <: Record] = createResultSetWriter(resultSetFactory.getResultSetByType(getDefaultResultSetType))
//
// def createDefaultResultSetWriter(alias: String): ResultSetWriter[_ <: MetaData, _ <: Record] =
// createResultSetWriter(resultSetFactory.getResultSetByType(getDefaultResultSetType), alias)
//
// def createResultSetWriter(resultSetType: String): ResultSetWriter[_ <: MetaData, _ <: Record] =
// createResultSetWriter(resultSetFactory.getResultSetByType(resultSetType), null)
//
// def createResultSetWriter(resultSet: ResultSet[_ <: MetaData, _ <: Record]): ResultSetWriter[_ <: MetaData, _ <: Record] =
// createResultSetWriter(resultSet, null)
//
// def createResultSetWriter(resultSetType: String, alias: String): ResultSetWriter[_ <: MetaData, _ <: Record] =
// createResultSetWriter(resultSetFactory.getResultSetByType(resultSetType), alias)
//
// def createResultSetWriter(resultSet: ResultSet[_ <: MetaData, _ <: Record], alias: String): ResultSetWriter[_ <: MetaData, _ <: Record] = {
// val filePath = storePath.getOrElse(getDefaultStorePath)
// val fileName = if(StringUtils.isEmpty(alias)) "_" + aliasNum.getAndIncrement() else alias + "_" + aliasNum.getAndIncrement()
// val resultSetPath = resultSet.getResultSetPath(new FsPath(filePath), fileName)
// val resultSetWriter = ResultSetWriter.getResultSetWriter(resultSet, ENGINE_RESULT_SET_MAX_CACHE.getValue.toLong, resultSetPath)
// resultSetWriter
// }
//
// def appendStdout(log: String): Unit = if(!engineInitialized)
// info(log) else logListener.foreach(ll => jobId.foreach(ll.onLogUpdate(_, log)))
// }
}