Merge pull request #51 from roadan/AMATERASU-30

Amaterasu 30
diff --git a/.gitignore b/.gitignore
index dbb51cb..eaefdef 100755
--- a/.gitignore
+++ b/.gitignore
@@ -50,3 +50,6 @@
 executor/spark-warehouse/
 repo
 repo/**
+
+#python
+.zip
\ No newline at end of file
diff --git a/common/build.gradle b/common/build.gradle
index 245da82..38f2393 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -74,6 +74,10 @@
         exclude group: 'org.jboss.netty'
     }
 
+    compile('com.uchuhimo:konf:0.11') {
+        exclude group: 'org.eclipse.jgit'
+    }
+
     compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
     compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
     
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/ConfigManager.kt
similarity index 97%
rename from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
rename to common/src/main/kotlin/org/apache/amaterasu/common/configuration/ConfigManager.kt
index 00b640a..5d317c2 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/ConfigManager.kt
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.common.configuration
+package org.apache.amaterasu.common.configuration
 
 import com.uchuhimo.konf.Config
 import com.uchuhimo.konf.source.yaml.toYaml
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/GenericSpec.kt
similarity index 94%
rename from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
rename to common/src/main/kotlin/org/apache/amaterasu/common/configuration/GenericSpec.kt
index ed6fa9e..8299f96 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/GenericSpec.kt
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.common.configuration
+package org.apache.amaterasu.common.configuration
 
 import com.uchuhimo.konf.ConfigSpec
 import com.uchuhimo.konf.OptionalItem
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/Job.kt
similarity index 94%
rename from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt
rename to common/src/main/kotlin/org/apache/amaterasu/common/configuration/Job.kt
index 076b0af..7a1258b 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/Job.kt
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/configuration/Job.kt
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.common.configuration
+package org.apache.amaterasu.common.configuration
 
 import com.uchuhimo.konf.ConfigSpec
 
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
index b19f82f..9d83c65 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
@@ -19,6 +19,7 @@
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.BasicPythonRunnerProvider
 import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PandasRunnerProvider
+import org.apache.amaterasu.common.configuration.ConfigManager
 import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
 import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
@@ -33,8 +34,10 @@
         get() = "python"
     override val groupResources: Array<File>
         get() = arrayOf()
-    override val driverConfiguration: DriverConfiguration
-        get() = DriverConfiguration(conf!!.taskMem(), 1)
+
+    override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration {
+        return DriverConfiguration(conf!!.taskMem(), 1) //TODO: this should be configured on env level
+    }
     override val environmentVariables: Map<String, String>
         get() = mapOf()
     override val configurationItems: Array<String>
diff --git a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip b/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 6541930..0000000
--- a/frameworks/python/pandas_runtime/dist/amaterasu_pandas-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip b/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 6e6e599..0000000
--- a/frameworks/python/python_runtime/dist/amaterasu_python-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index 007dcf0..7a32b9e 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -17,9 +17,9 @@
 package org.apache.amaterasu.frameworks.spark.dispatcher
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.ConfigManager
 import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.PySparkRunnerProvider
 import org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers.SparkSubmitScalaRunnerProvider
-import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.leader.common.utilities.MemoryFormatParser
 import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
 import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
@@ -32,11 +32,6 @@
     private lateinit var env: String
     private lateinit var conf: ClusterConfig
 
-    private val sparkExecConfigurations: Map<String, Any> by lazy {
-        val execData = DataLoader.getExecutorData(env, conf)
-        execData.configurations["spark"].orEmpty()
-    }
-
     private lateinit var runnerProviders: Map<String, RunnerSetupProvider>
 
     override fun init(env: String, conf: ClusterConfig) {
@@ -72,32 +67,25 @@
     }
 
     override val groupIdentifier: String = "spark"
-    override val configurationItems = arrayOf("sparkConfiguration", "sparkExecutor")
+    override val configurationItems = arrayOf("sparkProperties", "sparkOptions")
 
-    override val driverConfiguration: DriverConfiguration
-        get() {
-            //TODO: Spark configuration sould come for the ENV only
-            val sparkOpts = conf.spark().opts()
-            val cpu: Int = when {
-                sparkExecConfigurations.containsKey("spark.yarn.am.cores") -> sparkExecConfigurations["spark.yarn.am.cores"].toString().toInt()
-                sparkExecConfigurations.containsKey("spark.driver.cores") -> sparkExecConfigurations["spark.driver.cores"].toString().toInt()
-                sparkOpts.contains("yarn.am.cores") -> sparkOpts["yarn.am.cores"].toString().toInt()
-                sparkOpts.contains("driver.cores") -> sparkOpts["driver.cores"].toString().toInt()
-                conf.yarn().worker().cores() > 0 -> conf.yarn().worker().cores()
-                else -> 1
-            }
+    override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration {
+        val sparkOptions: Map<String, Any> = configManager.config["sparkOptions"]
+        val sparkProperties: Map<String, Any> = configManager.config["sparkProperties"]
 
-            val mem: Int = when {
-                sparkExecConfigurations.containsKey("spark.yarn.am.memory") -> MemoryFormatParser.extractMegabytes(sparkExecConfigurations["spark.yarn.am.memory"].toString())
-                sparkExecConfigurations.containsKey("spark.driver.memeory") -> MemoryFormatParser.extractMegabytes(sparkExecConfigurations["spark.driver.memeory"].toString())
-                sparkOpts.contains("yarn.am.memory") -> MemoryFormatParser.extractMegabytes(sparkOpts["yarn.am.memory"].get())
-                sparkOpts.contains("driver.memory") -> MemoryFormatParser.extractMegabytes(sparkOpts["driver.memory"].get())
-                conf.yarn().worker().memoryMB() > 0 -> conf.yarn().worker().memoryMB()
-                conf.taskMem() > 0 -> conf.taskMem()
-                else -> 1024
-            }
-            return DriverConfiguration(mem, cpu)
+        val cpu: Int = when {
+            sparkOptions.containsKey("driver-cores") -> sparkOptions["driver-cores"].toString().toInt()
+            sparkProperties.containsKey("spark.yarn.am.cores") -> sparkProperties["spark.yarn.am.cores"].toString().toInt()
+            sparkProperties.containsKey("spark.driver.cores") -> sparkProperties["spark.driver.cores"].toString().toInt()
+            else -> 1
         }
 
-
+        val mem: Int = when {
+            sparkOptions.containsKey("driver-memory") -> MemoryFormatParser.extractMegabytes(sparkOptions["driver-memory"].toString())
+            sparkProperties.containsKey("spark.yarn.am.memory") -> MemoryFormatParser.extractMegabytes(sparkProperties["spark.yarn.am.memory"].toString())
+            sparkProperties.containsKey("spark.driver.memeory") -> MemoryFormatParser.extractMegabytes(sparkProperties["spark.driver.memeory"].toString())
+            else -> 1024
+        }
+        return DriverConfiguration(mem, cpu)
+    }
 }
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
index 1d8e3ce..d716427 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.kt
@@ -20,21 +20,26 @@
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
-import org.apache.amaterasu.leader.common.configuration.Job
+import org.apache.amaterasu.common.configuration.Job
 
 class PySparkRunnerProvider(env: String, conf: ClusterConfig) : PythonRunnerProviderBase(env, conf) {
 
     override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String {
-        val command = super.getCommand(jobId, actionData , env, executorId, callbackAddress)
-        command + " && \$SPARK_HOME/bin/spark-submit --master ${env[Job.master]} " +
+        val sparkProperties: Map<String, Any> = env["sparkProperties"]
+        val sparkOptions: Map<String, Any> = env["sparkOptions"]
+        val command = super.getCommand(jobId, actionData, env, executorId, callbackAddress)
+        val hive  = if (conf.mode() == "yarn") "--files \$SPARK_HOME/conf/hive-site.xml " else ""
+        val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
+
+        return "$command && \$SPARK_HOME/bin/spark-submit $master" +
+                SparkCommandLineHelper.getOptions(sparkOptions) +
+                SparkCommandLineHelper.getProperties(sparkProperties) + " " +
                 "--conf spark.pyspark.python=${conf.pythonPath()} " +
                 "--conf spark.pyspark.driver.python=$virtualPythonPath " +
-                "--files \$SPARK_HOME/conf/hive-site.xml ${actionData.src}"
-
-        return command
+                hive +
+                " ${actionData.src}"
     }
 
-
     override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf()
 
     override val runnerResources: Array<String>
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
similarity index 65%
copy from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
copy to frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
index ed6fa9e..e60eb11 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/GenericSpec.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkCommandLineHelper.kt
@@ -14,12 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.common.configuration
+package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
 
-import com.uchuhimo.konf.ConfigSpec
-import com.uchuhimo.konf.OptionalItem
+object SparkCommandLineHelper {
 
-class GenericSpec(configurationItem: String) {
-    val spec = ConfigSpec()
-    val items = OptionalItem(spec, configurationItem, emptyMap<String, String>())
+    fun getOptions(sparkOptions: Map<String, Any>): String {
+        return sparkOptions.map { "--${it.key} ${it.value}" }.joinToString( separator = " ")
+    }
+
+    fun getProperties(sparkProps: Map<String, Any>): String{
+        return sparkProps.map { "--conf $it" }.joinToString( separator = " ")
+    }
 }
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
index f13e7c9..a607d75 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -20,17 +20,24 @@
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.utils.ArtifactUtil
-import org.apache.amaterasu.leader.common.configuration.Job
+import org.apache.amaterasu.common.configuration.ConfigManager
+import org.apache.amaterasu.common.configuration.Job
 import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
 
+
 class SparkSubmitScalaRunnerProvider(val conf: ClusterConfig) : RunnerSetupProvider() {
 
     override fun getCommand(jobId: String, actionData: ActionData, env: Config, executorId: String, callbackAddress: String): String {
         val util = ArtifactUtil(listOf(actionData.repo), jobId)
         val classParam = if (actionData.hasArtifact) " --class ${actionData.entryClass}" else ""
+        val sparkProperties: Map<String, Any> = env["sparkProperties"]
+        val sparkOptions: Map<String, Any> = env["sparkOptions"]
+        val master = if (!sparkOptions.containsKey("master")) " --master ${env[Job.master]} " else ""
 
         return "\$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.artifact).first().name} " +
-                " --master ${env[Job.master]}" +
+                SparkCommandLineHelper.getOptions(sparkOptions) +
+                SparkCommandLineHelper.getProperties(sparkProperties) +
+                master +
                 " --jars spark-runtime-${conf.version()}.jar >&1"
     }
 
@@ -39,6 +46,7 @@
     override fun getActionDependencies(jobId: String, actionData: ActionData): Array<String> = arrayOf()
 
     override val hasExecutor: Boolean = false
+
     override val runnerResources: Array<String> = arrayOf()
 
 }
\ No newline at end of file
diff --git a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
index 377a241..b363058 100644
--- a/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
+++ b/frameworks/spark/pyspark_runtime/dist/amaterasu_pyspark-0.2.0-incubating-rc4.zip
Binary files differ
diff --git a/frameworks/spark/pyspark_runtime/tests/env.yml b/frameworks/spark/pyspark_runtime/tests/env.yml
index 6eabc26..1268c0c 100644
--- a/frameworks/spark/pyspark_runtime/tests/env.yml
+++ b/frameworks/spark/pyspark_runtime/tests/env.yml
@@ -24,7 +24,7 @@
 configuration:
   spark.cassandra.connection.host: 127.0.0.1
   sourceTable: documents
-sparkConfiguration:
+sparkProperties:
   spark.executor.extraJavaOptions: -XX:+PrintGCDetails
   spark.executor.memory: 1g
   spark.driver.memeory: 2g
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 0adb406..928a444 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -89,9 +89,7 @@
 
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
     compile "org.jetbrains.kotlin:kotlin-reflect"
-    compile('com.uchuhimo:konf:0.11') {
-        exclude group: 'org.eclipse.jgit'
-    }
+
 
     testCompile 'org.jetbrains.spek:spek-api:1.1.5'
     testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
@@ -106,6 +104,9 @@
 sourceSets {
     test {
         resources.srcDirs += [file('src/test/resources')]
+        kotlin {
+            srcDirs = ['src/test/kotlin']
+        }
     }
 }
 
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
index 78e06fa..48f619a 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
@@ -17,6 +17,7 @@
 package org.apache.amaterasu.leader.common.execution.frameworls
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.ConfigManager
 import org.apache.amaterasu.common.logging.KLogging
 import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
 import org.reflections.Reflections
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
similarity index 70%
rename from leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
rename to leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
index c9df942..5c32730 100644
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.scala
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MemoryFormatParser.kt
@@ -17,18 +17,15 @@
 package org.apache.amaterasu.leader.common.utilities
 
 object MemoryFormatParser {
+    fun extractMegabytes(input: String): Int {
+        val lower = input.toLowerCase()
 
-  def extractMegabytes(input: String): Int = {
-    var result: Int = 0
-    val lower = input.toLowerCase
-    if (lower.contains("mb")) {
-      result = lower.replace("mb", "").toInt
-    } else if (lower.contains("gb") | lower.contains("g")) {
-      result = lower.replace("g", "").replace("b", "").toInt * 1024
-    } else {
-      result = lower.toInt
+        return if (lower.contains("mb")) {
+            lower.replace("mb", "").toInt()
+        } else if (lower.contains("gb") || lower.contains("g")) {
+            lower.replace("g", "").replace("b", "").toInt() * 1024
+        } else {
+            lower.toInt()
+        }
     }
-
-    result
-  }
-}
+}
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
index 0e87cae..a7b2e47 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
@@ -16,6 +16,9 @@
  */
 package org.apache.amaterasu.leader.common.configuration
 
