blob: e5504fc5fa1fcf458a119a2e0b213299d4a811b2 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engine.mdq
import java.util
import javax.annotation.PostConstruct
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.engine.configuration.SparkConfiguration
import com.webank.wedatasphere.linkis.engine.exception.MDQErrorException
import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext
import com.webank.wedatasphere.linkis.engine.extension.SparkPreExecutionHook
import com.webank.wedatasphere.linkis.engine.spark.common.SparkKind
import com.webank.wedatasphere.linkis.protocol.mdq.{DDLRequest, DDLResponse}
import com.webank.wedatasphere.linkis.rpc.Sender
import com.webank.wedatasphere.linkis.storage.utils.StorageUtils
import org.apache.commons.lang.StringUtils
import org.springframework.stereotype.Component
@Component
class MDQPreExecutionHook extends SparkPreExecutionHook with Logging{
@PostConstruct
def init(): Unit ={
SparkPreExecutionHook.register(this)
}
override def hookName: String = "MDQPreHook"
override def callPreExecutionHook(engineExecutorContext: EngineExecutorContext, code: String): String = {
val runType: String = engineExecutorContext.getProperties.get("runType") match {
case value:String => value
case _ => ""
}
if(StringUtils.isEmpty(runType) || ! SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType)) return code
val sender = Sender.getSender(SparkConfiguration.MDQ_APPLICATION_NAME.getValue)
val params = new util.HashMap[String,Object]()
params.put("user", StorageUtils.getJvmUser)
params.put("code", code)
sender.ask(DDLRequest(params)) match {
case DDLResponse(postCode) => postCode
case _ => throw new MDQErrorException(40010, "向MDQ服务请求解析为可以执行的sql时失败")
}
}
}