Merge pull request #20 from nadav-har-tzvi/version-0.2.0-incubating-rc3

PySpark fixes for YARN and Mesos
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 7c9f924..3661b48 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -71,8 +71,8 @@
       var memoryMB: Int = 1024
 
       def load(props: Properties): Unit = {
-        if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").asInstanceOf[Int]
-        if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").asInstanceOf[Int]
+        if (props.containsKey("yarn.master.cores")) this.cores = props.getProperty("yarn.master.cores").trim.toInt // trim: Properties.load keeps trailing whitespace
+        if (props.containsKey("yarn.master.memoryMB")) this.memoryMB = props.getProperty("yarn.master.memoryMB").trim.toInt
       }
     }
 
@@ -83,8 +83,8 @@
       var memoryMB: Int = 1024
 
       def load(props: Properties): Unit = {
-        if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").asInstanceOf[Int]
-        if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").asInstanceOf[Int]
+        if (props.containsKey("yarn.worker.cores")) this.cores = props.getProperty("yarn.worker.cores").trim.toInt // trim: Properties.load keeps trailing whitespace
+        if (props.containsKey("yarn.worker.memoryMB")) this.memoryMB = props.getProperty("yarn.worker.memoryMB").trim.toInt
       }
     }
 
@@ -133,9 +133,9 @@
 
     def load(props: Properties): Unit = {
 
-      if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").asInstanceOf[Double]
-      if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").asInstanceOf[Long]
-      if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").asInstanceOf[Long]
+      if (props.containsKey("jobs.cpu")) cpus = props.getProperty("jobs.cpu").trim.toDouble // trim: Properties.load keeps trailing whitespace
+      if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").trim.toLong
+      if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").trim.toLong
 
       Tasks.load(props)
     }
@@ -148,9 +148,9 @@
 
       def load(props: Properties): Unit = {
 
-        if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").asInstanceOf[Int]
-        if (props.containsKey("jobs.tasks.cpus")) attempts = props.getProperty("jobs.tasks.cpus").asInstanceOf[Int]
-        if (props.containsKey("jobs.tasks.mem")) attempts = props.getProperty("jobs.tasks.mem").asInstanceOf[Int]
+        if (props.containsKey("jobs.tasks.attempts")) attempts = props.getProperty("jobs.tasks.attempts").trim.toInt // trim: Properties.load keeps trailing whitespace
+        if (props.containsKey("jobs.tasks.cpus")) cpus = props.getProperty("jobs.tasks.cpus").trim.toInt
+        if (props.containsKey("jobs.tasks.mem")) mem = props.getProperty("jobs.tasks.mem").trim.toInt
 
       }
     }
@@ -209,7 +209,7 @@
-    if (props.containsKey("timeout")) timeout = props.getProperty("timeout").asInstanceOf[Double]
+    if (props.containsKey("timeout")) timeout = props.getProperty("timeout").trim.toDouble
     if (props.containsKey("mode")) mode = props.getProperty("mode")
     if (props.containsKey("workingFolder")) workingFolder = props.getProperty("workingFolder", s"/user/$user")
-
+    if (props.containsKey("pysparkPath")) pysparkPath = props.getProperty("pysparkPath")
     // TODO: rethink this
     Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
     JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
diff --git a/executor/src/main/resources/spark_intp.py b/executor/src/main/resources/spark_intp.py
index 0faae2b..f3c9fc0 100755
--- a/executor/src/main/resources/spark_intp.py
+++ b/executor/src/main/resources/spark_intp.py
@@ -21,6 +21,7 @@
 import os
 import sys
 import zipimport
+sys.path.append(os.getcwd())
 from runtime import AmaContext, Environment
 
 # os.chdir(os.getcwd() + '/build/resources/test/')
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
index 94b8056..79fe18a 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/PySpark/PySparkRunner.scala
@@ -16,19 +16,21 @@
  */
 package org.apache.amaterasu.executor.execution.actions.runners.spark.PySpark
 
-import java.io.{File, PrintWriter, StringWriter}
+import java.io.File
 import java.util
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.sdk.AmaterasuRunner
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
 
