blob: 6abf2a777c8c740adf6e6a432c6cc47803e20807 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineconn.computation.executor.execute
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import com.google.common.cache.{Cache, CacheBuilder}
import com.webank.wedatasphere.linkis.DataWorkCloudApplication
import com.webank.wedatasphere.linkis.common.log.LogUtils
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.listener.event.TaskStatusChangedEvent
import com.webank.wedatasphere.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import com.webank.wedatasphere.linkis.engineconn.computation.executor.creation.ComputationExecutorManager
import com.webank.wedatasphere.linkis.engineconn.computation.executor.entity.EngineConnTask
import com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationExecutorHook
import com.webank.wedatasphere.linkis.engineconn.core.EngineConnObject
import com.webank.wedatasphere.linkis.engineconn.core.engineconn.EngineConnManager
import com.webank.wedatasphere.linkis.engineconn.core.executor.ExecutorManager
import com.webank.wedatasphere.linkis.engineconn.executor.entity.{LabelExecutor, ResourceExecutor}
import com.webank.wedatasphere.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import com.webank.wedatasphere.linkis.governance.common.entity.ExecutionNodeStatus
import com.webank.wedatasphere.linkis.governance.common.paser.CodeParser
import com.webank.wedatasphere.linkis.governance.common.protocol.task.{EngineConcurrentInfo, RequestTask}
import com.webank.wedatasphere.linkis.manager.common.entity.enumeration.NodeStatus
import com.webank.wedatasphere.linkis.manager.engineplugin.common.creation.ExecutorFactory
import com.webank.wedatasphere.linkis.manager.label.entity.engine.UserCreatorLabel
import com.webank.wedatasphere.linkis.manager.label.entity.entrance.ExecuteOnceLabel
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.scheduler.executer._
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.exception.ExceptionUtils
/**
 * Base class for executors that run code submitted as [[EngineConnTask]]s.
 *
 * A task's code is passed through every registered [[ComputationExecutorHook]], split into
 * paragraphs by the configured [[CodeParser]] (or run as a single paragraph when none is set or
 * parsing fails) and executed paragraph by paragraph via [[executeLine]] / [[executeCompletely]].
 * The executor keeps counters of running, succeeded and failed tasks and a bounded cache of the
 * tasks it has been given, keyed by task id.
 *
 * @param outputPrintLimit maximum number of characters of an [[OutputExecuteResponse]]'s output that
 *                         are echoed to the task's stdout; the response itself is still passed to
 *                         `sendResultSet` unmodified
 */
abstract class ComputationExecutor(val outputPrintLimit: Int = 1000) extends AccessibleExecutor with ResourceExecutor with LabelExecutor with Logging {

  private val listenerBusContext = ExecutorListenerBusContext.getExecutorListenerBusContext()

  // Tasks handed to this executor, looked up by id (getTaskById / killTask). Entries expire after
  // ENGINE_TASK_EXPIRE_TIME ms without access; the cache holds at most MAX_TASK_NUM entries.
  private val taskCache: Cache[String, EngineConnTask] = CacheBuilder.newBuilder()
    .expireAfterAccess(EngineConnConf.ENGINE_TASK_EXPIRE_TIME.getValue, TimeUnit.MILLISECONDS)
    .maximumSize(EngineConnConstant.MAX_TASK_NUM)
    .build()

  private var engineInitialized: Boolean = false
  private var internalExecute: Boolean = false
  private var codeParser: Option[CodeParser] = None

  private val runningTasks: Count = new Count
  private val succeedTasks: Count = new Count
  private val failedTasks: Count = new Count

  // Task currently inside execute(); cleared when execute() finishes. close() may run while a task
  // is executing (presumably from another thread), so the field is volatile and close() reads it once.
  @volatile private var lastTask: EngineConnTask = _

  // Completed-task threshold (succeeded + failed) checked in afterExecute; <= 0 disables it.
  private val MAX_TASK_EXECUTE_NUM = ComputationExecutorConf.ENGINE_MAX_TASK_EXECUTE_NUM.getValue

