/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engine.executors
import java.text.NumberFormat
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import com.webank.wedatasphere.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
import com.webank.wedatasphere.linkis.engine.configuration.SparkConfiguration
import com.webank.wedatasphere.linkis.engine.exception.{NoSupportEngineException, SparkEngineException}
import com.webank.wedatasphere.linkis.engine.execute.{EngineExecutor, EngineExecutorContext}
import com.webank.wedatasphere.linkis.engine.extension.{SparkPostExecutionHook, SparkPreExecutionHook}
import com.webank.wedatasphere.linkis.engine.spark.common._
import com.webank.wedatasphere.linkis.engine.spark.utils.{EngineUtils, JobProgressUtil}
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.resourcemanager.{DriverAndYarnResource, LoadInstanceResource, YarnResource}
import com.webank.wedatasphere.linkis.scheduler.exception.DWCJobRetryException
import com.webank.wedatasphere.linkis.scheduler.executer.ExecutorState.{ExecutorState => _}
import com.webank.wedatasphere.linkis.scheduler.executer._
import com.webank.wedatasphere.linkis.storage.domain.{Column, DataType}
import com.webank.wedatasphere.linkis.storage.resultset.ResultSetFactory
import com.webank.wedatasphere.linkis.storage.resultset.table.{TableMetaData, TableRecord}
import com.webank.wedatasphere.linkis.storage.{LineMetaData, LineRecord}
import org.apache.commons.lang.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}
import scala.collection.mutable.ArrayBuffer
/**
* Created by allenlliu on 2019/4/8.
*/
class SparkEngineExecutor(val sc: SparkContext, id: Long, outputPrintLimit: Int, sparkExecutors: Seq[SparkExecutor])
extends EngineExecutor(outputPrintLimit, false) with SingleTaskOperateSupport with SingleTaskInfoSupport with Logging {
val queryNum = new AtomicLong(0)
private var jobGroup: String = _
private var oldProgress: Float = 0f
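/**
  * Starts a heartbeat that polls the SparkContext every 3 seconds; if the
  * context has stopped, the executor is transitioned to the Error state so
  * that a new engine can be started.
  */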
override def init() = {
info(s"Ready to change engine state!")
// this.setCodeParser(new SparkCombinedCodeParser())
val heartbeat = Utils.defaultScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
if (sc.isStopped) {
error("Spark application has already stopped, please restart a new engine.")
transition(ExecutorState.Error)
}
}
}, 1000, 3000, TimeUnit.MILLISECONDS)
super.init()
}
override def getName: String = sc.appName
// Runtime resources: due to time constraints, these are read directly from the Spark conf for now.
override def getActualUsedResources: DriverAndYarnResource = {
info("Begin to get actual used resources!")
Utils.tryCatch({
// val driverHost: String = sc.getConf.get("spark.driver.host")
// val executorMemList = sc.getExecutorMemoryStatus.filter(x => !x._1.split(":")(0).equals(driverHost)).map(x => x._2._1)
val executorNum: Int = sc.getConf.get("spark.executor.instances").toInt
val executorMem: Long = ByteTimeUtils.byteStringAsBytes(sc.getConf.get("spark.executor.memory")) * executorNum
// if(executorMemList.size>0) {
// executorMem = executorMemList.reduce((x, y) => x + y)
// }
val driverMem: Long = ByteTimeUtils.byteStringAsBytes(sc.getConf.get("spark.driver.memory"))
// val driverMemList = sc.getExecutorMemoryStatus.filter(x => x._1.split(":")(0).equals(driverHost)).map(x => x._2._1)
// if(driverMemList.size > 0) {
// driverMem = driverMemList.reduce((x, y) => x + y)
// }
val sparkExecutorCores = sc.getConf.get("spark.executor.cores").toInt * executorNum
val sparkDriverCores = sc.getConf.get("spark.driver.cores", "1").toInt
val queue = sc.getConf.get("spark.yarn.queue")
info("Current actual used resources is driverMem:" + driverMem + ",driverCores:" + sparkDriverCores + ",executorMem:" + executorMem + ",executorCores:" + sparkExecutorCores + ",queue:" + queue)
new DriverAndYarnResource(
new LoadInstanceResource(driverMem, sparkDriverCores, 1),
new YarnResource(executorMem, sparkExecutorCores, 0, queue, sc.applicationId)
)
})(t => {
warn("Get actual used resource exception", t)
null
})
}
private[executors] var engineExecutorContext: EngineExecutorContext = _
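/**
  * Executes a single snippet of code:
  * 1. guards against a stopped SparkContext;
  * 2. resolves the Kind (PySpark, SparkSQL or SparkScala) from the runType property;
  * 3. runs the registered SparkPreExecutionHooks, which may rewrite the code;
  * 4. binds a unique jobGroup so progress tracking and kill() can target the Spark jobs;
  * 5. dispatches to the matching SparkExecutor, then runs the SparkPostExecutionHooks.
  */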
override protected def executeLine(engineExecutorContext: EngineExecutorContext, code: String): ExecuteResponse = Utils.tryFinally {
if (sc.isStopped) {
error("Spark application has already stopped, please restart it.")
transition(ExecutorState.Error)
throw new DWCJobRetryException("Spark application sc has already stopped, please restart it.")
}
this.engineExecutorContext = engineExecutorContext
oldProgress = 0f
val runType = engineExecutorContext.getProperties.get("runType").asInstanceOf[String]
val kind: Kind = if (StringUtils.isNotBlank(runType)) {
  runType.toLowerCase match {
    case "python" | "py" | "pyspark" => PySpark()
    case "sql" | "sparksql" => SparkSQL()
    case "scala" | SparkKind.FUNCTION_MDQ_TYPE => SparkScala()
    case _ => SparkSQL()
  }
} else SparkSQL()
var preCode = code
// Run pre-execution hooks, each of which may rewrite the code before it is executed.
SparkPreExecutionHook.getSparkPreExecutionHooks().foreach(hook => preCode = hook.callPreExecutionHook(engineExecutorContext, preCode))
val _code = Kind.getRealCode(preCode)
info(s"Ready to run code with kind $kind.")
jobGroup = "dwc-spark-mix-code-" + queryNum.incrementAndGet()
// val executeCount = queryNum.get().toInt - 1
info("Set jobGroup to " + jobGroup)
sc.setJobGroup(jobGroup, _code, true)
val response = Utils.tryFinally {
  sparkExecutors.find(_.kind == kind)
    .map(_.execute(this, _code, engineExecutorContext, jobGroup))
    .getOrElse(throw new NoSupportEngineException(40008, s"Executor kind $kind is not supported!"))
} {
  this.engineExecutorContext.pushProgress(1, getProgressInfo)
  jobGroup = null
  sc.clearJobGroup()
}
// Run post-execution hooks with the original code and the execution response.
SparkPostExecutionHook.getSparkPostExecutionHooks().foreach(_.callPostExecutionHook(engineExecutorContext, response, code))
response
}{
this.engineExecutorContext = null
oldProgress = 0f
}
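/** Re-executes by prepending the already-completed line to the incoming code. */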
override protected def executeCompletely(engineExecutorContext: EngineExecutorContext, code: String, completedLine: String): ExecuteResponse = {
val newCode = completedLine + code
info("newCode is " + newCode)
executeLine(engineExecutorContext, newCode)
}
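/** Propagates the request's runType into the context so executeLine can pick the right executor. */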
override def createEngineExecutorContext(executeRequest: ExecuteRequest): EngineExecutorContext = {
val engineExecutorContext = super.createEngineExecutorContext(executeRequest)
executeRequest match {
case runTypeExecuteRequest: RunTypeExecuteRequest => engineExecutorContext.addProperty("runType", runTypeExecuteRequest.runType)
case _ =>
}
engineExecutorContext
}
override def pause(): Boolean = false
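/** Cancels all running Spark jobs; returns false if the SparkContext has already stopped. */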
override def kill(): Boolean = {
if (!sc.isStopped) {
sc.cancelAllJobs
true
} else {
false
}
}
override def resume(): Boolean = false
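/**
  * Estimates overall progress as the fraction of completed paragraphs plus the Spark job
  * progress of the current paragraph, scaled by the total paragraph count. The reported
  * value is kept monotonic while it is below 0.98.
  */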
override def progress(): Float = if (jobGroup == null || engineExecutorContext.getTotalParagraph == 0) 0
else {
  debug(s"Requesting new progress for jobGroup $jobGroup, old progress: $oldProgress")
  val newProgress = (engineExecutorContext.getCurrentParagraph * 1f - 1f) / engineExecutorContext.getTotalParagraph +
    JobProgressUtil.progress(sc, jobGroup) / engineExecutorContext.getTotalParagraph - 0.01f
  if (newProgress < oldProgress && oldProgress < 0.98f) oldProgress else {
    oldProgress = newProgress
    newProgress
  }
}
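/** Collects progress info for both the active and the completed Spark jobs of the current jobGroup. */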
override def getProgressInfo: Array[JobProgressInfo] = if (jobGroup == null) Array.empty
else {
debug("request new progress info for jobGroup is " + jobGroup)
val progressInfoArray = ArrayBuffer[JobProgressInfo]()
progressInfoArray ++= JobProgressUtil.getActiveJobProgressInfo(sc,jobGroup)
progressInfoArray ++= JobProgressUtil.getCompletedJobProgressInfo(sc,jobGroup)
progressInfoArray.toArray
}
override def log(): String = {
""
}
override def close(): Unit = {
  sparkExecutors foreach { executor =>
    Utils.tryCatch(executor.close) {
      case e: Exception => logger.error("Failed to close executor", e)
    }
  }
}
}
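/**
  * Helpers for rendering execution results: writes a DataFrame as a table result set
  * and arbitrary HTML content as a line result set.
  */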
object SQLSession extends Logging {
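// Formats Double cell values without grouping separators, capping the fraction digits via
// SPARK_NF_FRACTION_LENGTH. Note that NumberFormat is not thread-safe, so this shared
// instance assumes rows are written single-threaded.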
val nf = NumberFormat.getInstance()
nf.setGroupingUsed(false)
nf.setMaximumFractionDigits(SparkConfiguration.SPARK_NF_FRACTION_LENGTH.getValue)
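/**
  * Writes up to maxResult rows of the given DataFrame to a table result set and sends it
  * back through the EngineExecutorContext. Newlines and tabs in string cells are replaced
  * with spaces, and Double values are rendered through the shared NumberFormat.
  */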
def showDF(sc: SparkContext, jobGroup: String, df: Any, alias: String, maxResult: Int, engineExecutorContext: EngineExecutorContext): Unit = {
if (sc.isStopped) {
error("Spark application has already stopped in showDF, please restart it.")
throw new DWCJobRetryException("Spark application sc has already stopped, please restart it.")
}
val startTime = System.currentTimeMillis()
// sc.setJobGroup(jobGroup, "Get IDE-SQL Results.", false)
val dataFrame = df.asInstanceOf[DataFrame]
val iterator = Utils.tryThrow(dataFrame.toLocalIterator) { t =>
  throw new SparkEngineException(40002, "Failed to convert the DataFrame to a local iterator", t)
}
//var columns: List[Attribute] = null
// get field names
//logger.info("SCHEMA BEGIN")
import java.util
val colSet = new util.HashSet[String]()
val schema = dataFrame.schema
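// Collect the column names; if duplicates exist (e.g. after a join), fall back to the
// fully qualified attribute names from the analyzed logical plan.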
schema foreach (s => colSet.add(s.name))
val columnsSet: StructType = if (colSet.size() < schema.size) {
  val arr = new ArrayBuffer[StructField]()
  dataFrame.queryExecution.analyzed.output foreach { attri =>
    arr += StructField(attri.qualifiedName, attri.dataType, attri.nullable, attri.metadata)
  }
  StructType(arr.toArray)
} else schema
val columns = columnsSet.map(c =>
Column(c.name, DataType.toDataType(c.dataType.typeName.toLowerCase), c.getComment().orNull)).toArray[Column]
if (columns == null || columns.isEmpty) return
val metaData = new TableMetaData(columns)
val writer = if (StringUtils.isNotBlank(alias))
engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE, alias)
else engineExecutorContext.createResultSetWriter(ResultSetFactory.TABLE_TYPE)
writer.addMetaData(metaData)
var index = 0
Utils.tryThrow({
while (index < maxResult && iterator.hasNext) {
val row = iterator.next()
val r: Array[Any] = columns.indices.map { i =>
val data = row(i) match {
case value: String => value.replaceAll("\n|\t", " ")
case value: Double => nf.format(value)
case value: Any => value.toString
case _ => null
}
data
}.toArray
writer.addRecord(new TableRecord(r))
index += 1
}
}) { t =>
throw new SparkEngineException(40001, "Failed to read records from the DataFrame iterator", t)
}
warn(s"Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
engineExecutorContext.appendStdout(s"${EngineUtils.getName} >> Time taken: ${System.currentTimeMillis() - startTime}, Fetched $index row(s).")
engineExecutorContext.sendResultSet(writer)
}
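/** Wraps the given HTML content in a single-line HTML result set and sends it back. */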
def showHTML(sc: SparkContext, jobGroup: String, htmlContent: Any, engineExecutorContext: EngineExecutorContext): Unit = {
val startTime = System.currentTimeMillis()
val writer = engineExecutorContext.createResultSetWriter(ResultSetFactory.HTML_TYPE)
val metaData = new LineMetaData(null)
writer.addMetaData(metaData)
writer.addRecord(new LineRecord(htmlContent.toString))
warn(s"Time taken: ${System.currentTimeMillis() - startTime}")
engineExecutorContext.sendResultSet(writer)
}
}