blob: 846f097c3e57ec52b924a4340e4f8dcd9a8b9385 [file] [log] [blame]
package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) {
override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
var command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
log.info(s"===> Cluster manager: ${conf.mode}")
conf.mode match {
case "mesos" =>
command + s" && env AMA_NODE=${sys.env("AMA_NODE")} && env MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath}" +
s" && python3 ${actionData.getSrc}"
case "yarn" =>
command + s" && /bin/bash spark/bin/load-spark-env.sh" +
s" && python3 ${actionData.getSrc}"
case _ =>
log.warn(s"Received unsupported cluster manager: ${conf.mode}")
command
}
}
override def getRunnerResources: Array[String] = {
val resources = super.getRunnerResources
resources :+ s"amaterasu_pyspark-${conf.version}.zip"
resources
}
override def getHasExecutor: Boolean = true
}
object PySparkRunnerProvider {
def apply(env: String, conf: ClusterConfig): PySparkRunnerProvider = {
val result = new PySparkRunnerProvider(env, conf)
result
}
}