Moved the data loader and its supporting data objects to Kotlin
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
similarity index 74%
copy from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
copy to common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
index d16d6f8..3a2e717 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
@@ -16,7 +16,8 @@
  */
 package org.apache.amaterasu.common.dataobjects
 
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
+import org.apache.amaterasu.common.execution.dependencies.Dependencies
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
 import org.apache.amaterasu.common.runtime.Environment
 
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+data class ExecData(val env: Environment, val deps: Dependencies?, val pyDeps: PythonDependencies?, val configurations: Map<String, Map<String, Any>>)
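
Reviewer note: because ExecData is now a Kotlin data class, Scala call sites go
through the generated getters (getEnv, getDeps, getPyDeps, getConfigurations),
which is what most of the Scala hunks below do. deps and pyDeps are nullable
because a job may have no dependency files (see DataLoader below). A minimal
round-trip sketch with hypothetical values, relying on jackson-module-kotlin
(added to leader-common further down):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.kotlin.KotlinModule

    val mapper = ObjectMapper().registerModule(KotlinModule())
    val execData = ExecData(
            env = Environment(name = "test", master = "local[*]", configuration = mapOf()),
            deps = Dependencies(repos = listOf(), artifacts = listOf()),
            pyDeps = PythonDependencies(packages = listOf()),
            configurations = mapOf("spark" to mapOf("spark.executor.memory" to "1g")))
    val roundTripped = mapper.readValue(mapper.writeValueAsBytes(execData), ExecData::class.java)
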
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
similarity index 79%
rename from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
rename to common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
index d16d6f8..53a5e20 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
@@ -16,7 +16,6 @@
  */
 package org.apache.amaterasu.common.dataobjects
 
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
 import org.apache.amaterasu.common.runtime.Environment
 
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+/* TODO: TaskData should support overriding execData configurations; more
+ * specifically, if execData holds configuration for the Spark setup
+ * (vcores/memory), a task should be able to override it.
+ */
+data class TaskData(val src: String, val env: Environment, val groupId: String, val typeId: String, val exports: Map<String, String>)
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/Dependencies.kt
similarity index 70%
copy from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
copy to common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/Dependencies.kt
index d16d6f8..3968885 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/Dependencies.kt
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.common.dataobjects
+package org.apache.amaterasu.common.execution.dependencies
 
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
-import org.apache.amaterasu.common.runtime.Environment
+import org.apache.amaterasu.common.dataobjects.Artifact
+import org.apache.amaterasu.common.dataobjects.Repo
 
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+data class Dependencies(val repos: List<Repo>, val artifacts: List<Artifact>)
\ No newline at end of file
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonDependencies.kt b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonDependencies.kt
new file mode 100644
index 0000000..41ea991
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonDependencies.kt
@@ -0,0 +1,3 @@
+package org.apache.amaterasu.common.execution.dependencies
+
+data class PythonDependencies(val packages: List<PythonPackage>)
\ No newline at end of file
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonPackage.kt b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonPackage.kt
new file mode 100644
index 0000000..e5bfa90
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonPackage.kt
@@ -0,0 +1,3 @@
+package org.apache.amaterasu.common.execution.dependencies
+
+data class PythonPackage(val packageId: String, val index: String? = null, val channel: String? = null)
\ No newline at end of file
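
The dependency classes above replace the case classes removed from
Dependencies.scala further down (Repo and Artifact are imported from
org.apache.amaterasu.common.dataobjects and assumed to have been ported
separately). A minimal sketch, assuming the YAML field names match the
constructor parameters as DataLoader relies on, of reading a hypothetical
repo/deps/jars.yml:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
    import com.fasterxml.jackson.module.kotlin.KotlinModule
    import com.fasterxml.jackson.module.kotlin.readValue

    val ymlMapper = ObjectMapper(YAMLFactory()).registerModule(KotlinModule())

    // hypothetical repo/deps/jars.yml
    val jarsYml = """
        repos:
          - id: central
            type: default
            url: http://repo1.maven.org/maven2/
        artifacts:
          - groupId: org.example
            artifactId: example-lib
            version: 1.0.0
        """.trimIndent()
    val deps: Dependencies = ymlMapper.readValue(jarsYml)
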
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
similarity index 70%
copy from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
copy to common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
index d16d6f8..8f3ede2 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
@@ -14,9 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.common.dataobjects
+package org.apache.amaterasu.common.runtime
 
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
-import org.apache.amaterasu.common.runtime.Environment
+data class Environment (
+        var name: String = "",
+        var master: String = "",
+        var inputRootPath: String = "",
+        var outputRootPath: String = "",
+        var workingDir: String = "",
 
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+        var configuration: Map<String, String> )
\ No newline at end of file
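
The Kotlin Environment mirrors the mutable fields of the Scala class it
replaces (commented out further down). A sketch of a hypothetical
repo/env/test/job.yml that DataLoader would deserialize into it:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
    import com.fasterxml.jackson.module.kotlin.KotlinModule
    import com.fasterxml.jackson.module.kotlin.readValue

    val ymlMapper = ObjectMapper(YAMLFactory()).registerModule(KotlinModule())
    val env: Environment = ymlMapper.readValue("""
        name: test
        master: "local[*]"
        inputRootPath: /tmp/amaterasu/input
        outputRootPath: /tmp/amaterasu/output
        workingDir: /tmp/amaterasu/work
        configuration:
          cwd: /tmp/amaterasu
        """.trimIndent())
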
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala
deleted file mode 100644
index a745581..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.dataobjects
-
-import org.apache.amaterasu.common.runtime.Environment
-
-
-/* TODO: Future eyal and yaniv - The TaskData class should support overriding configurations for execData configurations
-// more specifiably, if execData holds configurations for spark setup (vcores/memory) a task should be able to override those
-*/
-case class TaskData(src: String, env: Environment, groupId: String, typeId: String, exports: Map[String, String])
diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala
deleted file mode 100755
index 855262b..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution.dependencies
-
-import scala.collection.mutable.ListBuffer
-
-case class Dependencies(repos: ListBuffer[Repo], artifacts: List[Artifact])
-case class PythonDependencies(packages: List[PythonPackage])
-case class Repo(id: String, `type`: String, url: String)
-case class Artifact(groupId: String, artifactId: String, version: String)
-case class PythonPackage(packageId: String, index: Option[String] = None, channel: Option[String] = None) // Not really sure about this, basically I want default values but the ObjectMapper apparently doesn't support them
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala b/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
index f49d8ad..fa65cbe 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
@@ -14,17 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.common.runtime
-
-case class Environment() {
-
-  var name: String = ""
-  var master: String = ""
-
-  var inputRootPath: String = ""
-  var outputRootPath: String = ""
-  var workingDir: String = ""
-
-  var configuration: Map[String, String] = _
-
-}
\ No newline at end of file
+//package org.apache.amaterasu.common.runtime
+//
+//case class Environment() {
+//
+//  var name: String = ""
+//  var master: String = ""
+//
+//  var inputRootPath: String = ""
+//  var outputRootPath: String = ""
+//  var workingDir: String = ""
+//
+//  var configuration: Map[String, String] = _
+//
+//}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index d7211a2..5325b82 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
@@ -103,11 +103,11 @@
         .setTaskId(taskInfo.getTaskId)
         .setState(TaskState.TASK_RUNNING).build()
       driver.sendStatusUpdate(status)
-      val runner = providersFactory.getRunner(taskData.groupId, taskData.typeId)
+      val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
       runner match {
-        case Some(r) => r.executeSource(taskData.src, actionName, taskData.exports.asJava)
+        case Some(r) => r.executeSource(taskData.getSrc, actionName, taskData.getExports)
         case None =>
-          notifier.error("", s"Runner not found for group: ${taskData.groupId}, type ${taskData.typeId}. Please verify the tasks")
+          notifier.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
           None
       }
 
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index dc5ebbf..6efcf9e 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -38,11 +38,11 @@
   var providersFactory: ProvidersFactory = _
 
   def execute(): Unit = {
-    val runner = providersFactory.getRunner(taskData.groupId, taskData.typeId)
+    val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
     runner match {
       case Some(r) => {
         try {
-          r.executeSource(taskData.src, actionName, taskData.exports.asJava)
+          r.executeSource(taskData.getSrc, actionName, taskData.getExports)
           log.info("Completed action")
           System.exit(0)
         } catch {
@@ -53,7 +53,7 @@
         }
       }
       case None =>
-        log.error("", s"Runner not found for group: ${taskData.groupId}, type ${taskData.typeId}. Please verify the tasks")
+        log.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
         System.exit(101)
     }
   }
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index 84077e2..54b5b7b 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -39,12 +39,14 @@
 
   private def loadSparkConfig: mutable.Map[String, Any] = {
 
     val execData = DataLoader.getExecutorData(env, conf)
-    val sparkExecConfiguration = execData.configurations.get("spark")
+    val sparkExecConfiguration = execData.getConfigurations.get("spark")
     if (sparkExecConfiguration.isEmpty) {
       throw new Exception(s"Spark configuration files could not be loaded for the environment $env")
     }
-    collection.mutable.Map(sparkExecConfiguration.get.toSeq: _*)
+    collection.mutable.Map(sparkExecConfiguration.toSeq: _*)
 
   }
 
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
index a48aaa0..053e104 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
@@ -22,9 +22,8 @@
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ExecData
 import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
+import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
 import org.apache.amaterasu.frameworks.spark.runner.sparksql.SparkSqlRunner
 import org.apache.amaterasu.frameworks.spark.runner.pyspark.PySparkRunner
 import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
@@ -46,8 +45,8 @@
     (e: String) => log.error(e)
 
   )
-  private var conf: Option[Map[String, Any]] = _
-  private var executorEnv: Option[Map[String, Any]] = _
+  private var conf: Map[String, AnyRef] = _
+  private var executorEnv: Map[String, AnyRef] = _
   private var clusterConfig: ClusterConfig = _
 
   override def init(execData: ExecData,
@@ -65,24 +64,24 @@
     clusterConfig = config
     var jars = Seq.empty[String]
 
-    if (execData.deps != null) {
-      jars ++= getDependencies(execData.deps)
+    if (execData.getDeps != null) {
+      jars ++= getDependencies(execData.getDeps)
     }
 
-    if (execData.pyDeps != null &&
-      execData.pyDeps.packages.nonEmpty) {
-      loadPythonDependencies(execData.pyDeps, notifier)
+    if (execData.getPyDeps != null &&
+      execData.getPyDeps.getPackages.nonEmpty) {
+      loadPythonDependencies(execData.getPyDeps, notifier)
     }
 
-    conf = execData.configurations.get("spark")
-    executorEnv = execData.configurations.get("spark_exec_env")
+    conf = execData.getConfigurations.get("spark").toMap
+    executorEnv = execData.getConfigurations.get("spark_exec_env").toMap
     val sparkAppName = s"job_${jobId}_executor_$executorId"
 
     SparkRunnerHelper.notifier = notifier
-    val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName)
+    val spark = SparkRunnerHelper.createSpark(execData.getEnv, sparkAppName, jars, Some(conf), Some(executorEnv), config, hostName)
 
-    lazy val sparkScalaRunner = SparkScalaRunner(execData.env, jobId, spark, outStream, notifier, jars)
-    sparkScalaRunner.initializeAmaContext(execData.env)
+    lazy val sparkScalaRunner = SparkScalaRunner(execData.getEnv, jobId, spark, outStream, notifier, jars)
+    sparkScalaRunner.initializeAmaContext(execData.getEnv)
 
     runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
     var pypath = ""
@@ -93,19 +92,19 @@
       case "mesos" =>
         pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
     }
-    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
+    lazy val pySparkRunner = PySparkRunner(execData.getEnv, jobId, notifier, spark, pypath, execData.getPyDeps, config)
     runners.put(pySparkRunner.getIdentifier, pySparkRunner)
 
-    lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
+    lazy val sparkSqlRunner = SparkSqlRunner(execData.getEnv, jobId, notifier, spark)
     runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
   }
 
   private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
-    val channel = pythonPackage.channel.getOrElse("anaconda")
+    val channel = Option(pythonPackage.getChannel).getOrElse("anaconda") // getChannel is Kotlin String? and may be null
     if (channel == "anaconda") {
-      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.getPackageId}") ! shellLoger
     } else {
-      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.packageId}") ! shellLoger
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.getPackageId}") ! shellLoger
     }
   }
 
@@ -124,12 +123,12 @@
   private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
     notifier.info("loading anaconda evn")
     installAnacondaOnNode()
-    val codegenPackage = PythonPackage("codegen", channel = Option("auto"))
+    val codegenPackage = new PythonPackage("codegen", null, "auto")
     installAnacondaPackage(codegenPackage)
     try {
       // notifier.info("loadPythonDependencies #5")
-      deps.packages.foreach(pack => {
-        pack.index.getOrElse("anaconda").toLowerCase match {
+      deps.getPackages.foreach(pack => {
+        Option(pack.getIndex).getOrElse("anaconda").toLowerCase match {
           case "anaconda" => installAnacondaPackage(pack)
           // case "pypi" => installPyPiPackage(pack)
         }
@@ -157,18 +156,18 @@
     // adding a local repo because Aether needs one
     val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
 
-    val remotes = deps.repos.map(r =>
+    val remotes = deps.getRepos.map(r =>
       new RemoteRepository(
-        r.id,
-        r.`type`,
-        r.url
+        r.getId,
+        r.getType,
+        r.getUrl
       )).toList.asJava
 
     val aether = new Aether(remotes, repo)
 
-    deps.artifacts.flatMap(a => {
+    deps.getArtifacts.flatMap(a => {
       aether.resolve(
-        new DefaultArtifact(a.groupId, a.artifactId, "", "jar", a.version),
+        new DefaultArtifact(a.getGroupId, a.getArtifactId, "", "jar", a.getVersion),
         JavaScopes.RUNTIME
       ).map(a => a)
     }).map(x => x.getFile.getAbsolutePath)
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
index f673fc5..89e76cb 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
@@ -21,7 +21,7 @@
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.sdk.AmaterasuRunner
@@ -101,8 +101,8 @@
     PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
     val port = PySparkEntryPoint.getPort
     var intpPath = ""
-    if (env.configuration.contains("cwd")) {
-      val cwd = new File(env.configuration("cwd"))
+    if (env.getConfiguration.containsKey("cwd")) {
+      val cwd = new File(env.getConfiguration.get("cwd"))
       intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
     } else {
       intpPath = s"spark_intp.py"
@@ -132,7 +132,7 @@
         var pysparkPython = "/usr/bin/python"
 
         if (pyDeps != null &&
-          pyDeps.packages.nonEmpty) {
+          !pyDeps.getPackages.isEmpty) {
           pysparkPython = "./miniconda/bin/python"
         }
         val proc = Process(sparkCmd, None,
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
index 588d273..450a89a 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
@@ -128,10 +128,10 @@
       .set("spark.submit.pyFiles", pyfiles.mkString(","))
 
 
-    val master: String = if (env.master.isEmpty) {
+    val master: String = if (env.getMaster.isEmpty) {
       "yarn"
     } else {
-      env.master
+      env.getMaster
     }
 
     config.mode match {
@@ -139,7 +139,7 @@
       case "mesos" =>
         conf.set("spark.executor.uri", s"http://$getNode:${config.Webserver.Port}/spark-${config.Webserver.sparkVersion}.tgz")
           .setJars(jars)
-          .set("spark.master", env.master)
+          .set("spark.master", env.getMaster)
           .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-${config.Webserver.sparkVersion}")
 
       case "yarn" =>
@@ -196,7 +196,7 @@
 
     sparkSession = SparkSession.builder
       .appName(sparkAppName)
-      .master(env.master)
+      .master(env.getMaster)
 
       //.enableHiveSupport()
       .config(conf).getOrCreate()
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
index 4d8a9a6..7e089b5 100755
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
@@ -95,7 +95,7 @@
                   result match {
                     case ds: Dataset[_] =>
                       log.debug(s"persisting DataFrame: $resultName")
-                      val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")"""
+                      val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.getWorkingDir}/$jobId/$actionName/$resultName")"""
                       val writeResult = interpreter.interpret(writeLine)
                       if (writeResult != Results.Success) {
                         val err = outStream.toString
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
index f91107b..9230bca 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
@@ -120,7 +120,7 @@
         val exportName = exportsBuff.head._1
         val exportFormat = exportsBuff.head._2
         //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
-        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.workingDir}/$jobId/$actionName/$exportName")
+        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.getWorkingDir}/$jobId/$actionName/$exportName")
       }
       notifier.info(s"================= finished action $actionName =================")
     }
diff --git a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
index fc9fb94..930927c 100644
--- a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
+++ b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
@@ -40,7 +40,7 @@
   }
 
   def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
-    spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
+    spark.read.format(format).load(s"${env.getWorkingDir}/$jobId/$actionName/$dfName")
   }
 
   def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 7ee53d1..f7c5242 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -69,10 +69,12 @@
     compile project(':amaterasu-sdk')
 
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.9.8'
+    
     compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
     compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
     compile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.15.3'
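
The Jackson upgrade to 2.9.8 and the new jackson-module-kotlin dependency are
what make the Kotlin data classes deserializable: without KotlinModule, Jackson
needs a no-arg constructor, and the comment removed from the old PythonPackage
case class noted that ObjectMapper did not honor default values. The Kotlin
module does, as this minimal sketch shows:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
    import com.fasterxml.jackson.module.kotlin.KotlinModule
    import com.fasterxml.jackson.module.kotlin.readValue

    val mapper = ObjectMapper(YAMLFactory()).registerModule(KotlinModule())
    // index and channel fall back to their Kotlin defaults (null) when absent
    val pkg: PythonPackage = mapper.readValue("packageId: numpy")
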
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
new file mode 100644
index 0000000..323cc20
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.utilities
+
+import java.io.File
+import java.io.FileInputStream
+import java.nio.file.Files
+import java.nio.file.Paths
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.TaskData
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.dataobjects.ExecData
+
+import org.apache.amaterasu.common.execution.dependencies.Dependencies
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
+import org.apache.amaterasu.common.logging.KLogging
+
+import org.apache.amaterasu.common.runtime.Environment
+import org.yaml.snakeyaml.Yaml
+
+
+object DataLoader : KLogging() {
+
+    private val mapper = ObjectMapper()
+
+    private val ymlMapper = ObjectMapper(YAMLFactory())
+
+    init {
+        mapper.registerModule(KotlinModule())
+        ymlMapper.registerModule(KotlinModule())
+    }
+
+    @JvmStatic
+    fun getTaskDataBytes(actionData: ActionData, env: String): ByteArray {
+        return mapper.writeValueAsBytes(getTaskData(actionData, env))
+    }
+
+    @JvmStatic
+    fun getTaskData(actionData: ActionData, env: String): TaskData {
+        val srcFile = actionData.src
+        var src = ""
+
+        if (srcFile.isNotEmpty()) {
+            src = File("repo/src/$srcFile").readText()
+        }
+
+        val envValue = File("repo/env/$env/job.yml").readText()
+
+        val envData = ymlMapper.readValue<Environment>(envValue)
+
+        val exports = actionData.exports
+
+        return TaskData(src, envData, actionData.groupId, actionData.typeId, exports)
+    }
+
+    @JvmStatic
+    fun getTaskDataString(actionData: ActionData, env: String): String {
+        return mapper.writeValueAsString(getTaskData(actionData, env))
+    }
+
+    @JvmStatic
+    fun getExecutorDataBytes(env: String, clusterConf: ClusterConfig): ByteArray {
+        return mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
+    }
+
+    @JvmStatic
+    fun getExecutorData(env: String, clusterConf: ClusterConfig): ExecData {
+
+        // loading the job configuration
+        val envValue = File("repo/env/$env/job.yml").readText() //TODO: change this to YAML
+        val envData = ymlMapper.readValue<Environment>(envValue)
+
+        // loading all additional configurations
+        val files = File("repo/env/$env/").listFiles().filter { it.isFile }.filter { it.name != "job.yml" }
+        val config = files.map { yamlToMap(it) }.toMap()
+
+        // loading the job's dependencies
+        // nullable rather than lateinit: both files are optional, and reading an
+        // unset lateinit var would throw when a job declares no dependencies
+        var depsData: Dependencies? = null
+        var pyDepsData: PythonDependencies? = null
+        if (Files.exists(Paths.get("repo/deps/jars.yml"))) {
+            val depsValue = File("repo/deps/jars.yml").readText()
+            depsData = ymlMapper.readValue(depsValue)
+        }
+        if (Files.exists(Paths.get("repo/deps/python.yml"))) {
+            val pyDepsValue = File("repo/deps/python.yml").readText()
+            pyDepsData = ymlMapper.readValue(pyDepsValue)
+        }
+        return ExecData(envData, depsData, pyDepsData, config)
+    }
+
+    fun yamlToMap(file: File): Pair<String, Map<String, Any>> {
+
+        val yaml = Yaml()
+        val conf = yaml.load<Map<String, Any>>(FileInputStream(file))
+
+        return file.name.replace(".yml", "") to conf
+    }
+
+    @JvmStatic
+    fun getExecutorDataString(env: String, clusterConf: ClusterConfig): String {
+        return mapper.writeValueAsString(getExecutorData(env, clusterConf))
+    }
+
+}
\ No newline at end of file
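
For reviewers: the repository layout this object reads is unchanged from the
Scala version, and clusterConf is still accepted but unused by getExecutorData.
A hypothetical call, assuming a ClusterConfig instance is already at hand:

    // expected working-directory layout:
    //   repo/src/<action source files>
    //   repo/env/<env>/job.yml   plus any additional *.yml configuration files
    //   repo/deps/jars.yml       optional JVM dependencies
    //   repo/deps/python.yml     optional Python dependencies
    val execData = DataLoader.getExecutorData("test", clusterConf)
    val json = DataLoader.getExecutorDataString("test", clusterConf)
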
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
index f28e514..e862e1f 100755
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
+++ b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
@@ -1,111 +1,111 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.common.utilities
-
-import java.io.{File, FileInputStream}
-import java.nio.file.{Files, Paths}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.{ActionData, ExecData, TaskData}
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.yaml.snakeyaml.Yaml
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-
-
-object DataLoader extends Logging {
-
-  val mapper = new ObjectMapper()
-  mapper.registerModule(DefaultScalaModule)
-
-  val ymlMapper = new ObjectMapper(new YAMLFactory())
-  ymlMapper.registerModule(DefaultScalaModule)
-
-  def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = {
-    mapper.writeValueAsBytes(getTaskData(actionData, env))
-  }
-
-  def getTaskData(actionData: ActionData, env: String): TaskData = {
-    val srcFile = actionData.getSrc
-    var src = ""
-
-    if(!srcFile.isEmpty){
-       src = Source.fromFile(s"repo/src/$srcFile").mkString
-    }
-
-    val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString
-
-    val envData = ymlMapper.readValue(envValue, classOf[Environment])
-
-    val exports = actionData.getExports.asScala.toMap // Kotlin to Scala TODO: Remove me as fast as you can
-
-    TaskData(src, envData, actionData.getGroupId, actionData.getTypeId, exports)
-  }
-
-  def getTaskDataString(actionData: ActionData, env: String): String = {
-    mapper.writeValueAsString(getTaskData(actionData, env))
-  }
-
-  def getExecutorDataBytes(env: String, clusterConf: ClusterConfig): Array[Byte] = {
-    mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
-  }
-
-  def getExecutorData(env: String, clusterConf: ClusterConfig): ExecData = {
-
-    // loading the job configuration
-    val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString //TODO: change this to YAML
-    val envData = ymlMapper.readValue(envValue, classOf[Environment])
-    // loading all additional configurations
-    val files = new File(s"repo/env/$env/").listFiles().filter(_.isFile).filter(_.getName != "job.yml")
-    val config = files.map(yamlToMap).toMap
-    // loading the job's dependencies
-    var depsData: Dependencies = null
-    var pyDepsData: PythonDependencies = null
-    if (Files.exists(Paths.get("repo/deps/jars.yml"))) {
-      val depsValue = Source.fromFile(s"repo/deps/jars.yml").mkString
-      depsData = ymlMapper.readValue(depsValue, classOf[Dependencies])
-    }
-    if (Files.exists(Paths.get("repo/deps/python.yml"))) {
-      val pyDepsValue = Source.fromFile(s"repo/deps/python.yml").mkString
-      pyDepsData = ymlMapper.readValue(pyDepsValue, classOf[PythonDependencies])
-    }
-    val data = mapper.writeValueAsBytes(ExecData(envData, depsData, pyDepsData, config))
-    ExecData(envData, depsData, pyDepsData, config)
-  }
-
-  def yamlToMap(file: File): (String, Map[String, Any]) = {
-
-    val yaml = new Yaml()
-    val conf = yaml.load(new FileInputStream(file)).asInstanceOf[java.util.Map[String, Any]].asScala.toMap
-
-    (file.getName.replace(".yml", ""), conf)
-  }
-
-  def getExecutorDataString(env: String, clusterConf: ClusterConfig): String = {
-    mapper.writeValueAsString(getExecutorData(env, clusterConf))
-  }
-
-}
-
-class ConfMap[String, T <: ConfMap[String, T]] extends mutable.ListMap[String, Either[String, T]]
\ No newline at end of file
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *      http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.amaterasu.leader.common.utilities
+//
+//import java.io.{File, FileInputStream}
+//import java.nio.file.{Files, Paths}
+//
+//import com.fasterxml.jackson.databind.ObjectMapper
+//import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+//import com.fasterxml.jackson.module.scala.DefaultScalaModule
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.common.dataobjects.{ActionData, ExecData, TaskData}
+//import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
+//import org.apache.amaterasu.common.logging.Logging
+//import org.apache.amaterasu.common.runtime.Environment
+//import org.yaml.snakeyaml.Yaml
+//
+//import scala.collection.JavaConverters._
+//import scala.collection.mutable
+//import scala.io.Source
+//
+//
+//object DataLoader extends Logging {
+//
+//  val mapper = new ObjectMapper()
+//  mapper.registerModule(DefaultScalaModule)
+//
+//  val ymlMapper = new ObjectMapper(new YAMLFactory())
+//  ymlMapper.registerModule(DefaultScalaModule)
+//
+//  def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = {
+//    mapper.writeValueAsBytes(getTaskData(actionData, env))
+//  }
+//
+//  def getTaskData(actionData: ActionData, env: String): TaskData = {
+//    val srcFile = actionData.getSrc
+//    var src = ""
+//
+//    if(!srcFile.isEmpty){
+//       src = Source.fromFile(s"repo/src/$srcFile").mkString
+//    }
+//
+//    val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString
+//
+//    val envData = ymlMapper.readValue(envValue, classOf[Environment])
+//
+//    val exports = actionData.getExports.asScala.toMap // Kotlin to Scala TODO: Remove me as fast as you can
+//
+//    TaskData(src, envData, actionData.getGroupId, actionData.getTypeId, exports)
+//  }
+//
+//  def getTaskDataString(actionData: ActionData, env: String): String = {
+//    mapper.writeValueAsString(getTaskData(actionData, env))
+//  }
+//
+//  def getExecutorDataBytes(env: String, clusterConf: ClusterConfig): Array[Byte] = {
+//    mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
+//  }
+//
+//  def getExecutorData(env: String, clusterConf: ClusterConfig): ExecData = {
+//
+//    // loading the job configuration
+//    val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString //TODO: change this to YAML
+//    val envData = ymlMapper.readValue(envValue, classOf[Environment])
+//    // loading all additional configurations
+//    val files = new File(s"repo/env/$env/").listFiles().filter(_.isFile).filter(_.getName != "job.yml")
+//    val config = files.map(yamlToMap).toMap
+//    // loading the job's dependencies
+//    var depsData: Dependencies = null
+//    var pyDepsData: PythonDependencies = null
+//    if (Files.exists(Paths.get("repo/deps/jars.yml"))) {
+//      val depsValue = Source.fromFile(s"repo/deps/jars.yml").mkString
+//      depsData = ymlMapper.readValue(depsValue, classOf[Dependencies])
+//    }
+//    if (Files.exists(Paths.get("repo/deps/python.yml"))) {
+//      val pyDepsValue = Source.fromFile(s"repo/deps/python.yml").mkString
+//      pyDepsData = ymlMapper.readValue(pyDepsValue, classOf[PythonDependencies])
+//    }
+//    val data = mapper.writeValueAsBytes(ExecData(envData, depsData, pyDepsData, config))
+//    ExecData(envData, depsData, pyDepsData, config)
+//  }
+//
+//  def yamlToMap(file: File): (String, Map[String, Any]) = {
+//
+//    val yaml = new Yaml()
+//    val conf = yaml.load(new FileInputStream(file)).asInstanceOf[java.util.Map[String, Any]].asScala.toMap
+//
+//    (file.getName.replace(".yml", ""), conf)
+//  }
+//
+//  def getExecutorDataString(env: String, clusterConf: ClusterConfig): String = {
+//    mapper.writeValueAsString(getExecutorData(env, clusterConf))
+//  }
+//
+//}
+//
+//class ConfMap[String, T <: ConfMap[String, T]] extends mutable.ListMap[String, Either[String, T]]
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 27fd728..6ede8d8 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -162,7 +162,7 @@
             val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
             writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
 
-            val dataStores = DataLoader.getTaskData(actionData, env).exports
+            val dataStores = DataLoader.getTaskData(actionData, env).getExports
             val writer = new StringWriter()
             yamlMapper.writeValue(writer, dataStores)
             val dataStoresYaml = writer.toString
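
The scheduler now reaches the exports map through the generated getter. A short
sketch (actionData is a hypothetical ActionData) of what the data-stores YAML
written above contains:

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory

    val exports = DataLoader.getTaskData(actionData, "test").exports
    val dataStoresYaml = ObjectMapper(YAMLFactory()).writeValueAsString(exports)
    // yields YAML along the lines of: result: "parquet"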