/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.enginemanager.configuration

import com.webank.wedatasphere.linkis.common.conf.{CommonVars, Configuration}
import com.webank.wedatasphere.linkis.common.utils.{ClassUtils, Logging}
import com.webank.wedatasphere.linkis.engine.factory.SparkEngineExecutorFactory
import com.webank.wedatasphere.linkis.enginemanager.AbstractEngineCreator

import scala.collection.mutable.ArrayBuffer
/**
* Created by allenlliu on 2019/4/8.
*/
object SparkConfiguration extends Logging {
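// Per-user parallelism, session, and YARN resource limits, plus queue-protection
// thresholds, applied when allocating Spark engines.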
val SPARK_MAX_PARALLELISM_USERS = CommonVars[Int]("wds.linkis.engine.spark.user.parallelism", 100)
val SPARK_USER_MAX_WAITING_SIZE = CommonVars[Int]("wds.linkis.engine.spark.user.waiting.max", 100)
val SPARK_SESSION_HOOK = CommonVars[String]("wds.linkis.engine.spark.session.hook", "")
val SPARK_LANGUAGE_REPL_INIT_TIME = CommonVars[String]("wds.linkis.engine.spark.language-repl.init.time", "30s")
val SPARK_ALLOW_REQUEST_ALL_YARN_MEMORY = CommonVars[String]("wds.linkis.engine.spark.allow.all-memory.when.queue", "60g")
val SPARK_ALLOW_REQUEST_ALL_YARN_CORES = CommonVars[Int]("wds.linkis.engine.spark.allow.all-cores.when.queue", 30)
val SPARK_USER_MAX_ALLOCATE_SESSIONS = CommonVars[Int]("wds.linkis.engine.spark.user.sessions.max", 5)
val SPARK_USER_MAX_ALLOCATE_YARN_MEMORY = CommonVars[String]("wds.linkis.engine.spark.user.yarn.memory.max", "100g")
val SPARK_USER_MAX_ALLOCATE_YARN_CORES = CommonVars[Int]("wds.linkis.engine.spark.user.cores.max", 50)
val SPARK_USER_MAX_ALLOCATE_DRIVER_MEMORY = CommonVars[String]("wds.linkis.engine.spark.user.driver.memory.max", "15g")
val SPARK_USER_MAX_ALLOCATE_DRIVER_CORES = SPARK_USER_MAX_ALLOCATE_SESSIONS
val SPARK_USER_MAX_RESOURCE_IN_QUEUE = CommonVars[Float]("wds.linkis.engine.spark.user.queue.resources.max", 0.6f)
val SPARK_DANGER_QUEUE_USED_CAPACITY = CommonVars[Float]("wds.linkis.engine.spark.danger.queue.used", 0.2f)
val SPARK_DANGER_QUEUE_USER_ALLOCATE_SESSION = CommonVars[Int]("wds.linkis.engine.spark.danger.user.sessions.max", 2)
val SPARK_WARN_QUEUE_USED_CAPACITY = CommonVars[Float]("wds.linkis.engine.spark.warning.queue.used", 0.5f)
val SPARK_WARN_QUEUE_USER_ALLOCATE_SESSION = CommonVars[Int]("wds.linkis.engine.spark.warning.user.sessions.max", 3)
val PROXY_USER = CommonVars[String]("spark.proxy.user", "${UM}")
val SPARK_CLIENT_MODE = "client"
val SPARK_CLUSTER_MODE = "cluster"
val SPARK_DEPLOY_MODE = CommonVars[String]("spark.submit.deployMode", SPARK_CLIENT_MODE)
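// Settings passed through to the launched Spark application: user jars, Hadoop output
// compression, master, output directory, and driver JVM options.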
val SPARK_APPLICATION_JARS = CommonVars[String]("spark.application.jars", "", "User-defined jars, comma-separated; they must be uploaded to HDFS first and given as full HDFS paths.")
val SPARK_EXTRA_JARS = CommonVars[String]("spark.jars", "", "Additional jars, effective for both Driver and Executor.")
val MAPRED_OUTPUT_COMPRESS = CommonVars[String]("mapred.output.compress", "true", "Whether the map output is compressed.")
val MAPRED_OUTPUT_COMPRESSION_CODEC = CommonVars[String]("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec", "Codec used to compress the map output.")
val SPARK_MASTER = CommonVars[String]("spark.master", "yarn", "Default Spark master.")
val SPARK_OUTPUTDIR = CommonVars[String]("spark.outputDir", "/home/georgeqiao", "Default output directory.")
val DWC_SPARK_USEHIVECONTEXT = CommonVars[Boolean]("wds.linkis.spark.useHiveContext", true)
val ENGINE_JAR = CommonVars[String]("wds.linkis.enginemanager.core.jar", ClassUtils.jarOfClass(classOf[SparkEngineExecutorFactory]).head)
val SPARK_DRIVER_CLASSPATH = CommonVars[String]("wds.linkis.spark.driver.conf.mainjar", "")
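// Extra JVM options for the Spark driver. In test mode, getJavaRemotePort (defined below)
// appends a JDWP agent string so the driver JVM can be attached with a remote debugger.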
val SPARK_DRIVER_EXTRA_JAVA_OPTIONS = CommonVars[String]("spark.driver.extraJavaOptions", "\"-Dwds.linkis.configuration=linkis-engine.properties " + getJavaRemotePort + "\"")
val DEFAULT_JAVA_OPTS = CommonVars[String]("wds.linkis.engine.javaOpts.default", "-server -XX:+UseG1GC -XX:MaxPermSize=250m -XX:PermSize=128m " +
"-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Dwds.linkis.configuration=linkis-engine.properties")
val SPARK_ML_BUCKET_FIELDS = CommonVars[String]("wds.linkis.engine.spark.ml.bucketFields", "age[0,18,30,60,100]")
val SPARK_SUBMIT_CMD = CommonVars[String]("wds.linkis.engine.spark.submit.cmd", "spark-submit")
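// Ports already handed out by getAvailablePort; lazily initialized and only touched
// inside its synchronized block.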
private var ports: ArrayBuffer[Int] = _
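/**
 * When test mode is enabled, builds a JDWP agent argument on a random port in
 * [1024, 65535] so the engine JVM can be attached with a remote debugger;
 * otherwise returns an empty string.
 */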
def getJavaRemotePort: String = {
  if (Configuration.IS_TEST_MODE.getValue) {
    val r = new scala.util.Random()
    // Pick a random port in [1024, 65535]; ports below 1024 are privileged, and the
    // original bound of nextInt((65536 - 1024) + 1) could yield the invalid port 65536.
    val port = 1024 + r.nextInt(65536 - 1024)
    info(s"open debug mode with port $port.")
    s"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=$port"
  } else {
    ""
  }
}
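/**
 * Requests ports from AbstractEngineCreator until one is returned that has not been
 * handed out before, records it, and returns it. Synchronized because the ports
 * buffer is shared mutable state.
 */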
private def getAvailablePort: Int = synchronized {
  var port = AbstractEngineCreator.getNewPort
  info("Got new port " + port)
  if (ports == null) {
    // First call: lazily initialize the tracking buffer (0 and 1 are placeholder
    // entries, not real engine ports).
    info("Ports buffer is not initialized yet, creating it")
    ports = ArrayBuffer(0, 1)
    info("Current ports is " + ports.toList.toString())
  }
  // AbstractEngineCreator is a Scala object and can never be null, so the original
  // null check is dropped; keep requesting until we get an unused port.
  while (ports.contains(port)) {
    port = AbstractEngineCreator.getNewPort
  }
  ports += port
  port
}
}
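
// A minimal usage sketch: each CommonVars entry resolves from the Linkis configuration
// at runtime, falling back to the default declared above; getValue is the accessor
// already used in this file (see Configuration.IS_TEST_MODE.getValue):
//   val master      = SparkConfiguration.SPARK_MASTER.getValue      // "yarn" unless overridden
//   val maxSessions = SparkConfiguration.SPARK_USER_MAX_ALLOCATE_SESSIONS.getValue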