blob: a854e68aa0fdbd72a794301f551b8d70b3f7f284 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.orchestrator.plans.physical
import java.util
import com.webank.wedatasphere.linkis.common.listener.Event
import com.webank.wedatasphere.linkis.common.log.LogUtils
import com.webank.wedatasphere.linkis.governance.common.entity.ExecutionNodeStatus
import com.webank.wedatasphere.linkis.orchestrator.exception.OrchestratorErrorCodeSummary
import com.webank.wedatasphere.linkis.orchestrator.execution.impl.DefaultFailedTaskResponse
import com.webank.wedatasphere.linkis.orchestrator.execution.{CompletedTaskResponse, SucceedTaskResponse, TaskResponse}
import com.webank.wedatasphere.linkis.orchestrator.listener._
import com.webank.wedatasphere.linkis.orchestrator.listener.task.{RootTaskResponseEvent, TaskLogEvent, TaskProgressEvent}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
*
*
*/
class PhysicalContextImpl(private var rootTask: ExecTask,private var leafTasks: Array[ExecTask]) extends PhysicalContext {
private var syncListenerBus: OrchestratorSyncListenerBus = _ //Orchestartor.getOrchestratorSyncListenerBus
private var asyncListenerBus: OrchestratorAsyncListenerBus = _ //OrchestratorListenerBusContext.getListenerBusContext().getOrchestratorAsyncListenerBus
private var executionNodeStatus: ExecutionNodeStatus = ExecutionNodeStatus.Inited
private var response: TaskResponse = _
private val context: java.util.Map[String, Any] = new util.concurrent.ConcurrentHashMap[String, Any]()
private var rootPhysicalContext: PhysicalContext = _
def this(rootPhysicalContext: PhysicalContext) = {
this(rootPhysicalContext.getRootTask, rootPhysicalContext.getLeafTasks)
this.rootPhysicalContext = rootPhysicalContext
}
override def isCompleted: Boolean = ExecutionNodeStatus.isCompleted(executionNodeStatus)
override def markFailed(errorMsg: String, cause: Throwable): Unit = {
this.executionNodeStatus = ExecutionNodeStatus.Failed
val failedResponse = new DefaultFailedTaskResponse(errorMsg, OrchestratorErrorCodeSummary.EXECUTION_ERROR_CODE, cause)
this.response = failedResponse
val event = TaskLogEvent(getRootTask, LogUtils.generateERROR(errorMsg))
pushLog(event)
syncListenerBus.postToAll(RootTaskResponseEvent(getRootTask, failedResponse))
}
override def markSucceed(response: TaskResponse): Unit = {
this.executionNodeStatus = ExecutionNodeStatus.Succeed
this.response = response
response match {
case completedResponse: CompletedTaskResponse =>
syncListenerBus.postToAll(RootTaskResponseEvent(getRootTask, completedResponse))
case _ =>
syncListenerBus.postToAll(RootTaskResponseEvent(getRootTask, new SucceedTaskResponse {}))
}
}
override def broadcastAsyncEvent(event: Event): Unit = event match {
case orchestratorAsyncEvent: OrchestratorAsyncEvent =>
asyncListenerBus.post(orchestratorAsyncEvent)
case _ =>
}
override def broadcastToAll(event: Event): Unit = {
broadcastSyncEvent(event)
}
override def broadcastSyncEvent(event: Event): Unit = {
event match {
case orchestratorSyncEvent: OrchestratorSyncEvent =>
syncListenerBus.postToAll(orchestratorSyncEvent)
case _ =>
}
}
/**
* Check if the executive task belongs to the physical tree
* @param execTask executive task
* @return
*/
override def belongsTo(execTask: ExecTask): Boolean = {
Option(rootTask) match {
case Some(task) =>
val branches = ListBuffer[String]()
val traversableQueue = new mutable.Queue[ExecTask]()
traversableQueue.enqueue(task)
while(traversableQueue.nonEmpty){
val nodeTask = traversableQueue.dequeue()
if(nodeTask.theSame(execTask)){
return true
}
val parent = Option(nodeTask.getParents).getOrElse(Array[ExecTask]())
val branch = !branches.contains(nodeTask.getId)
if (parent.length < 1 || (parent.length > 1 && branch)) {
Option(nodeTask.getChildren).getOrElse(Array[ExecTask]())
.foreach(traversableQueue.enqueue(_))
if(branch){branches += nodeTask.getId}
}
/* parent.length match{
case v if v <= 1 || (v > 1 && branch ) =>
Option(nodeTask.getChildren).getOrElse(Array[ExecTask]())
.foreach(traversableQueue.enqueue(_))
if(branch){branches += nodeTask.getId}
}*/
}
false
case None => false
}
}
override def getRootTask: ExecTask = {
if(Option(rootTask).isEmpty && Option(rootPhysicalContext).isDefined){
rootPhysicalContext.getRootTask
}else{
rootTask
}
}
override def getLeafTasks: Array[ExecTask] = {
if(Option(leafTasks).isEmpty && Option(rootPhysicalContext).isDefined){
rootPhysicalContext.getLeafTasks
}else{
leafTasks
}
}
override def get(key: String): Any = {
context.get(key)
}
override def getOption(key: String): Option[Any] = {
Some(context.get(key))
}
override def orElse(key: String, defaultValue: Any): Option[Any] = {
Some(getOrElse(key, defaultValue))
}
override def getOrElse(key: String, defaultValue: Any): Any = {
context.getOrDefault(key, defaultValue)
}
override def orElsePut(key: String, defaultValue: Any): Option[Any] = {
Some(getOrElsePut(key, defaultValue))
}
override def getOrElsePut(key: String, defaultValue: Any): Any = synchronized {
if (exists(key)) {
context.get(key)
} else {
context.put(key, defaultValue)
defaultValue
}
}
override def exists(key: String): Boolean = {
context.containsKey(key)
}
override def set(key: String, value: Any): Unit = {
context.put(key, value)
}
override def pushLog(taskLogEvent: TaskLogEvent): Unit = {
broadcastAsyncEvent(taskLogEvent)
}
override def pushProgress(taskProgressEvent: TaskProgressEvent): Unit = {
broadcastAsyncEvent(taskProgressEvent)
}
def setAsyncBus(asyncListenerBus: OrchestratorAsyncListenerBus): Unit = {
this.asyncListenerBus = asyncListenerBus
}
def setSyncBus(syncListenerBus: OrchestratorSyncListenerBus): Unit = {
this.syncListenerBus = syncListenerBus
}
}