// blob: 933234956a876e0b93a9fe2c3957a3a8e35865a3 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.webank.wedatasphere.linkis.orchestrator.computation.service
import com.webank.wedatasphere.linkis.common.log.LogUtils
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.governance.common.entity.ExecutionNodeStatus
import com.webank.wedatasphere.linkis.governance.common.protocol.task._
import com.webank.wedatasphere.linkis.message.annotation.Receiver
import com.webank.wedatasphere.linkis.message.builder.ServiceMethodContext
import com.webank.wedatasphere.linkis.orchestrator.computation.execute.CodeExecTaskExecutorManager
import com.webank.wedatasphere.linkis.orchestrator.computation.utils.ComputationOrchestratorUtils
import com.webank.wedatasphere.linkis.orchestrator.core.ResultSet
import com.webank.wedatasphere.linkis.orchestrator.computation.monitor.EngineConnMonitor
import com.webank.wedatasphere.linkis.orchestrator.ecm.service.TaskExecutionReceiver
import com.webank.wedatasphere.linkis.orchestrator.listener.task._
import com.webank.wedatasphere.linkis.orchestrator.listener.{OrchestratorAsyncListenerBus, OrchestratorListenerBusContext, OrchestratorSyncListenerBus}
import com.webank.wedatasphere.linkis.rpc.utils.RPCUtils
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
/**
 * Spring service that receives the task messages reported by engineConn instances
 * (log, progress, status, result-set size, result set and error) and forwards each of
 * them, wrapped in the matching orchestrator event, to the physical context of the
 * exec task that the message belongs to.
 *
 * The owning exec task is looked up through [[CodeExecTaskExecutorManager]] by the
 * sender's service instance and the `execId` carried in the message.
 */
@Service
class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Logging {

  private val codeExecTaskExecutorManager = CodeExecTaskExecutorManager.getCodeExecTaskExecutorManager

  //private val asyncListenerBus: OrchestratorAsyncListenerBus = OrchestratorListenerBusContext.getListenerBusContext().getOrchestratorAsyncListenerBus
  //private val syncListenerBus: OrchestratorSyncListenerBus = OrchestratorListenerBusContext.getListenerBusContext().getOrchestratorSyncListenerBus
  // TODO ListenerBus should be split per OrchestratorSession.
  // TODO Having two global ListenerBus instances will cause consumption problems.

  /**
   * Registers a callback on [[EngineConnMonitor]] after this bean has been constructed.
   *
   * When the callback is invoked with a failed engine service instance, every cached
   * task whose executor runs on that instance gets an error log pushed, followed by a
   * broadcast error-response event and a broadcast `Failed` status event.
   */
  @PostConstruct
  private def init(): Unit = {
    EngineConnMonitor.addEngineExecutorStatusMonitor(codeExecTaskExecutorManager.getAllInstanceToExecutorCache(),
      failedEngineServiceInstance => {
        val taskToExecutorCache = codeExecTaskExecutorManager.getAllExecTaskToExecutorCache()
        // NOTE(review): this locks on the receiver instance, not on the cache itself —
        // confirm that it actually guards against concurrent cache modification.
        val failedTaskMap = synchronized {
          taskToExecutorCache.filter(_._2.getEngineConnExecutor.getServiceInstance.equals(failedEngineServiceInstance))
        }
        // The filter result is never null, and iterating an empty result is a no-op,
        // so no explicit null/empty guard is needed here.
        failedTaskMap.foreach {
          case (_, executor) =>
            val execTask = executor.getExecTask
            // Isolate each task so that a failure while notifying one task does not
            // prevent the remaining tasks from being marked as failed.
            Utils.tryAndError {
              warn(s"Will kill task ${execTask.getIDInfo()} because the engine ${executor.getEngineConnExecutor.getServiceInstance.toString} quited unexpectedly.")
              val errLog = LogUtils.generateERROR(s"Your job : ${execTask.getIDInfo()} was failed because the engine quitted unexpectedly(任务${execTask.getIDInfo()}失败," +
                s"原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM).")
              val logEvent = TaskLogEvent(execTask, errLog)
              execTask.getPhysicalContext.pushLog(logEvent)
              val errorResponseEvent = TaskErrorResponseEvent(execTask, "task failed,Engine quitted unexpectedly(任务运行失败原因是引擎意外退出,可能是复杂任务导致引擎退出,如OOM).")
              execTask.getPhysicalContext.broadcastSyncEvent(errorResponseEvent)
              val statusEvent = TaskStatusEvent(execTask, ExecutionNodeStatus.Failed)
              execTask.getPhysicalContext.broadcastSyncEvent(statusEvent)
            }
        }
      })
  }