  /** Sets the engine-initialized flag (defaults to `true`). */
  protected def setInitialized(inited: Boolean = true): Unit = this.engineInitialized = inited

  /** Transitions the executor to [[NodeStatus.Unlock]], marks the engine initialized and returns `true`. */
  final override def tryReady(): Boolean = {
    transition(NodeStatus.Unlock)
    if (!engineInitialized) {
      engineInitialized = true
    }
    info(s"Executor($getId) is ready.")
    true
  }

  /** Marks the engine initialized and logs it. */
  override def init(): Unit = {
    setInitialized()
    info(s"Executor($getId) inited : ${isEngineInitialized}")
  }

  /** Transitions the executor to [[NodeStatus.ShuttingDown]]; always returns `true`. */
  def tryShutdown(): Boolean = {
    transition(NodeStatus.ShuttingDown)
    true
  }

  /**
   * Requests a transition to [[NodeStatus.Failed]] guarded by [[NodeStatus.ShuttingDown]] via the
   * inherited `whenStatus` (presumably applied only while shutting down — confirm); always returns `true`.
   */
  def tryFailed(): Boolean = {
    this.whenStatus(NodeStatus.ShuttingDown, transition(NodeStatus.Failed))
    true
  }

  /** Not supported by computation executors: always returns `false`. */
  override def trySucceed(): Boolean = false

  /** Number of tasks that completed successfully. */
  def getSucceedNum: Int = succeedTasks.getCount()

  /** Number of tasks counted as failed. */
  def getFailedNum: Int = failedTasks.getCount()

  /** Number of tasks currently inside [[toExecuteTask]]. */
  def getRunningTask: Int = runningTasks.getCount()

  /** Snapshot of the task counters; the pending count is always reported as 0. */
  protected def getExecutorConcurrentInfo: EngineConcurrentInfo = EngineConcurrentInfo(getRunningTask, 0, getSucceedNum, getFailedNum)

  def isEngineInitialized: Boolean = engineInitialized

  /** `true` while [[toExecuteTask]] runs with `internalExecute = true`; suppresses status events. */
  def isInternalExecute: Boolean = internalExecute

  /** Hook for subclasses; does nothing by default. */
  protected def callback(): Unit = {}

  /**
   * Kills the task currently being executed (or calls `killTask("By close")` when there is none),
   * then delegates to the parent `close()`.
   */
  override def close(): Unit = {
    // Read the field exactly once: execute() nulls it when a task finishes, so checking and then
    // dereferencing the field separately could throw a NullPointerException and skip super.close().
    val currentTask = lastTask
    if (null != currentTask) synchronized {
      killTask(currentTask.getTaskId)
    } else {
      killTask("By close")
    }
    super.close()
  }

  /**
   * Runs `f` directly while the engine is not yet initialized; afterwards runs it through the
   * inherited `ensureIdle` (not shown here — presumably guards against a busy executor; confirm).
   */
  protected def ensureOp[A](f: => A): A =
    if (isEngineInitialized) ensureIdle(f) else f

  /** Hook invoked by [[execute]] before the task runs; does nothing by default. */
  protected def beforeExecute(engineConnTask: EngineConnTask): Unit = {}

  /**
   * Invoked by [[execute]] after a task finishes. When a positive MAX_TASK_EXECUTE_NUM is configured,
   * no task is running and the number of completed tasks (succeeded + failed) exceeds it, asks the
   * report executor to shut down.
   * NOTE(review): the check is strictly `>` although the log says "reached"; confirm whether `>=` was intended.
   */
  protected def afterExecute(engineConnTask: EngineConnTask, executeResponse: ExecuteResponse): Unit = {
    val executorNumber = getSucceedNum + getFailedNum
    if (MAX_TASK_EXECUTE_NUM > 0 && runningTasks.getCount() == 0 && executorNumber > MAX_TASK_EXECUTE_NUM) {
      error(s"Task has reached max execute number $MAX_TASK_EXECUTE_NUM, now tryShutdown. ")
      ExecutorManager.getInstance.getReportExecutor.tryShutdown()
    }
  }