-import scala.sys.process.Process
+import scala.sys.process.{Process, ProcessLogger}
+
+
 
 
 class PySparkRunner extends AmaterasuRunner with Logging {
@@ -69,6 +71,15 @@
 
 object PySparkRunner {
 
+  /** Comma-separated `--py-files` value: every conda package archive plus the codegen module. */
+  def collectCondaPackages(): String = {
+    // listFiles yields null when ./miniconda/pkgs is missing or unreadable; treat that as empty.
+    val archives = Option(new File("./miniconda/pkgs").listFiles).getOrElse(Array.empty[File])
+    val pkgPaths = archives.map(_.getName).filter(_.endsWith(".tar.bz2")).map(n => s"./miniconda/pkgs/$n")
+    // `:+` adds the path as one element; `++` would splice the string in character by character.
+    (pkgPaths :+ "dist/codegen.py").mkString(",")
+  }
+
   def apply(env: Environment,
             jobId: String,
             notifier: Notifier,
@@ -77,14 +88,13 @@
             pyDeps: PythonDependencies,
             config: ClusterConfig): PySparkRunner = {
 
-    //TODO: can we make this less ugly?
-    var pysparkPython = "/usr/bin/python"
+    val shellLoger = ProcessLogger(
+      (o: String) => println(o),
+      (e: String) => println(e)
+    )
 
-    if (pyDeps != null &&
-        pyDeps.packages.nonEmpty) {
-      loadPythonDependencies(pyDeps, notifier)
-      pysparkPython = "miniconda/bin/python"
-    }
+    //TODO: can we make this less ugly?
+
 
     val result = new PySparkRunner
 
@@ -98,87 +108,44 @@
       intpPath = s"spark_intp.py"
     }
     var pysparkPath = ""
-    if (env.configuration.contains("pysparkPath")) {
-      pysparkPath = env.configuration("pysparkPath")
-    } else {
-      pysparkPath = s"${config.spark.home}/bin/spark-submit"
+    var condaPkgs = ""
+    if (pyDeps != null)
+      condaPkgs = collectCondaPackages()
+    var sparkCmd: Seq[String] = Seq()
+    config.mode match {
+      case "yarn" =>
+        pysparkPath = s"spark/bin/spark-submit"
+        sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYTHONHASHSEED" -> 0.toString)
+
+        proc.run(shellLoger)
+      case "mesos" =>
+        pysparkPath = config.pysparkPath
+        if (pysparkPath.endsWith("spark-submit")) {
+          sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, intpPath, port.toString)
+        }
+        else {
+          sparkCmd = Seq(pysparkPath, intpPath, port.toString)
+        }
+        var pysparkPython = "/usr/bin/python"
+
+        if (pyDeps != null &&
+          pyDeps.packages.nonEmpty) {
+          pysparkPython = "./miniconda/bin/python"
+        }
+        val proc = Process(sparkCmd, None,
+          "PYTHONPATH" -> pypath,
+          "PYSPARK_PYTHON" -> pysparkPython,
+        "PYTHONHASHSEED" -> 0.toString)
+
+        proc.run(shellLoger)
     }
-    val proc = Process(Seq(pysparkPath, intpPath, port.toString), None,
-      "PYTHONPATH" -> pypath,
-      "PYSPARK_PYTHON" -> pysparkPython,
-      "PYTHONHASHSEED" -> 0.toString) #> System.out
-
-    proc.run()
-
 
     result.notifier = notifier
 
     result
   }
 
-  /**
-    * This installs the required python dependencies.
-    * We basically need 2 packages to make pyspark work with customer's scripts:
-    * 1. py4j - supplied by spark, for communication between Python and Java runtimes.
-    * 2. codegen - for dynamically parsing and converting customer's scripts into executable Python code objects.
-    * Currently we only know how to install packages using Anaconda, the reason is 3rd party OS libraries, e.g. libevent
-    * Anaconda has the capabilities to automatically resolve the required OS libraries per Python package and install them.
-    *
-    * TODO - figure out if we really want to support pip directly, or if Anaconda is enough.
-    * @param deps All of the customer's supplied Python dependencies, this currently comes from job-repo/deps/python.yml
-    * @param notifier
-    */
-  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
-    notifier.info("loading anaconda evn")
-    installAnacondaOnNode()
-    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
-    installAnacondaPackage(codegenPackage)
-    try {
-      deps.packages.foreach(pack => {
-        pack.index.getOrElse("anaconda").toLowerCase match {
-          case "anaconda" => installAnacondaPackage(pack)
-          // case "pypi" => installPyPiPackage(pack) TODO: See if we can support this
-        }
-      })
-    }
-    catch {
-
-      case rte: RuntimeException =>
-        val sw = new StringWriter
-        rte.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
-      case e: Exception =>
-        val sw = new StringWriter
-        e.printStackTrace(new PrintWriter(sw))
-        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
-    }
-  }
-
-
-  /**
-    * Installs one python package using Anaconda.
-    * Anaconda works with multiple channels, or better called, repositories.
-    * Normally, if a channel isn't specified, Anaconda will fetch the package from the default conda channel.
-    * The reason we need to use channels, is that sometimes the required package doesn't exist on the default channel.
-    * @param pythonPackage This comes from parsing the python.yml dep file.
-    */
-  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.channel.getOrElse("anaconda")
-    if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}")
-    } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}")
-    }
-  }
-
-  /**
-    * Installs Anaconda and then links it with the local spark that was installed on the executor.
-    */
-  private def installAnacondaOnNode(): Unit = {
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda")
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build")
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark")
-  }
-
-
 }
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
index ff56d8c..ba7ff03 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/execution/actions/runners/spark/SparkRunnersProvider.scala
@@ -47,6 +47,7 @@
   )
   private var conf: Option[Map[String, Any]] = _
   private var executorEnv: Option[Map[String, Any]] = _
