blob: aadd20c13c54a586e76bd63b965e5d74fa7afbb1 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.orchestrator.strategy.async
import com.webank.wedatasphere.linkis.governance.common.entity.ExecutionNodeStatus
import com.webank.wedatasphere.linkis.orchestrator.execution.ExecTaskRunner
import com.webank.wedatasphere.linkis.orchestrator.execution.impl.DefaultTaskManager
import com.webank.wedatasphere.linkis.orchestrator.listener.OrchestratorSyncEvent
import com.webank.wedatasphere.linkis.orchestrator.listener.execution.ExecTaskRunnerCompletedEvent
import com.webank.wedatasphere.linkis.orchestrator.listener.task._
import com.webank.wedatasphere.linkis.orchestrator.plans.physical.ExecTask
/**
*
*
*/
class AsyncTaskManager extends DefaultTaskManager with TaskStatusListener with TaskResultSetListener {
override def onSyncEvent(event: OrchestratorSyncEvent): Unit = {
super.onSyncEvent(event)
event match {
case taskStatusEvent: TaskStatusEvent =>
onStatusUpdate(taskStatusEvent)
case taskResultSetSizeEvent: TaskResultSetSizeEvent =>
onResultSizeCreated(taskResultSetSizeEvent)
case taskResultSetEvent: TaskResultSetEvent =>
onResultSetCreate(taskResultSetEvent)
case taskErrorResponseEvent: TaskErrorResponseEvent =>
onTaskErrorResponseEvent(taskErrorResponseEvent)
case ExecTaskRunnerCompletedEvent(execTaskRunner) =>
addCompletedTask(execTaskRunner)
case _ =>
}
}
override def onResultSetCreate(taskResultSetEvent: TaskResultSetEvent): Unit = {
info(s"received taskResultSetEvent ${taskResultSetEvent.execTask.getId}")
val execTask = taskResultSetEvent.execTask
val rootExecTask = execTask.getPhysicalContext.getRootTask
val runners = getRunningTask(rootExecTask)
runners.find(_.task.getId.equals(execTask.getId)).foreach {
case asyncExecTaskRunner: AsyncExecTaskRunner =>
asyncExecTaskRunner.addResultSet(taskResultSetEvent.resultSet)
case _ =>
}
}
override def onResultSizeCreated(taskResultSetSizeEvent: TaskResultSetSizeEvent): Unit = {
info(s"received taskResultSetSizeEvent $taskResultSetSizeEvent")
val execTask = taskResultSetSizeEvent.execTask
val rootExecTask = execTask.getPhysicalContext.getRootTask
val runners = getRunningTask(rootExecTask)
runners.find(_.task.getId.equals(execTask.getId)).foreach {
case asyncExecTaskRunner: AsyncExecTaskRunner =>
asyncExecTaskRunner.setResultSize(taskResultSetSizeEvent.resultSize)
case _ =>
}
}
override def onTaskErrorResponseEvent(taskErrorResponseEvent: TaskErrorResponseEvent): Unit = {
info(s"received taskErrorResponseEvent $taskErrorResponseEvent")
val execTask = taskErrorResponseEvent.execTask
val rootExecTask = execTask.getPhysicalContext.getRootTask
val runners = getRunningTask(rootExecTask)
runners.find(_.task.getId.equals(execTask.getId)).foreach {
case asyncExecTaskRunner: AsyncExecTaskRunner =>
asyncExecTaskRunner.markFailed(taskErrorResponseEvent.errorMsg, null)
case _ =>
}
}
override def onStatusUpdate(taskStatusEvent: TaskStatusEvent): Unit = {
info(s"received taskStatusEvent $taskStatusEvent")
if (ExecutionNodeStatus.isCompleted(taskStatusEvent.status)) {
val execTask = taskStatusEvent.execTask
val rootExecTask = execTask.getPhysicalContext.getRootTask
val runners = getRunningTask(rootExecTask)
runners.find(_.task.getId.equals(execTask.getId)).foreach { runner =>
info(s"Task(${execTask.getIDInfo}) is completed, status ${taskStatusEvent.status}")
//To transient taskRunner status
runner.transientStatus(taskStatusEvent.status)
//addCompletedTask(runner)
}
}
}
override protected def execTaskToTaskRunner(execTask: ExecTask): ExecTaskRunner = {
new AsyncExecTaskRunnerImpl(execTask)
}
}