+import com.uchuhimo.konf.source.yaml.toYaml
+import org.apache.amaterasu.common.configuration.ConfigManager
+import org.apache.amaterasu.common.configuration.Job
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
@@ -32,38 +35,38 @@
         val repoPath = "${File(marker).parent}/test_repo"
         val cfg = ConfigManager("test", repoPath)
 
-        it("loads the job level environment"){
-            assertEquals(cfg.config[Job.master] , "yarn")
+        it("loads the job level environment") {
+            assertEquals(cfg.config[Job.master], "yarn")
         }
 
         on("getting an env for an action with default path") {
             val startConf = cfg.getActionConfiguration("start")
-            it("loads the specific configuration defined in the actions folder"){
-                assertEquals(startConf[Job.master] , "mesos")
+            it("loads the specific configuration defined in the actions folder") {
+                assertEquals(startConf[Job.master], "mesos")
             }
         }
 
-        on("getting an env for an action with a conf: property in the maki.yml"){
+        on("getting an env for an action with a conf: property in the maki.yml") {
             val step2conf = cfg.getActionConfiguration("step2", "src/{action_name}/{env}/")
 
-            it("loads the specific configuration defined in the actions folder"){
-                assertEquals(step2conf[Job.name] , "test2")
+            it("loads the specific configuration defined in the actions folder") {
+                assertEquals(step2conf[Job.name], "test2")
             }
         }
 
-        on("getting an env for an action with no action level config"){
+        on("getting an env for an action with no action level config") {
             val step3conf = cfg.getActionConfiguration("step3")
 
-            it("loads only the job level conf"){
-                assertEquals(step3conf[Job.name] , "test")
+            it("loads only the job level conf") {
+                assertEquals(step3conf[Job.name], "test")
             }
         }
 
-        on("receiving a path to a specific file" ){
+        on("receiving a path to a specific file") {
             val step4conf = cfg.getActionConfiguration("step4", "src/start/env/{env}/job.yml")
 
-            it("loads the specific configuration from the file"){
-                assertEquals(step4conf[Job.master] , "mesos")
+            it("loads the specific configuration from the file") {
+                assertEquals(step4conf[Job.master], "mesos")
             }
         }
 
@@ -73,11 +76,27 @@
     given("a ConfigManager for a job with spark framework") {
 
         val repoPath = "${File(marker).parent}/spark_repo"
-        val cfg = ConfigManager("test", repoPath, listOf("sparkConfiguration"))
+        val cfg = ConfigManager("test", repoPath, listOf("sparkProperties"))
 
-        it("load the framework configuration for spark"){
-            val spark: Map<String, String> = cfg.config["sparkConfiguration"]
-            assertEquals(spark["spark.executor.memory"], "1g")
+        val actionConf = cfg.getActionConfiguration("")
+
+        it("load the framework configuration for spark") {
+            val spark: Map<String, Any> = actionConf["sparkProperties"]
+            assertEquals(spark["spark.executor.memory"].toString(), "1g")
         }
+
+        it("loads int values as strings") {
+            val spark: Map<String, Any> = actionConf["sparkProperties"]
+            val x = spark.map { "--conf $it" }.joinToString(separator = " ")
+            println(x)
+            assertEquals(spark["spark.driver.cores"].toString(), "2")
+        }
+
+//        //TODO: Create spark specific tests
+////        it("should be converted correctly to spark-submit parameters") {
+////            val spark: Map<String, Any> = cfg.config["sparkProperties"]
+////            val x = spark.map { "--conf $it" }.joinToString(separator = " ")
+////            println(x)
+////        }
     }
 })
\ No newline at end of file
diff --git a/leader-common/src/test/resources/spark_repo/env/test/spark.yml b/leader-common/src/test/resources/spark_repo/env/test/spark.yml
index 85f1431..983b5e2 100644
--- a/leader-common/src/test/resources/spark_repo/env/test/spark.yml
+++ b/leader-common/src/test/resources/spark_repo/env/test/spark.yml
@@ -1,3 +1,4 @@
-sparkConfiguration:
+sparkProperties:
     spark.executor.extraJavaOptions: -XX:+PrintGCDetails
-    spark.executor.memory: 1g
\ No newline at end of file
+    spark.executor.memory: 1g
+    spark.driver.cores: 2
\ No newline at end of file
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index a53ee77..35f3800 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -20,7 +20,6 @@
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
 import com.fasterxml.jackson.module.kotlin.KotlinModule
 
-import kotlinx.coroutines.async
 import kotlinx.coroutines.runBlocking
 
 import org.apache.activemq.broker.BrokerService
@@ -29,7 +28,7 @@
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.logging.KLogging
 import org.apache.amaterasu.common.utils.ActiveNotifier
-import org.apache.amaterasu.leader.common.configuration.ConfigManager
+import org.apache.amaterasu.common.configuration.ConfigManager
 import org.apache.amaterasu.leader.common.execution.JobLoader
 import org.apache.amaterasu.leader.common.execution.JobManager
 import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
@@ -156,7 +155,7 @@
 
                 notifier.info("requesting container fo ${actionData.name}")
                 val frameworkProvider = frameworkFactory.getFramework(actionData.groupId)
-                val driverConfiguration = frameworkProvider.driverConfiguration
+                val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
 
                 var mem: Long = driverConfiguration.memory.toLong()
                 mem = Math.min(mem, maxMem)
@@ -242,7 +241,6 @@
                         val runnerProvider = framework.getRunnerProvider(actionData.typeId)
                         val ctx = Records.newRecord(ContainerLaunchContext::class.java)
 
-
                         val envConf = configManager.getActionConfiguration(actionData.name, actionData.config)
                         val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, envConf, "${actionData.id}-${container.id.containerId}", address))
 
@@ -258,7 +256,7 @@
                         jobManager.actionStarted(actionData.id)
                         containersIdsToTask[container.id.containerId] = actionData
                         notifier.info("created container for ${actionData.name} created")
-                        //ctx.localResources.forEach { t: String, u: LocalResource -> notifier.info("resource: $t = ${u.resource}") }
+
                         log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}")
                     } catch (e: Exception) {
                         notifier.error("", "error launching container with ${e.message} in ${ExceptionUtils.getStackTrace(e)}")
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index a2f470a..866d9e2 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -26,12 +26,11 @@
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.{ClusterConfig, ConfigManager}
 import org.apache.amaterasu.common.configuration.enums.ActionStatus
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.common.execution.actions.Notification
 import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-import org.apache.amaterasu.leader.common.configuration.ConfigManager
 import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
 import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
 import org.apache.amaterasu.leader.common.utilities.DataLoader
@@ -177,6 +176,11 @@
         try {
           val actionData = jobManager.getNextActionData
           if (actionData != null) {
+
+            frameworkFactory = FrameworkProvidersFactory(env, config)
+            val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
+            configManager = new ConfigManager(env, "repo", items)
+
             val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
             taskIdsToActions.put(taskId, actionData.getName)
             // setting up the configuration files for the container
@@ -203,12 +207,11 @@
             val slaveActions = executionMap(offer.getSlaveId.toString)
             slaveActions.put(taskId.getValue, ActionStatus.Started)
 
-            log.info(s">>>> Framework: ${actionData.getGroupId}")
             val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
-            log.info(s">>>> Runner: ${actionData.getTypeId}")
 
             val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId)
 
+            printNotification(new Notification("", s"provider ${runnerProvider.getClass.getName}", NotificationType.Info, NotificationLevel.Execution))
             // searching for an executor that already exist on the slave, if non exist
             // we create a new one
             var executor: ExecutorInfo = null
@@ -252,8 +255,7 @@
                 .setExecutable(false)
                 .setExtract(false)
                 .build())
-            }
-            )
+            })
 
             // Getting action dependencies
             runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
