blob: 4c573563b85b81bcb3bf318f63c90eeb85f6aae7 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.webank.wedatasphere.linkis.orchestrator.computation.physical
import java.util.concurrent.TimeUnit
import com.webank.wedatasphere.linkis.common.exception.{ErrorException, LinkisRetryException, WarnException}
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.governance.common.protocol.task.{RequestTask, RequestTaskExecute}
import com.webank.wedatasphere.linkis.governance.common.utils.GovernanceConstant
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import com.webank.wedatasphere.linkis.orchestrator.computation.conf.ComputationOrchestratorConf
import com.webank.wedatasphere.linkis.orchestrator.computation.execute.{CodeExecTaskExecutor, CodeExecTaskExecutorManager}
import com.webank.wedatasphere.linkis.orchestrator.exception.{OrchestratorErrorCodeSummary, OrchestratorErrorException, OrchestratorRetryException}
import com.webank.wedatasphere.linkis.orchestrator.execution.AsyncTaskResponse.NotifyListener
import com.webank.wedatasphere.linkis.orchestrator.execution.impl.DefaultFailedTaskResponse
import com.webank.wedatasphere.linkis.orchestrator.execution.{AsyncTaskResponse, TaskResponse}
import com.webank.wedatasphere.linkis.orchestrator.plans.ast.QueryParams
import com.webank.wedatasphere.linkis.orchestrator.plans.physical.{AbstractExecTask, ExecTask, PhysicalContext, ReheatableExecTask, RetryExecTask}
import com.webank.wedatasphere.linkis.orchestrator.plans.unit.CodeLogicalUnit
import com.webank.wedatasphere.linkis.orchestrator.strategy.async.AsyncExecTask
import com.webank.wedatasphere.linkis.orchestrator.strategy.{ResultSetExecTask, StatusInfoExecTask}
import com.webank.wedatasphere.linkis.orchestrator.utils.OrchestratorIDCreator
import com.webank.wedatasphere.linkis.scheduler.executer.{ErrorExecuteResponse, SubmitResponse}
import org.apache.commons.lang.StringUtils
import scala.concurrent.duration.Duration
import scala.collection.convert.decorateAsScala._
/**
*
*
*/
class CodeLogicalUnitExecTask (parents: Array[ExecTask], children: Array[ExecTask])
extends AbstractExecTask(parents, children) with StatusInfoExecTask with ResultSetExecTask
with AsyncExecTask with Logging {
private var physicalContext: PhysicalContext = _
private var id: String = _
private val codeExecTaskExecutorManager = CodeExecTaskExecutorManager.getCodeExecTaskExecutorManager
private val askDuration = Duration(ComputationOrchestratorConf.MAX_ASK_EXECUTOR_TIME.getValue.toLong, TimeUnit.MILLISECONDS)
private var codeLogicalUnit: CodeLogicalUnit = _
override def execute(): TaskResponse = {
info(s"Start to execute CodeLogicalUnitExecTask(${getIDInfo()}).")
var executor: Option[CodeExecTaskExecutor] = None
var retryException: LinkisRetryException = null
executor = Utils.tryCatch(codeExecTaskExecutorManager.askExecutor(this, askDuration)) {
case retry: LinkisRetryException =>
retryException = retry
None
case e:ErrorException =>
//job.getLogListener.foreach(_.onLogUpdate(job, LogUtils.generateERROR(e.getMessage)))
throw e
case error: Throwable =>
//job.getLogListener.foreach(_.onLogUpdate(job, LogUtils.generateERROR(error.getMessage)))
throw error
}
if (executor.isDefined) {
val requestTask = toRequestTask
val codeExecutor = executor.get
val response = codeExecutor.getEngineConnExecutor.execute(requestTask)
response match {
case SubmitResponse(engineConnExecId) =>
//封装engineConnExecId信息
codeExecutor.setEngineConnTaskId(engineConnExecId)
codeExecTaskExecutorManager.addEngineConnTaskID(codeExecutor)
new AsyncTaskResponse {
override def notifyMe(listener: NotifyListener): Unit = null
override def waitForCompleted(): TaskResponse = throw new OrchestratorErrorException(OrchestratorErrorCodeSummary.METHOD_NUT_SUPPORT_CODE, "waitForCompleted method not support")
}
case ErrorExecuteResponse(message, t) =>
info(s"failed to submit task to engineConn,reason: $message")
throw new OrchestratorRetryException(OrchestratorErrorCodeSummary.EXECUTION_FOR_EXECUTION_ERROR_CODE, "failed to submit task to engineConn", t)
}
} else if(null != retryException) {
new DefaultFailedTaskResponse(s"ask Engine failed + ${retryException.getMessage}", OrchestratorErrorCodeSummary.EXECUTION_FOR_EXECUTION_ERROR_CODE, retryException)
} else {
throw new OrchestratorRetryException(OrchestratorErrorCodeSummary.EXECUTION_FOR_EXECUTION_ERROR_CODE, "Failed to ask executor")
}
}
private def toRequestTask: RequestTask ={
val requestTask = new RequestTaskExecute
requestTask.setCode(getCodeLogicalUnit.toStringCode)
getLabels.add(getCodeLogicalUnit.getLabel)
requestTask.setLabels(getLabels)
//Map
// if (null != getParams.getRuntimeParams.getDataSources ) {
// requestTask.getProperties.putAll(getParams.getRuntimeParams.getDataSources)
// }
// if (null != getParams.getRuntimeParams.getContext) {
// requestTask.getProperties.putAll(getParams.getRuntimeParams.getContext)
// }
// if (null != getParams.getRuntimeParams.getSpecials) {
// requestTask.getProperties.putAll(getParams.getRuntimeParams.getSpecials)
// }
if (null != getParams.getRuntimeParams.getJobs) {
requestTask.getProperties.putAll(getParams.getRuntimeParams.getJobs)
}
// requestTask.getProperties.put(GovernanceConstant.TASK_SOURCE_MAP_KEY, getParams.getRuntimeParams.get(GovernanceConstant.TASK_SOURCE_MAP_KEY) match {
// case o: Object => o
// case _ => null
// })
requestTask.getProperties.putAll(getParams.getRuntimeParams.toMap)
requestTask
}
override def isLocalMode: Boolean = false
override def canExecute: Boolean = true
override def verboseString: String = s"CodeLogicalUnitExecTask(codes=${codeLogicalUnit.codes.toArray.mkString(";")}, labels=${getLabels.asScala.map(_.getStringValue).mkString("&")})"
override def initialize(physicalContext: PhysicalContext): Unit = {
this.physicalContext = physicalContext
//CodeLogicalUnitExecTask is an executable task which has progress information, so initialize it as 0.0
}
override def getId: String = {
if (null == id) synchronized {
if (null == id) {
id = OrchestratorIDCreator.getPhysicalTaskIDCreator.nextID("codeExec")
}
}
id
}
override def getPhysicalContext: PhysicalContext = physicalContext
def getCodeLogicalUnit: CodeLogicalUnit = this.codeLogicalUnit
def setCodeLogicalUnit(codeLogicalUnit: CodeLogicalUnit): Unit = this.codeLogicalUnit = codeLogicalUnit
def getParams: QueryParams = {
getTaskDesc.getOrigin.getASTOrchestration.getASTContext.getParams
}
def getLabels: java.util.List[Label[_]] = {
val labels = getTaskDesc.getOrigin.getASTOrchestration.getASTContext.getLabels
if (null != labels) {
labels
} else {
null
}
}
def getExecuteUser: String = {
getTaskDesc.getOrigin.getASTOrchestration.getASTContext.getExecuteUser
}
override def kill(): Unit = {
codeExecTaskExecutorManager.getByExecTaskId(this.getId).foreach { codeEngineConnExecutor =>
if (StringUtils.isNotBlank(codeEngineConnExecutor.getEngineConnTaskId)) {
info(s"execTask($getId) be killed, engineConn execId is${codeEngineConnExecutor.getEngineConnTaskId}")
Utils.tryAndWarn(codeEngineConnExecutor.getEngineConnExecutor.killTask(codeEngineConnExecutor.getEngineConnTaskId))
Utils.tryAndWarn(codeExecTaskExecutorManager.unLockEngineConn(this, codeEngineConnExecutor))
}
}
}
def getCodeEngineConnExecutor: CodeExecTaskExecutor = {
codeExecTaskExecutorManager.getByExecTaskId(this.getId).orNull
}
override protected def newNode(): ExecTask = {
val task = new CodeLogicalUnitExecTask(null, null)
task.setCodeLogicalUnit(codeLogicalUnit)
task.setTaskDesc(getTaskDesc)
task
}
/*override def close(): Unit = {
codeExecTaskExecutorManager.getByExecTaskId(this.getId).foreach { codeEngineConnExecutor =>
info(s"ExecTask(${getIDInfo()}) be closed.")
Utils.tryAndWarn(codeExecTaskExecutorManager.delete(this, codeEngineConnExecutor))
}
}*/
override def clear(isSucceed: Boolean): Unit = {
codeExecTaskExecutorManager.getByExecTaskId(this.getId).foreach { codeEngineConnExecutor =>
if (isSucceed) {
info(s"ExecTask(${getIDInfo()}) execute success executor be delete.")
Utils.tryAndWarn(codeExecTaskExecutorManager.delete(this, codeEngineConnExecutor))
} else {
info(s"ExecTask(${getIDInfo()}) execute failed executor be unLock.")
Utils.tryAndWarn(codeExecTaskExecutorManager.unLockEngineConn(this, codeEngineConnExecutor))
}
}
}
}