+  private var clusterConfig: ClusterConfig = _
 
   override def init(execData: ExecData,
                     jobId: String,
@@ -60,7 +61,7 @@
       (o: String) => log.info(o),
       (e: String) => log.error("", e)
     )
-
+    clusterConfig = config
     var jars = Seq.empty[String]
 
     if (execData.deps != null) {
@@ -83,9 +84,15 @@
     sparkScalaRunner.initializeAmaContext(execData.env)
 
     runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
-
+    var pypath = ""
     // TODO: get rid of hard-coded version
-    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, s"${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip", execData.pyDeps, config)
+    config.mode match {
+      case "yarn" =>
+        pypath = s"$$PYTHONPATH:$$SPARK_HOME/python:$$SPARK_HOME/python/build:${config.spark.home}/python:${config.spark.home}/python/pyspark:${config.spark.home}/python/pyspark/build:${config.spark.home}/python/pyspark/lib/py4j-0.10.4-src.zip:${new File(".").getAbsolutePath}"
+      case "mesos" =>
+        pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
+    }
+    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
     runners.put(pySparkRunner.getIdentifier, pySparkRunner)
 
     lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
@@ -95,17 +102,22 @@
   private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
     val channel = pythonPackage.channel.getOrElse("anaconda")
     if (channel == "anaconda") {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
     } else {
-      Seq("bash", "-c", s"$$PWD/miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
     }
   }
 
   private def installAnacondaOnNode(): Unit = {
     // TODO: get rid of hard-coded version
-    Seq("bash", "-c", "sh Miniconda2-latest-Linux-x86_64.sh -b -p $PWD/miniconda") ! shellLoger
-    Seq("bash", "-c", "$PWD/miniconda/bin/python -m conda install -y conda-build") ! shellLoger
-    Seq("bash", "-c", "ln -s $PWD/spark-2.2.1-bin-hadoop2.7/python/pyspark $PWD/miniconda/pkgs/pyspark") ! shellLoger
+    // The installer file name differs per mode; any other mode value raises a MatchError here.
+    this.clusterConfig.mode match {
+      case "yarn" => Seq("sh", "-c", "export HOME=$PWD && ./miniconda.sh -b -p miniconda") ! shellLoger
+      case "mesos" => Seq("sh", "Miniconda2-latest-Linux-x86_64.sh", "-b", "-p", "miniconda") ! shellLoger
+    }
+    // Install conda-build, then link pyspark into pkgs; the link target must be absolute or it dangles.
+    Seq("bash", "-c", "export HOME=$PWD && ./miniconda/bin/python -m conda install -y conda-build") ! shellLoger
+    Seq("bash", "-c", "ln -s $PWD/spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
   }
 
   private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
diff --git a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
index 0bf7337..f2c2afa 100644
--- a/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
+++ b/executor/src/main/scala/org/apache/spark/repl/amaterasu/runners/spark/SparkRunnerHelper.scala
@@ -152,9 +152,9 @@
           .set("spark.history.kerberos.principal", "none")
 
           .set("spark.master", master)
-          .set("spark.executor.instances", "1") // TODO: change this
+          .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
           .set("spark.yarn.jars", s"spark/jars/*")
-          .set("spark.executor.memory", "1g")
+          .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
           .set("spark.dynamicAllocation.enabled", "false")
           .set("spark.eventLog.enabled", "false")
           .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
diff --git a/executor/src/test/resources/amaterasu.properties b/executor/src/test/resources/amaterasu.properties
index 19cb189..d402fed 100755
--- a/executor/src/test/resources/amaterasu.properties
+++ b/executor/src/test/resources/amaterasu.properties
@@ -6,3 +6,4 @@
 webserver.port=8000
 webserver.root=dist
 spark.version=2.1.1-bin-hadoop2.7
+pysparkPath = /usr/bin/python
diff --git a/executor/src/test/resources/spark_intp.py b/executor/src/test/resources/spark_intp.py
index a427e92..fd8dc0e 100755
--- a/executor/src/test/resources/spark_intp.py
+++ b/executor/src/test/resources/spark_intp.py
@@ -31,6 +31,7 @@
 zip.extractall()
 sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/pyspark')
 sys.path.insert(1, os.getcwd() + '/executor/src/test/resources/py4j')
+sys.path.append(os.getcwd())
 
 # py4j_path = 'spark-2.2.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip'
 # py4j_importer = zipimport.zipimporter(py4j_path)