  /**
   * Executes all paragraphs of a task's code.
   *
   * The task is moved to Running, its code is passed through every [[ComputationExecutorHook]] (hook
   * failures are only logged) and split into paragraphs by the code parser (falling back to the whole
   * code if there is no parser or parsing throws). On success the execution context is closed, the
   * success counter is bumped and the task is moved to Succeed; an [[OutputExecuteResponse]] is
   * reported to the caller as a plain [[SuccessExecuteResponse]].
   *
   * @param engineConnTask  task to run
   * @param internalExecute when `true`, status-change events are not posted for this run
   * @return the final response; an [[ErrorExecuteResponse]] when the task was cancelled, a paragraph
   *         failed, or closing the execution context failed
   */
  def toExecuteTask(engineConnTask: EngineConnTask, internalExecute: Boolean = false): ExecuteResponse = {
    runningTasks.increase()
    this.internalExecute = internalExecute
    Utils.tryFinally {
      transformTaskStatus(engineConnTask, ExecutionNodeStatus.Running)
      val engineExecutionContext = createEngineExecutionContext(engineConnTask)
      var hookedCode = engineConnTask.getCode
      Utils.tryCatch {
        val engineCreationContext = EngineConnObject.getEngineCreationContext
        ComputationExecutorHook.getComputationExecutorHooks.foreach(hook => {
          hookedCode = hook.beforeExecutorExecute(engineExecutionContext, engineCreationContext, hookedCode)
        })
      } ( e => info("failed to do with hook", e))
      if (hookedCode.length > 100) {
        info(s"hooked after code: ${hookedCode.substring(0, 100)} ....")
      } else {
        info(s"hooked after code: $hookedCode ")
      }
      val localPath = EngineConnConf.getLogDir
      engineExecutionContext.appendStdout(LogUtils.generateInfo(s"EngineConn local log path: ${DataWorkCloudApplication.getServiceInstance.toString} $localPath"))
      val codes = Utils.tryCatch(getCodeParser.map(_.parse(hookedCode)).getOrElse(Array(hookedCode))) { e =>
        info("Your code will be submitted in overall mode.", e)
        Array(hookedCode)
      }
      engineExecutionContext.setTotalParagraph(codes.length)
      executeParagraphs(engineConnTask, engineExecutionContext, codes) match {
        case Left(stopResponse) =>
          // Cancelled, or a paragraph failed: returned immediately; the context is NOT closed here.
          // NOTE(review): confirm the execution context holds nothing that must be released on this path.
          stopResponse
        case Right(lastResponse) =>
          // NOTE(review): lastResponse is null when the parser yields no paragraphs; it is then
          // returned as null and the task stays Running — confirm callers tolerate this.
          var response = lastResponse
          Utils.tryCatch(engineExecutionContext.close()) {
            t =>
              response = ErrorExecuteResponse("send resultSet to entrance failed!", t)
              failedTasks.increase()
          }
          response match {
            case _: OutputExecuteResponse =>
              succeedTasks.increase()
              transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
              SuccessExecuteResponse()
            case s: SuccessExecuteResponse =>
              succeedTasks.increase()
              transformTaskStatus(engineConnTask, ExecutionNodeStatus.Succeed)
              s
            case _ => response
          }
      }
    } {
      runningTasks.decrease()
      this.internalExecute = false
    }
  }

