/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engine.hive.executor
import java.security.PrivilegedExceptionAction
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engine.execute.{EngineExecutor, EngineExecutorContext, SQLCodeParser}
import com.webank.wedatasphere.linkis.engine.hive.common.HiveUtils
import com.webank.wedatasphere.linkis.engine.hive.exception.HiveQueryFailedException
import com.webank.wedatasphere.linkis.engine.hive.progress.HiveProgressHelper
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.resourcemanager.{LoadInstanceResource, Resource}
import com.webank.wedatasphere.linkis.rpc.Sender
import com.webank.wedatasphere.linkis.scheduler.executer._
import com.webank.wedatasphere.linkis.storage.domain.{Column, DataType}
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory
import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.exec.mr.HadoopJobExecHelper
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory, CommandProcessorResponse}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.mapred.RunningJob
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
/**
 * created by cooperyang on 2018/11/22
 * Description: Executes HQL through Hive's CommandProcessor/Driver API under the job user's UGI,
 * streams query results into Linkis result sets, and reports MapReduce progress per statement.
 */
class HiveEngineExecutor(outputPrintLimit:Int,
sessionState: SessionState,
ugi:UserGroupInformation,
hiveConf:HiveConf)
extends EngineExecutor(outputPrintLimit, isSupportParallelism = false) with SingleTaskOperateSupport with SingleTaskInfoSupport {
private val LOG = LoggerFactory.getLogger(getClass)
private val name:String = Sender.getThisServiceInstance.getInstance
private var proc:CommandProcessor = _
private var map:Int = 0
private var reduce:Int = 0
private val totalTask = 200.0f
private var singleLineProgress:Float = 0.0f
private var engineExecutorContext:EngineExecutorContext = _
private val singleCodeCompleted:AtomicBoolean = new AtomicBoolean(false)
private var numberOfMRJobs:Int = -1
private var currentSqlProgress:Float = 0.0f
private val singleSqlProgressMap:util.Map[String, Float] = new util.HashMap[String, Float]()
override def getName: String = name
override def init():Unit = {
transition(ExecutorState.Idle)
LOG.info(s"Ready to change engine state!")
setCodeParser(new SQLCodeParser)
super.init()
}
override def getActualUsedResources: Resource = {
new LoadInstanceResource(Runtime.getRuntime.totalMemory() - Runtime.getRuntime.freeMemory(), 2, 1)
}
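/**
 * Runs a single statement: picks the Hive CommandProcessor for the leading token, executes it
 * under the job user's UGI and, for SQL handled by the Driver, fetches the result rows and
 * writes them to a table result set.
 */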
override def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = {
// Clear the data of singleSqlMap(清空singleSqlMap的数据)
singleSqlProgressMap.clear()
singleCodeCompleted.set(false)
currentSqlProgress = 0.0f
var realCode = code.trim()
while(realCode.startsWith("\n")) realCode = StringUtils.substringAfter(realCode, "\n")
LOG.info(s"hive client begins to run hql code:\n ${realCode.trim}")
if (realCode.trim.length > 500){
engineExecutorContext.appendStdout(s"$getName >> ${realCode.trim.substring(0, 500)} ...")
} else engineExecutorContext.appendStdout(s"$getName >> ${realCode.trim}")
val tokens = realCode.trim.split("""\s+""")
// SessionState is thread-local in Hive, so rebind it on every execution rather than only once.
SessionState.setCurrentSessionState(sessionState)
this.engineExecutorContext = engineExecutorContext
val proc = CommandProcessorFactory.get(tokens, hiveConf)
this.proc = proc
LOG.debug("ugi is " + ugi.getUserName)
ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
override def run(): ExecuteResponse = {
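// SQL statements go through Hive's Driver (wrapped in HiveDriverProxy for cross-version
// compatibility); every other CommandProcessor (set/dfs/add-resource, etc.) runs in the default case.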
proc match {
case any if HiveDriverProxy.isDriver(any) =>
val driver = new HiveDriverProxy(any)
var needRetry:Boolean = true
var tryCount:Int = 0
var hasResult:Boolean = false
var rows:Int = 0
var columnCount:Int = 0
while(needRetry){
needRetry = false
driver.setTryCount(tryCount)
val startTime = System.currentTimeMillis()
try{
numberOfMRJobs = 1
val hiveResponse:CommandProcessorResponse = driver.run(realCode)
if (hiveResponse.getResponseCode != 0) {
LOG.error("Hive query failed, reason: ", hiveResponse.getException)
val errorException = HiveQueryFailedException(41004, "hive query failed:" + hiveResponse.getErrorMessage)
errorException.initCause(hiveResponse.getException)
throw errorException
}
engineExecutorContext.appendStdout(s"Time taken: ${HiveUtils.msDurationToString(System.currentTimeMillis() - startTime)}, begin to fetch results.")
LOG.info(s"$getName >> Time taken: ${HiveUtils.msDurationToString(System.currentTimeMillis() - startTime)}, begin to fetch results.")
val fieldSchemas = if (hiveResponse.getSchema != null) hiveResponse.getSchema.getFieldSchemas
else if (driver.getSchema != null) driver.getSchema.getFieldSchemas
else throw HiveQueryFailedException(41005, "cannot get the field schemas.")
LOG.debug("fieldSchemas are " + fieldSchemas)
if (fieldSchemas == null) {
numberOfMRJobs = -1
singleCodeCompleted.set(true)
singleSqlProgressMap.clear()
return SuccessExecuteResponse()
}
hasResult = true
import scala.collection.JavaConversions._
val scalaFieldSchemas:scala.collection.mutable.Buffer[FieldSchema] = fieldSchemas
LOG.debug("Scala field Schemas are " + scalaFieldSchemas.mkString(" "))
val columns = scalaFieldSchemas.map(fieldSchema => Column(justFieldName(fieldSchema.getName),
DataType.toDataType(fieldSchema.getType.toLowerCase()), fieldSchema.getComment)).toArray[Column]
val metaData = new TableMetaData(columns)
val resultSetWriter = engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
resultSetWriter.addMetaData(metaData)
val result = new util.ArrayList[String]()
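// Fetch result lines in batches; each line is a tab-separated row matching the field schemas above.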
while(driver.getResults(result)){
val scalaResult:scala.collection.mutable.Buffer[String] = result
scalaResult foreach { s =>
val arr:Array[String] = s.split("\t")
val arrAny:ArrayBuffer[Any] = new ArrayBuffer[Any]()
if (arr.nonEmpty) arrAny ++= arr else for (_ <- 1 to columns.length) arrAny += ""
resultSetWriter.addRecord(new TableRecord(arrAny.toArray))
}
rows += result.size
result.clear()
}
columnCount = if (fieldSchemas != null) fieldSchemas.size() else 0
engineExecutorContext.sendResultSet(resultSetWriter)
IOUtils.closeQuietly(resultSetWriter)
}catch{
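// Older Hive versions signal a recoverable failure with CommandNeedRetryException: re-run the
// statement with an increased try count instead of failing the job.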
case e if HiveDriverProxy.isCommandNeedRetryException(e) => tryCount += 1
needRetry = true
HiveProgressHelper.clearHiveProgress()
singleSqlProgressMap.clear()
clearCurrentProgress()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
LOG.warn("Retry hive query with a different approach...")
case t: Throwable => LOG.error("query failed, reason: ", t)
HiveProgressHelper.clearHiveProgress()
clearCurrentProgress()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
singleCodeCompleted.set(true)
numberOfMRJobs = -1
singleSqlProgressMap.clear()
return ErrorExecuteResponse(t.getMessage, t)
}
}
if (hasResult) {
engineExecutorContext.appendStdout(s"Fetched $columnCount col(s) : $rows row(s) in hive")
LOG.info(s"$getName >> Fetched $columnCount col(s) : $rows row(s) in hive")
}
clearCurrentProgress()
HiveProgressHelper.clearHiveProgress()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
singleCodeCompleted.set(true)
numberOfMRJobs = -1
singleSqlProgressMap.clear()
SuccessExecuteResponse()
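// Non-SQL commands: strip the leading command token and pass the remainder to the processor.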
case _ => val resp = proc.run(realCode.substring(tokens(0).length).trim)
if(resp.getResponseCode != 0) {
clearCurrentProgress()
HiveProgressHelper.clearHiveProgress()
singleSqlProgressMap.clear()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
throw resp.getException
}
clearCurrentProgress()
HiveProgressHelper.clearHiveProgress()
HiveProgressHelper.storeSingleSQLProgress(0.0f)
singleSqlProgressMap.clear()
SuccessExecuteResponse()
}
}
})
}
private def clearCurrentProgress():Unit = {
reduce = 0
map = 0
singleLineProgress = 0.0f
}
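// Hive may qualify result columns as "table.column"; keep only the column name.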
private def justFieldName(schemaName:String):String = {
LOG.debug("schemaName: " + schemaName)
val arr = schemaName.split("\\.")
if (arr.length == 2) arr(1) else schemaName
}
override protected def executeCompletely(engineExecutorContext: EngineExecutorContext, code: String, completedLine: String): ExecuteResponse = {
val completeCode = code + completedLine
executeLine(engineExecutorContext, completeCode)
}
override def close(): Unit = {
singleSqlProgressMap.clear()
Utils.tryAndWarnMsg(sessionState.close())("close session failed")
}
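/**
 * Overall progress = fraction of statements already finished + the averaged map/reduce
 * progress of the Hadoop jobs spawned by the current statement, scaled by statement count.
 */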
override def progress(): Float = {
if (engineExecutorContext != null){
val totalSQLs = engineExecutorContext.getTotalParagraph
val currentSQL = engineExecutorContext.getCurrentParagraph
val currentBegin = (currentSQL - 1) / totalSQLs.asInstanceOf[Float]
HadoopJobExecHelper.runningJobs synchronized {
HadoopJobExecHelper.runningJobs foreach { runningJob =>
  val name = runningJob.getID.toString
  val _progress = runningJob.reduceProgress() + runningJob.mapProgress()
  singleSqlProgressMap.put(name, _progress / 2)
}
}
var totalProgress:Float = 0.0F
singleSqlProgressMap foreach {
case (_name, _progress) => totalProgress += _progress
}
try {
  totalProgress = totalProgress / (numberOfMRJobs * totalSQLs)
} catch {
  case _: Exception => totalProgress = 0.0f
}
LOG.info(s"hive progress is $totalProgress")
// Float division never throws, so also guard against NaN/Infinity when no MR job is running.
if (totalProgress.isNaN || totalProgress.isInfinity) return 0.0f
totalProgress + currentBegin
}else 0.0f
}
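/**
 * Builds one JobProgressInfo per MapReduce job: jobs that have disappeared from the running
 * list are reported as fully finished; running ones report tasks derived from map/reduce progress.
 */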
override def getProgressInfo: Array[JobProgressInfo] = {
val arrayBuffer:ArrayBuffer[JobProgressInfo] = new ArrayBuffer[JobProgressInfo]()
if (HadoopJobExecHelper.runningJobs.isEmpty) return arrayBuffer.toArray
singleSqlProgressMap synchronized {
  import scala.collection.JavaConverters._
  // Compare job IDs (strings), not RunningJob instances, so finished jobs are actually detected.
  val runningJobIds = HadoopJobExecHelper.runningJobs.asScala.map(_.getID.toString).toSet
  singleSqlProgressMap.keySet().asScala foreach { key =>
    if (!runningJobIds.contains(key)) {
      arrayBuffer += JobProgressInfo(key, totalTask.toInt, 0, 0, totalTask.toInt)
    }
  }
}
HadoopJobExecHelper.runningJobs synchronized {
HadoopJobExecHelper.runningJobs foreach { runningJob =>
  val succeedTask = ((runningJob.mapProgress() + runningJob.reduceProgress()) * 100).toInt
  if (succeedTask == totalTask.toInt || runningJob.isComplete || runningJob.isSuccessful) {
    arrayBuffer += JobProgressInfo(runningJob.getID.toString, totalTask.toInt, 0, 0, totalTask.toInt)
  } else {
    arrayBuffer += JobProgressInfo(runningJob.getID.toString, totalTask.toInt, 1, 0, succeedTask)
  }
}
}
arrayBuffer.toArray
}
override def log(): String = ""
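// Kill every MapReduce job spawned by this engine and reset all progress bookkeeping.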
override def kill(): Boolean = {
LOG.info("hive begins to kill job")
HadoopJobExecHelper.killRunningJobs()
clearCurrentProgress()
singleSqlProgressMap.clear()
HiveProgressHelper.clearHiveProgress()
LOG.info("hive killed job successfully")
true
}
override def pause(): Boolean = {
true
}
override def resume(): Boolean = {
true
}
}
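/**
 * Reflection-based wrapper over Hive's Driver. Depending on the Hive version on the classpath,
 * the concrete type is org.apache.hadoop.hive.ql.Driver or an implementation of the newer
 * IDriver interface, so every call is dispatched reflectively.
 */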
class HiveDriverProxy(driver: Any) extends Logging {
def getSchema(): Schema = {
Utils.tryAndWarn {
driver.getClass.getMethod("getSchema").invoke(driver).asInstanceOf[Schema]
}
}
def run(): CommandProcessorResponse = {
Utils.tryAndWarn {
driver.getClass.getMethod("run").invoke(driver).asInstanceOf[CommandProcessorResponse]
}
}
def run(command: String): CommandProcessorResponse = {
Utils.tryAndWarn {
driver.getClass.getMethod("run", classOf[String]).invoke(driver, command.asInstanceOf[AnyRef]).asInstanceOf[CommandProcessorResponse]
}
}
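// setTryCount only exists on Hive versions that still have CommandNeedRetryException.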
def setTryCount(retry: Int): Unit = {
if (HiveDriverProxy.HAS_COMMAND_NEED_RETRY_EXCEPTION) {
Utils.tryAndWarn {
driver.getClass.getMethod("setTryCount", classOf[Int]).invoke(driver, retry.asInstanceOf[AnyRef])
}
}
}
def getResults(res: util.List[_]): Boolean = {
Utils.tryAndWarn {
driver.getClass.getMethod("getResults", classOf[util.List[_]]).invoke(driver, res.asInstanceOf[AnyRef]).asInstanceOf[Boolean]
}
}
}
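/**
 * Probes the classpath once for version-dependent Hive classes: CommandNeedRetryException
 * (removed in newer Hive releases) and the IDriver interface (absent in older ones).
 */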
object HiveDriverProxy extends Logging {
private val COMMAND_NEED_RETRY_EXCEPTION_CLASS_STR = "org.apache.hadoop.hive.ql.CommandNeedRetryException"
private val IDRIVER_CLASS_STR = "org.apache.hadoop.hive.ql.IDriver"
val COMMAND_NEED_RETRY_EXCEPTION_CLASS = Utils.tryAndWarn {
Thread.currentThread().getContextClassLoader
.loadClass(COMMAND_NEED_RETRY_EXCEPTION_CLASS_STR)
}
val IDRIVER_CLASS = Utils.tryAndWarn {
Thread.currentThread().getContextClassLoader
.loadClass(IDRIVER_CLASS_STR)
}
val HAS_COMMAND_NEED_RETRY_EXCEPTION: Boolean = COMMAND_NEED_RETRY_EXCEPTION_CLASS != null
val HAS_IDRIVER: Boolean = IDRIVER_CLASS != null
def isIDriver(any: Any): Boolean = HAS_IDRIVER && IDRIVER_CLASS.isInstance(any)
def isDriver(any: Any): Boolean = isIDriver(any) || any.isInstanceOf[Driver]
def isCommandNeedRetryException(any: Any): Boolean = HAS_COMMAND_NEED_RETRY_EXCEPTION && COMMAND_NEED_RETRY_EXCEPTION_CLASS.isInstance(any)
}