blob: 45ae26e017649a983023ac2a20cb79cf5091f16e [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.entrance.execute
import java.util
import java.util.concurrent.atomic.AtomicInteger
import com.webank.wedatasphere.linkis.common.log.LogUtils
import com.webank.wedatasphere.linkis.entrance.EntranceContext
import com.webank.wedatasphere.linkis.entrance.conf.EntranceConfiguration
import com.webank.wedatasphere.linkis.entrance.event._
import com.webank.wedatasphere.linkis.entrance.exception.EntranceErrorException
import com.webank.wedatasphere.linkis.entrance.persistence.HaPersistenceTask
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.protocol.query.RequestPersistTask
import com.webank.wedatasphere.linkis.protocol.task.Task
import com.webank.wedatasphere.linkis.protocol.utils.TaskUtils
import com.webank.wedatasphere.linkis.rpc.utils.RPCUtils
import com.webank.wedatasphere.linkis.scheduler.executer.{CompletedExecuteResponse, ErrorExecuteResponse}
import com.webank.wedatasphere.linkis.scheduler.queue.SchedulerEventState._
import com.webank.wedatasphere.linkis.scheduler.queue.{LockJob, SchedulerEventState}
/**
  * Abstract entrance-side job built on top of [[LockJob]].
  *
  * It carries the creator, user, runtime parameters, backing [[Task]], progress information and
  * [[EntranceContext]] of a job, and hooks into the scheduler state callbacks in order to
  *   - wait, when the job reaches a completed state, until the announced number of result sets
  *     has been reported as persisted (bounded by `EntranceConfiguration.JOB_MAX_PERSIST_WAIT_TIME`),
  *   - record the engine instance on the task once the job is running,
  *   - emit log lines and listener-bus events for the state changes.
  *
  * Created by enjoyyin on 2018/9/26.
  */
abstract class EntranceJob extends LockJob {

  private var creator: String = _
  private var user: String = _
  private var params: util.Map[String, Any] = new util.HashMap[String, Any](1)
  private var task: Task = _
  // entranceListenerBus may not exist, hence the Option.
  private var entranceListenerBus: Option[EntranceEventListenerBus[EntranceEventListener, EntranceEvent]] = None
  private var progressInfo: Array[JobProgressInfo] = Array.empty
  // Number of result sets reported as persisted so far; also used as the monitor for wait/notify.
  private val persistedResultSets = new AtomicInteger(0)
  // Expected number of result sets; a negative value means "not announced yet".
  // Volatile because afterStateChanged reads it before it takes the monitor.
  @volatile private var resultSize = -1
  private var entranceContext: EntranceContext = _

  def getTask: Task = task
  def setTask(task: Task): Unit = this.task = task

  def setCreator(creator: String): Unit = this.creator = creator
  def getCreator: String = creator

  def setUser(user: String): Unit = this.user = user
  def getUser: String = user

  def setParams(params: util.Map[String, Any]): Unit = this.params = params
  def getParams: util.Map[String, Any] = params

  /** Stores the listener bus; a `null` argument is stored as `None`. */
  def setEntranceListenerBus(entranceListenerBus: EntranceEventListenerBus[EntranceEventListener, EntranceEvent]): Unit =
    this.entranceListenerBus = Option(entranceListenerBus)
  def getEntranceListenerBus: Option[EntranceEventListenerBus[EntranceEventListener, EntranceEvent]] =
    this.entranceListenerBus

  def setProgressInfo(progressInfo: Array[JobProgressInfo]): Unit = this.progressInfo = progressInfo
  def getProgressInfo: Array[JobProgressInfo] = this.progressInfo

  def setEntranceContext(entranceContext: EntranceContext): Unit = this.entranceContext = entranceContext
  // NOTE: the method name is misspelled, but it is public API and therefore kept as is.
  def getEntraceCotnext: EntranceContext = this.entranceContext

  /**
    * Announces how many result sets this job produces and wakes up a thread that is waiting in
    * [[afterStateChanged]] for the result sets to be persisted.
    *
    * @param resultSize expected number of result sets
    */
  def setResultSize(resultSize: Int): Unit = persistedResultSets synchronized {
    // Publish the value while holding the monitor the waiter uses for its condition check.
    this.resultSize = resultSize
    persistedResultSets.notifyAll()
  }

  /** Records that one more result set has been persisted and wakes up the waiting thread. */
  def incrementResultSetPersisted(): Unit = persistedResultSets synchronized {
    persistedResultSets.incrementAndGet()
    persistedResultSets.notifyAll()
  }

  /**
    * @param startWaitForPersistedTime timestamp (`System.currentTimeMillis`) at which waiting started
    * @return true once the time elapsed since `startWaitForPersistedTime` has reached
    *         `EntranceConfiguration.JOB_MAX_PERSIST_WAIT_TIME`
    */
  protected def isWaitForPersistedTimeout(startWaitForPersistedTime: Long): Boolean =
    System.currentTimeMillis - startWaitForPersistedTime >= EntranceConfiguration.JOB_MAX_PERSIST_WAIT_TIME.getValue.toLong

  /** True while the result size is unknown or fewer result sets than expected have been persisted. */
  private def isWaitingForResultSets: Boolean = {
    val expected = resultSize
    expected < 0 || persistedResultSets.get() < expected
  }

  /**
    * State-change hook.
    *
    * When `toState` is a completed state and result sets are still outstanding, the calling thread
    * blocks until they are persisted, an error response is present, or the persist timeout elapses.
    * Only a genuine timeout is reported through `onFailure`. If the job still counts as succeeded
    * although an error response exists, it is transitioned to `Cancelled` (no throwable) or
    * `Failed` (throwable present) and this invocation returns early.
    *
    * Otherwise the engine instance is recorded on the task when the job is running, the parent
    * hook is invoked, state-specific log lines are written and an [[EntranceJobEvent]] is posted.
    */
  override def afterStateChanged(fromState: SchedulerEventState, toState: SchedulerEventState): Unit = {
    if (SchedulerEventState.isCompleted(toState) && isWaitingForResultSets) {
      val startWaitForPersistedTime = System.currentTimeMillis
      val timedOut = persistedResultSets synchronized {
        while (isWaitingForResultSets && getErrorResponse == null && !isWaitForPersistedTimeout(startWaitForPersistedTime))
          persistedResultSets.wait(3000)
        // The loop has three exit conditions. If neither "persisted" nor "error response present"
        // holds, the timeout was the reason. Re-checking only the clock here would flag a job whose
        // result sets were persisted (or whose real error arrived) right at the deadline.
        isWaitingForResultSets && getErrorResponse == null
      }
      if (timedOut) onFailure("persist resultSets timeout!", new EntranceErrorException(20305, "persist resultSets timeout!"))
      if (isSucceed && getErrorResponse != null) {
        val _toState = if (getErrorResponse.t == null) Cancelled else Failed
        transition(_toState)
        return
      }
    }
    if (SchedulerEventState.isRunning(toState)) {
      // Records the instance of the executing engine on the persist task, unwrapping HA tasks.
      def setEngineInstance(task: Task): Unit = task match {
        case requestTask: RequestPersistTask => getExecutor match {
          case engine: EntranceEngine => requestTask.setEngineInstance(engine.getModuleInstance.getInstance)
          case _ =>
        }
        case haTask: HaPersistenceTask => setEngineInstance(haTask.task)
        case _ =>
      }
      setEngineInstance(task)
    }
    super.afterStateChanged(fromState, toState)
    toState match {
      case Scheduled =>
        getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo( "Your job is Scheduled. Please wait it to run.")))
      case WaitForRetry =>
        getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo( "Your job is turn to retry. Please wait it to schedule.")))
      case Running =>
        getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo( "Your job is Running now. Please wait it to complete.")))
      //TODO job start event
      case _ if SchedulerEventState.isCompleted(toState) =>
        endTime = System.currentTimeMillis()
        if (getJobInfo != null) getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo(getJobInfo.getMetric)))
        if (isSucceed)
          getLogListener.foreach(_.onLogUpdate(this,
            LogUtils.generateInfo( "Congratulations. Your job completed with status Success.")))
        else getLogListener.foreach(_.onLogUpdate(this,
          LogUtils.generateInfo( s"Sorry. Your job completed with a status $toState. You can view logs for the reason.")))
        // A completed job always reports full progress.
        this.setProgress(1.0f)
        entranceListenerBus.foreach(_.post(EntranceProgressEvent(this, 1.0f, this.getProgressInfo)))
        this.getProgressListener.foreach(listener => listener.onProgressUpdate(this, 1.0f, Array[JobProgressInfo]()))
      case _ =>
    }
    entranceListenerBus.foreach(_.post(EntranceJobEvent(this.getId)))
  }

  /**
    * Posts an error log event to the listener bus (if any) and then delegates to the parent.
    *
    * @param errorMsg failure reason
    * @param t        cause passed on to the parent implementation
    */
  override def onFailure(errorMsg: String, t: Throwable): Unit = {
    this.entranceListenerBus.foreach(_.post(
      EntranceLogEvent(this, LogUtils.generateERROR(s"Sorry, your job executed failed with reason: $errorMsg"))))
    super.onFailure(errorMsg, t)
  }

  /**
    * Posts a [[MissingEngineNotifyEvent]] when the response is an error for which
    * `RPCUtils.isReceiverNotExists` holds, then delegates to the parent.
    */
  override protected def transitionCompleted(executeCompleted: CompletedExecuteResponse): Unit = {
    executeCompleted match {
      case error: ErrorExecuteResponse if RPCUtils.isReceiverNotExists(error.t) =>
        entranceListenerBus.foreach(_.post(MissingEngineNotifyEvent(this, error.t, getExecutor)))
      case _ =>
    }
    super.transitionCompleted(executeCompleted)
  }

  /**
    * Completes the job directly, logging the given reason first.
    *
    * @param executeCompleted the completion response
    * @param reason           reason written to the log
    */
  def transitionCompleted(executeCompleted: CompletedExecuteResponse, reason: String): Unit = {
    info("Job directly completed with reason: " + reason)
    transitionCompleted(executeCompleted)
  }

  /**
    * Decides whether the job is retried after an error.
    *
    * Returns false when retry is unsupported or the response is null. When
    * `RPCUtils.isReceiverNotExists` holds for the error, the job is always retried; if the executor
    * is an [[EntranceEngine]], its instance is additionally recorded under
    * `ExceptInstanceEntranceExecutorRuler.EXCEPT_INSTANCES` in the special map of the job params.
    * In every other case the decision is left to the parent implementation.
    */
  override protected def isJobShouldRetry(errorExecuteResponse: ErrorExecuteResponse): Boolean =
    isJobSupportRetry && errorExecuteResponse != null && {
      if (RPCUtils.isReceiverNotExists(errorExecuteResponse.t)) {
        getExecutor match {
          case e: EntranceEngine =>
            val instance = e.getModuleInstance.getInstance
            getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateSystemWarn(s"Since the submitted engine rejects the connection, the system will automatically retry and exclude the engine $instance.(由于提交的引擎拒绝连接,系统将自动进行重试,并排除引擎 $instance.)")))
            val specialMap = TaskUtils.getSpecialMap(getParams)
            val exceptKey = ExceptInstanceEntranceExecutorRuler.EXCEPT_INSTANCES
            if (specialMap.isEmpty) {
              // NOTE(review): presumably addSpecialMap stores this very map instance in the params,
              // so the put below is visible through getParams - confirm against TaskUtils.
              TaskUtils.addSpecialMap(getParams, specialMap)
              specialMap.put(exceptKey, instance)
            } else if (specialMap.containsKey(exceptKey)) {
              // Append the instance to the ones that were excluded on earlier retries.
              val instances = ExceptInstanceEntranceExecutorRuler.deserializable(specialMap.get(exceptKey).toString) :+ instance
              specialMap.put(exceptKey, ExceptInstanceEntranceExecutorRuler.serializable(instances))
            } else specialMap.put(exceptKey, instance)
          case _ =>
        }
        true
      } else super.isJobShouldRetry(errorExecuteResponse)
    }
}