Merge pull request #53 from arunma/AMA-52-Revised

[AMATERASU-52] Read configuration from datasets.yaml for File, Hive and Generic datasets
diff --git a/frameworks/jvm-common/build.gradle b/frameworks/jvm-common/build.gradle
new file mode 100644
index 0000000..8747571
--- /dev/null
+++ b/frameworks/jvm-common/build.gradle
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Build-script classpath: resolves the Kotlin and JUnit Platform Gradle plugins applied below.
+buildscript {
+    // Single Kotlin version shared by the Gradle plugin and the Kotlin dependencies of this module.
+    ext.kotlin_version = '1.3.21'
+
+    repositories {
+        mavenCentral()
+        // Plugin artifacts are fetched over HTTPS only, so they cannot be altered in transit.
+        maven { url 'https://repository.jetbrains.com/all' }
+        maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+    }
+
+    dependencies {
+        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+        // Supplies the 'org.junit.platform.gradle.plugin' plugin applied by this script.
+        classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+    }
+}
+
+// Kotlin/JVM module whose tests run on the JUnit Platform, restricted to the Spek engine.
+apply plugin: 'kotlin'
+apply plugin: 'java'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+    filters {
+        engines { include 'spek' }
+    }
+}
+
+// Java sources are compiled as Java 8 and emitted as Java 8 bytecode.
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+// Repositories used to resolve this module's compile and test dependencies.
+repositories {
+    mavenCentral()
+    jcenter()
+    maven { url "https://plugins.gradle.org/m2/" }
+    // Every custom repository is addressed over HTTPS so artifacts cannot be altered in transit.
+    maven { url 'https://repository.jetbrains.com/all' }
+    maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+    maven { url "https://dl.bintray.com/jetbrains/spek" }
+    maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" }
+}
+
+dependencies {
+    // One Jackson version for every Jackson artifact, so the modules cannot drift apart.
+    def jacksonVersion = '2.9.8'
+
+    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+    // Version stated explicitly, like the other Kotlin artifacts, instead of leaving it implicit.
+    compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+    compile('com.uchuhimo:konf:0.11') {
+        exclude group: 'org.eclipse.jgit'
+    }
+    compile "com.fasterxml.jackson.module:jackson-module-kotlin:$jacksonVersion"
+    compile "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion"
+    compile "com.fasterxml.jackson.core:jackson-core:$jacksonVersion"
+    compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
+    compile "com.fasterxml.jackson.core:jackson-annotations:$jacksonVersion"
+
+    testCompile group: 'junit', name: 'junit', version: '4.11'
+    testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+    testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+    testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+    // spek requires kotlin-reflect, can be omitted if already in the classpath
+    testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+    testImplementation 'org.junit.platform:junit-platform-runner:1.0.0'
+    testImplementation 'org.junit.platform:junit-platform-launcher:1.0.0'
+    testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.0'
+}
+
+// Registers src/test/resources as a resource directory of the test source set
+// (the YAML fixtures loaded by the specs live there).
+sourceSets {
+    test { resources.srcDirs += [file('src/test/resources')] }
+}
+
+// Kotlin compiler settings: main and test sources both target Java 8 bytecode,
+// matching the sourceCompatibility/targetCompatibility values set in this script.
+compileKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+
+// Test sources use the same JVM target as the main sources.
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
diff --git a/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt
new file mode 100644
index 0000000..000d38d
--- /dev/null
+++ b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfig.kt
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+import com.fasterxml.jackson.annotation.*
+
+/**
+ * Root of a datasets YAML document: the datasets declared under its `file`, `hive` and `generic` keys.
+ *
+ * Each section has an empty array as its declared default, and unknown top-level
+ * keys are ignored.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+data class DataSetConfig(
+        val file: Array<FileConfig> = emptyArray(),
+        val hive: Array<HiveConfig> = emptyArray(),
+        val generic: Array<GenericConfig> = emptyArray()
+) {
+    // The members generated for a data class compare array properties by reference, so two
+    // configurations holding the same datasets would not be equal; compare the contents instead.
+    override fun equals(other: Any?): Boolean {
+        if (this === other) return true
+        if (other !is DataSetConfig) return false
+        return file.contentEquals(other.file)
+                && hive.contentEquals(other.hive)
+                && generic.contentEquals(other.generic)
+    }
+
+    // Kept consistent with equals(): derived from the array contents.
+    override fun hashCode(): Int =
+            31 * (31 * file.contentHashCode() + hive.contentHashCode()) + generic.contentHashCode()
+}
+
+/** Properties every dataset definition carries. */
+abstract class Config {
+    /** Name of the dataset; the by-name lookups of `DataSetConfigManager` match on it. */
+    abstract val name: String
+
+    /** Additional key/value pairs of the entry, filled through `@JsonAnySetter` in the subclasses. */
+    abstract val params: MutableMap<String, String>
+}
+
+/** Entry of the `generic` section: a [name] plus free-form [params]. */
+data class GenericConfig(
+        override val name: String,
+        @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf()
+) : Config()
+
+/**
+ * Entry of the `file` section; [uri], [format] and [mode] have no default value.
+ *
+ * @property uri value of the entry's `uri` key, e.g. `s3://filestore`
+ * @property format value of the entry's `format` key, e.g. `parquet`
+ * @property mode value of the entry's `mode` key, e.g. `overwrite`
+ */
+data class FileConfig(
+        override val name: String,
+        @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf(),
+        val uri: String,
+        val format: String,
+        val mode: String
+) : Config()
+
+/**
+ * Entry of the `hive` section; [uri], [format], [database] and [table] have no default value.
+ *
+ * @property uri value of the entry's `uri` key, e.g. `/user/somepath`
+ * @property format value of the entry's `format` key, e.g. `parquet` or `avro`
+ * @property database value of the entry's `database` key (presumably the Hive database name)
+ * @property table value of the entry's `table` key (presumably the Hive table name)
+ */
+data class HiveConfig(
+        override val name: String,
+        @JsonAnySetter override val params: MutableMap<String, String> = mutableMapOf(),
+        val uri: String,
+        val format: String,
+        val database: String,
+        val table: String
+) : Config()
diff --git a/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt
new file mode 100644
index 0000000..89852e0
--- /dev/null
+++ b/frameworks/jvm-common/src/main/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManager.kt
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+import com.fasterxml.jackson.core.JsonParser
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+import java.io.File
+import java.io.FileInputStream
+
+/**
+ * Loads the dataset definitions of one environment and exposes them per dataset kind.
+ *
+ * Every regular file in `<repoPath>/env/<env>` accepted by [DATASET_YAML_FILE_FILTER] is parsed as
+ * YAML; the `file`, `hive` and `generic` entries of all files are concatenated in file-name order.
+ * A missing folder, or a folder without dataset files, results in an empty [DataSetConfig].
+ */
+class DataSetConfigManager(env: String, repoPath: String) {
+
+    private val envFolder = "$repoPath/env/$env"
+    private val mapper = ObjectMapper(YAMLFactory()).apply {
+        registerModule(KotlinModule())
+        configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
+        // NOTE(review): default typing lets a document name the class to instantiate for
+        // non-concrete types; confirm both calls are needed before loading untrusted files.
+        enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE)
+        enableDefaultTyping(ObjectMapper.DefaultTyping.NON_CONCRETE_AND_ARRAYS)
+    }
+
+    /** Merged content of all dataset files found for the environment. */
+    var config: DataSetConfig
+
+    init {
+        // listFiles() returns null for a missing or unreadable folder and promises no ordering:
+        // treat null as "no files" and sort by name so the merge order is reproducible.
+        val datasetFiles = File(envFolder).listFiles(DATASET_YAML_FILE_FILTER).orEmpty().sortedBy { it.name }
+        config = datasetFiles.fold(DataSetConfig()) { merged, file -> mergeConfig(merged, parseConfigs(file)) }
+    }
+
+    /** Appends the datasets of [curr] to those of [acc]; a null side contributes nothing. */
+    private fun mergeConfig(acc: DataSetConfig?, curr: DataSetConfig?): DataSetConfig {
+        if (acc == null) return curr!!
+        if (curr == null) return acc
+        return acc.copy(file = acc.file + curr.file, hive = acc.hive + curr.hive, generic = acc.generic + curr.generic)
+    }
+
+    /** Parses one YAML file; the input stream is closed even when parsing fails. */
+    private fun parseConfigs(file: File): DataSetConfig? =
+            FileInputStream(file).use { stream -> mapper.readValue(stream, DataSetConfig::class.java) }
+
+    /** All datasets of the `file` section. */
+    fun getFileConfigs(): List<FileConfig> = config.file.asList()
+
+    /** All datasets of the `hive` section. */
+    fun getHiveConfigs(): List<HiveConfig> = config.hive.asList()
+
+    /** All datasets of the `generic` section. */
+    fun getGenericConfigs(): List<GenericConfig> = config.generic.asList()
+
+    /** First `file` dataset called [name], or null when there is none. */
+    fun getFileConfigByName(name: String): FileConfig? = getFileConfigs().find { it.name == name }
+
+    /** First `hive` dataset called [name], or null when there is none. */
+    fun getHiveConfigByName(name: String): HiveConfig? = getHiveConfigs().find { it.name == name }
+
+    /** First `generic` dataset called [name], or null when there is none. */
+    fun getGenericConfigByName(name: String): GenericConfig? = getGenericConfigs().find { it.name == name }
+
+    companion object {
+        // "datasets", any characters, then a literal ".yml" or ".yaml" ending the name (case-insensitive).
+        // The dot is escaped and ".*" is not wrapped in a second quantifier, which keeps matching cheap.
+        private val DATASET_FILE_PATTERN = """datasets.*\.(yml|yaml)$""".toRegex(RegexOption.IGNORE_CASE)
+
+        /** Accepts regular files whose name matches the dataset file pattern. */
+        val DATASET_YAML_FILE_FILTER = { file: File ->
+            file.isFile && DATASET_FILE_PATTERN.containsMatchIn(file.name)
+        }
+    }
+}
diff --git a/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt b/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt
new file mode 100644
index 0000000..3ea5d20
--- /dev/null
+++ b/frameworks/jvm-common/src/test/kotlin/org/apache/amaterasu/frameworks/jvm/common/configuration/dataset/DataSetConfigManagerTests.kt
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.frameworks.jvm.common.configuration.dataset
+
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.io.File
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertNull
+import kotlin.test.assertTrue
+
+/** Specs for [DataSetConfigManager], run against the `test_repo/env/test/datasets.yml` fixture. */
+object DataSetConfigManagerTests : Spek({
+
+    // maki.yml only anchors the test resources folder; toURI() decodes characters the URL escapes.
+    val marker = File(DataSetConfigManagerTests::class.java.getResource("/maki.yml")!!.toURI())
+    // Empty temp file named <prefix><random><suffix>; removed again when the JVM exits.
+    fun tempFile(prefix: String, suffix: String) = File.createTempFile(prefix, suffix).apply { deleteOnExit() }
+
+    given("a ConfigManager for configuration for datasets") {
+        val repoPath = "${marker.parent}/test_repo"
+        val cfg = DataSetConfigManager("test", repoPath)
+
+        it("should retrieve all configurations for File datasets") {
+            val fileConfigs = cfg.getFileConfigs()
+            assertEquals(1, fileConfigs.size)
+            assertEquals("users", fileConfigs[0].name)
+            assertEquals("parquet", fileConfigs[0].format)
+            assertEquals("s3://filestore", fileConfigs[0].uri)
+            assertEquals("overwrite", fileConfigs[0].mode)
+        }
+
+        it("should retrieve all configurations for the Hive datasets") {
+            assertEquals(2, cfg.getHiveConfigs().count())
+            val hiveConfig = cfg.getHiveConfigByName("transactions")!!
+            assertEquals("parquet", hiveConfig.format)
+            assertEquals("/user/somepath", hiveConfig.uri)
+            assertEquals("transations_daily", hiveConfig.database)
+            assertEquals("transx", hiveConfig.table)
+        }
+
+        it("should retrieve specific configuration from the Hive dataset") {
+            val hiveConfig = cfg.getHiveConfigByName("second_transactions")!!
+            assertEquals("avro", hiveConfig.format)
+            assertEquals("/seconduser/somepath", hiveConfig.uri)
+            assertEquals("transations_monthly", hiveConfig.database)
+            assertEquals("avro_table", hiveConfig.table)
+        }
+
+        it("should retrieve all generic configurations") {
+            assertEquals(1, cfg.getGenericConfigs().count())
+            val genConfig = cfg.getGenericConfigByName("mydataset")
+            assertEquals("param1value", genConfig!!.params["param1key"])
+        }
+
+        it("should return a null Config for an absent name lookup") {
+            assertNull(cfg.getHiveConfigByName("non-existent"))
+        }
+    }
+
+    given("a ConfigManager file filter") {
+        it("should match the files with the expected pattern") {
+            assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("datasets", ".yml")))
+            assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("otherdatasets", ".yml")))
+            assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("my-datasets", ".yaml")))
+            assertTrue(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("my-DATASETS", ".YAML")))
+        }
+
+        it("should filter files with the unexpected pattern") {
+            assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("dataset", ".yml")))
+            assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("otherdatasets", ".yml1")))
+            assertFalse(DataSetConfigManager.DATASET_YAML_FILE_FILTER(tempFile("my-dataseets", ".yaml")))
+        }
+    }
+})
diff --git a/frameworks/jvm-common/src/test/resources/maki.yml b/frameworks/jvm-common/src/test/resources/maki.yml
new file mode 100644
index 0000000..bcb26e2
--- /dev/null
+++ b/frameworks/jvm-common/src/test/resources/maki.yml
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+--- # Test fixture; DataSetConfigManagerTests only uses this file's classpath location to find test_repo.
+job-config: env/{env}/job.yml, env/{env}/passwords.yml, env/{env}/spark.yml
+job-name:    amaterasu-test
+flow:
+  - name: start
+    runner:
+      group: spark
+      type: scala
+    exports:
+      odd: parquet
+  - name: step2
+    config: src/{action_name}/{env}/
+    runner:
+      group: spark
+      type: scala
+    file: file2.scala
+...
diff --git a/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml b/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml
new file mode 100644
index 0000000..002a2fd
--- /dev/null
+++ b/frameworks/jvm-common/src/test/resources/test_repo/env/test/datasets.yml
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+--- # Fixture read by DataSetConfigManagerTests: one file, one generic and two hive datasets.
+file:
+  - name: users
+    uri: s3://filestore
+    format: parquet
+    mode: overwrite
+    hello: world  # extra key: FileConfig declares no `hello` property
+
+generic:
+  - name: mydataset
+    param1key : param1value  # the tests read this back through params["param1key"]
+
+hive:
+  - name: transactions
+    uri: /user/somepath
+    format: parquet
+    database: transations_daily  # spelling (sic) is asserted verbatim by the tests
+    table: transx
+
+  - name: second_transactions
+    uri: /seconduser/somepath
+    format: avro
+    database: transations_monthly  # spelling (sic) is asserted verbatim by the tests
+    table: avro_table
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 1f45aab..e41445d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -59,6 +59,9 @@
 include 'python-pandas'
 project(':python-pandas').projectDir=file("frameworks/python/pandas_runtime")
 
+include 'jvm-common' // new module: dataset configuration classes under frameworks/jvm-common
+project(':jvm-common').projectDir=file("frameworks/jvm-common")
+
 File spekProject = file("../../kotlin/spek")
 if (spekProject.exists()) {
     // Composite build