/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.amaterasu.executor.execution.actions.runners.spark

import java.io._
import com.jcabi.aether.Aether
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ExecData
import org.apache.amaterasu.common.execution.actions.Notifier
import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
import org.apache.amaterasu.common.logging.Logging
import org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark.PySparkRunner
import org.apache.amaterasu.executor.execution.actions.runners.spark.SparkSql.SparkSqlRunner
import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
import org.apache.spark.repl.amaterasu.runners.spark.{SparkRunnerHelper, SparkScalaRunner}
import org.eclipse.aether.util.artifact.JavaScopes
import org.sonatype.aether.repository.RemoteRepository
import org.sonatype.aether.util.artifact.DefaultArtifact
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.sys.process._
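
/**
 * Provides the Spark family of Amaterasu runners (Spark Scala, PySpark and
 * Spark SQL) and prepares everything they need on the executor: resolved
 * Maven dependencies, an optional Miniconda environment for Python
 * dependencies, and a shared SparkSession.
 *
 * A rough usage sketch (the runner id passed to getRunner is illustrative;
 * the real ids come from each runner's getIdentifier):
 * {{{
 *   val provider = new SparkRunnersProvider
 *   provider.init(execData, jobId, outStream, notifier, executorId, config, hostName)
 *   val runner = provider.getRunner("scala")
 * }}}
 */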
class SparkRunnersProvider extends RunnersProvider with Logging {
  private val runners = new TrieMap[String, AmaterasuRunner]

  private var shellLogger = ProcessLogger(
    (o: String) => log.info(o),
    (e: String) => log.error(e)
  )
  private var conf: Option[Map[String, Any]] = None
  private var executorEnv: Option[Map[String, Any]] = None
  private var clusterConfig: ClusterConfig = _
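
  /**
   * Initializes the provider for a job executor: resolves jar and Python
   * dependencies, creates the SparkSession and registers the Scala, PySpark
   * and Spark SQL runners.
   */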
override def init(execData: ExecData,
jobId: String,
outStream: ByteArrayOutputStream,
notifier: Notifier,
executorId: String,
config: ClusterConfig,
hostName: String): Unit = {
    shellLogger = ProcessLogger(
      (o: String) => log.info(o),
      (e: String) => log.error(e)
    )
clusterConfig = config
var jars = Seq.empty[String]
if (execData.deps != null) {
jars ++= getDependencies(execData.deps)
}
if (execData.pyDeps != null &&
execData.pyDeps.packages.nonEmpty) {
loadPythonDependencies(execData.pyDeps, notifier)
}
conf = execData.configurations.get("spark")
executorEnv = execData.configurations.get("spark_exec_env")
val sparkAppName = s"job_${jobId}_executor_$executorId"
SparkRunnerHelper.notifier = notifier
val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName)
lazy val sparkScalaRunner = SparkScalaRunner(execData.env, jobId, spark, outStream, notifier, jars)
sparkScalaRunner.initializeAmaContext(execData.env)
runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
var pypath = ""
// TODO: get rid of hard-coded version
config.mode match {
case "yarn" =>
pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
case "mesos" =>
pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
}
lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
runners.put(pySparkRunner.getIdentifier, pySparkRunner)
lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
}
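
  // Installs a single package into the job-local Miniconda environment,
  // optionally from a user-specified conda channel (default: "anaconda").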
  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
    val channel = pythonPackage.channel.getOrElse("anaconda")
    if (channel == "anaconda") {
      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLogger
    } else {
      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLogger
    }
  }
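
  // Bootstraps a job-local Miniconda installation on the node and links the
  // bundled PySpark sources into it so Python code can resolve pyspark.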
  private def installAnacondaOnNode(): Unit = {
    // TODO: get rid of the hard-coded Miniconda installer version
    this.clusterConfig.mode match {
      case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLogger
      case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLogger
    }
    Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLogger
    Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLogger
  }
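
  // Sets up the Anaconda environment on the node and installs the job's
  // Python packages, reporting any failure through the notifier.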
  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
    notifier.info("loading the Anaconda environment")
installAnacondaOnNode()
val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
installAnacondaPackage(codegenPackage)
    try {
      deps.packages.foreach(pack => {
        pack.index.getOrElse("anaconda").toLowerCase match {
          case "anaconda" => installAnacondaPackage(pack)
          // case "pypi" => installPyPiPackage(pack)
          case other => notifier.error("", s"unsupported Python package index: $other")
        }
      })
    }
catch {
case rte: RuntimeException =>
val sw = new StringWriter
rte.printStackTrace(new PrintWriter(sw))
notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
case e: Exception =>
val sw = new StringWriter
e.printStackTrace(new PrintWriter(sw))
notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
}
}
override def getGroupIdentifier: String = "spark"
override def getRunner(id: String): AmaterasuRunner = runners(id)
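
  // Resolves the job's Maven artifacts (with their runtime-scope transitive
  // dependencies) via Aether and returns the absolute paths of the jars.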
private def getDependencies(deps: Dependencies): Seq[String] = {
// adding a local repo because Aether needs one
val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
val remotes = deps.repos.map(r =>
new RemoteRepository(
r.id,
r.`type`,
r.url
)).toList.asJava
val aether = new Aether(remotes, repo)
    deps.artifacts.flatMap(artifact =>
      aether.resolve(
        new DefaultArtifact(artifact.groupId, artifact.artifactId, "", "jar", artifact.version),
        JavaScopes.RUNTIME
      ).asScala
    ).map(_.getFile.getAbsolutePath)
}
}