blob: e0fa4845084c5e15c34f39e7a9ad4c9f09626559 [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineplugin.hive.hook
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.common.creation.EngineCreationContext
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.EngineExecutionContext
import com.webank.wedatasphere.linkis.engineconn.computation.executor.hook.ComputationExecutorHook
import com.webank.wedatasphere.linkis.engineplugin.hive.exception.HiveQueryFailedException
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import java.util
import java.util.regex.Pattern
import scala.collection.JavaConverters.mapAsScalaMapConverter
class HiveAddMetaTableNameHook extends ComputationExecutorHook with Logging {
private val HIVE_USE_TABLENAME_REGEX = ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname + "\\s*=\\s*(true|false)"
override def getHookName(): String = "Add tableName config in metadata of hive result."
override def beforeExecutorExecute(engineExecutionContext: EngineExecutionContext, engineCreationContext: EngineCreationContext, codeBeforeHook: String): String = {
val configMap = new util.HashMap[String, String]()
engineCreationContext.getOptions.asScala.filterNot(_._2.isInstanceOf[util.Map[String, Any]]).foreach(kv => configMap.put(kv._1, s"${kv._2}"))
engineExecutionContext.getProperties.asScala.filterNot(_._2.isInstanceOf[util.Map[String, Any]]).foreach(kv => configMap.put(kv._1, s"${kv._2}"))
if (configMap.containsKey(ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname)) {
engineExecutionContext.setEnableResultsetMetaWithTableName(configMap.get(ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname).toBoolean)
}
if (codeBeforeHook.contains(ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname)) {
val pattern = Pattern.compile(HIVE_USE_TABLENAME_REGEX)
codeBeforeHook.split("\n").foreach(line => {
if (StringUtils.isNotBlank(line)) {
val mather = pattern.matcher(line)
if (mather.find()) {
val value = mather.group(1)
Utils.tryCatch {
val boolValue = value.toBoolean
if (engineExecutionContext.getProperties.containsKey(ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES.varname)) {
warn(s"Should not add param ${mather.group()} in both code and starupMap, will use the param in code.")
}
engineExecutionContext.setEnableResultsetMetaWithTableName(boolValue)
} {
case e: IllegalArgumentException =>
throw HiveQueryFailedException(41006, s"Invalid value : ${value} in param [${mather.group()}]")
}
}
}
})
}
codeBeforeHook
}
}