| ///* |
| // * Licensed to the Apache Software Foundation (ASF) under one or more |
| // * contributor license agreements. See the NOTICE file distributed with |
| // * this work for additional information regarding copyright ownership. |
| // * The ASF licenses this file to You under the Apache License, Version 2.0 |
| // * (the "License"); you may not use this file except in compliance with |
| // * the License. You may obtain a copy of the License at |
| // * |
| // * http://www.apache.org/licenses/LICENSE-2.0 |
| // * |
| // * Unless required by applicable law or agreed to in writing, software |
| // * distributed under the License is distributed on an "AS IS" BASIS, |
| // * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // * See the License for the specific language governing permissions and |
| // * limitations under the License. |
| // */ |
| //package org.apache.amaterasu.frameworks.spark.dispatcher |
| // |
| //import java.io.File |
| //import java.util |
| // |
| //import org.apache.amaterasu.common.configuration.ClusterConfig |
| //import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers._ |
| //import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser} |
| //import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration |
| //import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider} |
| //import org.apache.commons.lang.StringUtils |
| // |
| //import scala.collection.mutable |
| //import collection.JavaConversions._ |
| // |
| //class SparkSetupProvider extends FrameworkSetupProvider { |
| // |
| // private var env: String = _ |
| // private var conf: ClusterConfig = _ |
| // private lazy val sparkExecConfigurations: mutable.Map[String, Any] = loadSparkConfig |
| // |
| // private val runnerProviders: mutable.Map[String, RunnerSetupProvider] = mutable.Map[String, RunnerSetupProvider]() |
| // |
| // private def loadSparkConfig: mutable.Map[String, Any] = { |
| // |
| // val execData = DataLoader.getExecutorData(env, conf) |
| // val sparkExecConfiguration = execData.getConfigurations.get("spark") |
| // if (sparkExecConfiguration.isEmpty) { |
| // throw new Exception(s"Spark configuration files could not be loaded for the environment $env") |
| // } |
| // collection.mutable.Map(sparkExecConfiguration.toSeq: _*) |
| // |
| // } |
| // |
| // override def init(env: String, conf: ClusterConfig): Unit = { |
| // this.env = env |
| // this.conf = conf |
| // |
| //// this.sparkExecConfigurations = loadSparkConfig |
| // runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf)) |
| // runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf)) |
| // runnerProviders += ("pyspark" -> new PySparkRunnerProvider(env, conf)) |
| // |
| // } |
| // |
| // override def getGroupIdentifier: String = "spark" |
| // |
| // override def getGroupResources: Array[File] = conf.mode match { |
| // case "mesos" => Array[File](new File(s"spark-${conf.webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar")) |
| // case "yarn" => Array[File](new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"), new File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home)) |
| // case _ => Array[File]() |
| // } |
| // |
| // |
| // override def getEnvironmentVariables: util.Map[String, String] = conf.mode match { |
| // case "mesos" => Map[String, String]("SPARK_HOME" -> s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/") |
| // case "yarn" => Map[String, String]("SPARK_HOME" -> StringUtils.stripStart(conf.spark.home, "/")) |
| // case _ => Map[String, String]() |
| // } |
| // |
| // override def getDriverConfiguration: DriverConfiguration = { |
| // var cpu: Int = 0 |
| // if (sparkExecConfigurations.get("spark.yarn.am.cores").isDefined) { |
| // cpu = sparkExecConfigurations("spark.yarn.am.cores").toString.toInt |
| // } else if (sparkExecConfigurations.get("spark.driver.cores").isDefined) { |
| // cpu = sparkExecConfigurations("spark.driver.cores").toString.toInt |
| // } else if (conf.spark.opts.contains("yarn.am.cores")) { |
| // cpu = conf.spark.opts("yarn.am.cores").toInt |
| // } else if (conf.spark.opts.contains("driver.cores")) { |
| // cpu = conf.spark.opts("driver.cores").toInt |
| // } else if (conf.yarn.Worker.cores > 0) { |
| // cpu = conf.yarn.Worker.cores |
| // } else { |
| // cpu = 1 |
| // } |
| // var mem: Int = 0 |
| // if (sparkExecConfigurations.get("spark.yarn.am.memory").isDefined) { |
| // mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.yarn.am.memory").toString) |
| // } else if (sparkExecConfigurations.get("spark.driver.memeory").isDefined) { |
| // mem = MemoryFormatParser.extractMegabytes(sparkExecConfigurations("spark.driver.memeory").toString) |
| // } else if (conf.spark.opts.contains("yarn.am.memory")) { |
| // mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory")) |
| // } else if (conf.spark.opts.contains("driver.memory")) { |
| // mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory")) |
| // } else if (conf.yarn.Worker.memoryMB > 0) { |
| // mem = conf.yarn.Worker.memoryMB |
| // } else if (conf.taskMem > 0) { |
| // mem = conf.taskMem |
| // } else { |
| // mem = 1024 |
| // } |
| // |
| // new DriverConfiguration(mem, cpu) |
| // } |
| // |
| // override def getRunnerProvider(runnerId: String): RunnerSetupProvider = { |
| // runnerProviders(runnerId) |
| // } |
| // |
| // override def getConfigurationItems = Array("sparkConfiguration", "sparkExecutor") |
| //} |