Merge pull request #20 from nadav-har-tzvi/version-0.2.0-incubating-rc3
PySpark fixes for YARN and Mesos
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 7c9f924..3661b48 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -71,8 +71,8 @@
var memoryMB: Int = 1024
def load(props: Properties): Unit = {
- if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").asInstanceOf[Int]
- if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").asInstanceOf[Int]
+ if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").toInt
+ if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").toInt
}
}
@@ -83,8 +83,8 @@
var memoryMB: Int = 1024
def load(props: Properties): Unit = {
- if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int]
- if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int]
+ if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").toInt
+ if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").toInt
}
}
@@ -133,9 +133,9 @@
def load(props: Properties): Unit = {
- if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").asInstanceOf[Double]
- if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").asInstanceOf[Long]
- if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").asInstanceOf[Long]
+ if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").toDouble
+ if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").toLong
+ if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").toLong
Tasks.load(props)
}
@@ -148,9 +148,9 @@
def load(props: Properties): Unit = {
- if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").asInstanceOf[Int]
- if (props.containsKey("jobs.tasks.cpus")) attempts = props.getProperty("jobs.tasks.cpus").asInstanceOf[Int]
- if (props.containsKey("jobs.tasks.mem")) attempts = props.getProperty("jobs.tasks.mem").asInstanceOf[Int]
+ if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").toInt
+ if (props.containsKey("jobs.tasks.cpus")) cpus = props.getProperty("jobs.tasks.cpus").toInt
+ if (props.containsKey("jobs.tasks.mem")) mem = props.getProperty("jobs.tasks.mem").toInt
}
}
@@ -209,7 +209,7 @@
- if (props.containsKey("timeout")) timeout = props.getProperty("timeout").asInstanceOf[Double]
+ if (props.containsKey("timeout")) timeout = props.getProperty("timeout").toDouble
if (props.containsKey("mode")) mode = props.getProperty("mode")
if (props.containsKey("workingFolder")) workingFolder = props.getProperty("workingFolder", s"/user/$user")
-
+ if (props.containsKey("pysparkPath")) pysparkPath = props.getProperty("pysparkPath")
// TODO: rethink this
Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
index 0faae2b..f3c9fc0 100755
--- a/executor/src/main/resources/spark_intp.py
+++ b/executor/src/main/resources/spark_intp.py
@@ -21,6 +21,7 @@
import os
import sys
import zipimport
+sys.path.append(os.getcwd())
from runtime import AmaContext, Environment
# os.chdir(os.getcwd() + '/build/resources/test/')
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
index 94b8056..79fe18a 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
@@ -16,19 +16,21 @@
*/
package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
-import java.io.{File, PrintWriter, StringWriter}
+import java.io.File
import java.util
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
import org.apache.amaterasu.common.logging.Logging
import org.apache.amaterasu.common.runtime.Environment
import org.apache.amaterasu.sdk.AmaterasuRunner
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
-import scala.sys.process.Process
+import scala.sys.process.{Process, ProcessLogger}
+
+
class PySparkRunner extends AmaterasuRunner with Logging {
@@ -69,6 +71,15 @@
object PySparkRunner {
+  /**
+    * Builds the comma-separated file list handed to spark-submit via `--py-files`:
+    * every conda package archive (`*.tar.bz2`) found in `./miniconda/pkgs`, followed by
+    * `dist/codegen.py`.
+    *
+    * @return the comma-separated list; only `dist/codegen.py` when the packages directory
+    *         is missing or cannot be listed
+    */
+  def collectCondaPackages(): String = {
+    // File.listFiles returns null (not an empty array) when the directory is missing or unreadable.
+    val archives = Option(new File("./miniconda/pkgs").listFiles)
+      .getOrElse(Array.empty[File])
+      .collect { case file if file.getName.endsWith(".tar.bz2") => s"./miniconda/pkgs/${file.getName}" }
+    // `:+` appends the path as a single element; `++` would splice the String in char by char.
+    (archives :+ "dist/codegen.py").mkString(",")
+  }
+
def apply(env: Environment,
jobId: String,
notifier: Notifier,
@@ -77,14 +88,13 @@
pyDeps: PythonDependencies,
config: ClusterConfig): PySparkRunner = {
- //TODO: can we make this less ugly?
- var pysparkPython = "/usr/bin/python"
+ val shellLoger = ProcessLogger(
+ (o: String) => println(o),
+ (e: String) => println(e)
+ )
- if (pyDeps != null &&
- pyDeps.packages.nonEmpty) {
- loadPythonDependencies(pyDeps, notifier)
- pysparkPython = "miniconda/bin/python"
- }
+ //TODO: can we make this less ugly?
+
val result = new PySparkRunner
@@ -98,87 +108,44 @@
intpPath = s"spark_intp.py"
}
var pysparkPath = ""
- if (env.configuration.contains("pysparkPath")) {
- pysparkPath = env.configuration("pysparkPath")
- } else {
- pysparkPath = s"${config.spark.home}/bin/spark-submit"
+ var condaPkgs = ""
+ if (pyDeps != null)
+ condaPkgs = collectCondaPackages()
+ var sparkCmd: Seq[String] = Seq()
+ config.mode match {
+ case "yarn" =>
+ pysparkPath = s"spark/bin/spark-submit"
+ sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
+ val proc = Process(sparkCmd, None,
+ "PYTHONPATH" -> pypath,
+ "PYTHONHASHSEED" -> 0.toString)
+
+ proc.run(shellLoger)
+ case "mesos" =>
+ pysparkPath = config.pysparkPath
+ if (pysparkPath.endsWith("spark-submit")) {
+ sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
+ }
+ else {
+ sparkCmd = Seq(pysparkPath, intpPath, port.toString)
+ }
+ var pysparkPython = "/usr/bin/python"
+
+ if (pyDeps != null &&
+ pyDeps.packages.nonEmpty) {
+ pysparkPython = "./miniconda/bin/python"
+ }
+ val proc = Process(sparkCmd, None,
+ "PYTHONPATH" -> pypath,
+ "PYSPARK_PYTHON" -> pysparkPython,
+ "PYTHONHASHSEED" -> 0.toString)
+
+ proc.run(shellLoger)
}
- val proc = Process(Seq(pysparkPath, intpPath, port.toString), None,
- "PYTHONPATH" -> pypath,
- "PYSPARK_PYTHON" -> pysparkPython,
- "PYTHONHASHSEED" -> 0.toString) #> System.out
-
- proc.run()
-
result.notifier = notifier
result
}
- /**
- * This installs the required python dependencies.
- * We basically need 2 packages to make pyspark work with customer's scripts:
- * 1. py4j - supplied by spark, for communication between Python and Java runtimes.
- * 2. codegen - for dynamically parsing and converting customer's scripts into executable Python code objects.
- * Currently we only know how to install packages using Anaconda, the reason is 3rd party OS libraries, e.g. libevent
- * Anaconda has the capabilities to automatically resolve the required OS libraries per Python package and install them.
- *
- * TODO - figure out if we really want to support pip directly, or if Anaconda is enough.
- * @param deps All of the customer's supplied Python dependencies, this currently comes from job-repo/deps/python.yml
- * @param notifier
- */
- private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
- notifier.info("loading anaconda evn")
- installAnacondaOnNode()
- val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
- installAnacondaPackage(codegenPackage)
- try {
- deps.packages.foreach(pack => {
- pack.index.getOrElse("anaconda").toLowerCase match {
- case "anaconda" => installAnacondaPackage(pack)
- // case "pypi" => installPyPiPackage(pack) TODO: See if we can support this
- }
- })
- }
- catch {
-
- case rte: RuntimeException =>
- val sw = new StringWriter
- rte.printStackTrace(new PrintWriter(sw))
- notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
- case e: Exception =>
- val sw = new StringWriter
- e.printStackTrace(new PrintWriter(sw))
- notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
- }
- }
-
-
- /**
- * Installs one python package using Anaconda.
- * Anaconda works with multiple channels, or better called, repositories.
- * Normally, if a channel isn't specified, Anaconda will fetch the package from the default conda channel.
- * The reason we need to use channels, is that sometimes the required package doesn't exist on the default channel.
- * @param pythonPackage This comes from parsing the python.yml dep file.
- */
- private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
- val channel = pythonPackage.channel.getOrElse("anaconda")
- if (channel == "anaconda") {
- Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}")
- } else {
- Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}")
- }
- }
-
- /**
- * Installs Anaconda and then links it with the local spark that was installed on the executor.
- */
- private def installAnacondaOnNode(): Unit = {
- Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
- Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
- Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
- }
-
-
}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index ff56d8c..ba7ff03 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -47,6 +47,7 @@
)
private var conf: Option[Map[String, Any]] = _
private var executorEnv: Option[Map[String, Any]] = _
+ private var clusterConfig: ClusterConfig = _
override def init(execData: ExecData,
jobId: String,
@@ -60,7 +61,7 @@
(o: String) => log.info(o),
(e: String) => log.error("", e)
)
-
+ clusterConfig = config
var jars = Seq.empty[String]
if (execData.deps != null) {
@@ -83,9 +84,15 @@
sparkScalaRunner.initializeAmaContext(execData.env)
runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-
+ var pypath = ""
// TODO: get rid of hard-coded version
- lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config)
+ config.mode match {
+ case "yarn" =>
+ pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
+ case "mesos" =>
+ pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
+ }
+ lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
runners.put(pySparkRunner.getIdentifier, pySparkRunner)
lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
@@ -95,17 +102,22 @@
private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
val channel = pythonPackage.channel.getOrElse("anaconda")
if (channel == "anaconda") {
- Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+ Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
} else {
- Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+ Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
}
}
private def installAnacondaOnNode(): Unit = {
// TODO: get rid of hard-coded version
- Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger
- Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger
- Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
+
+ this.clusterConfig.mode match {
+ case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
+ case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLoger
+ }
+
+ Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
+ Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
}
private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index 0bf7337..f2c2afa 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -152,9 +152,9 @@
.set("spark.history.kerberos.principal", "none")
.set("spark.master", master)
- .set("spark.executor.instances", "1") // TODO: change this
+ .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
.set("spark.yarn.jars", s"spark/jars/*")
- .set("spark.executor.memory", "1g")
+ .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
.set("spark.dynamicAllocation.enabled", "false")
.set("spark.eventLog.enabled", "false")
.set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
diff --git a/executor/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties
index 19cb189..d402fed 100755
--- a/executor/src/test/resources/amaterasu.properties
+++ b/executor/src/test/resources/amaterasu.properties
@@ -6,3 +6,4 @@
webserver.port=8000
webserver.root=dist
spark.version=2.1.1-bin-hadoop2.7
+pysparkPath = /usr/bin/python
diff --git a/executor/src/test/resources/spark_intp.py b/executor/src/test/resources/spark_intp.py
index a427e92..fd8dc0e 100755
--- a/executor/src/test/resources/spark_intp.py
+++ b/executor/src/test/resources/spark_intp.py
@@ -31,6 +31,7 @@
zip.extractall()
sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+sys.path.append(os.getcwd())
# py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
# py4j_importer = zipimport.zipimporter(py4j_path)