| /* |
| * Copyright 2019 WeBank |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package com.webank.wedatasphere.linkis.orchestrator.computation.execute |
| |
| import java.util |
| |
| import com.webank.wedatasphere.linkis.common.ServiceInstance |
| import com.webank.wedatasphere.linkis.common.exception.{LinkisRetryException, WarnException} |
| import com.webank.wedatasphere.linkis.common.log.LogUtils |
| import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} |
| import com.webank.wedatasphere.linkis.manager.label.constant.LabelKeyConstant |
| import com.webank.wedatasphere.linkis.manager.label.entity.Label |
| import com.webank.wedatasphere.linkis.manager.label.entity.entrance.LoadBalanceLabel |
| import com.webank.wedatasphere.linkis.manager.label.utils.{LabelUtil, LabelUtils} |
| import com.webank.wedatasphere.linkis.orchestrator.computation.conf.ComputationOrchestratorConf |
| import com.webank.wedatasphere.linkis.orchestrator.computation.physical.CodeLogicalUnitExecTask |
| import com.webank.wedatasphere.linkis.orchestrator.ecm.entity.{DefaultMarkReq, LoadBanlanceMarkReq, Mark, MarkReq, Policy} |
| import com.webank.wedatasphere.linkis.orchestrator.ecm.{EngineConnManager, EngineConnManagerBuilder} |
| import com.webank.wedatasphere.linkis.orchestrator.exception.{OrchestratorLabelConflictException, OrchestratorUseSameEngineException} |
| import com.webank.wedatasphere.linkis.orchestrator.listener.task.TaskLogEvent |
| import org.apache.commons.lang.StringUtils |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.mutable |
| import scala.concurrent.duration.Duration |
| |
| /** |
| * |
| */ |
| class DefaultCodeExecTaskExecutorManager extends CodeExecTaskExecutorManager with Logging { |
| |
| private val instanceToExecutors = new mutable.HashMap[ServiceInstance, Array[CodeExecTaskExecutor]] |
| |
| private val execTaskToExecutor = new mutable.HashMap[String, CodeExecTaskExecutor]() |
| |
| private val defaultEngineConnManager: EngineConnManager = { |
| val builder = EngineConnManagerBuilder.builder |
| builder.setPolicy(Policy.Process) |
| builder.build() |
| } |
| private val labelEngineConnManager: EngineConnManager = { |
| val builder = EngineConnManagerBuilder.builder |
| builder.setPolicy(Policy.Label) |
| builder.build() |
| } |
| private val waitLock = new Array[Byte](0) |
| |
| override def askExecutor(execTask: CodeLogicalUnitExecTask, wait: Duration): Option[CodeExecTaskExecutor] = { |
| info(s"Start to askExecutor for execId ${execTask.getIDInfo()}, wait $wait") |
| val startTime = System.currentTimeMillis() |
| var retryException: LinkisRetryException = null |
| var executor: Option[CodeExecTaskExecutor] = None |
| while (System.currentTimeMillis - startTime < wait.toMillis && executor.isEmpty) |
| Utils.tryCatch(askExecutor(execTask)) { |
| case retry: LinkisRetryException => |
| this.warn("request engine failed!", retry) |
| retryException = retry |
| None |
| case t: Throwable => throw t |
| } match { |
| case Some(e) => |
| info(s"Finished to askExecutor for execId ${execTask.getIDInfo()}, wait ${System.currentTimeMillis() - startTime}") |
| executor = Option(e) |
| case _ => |
| if (System.currentTimeMillis - startTime < wait.toMillis) { |
| val interval = math.min(3000, wait.toMillis - System.currentTimeMillis + startTime) |
| waitForIdle(interval) |
| } |
| } |
| if (retryException != null && executor.isEmpty) throw retryException |
| executor |
| } |
| |
| override def askExecutor(execTask: CodeLogicalUnitExecTask): Option[CodeExecTaskExecutor] = { |
| debug(s"Start to askExecutor for execId ${execTask.getIDInfo()}") |
| val executor = createExecutor(execTask) |
| info(s"Finished to askExecutor for execId ${execTask.getIDInfo()}") |
| Option(executor) |
| } |
| |
| override def createExecutor(execTask: CodeLogicalUnitExecTask): CodeExecTaskExecutor = { |
| |
| val engineConnManager = getEngineConnManager(execTask.getLabels) |
| // CreateMarkReq |
| val markReq = createMarkReq(execTask) |
| |
| val executeOnceLabel = LabelUtil.getExecuteOnceLabel(execTask.getLabels) |
| val loadBalanceLabel = LabelUtil.getLoadBalanceLabel(execTask.getLabels) |
| |
| if (null != executeOnceLabel && null != loadBalanceLabel) { |
| throw new OrchestratorLabelConflictException(s"ExecuteOnceLabel : ${markReq.getLabels.get(LabelKeyConstant.EXECUTE_ONCE_KEY)} should not come with LoadBalanceLabel : ${markReq.getLabels.get(LabelKeyConstant.LOAD_BALANCE_KEY)}") |
| } |
| |
| // getMark |
| val mark: Mark = engineConnManager.applyMark(markReq) |
| markReq.setCreateService(markReq.getCreateService + s"mark_id: ${mark.getMarkId()}") |
| // getEngineConn Executor |
| info(s"create Executor for execId ${execTask.getIDInfo()} mark id is ${mark.getMarkId()}, user ${mark.getMarkReq.getUser}") |
| execTask.getPhysicalContext.pushLog(TaskLogEvent(execTask, LogUtils.generateInfo("Background is starting a new engine for you, it may take several seconds, please wait"))) |
| val engineConnExecutor = engineConnManager.getAvailableEngineConnExecutor(mark) |
| if (null == engineConnExecutor) { |
| return null |
| } |
| val codeExecTaskExecutor = new CodeExecTaskExecutor(engineConnExecutor, execTask, mark) |
| execTaskToExecutor.put(execTask.getId, codeExecTaskExecutor) |
| info(s"Finished to create Executor for execId ${execTask.getIDInfo()} mark id is ${mark.getMarkId()}, user ${mark.getMarkReq.getUser}") |
| codeExecTaskExecutor |
| } |
| |
| protected def createMarkReq(execTask: CodeLogicalUnitExecTask): MarkReq = { |
| |
| val loadBalanceLabel = LabelUtil.getLoadBalanceLabel(execTask.getLabels) |
| val markReq: MarkReq = if (null != loadBalanceLabel) { |
| new LoadBanlanceMarkReq |
| } else { |
| val defaultMarkReq = new DefaultMarkReq |
| defaultMarkReq.registerLabelKey(LabelKeyConstant.BIND_ENGINE_KEY) |
| defaultMarkReq |
| } |
| markReq.setPolicyObj(Policy.Task) |
| markReq.setCreateService(ComputationOrchestratorConf.DEFAULT_CREATE_SERVICE.getValue) |
| //markReq.setDescription |
| markReq.setEngineConnCount(ComputationOrchestratorConf.DEFAULT_MARK_MAX_ENGINE.getValue) |
| val properties = if (execTask.getParams.getStartupParams == null) new util.HashMap[String, String] |
| else { |
| val startupMap = execTask.getParams.getStartupParams.getConfigurationMap().asScala |
| val properties = new util.HashMap[String, String] |
| startupMap.foreach { case (k, v) => if (v != null && StringUtils.isNotEmpty(v.toString)) properties.put(k, v.toString) } |
| properties |
| } |
| |
| markReq.setProperties(properties) |
| markReq.setUser(execTask.getExecuteUser) |
| if (null != execTask.getLabels) { |
| markReq.setLabels(LabelUtils.labelsToMap(execTask.getLabels)) |
| } |
| markReq |
| } |
| |
| private def waitForIdle(waitTime: Long): Unit = waitLock synchronized { |
| waitLock.wait(waitTime) |
| } |
| |
| override def getByEngineConnAndTaskId(serviceInstance: ServiceInstance, engineConnTaskId: String): Option[CodeExecTaskExecutor] = { |
| val maybeExecutors = instanceToExecutors.get(serviceInstance) |
| if (maybeExecutors.isDefined) { |
| val executors = maybeExecutors.get.filter(_.getEngineConnTaskId == engineConnTaskId) |
| if (null != executors && executors.nonEmpty) { |
| return Some(executors(0)) |
| } |
| } |
| None |
| } |
| |
| override def getByExecTaskId(execTaskId: String): Option[CodeExecTaskExecutor] = { |
| execTaskToExecutor.get(execTaskId) |
| } |
| |
| override def shutdown(): Unit = { |
| |
| } |
| |
| /** |
| * The job execution process is normal. After the job is completed, you can call this method. |
| * This method will determine the bind engine label. If it is a non-end type job, no operation will be performed. |
| * @param execTask |
| * @param executor |
| */ |
| override def delete(execTask: CodeLogicalUnitExecTask, executor: CodeExecTaskExecutor): Unit = { |
| val jobGroupLabel = LabelUtil.getBindEngineLabel(execTask.getLabels) |
| var isEndJob = false |
| var jobGroupId = "" |
| if (null != jobGroupLabel) { |
| isEndJob = jobGroupLabel.getIsJobGroupEnd |
| jobGroupId = jobGroupLabel.getJobGroupId |
| if (isEndJob) { |
| debug(s"To delete codeExecTaskExecutor $executor from execTaskToExecutor for lastjob of jobGroupId : ${jobGroupId}") |
| clearExecutorById(executor, execTask.getLabels) |
| } else { |
| removeExecutorFromInstanceToExecutors(executor) |
| info(s"Subjob is not end of JobGroup with id : ${jobGroupId}, we will not delete codeExecTaskExecutor with id : ${executor} ") |
| } |
| } else { |
| debug(s"To delete codeExecTaskExecutor ${executor} from execTaskToExecutor.") |
| clearExecutorById(executor, execTask.getLabels) |
| } |
| } |
| |
| /** |
| * The method is used to clean up the executor, here will trigger the unlock of ec, |
| * but if it is with the loadBlance tag, the unlock step will be skipped |
| * @param executor |
| * @param labels |
| */ |
| private def clearExecutorById( executor: CodeExecTaskExecutor, labels: util.List[Label[_]], forceRelease: Boolean = false): Unit = { |
| if (null == executor || executor.getEngineConnExecutor == null) return |
| val loadBalanceLabel = LabelUtil.getLoadBalanceLabel(labels) |
| if (null == loadBalanceLabel || forceRelease) { |
| info(s"To release engine ConnExecutor ${executor}") |
| getEngineConnManager(labels).releaseEngineConnExecutor(executor.getEngineConnExecutor, executor.getMark) |
| } else { |
| info(s"Task has loadBalanceLabel, Not need to delete executor ${executor}") |
| } |
| removeExecutorFromInstanceToExecutors(executor) |
| } |
| |
| |
| private def removeExecutorFromInstanceToExecutors(executor: CodeExecTaskExecutor): Unit = { |
| debug(s"To delete codeExecTaskExecutor ${executor} from instanceToExecutors") |
| val maybeExecutors = instanceToExecutors.get(executor.getEngineConnExecutor.getServiceInstance) |
| if (maybeExecutors.isDefined) { |
| val executors = maybeExecutors.get.filter(_.getEngineConnTaskId != executor.getEngineConnTaskId) |
| instanceToExecutors synchronized { |
| if (null != executors && executors.nonEmpty) { |
| instanceToExecutors.put(executor.getEngineConnExecutor.getServiceInstance, executors) |
| } else { |
| instanceToExecutors.remove(executor.getEngineConnExecutor.getServiceInstance) |
| } |
| } |
| } |
| info(s"To delete exec task ${executor.getExecTask.getIDInfo()} and CodeExecTaskExecutor ${executor.getEngineConnExecutor.getServiceInstance} relation") |
| execTaskToExecutor synchronized { |
| execTaskToExecutor.remove(executor.getExecTaskId) |
| } |
| } |
| |
| override def addEngineConnTaskID(executor: CodeExecTaskExecutor): Unit = { |
| if (execTaskToExecutor.contains(executor.getExecTaskId)) { |
| val codeExecutor = new CodeExecTaskExecutor(executor.getEngineConnExecutor, executor.getExecTask, executor.getMark) |
| codeExecutor.setEngineConnTaskId(executor.getEngineConnTaskId) |
| execTaskToExecutor.put(executor.getExecTaskId, codeExecutor) |
| info(s"To add codeExecTaskExecutor $executor to instanceToExecutors") |
| val executors = instanceToExecutors.getOrElse(executor.getEngineConnExecutor.getServiceInstance, Array.empty[CodeExecTaskExecutor]) |
| instanceToExecutors synchronized { |
| instanceToExecutors.put(executor.getEngineConnExecutor.getServiceInstance, executors.+:(codeExecutor)) |
| } |
| } |
| } |
| |
| private def getEngineConnManager(labels: util.List[Label[_]]): EngineConnManager = { |
| if(null == labels || labels.isEmpty) return defaultEngineConnManager |
| if(labels.asScala.exists(_.isInstanceOf[LoadBalanceLabel])){ |
| return labelEngineConnManager |
| } |
| defaultEngineConnManager |
| } |
| |
| override def getAllInstanceToExecutorCache(): mutable.HashMap[ServiceInstance, Array[CodeExecTaskExecutor]] = instanceToExecutors |
| |
| override def getAllExecTaskToExecutorCache(): mutable.HashMap[String, CodeExecTaskExecutor] = execTaskToExecutor |
| |
| /** |
| * If the job is executed abnormally, such as execution failure, or being killed, |
| * it will go to the process for cleaning up, and the engineConn lock will be released. |
| * @param execTask |
| * @param execTaskExecutor |
| */ |
| override def unLockEngineConn(execTask: CodeLogicalUnitExecTask, execTaskExecutor: CodeExecTaskExecutor): Unit = { |
| info(s"${execTask.getIDInfo()} task be killed or failed , Now to delete executor ${execTaskExecutor.getEngineConnExecutor.getServiceInstance}") |
| clearExecutorById(execTaskExecutor, execTask.getLabels) |
| } |
| |
| /** |
| * Task failed because ec exited unexpectedly, so need to clean up ec immediately |
| * @param execTask |
| * @param executor |
| */ |
| override def markECFailed(execTask: CodeLogicalUnitExecTask, executor: CodeExecTaskExecutor): Unit = { |
| info(s"${execTask.getIDInfo()} task failed because executor exit, Now to delete executor ${executor.getEngineConnExecutor.getServiceInstance}") |
| clearExecutorById(executor, execTask.getLabels, true) |
| } |
| } |