blob: 9b48efde53e86ccf986ca123ce69c9d084e981d5 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.orchestrator.strategy.async
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.governance.common.entity.ExecutionNodeStatus
import com.webank.wedatasphere.linkis.orchestrator.conf.OrchestratorConfiguration
import com.webank.wedatasphere.linkis.orchestrator.core.ResultSet
import com.webank.wedatasphere.linkis.orchestrator.exception.OrchestratorErrorCodeSummary
import com.webank.wedatasphere.linkis.orchestrator.execution.impl.{DefaultFailedTaskResponse, DefaultResultSetTaskResponse}
import com.webank.wedatasphere.linkis.orchestrator.execution.{ArrayResultSetTaskResponse, _}
import com.webank.wedatasphere.linkis.orchestrator.listener.execution.ExecTaskRunnerCompletedEvent
import com.webank.wedatasphere.linkis.orchestrator.listener.{OrchestratorListenerBusContext, OrchestratorSyncListenerBus}
import com.webank.wedatasphere.linkis.orchestrator.plans.physical.ExecTask
import com.webank.wedatasphere.linkis.orchestrator.strategy.{ExecTaskStatusInfo, ResultSetExecTask, StatusInfoExecTask}
import scala.collection.mutable.ArrayBuffer
/**
*
*
*/
class AsyncExecTaskRunnerImpl(override val task: ExecTask) extends AsyncExecTaskRunner with Logging {
private var status: ExecutionNodeStatus = ExecutionNodeStatus.Inited
private var taskResponse: TaskResponse = _
private val resultSets = new ArrayBuffer[ResultSet]()
//private val syncListenerBus: OrchestratorSyncListenerBus = OrchestratorListenerBusContext.getListenerBusContext().getOrchestratorSyncListenerBus
private var resultSize: Int = -1
override def getTaskResponse: TaskResponse = taskResponse
override def isCompleted: Boolean = {
ExecutionNodeStatus.isCompleted(status)
}
override def isRunning: Boolean = ExecutionNodeStatus.isRunning(status)
override def isSucceed: Boolean = ExecutionNodeStatus.isScheduled(status)
override def run(): Unit = try {
info(s"ExecTaskRunner Submit execTask(${task.getIDInfo}) to running")
val response = task.execute()
this.taskResponse = response
response match {
case async: AsyncTaskResponse =>
transientStatus(ExecutionNodeStatus.Running)
case succeed: SucceedTaskResponse =>
info(s"Succeed to execute ExecTask(${task.getIDInfo})")
transientStatus(ExecutionNodeStatus.Succeed)
case failedTaskResponse: FailedTaskResponse =>
info(s"Failed to execute ExecTask(${task.getIDInfo})")
transientStatus(ExecutionNodeStatus.Failed)
case retry: RetryTaskResponse =>
warn(s"ExecTask(${task.getIDInfo}) need to retry")
transientStatus(ExecutionNodeStatus.WaitForRetry)
}
} catch {
case e: Throwable =>
error(s"Failed to execute task ${task.getIDInfo}", e)
this.taskResponse = new DefaultFailedTaskResponse(e.getMessage, OrchestratorErrorCodeSummary.EXECUTION_ERROR_CODE, e)
transientStatus(ExecutionNodeStatus.Failed)
}
override def transientStatus(status: ExecutionNodeStatus): Unit = {
if (status.ordinal() < this.status.ordinal() && status != ExecutionNodeStatus.WaitForRetry) {
info(s"Task${task.getIDInfo()} status flip error! Cause: Failed to flip from ${this.status} to $status.")
return
}
//throw new OrchestratorErrorException(OrchestratorErrorCodeSummary.EXECUTION_FOR_EXECUTION_ERROR_CODE, s"Task status flip error! Cause: Failed to flip from ${this.status} to $status.") //抛异常
info(s"${task.getIDInfo} change status ${this.status} => $status.")
beforeStatusChanged(this.status, status)
val oldStatus = this.status
this.status = status
afterStatusChanged(oldStatus, status)
}
def afterStatusChanged(fromStatus: ExecutionNodeStatus, toStatus: ExecutionNodeStatus): Unit = {
// result put to physicalContext
if (ExecutionNodeStatus.isSucceed(toStatus)) {
task match {
case resultSetExecTask: ResultSetExecTask =>
if (resultSets.nonEmpty) {
this.taskResponse = new DefaultResultSetTaskResponse(resultSets.toArray)
}
this.taskResponse match {
case resultSetTaskResponse: ArrayResultSetTaskResponse =>
resultSetExecTask.addResultSet(resultSetTaskResponse)
case _ =>
}
case _ =>
}
}
// status put to physicalContext
if (ExecutionNodeStatus.isCompleted(toStatus)) {
task match {
case statusExecTask: StatusInfoExecTask =>
statusExecTask.addExecTaskStatusInfo(ExecTaskStatusInfo(toStatus, this.taskResponse))
case _ =>
}
task match {
case asyncExecTask: AsyncExecTask =>
asyncExecTask.clear(ExecutionNodeStatus.isSucceed(toStatus))
case _ =>
}
//to notify taskManager clear running tasks
task.getPhysicalContext.broadcastSyncEvent(ExecTaskRunnerCompletedEvent(this))
}
}
/**
*
* @param fromStatus
* @param toStatus
*/
def beforeStatusChanged(fromStatus: ExecutionNodeStatus, toStatus: ExecutionNodeStatus): Unit = {
task match {
case asyncExecTask: AsyncExecTask =>
if (ExecutionNodeStatus.isSucceed(toStatus) && ( resultSize < 0 || resultSets.size < resultSize)) {
val startWaitForPersistedTime = System.currentTimeMillis
resultSets synchronized {
while (( resultSize < 0 || resultSets.size < resultSize) && !isWaitForPersistedTimeout(startWaitForPersistedTime))
resultSets.wait(1000)
}
// if (isWaitForPersistedTimeout(startWaitForPersistedTime)) onFailure("persist resultSets timeout!", new EntranceErrorException(20305, "persist resultSets timeout!"))
}
case _ =>
}
}
protected def isWaitForPersistedTimeout(startWaitForPersistedTime: Long): Boolean =
System.currentTimeMillis - startWaitForPersistedTime >= OrchestratorConfiguration.TASK_MAX_PERSIST_WAIT_TIME.getValue.toLong
override def interrupt(): Unit = {
markFailed("Job be cancelled", null)
task match {
case asyncExecTask: AsyncExecTask =>
asyncExecTask.kill()
case _ =>
}
transientStatus(ExecutionNodeStatus.Cancelled)
}
override def setResultSize(resultSize: Int): Unit = {
info(s"BaseExecTaskRunner ${task.getIDInfo()} get result size is $resultSize")
if (this.resultSize == -1) this.resultSize = resultSize
resultSets.notify()
}
override def addResultSet(resultSet: ResultSet): Unit = {
info(s"BaseExecTaskRunner ${task.getIDInfo()} get result, now size is ${resultSets.size}")
resultSets += resultSet
resultSets.notify()
}
override def markFailed(errorMsg: String, cause: Throwable): Unit = {
this.taskResponse = new DefaultFailedTaskResponse(errorMsg, OrchestratorErrorCodeSummary.EXECUTION_ERROR_CODE, cause)
}
}