Moved data loader to Kotlin
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
similarity index 74%
copy from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
copy to common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
index d16d6f8..3a2e717 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
@@ -16,7 +16,8 @@
*/
package org.apache.amaterasu.common.dataobjects
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
+import org.apache.amaterasu.common.execution.dependencise.Dependencies
+import org.apache.amaterasu.common.execution.dependencise.PythonDependencies
import org.apache.amaterasu.common.runtime.Environment
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+data class ExecData(val env: Environment, val deps: Dependencies, val pyDeps: PythonDependencies, val configurations: Map<String, Map<String, Any>>)
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
similarity index 79%
rename from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
rename to common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
index d16d6f8..53a5e20 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
@@ -16,7 +16,6 @@
*/
package org.apache.amaterasu.common.dataobjects
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
import org.apache.amaterasu.common.runtime.Environment
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+data class TaskData(val src: String, val env: Environment, val groupId: String, val typeId: String, val exports: Map<String, String>)
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/Dependencies.kt
similarity index 70%
copy from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
copy to common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/Dependencies.kt
index d16d6f8..3968885 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/Dependencies.kt
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.common.dataobjects
+package org.apache.amaterasu.common.execution.dependencise
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
-import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.common.dataobjects.Artifact
+import org.apache.amaterasu.common.dataobjects.Repo
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+data class Dependencies(val repos: List<Repo>, val artifacts: List<Artifact>)
\ No newline at end of file
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/PythonDependencies.kt b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/PythonDependencies.kt
new file mode 100644
index 0000000..41ea991
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/PythonDependencies.kt
@@ -0,0 +1,3 @@
+package org.apache.amaterasu.common.execution.dependencise
+
+data class PythonDependencies(val packages: List<PythonPackage>)
\ No newline at end of file
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/PythonPackage.kt b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/PythonPackage.kt
new file mode 100644
index 0000000..e5bfa90
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencise/PythonPackage.kt
@@ -0,0 +1,3 @@
+package org.apache.amaterasu.common.execution.dependencise
+
+data class PythonPackage(val packageId: String, val index: String? = null, val channel: String? = null)
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
similarity index 70%
copy from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
copy to common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
index d16d6f8..8f3ede2 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
@@ -14,9 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.common.dataobjects
+package org.apache.amaterasu.common.runtime
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
-import org.apache.amaterasu.common.runtime.Environment
+data class Environment (
+ var name: String = "",
+ var master: String = "",
+ var inputRootPath: String = "",
+ var outputRootPath: String = "",
+ var workingDir: String = "",
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+ var configuration: Map<String, String> )
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala
deleted file mode 100644
index a745581..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.dataobjects
-
-import org.apache.amaterasu.common.runtime.Environment
-
-
-/* TODO: Future eyal and yaniv - The TaskData class should support overriding configurations for execData configurations
-// more specifiably, if execData holds configurations for spark setup (vcores/memory) a task should be able to override those
-*/
-case class TaskData(src: String, env: Environment, groupId: String, typeId: String, exports: Map[String, String])
diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala
deleted file mode 100755
index 855262b..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution.dependencies
-
-import scala.collection.mutable.ListBuffer
-
-case class Dependencies(repos: ListBuffer[Repo], artifacts: List[Artifact])
-case class PythonDependencies(packages: List[PythonPackage])
-case class Repo(id: String, `type`: String, url: String)
-case class Artifact(groupId: String, artifactId: String, version: String)
-case class PythonPackage(packageId: String, index: Option[String] = None, channel: Option[String] = None) // Not really sure about this, basically I want default values but the ObjectMapper apparently doesn't support them
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala b/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
index f49d8ad..fa65cbe 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
@@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.common.runtime
-
-case class Environment() {
-
- var name: String = ""
- var master: String = ""
-
- var inputRootPath: String = ""
- var outputRootPath: String = ""
- var workingDir: String = ""
-
- var configuration: Map[String, String] = _
-
-}
\ No newline at end of file
+//package org.apache.amaterasu.common.runtime
+//
+//case class Environment() {
+//
+// var name: String = ""
+// var master: String = ""
+//
+// var inputRootPath: String = ""
+// var outputRootPath: String = ""
+// var workingDir: String = ""
+//
+// var configuration: Map[String, String] = _
+//
+//}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index d7211a2..5325b82 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
@@ -103,11 +103,11 @@
.setTaskId(taskInfo.getTaskId)
.setState(TaskState.TASK_RUNNING).build()
driver.sendStatusUpdate(status)
- val runner = providersFactory.getRunner(taskData.groupId, taskData.typeId)
+ val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
runner match {
- case Some(r) => r.executeSource(taskData.src, actionName, taskData.exports.asJava)
+ case Some(r) => r.executeSource(taskData.getSrc, actionName, taskData.getExports)
case None =>
- notifier.error("", s"Runner not found for group: ${taskData.groupId}, type ${taskData.typeId}. Please verify the tasks")
+ notifier.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
None
}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index dc5ebbf..6efcf9e 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -38,11 +38,11 @@
var providersFactory: ProvidersFactory = _
def execute(): Unit = {
- val runner = providersFactory.getRunner(taskData.groupId, taskData.typeId)
+ val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
runner match {
case Some(r) => {
try {
- r.executeSource(taskData.src, actionName, taskData.exports.asJava)
+ r.executeSource(taskData.getSrc, actionName, taskData.getExports)
log.info("Completed action")
System.exit(0)
} catch {
@@ -53,7 +53,7 @@
}
}
case None =>
- log.error("", s"Runner not found for group: ${taskData.groupId}, type ${taskData.typeId}. Please verify the tasks")
+ log.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
System.exit(101)
}
}
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index 84077e2..54b5b7b 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -39,12 +39,14 @@
private def loadSparkConfig: mutable.Map[String, Any] = {
+ println(s"===> env=$env")
+
val execData = DataLoader.getExecutorData(env, conf)
- val sparkExecConfiguration = execData.configurations.get("spark")
+ val sparkExecConfiguration = execData.getConfigurations.get("spark")
if (sparkExecConfiguration.isEmpty) {
throw new Exception(s"Spark configuration files could not be loaded for the environment $env")
}
- collection.mutable.Map(sparkExecConfiguration.get.toSeq: _*)
+ collection.mutable.Map(sparkExecConfiguration.toSeq: _*)
}
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
index a48aaa0..053e104 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
@@ -22,9 +22,8 @@
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ExecData
import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.execution.dependencise.{Dependencies, PythonDependencies, PythonPackage}
import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
import org.apache.amaterasu.frameworks.spark.runner.sparksql.SparkSqlRunner
import org.apache.amaterasu.frameworks.spark.runner.pyspark.PySparkRunner
import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
@@ -46,8 +45,8 @@
(e: String) => log.error(e)
)
- private var conf: Option[Map[String, Any]] = _
- private var executorEnv: Option[Map[String, Any]] = _
+ private var conf: Map[String, AnyRef] = _
+ private var executorEnv: Map[String, AnyRef] = _
private var clusterConfig: ClusterConfig = _
override def init(execData: ExecData,
@@ -65,24 +64,24 @@
clusterConfig = config
var jars = Seq.empty[String]
- if (execData.deps != null) {
- jars ++= getDependencies(execData.deps)
+ if (execData.getDeps != null) {
+ jars ++= getDependencies(execData.getDeps)
}
- if (execData.pyDeps != null &&
- execData.pyDeps.packages.nonEmpty) {
- loadPythonDependencies(execData.pyDeps, notifier)
+ if (execData.getPyDeps != null &&
+ execData.getPyDeps.getPackages.nonEmpty) {
+ loadPythonDependencies(execData.getPyDeps, notifier)
}
- conf = execData.configurations.get("spark")
- executorEnv = execData.configurations.get("spark_exec_env")
+ conf = execData.getConfigurations.get("spark").toMap
+ executorEnv = execData.getConfigurations.get("spark_exec_env").toMap
val sparkAppName = s"job_${jobId}_executor_$executorId"
SparkRunnerHelper.notifier = notifier
- val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName)
+ val spark = SparkRunnerHelper.createSpark(execData.getEnv, sparkAppName, jars, Some(conf), Some(executorEnv), config, hostName)
- lazy val sparkScalaRunner = SparkScalaRunner(execData.env, jobId, spark, outStream, notifier, jars)
- sparkScalaRunner.initializeAmaContext(execData.env)
+ lazy val sparkScalaRunner = SparkScalaRunner(execData.getEnv, jobId, spark, outStream, notifier, jars)
+ sparkScalaRunner.initializeAmaContext(execData.getEnv)
runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
var pypath = ""
@@ -93,19 +92,19 @@
case "mesos" =>
pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
}
- lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
+ lazy val pySparkRunner = PySparkRunner(execData.getEnv, jobId, notifier, spark, pypath, execData.getPyDeps, config)
runners.put(pySparkRunner.getIdentifier, pySparkRunner)
- lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
+ lazy val sparkSqlRunner = SparkSqlRunner(execData.getEnv, jobId, notifier, spark)
runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
}
private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
- val channel = pythonPackage.channel.getOrElse("anaconda")
+ val channel = pythonPackage.getChannel
if (channel == "anaconda") {
- Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+ Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.getPackageId}") ! shellLoger
} else {
- Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+ Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.getPackageId}") ! shellLoger
}
}
@@ -124,12 +123,12 @@
private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
notifier.info("loading anaconda evn")
installAnacondaOnNode()
- val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
+ val codegenPackage = new PythonPackage( "codegen", "", "auto")
installAnacondaPackage(codegenPackage)
try {
// notifier.info("loadPythonDependencies #5")
- deps.packages.foreach(pack => {
- pack.index.getOrElse("anaconda").toLowerCase match {
+ deps.getPackages.foreach(pack => {
+ pack.getIndex.toLowerCase match {
case "anaconda" => installAnacondaPackage(pack)
// case "pypi" => installPyPiPackage(pack)
}
@@ -157,18 +156,18 @@
// adding a local repo because Aether needs one
val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
- val remotes = deps.repos.map(r =>
+ val remotes = deps.getRepos.map(r =>
new RemoteRepository(
- r.id,
- r.`type`,
- r.url
+ r.getId,
+ r.getType,
+ r.getUrl
)).toList.asJava
val aether = new Aether(remotes, repo)
- deps.artifacts.flatMap(a => {
+ deps.getArtifacts.flatMap(a => {
aether.resolve(
- new DefaultArtifact(a.groupId, a.artifactId, "", "jar", a.version),
+ new DefaultArtifact(a.getGroupId, a.getArtifactId, "", "jar", a.getVersion),
JavaScopes.RUNTIME
).map(a => a)
}).map(x => x.getFile.getAbsolutePath)
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
index f673fc5..89e76cb 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
@@ -21,7 +21,7 @@
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
+import org.apache.amaterasu.common.execution.dependencise.PythonDependencies
import org.apache.amaterasu.common.logging.Logging
import org.apache.amaterasu.common.runtime.Environment
import org.apache.amaterasu.sdk.AmaterasuRunner
@@ -101,8 +101,8 @@
PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
val port = PySparkEntryPoint.getPort
var intpPath = ""
- if (env.configuration.contains("cwd")) {
- val cwd = new File(env.configuration("cwd"))
+ if (env.getConfiguration.containsKey("cwd")) {
+ val cwd = new File(env.getConfiguration.get("cwd"))
intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
} else {
intpPath = s"spark_intp.py"
@@ -132,7 +132,7 @@
var pysparkPython = "/usr/bin/python"
if (pyDeps != null &&
- pyDeps.packages.nonEmpty) {
+ !pyDeps.getPackages.isEmpty) {
pysparkPython = "./miniconda/bin/python"
}
val proc = Process(sparkCmd, None,
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
index 588d273..450a89a 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
@@ -128,10 +128,10 @@
.set("spark.submit.pyFiles", pyfiles.mkString(","))
- val master: String = if (env.master.isEmpty) {
+ val master: String = if (env.getMaster.isEmpty) {
"yarn"
} else {
- env.master
+ env.getMaster
}
config.mode match {
@@ -139,7 +139,7 @@
case "mesos" =>
conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-${config.Webserver.sparkVersion}.tgz")
.setJars(jars)
- .set("spark.master", env.master)
+ .set("spark.master", env.getMaster)
.set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-${config.Webserver.sparkVersion}")
case "yarn" =>
@@ -196,7 +196,7 @@
sparkSession = SparkSession.builder
.appName(sparkAppName)
- .master(env.master)
+ .master(env.getMaster)
//.enableHiveSupport()
.config(conf).getOrCreate()
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
index 4d8a9a6..7e089b5 100755
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
@@ -95,7 +95,7 @@
result match {
case ds: Dataset[_] =>
log.debug(s"persisting DataFrame: $resultName")
- val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")"""
+ val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.getWorkingDir}/$jobId/$actionName/$resultName")"""
val writeResult = interpreter.interpret(writeLine)
if (writeResult != Results.Success) {
val err = outStream.toString
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
index f91107b..9230bca 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
@@ -120,7 +120,7 @@
val exportName = exportsBuff.head._1
val exportFormat = exportsBuff.head._2
//notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
- result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.workingDir}/$jobId/$actionName/$exportName")
+ result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.getWorkingDir}/$jobId/$actionName/$exportName")
}
notifier.info(s"================= finished action $actionName =================")
}
diff --git a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
index fc9fb94..930927c 100644
--- a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
+++ b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
@@ -40,7 +40,7 @@
}
def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
- spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
+ spark.read.format(format).load(s"${env.getWorkingDir}/$jobId/$actionName/$dfName")
}
def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 7ee53d1..f7c5242 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -69,10 +69,12 @@
compile project(':amaterasu-sdk')
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
- compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
+ compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.8'
+ compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.9.8'
+
compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
compile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.15.3'
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
new file mode 100644
index 0000000..323cc20
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.utilities
+
+import java.io.File
+import java.io.FileInputStream
+import java.nio.file.Files
+import java.nio.file.Paths
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.TaskData
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.dataobjects.ExecData
+
+import org.apache.amaterasu.common.execution.dependencise.Dependencies
+import org.apache.amaterasu.common.execution.dependencise.PythonDependencies
+import org.apache.amaterasu.common.logging.KLogging
+
+import org.apache.amaterasu.common.runtime.Environment
+import org.yaml.snakeyaml.Yaml
+
+
+object DataLoader : KLogging() {
+
+ private val mapper = ObjectMapper()
+
+ private val ymlMapper = ObjectMapper(YAMLFactory())
+
+ init {
+ mapper.registerModule(KotlinModule())
+ ymlMapper.registerModule(KotlinModule())
+ }
+
+ @JvmStatic
+ fun getTaskDataBytes(actionData: ActionData, env: String): ByteArray {
+ return mapper.writeValueAsBytes(getTaskData(actionData, env))
+ }
+
+ @JvmStatic
+ fun getTaskData(actionData: ActionData, env: String): TaskData {
+ val srcFile = actionData.src
+ var src = ""
+
+ if (srcFile.isNotEmpty()) {
+ src = File("repo/src/$srcFile").readText()
+ }
+
+ val envValue = File("repo/env/$env/job.yml").readText()
+
+ val envData = ymlMapper.readValue<Environment>(envValue)
+
+ val exports = actionData.exports
+
+ return TaskData(src, envData, actionData.groupId, actionData.typeId, exports)
+ }
+
+ @JvmStatic
+ fun getTaskDataString(actionData: ActionData, env: String): String {
+ return mapper.writeValueAsString(getTaskData(actionData, env))
+ }
+
+ @JvmStatic
+ fun getExecutorDataBytes(env: String, clusterConf: ClusterConfig): ByteArray {
+ return mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
+ }
+
+ @JvmStatic
+ fun getExecutorData(env: String, clusterConf: ClusterConfig): ExecData {
+
+ // loading the job configuration
+ val envValue = File("repo/env/$env/job.yml").readText() //TODO: change this to YAML
+ val envData = ymlMapper.readValue<Environment>(envValue)
+
+ // loading all additional configurations
+ val files = File("repo/env/$env/").listFiles().filter { it.isFile }.filter { it.name != "job.yml" }
+ val config = files.map { yamlToMap(it) }.toMap()
+
+ // loading the job's dependencies
+ lateinit var depsData: Dependencies
+ lateinit var pyDepsData: PythonDependencies
+ if (Files.exists(Paths.get("repo/deps/jars.yml"))) {
+ val depsValue = File("repo/deps/jars.yml").readText()
+ depsData = ymlMapper.readValue(depsValue)
+ }
+ if (Files.exists(Paths.get("repo/deps/python.yml"))) {
+ val pyDepsValue = File("repo/deps/python.yml").readText()
+ pyDepsData = ymlMapper.readValue(pyDepsValue)
+ }
+ val data = mapper.writeValueAsBytes(ExecData(envData, depsData, pyDepsData, config))
+ return ExecData(envData, depsData, pyDepsData, config)
+ }
+
+ fun yamlToMap(file: File): Pair<String, Map<String, Any>> {
+
+ val yaml = Yaml()
+ val conf = yaml.load<Map<String, Any>>(FileInputStream(file))
+
+ return file.name.replace(".yml", "") to conf
+ }
+
+ @JvmStatic
+ fun getExecutorDataString(env: String, clusterConf: ClusterConfig): String {
+ return mapper.writeValueAsString(getExecutorData(env, clusterConf))
+ }
+
+}
\ No newline at end of file
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
index f28e514..e862e1f 100755
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
+++ b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
@@ -1,111 +1,111 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.common.utilities
-
-import java.io.{File, FileInputStream}
-import java.nio.file.{Files, Paths}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.{ActionData, ExecData, TaskData}
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.yaml.snakeyaml.Yaml
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-
-
-object DataLoader extends Logging {
-
- val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
-
- val ymlMapper = new ObjectMapper(new YAMLFactory())
- ymlMapper.registerModule(DefaultScalaModule)
-
- def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = {
- mapper.writeValueAsBytes(getTaskData(actionData, env))
- }
-
- def getTaskData(actionData: ActionData, env: String): TaskData = {
- val srcFile = actionData.getSrc
- var src = ""
-
- if(!srcFile.isEmpty){
- src = Source.fromFile(s"repo/src/$srcFile").mkString
- }
-
- val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString
-
- val envData = ymlMapper.readValue(envValue, classOf[Environment])
-
- val exports = actionData.getExports.asScala.toMap // Kotlin to Scala TODO: Remove me as fast as you can
-
- TaskData(src, envData, actionData.getGroupId, actionData.getTypeId, exports)
- }
-
- def getTaskDataString(actionData: ActionData, env: String): String = {
- mapper.writeValueAsString(getTaskData(actionData, env))
- }
-
- def getExecutorDataBytes(env: String, clusterConf: ClusterConfig): Array[Byte] = {
- mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
- }
-
- def getExecutorData(env: String, clusterConf: ClusterConfig): ExecData = {
-
- // loading the job configuration
- val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString //TODO: change this to YAML
- val envData = ymlMapper.readValue(envValue, classOf[Environment])
- // loading all additional configurations
- val files = new File(s"repo/env/$env/").listFiles().filter(_.isFile).filter(_.getName != "job.yml")
- val config = files.map(yamlToMap).toMap
- // loading the job's dependencies
- var depsData: Dependencies = null
- var pyDepsData: PythonDependencies = null
- if (Files.exists(Paths.get("repo/deps/jars.yml"))) {
- val depsValue = Source.fromFile(s"repo/deps/jars.yml").mkString
- depsData = ymlMapper.readValue(depsValue, classOf[Dependencies])
- }
- if (Files.exists(Paths.get("repo/deps/python.yml"))) {
- val pyDepsValue = Source.fromFile(s"repo/deps/python.yml").mkString
- pyDepsData = ymlMapper.readValue(pyDepsValue, classOf[PythonDependencies])
- }
- val data = mapper.writeValueAsBytes(ExecData(envData, depsData, pyDepsData, config))
- ExecData(envData, depsData, pyDepsData, config)
- }
-
- def yamlToMap(file: File): (String, Map[String, Any]) = {
-
- val yaml = new Yaml()
- val conf = yaml.load(new FileInputStream(file)).asInstanceOf[java.util.Map[String, Any]].asScala.toMap
-
- (file.getName.replace(".yml", ""), conf)
- }
-
- def getExecutorDataString(env: String, clusterConf: ClusterConfig): String = {
- mapper.writeValueAsString(getExecutorData(env, clusterConf))
- }
-
-}
-
-class ConfMap[String, T <: ConfMap[String, T]] extends mutable.ListMap[String, Either[String, T]]
\ No newline at end of file
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.amaterasu.leader.common.utilities
+//
+//import java.io.{File, FileInputStream}
+//import java.nio.file.{Files, Paths}
+//
+//import com.fasterxml.jackson.databind.ObjectMapper
+//import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+//import com.fasterxml.jackson.module.scala.DefaultScalaModule
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.common.dataobjects.{ActionData, ExecData, TaskData}
+//import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
+//import org.apache.amaterasu.common.logging.Logging
+//import org.apache.amaterasu.common.runtime.Environment
+//import org.yaml.snakeyaml.Yaml
+//
+//import scala.collection.JavaConverters._
+//import scala.collection.mutable
+//import scala.io.Source
+//
+//
+//object DataLoader extends Logging {
+//
+// val mapper = new ObjectMapper()
+// mapper.registerModule(DefaultScalaModule)
+//
+// val ymlMapper = new ObjectMapper(new YAMLFactory())
+// ymlMapper.registerModule(DefaultScalaModule)
+//
+// def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = {
+// mapper.writeValueAsBytes(getTaskData(actionData, env))
+// }
+//
+// def getTaskData(actionData: ActionData, env: String): TaskData = {
+// val srcFile = actionData.getSrc
+// var src = ""
+//
+// if(!srcFile.isEmpty){
+// src = Source.fromFile(s"repo/src/$srcFile").mkString
+// }
+//
+// val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString
+//
+// val envData = ymlMapper.readValue(envValue, classOf[Environment])
+//
+// val exports = actionData.getExports.asScala.toMap // Kotlin to Scala TODO: Remove me as fast as you can
+//
+// TaskData(src, envData, actionData.getGroupId, actionData.getTypeId, exports)
+// }
+//
+// def getTaskDataString(actionData: ActionData, env: String): String = {
+// mapper.writeValueAsString(getTaskData(actionData, env))
+// }
+//
+// def getExecutorDataBytes(env: String, clusterConf: ClusterConfig): Array[Byte] = {
+// mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
+// }
+//
+// def getExecutorData(env: String, clusterConf: ClusterConfig): ExecData = {
+//
+// // loading the job configuration
+// val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString //TODO: change this to YAML
+// val envData = ymlMapper.readValue(envValue, classOf[Environment])
+// // loading all additional configurations
+// val files = new File(s"repo/env/$env/").listFiles().filter(_.isFile).filter(_.getName != "job.yml")
+// val config = files.map(yamlToMap).toMap
+// // loading the job's dependencies
+// var depsData: Dependencies = null
+// var pyDepsData: PythonDependencies = null
+// if (Files.exists(Paths.get("repo/deps/jars.yml"))) {
+// val depsValue = Source.fromFile(s"repo/deps/jars.yml").mkString
+// depsData = ymlMapper.readValue(depsValue, classOf[Dependencies])
+// }
+// if (Files.exists(Paths.get("repo/deps/python.yml"))) {
+// val pyDepsValue = Source.fromFile(s"repo/deps/python.yml").mkString
+// pyDepsData = ymlMapper.readValue(pyDepsValue, classOf[PythonDependencies])
+// }
+// val data = mapper.writeValueAsBytes(ExecData(envData, depsData, pyDepsData, config))
+// ExecData(envData, depsData, pyDepsData, config)
+// }
+//
+// def yamlToMap(file: File): (String, Map[String, Any]) = {
+//
+// val yaml = new Yaml()
+// val conf = yaml.load(new FileInputStream(file)).asInstanceOf[java.util.Map[String, Any]].asScala.toMap
+//
+// (file.getName.replace(".yml", ""), conf)
+// }
+//
+// def getExecutorDataString(env: String, clusterConf: ClusterConfig): String = {
+// mapper.writeValueAsString(getExecutorData(env, clusterConf))
+// }
+//
+//}
+//
+//class ConfMap[String, T <: ConfMap[String, T]] extends mutable.ListMap[String, Either[String, T]]
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 27fd728..6ede8d8 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -162,7 +162,7 @@
val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
- val dataStores = DataLoader.getTaskData(actionData, env).exports
+ val dataStores = DataLoader.getTaskData(actionData, env).getExports
val writer = new StringWriter()
yamlMapper.writeValue(writer, dataStores)
val dataStoresYaml = writer.toString