Fixed the spark-env.sh location in the PySpark submit command; also simplified the virtualenv python path, routed YARN container errors through the ActiveNotifier with full stack traces, and refreshed the bundled runtime/SDK dist zips
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
index 72242d0..d6e9f42 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
@@ -26,7 +26,7 @@
private val requirementsFileName: String = "ama-requirements.txt"
private val mandatoryPYPIPackages: Array<String> = arrayOf("requests")
- protected val virtualPythonPath = "./amaterasu_env/bin/python"
+ protected val virtualPythonPath = "amaterasu_env/bin/python"
override val runnerResources: Array<String>
get() = arrayOf("amaterasu-sdk-${conf.version()}.zip")
diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
index b6ca524..69a9002 100644
--- a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
+++ b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
Binary files differ
diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
index ce67e5a..448adf5 100644
--- a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
+++ b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
Binary files differ
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 407eb30..465691c 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -3,16 +3,16 @@
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
-import sys.process._
+import org.apache.commons.lang.StringUtils
class PySparkRunnerProvider(val env: String, val conf: ClusterConfig) extends PythonRunnerProviderBase(env, conf) {
override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
- var command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
+ //val command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
log.info(s"===> Cluster manager: ${conf.mode}")
- val pythonBinPath = Seq(getVirtualPythonPath, "-c", "import sys; print(sys.executable)").!!.trim()
- command + s" && /bin/bash $$SPARK_HOME/bin/load-spark-env.sh && env PYSPARK_PYTHON=$pythonBinPath " +
- s" && $$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
+ //command +
+ // s"$$SPARK_HOME/conf/spark-env.sh && env PYSPARK_PYTHON=$getVirtualPythonPath " +
+ s"$$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
}
override def getRunnerResources: Array[String] = {
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 1e47c2e..9855261 100644
--- a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
+++ b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
Binary files differ
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index 8c45b41..cd4bf03 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -38,6 +38,7 @@
import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
@@ -60,10 +61,8 @@
import org.apache.hadoop.yarn.util.Records
import org.apache.zookeeper.CreateMode
+import java.io.*
-import java.io.File
-import java.io.FileInputStream
-import java.io.IOException
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
@@ -240,7 +239,7 @@
val ctx = Records.newRecord(ContainerLaunchContext::class.java)
val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, env, "${actionData.id}-${container.id.containerId}", address))
- log.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}")
+ notifier.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}")
ctx.commands = commands
ctx.tokens = allTokens()
ctx.localResources = setupContainerResources(framework, runnerProvider, actionData)
@@ -293,7 +292,7 @@
// getting the action specific resources
result.putAll(runnerProvider.getActionResources(jobManager.jobId, actionData).map { it.removePrefix("${jobManager.jobId}/${actionData.name}/") to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) })
- // getting the action specific dependencies
+ // getting the action specific dependencies
runnerProvider.getActionDependencies(jobManager.jobId, actionData).forEach { distributeFile(it, "${jobManager.jobId}/${actionData.name}/") }
result.putAll(runnerProvider.getActionDependencies(jobManager.jobId, actionData).map { File(it).name to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$it"))) })
@@ -400,7 +399,7 @@
}
override fun onError(e: Throwable?) {
- notifier.error("Error on AM", e!!.message!!)
+ notifier.error("Error running a container ${e!!.message!!}", ExceptionUtils.getStackTrace(e))
stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
}
@@ -421,7 +420,7 @@
} else {
// TODO: Check the getDiagnostics value and see if appropriate
jobManager.actionFailed(taskId, status.diagnostics)
- notifier.error("", "Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})")
+ notifier.error( "Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})", status.diagnostics)
}
}
}
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
index 7fc89e8..697bb40 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
@@ -18,6 +18,7 @@
import org.apache.amaterasu.common.logging.KLogging
import org.apache.amaterasu.common.utils.ActiveNotifier
+import org.apache.commons.lang.exception.ExceptionUtils
import java.nio.ByteBuffer
@@ -30,7 +31,7 @@
class YarnNMCallbackHandler(val notifier: ActiveNotifier) : KLogging() , NMClientAsync.CallbackHandler {
override fun onStartContainerError(containerId: ContainerId, t: Throwable) {
- notifier.error("","Container ${containerId.containerId} couldn't start. message ${t.message}")
+ notifier.error("Error starting a container ${t.message!!}", ExceptionUtils.getStackTrace(t))
}
override fun onGetContainerStatusError(containerId: ContainerId, t: Throwable) {
@@ -46,7 +47,7 @@
}
override fun onStopContainerError(containerId: ContainerId, t: Throwable) {
- notifier.error("","Container ${containerId.containerId} has thrown an error. message ${t.message}")
+ notifier.error("Error running a container ${t.message!!}", ExceptionUtils.getStackTrace(t))
}
override fun onContainerStopped(containerId: ContainerId) {
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
index 1d735fd..7ba06f4 100644
--- a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
+++ b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
Binary files differ