@@ -315,7 +317,7 @@
             slavesExecutors.put(offer.getSlaveId.getValue, executor)
 
 
-            val driverConfiguration = frameworkProvider.getDriverConfiguration
+            val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
 
             var actionTask: TaskInfo = null
 
@@ -413,9 +415,8 @@
 
     }
 
-    frameworkFactory = FrameworkProvidersFactory(env, config)
-    val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
-    configManager = new ConfigManager(env, "repo", items)
+
+
 
     jobManager.start()
 
diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
index d54c422..f68922e 100644
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
@@ -17,6 +17,7 @@
 package org.apache.amaterasu.sdk.frameworks
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.ConfigManager
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
 
 import java.io.File
@@ -27,8 +28,6 @@
 
     val groupResources: Array<File>
 
-    val driverConfiguration: DriverConfiguration
-
     val environmentVariables: Map<String, String>
 
     val configurationItems: Array<String>
@@ -37,4 +36,5 @@
 
     fun getRunnerProvider(runnerId: String): RunnerSetupProvider
 
+    fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration
 }
\ No newline at end of file
diff --git a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip b/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
deleted file mode 100644
index 9c18986..0000000
--- a/sdk_python/dist/amaterasu-sdk-0.2.0-incubating-rc4.zip
+++ /dev/null
Binary files differ