  /**
   * Pushes a log message reported by an engineConn to the owning task's physical context.
   * Messages for which no executor is found are ignored without logging.
   *
   * @param taskLog log message carrying the engine-side `execId` and the log text
   * @param smc     message context used to resolve the sender's service instance
   */
  @Receiver
  override def taskLogReceiver(taskLog: ResponseTaskLog, smc: ServiceMethodContext): Unit = {
    val serviceInstance = RPCUtils.getServiceInstanceFromSender(smc.getSender)
    codeExecTaskExecutorManager.getByEngineConnAndTaskId(serviceInstance, taskLog.execId).foreach { codeExecutor =>
      val event = TaskLogEvent(codeExecutor.getExecTask, taskLog.log)
      codeExecutor.getExecTask.getPhysicalContext.pushLog(event)
      codeExecutor.getEngineConnExecutor.updateLastUpdateTime()
      //asyncListenerBus.post(event)
    }
  }

  /**
   * Pushes a progress update reported by an engineConn to the owning task's physical context.
   * Messages for which no executor is found are ignored without logging.
   *
   * @param taskProgress progress message carrying the `execId`, progress value and progress info
   * @param smc          message context used to resolve the sender's service instance
   */
  @Receiver
  override def taskProgressReceiver(taskProgress: ResponseTaskProgress, smc: ServiceMethodContext): Unit = {
    val serviceInstance = RPCUtils.getServiceInstanceFromSender(smc.getSender)
    codeExecTaskExecutorManager.getByEngineConnAndTaskId(serviceInstance, taskProgress.execId).foreach { codeExecutor =>
      val event = TaskProgressEvent(codeExecutor.getExecTask, taskProgress.progress, taskProgress.progressInfo)
      codeExecutor.getExecTask.getPhysicalContext.pushProgress(event)
      codeExecutor.getEngineConnExecutor.updateLastUpdateTime()
    }
  }

  /**
   * Broadcasts a status change reported by an engineConn to the owning task's physical context.
   * A warning is logged when no executor matches the message, so that a dropped
   * status update is visible instead of being discarded silently.
   *
   * @param taskStatus status message carrying the `execId` and the new status
   * @param smc        message context used to resolve the sender's service instance
   */
  @Receiver
  override def taskStatusReceiver(taskStatus: ResponseTaskStatus, smc: ServiceMethodContext): Unit = {
    val serviceInstance = RPCUtils.getServiceInstanceFromSender(smc.getSender)
    val matchedExecutors = codeExecTaskExecutorManager.getByEngineConnAndTaskId(serviceInstance, taskStatus.execId)
    if (matchedExecutors.isEmpty) {
      warn(s"From engineConn $serviceInstance received $taskStatus, but cannot find the corresponding execTask, the message is ignored.")
    }
    matchedExecutors.foreach { codeExecutor =>
      val event = TaskStatusEvent(codeExecutor.getExecTask, taskStatus.status)
      info(s"From engineConn receive status info:$taskStatus, now post to listenerBus event: $event")
      codeExecutor.getExecTask.getPhysicalContext.broadcastSyncEvent(event)
      codeExecutor.getEngineConnExecutor.updateLastUpdateTime()
    }
  }

