copy fixed sofr mesos
diff --git a/frameworks/python/dispatcher/build.gradle b/frameworks/python/dispatcher/build.gradle
index 71ed827..b1cbd39 100644
--- a/frameworks/python/dispatcher/build.gradle
+++ b/frameworks/python/dispatcher/build.gradle
@@ -69,11 +69,6 @@
}
dependencies {
- compile 'org.scala-lang:scala-library:2.11.8'
-
-// compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-// compile group: 'org.scala-lang', name: 'scala-compiler', version: '2.11.8'
-
compile project(':common')
compile project(':amaterasu-sdk')
compile project(':leader')
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
index 01ef770..954c5c7 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
@@ -48,11 +48,9 @@
}
override fun getActionDependencies(jobId: String, actionData: ActionData): Array<String> {
- val reqFile = File("dist/$requirementsFileName")
- val dist = Paths.get("dist/")
+ val reqFile = File(requirementsFileName)
if (reqFile.exists()) reqFile.delete()
- if (Files.notExists(dist)) Files.createDirectories(dist)
val dependencies = runnerResources + mandatoryPYPIPackages
diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
index 3534e3d..d39ae32 100644
--- a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
+++ b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
Binary files differ
diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
index 379f066..0fa079e 100644
--- a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
+++ b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
Binary files differ
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 7c53be8..f499d96 100644
--- a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
+++ b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
Binary files differ
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index 075cfec..6bdd68f 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -238,12 +238,14 @@
val runnerProvider = framework.getRunnerProvider(actionData.typeId)
val ctx = Records.newRecord(ContainerLaunchContext::class.java)
+
val envConf = configManager.getActionConfiguration(actionData.name, actionData.config)
val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, envConf, "${actionData.id}-${container.id.containerId}", address))
notifier.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}")
ctx.commands = commands
ctx.tokens = allTokens()
+
ctx.localResources = setupContainerResources(framework, runnerProvider, actionData)
ctx.environment = framework.environmentVariables
@@ -252,7 +254,7 @@
jobManager.actionStarted(actionData.id)
containersIdsToTask[container.id.containerId] = actionData
notifier.info("created container for ${actionData.name} created")
- //ctx.localResources.forEach { t: String, u: LocalResource -> notifier.info("resource: $t = ${u.resource}") }
+ //ctx.localResources.forEach { t: String, u: LocalResource -> notifier.info("resource: $t = ${u.resource}") }
log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}")
} catch (e: Exception) {
notifier.error("", "error launching container with ${e.message} in ${ExceptionUtils.getStackTrace(e)}")
@@ -299,6 +301,7 @@
// getting the action specific dependencies
runnerProvider.getActionDependencies(jobManager.jobId, actionData).forEach { distributeFile(it, "${jobManager.jobId}/${actionData.name}/") }
+
result.putAll(runnerProvider.getActionDependencies(jobManager.jobId, actionData).map { File(it).name to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$it"))) })
// Adding the Amaterasu configuration files
@@ -313,6 +316,8 @@
result[File(executable).name] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$executable")))
result.forEach { println("entry ${it.key} with value ${it.value}") }
+
+ result.forEach { notifier.info("entry ${it.key} with value ${it.value}") }
return result.map { x -> x.key.removePrefix("/") to x.value }.toMap()
}
@@ -348,12 +353,13 @@
private fun distributeFile(file: String, distributionPath: String) {
- log.info("copying file $file, file status ${File(file).exists()}")
val actionDistPath = createDistPath("$distributionPath/$file")
val yarnJarPath = Path(config.yarn().hdfsJarsPath())
val targetPath = Path.mergePaths(yarnJarPath, actionDistPath)
+ notifier.info("copying file $file, file status ${File(file).exists()} to $targetPath")
+
log.info("target is $targetPath")
fs.copyFromLocalFile(false, true, Path(file), targetPath)
@@ -434,7 +440,7 @@
} else {
// TODO: Check the getDiagnostics value and see if appropriate
jobManager.actionFailed(taskId, status.diagnostics)
- notifier.error( "Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})", status.diagnostics)
+ notifier.error("Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})", status.diagnostics)
}
}
}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index f6d09f2..d357378 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -129,7 +129,7 @@
}
case TaskState.TASK_FINISHED => {
jobManager.actionComplete(status.getTaskId.getValue)
- printNotification(new Notification("",s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution))
+ printNotification(new Notification("", s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution))
}
case TaskState.TASK_FAILED |
TaskState.TASK_KILLED |
@@ -256,13 +256,14 @@
// Getting action dependencies
runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
+
+ FileUtils.copyFile(new File(r), new File(s"dist/$r"))
command.addUris(URI.newBuilder
.setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
.setExecutable(false)
.setExtract(false)
.build())
- }
- )
+ })
// Getting action specific resources
runnerProvider.getActionResources(jobManager.getJobId, actionData).foreach(r => command.addUris(URI.newBuilder
@@ -350,7 +351,7 @@
//driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
}
- printNotification(new Notification("", s"requesting container fo ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
+ printNotification(new Notification("", s"requesting container for ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
}
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
index 8abd578..cfcc56d 100644
--- a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
+++ b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
Binary files differ