merged with master
diff --git a/.gitignore b/.gitignore
index dbb51cb..eaefdef 100755
--- a/.gitignore
+++ b/.gitignore
@@ -50,3 +50,6 @@
executor/spark-warehouse/
repo
repo/**
+
+#python
+*.zip
\ No newline at end of file
diff --git a/frameworks/jvm-common/build.gradle b/frameworks/jvm-common/build.gradle
new file mode 100644
index 0000000..8747571
--- /dev/null
+++ b/frameworks/jvm-common/build.gradle
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+buildscript {
+ ext.kotlin_version = '1.3.21'
+
+ repositories {
+ mavenCentral()
+ maven {
+ url 'https://repository.jetbrains.com/all'
+ }
+ maven {
+ url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+ }
+ }
+
+ dependencies {
+ classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+ classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+ }
+}
+
+apply plugin: 'kotlin'
+apply plugin: 'java'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+ filters {
+ engines {
+ include 'spek'
+ }
+ }
+}
+
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+repositories {
+ mavenCentral()
+ jcenter()
+ maven { url "https://plugins.gradle.org/m2/" }
+ maven { url 'https://repository.jetbrains.com/all' }
+ maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+ maven { url "https://dl.bintray.com/jetbrains/spek" }
+ maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" }
+
+
+}
+
+dependencies {
+ testCompile group: 'junit', name: 'junit', version: '4.11'
+ compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+ compile "org.jetbrains.kotlin:kotlin-reflect"
+ compile('com.uchuhimo:konf:0.11') {
+ exclude group: 'org.eclipse.jgit'
+ }
+
+ compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.9.8'
+ compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.8'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
+ compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
+
+ testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+ testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+ testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+ // spek requires kotlin-reflect, can be omitted if already in the classpath
+ testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+
+
+ testImplementation 'org.junit.platform:junit-platform-runner:1.0.0'
+ testImplementation 'org.junit.platform:junit-platform-launcher:1.0.0'
+ testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.0'
+}
+
+sourceSets {
+ test {
+ resources.srcDirs += [file('src/test/resources')]
+ }
+}
+
+compileKotlin {
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
+}
+compileTestKotlin {
+ kotlinOptions {
+ jvmTarget = "1.8"
+ }
+}
diff --git a/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt
new file mode 100644
index 0000000..000d38d
--- /dev/null
+++ b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+import com.fasterxml.jackson.annotation.*
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+data class DataSetConfig(
+ val file: Array<FileConfig> = emptyArray(),
+ val hive: Array<HiveConfig> = emptyArray(),
+ val generic: Array<GenericConfig> = emptyArray()
+)
+
+abstract class Config{
+ abstract val name: String
+ abstract val params: MutableMap<String, String>
+}
+
+data class GenericConfig(override val name: String,
+ @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf()) : Config()
+
+data class FileConfig(
+ override val name: String,
+ @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf(),
+ val uri: String,
+ val format: String,
+ val mode: String
+) : Config()
+
+data class HiveConfig(
+ override val name: String,
+ @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf(),
+ val uri: String,
+ val format: String,
+ val database: String,
+ val table: String
+) : Config()
diff --git a/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt
new file mode 100644
index 0000000..89852e0
--- /dev/null
+++ b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+import java.io.File
+import java.io.FileInputStream
+
+class DataSetConfigManager(env: String, repoPath: String) {
+
+ private val envFolder = "$repoPath/env/$env"
+ private val mapper = let {
+ val mapper = ObjectMapper(YAMLFactory())
+ mapper.registerModule(KotlinModule())
+ mapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
+ mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE)
+ mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_CONCRETE_AND_ARRAYS)
+ mapper
+ }
+
+ var config: DataSetConfig
+
+ init {
+ config = File(envFolder)
+ .listFiles(DATASET_YAML_FILE_FILTER)
+ .fold(DataSetConfig()) { root, file -> mergeConfig(root, parseConfigs(file)) }
+ }
+
+ private fun mergeConfig(acc: DataSetConfig?, curr: DataSetConfig?): DataSetConfig {
+ return if (acc != null && curr != null) {
+ acc.copy(file = acc.file.plus(curr.file),
+ hive = acc.hive.plus(curr.hive),
+ generic = acc.generic.plus(curr.generic)
+ )
+
+ } else if (acc != null) {
+ acc
+ } else curr!!
+ }
+
+ private fun parseConfigs(file: File?): DataSetConfig? {
+ FileInputStream(file).use { iStream ->
+ return mapper.readValue(iStream, DataSetConfig::class.java)
+ }
+ }
+
+ fun getFileConfigs(): List<FileConfig> {
+ return config.file.asList()
+ }
+
+ fun getHiveConfigs(): List<HiveConfig> {
+ return config.hive.asList()
+ }
+
+ fun getGenericConfigs(): List<GenericConfig> {
+ return config.generic.asList()
+ }
+
+ fun getFileConfigByName(name: String): FileConfig? {
+ return getFileConfigs().find { conf -> conf.name == name }
+ }
+
+ fun getHiveConfigByName(name: String): HiveConfig? {
+ return getHiveConfigs().find { conf -> conf.name == name }
+ }
+
+ fun getGenericConfigByName(name: String): GenericConfig? {
+ return getGenericConfigs().find { conf -> conf.name == name }
+ }
+
+ companion object {
+ private val DATASET_FILE_PATTERN = """datasets.*\.ya?ml$""".toRegex(RegexOption.IGNORE_CASE)
+ val DATASET_YAML_FILE_FILTER = { file: File ->
+ (file.isFile && DATASET_FILE_PATTERN.find(file.name)?.value?.isNotBlank() ?: false)
+ }
+ }
+}
diff --git a/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt b/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt
new file mode 100644
index 0000000..3ea5d20
--- /dev/null
+++ b/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.io.File
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertNull
+import kotlin.test.assertTrue
+
+object DataSetConfigManagerTests : Spek({
+
+ val marker = DataSetConfigManagerTests::class.java.getResource("/maki.yml")!!.path
+
+ given("a ConfigManager for configuration for datasets") {
+ val repoPath = "${File(marker).parent}/test_repo"
+ val cfg = DataSetConfigManager("test", repoPath)
+
+ it("should retrieve all configurations for File datasets") {
+ val fileConfigs = cfg.getFileConfigs()
+ assertEquals(1, fileConfigs.size)
+ assertEquals("users", fileConfigs[0].name)
+ assertEquals("parquet", fileConfigs[0].format)
+ assertEquals("s3://filestore", fileConfigs[0].uri)
+ assertEquals("overwrite", fileConfigs[0].mode)
+ }
+
+ it("should retrieve all configurations for the Hive datasets") {
+ val hiveConfs = cfg.getHiveConfigs()
+ assertEquals(2, hiveConfs.count())
+ val hiveConfig = cfg.getHiveConfigByName("transactions")!!
+ assertEquals("parquet", hiveConfig.format)
+ assertEquals("/user/somepath", hiveConfig.uri)
+ assertEquals("transations_daily", hiveConfig.database)
+ assertEquals("transx", hiveConfig.table)
+ }
+
+ it("should retrieve specific configuration from the Hive dataset") {
+ val hiveConfig = cfg.getHiveConfigByName("second_transactions")!!
+ assertEquals("avro", hiveConfig.format)
+ assertEquals("/seconduser/somepath", hiveConfig.uri)
+ assertEquals("transations_monthly", hiveConfig.database)
+ assertEquals("avro_table", hiveConfig.table)
+ }
+
+ it("should retrieve all generic configurations") {
+ val genConfs = cfg.getGenericConfigs()
+ assertEquals(1, genConfs.count())
+ val genConfig = cfg.getGenericConfigByName("mydataset")
+ assertEquals("param1value", genConfig!!.params["param1key"])
+ }
+
+
+ it("should return a null Config for an absent name lookup") {
+ val hiveConf = cfg.getHiveConfigByName("non-existent")
+ assertNull(hiveConf)
+ }
+ }
+
+ given("a ConfigManager file filter") {
+ it("should match the files with the expected pattern") {
+ assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(File.createTempFile("datasets", ".yml")))
+ assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(File.createTempFile("otherdatasets", ".yml")))
+ assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(File.createTempFile("my-datasets", ".yaml")))
+ assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(File.createTempFile("my-DATASETS", ".YAML")))
+ }
+
+ it("should filter files with the unexpected pattern") {
+ assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(File.createTempFile("dataset", ".yml")))
+ assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(File.createTempFile("otherdatasets", ".yml1")))
+ assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(File.createTempFile("my-dataseets", ".yaml")))
+ }
+ }
+})
diff --git a/frameworks/jvm-common/src/test/resources/maki.yml b/frameworks/jvm-common/src/test/resources/maki.yml
new file mode 100644
index 0000000..bcb26e2
--- /dev/null
+++ b/frameworks/jvm-common/src/test/resources/maki.yml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+job-config: env/{env}/job.yml, env/{env}/passwords.yml, env/{env}/spark.yml
+job-name: amaterasu-test
+flow:
+ - name: start
+ runner:
+ group: spark
+ type: scala
+ exports:
+ odd: parquet
+ - name: step2
+ config: src/{action_name}/{env}/
+ runner:
+ group: spark
+ type: scala
+ file: file2.scala
+...
diff --git a/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml b/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml
new file mode 100644
index 0000000..002a2fd
--- /dev/null
+++ b/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+file:
+ - name: users
+ uri: s3://filestore
+ format: parquet
+ mode: overwrite
+ hello: world
+
+generic:
+ - name: mydataset
+ param1key : param1value
+
+hive:
+ - name: transactions
+ uri: /user/somepath
+ format: parquet
+ database: transations_daily
+ table: transx
+
+ - name: second_transactions
+ uri: /seconduser/somepath
+ format: avro
+ database: transations_monthly
+ table: avro_table
\ No newline at end of file
diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
deleted file mode 100644
index cc71e6a..0000000
--- a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 51f23dd..0000000
--- a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index 273645a..e3e503f 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -88,6 +88,4 @@
}
return DriverConfiguration(mem, cpu)
}
-
-
}
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt.orig b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt.orig
new file mode 100644
index 0000000..daaab40
--- /dev/null
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt.orig
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
+import com.uchuhimo.konf.Config
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
+import org.apache.amaterasu.common.configuration.Job
+
+class PySparkRunnerProvider(env: String, conf: ClusterConfig) : PythonRunnerProviderBase(env, conf) {
+
+ override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String {
+ val sparkProperties: Map<String, Any> = env["sparkProperties"]
+ val sparkOptions: Map<String, Any> = env["sparkOptions"]
+ val command = super.getCommand(jobId, actionData, env, executorId, callbackAddress)
+ val hive = if (conf.mode() == "yarn") "--files \$SPARK_HOME/conf/hive-site.xml " else ""
+ val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
+
+ return "$command && \$SPARK_HOME/bin/spark-submit $master " +
+ SparkCommandLineHelper.getOptions(sparkOptions) + " " +
+ SparkCommandLineHelper.getProperties(sparkProperties) + " " +
+ "--conf spark.pyspark.python=${conf.pythonPath()} " +
+ "--conf spark.pyspark.driver.python=$virtualPythonPath " +
+ hive +
+ " ${actionData.src}"
+ }
+
+ override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf()
+
+ override val runnerResources: Array<String>
+ get() {
+ var resources = super.runnerResources
+ resources += "amaterasu_pyspark-${conf.version()}.zip"
+ //log.info("PYSPARK RESOURCES ==> ${resources.toSet}")
+ return resources
+ }
+
+ override val hasExecutor: Boolean = false
+}
\ No newline at end of file
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 3a66b65..0000000
--- a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
similarity index 70%
rename from leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
rename to leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
index c9df942..5c32730 100644
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
@@ -17,18 +17,15 @@
package org.apache.amaterasu.leader.common.utilities
object MemoryFormatParser {
+ fun extractMegabytes(input: String): Int {
+ val lower = input.toLowerCase()
- def extractMegabytes(input: String): Int = {
- var result: Int = 0
- val lower = input.toLowerCase
- if (lower.contains("mb")) {
- result = lower.replace("mb", "").toInt
- } else if (lower.contains("gb") | lower.contains("g")) {
- result = lower.replace("g", "").replace("b", "").toInt * 1024
- } else {
- result = lower.toInt
+ return if (lower.contains("mb")) {
+ lower.replace("mb", "").toInt()
+ } else if (lower.contains("gb") || lower.contains("g")) {
+ lower.replace("g", "").replace("b", "").toInt() * 1024
+ } else {
+ lower.toInt()
+ }
}
-
- result
- }
-}
+}
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
index 34b83d4..a7b2e47 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
@@ -16,6 +16,7 @@
*/
package org.apache.amaterasu.leader.common.configuration
+import com.uchuhimo.konf.source.yaml.toYaml
import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.common.configuration.Job
import org.jetbrains.spek.api.Spek
@@ -77,21 +78,25 @@
val repoPath = "${File(marker).parent}/spark_repo"
val cfg = ConfigManager("test", repoPath, listOf("sparkProperties"))
+ val actionConf = cfg.getActionConfiguration("")
+
it("load the framework configuration for spark") {
- val spark: Map<String, String> = cfg.config["sparkProperties"]
- assertEquals(spark["spark.executor.memory"], "1g")
+ val spark: Map<String, Any> = actionConf["sparkProperties"]
+ assertEquals(spark["spark.executor.memory"].toString(), "1g")
}
it("loads int values as strings") {
- val spark: Map<String, String> = cfg.config["sparkProperties"]
+ val spark: Map<String, Any> = actionConf["sparkProperties"]
+ val x = spark.map { "--conf $it" }.joinToString(separator = " ")
+ println(x)
assertEquals(spark["spark.driver.cores"].toString(), "2")
}
- //TODO: Create spark specific tests
-// it("should be converted correctly to spark-submit parameters") {
-// val spark: Map<String, Any> = cfg.config["sparkProperties"]
-// val x = spark.map { "--conf $it" }.joinToString(separator = " ")
-// println(x)
-// }
+// //TODO: Create spark specific tests
+//// it("should be converted correctly to spark-submit parameters") {
+//// val spark: Map<String, Any> = cfg.config["sparkProperties"]
+//// val x = spark.map { "--conf $it" }.joinToString(separator = " ")
+//// println(x)
+//// }
}
})
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index d50bc25..866d9e2 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -255,8 +255,7 @@
.setExecutable(false)
.setExtract(false)
.build())
- }
- )
+ })
// Getting action dependencies
runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 8c94531..0000000
--- a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/settings.gradle b/settings.gradle
index 28f54ad..9e757dc 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -57,6 +57,9 @@
include 'python-pandas'
project(':python-pandas').projectDir=file("frameworks/python/pandas_runtime")
+include 'jvm-common'
+project(':jvm-common').projectDir=file("frameworks/jvm-common")
+
File spekProject = file("../../kotlin/spek")
if (spekProject.exists()) {
// Composite build