  /**
   * Broadcasts the result-set size reported by an engineConn to the owning task's physical context.
   * A warning is logged when no executor matches the message.
   *
   * @param taskResultSize message carrying the `execId` and the result-set size
   * @param smc            message context used to resolve the sender's service instance
   */
  @Receiver
  override def taskResultSizeReceiver(taskResultSize: ResponseTaskResultSize, smc: ServiceMethodContext): Unit = {
    val serviceInstance = RPCUtils.getServiceInstanceFromSender(smc.getSender)
    val matchedExecutors = codeExecTaskExecutorManager.getByEngineConnAndTaskId(serviceInstance, taskResultSize.execId)
    if (matchedExecutors.isEmpty) {
      warn(s"From engineConn $serviceInstance received $taskResultSize, but cannot find the corresponding execTask, the message is ignored.")
    }
    matchedExecutors.foreach { codeExecutor =>
      val event = TaskResultSetSizeEvent(codeExecutor.getExecTask, taskResultSize.resultSize)
      info(s"From engineConn receive resultSet size info$taskResultSize, now post to listenerBus event: $event")
      codeExecutor.getExecTask.getPhysicalContext.broadcastSyncEvent(event)
      codeExecutor.getEngineConnExecutor.updateLastUpdateTime()
    }
  }

  /**
   * Broadcasts a result set reported by an engineConn to the owning task's physical context.
   * A warning is logged when no executor matches the message.
   *
   * @param taskResultSet message carrying the `execId`, the output and its alias
   * @param smc           message context used to resolve the sender's service instance
   */
  @Receiver
  override def taskResultSetReceiver(taskResultSet: ResponseTaskResultSet, smc: ServiceMethodContext): Unit = {
    val serviceInstance = RPCUtils.getServiceInstanceFromSender(smc.getSender)
    val matchedExecutors = codeExecTaskExecutorManager.getByEngineConnAndTaskId(serviceInstance, taskResultSet.execId)
    if (matchedExecutors.isEmpty) {
      warn(s"From engineConn $serviceInstance received $taskResultSet, but cannot find the corresponding execTask, the message is ignored.")
    }
    matchedExecutors.foreach { codeExecutor =>
      val event = TaskResultSetEvent(codeExecutor.getExecTask, ResultSet(taskResultSet.output, taskResultSet.alias))
      info(s"From engineConn receive resultSet info $taskResultSet , now post to listenerBus event: $event")
      codeExecutor.getExecTask.getPhysicalContext.broadcastSyncEvent(event)
      codeExecutor.getEngineConnExecutor.updateLastUpdateTime()
    }
  }

  /**
   * Broadcasts an error reported by an engineConn to the owning task's physical context.
   * A warning is logged when no executor matches the message.
   *
   * @param responseTaskError message carrying the `execId` and the error message
   * @param smc               message context used to resolve the sender's service instance
   */
  @Receiver
  override def taskErrorReceiver(responseTaskError: ResponseTaskError, smc: ServiceMethodContext): Unit = {
    val serviceInstance = RPCUtils.getServiceInstanceFromSender(smc.getSender)
    val matchedExecutors = codeExecTaskExecutorManager.getByEngineConnAndTaskId(serviceInstance, responseTaskError.execId)
    if (matchedExecutors.isEmpty) {
      warn(s"From engineConn $serviceInstance received $responseTaskError, but cannot find the corresponding execTask, the message is ignored.")
    }
    matchedExecutors.foreach { codeExecutor =>
      val event = TaskErrorResponseEvent(codeExecutor.getExecTask, responseTaskError.errorMsg)
      info(s"From engineConn receive responseTaskError info$responseTaskError, now post to listenerBus event: $event")
      codeExecutor.getExecTask.getPhysicalContext.broadcastSyncEvent(event)
      codeExecutor.getEngineConnExecutor.updateLastUpdateTime()
    }
  }
}