/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineplugin.hive.executor
import com.webank.wedatasphere.linkis.common.exception.ErrorException
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.execute.{ComputationExecutor, EngineExecutionContext}
import com.webank.wedatasphere.linkis.engineconn.core.EngineConnObject
import com.webank.wedatasphere.linkis.engineplugin.hive.cs.CSHiveHelper
import com.webank.wedatasphere.linkis.engineplugin.hive.exception.HiveQueryFailedException
import com.webank.wedatasphere.linkis.engineplugin.hive.progress.HiveProgressHelper
import com.webank.wedatasphere.linkis.governance.common.paser.SQLCodeParser
import com.webank.wedatasphere.linkis.manager.common.entity.resource.{CommonNodeResource, LoadInstanceResource, NodeResource}
import com.webank.wedatasphere.linkis.manager.engineplugin.common.conf.EngineConnPluginConf
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.scheduler.executer.{ErrorExecuteResponse, ExecuteResponse, SuccessExecuteResponse}
import com.webank.wedatasphere.linkis.storage.domain.{Column, DataType}
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory
import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.commons.io.IOUtils
import org.apache.hadoop.hive.common.HiveInterruptUtils
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory, CommandProcessorResponse}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.mapred.RunningJob
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory
import java.io.ByteArrayOutputStream
import java.security.PrivilegedExceptionAction
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
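/**
  * Hive ComputationExecutor: runs HQL through a Hive CommandProcessor under the given
  * UserGroupInformation, pushes MR job progress back to Linkis, and writes query results
  * as Linkis table result sets.
  *
  * baos appears (judging from how it is read after proc.run and then reset) to be the
  * buffer the session's console output is redirected to, so that the output of non-driver
  * commands can be forwarded to the client.
  */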
class HiveEngineConnExecutor(id: Int,
sessionState: SessionState,
ugi: UserGroupInformation,
hiveConf: HiveConf,
baos: ByteArrayOutputStream = null) extends ComputationExecutor {
private val LOG = LoggerFactory.getLogger(getClass)
private val namePrefix: String = "HiveEngineExecutor_"
private var proc: CommandProcessor = _
private var map: Int = 0
private var reduce: Int = 0
private val totalTask = 200.0f
private var singleLineProgress: Float = 0.0f
private var stage: Int = 0
private var engineExecutorContext: EngineExecutionContext = _
private val singleCodeCompleted: AtomicBoolean = new AtomicBoolean(false)
private var numberOfMRJobs: Int = 0
private var currentSqlProgress: Float = 0.0f
private val singleSqlProgressMap: util.Map[String, Float] = new util.HashMap[String, Float]()
private val executorLabels: util.List[Label[_]] = new util.ArrayList[Label[_]]()
private var driver: HiveDriverProxy = _
private var thread: Thread = _
override def init(): Unit = {
LOG.info(s"Ready to change engine state!")
setCodeParser(new SQLCodeParser)
super.init()
}
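  /**
    * Executes a single statement. Statements handled by a Hive Driver are delegated to
    * executeHQL; all other command processors (such as `set` commands) are run directly,
    * and their console output, captured in baos, is forwarded to the client and then reset.
    */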
override def executeLine(engineExecutorContext: EngineExecutionContext, code: String): ExecuteResponse = {
this.engineExecutorContext = engineExecutorContext
if (engineExecutorContext.getEnableResultsetMetaWithTableName) {
hiveConf.setBoolVar(ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES, true)
info("set HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES true")
} else {
hiveConf.setBoolVar(ConfVars.HIVE_RESULTSET_USE_UNIQUE_COLUMN_NAMES, false)
}
CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)
singleSqlProgressMap.clear()
singleCodeCompleted.set(false)
currentSqlProgress = 0.0f
val realCode = code.trim()
LOG.info(s"hive client begins to run hql code:\n ${realCode.trim}")
if (realCode.trim.length > 500) {
engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
} else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
val tokens = realCode.trim.split("""\s+""")
SessionState.setCurrentSessionState(sessionState)
val proc = CommandProcessorFactory.get(tokens, hiveConf)
this.proc = proc
LOG.debug("ugi is " + ugi.getUserName)
ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
override def run(): ExecuteResponse = {
proc match {
case any if HiveDriverProxy.isDriver(any) =>
info(s"driver is $any")
thread = Thread.currentThread()
driver = new HiveDriverProxy(any)
executeHQL(realCode, driver)
case _ => val resp = proc.run(realCode.substring(tokens(0).length).trim)
val result = new String(baos.toByteArray)
logger.info("RESULT => {}", result)
engineExecutorContext.appendStdout(result)
baos.reset()
if (resp.getResponseCode != 0) {
clearCurrentProgress()
HiveProgressHelper.clearHiveProgress()
onComplete()
singleSqlProgressMap.clear()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
throw resp.getException
}
HiveProgressHelper.clearHiveProgress()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
onComplete()
singleSqlProgressMap.clear()
SuccessExecuteResponse()
}
}
})
}
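  /**
    * Runs a driver-backed HQL statement. If the driver signals a retryable failure
    * (CommandNeedRetryException on older Hive versions), the statement is re-run with an
    * incremented try count; any other failure is returned as an ErrorExecuteResponse.
    */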
private def executeHQL(realCode: String, driver: HiveDriverProxy): ExecuteResponse = {
var needRetry: Boolean = true
var tryCount: Int = 0
var hasResult: Boolean = false
var rows: Int = 0
var columnCount: Int = 0
while (needRetry) {
needRetry = false
driver.setTryCount(tryCount + 1)
val startTime = System.currentTimeMillis()
try {
        if (numberOfMRJobs > 0) engineExecutorContext.appendStdout(s"Your hive sql has $numberOfMRJobs MR job(s) to execute")
val hiveResponse: CommandProcessorResponse = driver.run(realCode)
if (hiveResponse.getResponseCode != 0) {
LOG.error("Hive query failed, response code is {}", hiveResponse.getResponseCode)
// todo check uncleared context ?
return ErrorExecuteResponse(hiveResponse.getErrorMessage, hiveResponse.getException)
}
engineExecutorContext.appendStdout(s"Time taken: ${Utils.msDurationToString(System.currentTimeMillis() - startTime)}, begin to fetch results.")
LOG.info(s"$getId >> Time taken: ${Utils.msDurationToString(System.currentTimeMillis() - startTime)}, begin to fetch results.")
val fieldSchemas = if (hiveResponse.getSchema != null) hiveResponse.getSchema.getFieldSchemas
else if (driver.getSchema != null) driver.getSchema.getFieldSchemas
else throw HiveQueryFailedException(41005, "cannot get the field schemas.")
LOG.debug("fieldSchemas are " + fieldSchemas)
if (fieldSchemas == null || isNoResultSql(realCode)) {
//IOUtils.closeQuietly(resultSetWriter)
numberOfMRJobs = -1
singleCodeCompleted.set(true)
onComplete()
singleSqlProgressMap.clear()
return SuccessExecuteResponse()
}
//get column data
val metaData: TableMetaData = getResultMetaData(fieldSchemas, engineExecutorContext.getEnableResultsetMetaWithTableName)
//send result
rows = sendResultSet(engineExecutorContext, driver, metaData)
columnCount = if (fieldSchemas != null) fieldSchemas.size() else 0
hasResult = true
} catch {
case e if HiveDriverProxy.isCommandNeedRetryException(e) => tryCount += 1
needRetry = true
HiveProgressHelper.clearHiveProgress()
onComplete()
singleSqlProgressMap.clear()
clearCurrentProgress()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
LOG.warn("Retry hive query with a different approach...")
case t: Throwable => LOG.error(s"query failed, reason : ", t)
HiveProgressHelper.clearHiveProgress()
clearCurrentProgress()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
singleCodeCompleted.set(true)
numberOfMRJobs = -1
onComplete()
singleSqlProgressMap.clear()
return ErrorExecuteResponse(t.getMessage, t)
}
}
if (hasResult) {
engineExecutorContext.appendStdout(s"Fetched $columnCount col(s) : $rows row(s) in hive")
LOG.info(s"$getId >> Fetched $columnCount col(s) : $rows row(s) in hive")
}
clearCurrentProgress()
HiveProgressHelper.clearHiveProgress()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
singleCodeCompleted.set(true)
numberOfMRJobs = -1
onComplete()
singleSqlProgressMap.clear()
SuccessExecuteResponse()
}
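  /**
    * Fetches rows from the driver and writes them into a Linkis table result set. Hive hands
    * back each row as a single tab-separated string, so rows are split on \t; rows with fewer
    * fields than the schema are right-padded with empty strings, while rows with more fields
    * are rejected, since that means the cell data itself contains tabs.
    */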
private def sendResultSet(engineExecutorContext: EngineExecutionContext, driver: HiveDriverProxy, metaData: TableMetaData): Int = {
val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
resultSetWriter.addMetaData(metaData)
val colLength = metaData.columns.length
val result = new util.ArrayList[String]()
var rows = 0
while (driver.getResults(result)) {
val scalaResult: mutable.Buffer[String] = result
scalaResult foreach { s =>
val arr: Array[String] = s.split("\t")
val arrAny: ArrayBuffer[Any] = new ArrayBuffer[Any]()
if (arr.length > colLength) {
logger.error(s"""hive code 查询的结果中有\t制表符,hive不能进行切割,请使用spark执行""")
throw new ErrorException(60078, """您查询的结果中有\t制表符,hive不能进行切割,请使用spark执行""")
}
          if (arr.length == colLength) arr foreach arrAny.add
          else if (arr.length == 0) for (_ <- 1 to colLength) arrAny add ""
          else {
            // right-pad rows that are missing trailing fields with empty strings
            val padCount = colLength - arr.length
            arr foreach arrAny.add
            for (_ <- 1 to padCount) arrAny add ""
          }
resultSetWriter.addRecord(new TableRecord(arrAny.toArray))
}
rows += result.size
result.clear()
}
engineExecutorContext.sendResultSet(resultSetWriter)
IOUtils.closeQuietly(resultSetWriter)
rows
}
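  /**
    * Builds result-set metadata from Hive's field schemas. Hive may qualify column names with
    * the table alias (e.g. "t1.id"); unless table-prefixed names were requested, the prefix is
    * stripped, falling back to the original names whenever stripping would produce duplicates
    * (or some columns were not qualified).
    */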
private def getResultMetaData(fieldSchemas: util.List[FieldSchema], useTableName: Boolean): TableMetaData = {
var results: util.List[FieldSchema] = null
val nameSet = new mutable.HashSet[String]()
val cleanSchema = new util.ArrayList[FieldSchema]()
fieldSchemas foreach {
fieldSchema =>
        val nameParts = fieldSchema.getName.split('.')
        if (nameParts.length == 2) {
          nameSet.add(nameParts(1))
          cleanSchema += new FieldSchema(nameParts(1), fieldSchema.getType, fieldSchema.getComment)
        }
}
    if (nameSet.size < fieldSchemas.length || useTableName) {
      results = fieldSchemas
    } else {
      results = cleanSchema
    }
val columns = results.map(result => Column(result.getName,
DataType.toDataType(result.getType.toLowerCase()), result.getComment)).toArray[Column]
val metaData = new TableMetaData(columns)
metaData
}
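  /**
    * A simple prefix check for statements that produce no result set. Note it only matches
    * lower-case "create table" / "drop table"; other no-result statements still go through
    * the normal result path.
    */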
  private def isNoResultSql(sql: String): Boolean = {
    val trimmedSql = sql.trim
    trimmedSql.startsWith("create table") || trimmedSql.startsWith("drop table")
  }
  /**
    * Before the job completes, mark all remaining entries of singleSqlProgressMap as successful.
    */
private def onComplete(): Unit = {
if (engineExecutorContext != null) {
val arrayBuffer: ArrayBuffer[JobProgressInfo] = new ArrayBuffer[JobProgressInfo]()
singleSqlProgressMap foreach {
        case (jobId, _) => arrayBuffer += JobProgressInfo(jobId, 200, 0, 0, 200)
}
engineExecutorContext.pushProgress(1.0f, arrayBuffer.toArray[JobProgressInfo])
}
}
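  /**
    * Resets the per-statement map/reduce counters and the single-line progress.
    */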
private def clearCurrentProgress(): Unit = {
reduce = 0
map = 0
singleLineProgress = 0.0f
}
private def justFieldName(schemaName: String): String = {
LOG.debug("schemaName: " + schemaName)
val arr = schemaName.split("\\.")
if (arr.length == 2) arr(1) else schemaName
}
override def executeCompletely(engineExecutorContext: EngineExecutionContext, code: String, completedLine: String): ExecuteResponse = {
val completeCode = code + completedLine
executeLine(engineExecutorContext, completeCode)
}
override def close(): Unit = {
singleSqlProgressMap.clear()
Utils.tryAndWarnMsg(sessionState.close())("close session failed")
super.close()
}
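  /**
    * Overall progress of the current submission: currentBegin is the fraction contributed by
    * statements that already finished, and the running statement adds the averaged map/reduce
    * progress of its MR jobs normalized by numberOfMRJobs * totalSQLs. For example, with
    * totalSQLs = 4 and currentSQL = 2, currentBegin = (2 - 1) / 4 = 0.25.
    */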
override def progress(): Float = {
if (engineExecutorContext != null) {
val totalSQLs = engineExecutorContext.getTotalParagraph
val currentSQL = engineExecutorContext.getCurrentParagraph
      val currentBegin = (currentSQL - 1) / totalSQLs.toFloat
HadoopJobExecHelper.runningJobs synchronized {
HadoopJobExecHelper.runningJobs foreach {
runningJob =>
val name = runningJob.getID.toString
val _progress = runningJob.reduceProgress() + runningJob.mapProgress()
singleSqlProgressMap.put(name, _progress / 2)
}
}
var totalProgress: Float = 0.0F
singleSqlProgressMap foreach {
case (_name, _progress) => totalProgress += _progress
}
      try {
        totalProgress = totalProgress / (numberOfMRJobs * totalSQLs)
      } catch {
        case _: Throwable => totalProgress = 0.0f
      }
logger.debug(s"hive progress is $totalProgress")
if (totalProgress.isNaN || totalProgress.isInfinite) return 0.0f
totalProgress + currentBegin
} else 0.0f
}
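  /**
    * Per-job progress, modelled as 200 virtual tasks per MR job: jobs no longer present in
    * HadoopJobExecHelper.runningJobs are reported as fully finished (200/200), while running
    * jobs report (mapProgress + reduceProgress) * 100 succeeded tasks.
    */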
override def getProgressInfo: Array[JobProgressInfo] = {
val arrayBuffer: ArrayBuffer[JobProgressInfo] = new ArrayBuffer[JobProgressInfo]()
    // Compare job ids (not RunningJob instances) so that finished jobs are detected correctly.
    val runningJobIds = HadoopJobExecHelper.runningJobs synchronized {
      HadoopJobExecHelper.runningJobs.map(_.getID.toString).toSet
    }
    singleSqlProgressMap synchronized {
      import scala.collection.JavaConverters._
      singleSqlProgressMap.keySet().asScala foreach { key =>
        if (!runningJobIds.contains(key)) {
          arrayBuffer += JobProgressInfo(key, 200, 0, 0, 200)
        }
      }
    }
HadoopJobExecHelper.runningJobs synchronized {
HadoopJobExecHelper.runningJobs foreach {
runningJob =>
          val succeedTask = ((runningJob.mapProgress() + runningJob.reduceProgress()) * 100).toInt
          if (succeedTask == totalTask.toInt || runningJob.isComplete || runningJob.isSuccessful) {
            arrayBuffer += JobProgressInfo(runningJob.getID.toString, totalTask.toInt, 0, 0, totalTask.toInt)
          } else {
            arrayBuffer += JobProgressInfo(runningJob.getID.toString, totalTask.toInt, 1, 0, succeedTask)
          }
}
}
arrayBuffer.toArray
}
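  /**
    * Kills the currently running task: stops its MR jobs, interrupts Hive, interrupts the
    * thread that is executing the driver, and clears all progress state.
    */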
override def killTask(taskID: String): Unit = {
LOG.info(s"hive begins to kill job with id : ${taskID}")
HadoopJobExecHelper.killRunningJobs()
//Utils.tryQuietly(TezJobExecHelper.killRunningJobs())
Utils.tryQuietly(HiveInterruptUtils.interrupt())
if (null != thread) {
Utils.tryAndWarn(thread.interrupt())
}
clearCurrentProgress()
singleSqlProgressMap.clear()
HiveProgressHelper.clearHiveProgress()
LOG.info("hive killed job successfully")
super.killTask(taskID)
}
override def supportCallBackLogs(): Boolean = {
// todo
true
}
override def getExecutorLabels(): util.List[Label[_]] = executorLabels
override def setExecutorLabels(labels: util.List[Label[_]]): Unit = {
if (null != labels) {
executorLabels.clear()
executorLabels.addAll(labels)
}
}
override def requestExpectedResource(expectedResource: NodeResource): NodeResource = {
null
}
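  /**
    * Reports the resources this engine actually occupies. A memory setting configured without
    * a unit suffix is normalized by appending "g" (e.g. "4" becomes "4g") before parsing.
    */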
override def getCurrentNodeResource(): NodeResource = {
val properties = EngineConnObject.getEngineCreationContext.getOptions
if (properties.containsKey(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)) {
val settingClientMemory = properties.get(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key)
if (!settingClientMemory.toLowerCase().endsWith("g")) {
properties.put(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.key, settingClientMemory + "g")
}
}
val actualUsedResource = new LoadInstanceResource(EngineConnPluginConf.JAVA_ENGINE_REQUEST_MEMORY.getValue(properties).toLong,
EngineConnPluginConf.JAVA_ENGINE_REQUEST_CORES.getValue(properties), EngineConnPluginConf.JAVA_ENGINE_REQUEST_INSTANCE.getValue)
val resource = new CommonNodeResource
resource.setUsedResource(actualUsedResource)
resource
}
override def getId(): String = namePrefix + id
}
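/**
  * A reflection-based proxy over Hive's driver, so the same engine code runs against multiple
  * Hive versions: older releases expose org.apache.hadoop.hive.ql.Driver together with
  * setTryCount and CommandNeedRetryException, while newer releases expose the IDriver
  * interface instead. All calls are dispatched by method name via reflection.
  */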
class HiveDriverProxy(driver: Any) extends Logging {
def getSchema(): Schema = {
driver.getClass.getMethod("getSchema").invoke(driver).asInstanceOf[Schema]
}
def run(): CommandProcessorResponse = {
driver.getClass.getMethod("run").invoke(driver).asInstanceOf[CommandProcessorResponse]
}
def run(command: String): CommandProcessorResponse = {
driver.getClass.getMethod("run", classOf[String]).invoke(driver, command.asInstanceOf[AnyRef]).asInstanceOf[CommandProcessorResponse]
}
def setTryCount(retry: Int): Unit = {
if (HiveDriverProxy.HAS_COMMAND_NEED_RETRY_EXCEPTION) {
driver.getClass.getMethod("setTryCount", classOf[Int]).invoke(driver, retry.asInstanceOf[AnyRef])
}
}
def getResults(res: util.List[_]): Boolean = {
Utils.tryAndWarn {
driver.getClass.getMethod("getResults", classOf[util.List[_]]).invoke(driver, res.asInstanceOf[AnyRef]).asInstanceOf[Boolean]
}
}
def close(): Unit = {
info("start to close driver")
driver.getClass.getMethod("close").invoke(driver)
driver.getClass.getMethod("destroy").invoke(driver)
info("Finished to close driver")
}
}
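/**
  * Capability probing for HiveDriverProxy: the version-dependent Hive classes are looked up
  * once on the context classloader, and the resulting HAS_* flags guard every reflective call
  * that depends on them.
  */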
object HiveDriverProxy extends Logging {
private val COMMAND_NEED_RETRY_EXCEPTION_CLASS_STR = "org.apache.hadoop.hive.ql.CommandNeedRetryException"
private val IDRIVER_CLASS_STR = "org.apache.hadoop.hive.ql.IDriver"
val COMMAND_NEED_RETRY_EXCEPTION_CLASS = Utils.tryQuietly {
Thread.currentThread().getContextClassLoader
.loadClass(COMMAND_NEED_RETRY_EXCEPTION_CLASS_STR)
}
val IDRIVER_CLASS = Utils.tryQuietly {
Thread.currentThread().getContextClassLoader
.loadClass(IDRIVER_CLASS_STR)
}
val HAS_COMMAND_NEED_RETRY_EXCEPTION: Boolean = COMMAND_NEED_RETRY_EXCEPTION_CLASS != null
val HAS_IDRIVER: Boolean = IDRIVER_CLASS != null
def isIDriver(any: Any): Boolean = HAS_IDRIVER && IDRIVER_CLASS.isInstance(any)
def isDriver(any: Any): Boolean = isIDriver(any) || any.isInstanceOf[Driver]
def isCommandNeedRetryException(any: Any): Boolean = HAS_COMMAND_NEED_RETRY_EXCEPTION && COMMAND_NEED_RETRY_EXCEPTION_CLASS.isInstance(any)
}