Merge pull request #51 from roadan/AMATERASU-30
Amaterasu 30
diff --git a/.gitignore b/.gitignore
index dbb51cb..eaefdef 100755
--- a/.gitignore
+++ b/.gitignore
@@ -50,3 +50,6 @@
executor/spark-warehouse/
repo
repo/**
+
+# python distribution archives (the */dist/*.zip files removed from the repo in this change)
+*.zip
\ No newline at end of file
diff --git a/common/build.gradle b/common/build.gradle
index 245da82..38f2393 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -74,6 +74,10 @@
exclude group: 'org.jboss.netty'
}
+ compile('com.uchuhimo:konf:0.11') {
+ exclude group: 'org.eclipse.jgit'
+ }
+
compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/ConfigManager.kt
similarity index 97%
rename from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
rename to common/src/main/kotlin/org/apache/amaterasu/common/configuration/ConfigManager.kt
index 00b640a..5d317c2 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/ConfigManager.kt
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.common.configuration
+package org.apache.amaterasu.common.configuration
import com.uchuhimo.konf.Config
import com.uchuhimo.konf.source.yaml.toYaml
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/GenericSpec.kt
similarity index 94%
rename from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
rename to common/src/main/kotlin/org/apache/amaterasu/common/configuration/GenericSpec.kt
index ed6fa9e..8299f96 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/GenericSpec.kt
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.common.configuration
+package org.apache.amaterasu.common.configuration
import com.uchuhimo.konf.ConfigSpec
import com.uchuhimo.konf.OptionalItem
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/Job.kt
similarity index 94%
rename from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt
rename to common/src/main/kotlin/org/apache/amaterasu/common/configuration/Job.kt
index 076b0af..7a1258b 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/Job.kt
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.common.configuration
+package org.apache.amaterasu.common.configuration
import com.uchuhimo.konf.ConfigSpec
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
index b19f82f..9d83c65 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
@@ -19,6 +19,7 @@
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.BasicPythonRunnerProvider
import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PandasRunnerProvider
+import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
@@ -33,8 +34,10 @@
get() = "python"
override val groupResources: Array<File>
get() = arrayOf()
- override val driverConfiguration: DriverConfiguration
- get() = DriverConfiguration(conf!!.taskMem(), 1)
+
+    /**
+     * Builds the driver resource request for Python actions.
+     *
+     * Memory is taken from the cluster-level `taskMem` setting and the CPU count is fixed at 1.
+     * Fails with a NullPointerException when `conf` has not been set.
+     *
+     * @param configManager not consulted by this provider; values come from the cluster config only.
+     * @return the driver configuration built from (memory, cpu).
+     */
+    //TODO: this should be configured on env level
+    override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration =
+        DriverConfiguration(conf!!.taskMem(), 1)
override val environmentVariables: Map<String, String>
get() = mapOf()
override val configurationItems: Array<String>
diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 6541930..0000000
--- a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 6e6e599..0000000
--- a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index 007dcf0..7a32b9e 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -17,9 +17,9 @@
package org.apache.amaterasu.frameworks.spark.dispatcher
import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.PySparkRunnerProvider
import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.SparkSubmitScalaRunnerProvider
-import org.apache.amaterasu.leader.common.utilities.DataLoader
import org.apache.amaterasu.leader.common.utilities.MemoryFormatParser
import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
@@ -32,11 +32,6 @@
private lateinit var env: String
private lateinit var conf: ClusterConfig
- private val sparkExecConfigurations: Map<String, Any> by lazy {
- val execData = DataLoader.getExecutorData(env, conf)
- execData.configurations["spark"].orEmpty()
- }
-
private lateinit var runnerProviders: Map<String, RunnerSetupProvider>
override fun init(env: String, conf: ClusterConfig) {
@@ -72,32 +67,25 @@
}
override val groupIdentifier: String = "spark"
- override val configurationItems = arrayOf("sparkConfiguration", "sparkExecutor")
+ override val configurationItems = arrayOf("sparkProperties", "sparkOptions")
- override val driverConfiguration: DriverConfiguration
- get() {
- //TODO: Spark configuration sould come for the ENV only
- val sparkOpts = conf.spark().opts()
- val cpu: Int = when {
- sparkExecConfigurations.containsKey("spark.yarn.am.cores") -> sparkExecConfigurations["spark.yarn.am.cores"].toString().toInt()
- sparkExecConfigurations.containsKey("spark.driver.cores") -> sparkExecConfigurations["spark.driver.cores"].toString().toInt()
- sparkOpts.contains("yarn.am.cores") -> sparkOpts["yarn.am.cores"].toString().toInt()
- sparkOpts.contains("driver.cores") -> sparkOpts["driver.cores"].toString().toInt()
- conf.yarn().worker().cores() > 0 -> conf.yarn().worker().cores()
- else -> 1
- }
+    /**
+     * Resolves the Spark driver's CPU cores and memory (in MB) from the job configuration.
+     *
+     * Values under `sparkOptions` (spark-submit style flags) take precedence over values under
+     * `sparkProperties` (`spark.*` keys). When nothing is configured the driver gets 1 core
+     * and 1024 MB.
+     *
+     * @param configManager source of the `sparkOptions` and `sparkProperties` maps.
+     * @return the driver configuration built from (memory, cpu).
+     */
+    override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration {
+        val sparkOptions: Map<String, Any> = configManager.config["sparkOptions"]
+        val sparkProperties: Map<String, Any> = configManager.config["sparkProperties"]
- val mem: Int = when {
- sparkExecConfigurations.containsKey("spark.yarn.am.memory") -> MemoryFormatParser.extractMegabytes(sparkExecConfigurations["spark.yarn.am.memory"].toString())
- sparkExecConfigurations.containsKey("spark.driver.memeory") -> MemoryFormatParser.extractMegabytes(sparkExecConfigurations["spark.driver.memeory"].toString())
- sparkOpts.contains("yarn.am.memory") -> MemoryFormatParser.extractMegabytes(sparkOpts["yarn.am.memory"].get())
- sparkOpts.contains("driver.memory") -> MemoryFormatParser.extractMegabytes(sparkOpts["driver.memory"].get())
- conf.yarn().worker().memoryMB() > 0 -> conf.yarn().worker().memoryMB()
- conf.taskMem() > 0 -> conf.taskMem()
- else -> 1024
- }
- return DriverConfiguration(mem, cpu)
+        val cpu: Int = when {
+            sparkOptions.containsKey("driver-cores") -> sparkOptions["driver-cores"].toString().toInt()
+            sparkProperties.containsKey("spark.yarn.am.cores") -> sparkProperties["spark.yarn.am.cores"].toString().toInt()
+            sparkProperties.containsKey("spark.driver.cores") -> sparkProperties["spark.driver.cores"].toString().toInt()
+            else -> 1
}
-
+        val mem: Int = when {
+            sparkOptions.containsKey("driver-memory") -> MemoryFormatParser.extractMegabytes(sparkOptions["driver-memory"].toString())
+            sparkProperties.containsKey("spark.yarn.am.memory") -> MemoryFormatParser.extractMegabytes(sparkProperties["spark.yarn.am.memory"].toString())
+            sparkProperties.containsKey("spark.driver.memory") -> MemoryFormatParser.extractMegabytes(sparkProperties["spark.driver.memory"].toString())
+            // Misspelled key kept as a fallback so existing env files that use it keep working.
+            sparkProperties.containsKey("spark.driver.memeory") -> MemoryFormatParser.extractMegabytes(sparkProperties["spark.driver.memeory"].toString())
+            else -> 1024
+        }
+        return DriverConfiguration(mem, cpu)
+    }
}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
index 1d8e3ce..d716427 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
@@ -20,21 +20,26 @@
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
-import org.apache.amaterasu.leader.common.configuration.Job
+import org.apache.amaterasu.common.configuration.Job
class PySparkRunnerProvider(env: String, conf: ClusterConfig) : PythonRunnerProviderBase(env, conf) {
override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String {
- val command = super.getCommand(jobId, actionData , env, executorId, callbackAddress)
- command + " && \$SPARK_HOME/bin/spark-submit --master ${env[Job.master]} " +
+        // Appends a spark-submit invocation for the action's source to the base Python command.
+        val sparkProperties: Map<String, Any> = env["sparkProperties"]
+        val sparkOptions: Map<String, Any> = env["sparkOptions"]
+        val command = super.getCommand(jobId, actionData, env, executorId, callbackAddress)
+        // hive-site.xml is only shipped when the cluster mode is "yarn".
+        val hive = if (conf.mode() == "yarn") "--files \$SPARK_HOME/conf/hive-site.xml " else ""
+        // Fall back to the job-level master only when sparkOptions does not already carry one.
+        val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
+
+        // The helper results have no leading or trailing space, so each one is followed by an
+        // explicit separator; otherwise the last option value fuses with the first "--conf".
+        return "$command && \$SPARK_HOME/bin/spark-submit $master" +
+                SparkCommandLineHelper.getOptions(sparkOptions) + " " +
+                SparkCommandLineHelper.getProperties(sparkProperties) + " " +
"--conf spark.pyspark.python=${conf.pythonPath()} " +
"--conf spark.pyspark.driver.python=$virtualPythonPath " +
- "--files \$SPARK_HOME/conf/hive-site.xml ${actionData.src}"
-
- return command
+                hive +
+                " ${actionData.src}"
}
-
override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf()
override val runnerResources: Array<String>
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
similarity index 65%
copy from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
copy to frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
index ed6fa9e..e60eb11 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
@@ -14,12 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.common.configuration
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
-import com.uchuhimo.konf.ConfigSpec
-import com.uchuhimo.konf.OptionalItem
+/**
+ * Renders Spark settings maps as command-line fragments for `spark-submit`.
+ *
+ * Both functions return the fragments joined by single spaces, with no leading or trailing
+ * space and an empty string for an empty map; callers supply any surrounding separators.
+ */
+object SparkCommandLineHelper {
-class GenericSpec(configurationItem: String) {
- val spec = ConfigSpec()
- val items = OptionalItem(spec, configurationItem, emptyMap<String, String>())
+
+    /** Renders every entry as `--<key> <value>`. */
+    fun getOptions(sparkOptions: Map<String, Any>): String =
+        sparkOptions.entries.joinToString(separator = " ") { option -> "--${option.key} ${option.value}" }
+
+    /**
+     * Renders every entry as `--conf <entry>`, where `<entry>` is the map entry's own string
+     * form (presumably `key=value`, as produced by the standard JDK map entries).
+     */
+    fun getProperties(sparkProps: Map<String, Any>): String =
+        sparkProps.entries.joinToString(separator = " ") { property -> "--conf $property" }
}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
index f13e7c9..a607d75 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -20,17 +20,24 @@
import org.apache.amaterasu.common.configuration.ClusterConfig
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.utils.ArtifactUtil
-import org.apache.amaterasu.leader.common.configuration.Job
+import org.apache.amaterasu.common.configuration.ConfigManager
+import org.apache.amaterasu.common.configuration.Job
import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+
class SparkSubmitScalaRunnerProvider(val conf: ClusterConfig) : RunnerSetupProvider() {
override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String {
val util = ArtifactUtil(listOf(actionData.repo), jobId)
val classParam = if (actionData.hasArtifact) " --class ${actionData.entryClass}" else ""
+        val sparkProperties: Map<String, Any> = env["sparkProperties"]
+        val sparkOptions: Map<String, Any> = env["sparkOptions"]
+        // Fall back to the job-level master only when sparkOptions does not already carry one.
+        val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
+        // NOTE(review): the flags below are emitted after the application artifact; spark-submit
+        // documents its options as preceding the application jar - confirm this order is intended.
return "\$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.artifact).first().name} " +
- " --master ${env[Job.master]}" +
+                // The helper results carry no surrounding spaces; without the explicit separator the
+                // last option value would fuse with the first "--conf".
+                SparkCommandLineHelper.getOptions(sparkOptions) + " " +
+                SparkCommandLineHelper.getProperties(sparkProperties) +
+                master +
" --jars spark-runtime-${conf.version()}.jar >&1"
}
@@ -39,6 +46,7 @@
override fun getActionDependencies(jobId: String, actionData: ActionData): Array<String> = arrayOf()
override val hasExecutor: Boolean = false
+
override val runnerResources: Array<String> = arrayOf()
}
\ No newline at end of file
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 377a241..b363058 100644
--- a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
+++ b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
Binary files differ
diff --git a/frameworks/spark/pyspark_runtime/tests/env.yml b/frameworks/spark/pyspark_runtime/tests/env.yml
index 6eabc26..1268c0c 100644
--- a/frameworks/spark/pyspark_runtime/tests/env.yml
+++ b/frameworks/spark/pyspark_runtime/tests/env.yml
@@ -24,7 +24,7 @@
configuration:
spark.cassandra.connection.host: 127.0.0.1
sourceTable: documents
-sparkConfiguration:
+sparkProperties:
spark.executor.extraJavaOptions: -XX:+PrintGCDetails
spark.executor.memory: 1g
spark.driver.memeory: 2g
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 0adb406..928a444 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -89,9 +89,7 @@
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect"
- compile('com.uchuhimo:konf:0.11') {
- exclude group: 'org.eclipse.jgit'
- }
+
testCompile 'org.jetbrains.spek:spek-api:1.1.5'
testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
@@ -106,6 +104,9 @@
sourceSets {
test {
resources.srcDirs += [file('src/test/resources')]
+ kotlin {
+ srcDirs = ['src/test/kotlin']
+ }
}
}
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
index 78e06fa..48f619a 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
@@ -17,6 +17,7 @@
package org.apache.amaterasu.leader.common.execution.frameworls
import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.common.logging.KLogging
import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
import org.reflections.Reflections
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
similarity index 70%
rename from leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
rename to leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
index c9df942..5c32730 100644
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
@@ -17,18 +17,15 @@
package org.apache.amaterasu.leader.common.utilities
object MemoryFormatParser {
+    // An integer amount, optional whitespace, then an optional k/m/g/t unit with an optional trailing "b".
+    private val memoryPattern = Regex("""([+-]?\d+)\s*(?:([kmgt])b?)?""")
+
+    /**
+     * Converts a memory size string into whole megabytes.
+     *
+     * Accepts a plain integer (taken as megabytes) or an integer followed by a unit:
+     * `k`/`kb`, `m`/`mb`, `g`/`gb` or `t`/`tb`, case-insensitive, with optional surrounding
+     * whitespace. Kilobyte values are truncated to whole megabytes.
+     *
+     * @param input the memory size, e.g. `"512m"`, `"2g"`, `"1024"`.
+     * @return the size in megabytes.
+     * @throws NumberFormatException when the input is not a valid size or does not fit in an Int.
+     */
+    fun extractMegabytes(input: String): Int {
+        val lower = input.trim().toLowerCase()
- def extractMegabytes(input: String): Int = {
- var result: Int = 0
- val lower = input.toLowerCase
- if (lower.contains("mb")) {
- result = lower.replace("mb", "").toInt
- } else if (lower.contains("gb") | lower.contains("g")) {
- result = lower.replace("g", "").replace("b", "").toInt * 1024
- } else {
- result = lower.toInt
+        val match = memoryPattern.matchEntire(lower)
+            ?: throw NumberFormatException("Invalid memory size: \"$input\"")
+        // Parse as Int (same range check as before), then widen so scaling cannot overflow silently.
+        val amount = match.groupValues[1].toInt().toLong()
+        val megabytes = when (match.groupValues[2]) {
+            "k" -> amount / 1024
+            "g" -> amount * 1024
+            "t" -> amount * 1024 * 1024
+            else -> amount // "m" or no unit: the amount is already in megabytes
+        }
+        if (megabytes < Int.MIN_VALUE || megabytes > Int.MAX_VALUE) {
+            throw NumberFormatException("Memory size out of range: \"$input\"")
+        }
+        return megabytes.toInt()
}
-
- result
- }
-}
+}
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
index 0e87cae..a7b2e47 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
@@ -16,6 +16,9 @@
*/
package org.apache.amaterasu.leader.common.configuration
+import com.uchuhimo.konf.source.yaml.toYaml
+import org.apache.amaterasu.common.configuration.ConfigManager
+import org.apache.amaterasu.common.configuration.Job
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
@@ -32,38 +35,38 @@
val repoPath = "${File(marker).parent}/test_repo"
val cfg = ConfigManager("test", repoPath)
- it("loads the job level environment"){
- assertEquals(cfg.config[Job.master] , "yarn")
+ it("loads the job level environment") {
+ assertEquals(cfg.config[Job.master], "yarn")
}
on("getting an env for an action with default path") {
val startConf = cfg.getActionConfiguration("start")
- it("loads the specific configuration defined in the actions folder"){
- assertEquals(startConf[Job.master] , "mesos")
+ it("loads the specific configuration defined in the actions folder") {
+ assertEquals(startConf[Job.master], "mesos")
}
}
- on("getting an env for an action with a conf: property in the maki.yml"){
+ on("getting an env for an action with a conf: property in the maki.yml") {
val step2conf = cfg.getActionConfiguration("step2", "src/{action_name}/{env}/")
- it("loads the specific configuration defined in the actions folder"){
- assertEquals(step2conf[Job.name] , "test2")
+ it("loads the specific configuration defined in the actions folder") {
+ assertEquals(step2conf[Job.name], "test2")
}
}
- on("getting an env for an action with no action level config"){
+ on("getting an env for an action with no action level config") {
val step3conf = cfg.getActionConfiguration("step3")
- it("loads only the job level conf"){
- assertEquals(step3conf[Job.name] , "test")
+ it("loads only the job level conf") {
+ assertEquals(step3conf[Job.name], "test")
}
}
- on("receiving a path to a specific file" ){
+ on("receiving a path to a specific file") {
val step4conf = cfg.getActionConfiguration("step4", "src/start/env/{env}/job.yml")
- it("loads the specific configuration from the file"){
- assertEquals(step4conf[Job.master] , "mesos")
+ it("loads the specific configuration from the file") {
+ assertEquals(step4conf[Job.master], "mesos")
}
}
@@ -73,11 +76,27 @@
given("a ConfigManager for a job with spark framework") {
val repoPath = "${File(marker).parent}/spark_repo"
- val cfg = ConfigManager("test", repoPath, listOf("sparkConfiguration"))
+ val cfg = ConfigManager("test", repoPath, listOf("sparkProperties"))
- it("load the framework configuration for spark"){
- val spark: Map<String, String> = cfg.config["sparkConfiguration"]
- assertEquals(spark["spark.executor.memory"], "1g")
+ val actionConf = cfg.getActionConfiguration("")
+
+ it("load the framework configuration for spark") {
+ val spark: Map<String, Any> = actionConf["sparkProperties"]
+ assertEquals(spark["spark.executor.memory"].toString(), "1g")
}
+
+        // The fixture declares spark.driver.cores as a bare YAML number; its string form must be "2".
+        it("loads int values as strings") {
+            val spark: Map<String, Any> = actionConf["sparkProperties"]
+            assertEquals("2", spark["spark.driver.cores"].toString())
+        }
+
+// //TODO: Create spark specific tests
+//// it("should be converted correctly to spark-submit parameters") {
+//// val spark: Map<String, Any> = cfg.config["sparkProperties"]
+//// val x = spark.map { "--conf $it" }.joinToString(separator = " ")
+//// println(x)
+//// }
}
})
\ No newline at end of file
diff --git a/leader-common/src/test/resources/spark_repo/env/test/spark.yml b/leader-common/src/test/resources/spark_repo/env/test/spark.yml
index 85f1431..983b5e2 100644
--- a/leader-common/src/test/resources/spark_repo/env/test/spark.yml
+++ b/leader-common/src/test/resources/spark_repo/env/test/spark.yml
@@ -1,3 +1,4 @@
-sparkConfiguration:
+sparkProperties:
spark.executor.extraJavaOptions: -XX:+PrintGCDetails
- spark.executor.memory: 1g
\ No newline at end of file
+ spark.executor.memory: 1g
+ spark.driver.cores: 2
\ No newline at end of file
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index a53ee77..35f3800 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -20,7 +20,6 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.kotlin.KotlinModule
-import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import org.apache.activemq.broker.BrokerService
@@ -29,7 +28,7 @@
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.logging.KLogging
import org.apache.amaterasu.common.utils.ActiveNotifier
-import org.apache.amaterasu.leader.common.configuration.ConfigManager
+import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.leader.common.execution.JobLoader
import org.apache.amaterasu.leader.common.execution.JobManager
import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
@@ -156,7 +155,7 @@
notifier.info("requesting container fo ${actionData.name}")
val frameworkProvider = frameworkFactory.getFramework(actionData.groupId)
- val driverConfiguration = frameworkProvider.driverConfiguration
+ val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
var mem: Long = driverConfiguration.memory.toLong()
mem = Math.min(mem, maxMem)
@@ -242,7 +241,6 @@
val runnerProvider = framework.getRunnerProvider(actionData.typeId)
val ctx = Records.newRecord(ContainerLaunchContext::class.java)
-
val envConf = configManager.getActionConfiguration(actionData.name, actionData.config)
val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, envConf, "${actionData.id}-${container.id.containerId}", address))
@@ -258,7 +256,7 @@
jobManager.actionStarted(actionData.id)
containersIdsToTask[container.id.containerId] = actionData
notifier.info("created container for ${actionData.name} created")
- //ctx.localResources.forEach { t: String, u: LocalResource -> notifier.info("resource: $t = ${u.resource}") }
+
log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}")
} catch (e: Exception) {
notifier.error("", "error launching container with ${e.message} in ${ExceptionUtils.getStackTrace(e)}")
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index a2f470a..866d9e2 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -26,12 +26,11 @@
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.{ClusterConfig, ConfigManager}
import org.apache.amaterasu.common.configuration.enums.ActionStatus
import org.apache.amaterasu.common.dataobjects.ActionData
import org.apache.amaterasu.common.execution.actions.Notification
import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-import org.apache.amaterasu.leader.common.configuration.ConfigManager
import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
import org.apache.amaterasu.leader.common.utilities.DataLoader
@@ -177,6 +176,11 @@
try {
val actionData = jobManager.getNextActionData
if (actionData != null) {
+
+ frameworkFactory = FrameworkProvidersFactory(env, config)
+ val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
+ configManager = new ConfigManager(env, "repo", items)
+
val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
taskIdsToActions.put(taskId, actionData.getName)
// setting up the configuration files for the container
@@ -203,12 +207,11 @@
val slaveActions = executionMap(offer.getSlaveId.toString)
slaveActions.put(taskId.getValue, ActionStatus.Started)
- log.info(s">>>> Framework: ${actionData.getGroupId}")
val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
- log.info(s">>>> Runner: ${actionData.getTypeId}")
val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId)
+ printNotification(new Notification("", s"provider ${runnerProvider.getClass.getName}", NotificationType.Info, NotificationLevel.Execution))
// searching for an executor that already exist on the slave, if non exist
// we create a new one
var executor: ExecutorInfo = null
@@ -252,8 +255,7 @@
.setExecutable(false)
.setExtract(false)
.build())
- }
- )
+ })
// Getting action dependencies
runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
@@ -315,7 +317,7 @@
slavesExecutors.put(offer.getSlaveId.getValue, executor)
- val driverConfiguration = frameworkProvider.getDriverConfiguration
+ val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
var actionTask: TaskInfo = null
@@ -413,9 +415,8 @@
}
- frameworkFactory = FrameworkProvidersFactory(env, config)
- val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
- configManager = new ConfigManager(env, "repo", items)
+
+
jobManager.start()
diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
index d54c422..f68922e 100644
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
@@ -17,6 +17,7 @@
package org.apache.amaterasu.sdk.frameworks
import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
import java.io.File
@@ -27,8 +28,6 @@
val groupResources: Array<File>
- val driverConfiguration: DriverConfiguration
-
val environmentVariables: Map<String, String>
val configurationItems: Array<String>
@@ -37,4 +36,5 @@
fun getRunnerProvider(runnerId: String): RunnerSetupProvider
+ fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration
}
\ No newline at end of file
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 9c18986..0000000
--- a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