blob: 897c9dc75320aca8fb5bbd96e6c510a677a8d57c [file] [log] [blame]
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.enginemanager.hive.process
import java.nio.file.Paths
import com.webank.wedatasphere.linkis.common.conf.Configuration
import com.webank.wedatasphere.linkis.enginemanager.conf.EnvConfiguration.{DEFAULT_JAVA_OPTS, JAVA_HOME, engineGCLogPath}
import com.webank.wedatasphere.linkis.enginemanager.hive.conf.HiveEngineConfiguration
import com.webank.wedatasphere.linkis.enginemanager.impl.UserEngineResource
import com.webank.wedatasphere.linkis.enginemanager.process.JavaProcessEngineBuilder
import com.webank.wedatasphere.linkis.enginemanager.{AbstractEngineCreator, EngineResource}
import com.webank.wedatasphere.linkis.protocol.engine.RequestEngine
import org.apache.commons.lang.StringUtils
import org.slf4j.LoggerFactory
import scala.collection.mutable.ArrayBuffer
/**
  * created by cooperyang on 2018/11/21
  * Description: Assembles the java command line used to launch a Hive engine process.
  *
  * The command line is accumulated in the inherited `commandLine` buffer by [[build]]:
  * java binary, heap size, default JVM options, optional debug agent, native library
  * path, classpath and finally the engine main class.
  */
class HiveQLProcessBuilder extends JavaProcessEngineBuilder {

  private val LOG = LoggerFactory.getLogger(getClass)

  /**
    * Extra JVM options appended to the command line by [[build]].
    *
    * Always returns the empty string. The previous implementation read
    * `HiveEngineConfiguration.HIVE_CLIENT_OPTS` into a buffer and then discarded it,
    * returning `""` as well, so the observable behavior is unchanged.
    *
    * NOTE(review): HIVE_CLIENT_OPTS is therefore never applied to the engine process;
    * confirm whether that is intentional before wiring it in.
    *
    * @return the empty string
    */
  override protected def getExtractJavaOpts: String = ""

  /**
    * Alias of the engine, taken from the configured spring application name.
    *
    * @param request the engine request (not used by this implementation)
    * @return the value of `HIVE_ENGINE_SPRING_APPLICATION_NAME`
    */
  override protected def getAlias(request: RequestEngine): String =
    HiveEngineConfiguration.HIVE_ENGINE_SPRING_APPLICATION_NAME.getValue

  /**
    * Extra classpath entries for the Hive engine.
    *
    * @return the comma-separated entries of `HIVE_CLIENT_EXTRACLASSPATH`,
    *         or an empty array when that setting is blank
    */
  override protected def getExtractClasspath: Array[String] = {
    val extraClasspath = HiveEngineConfiguration.HIVE_CLIENT_EXTRACLASSPATH.getValue
    if (StringUtils.isNotBlank(extraClasspath)) extraClasspath.split(",") else Array.empty
  }

  /**
    * Check the classpath of the java command process.
    *
    * @param jarOrFiles classpath entries to check
    */
  override protected def classpathCheck(jarOrFiles: Array[String]): Unit =
    jarOrFiles.foreach(checkJarOrFile)

  // todo Check the jar of the classpath (currently a no-op)
  private def checkJarOrFile(jarOrFile: String): Unit = {
  }

  /**
    * Populates `commandLine` with the full java command used to start the engine.
    *
    * Side effects: stores `request` and the cast `engineRequest` on this builder, and
    * may rewrite the `HIVE_CLIENT_MEMORY` entry of `request.properties` so that it
    * carries a trailing "g".
    *
    * @param engineRequest resource granted to the engine; must be a [[UserEngineResource]]
    * @param request       the engine request whose properties drive the command line
    */
  override def build(engineRequest: EngineResource, request: RequestEngine): Unit = {
    this.request = request
    userEngineResource = engineRequest.asInstanceOf[UserEngineResource]

    // java executable: prefer the configured JAVA_HOME, fall back to `java` on the PATH.
    val javaHome = JAVA_HOME.getValue(request.properties)
    if (StringUtils.isEmpty(javaHome)) {
      warn("We cannot find the java home, use java to run storage repl web server.")
      commandLine += "java"
    } else {
      commandLine += Paths.get(javaHome, "bin/java").toAbsolutePath.toFile.getAbsolutePath
    }

    // Normalize a user supplied memory setting so that it ends with "g".
    // Option(...) also covers a key that is present but mapped to null, which
    // previously failed with a NullPointerException.
    val memoryKey = HiveEngineConfiguration.HIVE_CLIENT_MEMORY.key
    Option(request.properties.get(memoryKey)).foreach { configuredMemory =>
      if (!configuredMemory.toLowerCase().endsWith("g")) {
        request.properties.put(memoryKey, configuredMemory + "g")
      }
    }

    // Heap size: the same value is used for both -Xmx and -Xms.
    val clientMemory = HiveEngineConfiguration.HIVE_CLIENT_MEMORY.getValue(request.properties).toString
    val heapSize =
      if (clientMemory.toLowerCase().endsWith("g")) clientMemory.toLowerCase()
      else clientMemory + "g"
    commandLine += ("-Xmx" + heapSize)
    commandLine += ("-Xms" + heapSize)

    // Default JVM options (formatted with the GC log path), then the extra ones.
    val javaOPTS = getExtractJavaOpts
    val alias = getAlias(request)
    if (StringUtils.isNotEmpty(DEFAULT_JAVA_OPTS.getValue)) {
      DEFAULT_JAVA_OPTS.getValue
        .format(engineGCLogPath(port, userEngineResource.getUser, alias))
        .split("\\s+")
        .foreach(commandLine += _)
    }
    if (StringUtils.isNotEmpty(javaOPTS)) javaOPTS.split("\\s+").foreach(commandLine += _)

    // In test mode, open a JDWP debug socket on a freshly allocated port.
    // Named debugPort so it no longer shadows the inherited `port` used above.
    if (Configuration.IS_TEST_MODE.getValue) {
      val debugPort = AbstractEngineCreator.getNewPort
      info(s"$toString open debug mode with port $debugPort.")
      commandLine += s"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$debugPort"
    }

    // Classpath: base + extra classpath, plus the optional comma-separated "jars"
    // property. A missing "jars" property used to throw a NullPointerException.
    val requestJars = Option(request.properties.get("jars"))
      .map(_.split(","))
      .getOrElse(Array.empty[String])
    val classpath = getClasspath(request.properties, getExtractClasspath) ++ requestJars
    classpathCheck(classpath)

    commandLine += "-Djava.library.path=/appcom/Install/hadoop/lib/native"
    commandLine += "-cp"
    commandLine += classpath.mkString(":")
    commandLine += "com.webank.wedatasphere.linkis.engine.DataWorkCloudEngineApplication"
  }

  override protected val addApacheConfigPath: Boolean = true
}