blob: 7a3903acc820eb83b8d281c6800679d0f9cbc80d [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.jobhistory.conversions
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.governance.common.entity.job.{JobRequest, SubJobDetail}
import com.webank.wedatasphere.linkis.governance.common.entity.task.{RequestPersistTask, RequestQueryTask}
import com.webank.wedatasphere.linkis.jobhistory.conf.JobhistoryConfiguration
import com.webank.wedatasphere.linkis.jobhistory.entity.{JobDetail, JobHistory, QueryTask, QueryTaskVO}
import com.webank.wedatasphere.linkis.jobhistory.transitional.TaskStatus
import com.webank.wedatasphere.linkis.jobhistory.util.QueryUtils
import com.webank.wedatasphere.linkis.manager.label.builder.factory.{LabelBuilderFactory, LabelBuilderFactoryContext}
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import com.webank.wedatasphere.linkis.manager.label.utils.LabelUtil
import com.webank.wedatasphere.linkis.protocol.utils.ZuulEntranceUtils
import com.webank.wedatasphere.linkis.server.{BDPJettyServerHelper, toScalaBuffer, toScalaMap}
import org.apache.commons.lang.StringUtils
import org.springframework.beans.BeanUtils
import java.util
import java.util.Date
import java.text.SimpleDateFormat
import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
import scala.collection.JavaConverters.{asJavaIterableConverter, asScalaBufferConverter, mapAsScalaMapConverter}
object TaskConversions extends Logging {
// Label factory used by getLabelListFromJson; resolved lazily on first access.
private lazy val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
/**
 * Builds a [[QueryTask]] from a [[RequestQueryTask]].
 *
 * Bean properties are copied with Spring's `BeanUtils`; the request params are
 * stored as a JSON string, or as `null` when the request carries no params.
 *
 * @param requestQueryTask the incoming request
 * @return a newly created QueryTask
 */
@Deprecated
def requestQueryTask2QueryTask(requestQueryTask: RequestQueryTask): QueryTask = {
  val queryTask = new QueryTask
  BeanUtils.copyProperties(requestQueryTask, queryTask)
  val paramsJson: String =
    if (requestQueryTask.getParams == null) null
    else BDPJettyServerHelper.gson.toJson(requestQueryTask.getParams)
  queryTask.setParamsJson(paramsJson)
  queryTask
}
/**
 * Builds a [[RequestPersistTask]] from a stored [[QueryTask]].
 *
 * `QueryUtils.exchangeExecutionCode` is applied to the query task first, then bean
 * properties are copied and the JSON source/params columns are parsed back into maps.
 *
 * @param queryTask the stored task
 * @return a newly created RequestPersistTask
 */
def queryTask2RequestPersistTask(queryTask: QueryTask): RequestPersistTask = {
  QueryUtils.exchangeExecutionCode(queryTask)
  val persistTask = new RequestPersistTask
  BeanUtils.copyProperties(queryTask, persistTask)
  val gson = BDPJettyServerHelper.gson
  val sourceMap = gson.fromJson(queryTask.getSourceJson, classOf[java.util.HashMap[String, String]])
  val paramsMap = gson.fromJson(queryTask.getParamsJson, classOf[java.util.HashMap[String, Object]])
  persistTask.setSource(sourceMap)
  persistTask.setParams(paramsMap)
  persistTask
}
/**
 * Builds a [[QueryTask]] from a [[RequestPersistTask]].
 *
 * Bean properties are copied with Spring's `BeanUtils`; the request params are
 * stored as a JSON string, or as `null` when the request carries no params.
 *
 * @param requestPersistTask the incoming persist request
 * @return a newly created QueryTask
 */
@Deprecated
def requestPersistTaskTask2QueryTask(requestPersistTask: RequestPersistTask): QueryTask = {
  val queryTask = new QueryTask
  BeanUtils.copyProperties(requestPersistTask, queryTask)
  val paramsJson: String =
    if (requestPersistTask.getParams == null) null
    else BDPJettyServerHelper.gson.toJson(requestPersistTask.getParams)
  queryTask.setParamsJson(paramsJson)
  queryTask
}
/**
 * Builds the view object for a stored [[QueryTask]].
 *
 * Besides the copied bean properties, the VO receives:
 *  - `sourceTailor`: the values of the source JSON map joined with "-" (skipped with a warning
 *    when the JSON cannot be processed);
 *  - `strongerExecId`: only when exec id, execute application name and instance are all present;
 *  - `costTime`: updated - created for a finished task that has both timestamps, otherwise
 *    now - created whenever the created time is known.
 *
 * @param queryTask the stored task
 * @return a newly created QueryTaskVO
 */
def queryTask2QueryTaskVO(queryTask: QueryTask): QueryTaskVO = {
  QueryUtils.exchangeExecutionCode(queryTask)
  val vo = new QueryTaskVO
  BeanUtils.copyProperties(queryTask, vo)

  val sourceJson = vo.getSourceJson
  if (StringUtils.isNotEmpty(sourceJson)) {
    Utils.tryCatch {
      val sourceMap = BDPJettyServerHelper.gson.fromJson(sourceJson, classOf[util.Map[String, String]])
      val joined = sourceMap.asScala.map(_._2).foldLeft("")(_ + _ + "-")
      vo.setSourceTailor(joined.stripSuffix("-"))
    } {
      case _ => warn("sourceJson deserializae failed,this task may be the old data")
    }
  }

  val execId = queryTask.getExecId()
  val executeApp = queryTask.getExecuteApplicationName()
  val instance = queryTask.getInstance()
  if (execId != null && executeApp != null && instance != null) {
    vo.setStrongerExecId(
      ZuulEntranceUtils.generateExecID(execId, executeApp, instance, queryTask.getRequestApplicationName))
  }

  val finished = isJobFinished(queryTask.getStatus())
  val created = queryTask.getCreatedTime()
  val updated = queryTask.getUpdatedTime()
  if (created != null) {
    // A finished task is measured up to its last update; anything else up to now.
    val endMillis = if (finished && updated != null) updated.getTime() else System.currentTimeMillis()
    vo.setCostTime(endMillis - created.getTime())
  }
  vo
}
/**
 * Tells whether a status string names one of the terminal task states
 * (Succeed, Failed, Cancelled or Timeout).
 *
 * @param status status name; `null` yields `false`
 * @return `true` when the status is terminal
 */
def isJobFinished(status: String): Boolean = {
  val terminalStates = Seq(TaskStatus.Succeed, TaskStatus.Failed, TaskStatus.Cancelled, TaskStatus.Timeout)
  terminalStates.exists(state => state.toString.equals(status))
}
/**
 * Converts every [[JobHistory]] of the list into a [[JobRequest]], keeping the order.
 *
 * @param jobHistoryList records to convert
 * @return a new list with one converted entry per input entry
 */
def jobHistory2JobRequest(jobHistoryList: util.List[JobHistory]): util.List[JobRequest] = {
  val converted = new util.ArrayList[JobRequest](jobHistoryList.size())
  for (history <- jobHistoryList.asScala) {
    converted.add(jobHistory2JobRequest(history))
  }
  converted
}
/**
 * Converts a persisted [[JobHistory]] record into a [[JobRequest]].
 *
 * The JSON columns (source, params, metrics) are parsed into maps and the label JSON is
 * turned into a label list; source and labels are only set when the column is not null.
 *
 * @param job the persisted record, may be `null`
 * @return the converted request, or `null` when `job` is `null`
 */
def jobHistory2JobRequest(job: JobHistory): JobRequest = {
  if (null == job) {
    null
  } else {
    val gson = BDPJettyServerHelper.gson
    val request = new JobRequest
    request.setId(job.getId)
    request.setReqId(job.getJob_req_id)
    // request.setPriority(job.getPriority)
    request.setSubmitUser(job.getSubmit_user)
    request.setExecuteUser(job.getExecute_user)
    if (job.getSource != null) {
      request.setSource(gson.fromJson(job.getSource, classOf[util.Map[String, Object]]))
    }
    if (job.getLabels != null) {
      request.setLabels(getLabelListFromJson(job.getLabels))
    }
    request.setParams(gson.fromJson(job.getParams, classOf[util.Map[String, Object]]))
    request.setProgress(job.getProgress)
    request.setStatus(job.getStatus)
    request.setLogPath(job.getLog_path)
    request.setErrorCode(job.getError_code)
    request.setErrorDesc(job.getError_desc)
    request.setCreatedTime(job.getCreated_time)
    request.setUpdatedTime(job.getUpdated_time)
    request.setMetrics(gson.fromJson(job.getMetrics, classOf[util.Map[String, Object]]))
    request.setInstances(job.getInstances)
    request
  }
}
/**
 * Converts a [[JobRequest]] into the [[JobHistory]] record used for persistence.
 *
 * Map-valued fields (source, labels, params, metrics) are serialized to JSON only when
 * they are present, so an absent field stays `null` on the record. Created/updated times
 * are copied into fresh `Date` instances. The engine type is derived from the request labels.
 *
 * @param jobReq the request, may be `null`
 * @return the converted record, or `null` when `jobReq` is `null`
 */
def jobRequest2JobHistory(jobReq: JobRequest): JobHistory = {
  if (null == jobReq) return null
  val jobHistory = new JobHistory
  jobHistory.setId(jobReq.getId)
  jobHistory.setJob_req_id(jobReq.getReqId)
  // jobHistory.setPriority(jobReq.getProgress)
  jobHistory.setSubmit_user(jobReq.getSubmitUser)
  jobHistory.setExecute_user(jobReq.getExecuteUser)
  // Guarded like params/metrics below: Gson renders a null object as the literal string
  // "null", which would otherwise be stored as the source.
  if (null != jobReq.getSource) jobHistory.setSource(BDPJettyServerHelper.gson.toJson(jobReq.getSource))
  if (null != jobReq.getLabels) {
    val labelMap = new util.HashMap[String, String](jobReq.getLabels.size())
    jobReq.getLabels.asScala.foreach(label => labelMap.put(label.getLabelKey, label.getStringValue))
    jobHistory.setLabels(BDPJettyServerHelper.gson.toJson(labelMap))
  }
  if (null != jobReq.getParams) jobHistory.setParams(BDPJettyServerHelper.gson.toJson(jobReq.getParams))
  jobHistory.setProgress(jobReq.getProgress)
  jobHistory.setStatus(jobReq.getStatus)
  jobHistory.setLog_path(jobReq.getLogPath)
  jobHistory.setError_code(jobReq.getErrorCode)
  jobHistory.setError_desc(jobReq.getErrorDesc)
  if (null != jobReq.getCreatedTime) jobHistory.setCreated_time(new Date(jobReq.getCreatedTime.getTime))
  if (null != jobReq.getUpdatedTime) jobHistory.setUpdated_time(new Date(jobReq.getUpdatedTime.getTime))
  jobHistory.setInstances(jobReq.getInstances)
  if (null != jobReq.getMetrics) jobHistory.setMetrics(BDPJettyServerHelper.gson.toJson(jobReq.getMetrics))
  jobHistory.setEngine_type(LabelUtil.getEngineType(jobReq.getLabels))
  jobHistory
}
/**
 * Converts every [[SubJobDetail]] of the list into a [[JobDetail]], keeping the order.
 *
 * @param subjobDetails sub-job details to convert
 * @return a new list with one converted entry per input entry
 */
def subjobDetails2JobDetail(subjobDetails: util.List[SubJobDetail]): util.List[JobDetail] = {
  val converted = new util.ArrayList[JobDetail](subjobDetails.size())
  for (subjob <- subjobDetails.asScala) {
    converted.add(subjobDetail2JobDetail(subjob))
  }
  converted
}
/**
 * Converts every [[JobDetail]] of the list into a [[SubJobDetail]], keeping the order.
 *
 * @param jobdetails job details to convert
 * @return a new list with one converted entry per input entry
 */
def jobdetails2SubjobDetail(jobdetails: util.List[JobDetail]): util.List[SubJobDetail] = {
  val converted = new util.ArrayList[SubJobDetail](jobdetails.size())
  for (detail <- jobdetails.asScala) {
    converted.add(jobdetail2SubjobDetail(detail))
  }
  converted
}
/**
 * Copies the fields of a [[SubJobDetail]] into a new [[JobDetail]].
 *
 * @param subjob the sub-job detail, may be `null`
 * @return the converted detail, or `null` when `subjob` is `null`
 */
def subjobDetail2JobDetail(subjob: SubJobDetail): JobDetail = {
  if (null == subjob) {
    null
  } else {
    val detail = new JobDetail
    detail.setId(subjob.getId)
    detail.setJob_history_id(subjob.getJobGroupId)
    detail.setResult_location(subjob.getResultLocation)
    detail.setResult_array_size(subjob.getResultSize)
    detail.setExecution_content(subjob.getExecutionContent)
    detail.setJob_group_info(subjob.getJobGroupInfo)
    detail.setCreated_time(subjob.getCreatedTime)
    detail.setUpdated_time(subjob.getUpdatedTime)
    detail.setStatus(subjob.getStatus)
    detail.setPriority(subjob.getPriority)
    detail
  }
}
/**
 * Copies the fields of a [[JobDetail]] into a new [[SubJobDetail]].
 *
 * @param jobdetail the job detail, may be `null`
 * @return the converted sub-job detail, or `null` when `jobdetail` is `null`
 */
def jobdetail2SubjobDetail(jobdetail: JobDetail): SubJobDetail = {
  if (null == jobdetail) {
    null
  } else {
    val subjob = new SubJobDetail
    subjob.setId(jobdetail.getId)
    subjob.setJobGroupId(jobdetail.getJob_history_id)
    subjob.setResultLocation(jobdetail.getResult_location)
    subjob.setResultSize(jobdetail.getResult_array_size)
    subjob.setExecutionContent(jobdetail.getExecution_content)
    subjob.setJobGroupInfo(jobdetail.getJob_group_info)
    subjob.setCreatedTime(jobdetail.getCreated_time)
    subjob.setUpdatedTime(jobdetail.getUpdated_time)
    subjob.setStatus(jobdetail.getStatus)
    subjob.setPriority(jobdetail.getPriority)
    subjob
  }
}
/**
 * Builds the [[QueryTaskVO]] view object for a persisted job and its sub-jobs.
 *
 * Derived fields:
 *  - engine type falls back to the engine label when the record has none; run type and
 *    request application name come from the code-type and user-creator labels;
 *  - cost time is computed from the submit/complete timestamps stored in the metrics JSON:
 *    complete - submit for a finished job that has both, otherwise now - submit whenever the
 *    submit time is known;
 *  - stronger exec id is generated from the request id, the entrance name and the instances
 *    (skipped when the record has no instances);
 *  - execution code is the concatenation of every sub-job's execution content, each followed
 *    by ";\n";
 *  - source tailor is the values of the source JSON map joined with "-".
 *
 * @param job     the persisted job, may be `null`
 * @param subjobs the sub-jobs of the job; a `null` list is treated as having no sub-jobs
 * @return the view object, or `null` when `job` is `null`
 */
def jobHistory2TaskVO(job: JobHistory, subjobs: util.List[SubJobDetail]): QueryTaskVO = {
  if (null == job) return null
  val taskVO = new QueryTaskVO
  taskVO.setTaskID(job.getId)
  taskVO.setInstance(job.getInstances)
  taskVO.setExecId(job.getJob_req_id)
  taskVO.setUmUser(job.getSubmit_user)
  taskVO.setEngineInstance(null)
  taskVO.setProgress(job.getProgress)
  taskVO.setLogPath(job.getLog_path)
  taskVO.setStatus(job.getStatus)
  // Earlier revisions first stored copies of these dates and then overwrote them with the
  // record's own instances; only the final, effective assignment is kept.
  taskVO.setCreatedTime(job.getCreated_time)
  taskVO.setUpdatedTime(job.getUpdated_time)

  // Label-derived fields. LabelUtil is only consulted when at least one label was parsed.
  val labelList = getLabelListFromJson(job.getLabels)
  val hasLabels = null != labelList && labelList.size() > 0
  val engineType =
    if (null == job.getEngine_type && hasLabels) LabelUtil.getEngineType(labelList)
    else job.getEngine_type
  val codeType = if (hasLabels) LabelUtil.getCodeType(labelList) else ""
  val creator =
    if (hasLabels) Option(LabelUtil.getUserCreator(labelList)).map(_._2).getOrElse("")
    else ""
  taskVO.setEngineType(engineType)
  // NOTE(review): this uses the record's engine type, not the label fallback above - confirm intent.
  taskVO.setExecuteApplicationName(job.getEngine_type)
  taskVO.setRequestApplicationName(creator)
  taskVO.setRunType(codeType)
  taskVO.setParamsJson(job.getParams)
  taskVO.setErrCode(job.getError_code)
  taskVO.setErrDesc(job.getError_desc)

  // Cost time from the metrics JSON; a missing or unparsable timestamp counts as absent.
  val metrics = BDPJettyServerHelper.gson.fromJson(job.getMetrics, classOf[util.Map[String, Object]])
  def metricTime(key: String): Option[Date] =
    Option(metrics)
      .flatMap(m => Option(m.get(key)))
      .flatMap(value => Option(dealString2Date(value.toString)))
  val completeTime = metricTime(TaskConstant.ENTRANCEJOB_COMPLETE_TIME)
  val submitTime = metricTime(TaskConstant.ENTRANCEJOB_SUBMIT_TIME)
  submitTime.foreach { submitted =>
    val finishedAt = if (isJobFinished(job.getStatus)) completeTime else None
    val endMillis = finishedAt.map(_.getTime).getOrElse(System.currentTimeMillis())
    taskVO.setCostTime(endMillis - submitted.getTime)
  }

  // Without instances there is nothing to split; skip instead of failing on null.
  if (null != job.getInstances) {
    val entranceName = JobhistoryConfiguration.ENTRANCE_SPRING_NAME.getValue
    val instances = job.getInstances.split(JobhistoryConfiguration.ENTRANCE_INSTANCE_DELEMITER.getValue)
    taskVO.setStrongerExecId(ZuulEntranceUtils.generateExecID(job.getJob_req_id, entranceName, instances))
  }

  val code = new StringBuilder
  if (null != subjobs) {
    subjobs.asScala.foreach(subjob => code.append(subjob.getExecutionContent).append(";\n"))
  }
  taskVO.setExecutionCode(code.toString())
  taskVO.setSubJobs(subjobs)

  taskVO.setSourceJson(job.getSource)
  if (StringUtils.isNotBlank(job.getSource)) {
    Utils.tryCatch {
      val source = BDPJettyServerHelper.gson.fromJson(job.getSource, classOf[util.Map[String, String]])
      // Gson yields null for a literal "null" document; that is "no source", not broken data.
      Option(source).foreach { sourceMap =>
        taskVO.setSourceTailor(sourceMap.asScala.map(_._2).foldLeft("")(_ + _ + "-").stripSuffix("-"))
      }
    } {
      case _ => warn("sourceJson deserialization failed, this task may be the old data.")
    }
  }
  taskVO
}
/**
 * Parses a JSON object of label key/value pairs into a list of labels.
 *
 * A `null` or blank input, or JSON that parses to no map, yields an empty list. Failures
 * while building the labels are logged as warnings by `Utils.tryAndWarn`; labels created
 * before the failure are kept.
 *
 * @param labelJson JSON object mapping label key to label value, may be `null`
 * @return a new mutable list of labels, never `null`
 */
def getLabelListFromJson(labelJson: String): util.List[Label[_]] = {
  val labels = new util.ArrayList[Label[_]]()
  // Skip parsing for null/blank input so that a job without labels does not end up
  // dereferencing a null map and logging a spurious warning.
  val labelMap =
    if (StringUtils.isBlank(labelJson)) null
    else BDPJettyServerHelper.gson.fromJson(labelJson, classOf[util.HashMap[String, String]])
  if (null != labelMap) {
    Utils.tryAndWarn {
      labelMap.asScala.map(l => labelBuilderFactory.createLabel[Label[_]](l._1, l._2)).foreach(labels.add)
    }
  }
  labels
}
/**
 * Parses a timestamp formatted as `yyyy-MM-dd'T'HH:mm:ssZ`.
 *
 * @param strDate the text to parse
 * @return the parsed date, or `null` (after logging a warning) when parsing fails
 */
def dealString2Date(strDate : String) : Date = {
  val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
  Utils.tryCatch(formatter.parse(strDate)) { _ =>
    warn("String to Date deserialization failed.")
    null
  }
}
}