blob: 6ecfe726ba2398a8d983060b80252a5b539e1e0a [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.ecm.core.launch
import java.io.{File, InputStream, OutputStream}
import java.net.ServerSocket
import com.webank.wedatasphere.linkis.common.conf.CommonVars
import com.webank.wedatasphere.linkis.common.exception.ErrorException
import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils}
import com.webank.wedatasphere.linkis.ecm.core.conf.ECMErrorCode
import com.webank.wedatasphere.linkis.ecm.core.exception.ECMCoreException
import com.webank.wedatasphere.linkis.governance.common.conf.GovernanceCommonConf
import com.webank.wedatasphere.linkis.governance.common.utils.{EngineConnArgumentsBuilder, EngineConnArgumentsParser}
import com.webank.wedatasphere.linkis.manager.engineplugin.common.conf.EnvConfiguration
import com.webank.wedatasphere.linkis.manager.engineplugin.common.launch.entity.EngineConnLaunchRequest
import com.webank.wedatasphere.linkis.manager.engineplugin.common.launch.process.Environment._
import com.webank.wedatasphere.linkis.manager.engineplugin.common.launch.process.LaunchConstants._
import com.webank.wedatasphere.linkis.manager.engineplugin.common.launch.process.{Environment, ProcessEngineConnLaunchRequest}
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.commons.lang.StringUtils
import scala.collection.JavaConversions._
/**
 * An [[EngineConnLaunch]] that starts an EngineConn as an operating-system process.
 *
 * The launch is driven by a [[ProcessEngineConnLaunchRequest]]: its environment and commands are
 * written by a [[ProcessEngineCommandBuilder]] into an `engineConnExec.sh` file under the
 * EngineConn work directory, and the command returned by `sudoCommand` is then executed through a
 * [[ProcessEngineCommandExec]].
 *
 * Implementations must provide `execFile` and `sudoCommand`.
 */
trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {

  // The launch request; only set when a ProcessEngineConnLaunchRequest is supplied.
  private var request: ProcessEngineConnLaunchRequest = _
  private var engineConnManagerEnv: EngineConnManagerEnv = _
  private var discoveryMsgGenerator: DiscoveryMsgGenerator = _
  // Created anew by every call to prepareCommand().
  private var processBuilder: ProcessEngineCommandBuilder = _
  // Path of the generated exec script, assigned in prepareCommand().
  private var preparedExecFile: String = _
  // The started process; null until launch() has completed.
  private var process: Process = _
  // Suffix of the next numbered RANDOM_PORT key (RANDOM_PORT1, RANDOM_PORT2, ...) to be assigned.
  private var randomPortNum = 1
  // Port chosen for the EngineConn's "server.port", assigned in getCommandArgs.
  private var engineConnPort: String = _

  /** Creates the builder used to assemble the exec script; subclasses may override it. */
  protected def newProcessEngineConnCommandBuilder(): ProcessEngineCommandBuilder = new UnixProcessEngineCommandBuilder

  /**
   * Creates the executor that runs `command` with `workDir` as its working directory;
   * subclasses may override it.
   */
  protected def newProcessEngineConnCommandExec(command: Array[String], workDir: String): ProcessEngineCommandExec =
    new ShellProcessEngineCommandExec(command, workDir)

  /**
   * Stores the launch request.
   *
   * @param request must be a [[ProcessEngineConnLaunchRequest]]
   * @throws ErrorException if `request` is null or of any other type; failing here avoids a
   *                        NullPointerException later in `launch()`
   */
  override def setEngineConnLaunchRequest(request: EngineConnLaunchRequest): Unit = request match {
    case processEngineConnLaunchRequest: ProcessEngineConnLaunchRequest =>
      this.request = processEngineConnLaunchRequest
    case _ =>
      throw new ErrorException(30000, s"Unsupported EngineConnLaunchRequest: $request, a ProcessEngineConnLaunchRequest is required.")
  }

  override def setEngineConnManagerEnv(engineConnManagerEnv: EngineConnManagerEnv): Unit = this.engineConnManagerEnv = engineConnManagerEnv

  override def getEngineConnManagerEnv(): EngineConnManagerEnv = this.engineConnManagerEnv

  def setDiscoveryMsgGenerator(discoveryMsgGenerator: DiscoveryMsgGenerator): Unit = this.discoveryMsgGenerator = discoveryMsgGenerator

  def getEngineConnLaunchRequest: EngineConnLaunchRequest = request

  /**
   * Fills `request.environment` with the values this trait knows how to provide for the
   * members of [[Environment]]; members without a case below are left untouched.
   */
  private def initializeEnv(): Unit = {
    val environment = request.environment
    // Copies the configuration value named after `env` into the environment when it is not blank.
    def putIfExists(env: Environment): Unit = {
      val key = env.toString
      val conf = CommonVars.apply(key, "")
      if (StringUtils.isNotBlank(conf.getValue)) environment.put(key, conf.getValue)
    }
    Environment.values foreach {
      case USER => environment.put(USER.toString, request.user)
      case ECM_HOME => environment.put(ECM_HOME.toString, engineConnManagerEnv.engineConnManagerHomeDir)
      case PWD => environment.put(PWD.toString, engineConnManagerEnv.engineConnWorkDir)
      case LOG_DIRS => environment.put(LOG_DIRS.toString, engineConnManagerEnv.engineConnLogDirs)
      case TEMP_DIRS => environment.put(TEMP_DIRS.toString, engineConnManagerEnv.engineConnTempDirs)
      case ECM_HOST => environment.put(ECM_HOST.toString, engineConnManagerEnv.engineConnManagerHost)
      case ECM_PORT => environment.put(ECM_PORT.toString, engineConnManagerEnv.engineConnManagerPort)
      case HADOOP_HOME => putIfExists(HADOOP_HOME)
      case HADOOP_CONF_DIR => putIfExists(HADOOP_CONF_DIR)
      case HIVE_CONF_DIR => putIfExists(HIVE_CONF_DIR)
      case RANDOM_PORT => environment.put(RANDOM_PORT.toString, findAvailPort().toString)
      case _ =>
    }
  }

  /**
   * Returns a port that was free at the time of the call, obtained by binding a
   * [[ServerSocket]] to port 0 and closing it again before returning.
   */
  private def findAvailPort(): Int = {
    val socket = new ServerSocket(0)
    Utils.tryFinally(socket.getLocalPort)(IOUtils.closeQuietly(socket))
  }

  /**
   * If `value` mentions the next numbered random-port key (`RANDOM_PORT` followed by
   * `randomPortNum`), assigns a free port to that key on the process builder and advances
   * the counter.
   */
  private def setMoreAvailPort(value: String): Unit = {
    val key = RANDOM_PORT.toString + randomPortNum
    // TODO just replace it by sorted RANDOM_PORT, since only one RANDOM_PORT is used now.
    if (value.contains(key)) {
      processBuilder.setEnv(key, findAvailPort().toString)
      randomPortNum += 1
    }
  }

  /**
   * Resolves the necessary environments from configuration, prepares the exec script and
   * starts the process.
   *
   * @throws ErrorException if a necessary environment has no configured value
   */
  override def launch(): Unit = {
    request.necessaryEnvironments.foreach { e =>
      val env = CommonVars(e, "")
      if (StringUtils.isEmpty(env.getValue))
        throw new ErrorException(30000, s"Necessary environment $e is not exists!(必须的环境变量 $e 不存在!)") //TODO exception
      else request.environment.put(e, env.getValue)
    }
    prepareCommand()
    val exec = newProcessEngineConnCommandExec(sudoCommand(request.user, execFile.mkString(" ")), engineConnManagerEnv.engineConnWorkDir)
    exec.execute()
    process = exec.getProcess
  }

  /** The command used to run the prepared exec file; its parts are joined with spaces in `launch()`. */
  protected def execFile: Array[String]

  /** The port chosen for the EngineConn, or null before `getCommandArgs` has run. */
  def getEngineConnPort: String = engineConnPort

  /**
   * Builds the command-line arguments for the EngineConn: the Spring configuration (including
   * a freshly chosen `server.port` and every `spring.`-prefixed creation property) and the
   * EngineConn configuration (ticket id, user, labels and the remaining creation properties).
   *
   * @throws ErrorException if any creation property key or value contains a space
   */
  protected def getCommandArgs: Array[String] = {
    if (request.creationDesc.properties.exists { case (k, v) => k.contains(" ") || (v != null && v.contains(" ")) })
      throw new ErrorException(30000, "Startup parameters contain spaces!(启动参数中包含空格!)") //TODO exception
    val arguments = EngineConnArgumentsBuilder.newBuilder()
    engineConnPort = findAvailPort().toString
    var springConf = Map("spring.application.name" -> GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue,
      "server.port" -> engineConnPort, "spring.profiles.active" -> "engineconn",
      "logging.config" -> s"classpath:${EnvConfiguration.LOG4J2_XML_FILE.getValue}") ++: discoveryMsgGenerator.generate(engineConnManagerEnv)
    request.creationDesc.properties.filter(_._1.startsWith("spring.")).foreach { case (k, v) =>
      springConf = springConf += (k -> v)
    }
    arguments.addSpringConf(springConf.toMap)
    var engineConnConf = Map("ticketId" -> request.ticketId, "user" -> request.user)
    engineConnConf = engineConnConf ++: request.labels.map(l => EngineConnArgumentsParser.LABEL_PREFIX + l.getLabelKey -> l.getStringValue).toMap
    engineConnConf = engineConnConf ++: request.creationDesc.properties.filterNot(_._1.startsWith("spring.")).toMap
    arguments.addEngineConnConf(engineConnConf)
    EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToArgs(arguments.build())
  }

  /** Destroys the launched process; does nothing if no process has been launched. */
  override def kill(): Unit = {
    if (process != null) {
      process.destroy()
    }
  }

  /** Whether the launched process is alive; false if no process has been launched. */
  override def isAlive: Boolean = {
    if (process != null) {
      process.isAlive
    } else {
      false
    }
  }

  /**
   * Assembles environment variables, links and the command on a new process builder and
   * writes the result to `engineConnExec.sh` in the EngineConn work directory.
   */
  protected def prepareCommand(): Unit = {
    processBuilder = newProcessEngineConnCommandBuilder()
    initializeEnv()
    //TODO the ordering of the environment variables needs to be considered
    // CLASSPATH is taken out of the environment here and set last, after all other variables.
    // remove() yields null when the request carries no CLASSPATH entry.
    val classPath = request.environment.remove(CLASSPATH.toString)
    request.environment.foreach { case (k, v) =>
      val value = v.replaceAll(CLASS_PATH_SEPARATOR, File.pathSeparator)
      setMoreAvailPort(value)
      processBuilder.setEnv(k, processBuilder.replaceExpansionMarker(value))
    }
    // Only set CLASSPATH when the request provided one, instead of failing on a null value.
    if (classPath != null) {
      processBuilder.setEnv(CLASSPATH.toString, processBuilder.replaceExpansionMarker(classPath.replaceAll(CLASS_PATH_SEPARATOR, File.pathSeparator)))
    }
    engineConnManagerEnv.linkDirs.foreach { case (k, v) => processBuilder.link(k, v) }
    val execCommand = request.commands.map(processBuilder.replaceExpansionMarker(_)) ++ getCommandArgs
    //execCommand = sudoCommand(request.user, execCommand.mkString(" "))
    execCommand.foreach(setMoreAvailPort)
    processBuilder.setCommand(execCommand)
    preparedExecFile = new File(engineConnManagerEnv.engineConnWorkDir, "engineConnExec.sh").getPath
    val output = getFileOutputStream
    Utils.tryFinally(processBuilder.writeTo(output))(output.close())
  }

  /** Builds the command array that `launch()` executes for `user` from the given command line. */
  protected def sudoCommand(user: String, command: String): Array[String]

  /** Opens an output stream to the prepared exec file; `preparedExecFile` must already be set. */
  protected def getFileOutputStream: OutputStream = FileUtils.openOutputStream(new File(preparedExecFile))

  /** Path of the prepared exec file, or null before `prepareCommand()` has run. */
  protected def getPreparedExecFile: String = preparedExecFile

  /**
   * The input stream of the launched process.
   *
   * @throws ECMCoreException if no process has been launched
   */
  def getProcessInputStream: InputStream = {
    if (process != null) {
      process.getInputStream
    } else {
      throw new ECMCoreException(ECMErrorCode.PROCESS_WAITFOR_ERROR, "process is not be launch, can not get InputStream!")
    }
  }

  /**
   * Blocks until the launched process terminates and returns its exit code.
   *
   * @throws ECMCoreException if no process has been launched
   */
  def processWaitFor: Int = {
    if (process != null) {
      process.waitFor
    } else {
      throw new ECMCoreException(ECMErrorCode.PROCESS_WAITFOR_ERROR, "process is not be launch, can not get terminated code by wait!")
    }
  }
}