  /**
   * Runs each paragraph in order, feeding accumulated incomplete input to [[executeCompletely]].
   *
   * @return `Left(response)` when execution must stop early — the task was cancelled before a
   *         paragraph started, or a paragraph returned an [[ErrorExecuteResponse]] (the failure
   *         counter is bumped); `Right(lastResponse)` after all paragraphs ran (null if there were none)
   */
  private def executeParagraphs(engineConnTask: EngineConnTask,
                                engineExecutionContext: EngineExecutionContext,
                                codes: Array[String]): Either[ExecuteResponse, ExecuteResponse] = {
    val incomplete = new StringBuilder
    var response: ExecuteResponse = null
    var index = 0
    // A plain while loop so the early exits below are ordinary local returns, not non-local
    // returns thrown out of a closure.
    while (index < codes.length) {
      if (ExecutionNodeStatus.Cancelled == engineConnTask.getStatus) {
        return Left(ErrorExecuteResponse("Job is killed by user!", null))
      }
      val code = codes(index)
      engineExecutionContext.setCurrentParagraph(index + 1)
      response = Utils.tryCatch {
        if (incomplete.nonEmpty) executeCompletely(engineExecutionContext, code, incomplete.toString())
        else executeLine(engineExecutionContext, code)
      } {
        t => ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(t), t)
      }
      incomplete ++= code
      response match {
        case e: ErrorExecuteResponse =>
          failedTasks.increase()
          error("execute code failed!", e.t)
          return Left(response)
        case SuccessExecuteResponse() =>
          engineExecutionContext.appendStdout("\n")
          incomplete.setLength(0)
        case e: OutputExecuteResponse =>
          incomplete.setLength(0)
          // Only a prefix is echoed to stdout; the full response goes out as a result set.
          val output = if (StringUtils.isNotEmpty(e.getOutput) && e.getOutput.length > outputPrintLimit)
            e.getOutput.substring(0, outputPrintLimit) else e.getOutput
          engineExecutionContext.appendStdout(output)
          if (StringUtils.isNotBlank(e.getOutput)) engineExecutionContext.sendResultSet(e)
        case _: IncompleteExecuteResponse =>
          // Keep the accumulated code so the next paragraph is executed together with it.
          incomplete ++= incompleteSplitter
      }
      index += 1
    }
    Right(response)
  }

  /**
   * Entry point for running a task: caches it by id, records it as the current task, runs
   * [[toExecuteTask]] under [[ensureOp]], then invokes [[afterExecute]] (exceptions there are only
   * warned about) and clears the current task.
   * NOTE(review): if toExecuteTask/ensureOp throws, `lastTask` is not cleared.
   */
  def execute(engineConnTask: EngineConnTask): ExecuteResponse = {
    info(s"start to execute task ${engineConnTask.getTaskId}")
    updateLastActivityTime()
    beforeExecute(engineConnTask)
    taskCache.put(engineConnTask.getTaskId, engineConnTask)
    lastTask = engineConnTask
    val response = ensureOp {
      toExecuteTask(engineConnTask)
    }
    Utils.tryAndWarn(afterExecute(engineConnTask, response))
    info(s"Finished to execute task ${engineConnTask.getTaskId}")
    lastTask = null
    response
  }

  def setCodeParser(codeParser: CodeParser): Unit = this.codeParser = Some(codeParser)

  def getCodeParser: Option[CodeParser] = this.codeParser

  /** Executes a single paragraph when no incomplete input is pending. */
  def executeLine(engineExecutorContext: EngineExecutionContext, code: String): ExecuteResponse

  /** Separator appended to the pending input after an [[IncompleteExecuteResponse]]. */
  protected def incompleteSplitter = "\n"

  /** Executes `code` together with the previously accumulated incomplete input `completedLine`. */
  def executeCompletely(engineExecutorContext: EngineExecutionContext, code: String, completedLine: String): ExecuteResponse

  def progress(): Float

  def getProgressInfo: Array[JobProgressInfo]

  /**
   * Builds the execution context for a task: bound to the user of its [[UserCreatorLabel]] when that
   * label has a non-blank user, with the result-set store path (if given), job id, properties and labels copied over.
   */
  protected def createEngineExecutionContext(engineConnTask: EngineConnTask): EngineExecutionContext = {
    val userCreator = engineConnTask.getLables
      .collectFirst { case label: UserCreatorLabel => label }
      .orNull
    val engineExecutionContext = if (null != userCreator && StringUtils.isNotBlank(userCreator.getUser)) {
      new EngineExecutionContext(this, userCreator.getUser)
    } else {
      new EngineExecutionContext(this)
    }
    if (engineConnTask.getProperties.containsKey(RequestTask.RESULT_SET_STORE_PATH)) {
      engineExecutionContext.setStorePath(engineConnTask.getProperties.get(RequestTask.RESULT_SET_STORE_PATH).toString)
    }
    info(s"StorePath : ${engineExecutionContext.getStorePath.orNull}.")
    engineExecutionContext.setJobId(engineConnTask.getTaskId)
    engineExecutionContext.getProperties.putAll(engineConnTask.getProperties)
    engineExecutionContext.setLabels(engineConnTask.getLables)
    engineExecutionContext
  }

  /**
   * Marks the cached task with the given id as Cancelled; unknown ids are ignored and any exception is
   * only warned about. The running paragraph loop checks for Cancelled before each paragraph.
   * As written, the status is set before transformTaskStatus is called, so that call sees
   * Cancelled -> Cancelled: it logs a status-change error and posts no event.
   */
  def killTask(taskId: String): Unit = {
    Utils.tryAndWarn {
      val task = getTaskById(taskId)
      if (null != task) {
        task.setStatus(ExecutionNodeStatus.Cancelled)
        transformTaskStatus(task, ExecutionNodeStatus.Cancelled)
      }
    }
  }

  /**
   * Applies a status transition: any new status is accepted from Scheduled; only Succeed, Failed or
   * Cancelled from Running; everything else is logged as an error and leaves the status unchanged.
   * A [[TaskStatusChangedEvent]] is posted whenever the old and new status differ and this is not an
   * internal execution — including when the transition was rejected.
   * NOTE(review): that event on rejected transitions is currently how a cancelled task's terminal
   * status reaches listeners (see killTask), so it is left as is.
   */
  def transformTaskStatus(task: EngineConnTask, newStatus: ExecutionNodeStatus): Unit = {
    val oriStatus = task.getStatus
    info(s"task ${task.getTaskId} from status $oriStatus to new status $newStatus")
    oriStatus match {
      case ExecutionNodeStatus.Scheduled =>
        if (task.getStatus != newStatus) {
          task.setStatus(newStatus)
        }
      case ExecutionNodeStatus.Running =>
        if (newStatus == ExecutionNodeStatus.Succeed || newStatus == ExecutionNodeStatus.Failed || newStatus == ExecutionNodeStatus.Cancelled) {
          task.setStatus(newStatus)
        } else {
          error(s"Task status change error. task: $task, newStatus : $newStatus.")
        }
      case _ =>
        error(s"Task status change error. task: $task, newStatus : $newStatus.")
    }
    if (oriStatus != newStatus && !isInternalExecute) {
      listenerBusContext.getEngineConnSyncListenerBus.postToAll(TaskStatusChangedEvent(task.getTaskId, oriStatus, newStatus))
    }
  }

  /** Returns the cached task with this id, or `null` if absent or expired. */
  def getTaskById(taskId: String): EngineConnTask = {
    taskCache.getIfPresent(taskId)
  }

  /** Removes the task with this id from the cache. */
  def clearTaskCache(taskId: String): Unit = {
    taskCache.invalidate(taskId)
  }
}
/**
 * Thread-safe integer counter backed by an [[AtomicInteger]] that starts at zero.
 */
class Count {
  val count: AtomicInteger = new AtomicInteger(0)

  /** Current value of the counter. */
  def getCount(): Int = count.get

  /** Adds one to the counter. */
  def increase(): Unit = count.addAndGet(1)

  /** Subtracts one from the counter. */
  def decrease(): Unit = count.addAndGet(-1)
}