Merge pull request #53 from arunma/AMA-52-Revised
[AMATERASU-52] Read configuration from datasets.yaml for File, Hive a…
diff --git a/frameworks/jvm-common/build.gradle b/frameworks/jvm-common/build.gradle
new file mode 100644
index 0000000..8747571
--- /dev/null
+++ b/frameworks/jvm-common/build.gradle
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Build-script classpath: resolves the Kotlin and JUnit Platform Gradle plugins that this
+// script applies, and defines kotlin_version, which the module's own dependencies also read.
+buildscript {
+    ext.kotlin_version = '1.3.21'
+
+    repositories {
+        mavenCentral()
+        maven {
+            // HTTPS rather than plain HTTP: build plugins execute inside the build, so they
+            // must not be downloaded over a channel that can be altered in transit.
+            url 'https://repository.jetbrains.com/all'
+        }
+        maven {
+            url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+        }
+    }
+
+    dependencies {
+        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+        classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+    }
+}
+
+// Kotlin compilation, the Java conventions, and the JUnit Platform test runner plugin.
+apply plugin: 'kotlin'
+apply plugin: 'java'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+// Restrict test execution to the Spek engine.
+junitPlatform {
+    filters {
+        engines {
+            include 'spek'
+        }
+    }
+}
+
+// Java 8 source and bytecode level.
+sourceCompatibility = JavaVersion.VERSION_1_8
+targetCompatibility = JavaVersion.VERSION_1_8
+
+// Repositories used to resolve this module's dependencies. Every custom repository is
+// addressed over HTTPS so artifacts cannot be altered in transit.
+// NOTE(review): JFrog has sunset Bintray/JCenter; confirm that jcenter() and the
+// dl.bintray.com / oss.jfrog.org entries still resolve before relying on them.
+repositories {
+    mavenCentral()
+    jcenter()
+    maven { url "https://plugins.gradle.org/m2/" }
+    maven { url 'https://repository.jetbrains.com/all' }
+    maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+    maven { url "https://dl.bintray.com/jetbrains/spek" }
+    maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" }
+}
+
+// Module dependencies: Kotlin runtime, konf, the Jackson YAML/Kotlin stack, and the
+// Spek / JUnit Platform test tooling.
+dependencies {
+    def jacksonVersion = '2.9.8'
+
+    testCompile 'junit:junit:4.11'
+    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+    compile "org.jetbrains.kotlin:kotlin-reflect"
+    compile('com.uchuhimo:konf:0.11') {
+        exclude group: 'org.eclipse.jgit'
+    }
+
+    // Jackson: Kotlin module, YAML data format, and the core artifacts, all on one version.
+    compile "com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion"
+    compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion"
+    compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
+    compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
+    compile "com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion"
+
+    // Spek API to write the specs, plus its engine for the JUnit Platform at test runtime.
+    testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+    testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+    testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+    // spek requires kotlin-reflect, can be omitted if already in the classpath
+    testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+
+    testImplementation 'org.junit.platform:junit-platform-runner:1.0.0'
+    testImplementation 'org.junit.platform:junit-platform-launcher:1.0.0'
+    testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.0'
+}
+
+// Adds src/test/resources to the test resource directories.
+// NOTE(review): this looks like the Java plugin's default location already - confirm
+// whether the line is needed at all.
+sourceSets.test.resources.srcDirs += [file('src/test/resources')]
+
+// Main and test Kotlin sources both compile to Java 8 bytecode.
+[compileKotlin, compileTestKotlin].each { kotlinTask ->
+    kotlinTask.kotlinOptions.jvmTarget = "1.8"
+}
diff --git a/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt
new file mode 100644
index 0000000..000d38d
--- /dev/null
+++ b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+import com.fasterxml.jackson.annotation.*
+
+/**
+ * Root of a datasets YAML document: the dataset definitions grouped by kind.
+ *
+ * Top-level keys other than the three below are ignored during deserialization.
+ *
+ * @property file file-backed datasets listed under the `file` key
+ * @property hive Hive datasets listed under the `hive` key
+ * @property generic free-form datasets listed under the `generic` key
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+data class DataSetConfig(
+    val file: Array<FileConfig> = emptyArray(),
+    val hive: Array<HiveConfig> = emptyArray(),
+    val generic: Array<GenericConfig> = emptyArray()
+) {
+
+    /**
+     * Compares the three arrays element by element. Arrays are compared by reference under
+     * `==`, so without this override two configurations holding the same datasets in
+     * different array instances would not be equal.
+     */
+    override fun equals(other: Any?): Boolean {
+        if (this === other) return true
+        if (other !is DataSetConfig) return false
+        return file.contentEquals(other.file) &&
+            hive.contentEquals(other.hive) &&
+            generic.contentEquals(other.generic)
+    }
+
+    /** Content-based hash, kept consistent with [equals]. */
+    override fun hashCode(): Int {
+        var result = file.contentHashCode()
+        result = 31 * result + hive.contentHashCode()
+        result = 31 * result + generic.contentHashCode()
+        return result
+    }
+}
+
+/** Common shape of every dataset definition: a lookup [name] plus free-form string [params]. */
+abstract class Config {
+    /** Identifier the dataset is looked up by. */
+    abstract val name: String
+
+    /** String key/value pairs attached to the dataset. */
+    abstract val params: MutableMap<String, String>
+}
+
+/**
+ * Dataset described only by its [name] and arbitrary key/value [params].
+ *
+ * @property name identifier the dataset is looked up by
+ * @property params annotated with Jackson's `@JsonAnySetter`; the test fixture's undeclared
+ * `param1key` entry is read back from this map
+ */
+data class GenericConfig(
+    override val name: String,
+    @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf()
+) : Config()
+
+/**
+ * File-backed dataset definition.
+ *
+ * @property name identifier the dataset is looked up by
+ * @property params annotated with Jackson's `@JsonAnySetter`; presumably receives keys that
+ * are not declared below - TODO confirm
+ * @property uri location of the data (the test fixture uses `s3://filestore`)
+ * @property format storage format name (the test fixture uses `parquet`)
+ * @property mode write mode name (the test fixture uses `overwrite`)
+ */
+data class FileConfig(
+    override val name: String,
+    @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf(),
+    val uri: String,
+    val format: String,
+    val mode: String
+) : Config()
+
+/**
+ * Hive dataset definition.
+ *
+ * @property name identifier the dataset is looked up by
+ * @property params annotated with Jackson's `@JsonAnySetter`; presumably receives keys that
+ * are not declared below - TODO confirm
+ * @property uri location of the data (the test fixture uses `/user/somepath`)
+ * @property format storage format name (the test fixture uses `parquet` and `avro`)
+ * @property database name of the Hive database
+ * @property table name of the Hive table
+ */
+data class HiveConfig(
+    override val name: String,
+    @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf(),
+    val uri: String,
+    val format: String,
+    val database: String,
+    val table: String
+) : Config()
diff --git a/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt
new file mode 100644
index 0000000..89852e0
--- /dev/null
+++ b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+import java.io.File
+import java.io.FileInputStream
+
+/**
+ * Loads the dataset definitions of one environment from a job repository.
+ *
+ * Every regular file directly inside `<repoPath>/env/<env>` that passes
+ * [DATASET_YAML_FILE_FILTER] is parsed as YAML into a [DataSetConfig]; the per-file results
+ * are concatenated, in file-name order, into [config]. A missing or unreadable environment
+ * folder yields an empty configuration instead of failing construction.
+ *
+ * @param env name of the environment folder below `<repoPath>/env`
+ * @param repoPath root directory of the job repository
+ */
+class DataSetConfigManager(env: String, repoPath: String) {
+
+    private val envFolder = "$repoPath/env/$env"
+
+    // YAML-backed mapper with the Kotlin module registered for the data classes.
+    private val mapper = ObjectMapper(YAMLFactory()).apply {
+        registerModule(KotlinModule())
+        configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
+        // NOTE(review): default typing lets a document name the classes to instantiate, which
+        // Jackson documents as unsafe for untrusted input. The second call also appears to
+        // replace the first rather than add to it - confirm whether either call is needed.
+        enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE)
+        enableDefaultTyping(ObjectMapper.DefaultTyping.NON_CONCRETE_AND_ARRAYS)
+    }
+
+    /** Merged dataset configuration of every matching file in the environment folder. */
+    var config: DataSetConfig
+
+    init {
+        config = File(envFolder)
+            .listFiles(DATASET_YAML_FILE_FILTER)
+            // listFiles returns null when the folder does not exist or cannot be read.
+            .orEmpty()
+            // Directory listing order is unspecified; sort so that merge order, and therefore
+            // the first match returned by the by-name lookups, is reproducible.
+            .sortedBy { it.name }
+            .fold(DataSetConfig()) { merged, file -> mergeConfig(merged, parseConfigs(file)) }
+    }
+
+    /** Appends the datasets of [curr] to [acc]; a file that parsed to nothing leaves [acc] as is. */
+    private fun mergeConfig(acc: DataSetConfig, curr: DataSetConfig?): DataSetConfig =
+        if (curr == null) acc
+        else acc.copy(
+            file = acc.file + curr.file,
+            hive = acc.hive + curr.hive,
+            generic = acc.generic + curr.generic
+        )
+
+    /** Parses one YAML file into a [DataSetConfig]; the stream is closed even when parsing fails. */
+    private fun parseConfigs(file: File): DataSetConfig? =
+        FileInputStream(file).use { stream -> mapper.readValue(stream, DataSetConfig::class.java) }
+
+    /** @return every file dataset, in merge order */
+    fun getFileConfigs(): List<FileConfig> = config.file.asList()
+
+    /** @return every Hive dataset, in merge order */
+    fun getHiveConfigs(): List<HiveConfig> = config.hive.asList()
+
+    /** @return every generic dataset, in merge order */
+    fun getGenericConfigs(): List<GenericConfig> = config.generic.asList()
+
+    /** @return the first file dataset called [name], or `null` when there is none */
+    fun getFileConfigByName(name: String): FileConfig? =
+        getFileConfigs().find { it.name == name }
+
+    /** @return the first Hive dataset called [name], or `null` when there is none */
+    fun getHiveConfigByName(name: String): HiveConfig? =
+        getHiveConfigs().find { it.name == name }
+
+    /** @return the first generic dataset called [name], or `null` when there is none */
+    fun getGenericConfigByName(name: String): GenericConfig? =
+        getGenericConfigs().find { it.name == name }
+
+    companion object {
+        // "datasets" anywhere in the name, ending in a literal ".yml" or ".yaml" (case-insensitive).
+        // The dot is escaped so that e.g. "datasetsXyml" no longer matches, and the former nested
+        // quantifier "(.*)+" is gone because it could backtrack heavily on names that do not match.
+        private val DATASET_FILE_PATTERN = """datasets.*\.(yml|yaml)$""".toRegex(RegexOption.IGNORE_CASE)
+
+        /** Accepts regular files whose name matches the dataset file naming pattern. */
+        val DATASET_YAML_FILE_FILTER = { file: File ->
+            file.isFile && DATASET_FILE_PATTERN.containsMatchIn(file.name)
+        }
+    }
+}
diff --git a/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt b/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt
new file mode 100644
index 0000000..3ea5d20
--- /dev/null
+++ b/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.io.File
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertNull
+import kotlin.test.assertTrue
+
+/**
+ * Spek specification for [DataSetConfigManager]: loading the `test` environment of the
+ * `test_repo` fixture, and the dataset file-name filter.
+ */
+object DataSetConfigManagerTests : Spek({
+
+    // maki.yml only serves to locate the test resources directory. Going through toURI()
+    // decodes percent-escapes, so a checkout path containing spaces still resolves.
+    val resourcesDir = File(DataSetConfigManagerTests::class.java.getResource("/maki.yml")!!.toURI()).parent
+
+    // Creates a temp file named <prefix><generated><suffix> and schedules its removal at JVM
+    // exit, so the filter specs do not leave files behind in the temp directory.
+    fun tempFile(prefix: String, suffix: String): File =
+        File.createTempFile(prefix, suffix).apply { deleteOnExit() }
+
+    given("a ConfigManager for configuration for datasets") {
+        val repoPath = "$resourcesDir/test_repo"
+        val cfg = DataSetConfigManager("test", repoPath)
+
+        it("should retrieve all configurations for File datasets") {
+            val fileConfigs = cfg.getFileConfigs()
+            assertEquals(1, fileConfigs.size)
+            assertEquals("users", fileConfigs[0].name)
+            assertEquals("parquet", fileConfigs[0].format)
+            assertEquals("s3://filestore", fileConfigs[0].uri)
+            assertEquals("overwrite", fileConfigs[0].mode)
+        }
+
+        it("should retrieve all configurations for the Hive datasets") {
+            val hiveConfs = cfg.getHiveConfigs()
+            assertEquals(2, hiveConfs.count())
+            val hiveConfig = cfg.getHiveConfigByName("transactions")!!
+            assertEquals("parquet", hiveConfig.format)
+            assertEquals("/user/somepath", hiveConfig.uri)
+            assertEquals("transations_daily", hiveConfig.database)
+            assertEquals("transx", hiveConfig.table)
+        }
+
+        it("should retrieve specific configuration from the Hive dataset") {
+            val hiveConfig = cfg.getHiveConfigByName("second_transactions")!!
+            assertEquals("avro", hiveConfig.format)
+            assertEquals("/seconduser/somepath", hiveConfig.uri)
+            assertEquals("transations_monthly", hiveConfig.database)
+            assertEquals("avro_table", hiveConfig.table)
+        }
+
+        it("should retrieve all generic configurations") {
+            val genConfs = cfg.getGenericConfigs()
+            assertEquals(1, genConfs.count())
+            val genConfig = cfg.getGenericConfigByName("mydataset")
+            assertEquals("param1value", genConfig!!.params["param1key"])
+        }
+
+        it("should return a null Config for an absent name lookup") {
+            val hiveConf = cfg.getHiveConfigByName("non-existent")
+            assertNull(hiveConf)
+        }
+    }
+
+    given("a ConfigManager file filter") {
+        it("should match the files with the expected pattern") {
+            assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("datasets", ".yml")))
+            assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("otherdatasets", ".yml")))
+            assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("my-datasets", ".yaml")))
+            assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("my-DATASETS", ".YAML")))
+        }
+
+        it("should filter files with the unexpected pattern") {
+            assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("dataset", ".yml")))
+            assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("otherdatasets", ".yml1")))
+            assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("my-dataseets", ".yaml")))
+        }
+    }
+})
diff --git a/frameworks/jvm-common/src/test/resources/maki.yml b/frameworks/jvm-common/src/test/resources/maki.yml
new file mode 100644
index 0000000..bcb26e2
--- /dev/null
+++ b/frameworks/jvm-common/src/test/resources/maki.yml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+--- # Job definition fixture; DataSetConfigManagerTests only uses this file's location to find the test resources directory
+job-config: env/{env}/job.yml, env/{env}/passwords.yml, env/{env}/spark.yml
+job-name: amaterasu-test
+flow:
+ - name: start
+ runner:
+ group: spark
+ type: scala
+ exports:
+ odd: parquet
+ - name: step2
+ config: src/{action_name}/{env}/
+ runner:
+ group: spark
+ type: scala
+ file: file2.scala
+...
diff --git a/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml b/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml
new file mode 100644
index 0000000..002a2fd
--- /dev/null
+++ b/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+--- # Fixture loaded by DataSetConfigManagerTests via DataSetConfigManager("test", <resources>/test_repo)
+file: # maps to DataSetConfig.file (FileConfig entries)
+ - name: users
+ uri: s3://filestore
+ format: parquet
+ mode: overwrite
+ hello: world # not a declared FileConfig property; presumably collected into params - TODO confirm
+
+generic: # maps to DataSetConfig.generic (GenericConfig entries)
+ - name: mydataset
+ param1key : param1value # undeclared key; the test reads it back from params["param1key"]
+
+hive: # maps to DataSetConfig.hive (HiveConfig entries); the test expects exactly two
+ - name: transactions
+ uri: /user/somepath
+ format: parquet
+ database: transations_daily
+ table: transx
+
+ - name: second_transactions
+ uri: /seconduser/somepath
+ format: avro
+ database: transations_monthly
+ table: avro_table
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 1f45aab..e41445d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -59,6 +59,9 @@
include 'python-pandas'
project(':python-pandas').projectDir=file("frameworks/python/pandas_runtime")
+include 'jvm-common' // register the jvm-common module in the multi-project build
+project(':jvm-common').projectDir=file("frameworks/jvm-common") // its sources live under frameworks/jvm-common
+
File spekProject = file("../../kotlin/spek")
if (spekProject.exists()) {
// Composite build