blob: 32f1f180ad69cd658c9bd0f5e095ea64e6e64ff2 [file] [log] [blame]
package com.webank.wedatasphere.linkis.engineconn.launch
import com.webank.wedatasphere.linkis.common.ServiceInstance
import com.webank.wedatasphere.linkis.common.conf.{CommonVars, Configuration}
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.engineconn.common.creation.{DefaultEngineCreationContext, EngineCreationContext}
import com.webank.wedatasphere.linkis.engineconn.common.engineconn.EngineConn
import com.webank.wedatasphere.linkis.engineconn.common.hook.EngineConnHook
import com.webank.wedatasphere.linkis.engineconn.core.EngineConnObject
import com.webank.wedatasphere.linkis.engineconn.core.engineconn.EngineConnManager
import com.webank.wedatasphere.linkis.engineconn.core.execution.{AbstractEngineConnExecution, EngineConnExecution}
import com.webank.wedatasphere.linkis.engineconn.core.hook.ShutdownHook
import com.webank.wedatasphere.linkis.engineconn.core.util.EngineConnUtils
import com.webank.wedatasphere.linkis.governance.common.conf.GovernanceCommonConf
import com.webank.wedatasphere.linkis.governance.common.exception.engineconn.{EngineConnExecutorErrorCode, EngineConnExecutorErrorException}
import com.webank.wedatasphere.linkis.governance.common.utils.EngineConnArgumentsParser
import com.webank.wedatasphere.linkis.manager.engineplugin.common.launch.process.Environment
import com.webank.wedatasphere.linkis.manager.label.builder.factory.{LabelBuilderFactory, LabelBuilderFactoryContext}
import com.webank.wedatasphere.linkis.manager.label.entity.Label
import org.apache.commons.lang.exception.ExceptionUtils
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
object EngineConnServer extends Logging {
private val engineCreationContext: EngineCreationContext = new DefaultEngineCreationContext
private val labelBuilderFactory: LabelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
def main(args: Array[String]): Unit = {
info("<<---------------------EngineConnServer Start --------------------->>")
try {
// 1. 封装EngineCreationContext
init(args)
val isTestMode = Configuration.IS_TEST_MODE.getValue(engineCreationContext.getOptions)
if(isTestMode) {
info(s"Step into test mode, pause 30s if debug is required. If you want to disable test mode, please set ${Configuration.IS_TEST_MODE.key} = false.")
Utils.sleepQuietly(30000)
}
info("Finished to create EngineCreationContext, EngineCreationContext content: " + EngineConnUtils.GSON.toJson(engineCreationContext))
EngineConnHook.getEngineConnHooks.foreach(_.beforeCreateEngineConn(getEngineCreationContext))
info("Finished to execute hook of beforeCreateEngineConn.")
//2. 创建EngineConn
val engineConn = getEngineConnManager.createEngineConn(getEngineCreationContext)
info(s"Finished to create ${engineConn.getEngineConnType}EngineConn.")
EngineConnHook.getEngineConnHooks.foreach(_.beforeExecutionExecute(getEngineCreationContext, engineConn))
info("Finished to execute all hooks of beforeExecutionExecute.")
//3. 注册的executions 执行
Utils.tryThrow(executeEngineConn(engineConn)){ t =>
error(s"Init executors error. Reason: ${ExceptionUtils.getRootCauseMessage(t)}", t)
throw new EngineConnExecutorErrorException(EngineConnExecutorErrorCode.INIT_EXECUTOR_FAILED, "Init executors failed. ", t)
}
EngineConnObject.setReady()
info("Finished to execute executions.")
EngineConnHook.getEngineConnHooks.foreach(_.afterExecutionExecute(getEngineCreationContext, engineConn))
info("Finished to execute hook of afterExecutionExecute")
EngineConnHook.getEngineConnHooks.foreach(_.afterEngineServerStartSuccess(getEngineCreationContext, engineConn))
} catch {
case t: Throwable =>
EngineConnHook.getEngineConnHooks.foreach(_.afterEngineServerStartFailed(getEngineCreationContext, t))
error("EngineConnServer Start Failed", t)
System.exit(1)
}
//4. 等待Executions执行完毕
ShutdownHook.getShutdownHook.waitForStopOrError()
info("<<---------------------EngineConnServer Exit --------------------->>")
System.exit(ShutdownHook.getShutdownHook.getExitCode())
}
/**
*
* @param args main函数入参
*/
private def init(args: Array[String]): Unit = {
val arguments = EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToObj(args)
val engineConf = arguments.getEngineConnConfMap
this.engineCreationContext.setUser(engineConf.getOrElse("user", Utils.getJvmUser))
this.engineCreationContext.setTicketId(engineConf.getOrElse("ticketId", ""))
val host = CommonVars(Environment.ECM_HOST.toString, "127.0.0.1").getValue
val port = CommonVars(Environment.ECM_PORT.toString, "80").getValue
this.engineCreationContext.setEMInstance(ServiceInstance(GovernanceCommonConf.ENGINE_CONN_MANAGER_SPRING_NAME.getValue, s"$host:$port"))
val labels = new ArrayBuffer[Label[_]]
val labelArgs = engineConf.filter(_._1.startsWith(EngineConnArgumentsParser.LABEL_PREFIX))
if (labelArgs.nonEmpty) {
labelArgs.foreach { case (key, value) =>
labels += labelBuilderFactory.createLabel[Label[_]](key.replace(EngineConnArgumentsParser.LABEL_PREFIX, ""), value)
}
engineCreationContext.setLabels(labels.toList)
}
val jMap = new java.util.HashMap[String, String](engineConf.size)
jMap.putAll(engineConf)
this.engineCreationContext.setOptions(jMap)
this.engineCreationContext.setArgs(args)
EngineConnObject.setEngineCreationContext(this.engineCreationContext)
info("Finished to init engineCreationContext: " + EngineConnUtils.GSON.toJson(engineCreationContext))
}
private def executeEngineConn(engineConn: EngineConn): Unit = {
EngineConnExecution.getEngineConnExecutions.foreach{
case execution: AbstractEngineConnExecution =>
info(s"Ready to execute ${execution.getClass.getSimpleName}.")
execution.execute(getEngineCreationContext, engineConn)
if(execution.returnAfterMeExecuted(getEngineCreationContext, engineConn)) return
case execution =>
info(s"Ready to execute ${execution.getClass.getSimpleName}.")
execution.execute(getEngineCreationContext, engineConn)}
}
def getEngineCreationContext: EngineCreationContext = this.engineCreationContext
private def getEngineConnManager: EngineConnManager = EngineConnManager.getEngineConnManager
}