blob: f85c2b9e0ffefde26d1786d2d0b7ad5ab5728ba9 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.entrance.execute
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.entrance.job.EntranceExecuteRequest
import com.webank.wedatasphere.linkis.governance.common.entity.ExecutionNodeStatus._
import com.webank.wedatasphere.linkis.governance.common.protocol.task.{RequestTask, ResponseTaskStatus}
import com.webank.wedatasphere.linkis.orchestrator.Orchestration
import com.webank.wedatasphere.linkis.orchestrator.computation.operation.log.LogProcessor
import com.webank.wedatasphere.linkis.orchestrator.computation.operation.progress.ProgressProcessor
import com.webank.wedatasphere.linkis.orchestrator.core.OrchestrationFuture
import com.webank.wedatasphere.linkis.protocol.UserWithCreator
import com.webank.wedatasphere.linkis.scheduler.executer.ExecutorState.ExecutorState
import com.webank.wedatasphere.linkis.scheduler.executer._
import org.apache.hadoop.fs.Options.CreateOpts.Progress
import scala.collection.mutable.ArrayBuffer
abstract class EntranceExecutor(val id: Long, val mark: MarkReq) extends Executor with Logging {
private implicit var userWithCreator: UserWithCreator = _
protected val engineReturns = ArrayBuffer[EngineExecuteAsynReturn]()
protected var interceptors: Array[ExecuteRequestInterceptor] = Array(LabelExecuteRequestInterceptor, JobExecuteRequestInterceptor)
def setInterceptors(interceptors: Array[ExecuteRequestInterceptor]) = if (interceptors != null && interceptors.nonEmpty) {
this.interceptors = interceptors
}
def setUser(user: String): Unit = userWithCreator = if (userWithCreator != null) UserWithCreator(user, userWithCreator.creator)
else UserWithCreator(user, null)
def getUser = if (userWithCreator != null) userWithCreator.user else null
def setCreator(creator: String): Unit = userWithCreator = if (userWithCreator != null) UserWithCreator(userWithCreator.user, creator)
else UserWithCreator(null, creator)
def getCreator = if (userWithCreator != null) userWithCreator.creator else null
// def getInstance: ServiceInstance = getEngineConnExecutor().getServiceInstance
private[execute] def getEngineReturns = engineReturns.toArray
override def execute(executeRequest: ExecuteRequest): ExecuteResponse = {
var request: RequestTask = null
interceptors.foreach(in => request = in.apply(request, executeRequest))
/*if (request.getProperties != null &&
request.getProperties.containsKey(ReconnectExecuteRequestInterceptor.PROPERTY_EXEC_ID)) {
val execId = ReconnectExecuteRequestInterceptor.PROPERTY_EXEC_ID.toString
Utils.tryCatch {
val engineReturn = new EngineExecuteAsynReturn(request, null, execId, _ => callback())
engineReturns synchronized engineReturns += engineReturn
return engineReturn
} { t: Throwable =>
error(s"Failed to get execId $execId status", t)
}
}*/
val engineReturn = callExecute(executeRequest)
engineReturns synchronized engineReturns += engineReturn
engineReturn
}
protected def callback(): Unit = {}
protected def callExecute(request: ExecuteRequest): EngineExecuteAsynReturn
// override def toString: String = s"${getInstance.getApplicationName}Engine($getId, $getUser, $getCreator, ${getInstance.getInstance})"
override def toString: String = s"string"
protected def killExecId(asynReturn: EngineExecuteAsynReturn, subJobId: String): Boolean = {
info(s"begin to send killExecId, subJobId : $subJobId")
Utils.tryCatch {
asynReturn.orchestrationFuture.cancel(s"Job ${subJobId} was cancelled by user.")
true
}{
case t : Throwable =>
error(s"Kill subjob with id : ${subJobId} failed, ${t.getMessage}")
false
}
}
override def getId: Long = this.id
override def state: ExecutorState = ExecutorState.Idle
override def getExecutorInfo: ExecutorInfo = {
null
}
def canEqual(other: Any): Boolean = other.isInstanceOf[EntranceExecutor]
override def equals(other: Any): Boolean = other match {
case that: EntranceExecutor =>
(that canEqual this) && that.getId == this.getId
case _ => false
}
def getExecId(jobId: String): String = {
val erOption = engineReturns.find(_.getJobId.contains(jobId))
if ( erOption.isDefined ) {
erOption.get.subJobId
} else {
null
}
}
override def hashCode(): Int = {
// getOrchestratorSession().hashCode()
// todo
super.hashCode()
}
def getRunningOrchestrationFuture: Option[OrchestrationFuture] = {
if (null != engineReturns && engineReturns.nonEmpty ) {
Some(engineReturns.last.orchestrationFuture)
} else {
None
}
}
}
class EngineExecuteAsynReturn(val request: ExecuteRequest, val orchestrationFuture: OrchestrationFuture,
val subJobId: String, logProcessor: LogProcessor, progressProcessor: ProgressProcessor = null,
callback: EngineExecuteAsynReturn => Unit) extends AsynReturnExecuteResponse with Logging {
getJobId.foreach(id => info("Job " + id + " received a subjobId " + subJobId + " from orchestrator"))
private var notifyJob: ExecuteResponse => Unit = _
private var error: Throwable = _
private var errorMsg: String = _
private var lastNotifyTime = System.currentTimeMillis
def getLastNotifyTime = lastNotifyTime
private[execute] def notifyStatus(responseEngineStatus: ResponseTaskStatus): Unit = {
lastNotifyTime = System.currentTimeMillis()
val response = responseEngineStatus.status match {
case Succeed => Some(SuccessExecuteResponse())
case Failed | Cancelled | Timeout => Some(ErrorExecuteResponse(errorMsg, error))
case _ => None
}
response.foreach { r =>
getJobId.foreach(id => {
var subJobId: Long = 0l
request match {
case entranceExecuteRequest: EntranceExecuteRequest =>
subJobId = entranceExecuteRequest.getSubJobInfo.getSubJobDetail.getId
val msg = "Job with execId-" + id + " and subJobId : " + subJobId + " from orchestrator" + " completed with state " + r
entranceExecuteRequest.getJob.getLogListener.foreach(_.onLogUpdate(entranceExecuteRequest.getJob, msg))
case _ =>
}
val msgInfo = "Job with execId-" + id + " and subJobId : " + subJobId + " from orchestrator" + " completed with state " + r
info(msgInfo)
request
})
if (null != logProcessor) {
logProcessor.close()
}
if (null != progressProcessor){
progressProcessor.close()
}
Utils.tryAndWarn(if(null != callback) {
callback(this)
})
if (null != notifyJob) {
notifyJob
}
else {
info("NotifyJob is null.")
}
// if (notifyJob == null) this synchronized (while (notifyJob == null) this.wait(1000))
// if (null != notifyJob) notifyJob(r)
}
}
private[execute] def notifyHeartbeat(): Unit = {
lastNotifyTime = System.currentTimeMillis()
}
private[execute] def notifyError(errorMsg: String): Unit = {
lastNotifyTime = System.currentTimeMillis()
this.errorMsg = errorMsg
}
private[execute] def notifyError(errorMsg: String, t: Throwable): Unit = {
lastNotifyTime = System.currentTimeMillis()
this.errorMsg = errorMsg
this.error = t
}
private[execute] def getJobId: Option[String] = {
// val jobId = request.getProperties.get(JobExecuteRequestInterceptor.PROPERTY_JOB_ID)
val jobId = request match {
case entranceExecutorRequest: EntranceExecuteRequest =>
entranceExecutorRequest.getJob.getId
case _ =>
null
}
jobId match {
case j: String => Option(j)
case _ => None
}
}
override def notify(rs: ExecuteResponse => Unit): Unit = {
notifyJob = rs
// this synchronized notify()
}
}