blob: 1a2ca14a181239710aee28d71412bad41015c05d [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.orchestrator.execution.impl
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.governance.common.entity.ExecutionNodeStatus
import com.webank.wedatasphere.linkis.orchestrator.exception.{OrchestratorErrorCodeSummary, OrchestratorErrorException}
import com.webank.wedatasphere.linkis.orchestrator.execution.AsyncTaskResponse.NotifyListener
import com.webank.wedatasphere.linkis.orchestrator.execution.{CompletedTaskResponse, ExecutionTask, SucceedTaskResponse, TaskResponse}
import com.webank.wedatasphere.linkis.orchestrator.plans.physical.ExecTask
import com.webank.wedatasphere.linkis.orchestrator.utils.OrchestratorIDCreator
import scala.collection.mutable.ArrayBuffer
/**
*
*
*/
class BaseExecutionTask(maxParallelism: Int, rootExecTask: ExecTask) extends ExecutionTask with Logging{
private val waitLock = new Array[Byte](0)
private val listeners = ArrayBuffer[NotifyListener]()
private var status: ExecutionNodeStatus = ExecutionNodeStatus.Inited
private var response: TaskResponse = _
private var id: String = _
override def getMaxParallelism: Int = maxParallelism
override def waitForCompleted(): Unit = {
if(ExecutionNodeStatus.isCompleted(getStatus)) return
waitLock synchronized {
while(!ExecutionNodeStatus.isCompleted(getStatus)) waitLock.wait()
}
}
override def getStatus: ExecutionNodeStatus = status
override def notifyMe(listener: NotifyListener): Unit = {
listeners += listener
}
override def getRootExecTask: ExecTask = rootExecTask
override def transientStatus(status: ExecutionNodeStatus): Unit = {
if(status.ordinal() < this.status.ordinal() && status != ExecutionNodeStatus.WaitForRetry)
throw new OrchestratorErrorException(OrchestratorErrorCodeSummary.EXECUTION_FOR_EXECUTION_ERROR_CODE, s"Task status flip error! Cause: Failed to flip from ${this.status} to $status.")//抛异常
info(s"$getId change status ${this.status} => $status.")
beforeStatusChanged(this.status, status)
val oldStatus = this.status
this.status = status
afterStatusChanged(oldStatus, status)
}
// status 完成后返回执行响应
def afterStatusChanged(fromStatus: ExecutionNodeStatus, toStatus: ExecutionNodeStatus): Unit = {
if(ExecutionNodeStatus.isCompleted(toStatus)) {
Utils.tryAndWarn(listeners.foreach(listener => listener(getResponse)))
waitLock synchronized waitLock.notify()
}
}
override def getResponse: TaskResponse = this.response
def beforeStatusChanged(fromStatus: ExecutionNodeStatus, toStatus: ExecutionNodeStatus):Unit = {}
override def getId: String = {
if (null == id) synchronized {
if (null == id) {
id = OrchestratorIDCreator.getExecutionIDCreator.nextID("execution")
}
}
id
}
override def equals(obj: Any): Boolean = {
if (null != obj && obj.isInstanceOf[ExecutionTask]) {
obj.asInstanceOf[ExecutionTask].getId == this.getId
} else {
false
}
}
override def hashCode(): Int = getId.hashCode
override def markCompleted(taskCompletedTaskResponse: CompletedTaskResponse): Unit = {
this.response = taskCompletedTaskResponse
taskCompletedTaskResponse match {
case failedTaskResponse: DefaultFailedTaskResponse =>
transientStatus(ExecutionNodeStatus.Failed)
case succeedTaskResponse: SucceedTaskResponse =>
transientStatus(ExecutionNodeStatus.Succeed)
}
info(s"Finished to ExecutionTask(${getId}) with status $getStatus")
}
}