// blob: 96bd31c738b5cacc2a87c637238c8e7ea6798af1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.computation.client.job
import com.webank.wedatasphere.linkis.computation.client.LinkisJobMetrics
import com.webank.wedatasphere.linkis.computation.client.operator.StorableOperator
import com.webank.wedatasphere.linkis.governance.common.entity.task.RequestPersistTask
import com.webank.wedatasphere.linkis.ujes.client.UJESClient
import com.webank.wedatasphere.linkis.ujes.client.request.{JobSubmitAction, OpenLogAction}
import com.webank.wedatasphere.linkis.ujes.client.response.{JobInfoResult, JobSubmitResult}
/**
 * A Linkis job whose state is looked up remotely through a [[UJESClient]].
 *
 * Job info is fetched with `ujesClient.getJobInfo`. The first fetched result that
 * reports completion is cached, and every later lookup returns that cached value
 * without contacting the client again.
 */
trait StorableLinkisJob extends AbstractLinkisJob {

  /** Client used for every remote call issued by this job. */
  protected val ujesClient: UJESClient

  /** Cached terminal job info; remains null until a fetched result reports completion. */
  private var completedJobInfoResult: JobInfoResult = _

  /** Submit result handed to the client when querying or killing the job. */
  protected def getJobSubmitResult: JobSubmitResult

  /**
   * Runs `op` through the inherited `wrapperObj`, passing `getId` as the checked object.
   * NOTE(review): presumably `wrapperObj` rejects a null id with the given message;
   * confirm against AbstractLinkisJob.
   */
  protected def wrapperId[T](op: => T): T =
    super.wrapperObj(getId, "taskId must be exists.")(op)

  /** Pass-through to the inherited implementation. */
  override protected def wrapperObj[T](obj: Object, errorMsg: String)(op: => T): T =
    super.wrapperObj(obj, errorMsg)(op)

  /**
   * Returns the current job info.
   *
   * If a completed result is already cached it is returned directly. Otherwise the
   * info is fetched from the client, the elapsed fetch time is added to the job
   * metrics, and listeners are notified: `onJobFinished` when the result is
   * completed (the result is also cached and the finish time recorded), or
   * `onJobRunning` when it is running.
   */
  protected def getJobInfoResult: JobInfoResult =
    if (completedJobInfoResult != null) {
      completedJobInfoResult
    } else {
      val fetchStartedAt = System.currentTimeMillis
      val latest = wrapperId(ujesClient.getJobInfo(getJobSubmitResult))
      getJobMetrics.addClientGetJobInfoTime(System.currentTimeMillis - fetchStartedAt)
      if (latest.isCompleted) {
        getJobMetrics.setClientFinishedTime(System.currentTimeMillis)
        // Cache before logging/notifying so listeners that query the job see the cached result.
        completedJobInfoResult = latest
        info(s"Job-$getId is completed with status " + completedJobInfoResult.getJobStatus)
        getJobListeners.foreach(_.onJobFinished(this))
      } else if (latest.isRunning) {
        getJobListeners.foreach(_.onJobRunning(this))
      }
      latest
    }

  /** The persisted task record carried by the current job info. */
  def getJobInfo: RequestPersistTask = getJobInfoResult.getRequestPersistTask

  /**
   * Opens the log located at the job's log path, on behalf of the job's user,
   * and returns the log content obtained from the client.
   */
  def getAllLogs: Array[String] = wrapperId {
    val task = getJobInfo
    val openLogAction = OpenLogAction
      .newBuilder()
      .setLogPath(task.getLogPath)
      .setProxyUser(task.getUmUser)
      .build()
    val logResult = ujesClient.openLog(openLogAction)
    logResult.getLog
  }

  /** Asks the client to kill the job identified by the submit result. */
  override def doKill(): Unit = wrapperId {
    ujesClient.kill(getJobSubmitResult)
  }

  /** Whether the current job info reports completion. */
  override def isCompleted: Boolean = getJobInfoResult.isCompleted

  /** Whether the current job info reports success. */
  override def isSucceed: Boolean = getJobInfoResult.isSucceed
}
/**
 * A [[StorableLinkisJob]] that is submitted through `ujesClient.submit`.
 *
 * The task id and submit result are unset (null) until `doSubmit()` has run.
 *
 * @param ujesClient      client used to submit the job and for later remote calls
 * @param jobSubmitAction the action sent to the client on submission
 */
abstract class StorableSubmittableLinkisJob(
    override protected val ujesClient: UJESClient,
    jobSubmitAction: JobSubmitAction)
  extends StorableLinkisJob with AbstractSubmittableLinkisJob {

  // Both are assigned by doSubmit().
  private var taskId: String = _
  private var jobSubmitResult: JobSubmitResult = _

  /** The task id taken from the submit result; null before submission. */
  override def getId: String = taskId

  /** The result returned by the client on submission; null before submission. */
  override protected def getJobSubmitResult: JobSubmitResult = jobSubmitResult

  /** Same as the inherited wrapper, but checks `taskId` with a submit-first message. */
  override protected def wrapperId[T](op: => T): T =
    super.wrapperObj(taskId, "Please submit job first.")(op)

  /**
   * Submits the job, stores the submit result and its task id, and registers an
   * operator action that supplies every [[StorableOperator]] with the submit result
   * and the client; other operators are passed through unchanged.
   */
  override protected def doSubmit(): Unit = {
    info(s"Ready to submit job: ${jobSubmitAction.getRequestPayload}")
    jobSubmitResult = ujesClient.submit(jobSubmitAction)
    taskId = jobSubmitResult.taskID
    addOperatorAction {
      case storable: StorableOperator[_] =>
        storable.setJobSubmitResult(jobSubmitResult).setUJESClient(ujesClient)
      case other =>
        other
    }
    info(s"Job submitted with taskId: $taskId")
  }
}
/**
 * A [[StorableLinkisJob]] built from the identifiers of an already known job
 * rather than from a new submission.
 *
 * @param ujesClient client used for remote calls
 * @param execId     execution id copied into the synthesized submit result
 * @param taskId     task id; also returned by `getId` and passed to the job metrics
 * @param user       user copied into the synthesized submit result
 */
abstract class StorableExistingLinkisJob(
    protected override val ujesClient: UJESClient,
    execId: String,
    taskId: String,
    user: String)
  extends StorableLinkisJob {

  // Submit result populated from the constructor arguments instead of a client response.
  private val restoredSubmitResult: JobSubmitResult = new JobSubmitResult
  private val restoredJobMetrics: LinkisJobMetrics = new LinkisJobMetrics(taskId)

  restoredSubmitResult.setUser(user)
  restoredSubmitResult.setTaskID(taskId)
  restoredSubmitResult.setExecID(execId)

  // The time this object is constructed is recorded as the client submit time.
  restoredJobMetrics.setClientSubmitTime(System.currentTimeMillis)

  /** The submit result assembled from the constructor arguments. */
  override protected def getJobSubmitResult: JobSubmitResult = restoredSubmitResult

  /** The task id given at construction. */
  override def getId: String = taskId

  /** The metrics instance created for this job at construction. */
  override def getJobMetrics: LinkisJobMetrics = restoredJobMetrics
}