package org.apache.amaterasu.frameworks.spark.dispatcher
import java.util
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._
import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser}
import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider}
import org.apache.commons.lang.StringUtils
import scala.collection.mutable
import collection.JavaConversions._
class SparkSetupProvider extends FrameworkSetupProvider {
// Target environment name and cluster configuration; both are assigned in init().
private var env: String = _
private var conf: ClusterConfig = _

// Entries of the "spark" executor configuration for `env`. Loaded on first access via
// loadSparkConfig, so it must not be touched before init() has set env/conf.
private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig

// Runner setup providers keyed by runner id; filled in by init().
private val runnerProviders: mutable.Map[String, RunnerSetupProvider] =
  mutable.Map.empty[String, RunnerSetupProvider]
/** Loads the "spark" section of the executor configuration for the current environment.
 *
 * Reads `env` and `conf`, so it must only run after init(); it is evaluated lazily through
 * `sparkExecConfigurations`.
 *
 * @return a mutable copy of the Spark configuration entries
 * @throws Exception if the environment has no Spark configuration, or it is empty
 */
private def loadSparkConfig: mutable.Map[String, Any] = {
  println(s"===> env=$env")
  val execData = DataLoader.getExecutorData(env, conf)
  // `getConfigurations.get` follows java.util.Map semantics (the value is the map itself,
  // converted via JavaConversions below), so an absent "spark" entry comes back as null.
  // Guard against both null and empty so callers get the descriptive error, not an NPE.
  val sparkExecConfiguration = Option(execData.getConfigurations.get("spark"))
    .filterNot(_.isEmpty)
    .getOrElse {
      throw new Exception(s"Spark configuration files could not be loaded for the environment $env")
    }
  collection.mutable.Map(sparkExecConfiguration.toSeq: _*)
}
/** Stores the target environment and cluster configuration, then registers the
 * "scala", "jar" (spark-submit) and "pyspark" runner setup providers.
 *
 * @param env  environment name used later when loading the Spark executor configuration
 * @param conf cluster configuration handed to every runner provider
 */
override def init(env: String, conf: ClusterConfig): Unit = {
  this.env = env
  this.conf = conf
  // sparkExecConfigurations is a lazy val, so nothing is loaded here; it is read on first use.
  runnerProviders("scala") = SparkScalaRunnerProvider(conf)
  runnerProviders("jar") = SparkSubmitScalaRunnerProvider(conf)
  runnerProviders("pyspark") = PySparkRunnerProvider(conf)
}
/** Identifier of this framework group; always "spark". */
override def getGroupIdentifier: String = {
  val groupId = "spark"
  groupId
}
/** Files shipped with the Spark framework group for the configured cluster mode.
 *
 * - "mesos": the Spark distribution tarball plus the spark-runner and spark-runtime jars.
 * - "yarn": the spark-runner, spark-runtime and executor jars plus every entry directly
 *   under `conf.spark.home`.
 * - any other mode: no files.
 *
 * @throws IllegalStateException in "yarn" mode if `conf.spark.home` cannot be listed
 *         (missing, not a directory, or unreadable)
 */
override def getGroupResources: Array[File] = conf.mode match {
  case "mesos" =>
    Array[File](
      new File(s"spark-${conf.Webserver.sparkVersion}.tgz"),
      new File(s"spark-runner-${conf.version}-all.jar"),
      new File(s"spark-runtime-${conf.version}.jar"))
  case "yarn" =>
    // File.listFiles returns null (not an empty array) when the path does not exist, is not
    // a directory, or an I/O error occurs; report that clearly instead of an NPE on `++`.
    val sparkHomeFiles = Option(new File(conf.spark.home).listFiles).getOrElse {
      throw new IllegalStateException(s"Spark home '${conf.spark.home}' is not a readable directory")
    }
    Array[File](
      new File(s"spark-runner-${conf.version}-all.jar"),
      new File(s"spark-runtime-${conf.version}.jar"),
      new File(s"executor-${conf.version}-all.jar")) ++ sparkHomeFiles
  case _ => Array.empty[File]
}
/** Environment variables the Spark runners need, by cluster mode.
 *
 * - "mesos": SPARK_HOME (the unpacked distribution directory name) and SPARK_HOME_DOCKER.
 * - "yarn": SPARK_HOME set to `conf.spark.home` with leading slashes removed.
 * - any other mode: none.
 *
 * The Scala map is exposed as a java.util.Map through the file's JavaConversions import.
 */
override def getEnvironmentVariables: util.Map[String, String] = {
  val variables: Map[String, String] = conf.mode match {
    case "mesos" =>
      Map(
        "SPARK_HOME" -> s"spark-${conf.Webserver.sparkVersion}",
        "SPARK_HOME_DOCKER" -> "/opt/spark/")
    case "yarn" =>
      Map("SPARK_HOME" -> StringUtils.stripStart(conf.spark.home, "/"))
    case _ =>
      Map.empty[String, String]
  }
  variables
}
/** Resolves the driver's memory (MB) and CPU cores from the first source that defines them.
 *
 * Cores, in order: the Spark executor configuration ("" then "spark.driver.cores"),
 * `conf.spark.opts` ("" then "driver.cores"), a positive `conf.yarn.Worker.cores`,
 * otherwise 1.
 *
 * Memory, in order: the Spark executor configuration ("" then "spark.driver.memory", then
 * the legacy misspelled "spark.driver.memeory"), `conf.spark.opts` ("" then
 * "driver.memory"), a positive `conf.yarn.Worker.memoryMB`, a positive `conf.taskMem`,
 * otherwise 1024. String sizes are converted with MemoryFormatParser.extractMegabytes.
 *
 * Touches `sparkExecConfigurations`, which may trigger loadSparkConfig and its exception.
 *
 * NOTE(review): the "" keys look like placeholders for real property names; they are kept
 * as-is to preserve behavior. Confirm the intended keys.
 */
override def getDriverConfiguration: DriverConfiguration = {
  // Value from the Spark section of the executor configuration.
  def fromExecConfig(key: String): Option[String] =
    sparkExecConfigurations.get(key).map(_.toString)
  // Value from the spark.opts section of the cluster configuration.
  def fromSparkOpts(key: String): Option[String] =
    if (conf.spark.opts.contains(key)) Some(conf.spark.opts(key)) else None
  // Non-positive numeric settings are treated as "not configured".
  def ifPositive(value: Int): Option[Int] = if (value > 0) Some(value) else None
  def toMegabytes(size: String): Int = MemoryFormatParser.extractMegabytes(size)

  // orElse takes its argument by name, so later sources are only consulted when needed.
  val cpu: Int = fromExecConfig("").map(_.toInt)
    .orElse(fromExecConfig("spark.driver.cores").map(_.toInt))
    .orElse(fromSparkOpts("").map(_.toInt))
    .orElse(fromSparkOpts("driver.cores").map(_.toInt))
    .orElse(ifPositive(conf.yarn.Worker.cores))
    .getOrElse(1)

  val mem: Int = fromExecConfig("").map(toMegabytes)
    .orElse(fromExecConfig("spark.driver.memory").map(toMegabytes))
    // Earlier versions of this method read only this misspelled key; keep honoring it.
    .orElse(fromExecConfig("spark.driver.memeory").map(toMegabytes))
    .orElse(fromSparkOpts("").map(toMegabytes))
    .orElse(fromSparkOpts("driver.memory").map(toMegabytes))
    .orElse(ifPositive(conf.yarn.Worker.memoryMB))
    .orElse(ifPositive(conf.taskMem))
    .getOrElse(1024)

  new DriverConfiguration(mem, cpu)
}
/** Returns the runner setup provider for `runnerId`.
 *
 * NOTE(review): the body of this method is not visible in this view. Presumably it looks
 * `runnerId` up in `runnerProviders` (filled by init with "scala", "jar" and "pyspark").
 * Confirm this, including the behavior for an unknown id.
 */
override def getRunnerProvider(runnerId: String): RunnerSetupProvider = {
/** Names of the configuration items exposed by this provider (a fixed list). */
override def getConfigurationItems: Array[String] = Array("sparkConfiguration", "sparkExecutor")