blob: d04574f345813cf5789de2fe4ce272ba0471912e [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engine.mdq
import com.webank.wedatasphere.linkis.common.utils.Logging
import com.webank.wedatasphere.linkis.engine.configuration.SparkConfiguration
import com.webank.wedatasphere.linkis.engine.execute.EngineExecutorContext
import com.webank.wedatasphere.linkis.engine.extension.SparkPostExecutionHook
import com.webank.wedatasphere.linkis.engine.spark.common.SparkKind
import com.webank.wedatasphere.linkis.protocol.mdq.{DDLCompleteResponse, DDLExecuteResponse}
import com.webank.wedatasphere.linkis.rpc.Sender
import com.webank.wedatasphere.linkis.scheduler.executer.{ExecuteResponse, SuccessExecuteResponse}
import com.webank.wedatasphere.linkis.storage.utils.StorageUtils
import javax.annotation.PostConstruct
import org.apache.commons.lang.StringUtils
import org.springframework.stereotype.Component
/**
 * Spark post-execution hook that notifies the MDQ application once code whose
 * `runType` equals [[SparkKind.FUNCTION_MDQ_TYPE]] (case-insensitively) has been executed.
 *
 * The hook registers itself with [[SparkPostExecutionHook]] after construction and is
 * a no-op for every other run type.
 */
@Component
class MDQPostExecutionHook extends SparkPostExecutionHook with Logging {

  /** Registers this instance in the [[SparkPostExecutionHook]] registry once the bean has been constructed. */
  @PostConstruct
  def init(): Unit = {
    SparkPostExecutionHook.register(this)
  }

  /** Name under which this hook is identified. */
  override def hookName: String = "MDQPostHook"

  /**
   * Reports a successfully executed MDQ statement to the MDQ application and logs a
   * warning when the execution, or the MDQ-side completion, did not succeed.
   *
   * @param engineExecutorContext context whose properties are consulted for the `runType` entry
   * @param executeResponse       outcome of the execution that just finished
   * @param code                  the code that was executed; forwarded to MDQ and included in warnings
   */
  override def callPostExecutionHook(engineExecutorContext: EngineExecutorContext, executeResponse: ExecuteResponse, code: String): Unit = {
    // A missing or non-String "runType" property is treated as the empty string.
    val runType: String = engineExecutorContext.getProperties.get("runType") match {
      case value: String => value
      case _ => ""
    }
    val isMdqRun = !StringUtils.isEmpty(runType) && SparkKind.FUNCTION_MDQ_TYPE.equalsIgnoreCase(runType)
    if (isMdqRun) {
      executeResponse match {
        case SuccessExecuteResponse() =>
          // The sender is only needed on the success path, so it is looked up here
          // rather than before the outcome is known.
          val sender = Sender.getSender(SparkConfiguration.MDQ_APPLICATION_NAME.getValue)
          sender.ask(DDLExecuteResponse(true, code, StorageUtils.getJvmUser)) match {
            case DDLCompleteResponse(status) =>
              // Log text means "table creation failed".
              if (!status) warn(s"执行建表失败:$code")
            case other =>
              // Any other reply would otherwise surface as a scala.MatchError thrown from the hook.
              warn(s"Unexpected reply from MDQ after executing DDL: $other, code: $code")
          }
        case _ =>
          // Log text means "table creation failed".
          warn(s"执行建表失败:$code")
      }
    }
  }
}