blob: 569b1de4da199e5cb1219ff84b7babc0ee9b93ad [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineconn.computation.executor.service
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{BlockingQueue, ExecutorService, Future, LinkedBlockingDeque, TimeUnit}
import com.google.common.cache.{Cache, CacheBuilder}
import com.webank.wedatasphere.linkis.common.listener.Event
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.listener.LogListener
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.listener.event._
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.log.LogHelper
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.service.LockService
import com.webank.wedatasphere.linkis.engineconn.common.conf.{EngineConnConf, EngineConnConstant}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import com.webank.wedatasphere.linkis.engineconn.computation.executor.creation.ComputationExecutorManager
import com.webank.wedatasphere.linkis.engineconn.computation.executor.entity.{CommonEngineConnTask, EngineConnTask}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.{ComputationExecutor, ConcurrentComputationExecutor}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.listener.{ResultSetListener, TaskProgressListener, TaskStatusListener}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.utlis.{ComputaionEngineContant, ComputationEngineUtils}
import com.webank.wedatasphere.linkis.engineconn.executor.listener.ExecutorListenerBusContext
import com.webank.wedatasphere.linkis.engineconn.executor.listener.event.EngineConnSyncEvent
import com.webank.wedatasphere.linkis.governance.common.entity.ExecutionNodeStatus
import com.webank.wedatasphere.linkis.governance.common.exception.engineconn.{EngineConnExecutorErrorCode, EngineConnExecutorErrorException}
import com.webank.wedatasphere.linkis.governance.common.protocol.task._
import com.webank.wedatasphere.linkis.manager.common.entity.enumeration.NodeStatus
import com.webank.wedatasphere.linkis.message.annotation.Receiver
import com.webank.wedatasphere.linkis.message.builder.ServiceMethodContext
import com.webank.wedatasphere.linkis.protocol.message.RequestProtocol
import com.webank.wedatasphere.linkis.rpc.Sender
import com.webank.wedatasphere.linkis.rpc.utils.RPCUtils
import com.webank.wedatasphere.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, IncompleteExecuteResponse, SubmitResponse}
import com.webank.wedatasphere.linkis.server.BDPJettyServerHelper
import javax.annotation.PostConstruct
import org.apache.commons.lang.StringUtils
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import java.util
import scala.collection.JavaConverters._
@Component
class TaskExecutionServiceImpl extends TaskExecutionService with Logging with ResultSetListener with LogListener with TaskProgressListener with TaskStatusListener {
private val executorManager = ComputationExecutorManager.getInstance
private val taskExecutedNum = new AtomicInteger(0)
private var lastTask: EngineConnTask = _
private var lastTaskFuture: Future[_] = _
private var lastTaskDaemonFuture: Future[_] = _
// for concurrent executor
private var concurrentTaskQueueFifoConsumerFuture: Future[_] = _
private var concurrentTaskQueue: BlockingQueue[EngineConnTask] = _
@Autowired
private var lockService: LockService = _
private val asyncListenerBusContext = ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnAsyncListenerBus
private val syncListenerBus = ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnSyncListenerBus
private val taskIdCache: Cache[String, ComputationExecutor] = CacheBuilder.newBuilder().expireAfterAccess(EngineConnConf.ENGINE_TASK_EXPIRE_TIME.getValue, TimeUnit.MILLISECONDS)
.maximumSize(EngineConnConstant.MAX_TASK_NUM).build()
@PostConstruct
def init(): Unit = {
LogHelper.setLogListener(this)
syncListenerBus.addListener(this)
}
private def sendToEntrance(task: EngineConnTask, msg: RequestProtocol): Unit = synchronized {
Utils.tryCatch {
var sender : Sender = null
if (null != task && null != task.getCallbackServiceInstance()) {
sender = Sender.getSender(task.getCallbackServiceInstance())
sender.send(msg)
} else {
// todo
debug("SendtoEntrance error, cannot find entrance instance.")
}
}{
t =>
val errorMsg = s"SendToEntrance error. $msg" + t.getCause
error(errorMsg, t)
throw new EngineConnExecutorErrorException(EngineConnExecutorErrorCode.SEND_TO_ENTRANCE_ERROR, errorMsg)
}
}
@Receiver
override def execute(requestTask: RequestTask, smc: ServiceMethodContext): ExecuteResponse = {
// smc // todo get sender
// check lock
info("Received a new task, task content is " + requestTask)
if (StringUtils.isBlank(requestTask.getLock)) {
error(s"Invalid lock : ${requestTask.getLock} , requestTask : " + requestTask)
return ErrorExecuteResponse(s"Invalid lock : ${requestTask.getLock}.", new EngineConnExecutorErrorException(EngineConnExecutorErrorCode.INVALID_PARAMS, "Invalid lock or code(请获取到锁后再提交任务.)"))
}
if (!lockService.isLockExist(requestTask.getLock)) {
error(s"Lock ${requestTask.getLock} not exist, cannot execute.")
return ErrorExecuteResponse("Lock not exixt", new EngineConnExecutorErrorException(EngineConnExecutorErrorCode.INVALID_LOCK, "Lock : " + requestTask.getLock + " not exist(您的锁无效,请重新获取后再提交)."))
}
if (StringUtils.isBlank(requestTask.getCode)) {
return IncompleteExecuteResponse("Your code is incomplete, it may be that only comments are selected for execution(您的代码不完整,可能是仅仅选中了注释进行执行)")
}
// 获取任务类型和任务代码,运行任务
val taskId: Int = taskExecutedNum.incrementAndGet()
val retryAble: Boolean = {
val retry = requestTask.getProperties.getOrDefault(ComputaionEngineContant.RETRYABLE_TYPE_NAME, null)
if (null != retry) retry.asInstanceOf[Boolean]
else false
}
val task = new CommonEngineConnTask(String.valueOf(taskId), retryAble)
task.setCode(requestTask.getCode)
task.setProperties(requestTask.getProperties)
task.data(ComputaionEngineContant.LOCK_TYPE_NAME, requestTask.getLock)
task.setStatus(ExecutionNodeStatus.Scheduled)
val labels = requestTask.getLabels.asScala.toArray
task.setLabels(labels)
val entranceServerInstance = RPCUtils.getServiceInstanceFromSender(smc.getSender)
task.setCallbackServiceInstance(entranceServerInstance)
val executor = executorManager.getExecutorByLabels(labels)
executor match {
case computationExecutor: ComputationExecutor =>
taskIdCache.put(task.getTaskId, computationExecutor)
submitTask(task, computationExecutor)
case o =>
val msg = "Invalid computationExecutor : " + ComputationEngineUtils.GSON.toJson(o) + ", labels : " + ComputationEngineUtils.GSON.toJson(labels) + ", requestTask : " + requestTask
error(msg)
ErrorExecuteResponse("Invalid computationExecutor(生成无效的计算引擎,请联系管理员).",
new EngineConnExecutorErrorException(EngineConnExecutorErrorCode.INVALID_ENGINE_TYPE, msg))
}
}
// override def taskStatus(taskID: String): ResponseTaskStatus = {
// val task = taskIdCache.get(taskID)
// ResponseTaskStatus(taskID, task.getStatus.id)
// }
private def submitTask(task: CommonEngineConnTask, computationExecutor: ComputationExecutor): ExecuteResponse = {
info(s"Task ${task.getTaskId} was submited.")
computationExecutor match {
case concurrentComputationExecutor: ConcurrentComputationExecutor =>
submitConcurrentTask(task, concurrentComputationExecutor)
case _ =>
submitSyncTask(task, computationExecutor)
}
}
private def submitSyncTask(task: CommonEngineConnTask, computationExecutor: ComputationExecutor): ExecuteResponse = {
val runTask = new Runnable {
override def run(): Unit = Utils.tryAndWarn {
LogHelper.dropAllRemainLogs()
val response = computationExecutor.execute(task)
response match {
case ErrorExecuteResponse(message, throwable) =>
sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
error(message, throwable)
LogHelper.pushAllRemainLogs()
computationExecutor.transformTaskStatus(task, ExecutionNodeStatus.Failed)
case _ =>
}
}
}
lastTask = task
lastTaskFuture = Utils.defaultScheduler.submit(runTask)
lastTaskDaemonFuture = openDaemonForTask(task, lastTaskFuture, Utils.defaultScheduler)
SubmitResponse(task.getTaskId)
}
private def submitConcurrentTask(task: CommonEngineConnTask, executor: ConcurrentComputationExecutor): ExecuteResponse = {
if (null == concurrentTaskQueue) synchronized {
if (null == concurrentTaskQueue) {
concurrentTaskQueue = new LinkedBlockingDeque[EngineConnTask]()
}
}
concurrentTaskQueue.put(task)
if (null == concurrentTaskQueueFifoConsumerFuture) synchronized {
val consumerRunnable = new Runnable {
override def run(): Unit = {
var errCount = 0
val ERR_COUNT_MAX = 20
while (true) {
Utils.tryCatch {
if (! executor.isBusy && ! executor.isClosed) {
val task = concurrentTaskQueue.take()
lastTask = task
info(s"Start to run task ${task.getTaskId}")
val response = executor.execute(task)
response match {
case ErrorExecuteResponse(message, throwable) =>
sendToEntrance(task, ResponseTaskError(task.getTaskId, message))
error(message, throwable)
LogHelper.pushAllRemainLogs()
executor.transformTaskStatus(task, ExecutionNodeStatus.Failed)
case _ => //TODO response maybe lose
}
}
Thread.sleep(20)
} {
case t: Throwable =>
errCount += 1
error(s"Execute task ${task.getTaskId} failed :", t)
if (errCount > ERR_COUNT_MAX) {
error(s"Executor run failed for ${errCount} times over ERROR_COUNT_MAX : ${ERR_COUNT_MAX}, will shutdown.")
executor.transition(NodeStatus.ShuttingDown)
}
}
}
}
}
if (null == concurrentTaskQueueFifoConsumerFuture) {
val consumerThread = new Thread(consumerRunnable)
consumerThread.setDaemon(true)
consumerThread.setName("ConcurrentTaskQueueFifoConsumerThread")
consumerThread.start()
}
}
SubmitResponse(task.getTaskId)
}
/**
* Open daemon thread
* @param task engine conn task
* @param scheduler scheduler
* @return
*/
private def openDaemonForTask(task: EngineConnTask, taskFuture: Future[_], scheduler: ExecutorService): Future[_] = {
scheduler.submit(new Runnable {
override def run(): Unit = {
val sleepInterval = ComputationExecutorConf.ENGINE_PROGRESS_FETCH_INTERVAL.getValue
while(null != taskFuture && !taskFuture.isDone){
sendToEntrance(task, taskProgress(task.getTaskId))
Thread.sleep(TimeUnit.MILLISECONDS.convert(sleepInterval, TimeUnit.SECONDS))
}
}
})
}
override def taskProgress(taskID: String): ResponseTaskProgress = {
var response = ResponseTaskProgress(taskID, 0, null)
if (StringUtils.isBlank(taskID)) return response
val executor = taskIdCache.getIfPresent(taskID)
if (null != executor) {
val task = executor.getTaskById(taskID)
if (null != task) {
if (ExecutionNodeStatus.isCompleted(task.getStatus)) {
response = ResponseTaskProgress(taskID, 1.0f, null)
} else {
response = ResponseTaskProgress(taskID, executor.progress(), executor.getProgressInfo)
}
} else {
response = ResponseTaskProgress(taskID, -1, null)
}
} else {
error(s"Executor of taskId : $taskID is not cached.")
}
response
}
override def taskLog(taskID: String): ResponseTaskLog = {
// todo check
null
}
// override def pauseTask(taskID: String): Unit = {
// val task = taskIdCache.get(taskID)
// // todo
// }
override def killTask(taskID: String): Unit = {
val executor = taskIdCache.getIfPresent(taskID)
if (null != executor) {
executor.killTask(taskID)
} else {
error(s"Executor of taskId : $taskID is not cached.")
}
Utils.tryAndWarn (Thread.sleep(50))
if (null != lastTask && lastTask.getTaskId.equalsIgnoreCase(taskID)) {
if (null != lastTaskFuture && !lastTaskFuture.isDone) {
Utils.tryAndWarn {
lastTaskFuture.cancel(true)
//Close the daemon also
lastTaskDaemonFuture.cancel(true)
}
}
}
}
/*override def resumeTask(taskID: String): Unit = {
// todo
}*/
@Receiver
override def dealRequestTaskStatus(requestTaskStatus: RequestTaskStatus): ResponseTaskStatus = {
val task = getTaskByTaskId(requestTaskStatus.execId)
if (null != task) {
ResponseTaskStatus(task.getTaskId, task.getStatus)
} else {
val msg = "Task null! requestTaskStatus: " + ComputationEngineUtils.GSON.toJson(requestTaskStatus)
error(msg)
ResponseTaskStatus(requestTaskStatus.execId, ExecutionNodeStatus.Cancelled)
}
}
@Receiver
override def dealRequestTaskPause(requestTaskPause: RequestTaskPause): Unit = {
info(s"Pause is Not supported for task : " + requestTaskPause.execId )
}
@Receiver
override def dealRequestTaskKill(requestTaskKill: RequestTaskKill): Unit = {
val executor = taskIdCache.getIfPresent(requestTaskKill.execId)
if (null != executor) {
executor.killTask(requestTaskKill.execId)
info(s"TaskId : ${requestTaskKill.execId} was killed by user.")
} else {
error(s"Invalid executor : null for taskId : ${requestTaskKill.execId}")
}
}
@Receiver
override def dealRequestTaskResume(requestTaskResume: RequestTaskResume): Unit = {
info(s"Resume is Not support for task : " + requestTaskResume.execId )
}
override def onEvent(event: EngineConnSyncEvent): Unit = event match {
case taskStatusChangedEvent: TaskStatusChangedEvent => onTaskStatusChanged(taskStatusChangedEvent)
case taskProgressUpdateEvent: TaskProgressUpdateEvent => onProgressUpdate(taskProgressUpdateEvent)
case logUpdateEvent: TaskLogUpdateEvent => onLogUpdate(logUpdateEvent)
case taskResultCreateEvent: TaskResultCreateEvent => onResultSetCreated(taskResultCreateEvent)
case taskResultSizeCreatedEvent: TaskResultSizeCreatedEvent => onResultSizeCreated(taskResultSizeCreatedEvent)
case _ =>
warn("Unknown event : " + BDPJettyServerHelper.gson.toJson(event))
}
override def onLogUpdate(logUpdateEvent: TaskLogUpdateEvent): Unit = {
if (EngineConnConf.ENGINE_PUSH_LOG_TO_ENTRANCE.getValue) {
if (null != logUpdateEvent && StringUtils.isNotBlank(logUpdateEvent.taskId)) {
val task = getTaskByTaskId(logUpdateEvent.taskId)
if (null != task) {
sendToEntrance(task, ResponseTaskLog(logUpdateEvent.taskId, logUpdateEvent.log))
} else {
error("Task cannot null! logupdateEvent: " + ComputationEngineUtils.GSON.toJson(logUpdateEvent))
}
} else if (null != lastTask) {
val executor = executorManager.getReportExecutor
executor match {
case computationExecutor: ComputationExecutor =>
if (computationExecutor.isBusy) {
sendToEntrance(lastTask, ResponseTaskLog(lastTask.getTaskId, logUpdateEvent.log))
}
case _ =>
error("OnLogUpdate error. Invalid ComputationExecutor : " + ComputationEngineUtils.GSON.toJson(executor))
}
} else {
info(s"Task not ready, log will be dropped : ${BDPJettyServerHelper.gson.toJson(logUpdateEvent)}")
}
}
}
override def onTaskStatusChanged(taskStatusChangedEvent: TaskStatusChangedEvent): Unit = {
val task = getTaskByTaskId(taskStatusChangedEvent.taskId)
if (null != task) {
if (ExecutionNodeStatus.isCompleted(taskStatusChangedEvent.toStatus)) {
lastTask = task
LogHelper.pushAllRemainLogs()
}
sendToEntrance(task, ResponseTaskStatus(taskStatusChangedEvent.taskId, taskStatusChangedEvent.toStatus))
} else {
error("Task cannot null! taskStatusChangedEvent: " + ComputationEngineUtils.GSON.toJson(taskStatusChangedEvent))
}
}
override def onProgressUpdate(taskProgressUpdateEvent: TaskProgressUpdateEvent): Unit = {
if (EngineConnConf.ENGINE_PUSH_LOG_TO_ENTRANCE.getValue) {
val task = getTaskByTaskId(taskProgressUpdateEvent.taskId)
if (null != task) {
sendToEntrance(task, ResponseTaskProgress(taskProgressUpdateEvent.taskId, taskProgressUpdateEvent.progress, taskProgressUpdateEvent.progressInfo))
} else {
error("Task cannot null! taskProgressUpdateEvent : " + ComputationEngineUtils.GSON.toJson(taskProgressUpdateEvent))
}
}
}
override def onResultSetCreated(taskResultCreateEvent: TaskResultCreateEvent): Unit = {
info(s"start to deal result event ${taskResultCreateEvent.taskId}")
val task = getTaskByTaskId(taskResultCreateEvent.taskId)
if (null != task) {
sendToEntrance(task, ResponseTaskResultSet(
taskResultCreateEvent.taskId,
taskResultCreateEvent.resStr,
taskResultCreateEvent.alias
))
} else {
error(s"Task cannot null! taskResultCreateEvent: ${taskResultCreateEvent.taskId}" )
}
info(s"Finished to deal result event ${taskResultCreateEvent.taskId}")
}
def getTaskByTaskId(taskId: String): EngineConnTask = {
if (StringUtils.isBlank(taskId)) return null
val executor = taskIdCache.getIfPresent(taskId)
if (null != executor) {
executor.getTaskById(taskId)
} else {
error(s"Executor of taskId : $taskId is not cached.")
null
}
}
override def onResultSizeCreated(taskResultSizeCreatedEvent: TaskResultSizeCreatedEvent): Unit = {
val task = getTaskByTaskId(taskResultSizeCreatedEvent.taskId)
if (null != task) {
sendToEntrance(task, ResponseTaskResultSize(
taskResultSizeCreatedEvent.taskId,
taskResultSizeCreatedEvent.resultSize
))
} else {
error("Task cannot null! taskResultSizeCreatedEvent: " + ComputationEngineUtils.GSON.toJson(taskResultSizeCreatedEvent))
}
}
override def onEventError(event: Event, t: Throwable): Unit = {
}
}