blob: 01cbb4c3c3eb9bb4c7b4b135b752fa063a3da610 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.livy.utils
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.livy.{LivyConf, Logging}
class SparkProcessBuilder(livyConf: LivyConf) extends Logging {
private[this] var _executable: String = livyConf.sparkSubmit()
private[this] var _master: Option[String] = None
private[this] var _deployMode: Option[String] = None
private[this] var _className: Option[String] = None
private[this] var _name: Option[String] = None
private[this] val _conf = mutable.HashMap[String, String]()
private[this] var _driverClassPath: ArrayBuffer[String] = ArrayBuffer()
private[this] var _proxyUser: Option[String] = None
private[this] var _queue: Option[String] = None
private[this] var _env: ArrayBuffer[(String, String)] = ArrayBuffer()
private[this] var _redirectOutput: Option[ProcessBuilder.Redirect] = None
private[this] var _redirectError: Option[ProcessBuilder.Redirect] = None
private[this] var _redirectErrorStream: Option[Boolean] = None
def executable(executable: String): SparkProcessBuilder = {
_executable = executable
this
}
def master(masterUrl: String): SparkProcessBuilder = {
_master = Some(masterUrl)
this
}
def deployMode(deployMode: String): SparkProcessBuilder = {
_deployMode = Some(deployMode)
this
}
def className(className: String): SparkProcessBuilder = {
_className = Some(className)
this
}
def name(name: String): SparkProcessBuilder = {
_name = Some(name)
this
}
def conf(key: String): Option[String] = {
_conf.get(key)
}
def conf(key: String, value: String, admin: Boolean = false): SparkProcessBuilder = {
this._conf(key) = value
this
}
def conf(conf: Traversable[(String, String)]): SparkProcessBuilder = {
conf.foreach { case (key, value) => this.conf(key, value) }
this
}
def driverJavaOptions(driverJavaOptions: String): SparkProcessBuilder = {
conf("spark.driver.extraJavaOptions", driverJavaOptions)
}
def driverClassPath(classPath: String): SparkProcessBuilder = {
_driverClassPath += classPath
this
}
def driverClassPaths(classPaths: Traversable[String]): SparkProcessBuilder = {
_driverClassPath ++= classPaths
this
}
def driverCores(driverCores: Int): SparkProcessBuilder = {
this.driverCores(driverCores.toString)
}
def driverMemory(driverMemory: String): SparkProcessBuilder = {
conf("spark.driver.memory", driverMemory)
}
def driverCores(driverCores: String): SparkProcessBuilder = {
conf("spark.driver.cores", driverCores)
}
def executorCores(executorCores: Int): SparkProcessBuilder = {
this.executorCores(executorCores.toString)
}
def executorCores(executorCores: String): SparkProcessBuilder = {
conf("spark.executor.cores", executorCores)
}
def executorMemory(executorMemory: String): SparkProcessBuilder = {
conf("spark.executor.memory", executorMemory)
}
def numExecutors(numExecutors: Int): SparkProcessBuilder = {
this.numExecutors(numExecutors.toString)
}
def numExecutors(numExecutors: String): SparkProcessBuilder = {
this.conf("spark.executor.instances", numExecutors)
}
def proxyUser(proxyUser: String): SparkProcessBuilder = {
_proxyUser = Some(proxyUser)
this
}
def queue(queue: String): SparkProcessBuilder = {
_queue = Some(queue)
this
}
def env(key: String, value: String): SparkProcessBuilder = {
_env += ((key, value))
this
}
def redirectOutput(redirect: ProcessBuilder.Redirect): SparkProcessBuilder = {
_redirectOutput = Some(redirect)
this
}
def redirectError(redirect: ProcessBuilder.Redirect): SparkProcessBuilder = {
_redirectError = Some(redirect)
this
}
def redirectErrorStream(redirect: Boolean): SparkProcessBuilder = {
_redirectErrorStream = Some(redirect)
this
}
def start(file: Option[String], args: Traversable[String]): LineBufferedProcess = {
var arguments = ArrayBuffer(_executable)
def addOpt(option: String, value: Option[String]): Unit = {
value.foreach { v =>
arguments += option
arguments += v
}
}
def addList(option: String, values: Traversable[String]): Unit = {
if (values.nonEmpty) {
arguments += option
arguments += values.mkString(",")
}
}
addOpt("--master", _master)
addOpt("--deploy-mode", _deployMode)
addOpt("--name", _name)
addOpt("--class", _className)
_conf.foreach { case (key, value) =>
if (key == "spark.submit.pyFiles") {
arguments += "--py-files"
arguments += f"$value"
} else {
arguments += "--conf"
arguments += f"$key=$value"
}
}
addList("--driver-class-path", _driverClassPath)
if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
addOpt("--proxy-user", _proxyUser)
}
addOpt("--queue", _queue)
arguments += file.getOrElse("spark-internal")
arguments ++= args
val argsString = arguments
.map("'" + _.replace("'", "\\'") + "'")
.mkString(" ")
info(s"Running $argsString")
val pb = new ProcessBuilder(arguments.asJava)
val env = pb.environment()
for ((key, value) <- _env) {
env.put(key, value)
}
_redirectOutput.foreach(pb.redirectOutput)
_redirectError.foreach(pb.redirectError)
_redirectErrorStream.foreach(pb.redirectErrorStream)
new LineBufferedProcess(pb.start(), livyConf.getInt(LivyConf.SPARK_LOGS_SIZE))
}
}