incorrect path passed to container
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 4cd0f9e..b63a441 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -8,7 +8,7 @@
override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
var command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
log.info(s"===> Cluster manager: ${conf.mode}")
- val actionSrcPath = s"dist/$jobId/${actionData.getName}/${actionData.getSrc}"
+ val actionSrcPath = getDownloadableActionSrcPath(jobId, actionData)
conf.mode match {
case "mesos" =>
command + s" && env AMA_NODE=${sys.env("AMA_NODE")} && env MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath}" +
diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
index ebb556d..32cd965 100644
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
@@ -30,10 +30,15 @@
abstract fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String
+ protected fun getDownloadableActionSrcPath(jobId: String, actionData: ActionData): String {
+ return "$jobId/${actionData.name}/${actionData.src}"
+ }
+
open fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> {
- val actionSrcDistPath = "dist/$jobId/${actionData.name}/${actionData.src}"
+ val downloadableActionSrcPath = getDownloadableActionSrcPath(jobId, actionData)
+ val actionSrcDistPath = "dist/$downloadableActionSrcPath"
Files.copy(Paths.get("repo/src/${actionData.src}"), Paths.get(actionSrcDistPath), StandardCopyOption.REPLACE_EXISTING)
- return arrayOf(actionSrcDistPath)
+ return arrayOf(downloadableActionSrcPath)
}
fun getActionResources(jobId: String, actionData: ActionData): Array<String> =