blob: 17e7d93f84c39ca35c2f309c9a3516b320abcd03 [file] [log] [blame]
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.engineconn.computation.executor.execute
import java.io.File
import java.util
import com.webank.wedatasphere.linkis.common.io.resultset.{ResultSet, ResultSetWriter}
import com.webank.wedatasphere.linkis.common.io.{FsPath, MetaData, Record}
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.cs.client.utils.ContextServiceUtils
import com.webank.wedatasphere.linkis.cs.storage.CSTableResultSetWriter
import com.webank.wedatasphere.linkis.engineconn.acessible.executor.listener.event.{TaskLogUpdateEvent, TaskProgressUpdateEvent, TaskResultCreateEvent, TaskResultSizeCreatedEvent}
import com.webank.wedatasphere.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
import com.webank.wedatasphere.linkis.engineconn.executor.ExecutorExecutionContext
import com.webank.wedatasphere.linkis.engineconn.executor.entity.Executor
import com.webank.wedatasphere.linkis.engineconn.executor.listener.{EngineConnAsyncListenerBus, EngineConnSyncListenerBus, ExecutorListenerBusContext}
import com.webank.wedatasphere.linkis.governance.common.exception.engineconn.EngineConnExecutorErrorException
import com.webank.wedatasphere.linkis.protocol.engine.JobProgressInfo
import com.webank.wedatasphere.linkis.scheduler.executer.{AliasOutputExecuteResponse, OutputExecuteResponse}
import com.webank.wedatasphere.linkis.storage.resultset.table.TableResultSet
import com.webank.wedatasphere.linkis.storage.resultset.{ResultSetFactory, ResultSetWriter}
import com.webank.wedatasphere.linkis.storage.{LineMetaData, LineRecord}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang.StringUtils
class EngineExecutionContext(executor: ComputationExecutor, executorUser: String = Utils.getJvmUser) extends
ExecutorExecutionContext with Logging {
private val resultSetFactory = ResultSetFactory.getInstance
private var defaultResultSetWriter: ResultSetWriter[_ <: MetaData, _ <: Record] = _
private var resultSize = 0
private val properties: java.util.Map[String, Object] = new util.HashMap[String, Object]()
private var totalParagraph = 0
private var currentParagraph = 0
def getTotalParagraph: Int = totalParagraph
def setTotalParagraph(totalParagraph: Int): Unit = this.totalParagraph = totalParagraph
def getCurrentParagraph: Int = currentParagraph
def setCurrentParagraph(currentParagraph: Int): Unit = this.currentParagraph = currentParagraph
def pushProgress(progress: Float, progressInfo: Array[JobProgressInfo]): Unit = {
val listenerBus = getEngineSyncListenerBus
getJobId.foreach(jId => {
listenerBus.postToAll(TaskProgressUpdateEvent(jId, progress, progressInfo))
})
}
def sendResultSet(resultSetWriter: ResultSetWriter[_ <: MetaData, _ <: Record]): Unit = {
info("Start to send res to entrance")
val fileName = new File(resultSetWriter.toFSPath.getPath).getName
val index = if (fileName.indexOf(".") < 0) fileName.length else fileName.indexOf(".")
val alias = if (fileName.startsWith("_")) fileName.substring(1, index) else fileName.substring(0, fileName.indexOf("_"))
//resultSetWriter.flush()
Utils.tryFinally(sendResultSet(resultSetWriter.toString(), alias)) {
IOUtils.closeQuietly(resultSetWriter)
resultSetWriters synchronized resultSetWriters -= resultSetWriter
}
}
def sendResultSet(output: String): Unit = sendResultSet(output, "_" + aliasNum.getAndIncrement())
def appendTextResultSet(output: String): Unit = {
if (defaultResultSetWriter == null) aliasNum synchronized {
if (defaultResultSetWriter == null) {
defaultResultSetWriter = createDefaultResultSetWriter(ResultSetFactory.TEXT_TYPE)
defaultResultSetWriter.addMetaData(new LineMetaData())
resultSetWriters += defaultResultSetWriter
}
}
defaultResultSetWriter.addRecord(new LineRecord(output))
}
private def sendResultSet(output: String, alias: String): Unit = {
if (StringUtils.isEmpty(output)) return
if (resultSetFactory.isResultSetPath(output) || resultSetFactory.isResultSet(output)) {
val listenerBus = getEngineSyncListenerBus
getJobId.foreach(jId => {
//TODO peacewong executor.getEngineServerContext().getEngineAsyncListenerBus().post(ResultSetCreatedEvent(jId, output, alias))
listenerBus.postToAll(TaskResultCreateEvent(jId, output, alias))
resultSize += 1
})
} else throw new EngineConnExecutorErrorException(50050, "unknown resultSet: " + output)
}
def sendResultSet(outputExecuteResponse: OutputExecuteResponse): Unit = outputExecuteResponse match {
case AliasOutputExecuteResponse(alias, output) => sendResultSet(output, alias)
case output: OutputExecuteResponse => sendResultSet(output.getOutput, "_" + aliasNum.getAndIncrement())
}
def getProperties: java.util.Map[String, Object] = properties
def addProperty(key: String, value: String): Unit = properties.put(key, value)
override protected def getResultSetByType(resultSetType: String): ResultSet[_ <: MetaData, _ <: Record] =
resultSetFactory.getResultSetByType(resultSetType)
override protected def getDefaultResultSetByType: String = resultSetFactory.getResultSetType(0)
def newResultSetWriter(resultSet: ResultSet[_ <: MetaData, _ <: Record],
resultSetPath: FsPath,
alias: String): ResultSetWriter[_ <: MetaData, _ <: Record] = {
//update by peaceWong 20200402
resultSet match {
case result: TableResultSet =>
val contextIDStr = ContextServiceUtils.getContextIDStrByMap(getProperties)
val nodeName = ContextServiceUtils.getNodeNameStrByMap(getProperties)
if (StringUtils.isNotBlank(contextIDStr) && StringUtils.isNotBlank(nodeName)) {
val csWriter = new CSTableResultSetWriter(result, ComputationExecutorConf.ENGINE_RESULT_SET_MAX_CACHE.getValue.toLong, resultSetPath, contextIDStr, nodeName, alias)
csWriter.setProxyUser(executorUser)
csWriter
} else {
ResultSetWriter.getResultSetWriter(resultSet, ComputationExecutorConf.ENGINE_RESULT_SET_MAX_CACHE.getValue.toLong, resultSetPath, executorUser)
}
case _ => ResultSetWriter.getResultSetWriter(resultSet, ComputationExecutorConf.ENGINE_RESULT_SET_MAX_CACHE.getValue.toLong, resultSetPath, executorUser)
}
//update by peaceWong 20200402 end
}
def appendStdout(log: String): Unit = if (!executor.isEngineInitialized) {
executor.info(log)
} else {
val listenerBus = getEngineSyncListenerBus
// jobId.foreach(jId => listenerBus.post(TaskLogUpdateEvent(jId, log)))
getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, log)))
}
override def close(): Unit = {
resultSetWriters.toArray.foreach(sendResultSet)
val listenerBus = getEngineSyncListenerBus
getJobId.foreach(jId => {
listenerBus.postToAll(TaskResultSizeCreatedEvent(jId, resultSize))
})
resultSetWriters.clear()
}
private def getEngineAsyncListenerBus: EngineConnAsyncListenerBus = {
ExecutorListenerBusContext.getExecutorListenerBusContext.getEngineConnAsyncListenerBus
}
private def getEngineSyncListenerBus: EngineConnSyncListenerBus = {
ExecutorListenerBusContext.getExecutorListenerBusContext().getEngineConnSyncListenerBus
}
def getExecutor: Executor = executor
}