blob: e051ed1c41e5a836f513aa4e373239f41c5f9c8b [file] [log] [blame]
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.entrance.interceptor
import java.io.ByteArrayInputStream
import java.{lang, util}
import com.webank.wedatasphere.linkis.bml.client.BmlClientFactory
import com.webank.wedatasphere.linkis.common.utils.Utils
import com.webank.wedatasphere.linkis.governance.common.entity.job.{JobRequest, OnceExecutorContent}
import com.webank.wedatasphere.linkis.governance.common.utils.OnceExecutorContentUtils
import com.webank.wedatasphere.linkis.governance.common.utils.OnceExecutorContentUtils.BmlResource
import com.webank.wedatasphere.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
import com.webank.wedatasphere.linkis.manager.label.entity.JobLabel
import com.webank.wedatasphere.linkis.manager.label.entity.engine.EngineConnMode._
import com.webank.wedatasphere.linkis.manager.label.entity.engine.{CodeLanguageLabel, EngineConnModeLabel}
import com.webank.wedatasphere.linkis.protocol.constants.TaskConstant
import com.webank.wedatasphere.linkis.protocol.utils.TaskUtils
import com.webank.wedatasphere.linkis.server.BDPJettyServerHelper
import scala.collection.convert.WrapAsJava._
import scala.collection.convert.WrapAsScala._
/**
* Created by enjoyyin on 2021/5/27.
*/
class OnceJobInterceptor extends EntranceInterceptor {
private val onceModes = Array(Once_With_Cluster.toString, Once.toString, Computation_With_Once.toString)
private val bmlClient = BmlClientFactory.createBmlClient(Utils.getJvmUser)
/**
* The apply function is to supplement the information of the incoming parameter task, making the content of this task more complete.
*    * Additional information includes: database information supplement, custom variable substitution, code check, limit limit, etc.
* apply函数是对传入参数task进行信息的补充,使得这个task的内容更加完整。
* 补充的信息包括: 数据库信息补充、自定义变量替换、代码检查、limit限制等
*
* @param task
* @param logAppender Used to cache the necessary reminder logs and pass them to the upper layer(用于缓存必要的提醒日志,传给上层)
* @return
*/
override def apply(task: JobRequest, logAppender: lang.StringBuilder): JobRequest = {
val existsOnceLabel = task.getLabels.exists {
case e: EngineConnModeLabel => onceModes.contains(e.getEngineConnMode)
case _ => false
}
if(!existsOnceLabel) return task
val jobLabel = LabelBuilderFactoryContext.getLabelBuilderFactory.createLabel(classOf[JobLabel])
jobLabel.setJobId(task.getId.toString)
task.getLabels.add(jobLabel)
val onceExecutorContent = new OnceExecutorContent
val params = task.getParams.map{case (k, v) => k -> v.asInstanceOf[Any]}
implicit def toMap(map: util.Map[String, Any]): util.Map[String, Object] = map.map { case (k, v) => k -> v.asInstanceOf[Object]}
onceExecutorContent.setSourceMap(task.getSource.map { case (k, v) => k -> v.asInstanceOf[Object]})
onceExecutorContent.setVariableMap(TaskUtils.getVariableMap(params))
onceExecutorContent.setRuntimeMap(TaskUtils.getRuntimeMap(params))
onceExecutorContent.setJobContent(getJobContent(task))
onceExecutorContent.setExtraLabels(new util.HashMap[String, Object]) //TODO Set it if needed
val contentMap = OnceExecutorContentUtils.contentToMap(onceExecutorContent)
val bytes = BDPJettyServerHelper.jacksonJson.writeValueAsBytes(contentMap)
val response = bmlClient.uploadResource(task.getExecuteUser, getFilePath(task), new ByteArrayInputStream(bytes))
val value = OnceExecutorContentUtils.resourceToValue(BmlResource(response.resourceId, response.version))
TaskUtils.addStartupMap(params,
Map(OnceExecutorContentUtils.ONCE_EXECUTOR_CONTENT_KEY -> value.asInstanceOf[Any]))
task
}
protected def getFilePath(task: JobRequest): String = s"/tmp/${task.getExecuteUser}/${task.getId}"
protected def getJobContent(task: JobRequest): util.Map[String, Object] = {
//TODO Wait for optimizing since the class `JobRequest` is waiting for optimizing .
val jobContent = new util.HashMap[String, Object]
jobContent.put(TaskConstant.CODE, task.getExecutionCode)
task.getLabels.foreach {
case label: CodeLanguageLabel =>
jobContent.put(TaskConstant.RUNTYPE, label.getCodeType)
case _ =>
}
jobContent
}
}