blob: 6b2d55e9e01a220047f8b8c68a5f28912951431e [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.entrance.execute
import java.util
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import com.webank.wedatasphere.linkis.common.log.LogUtils
import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.entrance.EntranceContext
import com.webank.wedatasphere.linkis.entrance.conf.EntranceConfiguration
import com.webank.wedatasphere.linkis.entrance.event._
import com.webank.wedatasphere.linkis.entrance.exception.EntranceErrorException
import com.webank.wedatasphere.linkis.entrance.persistence.HaPersistenceTask
import com.webank.wedatasphere.linkis.governance.common.entity.job.{JobRequest, SubJobInfo}
import com.webank.wedatasphere.linkis.governance.common.entity.task.RequestPersistTask
import com.webank.wedatasphere.linkis.governance.common.paser.CodeParser
import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.protocol.task.Task
import com.webank.wedatasphere.linkis.rpc.utils.RPCUtils
import com.webank.wedatasphere.linkis.scheduler.executer.{CompletedExecuteResponse, ErrorExecuteResponse, SuccessExecuteResponse}
import com.webank.wedatasphere.linkis.scheduler.queue.SchedulerEventState._
import com.webank.wedatasphere.linkis.scheduler.queue.{Job, SchedulerEventState}
import scala.beans.BeanProperty
abstract class EntranceJob extends Job {
@BeanProperty
var creator: String = _
@BeanProperty
var user: String = _
@BeanProperty
var params: util.Map[String, Any] = new util.HashMap[String, Any](1)
@BeanProperty
var jobRequest: JobRequest = _
@BeanProperty
var jobGroups: Array[SubJobInfo] = new Array[SubJobInfo](0)
@BeanProperty
var codeParser: CodeParser = _
private var entranceListenerBus: Option[EntranceEventListenerBus[EntranceEventListener, EntranceEvent]] = None
private var entranceLogListenerBus: Option[EntranceLogListenerBus[EntranceLogListener, EntranceLogEvent]] = None
private var progressInfo: Array[JobProgressInfo] = Array.empty
private val persistedResultSets = new AtomicInteger(0)
// private var resultSize = -1
private var entranceContext: EntranceContext = _
def setEntranceListenerBus(entranceListenerBus: EntranceEventListenerBus[EntranceEventListener, EntranceEvent]): Unit =
this.entranceListenerBus = Option(entranceListenerBus)
def setEntranceLogListenerBus(entranceLogListenerBus: EntranceLogListenerBus[EntranceLogListener, EntranceLogEvent]): Unit =
this.entranceLogListenerBus = Option(entranceLogListenerBus)
def getEntranceListenerBus = this.entranceListenerBus
def setProgressInfo(progressInfo: Array[JobProgressInfo]): Unit = this.progressInfo = progressInfo
def getProgressInfo: Array[JobProgressInfo] = this.progressInfo
def setEntranceContext(entranceContext: EntranceContext): Unit = this.entranceContext = entranceContext
def getEntranceContext: EntranceContext = this.entranceContext
def getRunningSubJob: SubJobInfo = {
if (null != jobGroups && jobGroups.size > 0) {
jobGroups(0)
} else {
null
}
}
def setResultSize(resultSize: Int): Unit = {
// this.resultSize = resultSize
if (resultSize >= 0) {
persistedResultSets.set(resultSize)
}
}
def addAndGetResultSize(resultSize: Int): Int = {
info(s"Job ${getJobRequest.getId} resultsize from ${persistedResultSets.get()} add ${resultSize}")
if (resultSize > 0) {
persistedResultSets.addAndGet(resultSize)
} else {
persistedResultSets.get()
}
}
@Deprecated
def incrementResultSetPersisted(): Unit = {
// persistedResultSets.incrementAndGet()
}
protected def isWaitForPersistedTimeout(startWaitForPersistedTime: Long): Boolean =
System.currentTimeMillis - startWaitForPersistedTime >= EntranceConfiguration.JOB_MAX_PERSIST_WAIT_TIME.getValue.toLong
override def beforeStateChanged(fromState: SchedulerEventState, toState: SchedulerEventState): Unit = {
// if (SchedulerEventState.isCompleted(toState) && (resultSize < 0 || persistedResultSets.get() < resultSize)) {
/*val startWaitForPersistedTime = System.currentTimeMillis
persistedResultSets synchronized {
while ((resultSize < 0 || persistedResultSets.get() < resultSize) && getErrorResponse == null && !isWaitForPersistedTimeout(startWaitForPersistedTime))
persistedResultSets.wait(3000)
}
if (isWaitForPersistedTimeout(startWaitForPersistedTime)) onFailure("persist resultSets timeout!", new EntranceErrorException(20305, "persist resultSets timeout!"))
if (isSucceed && getErrorResponse != null) {
val _toState = if (getErrorResponse.t == null) Cancelled else Failed
transition(_toState)
return
}*/
// }
super.beforeStateChanged(fromState, toState)
}
override def afterStateChanged(fromState: SchedulerEventState, toState: SchedulerEventState): Unit = {
/*if (SchedulerEventState.isRunning(toState)) {
def setEngineInstance(task: Task): Unit = task match {
case requestTask: RequestPersistTask => getExecutor match {
case engine: EntranceExecutor => requestTask.setEngineInstance(engine.getInstance.getInstance)
case _ =>
}
case haTask: HaPersistenceTask => setEngineInstance(haTask.task)
case _ =>
}
setEngineInstance(jobRequest)
}*/
getJobRequest.setStatus(toState.toString)
super.afterStateChanged(fromState, toState)
toState match {
case Scheduled =>
//Entrance指标:任务排队结束时间
if(getJobRequest.getMetrics == null){
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateWarn("Job Metrics has not been initialized.")))
}else{
if(getJobRequest.getMetrics.containsKey(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME)){
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateWarn("Your job has already been scheduled before.")))
}else{
getJobRequest.getMetrics.put(TaskConstant.ENTRANCEJOB_SCHEDULE_TIME, new Date(System.currentTimeMillis))
}
}
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("Your job is Scheduled. Please wait it to run.")))
case WaitForRetry =>
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("Your job is turn to retry. Please wait it to schedule.")))
case Running =>
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo("Your job is Running now. Please wait it to complete.")))
//TODO job start event
case _ if SchedulerEventState.isCompleted(toState) =>
endTime = System.currentTimeMillis()
//Entrance指标,任务完成时间
getJobRequest.getMetrics.put(TaskConstant.ENTRANCEJOB_COMPLETE_TIME, new Date(System.currentTimeMillis()))
if (getJobInfo != null) getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo(getJobInfo.getMetric)))
if (isSucceed)
getLogListener.foreach(_.onLogUpdate(this,
LogUtils.generateInfo("Congratulations. Your job completed with status Success.")))
else getLogListener.foreach(_.onLogUpdate(this,
LogUtils.generateInfo(s"Sorry. Your job completed with a status $toState. You can view logs for the reason.")))
this.setProgress(EntranceJob.JOB_COMPLETED_PROGRESS)
entranceListenerBus.foreach(_.post(EntranceProgressEvent(this, EntranceJob.JOB_COMPLETED_PROGRESS, this.getProgressInfo)))
this.getProgressListener.foreach(listener => listener.onProgressUpdate(this, EntranceJob.JOB_COMPLETED_PROGRESS, Array[JobProgressInfo]()))
getEntranceContext.getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(getJobRequest)
case _ =>
}
entranceListenerBus.foreach(_.post(EntranceJobEvent(this.getId)))
}
override def onFailure(errorMsg: String, t: Throwable): Unit = {
getJobRequest.setStatus(SchedulerEventState.Failed.toString)
getJobRequest.setErrorDesc(errorMsg)
this.entranceLogListenerBus.foreach(_.post(
EntrancePushLogEvent(this, LogUtils.generateERROR(s"Sorry, your job executed failed with reason: $errorMsg"))))
super.onFailure(errorMsg, t)
}
override protected def transitionCompleted(executeCompleted: CompletedExecuteResponse): Unit = {
executeCompleted match {
case error: ErrorExecuteResponse => // todo checkif RPCUtils.isReceiverNotExists(error.t) =>
entranceListenerBus.foreach(_.post(MissingEngineNotifyEvent(this, error.t, getExecutor)))
getJobRequest.setStatus(SchedulerEventState.Failed.toString)
case _ : SuccessExecuteResponse =>
getJobRequest.setStatus(SchedulerEventState.Succeed.toString)
getJobRequest.setErrorCode(0)
getJobRequest.setErrorDesc(null)
case _ =>
}
Utils.tryAndErrorMsg(clearInstanceInfo())("Failed to clear executor")
super.transitionCompleted(executeCompleted)
}
private def clearInstanceInfo(): Unit = {
val executorManager = entranceContext.getOrCreateScheduler().getSchedulerContext.getOrCreateExecutorManager
executorManager.delete(getExecutor)
}
def transitionCompleted(executeCompleted: CompletedExecuteResponse, reason: String): Unit = {
info("Job directly completed with reason: " + reason)
transitionCompleted(executeCompleted)
}
override protected def isJobShouldRetry(errorExecuteResponse: ErrorExecuteResponse): Boolean = isJobSupportRetry && errorExecuteResponse != null &&
(if (RPCUtils.isReceiverNotExists(errorExecuteResponse.t)) {
getExecutor match {
case e: EntranceExecutor =>
// val instance = e.getInstance.getInstance
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateSystemWarn(s"Since the submitted engine rejects the connection, the system will automatically retry and exclude the engine.(由于提交的引擎拒绝连接,系统将自动进行重试,并排除引擎.)")))
case _ =>
}
true
} else super.isJobShouldRetry(errorExecuteResponse))
def operation[T](operate: EntranceExecutor => T ): T = {
this.getExecutor match {
case entranceExecutor: EntranceExecutor =>
operate(entranceExecutor)
case _ => throw new EntranceErrorException(10000, "Unsupported operation (不支持的操作)")
}
}
}
object EntranceJob {
def JOB_COMPLETED_PROGRESS = 1.0f
}