fixed merge
diff --git a/README.md b/README.md
index bbc54e1..4a9fd12 100755
--- a/README.md
+++ b/README.md
@@ -14,7 +14,10 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-# Apache Amaterasu [![Build Status](https://travis-ci.org/apache/incubator-amaterasu.svg?branch=master)](https://travis-ci.org/apache/incubator-amaterasu)
+# Apache Amaterasu
+
+[![Build Status](https://travis-ci.org/apache/incubator-amaterasu.svg?branch=master)](https://travis-ci.org/apache/incubator-amaterasu)
+[![License](http://img.shields.io/:license-Apache%202-blue.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
 
                                                /\
                                               /  \ /\
diff --git a/build.gradle b/build.gradle
index dc63d02..7b79da4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 buildscript {
-    ext.kotlin_version = '1.3.0'
+    ext.kotlin_version = '1.3.21'
 
     repositories {
         mavenCentral()
diff --git a/common/build.gradle b/common/build.gradle
index 554a879..294bd36 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -45,6 +45,7 @@
     compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.9'
     compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.9'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
+    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.9.8'
 
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
     compile "org.jetbrains.kotlin:kotlin-reflect"
@@ -58,13 +59,19 @@
     compile('com.jcabi:jcabi-aether:0.10.1') {
         exclude group: 'org.jboss.netty'
     }
-    compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
-    
-    provided group: 'org.apache.hadoop', name: 'hadoop-yarn-client', version: '2.7.3'
-    provided group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.7.3'
-    provided group: 'org.apache.hadoop', name: 'hadoop-yarn-api', version: '2.7.3'
-    provided group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.7.3'
 
+    compile('org.apache.activemq:activemq-client:5.15.2') {
+        exclude group: 'org.jboss.netty'
+    }
+
+    compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
+    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+    
+    provided group: 'org.apache.hadoop', name: 'hadoop-yarn-client', version: '2.8.4'
+    provided group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.8.4'
+    provided group: 'org.apache.hadoop', name: 'hadoop-yarn-api', version: '2.8.4'
+    provided group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.8.4'
     testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
     testRuntime 'org.pegdown:pegdown:1.1.0'
     testCompile 'junit:junit:4.11'
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
similarity index 74%
copy from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
copy to common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
index d16d6f8..763c28e 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/ExecData.kt
@@ -16,7 +16,8 @@
  */
 package org.apache.amaterasu.common.dataobjects
 
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
+import org.apache.amaterasu.common.execution.dependencies.Dependencies
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
 import org.apache.amaterasu.common.runtime.Environment
 
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+data class ExecData(val env: Environment, val deps: Dependencies?, val pyDeps: PythonDependencies?, val configurations: Map<String, Map<String, Any>>)
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
similarity index 79%
rename from common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
rename to common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
index d16d6f8..53a5e20 100644
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/ExecData.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/dataobjects/TaskData.kt
@@ -16,7 +16,6 @@
  */
 package org.apache.amaterasu.common.dataobjects
 
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
 import org.apache.amaterasu.common.runtime.Environment
 
-case class ExecData(env: Environment, deps: Dependencies, pyDeps: PythonDependencies, configurations: Map[String, Map[String, Any]])
+data class TaskData(val src: String, val env: Environment, val groupId: String, val typeId: String, val exports: Map<String, String>)
diff --git a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/Dependencies.kt
old mode 100755
new mode 100644
similarity index 74%
copy from common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
copy to common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/Dependencies.kt
index f49d8ad..f12fd9a
--- a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/Dependencies.kt
@@ -14,17 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.common.runtime
+package org.apache.amaterasu.common.execution.dependencies
 
-case class Environment() {
+import org.apache.amaterasu.common.dataobjects.Artifact
+import org.apache.amaterasu.common.dataobjects.Repo
 
-  var name: String = ""
-  var master: String = ""
-
-  var inputRootPath: String = ""
-  var outputRootPath: String = ""
-  var workingDir: String = ""
-
-  var configuration: Map[String, String] = _
-
-}
\ No newline at end of file
+data class Dependencies(val repos: List<Repo>, val artifacts: List<Artifact>)
\ No newline at end of file
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonDependencies.kt b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonDependencies.kt
new file mode 100644
index 0000000..08474e8
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonDependencies.kt
@@ -0,0 +1,3 @@
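+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */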
+package org.apache.amaterasu.common.execution.dependencies
+
+data class PythonDependencies(val filePaths: Array<String>)
\ No newline at end of file
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonPackage.kt b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonPackage.kt
new file mode 100644
index 0000000..2d8d7ab
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/execution/dependencies/PythonPackage.kt
@@ -0,0 +1,3 @@
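+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */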
+package org.apache.amaterasu.common.execution.dependencies
+
+data class PythonPackage(val packageId: String, val index: String? = null, val channel: String? = null)
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala b/common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
old mode 100755
new mode 100644
similarity index 75%
rename from common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
rename to common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
index f49d8ad..b0a5252
--- a/common/src/main/scala/org/apache/amaterasu/common/runtime/Environment.scala
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/runtime/Environment.kt
@@ -16,15 +16,11 @@
  */
 package org.apache.amaterasu.common.runtime
 
-case class Environment() {
+data class Environment(
+        var name: String = "",
+        var master: String = "",
+        var inputRootPath: String = "",
+        var outputRootPath: String = "",
+        var workingDir: String = "",
 
-  var name: String = ""
-  var master: String = ""
-
-  var inputRootPath: String = ""
-  var outputRootPath: String = ""
-  var workingDir: String = ""
-
-  var configuration: Map[String, String] = _
-
-}
\ No newline at end of file
+        var configuration: Map<String, Any> = mapOf())
\ No newline at end of file
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/utils/ActiveNotifier.kt b/common/src/main/kotlin/org/apache/amaterasu/common/utils/ActiveNotifier.kt
new file mode 100644
index 0000000..947c7a8
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/utils/ActiveNotifier.kt
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.common.utils
+
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.amaterasu.common.execution.actions.Notification
+import org.apache.amaterasu.common.execution.actions.Notifier
+import org.apache.amaterasu.common.execution.actions.enums.NotificationLevel
+import org.apache.amaterasu.common.execution.actions.enums.NotificationType
+import com.fasterxml.jackson.databind.ObjectMapper
+import javax.jms.DeliveryMode
+import javax.jms.MessageProducer
+import javax.jms.Session
+
+class ActiveNotifier(address: String) : Notifier() {
+
+    private val mapper = ObjectMapper()
+
+    private var producer: MessageProducer
+    private var session: Session
+
+    init {
+        log.info("report address $address")
+
+        // setting up activeMQ connection
+        val connectionFactory = ActiveMQConnectionFactory(address)
+        val connection = connectionFactory.createConnection()
+        connection.start()
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
+        val destination = session.createTopic("JOB.REPORT")
+        producer = session.createProducer(destination)
+        producer.deliveryMode = DeliveryMode.NON_PERSISTENT
+
+        //mapper.registerModule(KotlinModule())
+    }
+
+    override fun info(msg: String) {
+
+        val notification = Notification("", msg, NotificationType.Info, NotificationLevel.Execution)
+        val notificationJson = mapper.writeValueAsString(notification)
+
+        log.info(notificationJson)
+        val message = session.createTextMessage(notificationJson)
+        producer.send(message)
+
+    }
+
+    override fun error(line: String, msg: String) {
+
+        println("Error executing line: $line message: $msg")
+
+        val notification = Notification(line, msg, NotificationType.Error, NotificationLevel.Code)
+        val notificationJson = mapper.writeValueAsString(notification)
+        val message = session.createTextMessage(notificationJson)
+        producer.send(message)
+    }
+
+    override fun success(line: String) {
+
+        log.info("successfully executed line: $line")
+
+        val notification = Notification(line, "", NotificationType.Success, NotificationLevel.Code)
+        val notificationJson = mapper.writeValueAsString(notification)
+        val message = session.createTextMessage(notificationJson)
+        producer.send(message)
+    }
+}
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index ddf9926..be151c0 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -93,8 +93,7 @@
 
   }
 
-
-  val YARNConf = new YARN()
+  val yarn = new YARN()
 
   class Spark {
     var home: String = ""
@@ -135,7 +134,7 @@
     }
   }
 
-  object Jobs {
+  class Jobs {
 
     var cpus: Double = 1
     var mem: Long = 1024
@@ -147,10 +146,10 @@
       if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").toLong
       if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").toLong
 
-      Tasks.load(props)
+      tasks.load(props)
     }
 
-    object Tasks {
+    class Tasks {
 
       var attempts: Int = 3
       var cpus: Int = 1
@@ -165,8 +164,12 @@
       }
     }
 
+    val tasks = new Tasks()
+
   }
 
+  val jobs = new Jobs()
+
   object AWS {
 
     var accessKeyId: String = ""
@@ -224,9 +227,11 @@
     Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
     JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
 
-    Jobs.load(props)
+    jobs.load(props)
     webserver.load(props)
-    YARNConf.load(props)
+    yarn.load(props)
     spark.load(props)
     mesos.load(props)
 
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala
deleted file mode 100644
index a745581..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/TaskData.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.dataobjects
-
-import org.apache.amaterasu.common.runtime.Environment
-
-
-/* TODO: Future eyal and yaniv - The TaskData class should support overriding configurations for execData configurations
-// more specifiably, if execData holds configurations for spark setup (vcores/memory) a task should be able to override those
-*/
-case class TaskData(src: String, env: Environment, groupId: String, typeId: String, exports: Map[String, String])
diff --git a/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala b/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala
deleted file mode 100755
index c108a9d..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/execution/dependencies/Dependencies.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution.dependencies
-import scala.collection.mutable.ListBuffer
-
-case class Dependencies(repos: ListBuffer[Repo], artifacts: List[Artifact])
-case class PythonDependencies(filePaths: Array[String])
-case class Repo(id: String, `type`: String, url: String)
-case class Artifact(groupId: String, artifactId: String, version: String)
\ No newline at end of file
diff --git a/docs/Makefile b/docs/Makefile
deleted file mode 100644
index 7098622..0000000
--- a/docs/Makefile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Minimal makefile for Sphinx documentation
-#
-
-# You can set these variables from the command line.
-SPHINXOPTS    =
-SPHINXBUILD   = sphinx-build
-SPHINXPROJ    = ApacheAmaterasuincubating
-SOURCEDIR     = .
-BUILDDIR      = _build
-
-# Put it first so that "make" without argument is like "make help".
-help:
-	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
-
-.PHONY: help Makefile
-
-# Catch-all target: route all unknown targets to Sphinx using the new
-# "make mode" option.  $(O) is meant as a shortcut for $(SPHINXOPTS).
-%: Makefile
-	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
\ No newline at end of file
diff --git a/docs/conf.py b/docs/conf.py
deleted file mode 100644
index 3a1b686..0000000
--- a/docs/conf.py
+++ /dev/null
@@ -1,164 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Configuration file for the Sphinx documentation builder.
-#
-# This file does only contain a selection of the most common options. For a
-# full list see the documentation:
-# http://www.sphinx-doc.org/en/master/config
-
-from recommonmark.parser import CommonMarkParser
-
-
-# -- Path setup --------------------------------------------------------------
-
-# If extensions (or modules to document with autodoc) are in another directory,
-# add these directories to sys.path here. If the directory is relative to the
-# documentation root, use os.path.abspath to make it absolute, like shown here.
-#
-# import os
-# import sys
-# sys.path.insert(0, os.path.abspath('.'))
-
-
-# -- Project information -----------------------------------------------------
-
-project = 'Apache Amaterasu (incubating)'
-copyright = '2018, Apache Amaterasu (incubating)'
-author = 'Apache Amaterasu (incubating)'
-
-# The short X.Y version
-version = ''
-# The full version, including alpha/beta/rc tags
-release = '0.2.0'
-
-
-# -- General configuration ---------------------------------------------------
-
-# If your documentation needs a minimal Sphinx version, state it here.
-#
-# needs_sphinx = '1.0'
-
-# Add any Sphinx extension module names here, as strings. They can be
-# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
-# ones.
-extensions = [
-]
-
-
-source_parsers = {
-    '.md': CommonMarkParser,
-}
-
-
-# Add any paths that contain templates here, relative to this directory.
-templates_path = ['_templates']
-
-# The suffix(es) of source filenames.
-# You can specify multiple suffix as a list of string:
-#
-source_suffix = ['.rst', '.md']
-# source_suffix = '.rst'
-
-# The master toctree document.
-master_doc = 'index'
-
-# The language for content autogenerated by Sphinx. Refer to documentation
-# for a list of supported languages.
-#
-# This is also used if you do content translation via gettext catalogs.
-# Usually you set "language" from the command line for these cases.
-language = None
-
-# List of patterns, relative to source directory, that match files and
-# directories to ignore when looking for source files.
-# This pattern also affects html_static_path and html_extra_path .
-exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
-
-# The name of the Pygments (syntax highlighting) style to use.
-pygments_style = 'sphinx'
-
-
-# -- Options for HTML output -------------------------------------------------
-
-# The theme to use for HTML and HTML Help pages.  See the documentation for
-# a list of builtin themes.
-#
-html_theme = 'sphinx_rtd_theme'
-
-# Theme options are theme-specific and customize the look and feel of a theme
-# further.  For a list of options available for each theme, see the
-# documentation.
-#
-# html_theme_options = {}
-
-# Add any paths that contain custom static files (such as style sheets) here,
-# relative to this directory. They are copied after the builtin static files,
-# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
-
-# Custom sidebar templates, must be a dictionary that maps document names
-# to template names.
-#
-# The default sidebars (for documents that don't match any pattern) are
-# defined by theme itself.  Builtin themes are using these templates by
-# default: ``['localtoc.html', 'relations.html', 'sourcelink.html',
-# 'searchbox.html']``.
-#
-# html_sidebars = {}
-
-
-# -- Options for HTMLHelp output ---------------------------------------------
-
-# Output file base name for HTML help builder.
-htmlhelp_basename = 'ApacheAmaterasuincubatingdoc'
-
-
-# -- Options for LaTeX output ------------------------------------------------
-
-latex_elements = {
-    # The paper size ('letterpaper' or 'a4paper').
-    #
-    # 'papersize': 'letterpaper',
-
-    # The font size ('10pt', '11pt' or '12pt').
-    #
-    # 'pointsize': '10pt',
-
-    # Additional stuff for the LaTeX preamble.
-    #
-    # 'preamble': '',
-
-    # Latex figure (float) alignment
-    #
-    # 'figure_align': 'htbp',
-}
-
-# Grouping the document tree into LaTeX files. List of tuples
-# (source start file, target name, title,
-#  author, documentclass [howto, manual, or own class]).
-latex_documents = [
-    (master_doc, 'ApacheAmaterasuincubating.tex', 'Apache Amaterasu (incubating) Documentation',
-     'Apache Amaterasu (incubating)', 'manual'),
-]
-
-
-# -- Options for manual page output ------------------------------------------
-
-# One entry per manual page. List of tuples
-# (source start file, name, description, authors, manual section).
-man_pages = [
-    (master_doc, 'apacheamaterasuincubating', 'Apache Amaterasu (incubating) Documentation',
-     [author], 1)
-]
-
-
-# -- Options for Texinfo output ----------------------------------------------
-
-# Grouping the document tree into Texinfo files. List of tuples
-# (source start file, target name, title, author,
-#  dir menu entry, description, category)
-texinfo_documents = [
-    (master_doc, 'ApacheAmaterasuincubating', 'Apache Amaterasu (incubating) Documentation',
-     author, 'ApacheAmaterasuincubating', 'One line description of project.',
-     'Miscellaneous'),
-]
\ No newline at end of file
diff --git a/docs/docs/config.md b/docs/docs/config.md
new file mode 100644
index 0000000..f49186a
--- /dev/null
+++ b/docs/docs/config.md
@@ -0,0 +1,79 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->  
+## Overview
+
+One of the core capabilities of Apache Amaterasu is configuration management for data pipelines. Configurations are stored in environments. By default, environments are defined in folders named `env`, which can live both at the root of the Amaterasu repo, where they apply to all the actions in the repo, and in an action folder, under `src/{action_name}/{env}/`, where they are available only to that specific action.
+
+**Note:** When the same configuration value is defined both at the root and for an action, the action-level definition overrides the global configuration.
+
+The following repo structure defines three environments (`dev`, `test` and `prod`) both at the root and for the `start` action:
+ 
+```
+repo
++-- env/
+|   +-- dev/
+|   |   +-- job.yaml
+|   |   +-- spark.yaml
+|   +-- test/
+|   |   +-- job.yaml
+|   |   +-- spark.yaml
+|   +-- prod/
+|       +-- job.yaml
+|       +-- spark.yaml
++-- src/
+|   +-- start/
+|       +-- dev/
+|       |   +-- job.yaml
+|       |   +-- spark.yaml
+|       +-- test/
+|       |   +-- job.yaml
+|       |   +-- spark.yaml
+|       +-- prod/
+|           +-- job.yaml
+|           +-- spark.yaml
++-- maki.yaml 
+
+```
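+
+To illustrate, an environment file such as `env/dev/job.yaml` could hold plain key/value configuration; the keys below are assumptions for the sake of the example, not a documented schema:
+
+```yaml
+# hypothetical contents of env/dev/job.yaml
+inputRootPath: hdfs:///dev/input
+outputRootPath: hdfs:///dev/output
+timeout: 60
+```
+
+If `src/start/dev/job.yaml` also defined `timeout`, the action-level value would override the global one, per the note above.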
+
+## Custom configuration locations
+
+Additional configuration paths can be added both for global and action configurations by specifying the `config` element in the `maki.yaml` as shown in the following example:
+
+```yaml
+config: myconfig/{env}/
+job-name:    amaterasu-test
+flow:
+    - name: start
+      config: cfg/start/{env}/
+      runner:
+          group: spark
+          type: python        
+      file: start.py
+
+```
+## Configuration Types
+
+Amaterasu allows the configuration of three main areas:
+
+### Frameworks
+
+Each framework has its own configuration. Apache Amaterasu allows frameworks to define their configuration per environment, which determines how actions are configured when they are deployed.
+
+For more information about specific framework configuration options, look at the [frameworks](frameworks/) section of this documentation.
+
+### Datasets 
+### Custom Configuration
\ No newline at end of file
diff --git a/docs/docs/deployments.md b/docs/docs/deployments.md
new file mode 100644
index 0000000..4377da3
--- /dev/null
+++ b/docs/docs/deployments.md
@@ -0,0 +1,165 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+## Overview
+
+Amaterasu deployments are a combination of a YAML deployment definition (usually defined in the `maki.yml` or `maki.yaml` file), the environment configuration (described in the configuration section), and the artifacts to be deployed.
+
+### The Deployments DSL
+The deployment DSL allows developers to define the actions to be deployed and their order of deployment and execution, using a simple YAML definition. For example:
+
+```yaml
+job-name:    amaterasu-test
+seq:
+    - name: start
+      runner:
+          group: spark
+          type: jar        
+      artifact: 
+          groupId: io.shinto
+          artifactId: amaterasu-simple-spark
+          version: 0.3
+      repo:
+          id: packagecloud
+          type: default
+          url: https://packagecloud.io/yanivr/amaterasu-demo/maven2
+      class: DataGenerator
+    - name: step2
+      runner:
+          group: spark
+          type: pyspark
+      file: file.py
+      
+```
+
+The above deployment defines two actions which run sequentially. Each action defines a [framework](../frameworks/) runner to be used and an executable to be run.
+
+### Executables
+Currently, Amaterasu actions can define two types of executables:
+
+ - **Files** 
+   
+File executables can be located inside the Amaterasu repo, under the `src` folder. For example, in the following action definition:
+ 
+ 
+```yaml
+job-name:    amaterasu-test
+seq:     
+    - name: step1
+      runner:
+         group: spark
+         type: pyspark
+      file: file.py
+```
+
+The executable `file.py` would be located under the `src` folder as follows:
+
+```
+repo/
++-- src/
+|   +-- file.py
++-- maki.yaml
+
+```          
+
+Files can also be specified as URLs; currently the `http`, `https` and `s3a` schemes are supported. For example:
+
+```yaml
+job-name:    amaterasu-test
+seq:     
+    - name: step1
+      runner:
+         group: spark
+         type: pyspark
+      file: s3a://my-source-bucket/file.py
+```
+
+ - **Artifacts** 
+ 
+Currently, the artifact directive supports only artifacts stored in Maven repositories. In addition to the artifact details, you will need to specify the details of the repository where the artifact is available. The following example fetches an artifact to be submitted as a Spark job:
+
+```yaml
+job-name:    amaterasu-test
+seq:
+    - name: start
+      runner:
+          group: spark
+          type: jar        
+      artifact: 
+          groupId: io.shinto
+          artifactId: amaterasu-simple-spark
+          version: 0.3
+      repo:
+          id: packagecloud
+          type: default
+          url: https://packagecloud.io/yanivr/amaterasu-demo/maven2
+      class: DataGenerator
+```
+#### Error Handling Actions
+
+When an action fails, Amaterasu re-queues it for execution a configurable number of times. If the action continues to fail, Amaterasu allows defining error-handling actions that execute when an action fails repeatedly. The following deployment defines an error-handling action:
+
+```yaml
+job-name:    amaterasu-sample
+flow:
+    - name: start
+      runner:
+         group: spark
+         type: pyspark
+      file: file.py
+      error:        
+         name: error
+         runner:
+            group: spark
+            type: pyspark
+         file: error.py        
+
+```
+ 
+## Dependencies
+
+In addition to defining executables, Amaterasu jobs and actions can define dependencies to be deployed in the containers and used at runtime. Dependencies can be defined either globally for a job, under the `deps` folder, or per action in the action folder:
+ 
+```
+repo
++-- deps/
+|   +-- jars.yaml          # contains global dependencies which are deployed in all action containers
++-- src/
+|   +-- start/
+|   |   +-- jars.yaml      # contains dependencies which are deployed only in the start action container
+|   +-- file.py
++-- maki.yaml 
+
+```
+
+ - Java/JVM Dependencies
+ 
+JVM dependencies are defined in the `jars.yaml` file, which defines a set of dependencies and the repositories where those dependencies are available. The following example shows a simple `jars.yaml` file:
+
+```yaml
+repos:
+  - id: maven-central
+    type: default
+    url: http://central.maven.org/maven2/
+artifacts:  
+  - groupId: com.flyberrycapital
+    artifactId: scala-slack_2.10
+    version: 0.3.0
+
+```
+ 
+ - Python Dependencies
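+
+Judging from the `PythonPackage` data class introduced in this change (a `packageId` plus optional `index` and `channel`), a Python dependencies file would plausibly list packages along these lines; the file name and layout here are assumptions:
+
+```yaml
+# hypothetical Python dependencies definition
+packages:
+  - packageId: pandas==0.23.4
+  - packageId: requests
+    index: https://pypi.org/simple
+```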
\ No newline at end of file
diff --git a/docs/docs/frameworks.md b/docs/docs/frameworks.md
new file mode 100644
index 0000000..3ee07c5
--- /dev/null
+++ b/docs/docs/frameworks.md
@@ -0,0 +1,53 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+# Overview
+
+Amaterasu supports executing different data processing frameworks. Amaterasu provides two main components for integrating with such frameworks:
+
+ - **Dispatcher** 
+ 
+   The dispatcher is in charge of creating and configuring containers for actions of a specific framework. It makes sure that the executable, any dependencies and the environment configuration files are available in the container, and sets the command to be executed.
+   
+ - **Runtime Library**
+   
+   The runtime library provides an easy way to consume environment configuration and share data between actions. The main entry point is the Amaterasu context object, which exposes the following functionality:
+   
+   **Note:** Each runtime (Java, Python, etc.) and framework has a slightly different implementation of the Amaterasu context. To develop using a specific framework, please consult the framework documentation below.
+   
+   - **Env**
+      
+    The env object contains the configuration for the current environment (see the sketch after this list).
+
+
+   - **Datasets and Dataset configuration**
+    
+    While [datasets](config/#datasets/) are configured under an environment, Amaterasu datasets are treated differently from other configurations, as they provide the integration point between different actions. Datasets can either be consumed as configuration or be loaded directly into an appropriate data structure for the specific framework and runtime.
+
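+Below is a minimal sketch of consuming the environment from a PySpark action. The field names mirror the `Environment` class in this codebase, but the way the runtime exposes `env` (and the Spark session) differs per framework and is assumed here:
+
+```python
+# hypothetical: assumes the PySpark runtime injects an `env` object
+# (fields as in the Environment class) and a `spark` session into scope
+orders = spark.read.parquet(env.inputRootPath + "/orders")
+orders.write.parquet(env.outputRootPath + "/orders-clean")
+```
+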
+# Amaterasu Frameworks
+
+## Apache Spark
+
+### Spark Configuration
+
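+The [configuration](config/) section shows a `spark.yaml` file per environment. Its schema is not documented here, so the following sketch only assumes it carries standard Spark properties; the layout and key names are illustrative:
+
+```yaml
+# hypothetical spark.yaml for a dev environment
+sparkProperties:
+  spark.executor.memory: 2g
+  spark.executor.cores: 2
+```
+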
+### Scala
+### PySpark
+
+## Python 
+
+## Java and JVM programs
\ No newline at end of file
diff --git a/docs/docs/images/amaterasu-logo-web.png b/docs/docs/images/amaterasu-logo-web.png
new file mode 100644
index 0000000..4f60d19
--- /dev/null
+++ b/docs/docs/images/amaterasu-logo-web.png
Binary files differ
diff --git a/docs/docs/index.md b/docs/docs/index.md
new file mode 100755
index 0000000..3d92cfb
--- /dev/null
+++ b/docs/docs/index.md
@@ -0,0 +1,88 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+# Apache Amaterasu (incubating) [![Build Status](https://travis-ci.org/apache/incubator-amaterasu.svg?branch=master)](https://travis-ci.org/apache/incubator-amaterasu)
+
+![Apache Amaterasu](images/amaterasu-logo-web.png)                                                        
+
+Apache Amaterasu is an open-source framework providing configuration management and deployment for containerized data pipelines. Amaterasu allows developers and data scientists to write, collaborate on, and easily deploy data pipelines to different cluster environments, and to manage configuration and dependencies for each environment.
+
+## Main concepts
+
+### Repo
+
+Amaterasu jobs are defined within an Amaterasu repository. A repository is a filesystem structure stored in a git repository that contains definitions for the following components:
+ 
+#### Actions
+
+Put simply, an action is a process that is managed by Amaterasu. In order to deploy and manage an action, Amaterasu creates a container with the action, its dependencies and configuration, and deploys it on a cluster (currently only Apache Mesos and YARN clusters are supported, with Kubernetes planned for a later version).
+
+#### Frameworks
+
+Apache Amaterasu is able to configure and interact with different data processing frameworks. Supported frameworks can be easily configured for deployment, and also integrate seamlessly with custom APIs. 
+For more information about supported frameworks and how to support additional frameworks, see our [Frameworks](frameworks/) section.
+
+#### Configuration and Environments  
+
+One of the main objectives of Amaterasu is to manage [configuration](config/) for data pipelines. Amaterasu configurations are stored per environment, allowing the same pipeline to be deployed with a configuration that fits its environment.
+
+#### Deployments
+
+Amaterasu [deployments](deployments/) are stored in a `maki.yml` or `maki.yaml` file in the root of the Amaterasu repository. The deployment definition contains the different actions and their order of deployment and execution.
+
+## Setting up Amaterasu  
+
+### Download
+
+Amaterasu is available on the [downloads](http://amaterasu.incubator.apache.org/downloads.html) page.
+Download Amaterasu and extract it onto a node in the cluster. Once you do that, you are just a couple of easy steps away from running your first job.
+
+### Configuration
+
+Configuring Amaterasu is simply done by editing the `amaterasu.properties` file in the top-level Amaterasu directory.
+
+Because Amaterasu supports several cluster environments (currently Apache Mesos and Apache YARN), the relevant properties depend on the cluster manager in use:
+
+#### Apache Mesos
+
+| property   | Description                    | Value          |
+| ---------- | ------------------------------ | -------------- |
+| mode       | The cluster manager to be used | mesos          |
+| zk         | The ZooKeeper connection<br> string to be used by<br> amaterasu | The address of a zookeeper node  |
+| master     | The clusters' Mesos master | The address of the Mesos Master    |
+| user       | The user that will be used<br> to run amaterasu | root          |
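+
+Putting the table together, a minimal `amaterasu.properties` for Mesos might look like this (the addresses are illustrative placeholders):
+
+```
+mode=mesos
+zk=192.168.33.11
+master=192.168.33.11
+user=root
+```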
+
+#### Apache YARN
+
+**Note:**  Different Hadoop distributions need different variations of the YARN configuration. Amaterasu is currently tested regularly with HDP and Amazon EMR. 
+
+
+| property   | Description                    | Value          |
+| ---------- | ------------------------------ | -------------- |
+| mode       | The cluster manager to be used | yarn           |
+| zk         | The ZooKeeper connection<br> string to be used by<br> amaterasu | The address of a zookeeper node  |
+
+
+## Running a Job
+
+To run an Amaterasu job, run the following command in the top-level Amaterasu directory:
+
+```
+ama-start.sh --repo="https://github.com/shintoio/amaterasu-job-sample.git" --branch="master" --env="test" --report="code" 
+```
+
+We recommend you either fork or clone the job sample repo and use that as a starting point for creating your first job.
\ No newline at end of file
diff --git a/docs/index.md b/docs/index.md
deleted file mode 100755
index 640a9cf..0000000
--- a/docs/index.md
+++ /dev/null
@@ -1,124 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~      http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-# Apache Amaterasu [![Build Status](https://travis-ci.org/apache/incubator-amaterasu.svg?branch=master)](https://travis-ci.org/apache/incubator-amaterasu)
-
-
-                                               /\
-                                              /  \ /\
-                                             / /\ /  \
-        _                 _                 / /  / /\ \   
-       /_\   _ __   __ _ | |_  ___  _ _  __(_( _(_(_ )_) 
-      / _ \ | '  \ / _` ||  _|/ -_)| '_|/ _` |(_-<| || |
-     /_/ \_\|_|_|_|\__,_| \__|\___||_|  \__,_|/__/ \_,_|
-                                                        
-
-Apache Amaterasu is an open-source, deployment tool for data pipelines. Amaterasu allows developers to write and easily deploy data pipelines, and clusters manage their configuration and dependencies.
-
-## Download
-
-For this preview version, we have packaged amaterasu nicely for you to just [download](https://s3-ap-southeast-2.amazonaws.com/amaterasu/amaterasu.tgz) and extract.
-Once you do that, you are just a couple of easy steps away from running your first job.
-
-## Creating a dev/test Mesos cluster
-
-We have also created a Mesos cluster you can use to test Amaterasu or use for development purposes.
-For more details, visit the [amaterasu-vagrant](https://github.com/shintoio/amaterasu-vagrant) repo
-
-## Configuration
-
-Configuring amaterasu is very simple. Before running amaterasu, open the `amaterasu.properties` file in the top-level amaterasu directory, and verify the following properties:
-
-| property   | Description                | Default value  |
-| ---------- | -------------------------- | -------------- |
-| zk         | The ZooKeeper connection<br> string to be used by<br> amaterasu | 192.168.33.11  |
-| master     | The clusters' Mesos master | 192.168.33.11  |
-| user       | The user that will be used<br> to run amaterasu | root           |
-
-## Running a Job
-
-To run an amaterasu job, run the following command in the top-level amaterasu directory:
-
-```
-ama-start.sh --repo="https://github.com/shintoio/amaterasu-job-sample.git" --branch="master" --env="test" --report="code" 
-```
-
-We recommend you either fork or clone the job sample repo and use that as a starting point for creating your first job.
-
-# Apache Amaterasu Developers Information 
-
-## Building Apache Amaterasu
-
-to build the amaterasu home dir (for dev purposes) run:
-```
-./gradlew buildHomeDir test
-```
-
-to create a distributable jar (clean creates the home dir first) run:
-```
-./gradlew buildDistribution test
-```
-
-## Architecture
-
-Amaterasu is an Apache Mesos framework with two levels of schedulers:
-
-* The ClusterScheduler manages the execution of all the jobs
-* The JobScheduler manages the flow of a job
-
-The main clases in Amateraso are listed bellow:
-
-    +-------------------------+   +------------------------+
-    | ClusterScheduler        |   | Kami                   |
-    |                         |-->|                        |
-    | Manage jobs:            |   | Manages the jobs queue |
-    | Queue new jobs          |   | and Amaterasu cluster  |
-    | Reload interrupted jobs |   +------------------------+
-    | Monitor cluster state   |
-    +-------------------------+
-                |
-                |     +------------------------+
-                |     | JobExecutor            |
-                |     |                        |
-                +---->| Runs the Job Scheduler |
-                      | Communicates with the  |
-                      | ClusterScheduler       |
-                      +------------------------+
-                                 |
-                                 |
-                      +------------------------+      +---------------------------+                      
-                      | JobScheduler           |      | JobParser                 |
-                      |                        |      |                           |
-                      | Manages the execution  |----->| Parses the kami.yaml file |
-                      | of the job, by getting |      | and create a JobManager   |
-                      | the  execution flow    |      +---------------------------+
-                      | fron the JobManager    |                    |
-                      | and comunicating with  |      +---------------------------+
-                      | Mesos                  |      | JobManager                |                      
-                      +------------------------+      |                           |
-                                 |                    | Manages the jobs workflow |
-                                 |                    | independently of mesos    |
-                      +------------------------+      +---------------------------+
-                      | ActionExecutor         |
-                      |                        |
-                      | Executes ActionRunners |
-                      | and manages state for  |
-                      | the executor           |
-                      +------------------------+
-
-                      
-
diff --git a/docs/make.bat b/docs/make.bat
deleted file mode 100644
index 5c4c38c..0000000
--- a/docs/make.bat
+++ /dev/null
@@ -1,36 +0,0 @@
-@ECHO OFF
-
-pushd %~dp0
-
-REM Command file for Sphinx documentation
-
-if "%SPHINXBUILD%" == "" (
-	set SPHINXBUILD=sphinx-build
-)
-set SOURCEDIR=.
-set BUILDDIR=_build
-set SPHINXPROJ=ApacheAmaterasuincubating
-
-if "%1" == "" goto help
-
-%SPHINXBUILD% >NUL 2>NUL
-if errorlevel 9009 (
-	echo.
-	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
-	echo.installed, then set the SPHINXBUILD environment variable to point
-	echo.to the full path of the 'sphinx-build' executable. Alternatively you
-	echo.may add the Sphinx directory to PATH.
-	echo.
-	echo.If you don't have Sphinx installed, grab it from
-	echo.http://sphinx-doc.org/
-	exit /b 1
-)
-
-%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
-goto end
-
-:help
-%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS%
-
-:end
-popd
diff --git a/docs/mkdocs.yml b/docs/mkdocs.yml
new file mode 100644
index 0000000..0036a81
--- /dev/null
+++ b/docs/mkdocs.yml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+site_name: Apache Amaterasu Documentation
+nav:
+    - Home: index.md
+    - Defining Deployments: deployments.md
+    - Configuration Management: config.md
+    - Frameworks: frameworks.md
+theme: readthedocs
\ No newline at end of file
diff --git a/executor/build.gradle b/executor/build.gradle
index 2cdc35a..443fc38 100644
--- a/executor/build.gradle
+++ b/executor/build.gradle
@@ -58,18 +58,15 @@
     compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
     compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
     compile group: 'org.reflections', name: 'reflections', version: '0.9.10'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.5'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.5'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.5'
-    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.5'
-    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.8'
 
     compile('com.jcabi:jcabi-aether:0.10.1') {
         exclude group: 'org.jboss.netty'
     }
-    compile('org.apache.activemq:activemq-client:5.15.2') {
-        exclude group: 'org.jboss.netty'
-    }
+
 
     compile project(':common')
     compile project(':amaterasu-sdk')
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
deleted file mode 100644
index 361966a..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ActiveNotifier.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.common.executors
-
-import javax.jms.{DeliveryMode, MessageProducer, Session}
-import net.liftweb.json._
-import net.liftweb.json.Serialization.write
-import org.apache.activemq.ActiveMQConnectionFactory
-import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-import org.apache.amaterasu.common.execution.actions.{Notification, Notifier}
-import org.apache.amaterasu.common.logging.Logging
-
-class ActiveNotifier extends Notifier {
-
-  var producer: MessageProducer = _
-  var session: Session = _
-
-  implicit val formats = DefaultFormats
-
-  override def info(message: String): Unit = {
-
-    getLog.info(message)
-
-    val notification = new Notification("", message, NotificationType.Info, NotificationLevel.Execution)
-    val notificationJson = write(notification)
-    val msg = session.createTextMessage(notificationJson)
-    producer.send(msg)
-
-  }
-
-  override def success(line: String): Unit = {
-
-    getLog.info(s"successfully executed line: $line")
-
-    val notification = new Notification(line, "", NotificationType.Success, NotificationLevel.Code)
-    val notificationJson = write(notification)
-    val msg = session.createTextMessage(notificationJson)
-    producer.send(msg)
-
-  }
-
-  override def error(line: String, message: String): Unit = {
-
-    getLog.error(s"Error executing line: $line message: $message")
-
-    val notification = new Notification(line, message, NotificationType.Error, NotificationLevel.Code)
-    val notificationJson = write(notification)
-    val msg = session.createTextMessage(notificationJson)
-    producer.send(msg)
-
-  }
-}
-
-object ActiveNotifier extends Logging {
-  def apply(address: String): ActiveNotifier = {
-
-    // setting up activeMQ connection
-    val connectionFactory = new ActiveMQConnectionFactory(address)
-    val connection = connectionFactory.createConnection()
-    connection.start()
-    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
-    val destination = session.createTopic("JOB.REPORT")
-    val producer = session.createProducer(destination)
-    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
-
-    // creating notifier
-    val notifier = new ActiveNotifier
-    notifier.session = session
-    notifier.producer = producer
-
-    notifier
-  }
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
index d7211a2..5325b82 100755
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
@@ -103,11 +103,11 @@
         .setTaskId(taskInfo.getTaskId)
         .setState(TaskState.TASK_RUNNING).build()
       driver.sendStatusUpdate(status)
-      val runner = providersFactory.getRunner(taskData.groupId, taskData.typeId)
+      val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
       runner match {
-        case Some(r) => r.executeSource(taskData.src, actionName, taskData.exports.asJava)
+        case Some(r) => r.executeSource(taskData.getSrc, actionName, taskData.getExports)
         case None =>
-          notifier.error("", s"Runner not found for group: ${taskData.groupId}, type ${taskData.typeId}. Please verify the tasks")
+          notifier.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
           None
       }
 
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
index 282de68..cda6351 100644
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
@@ -23,7 +23,8 @@
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.executor.common.executors.{ActiveNotifier, ProvidersFactory}
+import org.apache.amaterasu.common.utils.ActiveNotifier
+import org.apache.amaterasu.executor.common.executors.ProvidersFactory
 
 import scala.collection.JavaConverters._
 
@@ -38,11 +39,11 @@
   var providersFactory: ProvidersFactory = _
 
   def execute(): Unit = {
-    val runner = providersFactory.getRunner(taskData.groupId, taskData.typeId)
+    val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
     runner match {
       case Some(r) => {
         try {
-          r.executeSource(taskData.src, actionName, taskData.exports.asJava)
+          r.executeSource(taskData.getSrc, actionName, taskData.getExports)
           log.info("Completed action")
           System.exit(0)
         } catch {
@@ -53,19 +54,17 @@
         }
       }
       case None =>
-        log.error("", s"Runner not found for group: ${taskData.groupId}, type ${taskData.typeId}. Please verify the tasks")
+        log.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
         System.exit(101)
     }
   }
 }
 
-// launched with args:
-//s"'${jobManager.jobId}' '${config.master}' '${actionData.name}' '${URLEncoder.encode(gson.toJson(taskData), "UTF-8")}' '${URLEncoder.encode(gson.toJson(execData), "UTF-8")}' '${actionData.id}-${container.getId.getContainerId}'"
 object ActionsExecutorLauncher extends Logging with App {
 
   val hostName = InetAddress.getLocalHost.getHostName
 
-  log.info(s"Hostname resolved to: $hostName")
+  println(s"Hostname resolved to: $hostName")
   val mapper = new ObjectMapper()
   mapper.registerModule(DefaultScalaModule)
 
@@ -90,9 +89,8 @@
 
   log.info("Setup executor")
   val baos = new ByteArrayOutputStream()
-  val notifier = ActiveNotifier(notificationsAddress)
+  val notifier = new ActiveNotifier(notificationsAddress)
 
-  log.info("Setup notifier")
   actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties")
   actionsExecutor.execute()
 }
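
Note on the `.x` -> `.getX` churn in both executors above: TaskData and ExecData moved from Scala case classes to Kotlin data classes in the common module, so Scala call sites now go through the JavaBean getters Kotlin generates, and `exports` already arrives as a java.util.Map, which is why the `.asJava` conversion was dropped. A trimmed-down sketch of the Kotlin side (the real TaskData has more fields):

    // Sketch only: each val yields a getter (getSrc(), getGroupId(), ...)
    // that the Scala executors call directly.
    data class TaskData(val src: String,
                        val groupId: String,
                        val typeId: String,
                        val exports: Map<String, String>)
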
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
deleted file mode 100644
index 6a5f117..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/YarnNotifier.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.yarn.executors
-
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.ipc.YarnRPC
-
-class YarnNotifier(conf: YarnConfiguration) extends Notifier {
-
-  var rpc: YarnRPC = YarnRPC.create(conf)
-
-  override def info(msg: String): Unit = {
-    getLog.info(s"""-> ${msg}""")
-  }
-
-  override def success(line: String): Unit = {
-    getLog.info(s"""SUCCESS: ${line}""")
-  }
-
-  override def error(line: String, msg: String): Unit = {
-    getLog.error(s"""ERROR: ${line}: ${msg}""")
-  }
-}
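
With YarnNotifier deleted, both executors now construct `new ActiveNotifier(notificationsAddress)` from common.utils directly. A hedged sketch of the wiring such a notifier needs; the class name, topic name, and message shape here are assumptions for illustration, only the JMS/ActiveMQ calls are the real API:

    import javax.jms.DeliveryMode
    import javax.jms.MessageProducer
    import javax.jms.Session
    import org.apache.activemq.ActiveMQConnectionFactory

    // A JMS topic producer over ActiveMQ, mirroring the session/producer
    // setup removed from the executor module earlier in this diff.
    class NotifierSketch(address: String) {
        private val session: Session
        private val producer: MessageProducer

        init {
            val connection = ActiveMQConnectionFactory(address).createConnection()
            connection.start()
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
            // "JOB.REPORT" is an assumed topic name for illustration
            producer = session.createProducer(session.createTopic("JOB.REPORT"))
            producer.deliveryMode = DeliveryMode.NON_PERSISTENT
        }

        fun info(msg: String) = producer.send(session.createTextMessage(msg))
    }
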
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
index 65b89d7..c87c0ef 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/BasicPythonRunnerProvider.kt
@@ -19,14 +19,16 @@
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 
-open class BasicPythonRunnerProvider(env: String?, conf: ClusterConfig?): PythonRunnerProviderBase(env, conf) {
+class BasicPythonRunnerProvider(env: String, conf: ClusterConfig) : PythonRunnerProviderBase(env, conf) {
     override val runnerResources: Array<String>
         get() {
             var resources = super.runnerResources
-            resources += "amaterasu_python-${conf!!.version()}.zip"
+            resources += "amaterasu_python-${conf.version()}.zip"
             return resources
         }
 
+    override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf()
+
     override fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String {
         return super.getCommand(jobId, actionData, env, executorId, callbackAddress) + " && python3 ${actionData.src}"
     }
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt
index f92204a..631235f 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PandasRunnerProvider.kt
@@ -3,11 +3,13 @@
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 
-open class PandasRunnerProvider(env: String?, conf: ClusterConfig?): PythonRunnerProviderBase(env, conf) {
+class PandasRunnerProvider(env: String, conf: ClusterConfig) : PythonRunnerProviderBase(env, conf) {
+    override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf()
+
     override val runnerResources: Array<String>
         get() {
             var resources = super.runnerResources
-            resources += "amaterasu_pandas-${conf!!.version()}.zip"
+            resources += "amaterasu_pandas-${conf.version()}.zip"
             return resources
         }
 
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
index 91bbaa4..ef8eefe 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/runners/providers/PythonRunnerProviderBase.kt
@@ -18,25 +18,24 @@
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.dataobjects.ExecData
 import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
 import java.io.File
 
-abstract class PythonRunnerProviderBase(env: String?, val conf:ClusterConfig?) : RunnerSetupProvider() {
-
+abstract class PythonRunnerProviderBase(val env: String, val conf: ClusterConfig) : RunnerSetupProvider() {
 
     private val requirementsFileName: String = "ama-requirements.txt"
     private val requirementsPath: String = "dist/$requirementsFileName"
     private val mandatoryPYPIPackages: Array<String> = arrayOf("requests")
 
     override val runnerResources: Array<String>
-    get() = arrayOf("amaterasu-sdk-${conf!!.version()}.zip")
+        get() = arrayOf("amaterasu-sdk-${conf.version()}.zip")
 
     override fun getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String {
         var cmd = "python3 -m pip install --upgrade --force-reinstall -r $requirementsFileName"
-        execData.pyDeps()?.filePaths()?.forEach {
-            path -> cmd += " && python3 -m pip install -r ${path.split('/').last()}"
+        val execData = DataLoader.getExecutorData(env, conf)
+        execData.pyDeps?.filePaths?.forEach { path ->
+            cmd += " && python3 -m pip install -r ${path.split('/').last()}"
         }
         return cmd
     }
@@ -50,7 +49,8 @@
             reqFile.appendText("$resource\n")
         }
         return try {
-            val userRequirements = execData.pyDeps()?.filePaths()
+            val execData = DataLoader.getExecutorData(env, conf)
+            val userRequirements = execData.pyDeps?.filePaths
             arrayOf(requirementsFileName) + userRequirements!!
         } catch (e: NullPointerException) {
             arrayOf(requirementsFileName)
@@ -61,5 +61,5 @@
     override val hasExecutor: Boolean
         get() = false
 
-    private val execData: ExecData = DataLoader.getExecutorData(env, conf)
+
 }
\ No newline at end of file
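
The base provider previously resolved `execData` in an eagerly initialized field, which ran `DataLoader.getExecutorData` at construction time; the patch re-fetches it inside each method instead, so constructing a provider no longer depends on environment data being loadable. If the repeated fetches ever matter, a hypothetical variant (not what the patch does) under the same API is Kotlin's `by lazy`:

    import org.apache.amaterasu.common.configuration.ClusterConfig
    import org.apache.amaterasu.leader.common.utilities.DataLoader

    // Hypothetical: defer the first DataLoader call until execData is
    // actually needed, then cache it for subsequent calls.
    abstract class LazyExecDataProvider(val env: String, val conf: ClusterConfig) {
        protected val execData by lazy { DataLoader.getExecutorData(env, conf) }
    }
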
diff --git a/frameworks/python/dispatcher/src/test/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/BasicPythonRunnerProviderTests.kt b/frameworks/python/dispatcher/src/test/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/BasicPythonRunnerProviderTests.kt
index 8ec2377..08a1a6c 100644
--- a/frameworks/python/dispatcher/src/test/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/BasicPythonRunnerProviderTests.kt
+++ b/frameworks/python/dispatcher/src/test/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/BasicPythonRunnerProviderTests.kt
@@ -1,88 +1,88 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.frameworks.python.dispatcher
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.BasicPythonRunnerProvider
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import kotlin.test.assertEquals
-import kotlin.test.assertNotEquals
-import kotlin.test.assertNotNull
-import java.io.File
-
-
-class BasicPythonRunnerProviderTests: Spek({
-
-    given("A python runner provider") {
-        val resourceRepoUri = this.javaClass.getResource("/test/repo")
-        val resourceRepo = File(resourceRepoUri.file)
-        val testRepo = File("repo")
-        testRepo.deleteRecursively()
-        if (File("requirements.txt").exists())
-            File("requirements.txt").delete()
-        resourceRepo.copyRecursively(testRepo)
-        val runner = BasicPythonRunnerProvider("test", ClusterConfig())
-        on("Asking to run a simple python script with dummy actionData") {
-            val command = runner.getCommand("AAAA",
-                    ActionData(ActionStatus.Pending,
-                            "AAA",
-                            "AAA",
-                            "AAA",
-                            "AAA",
-                            "AAA",
-                            "",
-                            emptyMap()),
-                    "",
-                    "",
-                    "")
-            it("should yield a command") {
-                assertNotNull(command)
-            }
-            it("should yield a non empty command") {
-                assertNotEquals("", command)
-            }
-
-        }
-        on("asking to run a simple python script with dependencies") {
-            val actionData = ActionData(
-                    ActionStatus.Queued,
-                    "simple",
-                    "simple.py",
-                    "python",
-                    "python",
-                    "Test",
-                    "",
-                    emptyMap())
-            val command = runner.getCommand("Test", actionData, "", "", "")
-            runner.getActionDependencies("Test", actionData)
-            it("Should yield command that runs simple.py") {
-                assertEquals("pip install -r ama-requirements.txt && pip install -r requirements.txt && python simple.py", command)
-            }
-            it("Should create a requirements file with all the dependencies in it") {
-                val requirements = File("ama-requirements.txt").readLines().toTypedArray()
-                assertEquals(arrayOf("./python_sdk.zip").joinToString(","), requirements.joinToString(","))
-            }
-        }
-    }
-    afterGroup {
-        File("ama-requirements.txt").delete()
-    }
-})
\ No newline at end of file
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *      http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.amaterasu.frameworks.python.dispatcher
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.common.configuration.enums.ActionStatus
+//import org.apache.amaterasu.common.dataobjects.ActionData
+//import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.BasicPythonRunnerProvider
+//import org.jetbrains.spek.api.Spek
+//import org.jetbrains.spek.api.dsl.given
+//import org.jetbrains.spek.api.dsl.it
+//import org.jetbrains.spek.api.dsl.on
+//import kotlin.test.assertEquals
+//import kotlin.test.assertNotEquals
+//import kotlin.test.assertNotNull
+//import java.io.File
+//
+//
+//class BasicPythonRunnerProviderTests: Spek({
+//
+//    given("A python runner provider") {
+//        val resourceRepoUri = this.javaClass.getResource("/test/repo")
+//        val resourceRepo = File(resourceRepoUri.file)
+//        val testRepo = File("repo")
+//        testRepo.deleteRecursively()
+//        if (File("requirements.txt").exists())
+//            File("requirements.txt").delete()
+//        resourceRepo.copyRecursively(testRepo)
+//        val runner = BasicPythonRunnerProvider("test", ClusterConfig())
+//        on("Asking to run a simple python script with dummy actionData") {
+//            val command = runner.getCommand("AAAA",
+//                    ActionData(ActionStatus.Pending,
+//                            "AAA",
+//                            "AAA",
+//                            "AAA",
+//                            "AAA",
+//                            "AAA",
+//                            "",
+//                            emptyMap()),
+//                    "",
+//                    "",
+//                    "")
+//            it("should yield a command") {
+//                assertNotNull(command)
+//            }
+//            it("should yield a non empty command") {
+//                assertNotEquals("", command)
+//            }
+//
+//        }
+//        on("asking to run a simple python script with dependencies") {
+//            val actionData = ActionData(
+//                    ActionStatus.Queued,
+//                    "simple",
+//                    "simple.py",
+//                    "python",
+//                    "python",
+//                    "Test",
+//                    "",
+//                    emptyMap())
+//            val command = runner.getCommand("Test", actionData, "", "", "")
+//            runner.getActionDependencies("Test", actionData)
+//            it("Should yield command that runs simple.py") {
+//                assertEquals("pip install -r ama-requirements.txt && pip install -r requirements.txt && python simple.py", command)
+//            }
+//            it("Should create a requirements file with all the dependencies in it") {
+//                val requirements = File("ama-requirements.txt").readLines().toTypedArray()
+//                assertEquals(arrayOf("./python_sdk.zip").joinToString(","), requirements.joinToString(","))
+//            }
+//        }
+//    }
+//    afterGroup {
+//        File("ama-requirements.txt").delete()
+//    }
+//})
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index 52ad9bc..cd16b33 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -24,6 +24,7 @@
 import org.apache.amaterasu.leader.common.utilities.{DataLoader, MemoryFormatParser}
 import org.apache.amaterasu.sdk.frameworks.configuration.DriverConfiguration
 import org.apache.amaterasu.sdk.frameworks.{FrameworkSetupProvider, RunnerSetupProvider}
+import org.apache.commons.lang.StringUtils
 
 import scala.collection.mutable
 import collection.JavaConversions._
@@ -39,11 +40,11 @@
   private def loadSparkConfig: mutable.Map[String, Any] = {
 
     val execData = DataLoader.getExecutorData(env, conf)
-    val sparkExecConfiguration = execData.configurations.get("spark")
+    val sparkExecConfiguration = execData.getConfigurations.get("spark")
     if (sparkExecConfiguration.isEmpty) {
       throw new Exception(s"Spark configuration files could not be loaded for the environment $env")
     }
-    collection.mutable.Map(sparkExecConfiguration.get.toSeq: _*)
+    collection.mutable.Map(sparkExecConfiguration.toSeq: _*)
 
   }
 
@@ -51,6 +52,7 @@
     this.env = env
     this.conf = conf
 
+//    this.sparkExecConfigurations = loadSparkConfig
     runnerProviders += ("scala" -> SparkScalaRunnerProvider(conf))
     runnerProviders += ("jar" -> SparkSubmitScalaRunnerProvider(conf))
     runnerProviders += ("pyspark" -> PySparkRunnerProvider(env, conf))
@@ -60,15 +62,15 @@
   override def getGroupIdentifier: String = "spark"
 
   override def getGroupResources: Array[File] = conf.mode match {
-      case "mesos" => Array[File](new File(s"spark-${conf.webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"))
-      case "yarn" => new File(conf.spark.home).listFiles
-      case _ => Array[File]()
-    }
+    case "mesos" => Array[File](new File(s"spark-${conf.webserver.sparkVersion}.tgz"), new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"))
+    case "yarn" => Array[File](new File(s"spark-runner-${conf.version}-all.jar"), new File(s"spark-runtime-${conf.version}.jar"), new File(s"executor-${conf.version}-all.jar"), new File(conf.spark.home))
+    case _ => Array[File]()
+  }
 
 
   override def getEnvironmentVariables: util.Map[String, String] = conf.mode match {
-    case "mesos" => Map[String, String]("SPARK_HOME" ->s"spark-${conf.webserver.sparkVersion}","SPARK_HOME_DOCKER" -> "/opt/spark/")
-    case "yarn" => Map[String, String]("SPARK_HOME" -> "spark")
+    case "mesos" => Map[String, String]("SPARK_HOME" -> s"spark-${conf.webserver.sparkVersion}", "SPARK_HOME_DOCKER" -> "/opt/spark/")
+    case "yarn" => Map[String, String]("SPARK_HOME" -> StringUtils.stripStart(conf.spark.home, "/"))
     case _ => Map[String, String]()
   }
 
@@ -82,8 +84,8 @@
       cpu = conf.spark.opts("yarn.am.cores").toInt
     } else if (conf.spark.opts.contains("driver.cores")) {
       cpu = conf.spark.opts("driver.cores").toInt
-    } else if (conf.YARNConf.Worker.cores > 0) {
-      cpu = conf.YARNConf.Worker.cores
+    } else if (conf.yarn.Worker.cores > 0) {
+      cpu = conf.yarn.Worker.cores
     } else {
       cpu = 1
     }
@@ -96,8 +98,8 @@
       mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("yarn.am.memory"))
     } else if (conf.spark.opts.contains("driver.memory")) {
       mem = MemoryFormatParser.extractMegabytes(conf.spark.opts("driver.memory"))
-    } else if (conf.YARNConf.Worker.memoryMB > 0) {
-      mem = conf.YARNConf.Worker.memoryMB
+    } else if (conf.yarn.Worker.memoryMB > 0) {
+      mem = conf.yarn.Worker.memoryMB
     } else if (conf.taskMem > 0) {
       mem = conf.taskMem
     } else {
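
The recurring `StringUtils.stripStart(conf.spark.home, "/")` (here and in the runner changes below) turns an absolute spark.home into a path relative to the YARN container's working directory, where the Spark distribution is localized. A small sketch of the behavior:

    import org.apache.commons.lang.StringUtils

    // stripStart removes every leading character contained in the second
    // argument, so "/opt/spark" becomes the container-relative "opt/spark".
    fun containerRelative(sparkHome: String): String =
        StringUtils.stripStart(sparkHome, "/")

    fun main() {
        println(containerRelative("/opt/spark")) // prints: opt/spark
    }
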
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 50a2d57..17382ad 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -1,4 +1,5 @@
 package org.apache.amaterasu.frameworks.spark.dispatcher.runners.providers
+
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.frameworks.python.dispatcher.runners.providers.PythonRunnerProviderBase
@@ -10,17 +11,10 @@
     var command = super.getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String)
     log.info(s"===> Cluster manager: ${conf.mode}")
     val pythonBinPath = Seq("python3", "-c", "import sys; print(sys.executable)").!!.trim()
-    conf.mode match {
-      case "mesos" =>
-          command + s" && env PYSPARK_PYTHON=$pythonBinPath && env AMA_NODE=${sys.env("AMA_NODE")} && env MESOS_NATIVE_JAVA_LIBRARY=${conf.mesos.libPath}" +
-          s" && python3 ${actionData.getSrc}"
-      case "yarn" =>
-          command + s" && /bin/bash spark/bin/load-spark-env.sh" +
-                     s" && python3 ${actionData.getSrc}"
-      case _ =>
-          log.warn(s"Received unsupported cluster manager: ${conf.mode}")
-          command
-    }
+
+    command + s" && /bin/bash $$SPARK_HOME/bin/load-spark-env.sh && env PYSPARK_PYTHON=$pythonBinPath " +
+      s" && $$SPARK_HOME/bin/spark-submit ${actionData.getSrc}"
+
   }
 
   override def getRunnerResources: Array[String] = {
@@ -32,6 +26,8 @@
 
 
   override def getHasExecutor: Boolean = false
+
+  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]()
 }
 
 object PySparkRunnerProvider {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
index 1a64eca..6cd63f5 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
@@ -22,6 +22,7 @@
 import org.apache.amaterasu.common.dataobjects.ActionData
 import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.yarn.api.ApplicationConstants
 
 class SparkScalaRunnerProvider extends RunnerSetupProvider {
@@ -35,11 +36,12 @@
       s"java -cp executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:spark-${conf.webserver.sparkVersion}/jars/* " +
       s"-Dscala.usejavacp=true -Djava.library.path=$libPath " +
       s"org.apache.amaterasu.executor.mesos.executors.MesosActionsExecutor $jobId ${conf.master} ${actionData.getName}".stripMargin
-    case "yarn" => s"/bin/bash spark/bin/load-spark-env.sh && " +
-      s"java -cp spark/jars/*:executor.jar:spark-runner.jar:spark-runtime.jar:spark/conf/:${conf.YARNConf.hadoopHomeDir}/conf/ " +
+    case "yarn" =>
+      s"/bin/bash ${StringUtils.stripStart(conf.spark.home,"/")}/conf/spark-env.sh && " +
+      s"java -cp ${StringUtils.stripStart(conf.spark.home,"/")}/jars/*:executor-${conf.version}-all.jar:spark-runner-${conf.version}-all.jar:spark-runtime-${conf.version}.jar:${StringUtils.stripStart(conf.spark.home,"/")}/conf/:${conf.yarn.hadoopHomeDir}/conf/ " +
       "-Xmx2G " +
       "-Dscala.usejavacp=true " +
-      "-Dhdp.version=2.6.1.0-129 " +
+      "-Dhdp.version=2.6.5.0-292 " +
       "org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher " +
       s"'$jobId' '${conf.master}' '${actionData.getName}' '${URLEncoder.encode(DataLoader.getTaskDataString(actionData, env), "UTF-8")}' '${URLEncoder.encode(DataLoader.getExecutorDataString(env, conf), "UTF-8")}' '$executorId' '$callbackAddress' " +
       s"1> ${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout " +
@@ -55,6 +57,7 @@
 
   override def getHasExecutor: Boolean = true
 
+  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = Array[String]()
 }
 
 object SparkScalaRunnerProvider {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
index fb39881..e46cc1d 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
@@ -13,13 +13,15 @@
 
   private var conf: ClusterConfig = _
   val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-  val amaDist = new File (s"${new File(jarFile.getParent).getParent}/dist")
+  val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
+  val amaLocation = new File(new File(jarFile.getParent).getParent)
 
   override def getCommand(jobId: String, actionData: ActionData, env: String, executorId: String, callbackAddress: String): String = {
 
     val util = new ArtifactUtil(List(actionData.repo).asJava, jobId)
-    val classParam = if (actionData.getHasArtifact)  s" --class ${actionData.entryClass}" else ""
-    s"spark-${conf.webserver.sparkVersion}/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode client --jars spark-runtime-${conf.version}.jar >&1"
+    val classParam = if (actionData.getHasArtifact) s" --class ${actionData.entryClass}" else ""
+    s"$$SPARK_HOME/bin/spark-submit $classParam ${util.getLocalArtifacts(actionData.getArtifact).get(0).getName} --deploy-mode client --jars spark-runtime-${conf.version}.jar >&1"
+
   }
 
   override def getRunnerResources: Array[String] =
@@ -31,7 +33,10 @@
 
   override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = {
     val util = new ArtifactUtil(List(actionData.repo).asJava, jobId)
-    util.getLocalArtifacts(actionData.getArtifact).toArray().map(x => amaDist.toPath.relativize(x.asInstanceOf[File].toPath).toString)
+    conf.mode match {
+      case "mesos" => util.getLocalArtifacts(actionData.getArtifact).toArray().map(x => amaDist.toPath.relativize(x.asInstanceOf[File].toPath).toString)
+      case "yarn" => util.getLocalArtifacts(actionData.getArtifact).toArray().map(x => x.asInstanceOf[File].getPath)
+    }
   }
 
   override def getHasExecutor: Boolean = false
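
getActionDependencies now resolves artifact paths per cluster manager: relative to the dist directory served on Mesos, absolute local paths handed to the YARN localizer as-is (note the match has no default case, so an unexpected mode throws a MatchError). A sketch of the convention, with names of my own choosing:

    import java.io.File

    // Path convention applied above; "mesos"/"yarn" are the only modes
    // the patched match handles.
    fun artifactPath(mode: String, distDir: File, artifact: File): String = when (mode) {
        "mesos" -> distDir.toPath().relativize(artifact.toPath()).toString()
        "yarn" -> artifact.path
        else -> throw IllegalArgumentException("unsupported mode: $mode")
    }
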
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
index f331cd5..06dfa0d 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/SparkRunnersProvider.scala
@@ -22,9 +22,8 @@
 import org.apache.amaterasu.common.configuration.ClusterConfig
 import org.apache.amaterasu.common.dataobjects.ExecData
 import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
+import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies, PythonPackage}
 import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
 import org.apache.amaterasu.frameworks.spark.runner.sparksql.SparkSqlRunner
 import org.apache.amaterasu.frameworks.spark.runner.pyspark.PySparkRunner
 import org.apache.amaterasu.frameworks.spark.runner.repl.{SparkRunnerHelper, SparkScalaRunner}
@@ -46,8 +45,8 @@
     (e: String) => log.error(e)
 
   )
-  private var conf: Option[Map[String, Any]] = _
-  private var executorEnv: Option[Map[String, Any]] = _
+  private var conf: Option[Map[String, AnyRef]] = None
+  private var executorEnv: Option[Map[String, AnyRef]] = None
   private var clusterConfig: ClusterConfig = _
 
   override def init(execData: ExecData,
@@ -65,19 +64,29 @@
     clusterConfig = config
     var jars = Seq.empty[String]
 
-    if (execData.deps != null) {
-      jars ++= getDependencies(execData.deps)
+    if (execData.getDeps != null) {
+      jars ++= getDependencies(execData.getDeps)
     }
 
-    conf = execData.configurations.get("spark")
-    executorEnv = execData.configurations.get("spark_exec_env")
+    if (execData.getPyDeps != null &&
+      execData.getPyDeps.getFilePaths.nonEmpty) {
+      loadPythonDependencies(execData.getPyDeps, notifier)
+    }
+
+    if (execData.getConfigurations.containsKey("spark")) {
+      conf = Some(execData.getConfigurations.get("spark").toMap)
+    }
+
+    if (execData.getConfigurations.containsKey("spark_exec_env")) {
+      executorEnv = Some(execData.getConfigurations.get("spark_exec_env").toMap)
+    }
     val sparkAppName = s"job_${jobId}_executor_$executorId"
 
     SparkRunnerHelper.notifier = notifier
-    val spark = SparkRunnerHelper.createSpark(execData.env, sparkAppName, jars, conf, executorEnv, config, hostName)
+    val spark = SparkRunnerHelper.createSpark(execData.getEnv, sparkAppName, jars, conf, executorEnv, config, hostName)
 
-    lazy val sparkScalaRunner = SparkScalaRunner(execData.env, jobId, spark, outStream, notifier, jars)
-    sparkScalaRunner.initializeAmaContext(execData.env)
+    lazy val sparkScalaRunner = SparkScalaRunner(execData.getEnv, jobId, spark, outStream, notifier, jars)
+    sparkScalaRunner.initializeAmaContext(execData.getEnv)
 
     runners.put(sparkScalaRunner.getIdentifier, sparkScalaRunner)
     var pypath = ""
@@ -88,13 +97,22 @@
       case "mesos" =>
         pypath = s"${new File(".").getAbsolutePath}/miniconda/pkgs:${new File(".").getAbsolutePath}"
     }
-    lazy val pySparkRunner = PySparkRunner(execData.env, jobId, notifier, spark, pypath, execData.pyDeps, config)
+    lazy val pySparkRunner = PySparkRunner(execData.getEnv, jobId, notifier, spark, pypath, execData.getPyDeps, config)
     runners.put(pySparkRunner.getIdentifier, pySparkRunner)
 
-    lazy val sparkSqlRunner = SparkSqlRunner(execData.env, jobId, notifier, spark)
+    lazy val sparkSqlRunner = SparkSqlRunner(execData.getEnv, jobId, notifier, spark)
     runners.put(sparkSqlRunner.getIdentifier, sparkSqlRunner)
   }
 
+  private def installAnacondaPackage(pythonPackage: PythonPackage): Unit = {
+    val channel = pythonPackage.getChannel
+    if (channel == "anaconda") {
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y ${pythonPackage.getPackageId}") ! shellLoger
+    } else {
+      Seq("bash", "-c", s"export HOME=$$PWD && ./miniconda/bin/python -m conda install -y -c $channel ${pythonPackage.getPackageId}") ! shellLoger
+    }
+  }
+
   private def installAnacondaOnNode(): Unit = {
     // TODO: get rid of hard-coded version
 
@@ -107,6 +125,33 @@
     Seq("bash", "-c", "ln -s spark/python/pyspark miniconda/pkgs/pyspark") ! shellLoger
   }
 
+  private def loadPythonDependencies(deps: PythonDependencies, notifier: Notifier): Unit = {
+    notifier.info("loading anaconda env")
+    installAnacondaOnNode()
+    val codegenPackage = new PythonPackage("codegen", "", "auto")
+    installAnacondaPackage(codegenPackage)
+//    try {
+//      // notifier.info("loadPythonDependencies #5")
+//      deps.getFilePaths.foreach(pack => {
+//        pack.toLowerCase match {
+//          case "anaconda" => installAnacondaPackage(pack)
+//          // case "pypi" => installPyPiPackage(pack)
+//        }
+//      })
+//    }
+//    catch {
+//
+//      case rte: RuntimeException =>
+//        val sw = new StringWriter
+//        rte.printStackTrace(new PrintWriter(sw))
+//        notifier.error("", s"Failed to activate environment (runtime) - cause: ${rte.getCause}, message: ${rte.getMessage}, Stack: \n${sw.toString}")
+//      case e: Exception =>
+//        val sw = new StringWriter
+//        e.printStackTrace(new PrintWriter(sw))
+//        notifier.error("", s"Failed to activate environment (other) - type: ${e.getClass.getName}, cause: ${e.getCause}, message: ${e.getMessage}, Stack: \n${sw.toString}")
+//    }
+  }
+
   override def getGroupIdentifier: String = "spark"
 
   override def getRunner(id: String): AmaterasuRunner = runners(id)
@@ -116,18 +161,18 @@
     // adding a local repo because Aether needs one
     val repo = new File(System.getProperty("java.io.tmpdir"), "ama-repo")
 
-    val remotes = deps.repos.map(r =>
+    val remotes = deps.getRepos.map(r =>
       new RemoteRepository(
-        r.id,
-        r.`type`,
-        r.url
+        r.getId,
+        r.getType,
+        r.getUrl
       )).toList.asJava
 
     val aether = new Aether(remotes, repo)
 
-    deps.artifacts.flatMap(a => {
+    deps.getArtifacts.flatMap(a => {
       aether.resolve(
-        new DefaultArtifact(a.groupId, a.artifactId, "", "jar", a.version),
+        new DefaultArtifact(a.getGroupId, a.getArtifactId, "", "jar", a.getVersion),
         JavaScopes.RUNTIME
       ).map(a => a)
     }).map(x => x.getFile.getAbsolutePath)
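
getDependencies now reads the Kotlin data objects through their getters before handing them to Aether. A sketch of what those objects presumably look like; the field names are inferred from the getters used above, so treat them as assumptions:

    // Assumed shapes behind getRepos()/getArtifacts(); the real classes
    // live in org.apache.amaterasu.common.dataobjects.
    data class Repo(val id: String, val type: String, val url: String)
    data class Artifact(val groupId: String, val artifactId: String, val version: String)
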
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
index 29e1832..24dfc8f 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/pyspark/PySparkRunner.scala
@@ -25,6 +25,7 @@
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.sdk.AmaterasuRunner
+import org.apache.commons.lang.StringUtils
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
 
@@ -101,8 +102,8 @@
     PySparkEntryPoint.start(spark, jobId, env, SparkEnv.get)
     val port = PySparkEntryPoint.getPort
     var intpPath = ""
-    if (env.configuration.contains("cwd")) {
-      val cwd = new File(env.configuration("cwd"))
+    if (env.getConfiguration.containsKey("cwd")) {
+      val cwd = new File(env.getConfiguration.get("cwd").toString)
       intpPath = s"${cwd.getAbsolutePath}/spark_intp.py" // This is to support test environment
     } else {
       intpPath = s"spark_intp.py"
@@ -114,7 +115,7 @@
     var sparkCmd: Seq[String] = Seq()
     config.mode match {
       case "yarn" =>
-        pysparkPath = s"spark/bin/spark-submit"
+        pysparkPath = s"${StringUtils.stripStart(config.spark.home,"/")}/bin/spark-submit"
         sparkCmd = Seq(pysparkPath, "--py-files", condaPkgs, "--master", "yarn", intpPath, port.toString)
         val proc = Process(sparkCmd, None,
           "PYTHONPATH" -> pypath,
@@ -130,6 +131,7 @@
           sparkCmd = Seq(pysparkPath, intpPath, port.toString)
     }
         var pysparkPython = "/usr/bin/python"
+
         val proc = Process(sparkCmd, None,
       "PYTHONPATH" -> pypath,
       "PYSPARK_PYTHON" -> pysparkPython,
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
index 8168941..65342eb 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkRunnerHelper.scala
@@ -24,6 +24,7 @@
 import org.apache.amaterasu.common.logging.Logging
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.common.utils.FileUtils
+import org.apache.commons.lang.StringUtils
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 
@@ -128,10 +129,10 @@
       .set("spark.submit.pyFiles", pyfiles.mkString(","))
 
 
-    val master: String = if (env.master.isEmpty) {
+    val master: String = if (env.getMaster.isEmpty) {
       "yarn"
     } else {
-      env.master
+      env.getMaster
     }
 
     config.mode match {
@@ -139,13 +140,13 @@
       case "mesos" =>
         conf.set("spark.executor.uri", s"http://$getNode:${config.webserver.Port}/spark-${config.webserver.sparkVersion}.tgz")
           .setJars(jars)
-          .set("spark.master", env.master)
+          .set("spark.master", env.getMaster)
           .set("spark.home", s"${scala.reflect.io.File(".").toCanonical.toString}/spark-${config.webserver.sparkVersion}")
 
       case "yarn" =>
-        conf.set("spark.home", config.spark.home)
+        conf.set("spark.home", StringUtils.stripStart(config.spark.home,"/"))
           // TODO: parameterize those
-          .setJars(Seq("executor.jar", "spark-runner.jar", "spark-runtime.jar") ++ jars)
+          .setJars(Seq(s"executor-${config.version}-all.jar", s"spark-runner-${config.version}-all.jar", s"spark-runtime-${config.version}.jar") ++ jars)
           .set("spark.history.kerberos.keytab", "/etc/security/keytabs/spark.headless.keytab")
           .set("spark.driver.extraLibraryPath", "/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64")
           .set("spark.yarn.queue", "default")
@@ -153,12 +154,12 @@
 
           .set("spark.master", master)
           .set("spark.executor.instances", config.spark.opts.getOrElse("executor.instances", "1"))
-          .set("spark.yarn.jars", s"spark/jars/*")
+          .set("spark.yarn.jars", s"${StringUtils.stripStart(config.spark.home,"/")}/jars/*")
           .set("spark.executor.memory", config.spark.opts.getOrElse("executor.memory", "1g"))
           .set("spark.dynamicAllocation.enabled", "false")
           .set("spark.eventLog.enabled", "false")
           .set("spark.history.fs.logDirectory", "hdfs:///spark2-history/")
-          .set("hadoop.home.dir", config.YARNConf.hadoopHomeDir)
+          .set("hadoop.home.dir", config.yarn.hadoopHomeDir)
 
       case _ => throw new Exception(s"mode ${config.mode} is not legal.")
     }
@@ -182,21 +183,22 @@
     }
 
     // setting the executor env from spark_exec.yml
-    executorEnv match {
-      case Some(env) => {
-        for (c <- env) {
-          if (c._2.isInstanceOf[String])
-            conf.setExecutorEnv(c._1, c._2.toString)
+    if (executorEnv != null) {
+      executorEnv match {
+        case Some(env) => {
+          for (c <- env) {
+            if (c._2.isInstanceOf[String])
+              conf.setExecutorEnv(c._1, c._2.toString)
+          }
         }
+        case None =>
       }
-      case None =>
     }
-
     conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
 
     sparkSession = SparkSession.builder
       .appName(sparkAppName)
-      .master(env.master)
+      .master(env.getMaster)
 
       //.enableHiveSupport()
       .config(conf).getOrCreate()
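
setJars and spark.yarn.jars now reference the versioned artifact names, matching the files SparkSetupProvider ships per mode; the unversioned executor.jar/spark-runner.jar aliases are gone. A sketch of the naming:

    // The three runtime jars referenced by their versioned names, as
    // produced by the build (version comes from ClusterConfig).
    fun runtimeJars(version: String): List<String> = listOf(
        "executor-$version-all.jar",
        "spark-runner-$version-all.jar",
        "spark-runtime-$version.jar"
    )
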
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
index 4d8a9a6..7e089b5 100755
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunner.scala
@@ -95,7 +95,7 @@
                   result match {
                     case ds: Dataset[_] =>
                       log.debug(s"persisting DataFrame: $resultName")
-                      val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.workingDir}/$jobId/$actionName/$resultName")"""
+                      val writeLine = s"""$resultName.write.mode(SaveMode.Overwrite).format("$format").save("${env.getWorkingDir}/$jobId/$actionName/$resultName")"""
                       val writeResult = interpreter.interpret(writeLine)
                       if (writeResult != Results.Success) {
                         val err = outStream.toString
diff --git a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
index f91107b..9230bca 100644
--- a/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
+++ b/frameworks/spark/runner/src/main/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunner.scala
@@ -120,7 +120,7 @@
         val exportName = exportsBuff.head._1
         val exportFormat = exportsBuff.head._2
         //notifier.info(s"exporting to -> ${env.workingDir}/$jobId/$actionName/$exportName")
-        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.workingDir}/$jobId/$actionName/$exportName")
+        result.write.mode(SaveMode.Overwrite).format(exportFormat).save(s"${env.getWorkingDir}/$jobId/$actionName/$exportName")
       }
       notifier.info(s"================= finished action $actionName =================")
     }
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala
index 57c240b..d940b2f 100644
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala
+++ b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/SparkTestsSuite.scala
@@ -18,7 +18,7 @@
 
 import java.io.{ByteArrayOutputStream, File}
 
-import org.apache.amaterasu.common.dataobjects.ExecData
+import org.apache.amaterasu.common.dataobjects.{Artifact, ExecData, Repo}
 import org.apache.amaterasu.common.execution.dependencies._
 import org.apache.amaterasu.common.runtime.Environment
 import org.apache.amaterasu.executor.common.executors.ProvidersFactory
@@ -30,10 +30,10 @@
 import org.scalatest._
 
 import scala.collection.mutable.ListBuffer
-
+import scala.collection.JavaConverters._
 
 class SparkTestsSuite extends Suites(
-  new PySparkRunnerTests,
+  //new PySparkRunnerTests,
   new RunnersLoadingTests,
   new SparkSqlRunnerTests,
   new SparkScalaRunnerTests
@@ -54,26 +54,26 @@
     val resources = new File(getClass.getResource("/spark_intp.py").getPath).getParent
     val workDir = new File(resources).getParentFile.getParent
 
-    env = Environment()
-    env.workingDir = s"file://$workDir"
+    env = new Environment()
+    env.setWorkingDir(s"file://$workDir")
 
-    env.master = "local[1]"
-    if (env.configuration != null) env.configuration ++ "pysparkPath" -> "/usr/bin/python" else env.configuration = Map(
+    env.setMaster("local[1]")
+    if (env.getConfiguration != null) env.setConfiguration(Map("pysparkPath" -> "/usr/bin/python").asJava) else env.setConfiguration(Map(
       "pysparkPath" -> "/usr/bin/python",
       "cwd" -> resources
-    )
+    ).asJava)
 
     val excEnv = Map[String, Any](
       "PYTHONPATH" -> resources
     )
     createTestMiniconda()
-    env.configuration ++ "spark_exec_env" -> excEnv
-    factory = ProvidersFactory(ExecData(env,
-      Dependencies(ListBuffer.empty[Repo], List.empty[Artifact]),
-      PythonDependencies(Array.empty[String]),
+    env.setConfiguration(Map("spark_exec_env" -> excEnv).asJava)
+    factory = ProvidersFactory(new ExecData(env,
+      new Dependencies(ListBuffer.empty[Repo].asJava, List.empty[Artifact].asJava),
+      new PythonDependencies(Array.empty[String]),
       Map(
-        "spark" -> Map.empty[String, Any],
-        "spark_exec_env" -> Map("PYTHONPATH" -> resources))),
+        "spark" -> Map.empty[String, Any].asJava,
+        "spark_exec_env" -> Map("PYTHONPATH" -> resources).asJava).asJava),
       "test",
       new ByteArrayOutputStream(),
       new TestNotifier(),
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala
index 90b0122..011c4d9 100755
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala
+++ b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/repl/SparkScalaRunnerTests.scala
@@ -43,7 +43,7 @@
 
     val sparkRunner =factory.getRunner("spark", "scala").get.asInstanceOf[SparkScalaRunner]
     val script = getClass.getResource("/step-2.scala").getPath
-    sparkRunner.env.workingDir = s"${getClass.getResource("/tmp").getPath}"
+    sparkRunner.env.setWorkingDir(s"${getClass.getResource("/tmp").getPath}")
     AmaContext.init(sparkRunner.spark,"job",sparkRunner.env)
     val sourceCode = Source.fromFile(script).getLines().mkString("\n")
     sparkRunner.executeSource(sourceCode, "cont", Map.empty[String, String].asJava)
diff --git a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala
index f189580..756bed9 100644
--- a/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala
+++ b/frameworks/spark/runner/src/test/scala/org/apache/amaterasu/frameworks/spark/runner/sparksql/SparkSqlRunnerTests.scala
@@ -52,10 +52,10 @@
     //Prepare test dataset
     val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
 
-    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqldefaultparquetjobaction/sparksqldefaultparquetjobactiontempdf")
+    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqldefaultparquetjobaction/sparksqldefaultparquetjobactiontempdf")
     sparkSql.executeSource("select * FROM AMACONTEXT_sparksqldefaultparquetjobaction_sparksqldefaultparquetjobactiontempdf where age=22", "sql_parquet_test", Map("result" -> "parquet").asJava)
 
-    val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result")
+    val outputDf = spark.read.parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_parquet_test/result")
     println("Output Default Parquet: " + inputDf.count + "," + outputDf.first().getString(1))
     outputDf.first().getString(1) shouldEqual "Michael"
   }
@@ -71,10 +71,10 @@
 
     //Prepare test dataset
     val inputDf = spark.read.parquet(getClass.getResource("/SparkSql/parquet").getPath)
-    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.workingDir}/${sparkSql.jobId}/sparksqlparquetjobaction/sparksqlparquetjobactiontempdf")
+    inputDf.write.mode(SaveMode.Overwrite).parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqlparquetjobaction/sparksqlparquetjobactiontempdf")
     sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlparquetjobaction_sparksqlparquetjobactiontempdf READAS parquet", "sql_parquet_test", Map("result2" -> "parquet").asJava)
 
-    val outputDf = spark.read.parquet(s"${env.workingDir}/${sparkSql.jobId}/sql_parquet_test/result2")
+    val outputDf = spark.read.parquet(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_parquet_test/result2")
     println("Output Parquet: " + inputDf.count + "," + outputDf.count)
     inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
   }
@@ -92,10 +92,10 @@
     //Prepare test dataset
     val inputDf = spark.read.json(getClass.getResource("/SparkSql/json").getPath)
 
-    inputDf.write.mode(SaveMode.Overwrite).json(s"${env.workingDir}/${sparkSql.jobId}/sparksqljsonjobaction/sparksqljsonjobactiontempdf")
+    inputDf.write.mode(SaveMode.Overwrite).json(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqljsonjobaction/sparksqljsonjobactiontempdf")
     sparkSql.executeSource("select * FROM AMACONTEXT_sparksqljsonjobaction_sparksqljsonjobactiontempdf  where age='30' READAS json", "sql_json_test", Map("result" -> "json").asJava)
 
-    val outputDf = spark.read.json(s"${env.workingDir}/${sparkSql.jobId}/sql_json_test/result")
+    val outputDf = spark.read.json(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_json_test/result")
     println("Output JSON: " + inputDf.count + "," + outputDf.count)
     outputDf.first().getString(1) shouldEqual "Kirupa"
 
@@ -112,11 +112,11 @@
 
     //Prepare test dataset
     val inputDf = spark.read.csv(getClass.getResource("/SparkSql/csv").getPath)
-    inputDf.write.mode(SaveMode.Overwrite).csv(s"${env.workingDir}/${sparkSql.jobId}/sparksqlcsvjobaction/sparksqlcsvjobactiontempdf")
+    inputDf.write.mode(SaveMode.Overwrite).csv(s"${env.getWorkingDir}/${sparkSql.jobId}/sparksqlcsvjobaction/sparksqlcsvjobactiontempdf")
     sparkSql.executeSource("select * FROM AMACONTEXT_sparksqlcsvjobaction_sparksqlcsvjobactiontempdf READAS csv", "sql_csv_test", Map("result" -> "csv").asJava)
 
 
-    val outputDf = spark.read.csv(s"${env.workingDir}/${sparkSql.jobId}/sql_csv_test/result")
+    val outputDf = spark.read.csv(s"${env.getWorkingDir}/${sparkSql.jobId}/sql_csv_test/result")
     println("Output CSV: " + inputDf.count + "," + outputDf.count)
     inputDf.first().getString(1) shouldEqual outputDf.first().getString(1)
   }
diff --git a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
index fc9fb94..930927c 100644
--- a/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
+++ b/frameworks/spark/runtime/src/main/scala/org/apache/amaterasu/frameworks/spark/runtime/AmaContext.scala
@@ -40,7 +40,7 @@
   }
 
   def getDataFrame(actionName: String, dfName: String, format: String = "parquet"): DataFrame = {
-    spark.read.format(format).load(s"${env.workingDir}/$jobId/$actionName/$dfName")
+    spark.read.format(format).load(s"${env.getWorkingDir}/$jobId/$actionName/$dfName")
   }
 
   def getDataset[T: Encoder](actionName: String, dfName: String, format: String = "parquet"): Dataset[T] = {
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 3499791..381cc75 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -41,6 +41,8 @@
 apply plugin: 'org.junit.platform.gradle.plugin'
 
 junitPlatform {
+    enableStandardTestTask = true
+    
     filters {
         engines {
             include 'spek'
@@ -48,6 +50,7 @@
     }
 }
 
+
 sourceCompatibility = 1.8
 targetCompatibility = 1.8
 
@@ -69,16 +72,20 @@
     compile project(':amaterasu-sdk')
 
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.6.4'
-    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.6.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.9.8'
+    
     compile group: 'org.reflections', name: 'reflections', version: '0.9.11'
-    compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
+    //compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '4.2.0.201601211800-r'
+    compile group: 'org.eclipse.jgit', name: 'org.eclipse.jgit', version: '5.3.0.201903130848-r'
     compile group: 'org.apache.activemq', name: 'activemq-broker', version: '5.15.3'
     runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3'
-    compile group: 'com.andreapivetta.kolor', name: 'kolor', version: '0.0.2'
+    compile group: 'com.importre', name: 'crayon', version: '0.1.0'
     compile group: 'com.beust', name: 'klaxon', version: '5.0.1'
+    compile group: 'com.github.ajalt', name: 'clikt', version: '1.6.0'
 
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
     compile "org.jetbrains.kotlin:kotlin-reflect"
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
index 2f98fa6..00b640a 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManager.kt
@@ -18,9 +18,10 @@
 
 import com.uchuhimo.konf.Config
 import com.uchuhimo.konf.source.yaml.toYaml
+import org.apache.amaterasu.common.logging.KLogging
 import java.io.File
 
-class ConfigManager(private val env: String, private val repoPath: String, private val frameworkItems: List<String> = emptyList()) {
+class ConfigManager(private val env: String, private val repoPath: String, private val frameworkItems: List<String> = emptyList()): KLogging() {
 
     private val envFolder = "$repoPath/env/$env"
 
@@ -34,6 +35,7 @@
     }
 
     init {
+        log.info("environment folder is $envFolder")
         for (file in File(envFolder).listFiles()) {
             config = config.from.yaml.file(file)
             println(config.toYaml.toText())
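
ConfigManager's init block (shown above) folds every YAML file under env/<env> into one konf Config and now logs the folder it read. A minimal standalone sketch of that loop, assuming konf's YAML source module is on the classpath; the sort is my addition for stable layering, not something ConfigManager does:

    import com.uchuhimo.konf.Config
    import com.uchuhimo.konf.source.yaml.yaml
    import java.io.File

    // Layer each YAML file in the environment folder onto one Config;
    // file order determines which values win.
    fun loadEnv(envFolder: String): Config {
        var config = Config()
        File(envFolder).listFiles()?.sortedBy { it.name }?.forEach { file ->
            config = config.from.yaml.file(file)
        }
        return config
    }
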
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt
index f9e6118..4401251 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/dsl/JobParser.kt
@@ -89,8 +89,7 @@
         )
 
         //updating the list of frameworks setup
-        manager.frameworks.getOrPut(action.data.groupId) { HashSet() }
-                .add(action.data.typeId)
+        manager[action.data.groupId] = action.data.typeId
 
 
         if (!manager.isInitialized) {
@@ -118,8 +117,7 @@
             manager.registerAction(errorAction)
 
             //updating the list of frameworks setup
-            manager.frameworks.getOrPut(errorAction.data.groupId) { HashSet() }
-                    .add(errorAction.data.typeId)
+            manager[errorAction.data.groupId] = errorAction.data.typeId
         }
 
         parseActions(actions.drop(1), manager, actionsQueue, attempts, action)
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
new file mode 100644
index 0000000..f8a9a57
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.dsl.GitUtil
+import org.apache.amaterasu.leader.common.dsl.JobParser
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import java.util.concurrent.BlockingQueue
+
+object JobLoader : KLogging() {
+
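+    /**
+     * Creates the job's znode, stores the source repo and branch under it,
+     * then clones the repo and builds a JobManager from its maki file.
+     */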
+    fun loadJob(src: String, branch: String, jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+        // creating the jobs znode and storing the source repo and branch
+        client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+        client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId/repo", src.toByteArray())
+        client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId/branch", branch.toByteArray())
+
+        val maki: String = loadMaki(src, branch)
+
+        return createJobManager(maki, jobId, client, attempts, actionsQueue)
+
+    }
+
+    fun createJobManager(maki: String, jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+        return JobParser.parse(
+                jobId,
+                maki,
+                actionsQueue,
+                client,
+                attempts
+        )
+    }
+
+    fun loadMaki(src: String, branch: String): String {
+
+        // cloning the git repo
+        log.debug("getting repo: $src, for branch $branch")
+        GitUtil.cloneRepo(src, branch)
+
+        // parsing the maki.yaml and creating a JobManager to
+        // coordinate the workflow based on the file
+        val maki = JobParser.loadMakiFile()
+        return maki
+    }
+
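+    /**
+     * Resumes an existing job: reads the repo and branch back from ZooKeeper,
+     * rebuilds the JobManager and restores the persisted action states.
+     */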
+    fun reloadJob(jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+        val src = String(client.data.forPath("/$jobId/repo"))
+        val branch = String(client.data.forPath("/$jobId/branch"))
+
+        val maki: String = loadMaki(src, branch)
+
+        val jobManager: JobManager = createJobManager(maki, jobId, client, attempts, actionsQueue)
+        restoreJobState(jobManager, jobId, client)
+
+        jobManager.start()
+        return jobManager
+    }
+
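+    /**
+     * Re-queues every action whose persisted status is Queued or Started,
+     * so a resumed job does not lose in-flight work.
+     */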
+    fun restoreJobState(jobManager: JobManager, jobId: String, client: CuratorFramework): Unit {
+
+        val tasks = client.children.forPath("/$jobId").filter { it.startsWith("task") }
+
+        for (task in tasks) {
+
+            val status = ActionStatus.valueOf(String(client.data.forPath("/$jobId/$task")))
+            if (status == ActionStatus.Queued || status == ActionStatus.Started) {
+                jobManager.reQueueAction(task.substring(task.indexOf("task-") + 5))
+            }
+
+        }
+
+    }
+
+
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
index 997abfc..1c69ee7 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
@@ -37,7 +37,9 @@
 
     // TODO: this is not private due to tests, fix this!!!
     val registeredActions = HashMap<String, Action>()
-    val frameworks = HashMap<String, HashSet<String>>()
+    private val frameworks = HashMap<String, HashSet<String>>()
+
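+    /**
+     * Registers a runner typeId under a framework groupId, creating the
+     * group's entry on first use: manager[groupId] = typeId.
+     */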
+    operator fun set(groupId: String, typeId: String) = frameworks.getOrPut(groupId) { HashSet() }.add(typeId)
 
     /**
      * The start method initiates the job execution by executing the first action.
@@ -64,7 +66,7 @@
             val nextAction: ActionData? = executionQueue.poll()
 
             if (nextAction != null) {
-                registeredActions[nextAction.id]!!.announceStart()
+                registeredActions[nextAction.id]?.announceStart()
             }
 
             return nextAction
@@ -72,8 +74,11 @@
 
     fun reQueueAction(actionId: String) {
 
-        val action = registeredActions[actionId]
-        executionQueue.put(action!!.data)
+        log.info("requeing action $actionId")
+        registeredActions.forEach { log.info("key ${it.key}") }
+
+        val action: Action = registeredActions[actionId] ?: throw IllegalStateException("action $actionId is not registered")
+        executionQueue.put(action.data)
-        registeredActions[actionId]!!.announceQueued()
+        action.announceQueued()
 
     }
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
new file mode 100644
index 0000000..cf3e10d
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.frameworls
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.reflections.Reflections
+
+class FrameworkProvidersFactory(val env: String, val config: ClusterConfig) : KLogging() {
+
+    var providers: Map<String, FrameworkSetupProvider>
+
+    init {
+        val reflections = Reflections(ClassLoader::class.java)
+        val runnerTypes = reflections.getSubTypesOf(FrameworkSetupProvider::class.java)
+
+        providers = runnerTypes.map {
+
+            val provider = it.newInstance()
+
+            provider.init(env, config)
+            log.info("a provider for group ${provider.groupIdentifier} was created")
+
+            provider.groupIdentifier to provider
+
+        }.toMap()
+    }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt
new file mode 100644
index 0000000..132bd77
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.launcher
+
+data class AmaOpts(
+        var repo: String = "",
+        var branch: String = "master",
+        var env: String = "default",
+        var name: String = "amaterasu-job",
+        var jobId: String = "",
+        var newJobId: String = "",
+        var report: String = "code",
+        var home: String = "") {
+
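+    /** Renders the options back into the command-line flags consumed by ArgsParser. */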
+    fun toCmdString(): String {
+
+        var cmd = " --repo $repo --branch $branch --env $env --name $name --report $report --home $home"
+        if (jobId.isNotEmpty()) {
+            cmd += " --job-id $jobId"
+        }
+        return cmd
+    }
+
+    override fun toString(): String {
+        return toCmdString()
+    }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt
new file mode 100644
index 0000000..c745c4a
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.launcher
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.prompt
+
+abstract class ArgsParser : CliktCommand() {
+
+     val repo: String by option("--repo", help = "The address of the job repository").prompt("Please provide an Amaterasu repo")
+     val branch: String by option(help = "The branch to be executed (default is master)").default("master")
+     val env: String by option(help = "The environment to be executed (test, prod, etc.); values from the default env are used when no env is specified").default("default")
+     val name: String by option(help = "The name of the job").default("amaterasu-job")
+     val jobId: String by option("--job-id", help = "The jobId - should be passed only when resuming a job").default("")
+     val newJobId: String by option("--new-job-id", help = "The jobId - should never be passed by a user").default("")
+     val report: String by option(help = "The level of reporting").default("code")
+     val home: String by option(help = "").default("")
+
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
index c5eee74..6bda13c 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
@@ -16,22 +16,20 @@
  */
 package org.apache.amaterasu.leader.common.utilities
 
-import com.andreapivetta.kolor.green
-import com.andreapivetta.kolor.lightWhite
-import com.andreapivetta.kolor.red
+
 import com.beust.klaxon.Klaxon
+import com.importre.crayon.bold
+import com.importre.crayon.brightWhite
+import com.importre.crayon.green
+import com.importre.crayon.red
 import org.apache.amaterasu.common.execution.actions.Notification
 import org.apache.amaterasu.common.execution.actions.enums.NotificationType
 import javax.jms.Message
 import javax.jms.MessageListener
 import javax.jms.TextMessage
 
-//import org.apache.amaterasu.common.execution.actions
-
 class ActiveReportListener : MessageListener {
 
-    //implicit val formats = DefaultFormats
-
     override fun onMessage(message: Message): Unit = when (message) {
         is TextMessage -> try {
             val notification = Klaxon().parse<Notification>(message.text)
@@ -46,12 +44,12 @@
     private fun printNotification(notification: Notification) = when (notification.notType) {
 
         NotificationType.Info ->
-            println("===> ${notification.msg} ".lightWhite())
+            println("===> ${notification.msg} ".brightWhite().bold())
         NotificationType.Success ->
-            println("===> ${notification.line}".green())
+            println("===> ${notification.line}".green().bold())
         NotificationType.Error -> {
-            println("===> ${notification.line}".red())
-            println("===> ${notification.msg} ".red())
+            println("===> ${notification.line}".red().bold())
+            println("===> ${notification.msg} ".red().bold())
 
         }
 
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
new file mode 100644
index 0000000..dc76c62
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/DataLoader.kt
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.utilities
+
+import java.io.File
+import java.io.FileInputStream
+import java.nio.file.Files
+import java.nio.file.Paths
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+import com.fasterxml.jackson.module.kotlin.readValue
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.TaskData
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.dataobjects.ExecData
+
+import org.apache.amaterasu.common.execution.dependencies.Dependencies
+import org.apache.amaterasu.common.execution.dependencies.PythonDependencies
+import org.apache.amaterasu.common.logging.KLogging
+
+import org.apache.amaterasu.common.runtime.Environment
+import org.yaml.snakeyaml.Yaml
+
+
+object DataLoader : KLogging() {
+
+    private val mapper = ObjectMapper()
+
+    private val ymlMapper = ObjectMapper(YAMLFactory())
+
+    init {
+        mapper.registerModule(KotlinModule())
+        ymlMapper.registerModule(KotlinModule())
+    }
+
+    @JvmStatic
+    fun getTaskDataBytes(actionData: ActionData, env: String): ByteArray {
+        return mapper.writeValueAsBytes(getTaskData(actionData, env))
+    }
+
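+    /**
+     * Builds the TaskData for an action: its source file (when set), the
+     * parsed job environment and the action's exports.
+     */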
+    @JvmStatic
+    fun getTaskData(actionData: ActionData, env: String): TaskData {
+        val srcFile = actionData.src
+        var src = ""
+
+        if (srcFile.isNotEmpty()) {
+            src = File("repo/src/$srcFile").readText()
+        }
+
+        val envValue = File("repo/env/$env/job.yml").readText()
+
+        val envData = ymlMapper.readValue<Environment>(envValue)
+
+        val exports = actionData.exports
+
+        return TaskData(src, envData, actionData.groupId, actionData.typeId, exports)
+    }
+
+    @JvmStatic
+    fun getTaskDataString(actionData: ActionData, env: String): String {
+        return mapper.writeValueAsString(getTaskData(actionData, env))
+    }
+
+    @JvmStatic
+    fun getExecutorDataBytes(env: String, clusterConf: ClusterConfig): ByteArray {
+        return mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
+    }
+
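+    /**
+     * Assembles the ExecData shipped to executors: the job environment, any
+     * extra configuration files under repo/env, and the JVM/Python
+     * dependencies declared under repo/deps.
+     */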
+    @JvmStatic
+    fun getExecutorData(env: String, clusterConf: ClusterConfig): ExecData {
+
+        // loading the job configuration
+        val envValue = File("repo/env/$env/job.yml").readText() //TODO: change this to YAML
+        val envData = ymlMapper.readValue<Environment>(envValue)
+
+        // loading all additional configurations
+        val files = File("repo/env/$env/").listFiles().filter { it.isFile }.filter { it.name != "job.yml" }
+        val config = files.map { yamlToMap(it) }.toMap()
+
+        // loading the job's dependencies
+        var depsData: Dependencies? = null
+        var pyDepsData: PythonDependencies? = null
+
+        if (Files.exists(Paths.get("repo/deps/jars.yml"))) {
+            val depsValue = File("repo/deps/jars.yml").readText()
+            depsData = ymlMapper.readValue(depsValue)
+        }
+        if (Files.exists(Paths.get("repo/deps/python.yml"))) {
+            val pyDepsValue = File("repo/deps/python.yml").readText()
+            pyDepsData = ymlMapper.readValue(pyDepsValue)
+        }
+
+        return ExecData(envData, depsData, pyDepsData, config)
+    }
+
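+    /** Loads a YAML file into a (file name without extension -> configuration map) pair. */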
+    fun yamlToMap(file: File): Pair<String, Map<String, Any>> {
+
+        val yaml = Yaml()
+        val conf = yaml.load<Map<String, Any>>(FileInputStream(file))
+
+        return file.name.replace(".yml", "") to conf
+    }
+
+    @JvmStatic
+    fun getExecutorDataString(env: String, clusterConf: ClusterConfig): String {
+        return mapper.writeValueAsString(getExecutorData(env, clusterConf))
+    }
+
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MessagingClientUtil.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MessagingClientUtil.kt
index e1a20b5..7e21851 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MessagingClientUtil.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/MessagingClientUtil.kt
@@ -36,7 +36,7 @@
         val destination = session.createTopic("JOB.REPORT")
 
         val consumer = session.createConsumer(destination)
-        consumer.messageListener = ActiveReportListener()
+        consumer.setMessageListener(ActiveReportListener())
 
         return consumer
     }
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
index db1ea78..a748690 100644
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
+++ b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
@@ -44,13 +44,14 @@
 
     val reflections = new Reflections(getClass.getClassLoader)
     val runnerTypes = reflections.getSubTypesOf(classOf[FrameworkSetupProvider]).toSet
-    log.info(s">>> $runnerTypes")
+
     result.providers = runnerTypes.map(r => {
 
       val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[FrameworkSetupProvider]
 
       provider.init(env, config)
       log.info(s"a provider for group ${provider.getGroupIdentifier} was created")
+      log.info(s"config = $config")
       (provider.getGroupIdentifier, provider)
 
     }).toMap
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
deleted file mode 100755
index b8af8df..0000000
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/utilities/DataLoader.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.common.utilities
-
-import java.io.{File, FileInputStream}
-import java.nio.file.{Files, Paths, StandardCopyOption}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.{ActionData, ExecData, TaskData}
-import org.apache.amaterasu.common.execution.dependencies.{Dependencies, PythonDependencies}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.runtime.Environment
-import org.yaml.snakeyaml.Yaml
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.io.Source
-
-
-object DataLoader extends Logging {
-
-  val mapper = new ObjectMapper()
-  mapper.registerModule(DefaultScalaModule)
-
-  val ymlMapper = new ObjectMapper(new YAMLFactory())
-  ymlMapper.registerModule(DefaultScalaModule)
-
-  def getTaskDataBytes(actionData: ActionData, env: String): Array[Byte] = {
-    mapper.writeValueAsBytes(getTaskData(actionData, env))
-  }
-
-  def getTaskData(actionData: ActionData, env: String): TaskData = {
-    val srcFile = actionData.getSrc
-    var src = ""
-
-    if(!srcFile.isEmpty){
-       src = Source.fromFile(s"repo/src/$srcFile").mkString
-    }
-
-    val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString
-
-    val envData = ymlMapper.readValue(envValue, classOf[Environment])
-
-    val exports = actionData.getExports.asScala.toMap // Kotlin to Scala TODO: Remove me as fast as you can
-
-    TaskData(src, envData, actionData.getGroupId, actionData.getTypeId, exports)
-  }
-
-  def getTaskDataString(actionData: ActionData, env: String): String = {
-    mapper.writeValueAsString(getTaskData(actionData, env))
-  }
-
-  def getExecutorDataBytes(env: String, clusterConf: ClusterConfig): Array[Byte] = {
-    mapper.writeValueAsBytes(getExecutorData(env, clusterConf))
-  }
-
-  def getExecutorData(env: String, clusterConf: ClusterConfig): ExecData = {
-
-    // loading the job configuration
-    val envValue = Source.fromFile(s"repo/env/$env/job.yml").mkString //TODO: change this to YAML
-    val envData = ymlMapper.readValue(envValue, classOf[Environment])
-    // loading all additional configurations
-    val files = new File(s"repo/env/$env/").listFiles().filter(_.isFile).filter(_.getName != "job.yml")
-    val config = files.map(yamlToMap).toMap
-    // loading the job's dependencies
-    var depsData: Dependencies = null
-    var pyDepsData: PythonDependencies = null
-    if (Files.exists(Paths.get("repo/deps/jars.yml"))) {
-      val depsValue = Source.fromFile(s"repo/deps/jars.yml").mkString
-      depsData = ymlMapper.readValue(depsValue, classOf[Dependencies])
-    }
-    if (Files.exists(Paths.get("repo/deps/requirements.txt"))) {
-      Files.copy(Paths.get("repo/deps/requirements.txt"), Paths.get("dist/user-requirements.txt"), StandardCopyOption.REPLACE_EXISTING)
-      pyDepsData = PythonDependencies(Array[String]("user-requirements.txt"))
-    }
-    val data = mapper.writeValueAsBytes(ExecData(envData, depsData, pyDepsData, config))
-    ExecData(envData, depsData, pyDepsData, config)
-  }
-
-  def yamlToMap(file: File): (String, Map[String, Any]) = {
-
-    val yaml = new Yaml()
-    val conf = yaml.load(new FileInputStream(file)).asInstanceOf[java.util.Map[String, Any]].asScala.toMap
-
-    (file.getName.replace(".yml", ""), conf)
-  }
-
-  def getExecutorDataString(env: String, clusterConf: ClusterConfig): String = {
-    mapper.writeValueAsString(getExecutorData(env, clusterConf))
-  }
-
-}
-
-class ConfMap[String, T <: ConfMap[String, T]] extends mutable.ListMap[String, Either[String, T]]
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
index 00c8254..0e87cae 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
@@ -25,7 +25,7 @@
 
 class ConfigManagerTests : Spek({
 
-    val marker = this.javaClass.getResource("/maki.yml").path
+    val marker = this.javaClass.getResource("/maki.yml")!!.path
 
     given("a ConfigManager for a job ") {
 
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt
new file mode 100644
index 0000000..3b9a229
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt
@@ -0,0 +1,18 @@
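+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */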
+package org.apache.amaterasu.leader.common.dsl
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.io.File
+
+class GitUtilTests : Spek({
+    given("a repo and a branch") {
+        val repo = "https://github.com/shintoio/amaterasu-job-sample.git"
+        val branch = "feature/new-format"
+
+        it("clones the repo successfully") {
+            GitUtil.cloneRepo(repo, branch)
+            assert(File("repo").exists())
+        }
+    }
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserArtifactTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserArtifactTests.kt
index 95677c6..88978dc 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserArtifactTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserArtifactTests.kt
@@ -32,10 +32,10 @@
 /*
 this Spek tests how the JobParser handles artifacts and repositories
  */
-object JobParserArtifactTests : Spek({
+class JobParserArtifactTests : Spek({
 
     val retryPolicy =  ExponentialBackoffRetry(1000, 3)
-    val server = TestingServer(2182, true)
+    val server = TestingServer(2192, true)
     val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
     client.start()
 
diff --git a/leader-yarn/build.gradle b/leader-yarn/build.gradle
index 5f05943..dcea40f 100644
--- a/leader-yarn/build.gradle
+++ b/leader-yarn/build.gradle
@@ -14,6 +14,95 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+buildscript {
 
+    repositories {
+        mavenCentral()
+        maven {
+            url 'http://repository.jetbrains.com/all'
+        }
+        maven {
+            url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+        }
+    }
 
+    dependencies {
+        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+        classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+    }
+}
 
+plugins {
+    id "com.github.johnrengelman.shadow" version "2.0.4"
+    id 'scala'
+}
+
+apply plugin: 'kotlin'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+    filters {
+        engines {
+            include 'spek'
+        }
+    }
+}
+
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+shadowJar {
+    zip64 true
+}
+
+repositories {
+    maven { url "https://plugins.gradle.org/m2/" }
+    maven { url 'http://repository.jetbrains.com/all' }
+    maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+    maven { url "http://dl.bintray.com/jetbrains/spek" }
+    maven { url "http://oss.jfrog.org/artifactory/oss-snapshot-local" }
+
+    mavenCentral()
+    jcenter()
+}
+
+dependencies {
+    compile project(':leader-common')
+    compile project(':amaterasu-sdk')
+
+    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+    compile "org.jetbrains.kotlin:kotlin-reflect"
+    compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.1.1'
+
+    testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+    testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+    testCompile 'org.apache.curator:curator-test:2.9.1'
+    testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+    // Spek requires kotlin-reflect, can be omitted if already in the classpath
+    testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+}
+
+task copyToHomeRoot(type: Copy) {
+    from 'src/main/scripts'
+    into '../build/amaterasu/'
+}
+
+task copyToHomeBin(type: Copy) {
+    dependsOn shadowJar
+    from 'build/libs'
+    into '../build/amaterasu/bin'
+}
+
+task copyToHome() {
+    dependsOn copyToHomeRoot
+    dependsOn copyToHomeBin
+}
+
+compileKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt
new file mode 100644
index 0000000..112d935
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.launcher.ArgsParser
+import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+
+class AppMasterArgsParser: ArgsParser() {
+
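+    /**
+     * Builds AmaOpts from the parsed CLI options, starts the embedded
+     * ActiveMQ broker and hands control over to the ApplicationMaster.
+     */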
+    override fun run() {
+
+        val opts = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
+
+        val appMaster = ApplicationMaster()
+        appMaster.address = MessagingClientUtil.borkerAddress
+        println("broker address is ${appMaster.address}")
+        appMaster.broker.addConnector(appMaster.address)
+        appMaster.broker.start()
+
+        appMaster.execute(opts)
+    }
+}
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
new file mode 100644
index 0000000..22b9eaa
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.KotlinModule
+
+import kotlinx.coroutines.async
+import kotlinx.coroutines.runBlocking
+
+import org.apache.activemq.broker.BrokerService
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.common.utils.ActiveNotifier
+import org.apache.amaterasu.leader.common.configuration.ConfigManager
+import org.apache.amaterasu.leader.common.execution.JobLoader
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.utilities.DataLoader
+import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.barriers.DistributedBarrier
+import org.apache.curator.retry.ExponentialBackoffRetry
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.*
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.zookeeper.CreateMode
+
+import java.io.File
+import java.io.FileInputStream
+import java.io.IOException
+import java.nio.ByteBuffer
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.LinkedBlockingQueue
+
+import javax.jms.MessageConsumer
+
+class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler {
+
+
+    lateinit var address: String
+
+    val broker: BrokerService = BrokerService()
+    private val conf = YarnConfiguration()
+
+
+    private val actionsBuffer = ConcurrentLinkedQueue<ActionData>()
+    private val completedContainersAndTaskIds = ConcurrentHashMap<Long, String>()
+    private val containersIdsToTask = ConcurrentHashMap<Long, ActionData>()
+    private val yamlMapper = ObjectMapper(YAMLFactory())
+
+    private lateinit var propPath: String
+    private lateinit var props: FileInputStream
+    private lateinit var jobManager: JobManager
+    private lateinit var zkClient: CuratorFramework
+    private lateinit var env: String
+    private lateinit var rmClient: AMRMClientAsync<AMRMClient.ContainerRequest>
+    private lateinit var frameworkFactory: FrameworkProvidersFactory
+    private lateinit var config: ClusterConfig
+    private lateinit var fs: FileSystem
+    private lateinit var consumer: MessageConsumer
+    private lateinit var configManager: ConfigManager
+    private lateinit var notifier: ActiveNotifier
+    private lateinit var nmClient: NMClientAsync
+
+    init {
+        yamlMapper.registerModule(KotlinModule())
+    }
+
+    fun execute(opts: AmaOpts) {
+
+        propPath = System.getenv("PWD") + "/amaterasu.properties"
+        props = FileInputStream(File(propPath))
+
+        // no need for HDFS double check (nod to Aaron Rodgers)
+        // jars on HDFS should have been verified by the YARN zkClient
+        config = ClusterConfig.apply(props)
+        fs = FileSystem.get(conf)
+
+        initJob(opts)
+
+        // now that the job has been initiated, the curator zkClient is started and we can
+        // register the broker's address
+        zkClient.create().withMode(CreateMode.PERSISTENT).forPath("/${jobManager.jobId}/broker")
+        zkClient.setData().forPath("/${jobManager.jobId}/broker", address.toByteArray())
+
+        // once the broker is registered, we can remove the barrier so clients can connect
+        log.info("/${jobManager.jobId}-report-barrier")
+        val barrier = DistributedBarrier(zkClient, "/${jobManager.jobId}-report-barrier")
+        barrier.removeBarrier()
+
+        consumer = MessagingClientUtil.setupMessaging(address)
+        notifier = ActiveNotifier(address)
+
+        log.info("number of messages ${broker.adminView.totalMessageCount}")
+
+        // Initialize clients to ResourceManager and NodeManagers
+        nmClient = NMClientAsyncImpl(YarnNMCallbackHandler(notifier))
+        nmClient.init(conf)
+        nmClient.start()
+
+        rmClient = startRMClient()
+
+        val items = mutableListOf<FrameworkSetupProvider>()
+
+        for (p in frameworkFactory.providers().values()) {
+            items.add(p)
+        }
+        val configItems = items.flatMap { it.configurationItems.asIterable() }
+        configManager = ConfigManager(env, "repo", configItems)
+
+
+        val registrationResponse = rmClient.registerApplicationMaster("", 0, "")
+        val maxMem = registrationResponse.maximumResourceCapability.memorySize
+        val maxVCores = registrationResponse.maximumResourceCapability.virtualCores
+
+        while (!jobManager.outOfActions) {
+            val capability = Records.newRecord(Resource::class.java)
+
+            val actionData = jobManager.nextActionData
+            if (actionData != null) {
+
+                notifier.info("requesting container fo ${actionData.name}")
+                val frameworkProvider = frameworkFactory.getFramework(actionData.groupId)
+                val driverConfiguration = frameworkProvider.driverConfiguration
+
+                var mem: Long = driverConfiguration.memory.toLong()
+                mem = Math.min(mem, maxMem)
+                capability.memorySize = mem
+
+                var cpu = driverConfiguration.cpus
+                cpu = Math.min(cpu, maxVCores)
+                capability.virtualCores = cpu
+
+                createTaskConfiguration(actionData)
+                requestContainer(actionData, capability)
+
+            }
+        }
+
+        log.info("Finished requesting containers")
+        readLine()
+    }
+
+    private fun initJob(opts: AmaOpts) {
+
+        this.env = opts.env
+        frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+
+        try {
+            val retryPolicy = ExponentialBackoffRetry(1000, 3)
+            zkClient = CuratorFrameworkFactory.newClient(config.zk(), retryPolicy)
+            zkClient.start()
+        } catch (e: Exception) {
+            log.error("Error connecting to zookeeper", e)
+            throw e
+        }
+
+        val zkPath = zkClient.checkExists().forPath("/${opts.newJobId}")
+
+        log.info("zkPath is $zkPath")
+        if (zkPath != null) {
+            log.info("resuming job" + opts.newJobId)
+            jobManager = JobLoader.reloadJob(
+                    opts.newJobId,
+                    zkClient,
+                    config.Jobs().tasks().attempts(),
+                    LinkedBlockingQueue<ActionData>())
+
+        } else {
+            log.info("new job is being created")
+            try {
+
+                jobManager = JobLoader.loadJob(
+                        opts.repo,
+                        opts.branch,
+                        opts.newJobId,
+                        zkClient,
+                        config.Jobs().tasks().attempts(),
+                        LinkedBlockingQueue<ActionData>())
+            } catch (e: Exception) {
+                log.error("Error creating JobManager.", e)
+                throw e
+            }
+
+        }
+
+        jobManager.start()
+        log.info("Started jobManager")
+    }
+
+    override fun onContainersAllocated(containers: MutableList<Container>?) = runBlocking<Unit> {
+        containers?.let {
+            for (container in it) {
+
+                log.info("container ${container.id} allocated")
+                if (actionsBuffer.isNotEmpty()) {
+                    val actionData = actionsBuffer.poll()
+                    async {
+
+                        val framework = frameworkFactory.getFramework(actionData.groupId)
+                        val runnerProvider = framework.getRunnerProvider(actionData.typeId)
+                        val ctx = Records.newRecord(ContainerLaunchContext::class.java)
+                        val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, env, "${actionData.id}-${container.id.containerId}", address))
+
+                        log.info("container command ${commands.joinToString(prefix = " ", postfix = " ")}")
+                        ctx.commands = commands
+                        ctx.tokens = allTokens()
+                        ctx.localResources = setupContainerResources(framework, runnerProvider, actionData)
+                        ctx.environment = framework.environmentVariables
+
+                        nmClient.startContainerAsync(container, ctx)
+
+                        jobManager.actionStarted(actionData.id)
+                        containersIdsToTask[container.id.containerId] = actionData
+                        notifier.info("created container for ${actionData.name} created")
+                        log.info("launching container succeeded: ${container.id.containerId}; task: ${actionData.id}")
+                    }
+                }
+            }
+        }
+    }
+
+    private fun allTokens(): ByteBuffer {
+        // creating the credentials for container execution
+        val credentials = UserGroupInformation.getCurrentUser().credentials
+        val dob = DataOutputBuffer()
+        credentials.writeTokenStorageToStream(dob)
+
+        // removing the AM->RM token so that containers cannot access it.
+        val iter = credentials.allTokens.iterator()
+        log.info("Executing with tokens:")
+        for (token in iter) {
+            log.info(token.toString())
+            if (token.kind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+        }
+        return ByteBuffer.wrap(dob.data, 0, dob.length)
+    }
+
+    /**
+     * Creates the map of resources to be copied into the container.
+     * @param framework the FrameworkSetupProvider for the action
+     * @param runnerProvider the action's RunnerSetupProvider
+     * @param actionData the data of the action being launched
+     */
+    private fun setupContainerResources(framework: FrameworkSetupProvider, runnerProvider: RunnerSetupProvider, actionData: ActionData): Map<String, LocalResource> {
+
+        val yarnJarPath = Path(config.yarn().hdfsJarsPath())
+
+        // Getting framework (group) resources
+        val result = framework.groupResources.map { it.path to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it.path))) }.toMap().toMutableMap()
+
+        // Getting runner resources
+        result.putAll(runnerProvider.runnerResources.map { it to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) }.toMap())
+
+        // getting the action specific resources
+        result.putAll(runnerProvider.getActionResources(jobManager.jobId, actionData).map { it.removePrefix("${jobManager.jobId}/${actionData.name}/") to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) })
+
+        // getting the action specific dependencies
+        runnerProvider.getActionDependencies(jobManager.jobId, actionData).forEach { distributeFile(it, "${jobManager.jobId}/${actionData.name}/") }
+        result.putAll(runnerProvider.getActionDependencies(jobManager.jobId, actionData).map { File(it).name to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$it"))) })
+
+        // Adding the Amaterasu configuration files
+        result["amaterasu.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/amaterasu.properties")))
+        result["log4j.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/log4j.properties")))
+
+        result.forEach { println("entry ${it.key} with value ${it.value}") }
+        return result.map { x -> x.key.removePrefix("/") to x.value }.toMap()
+    }
+
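+    /**
+     * Writes the per-action configuration files (env.yaml, datastores.yaml
+     * and runtime.yaml) to HDFS ahead of the container launch.
+     */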
+    private fun createTaskConfiguration(actionData: ActionData) {
+
+        // setting up the configuration files for the container
+        val envYaml = configManager.getActionConfigContent(actionData.name, actionData.config)
+        writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml")
+
+        val dataStores = DataLoader.getTaskData(actionData, env).exports
+        val dataStoresYaml = yamlMapper.writeValueAsString(dataStores)
+
+        writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml")
+
+        writeConfigFile("jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml")
+
+    }
+
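+    /** Writes a single configuration file under the job's dist directory on HDFS. */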
+    private fun writeConfigFile(content: String, jobId: String, actionName: String, fileName: String) {
+
+        val actionDistPath = createDistPath("$jobId/$actionName/$fileName")
+        val yarnJarPath = Path(config.yarn().hdfsJarsPath())
+        val targetPath = Path.mergePaths(yarnJarPath, actionDistPath)
+
+        val outputStream = fs.create(targetPath)
+        outputStream.writeUTF(content)
+        outputStream.close()
+        log.info("written file $targetPath")
+
+    }
+
+    private fun distributeFile(file: String, distributionPath: String) {
+
+        log.info("copying file $file, file status ${File(file).exists()}")
+
+        val actionDistPath = createDistPath("$distributionPath/$file")
+        val yarnJarPath = Path(config.yarn().hdfsJarsPath())
+        val targetPath = Path.mergePaths(yarnJarPath, actionDistPath)
+
+        log.info("target is $targetPath")
+
+        fs.copyFromLocalFile(false, true, Path(file), targetPath)
+
+    }
+
+    private fun createDistPath(path: String): Path = Path("/dist/$path")
+
+    private fun startRMClient(): AMRMClientAsync<AMRMClient.ContainerRequest> {
+        val client = AMRMClientAsync.createAMRMClientAsync<AMRMClient.ContainerRequest>(1000, this)
+        client.init(conf)
+        client.start()
+        return client
+    }
+
+    private fun createLocalResourceFromPath(path: Path): LocalResource {
+
+        val stat = fs.getFileStatus(path)
+        val fileResource = Records.newRecord(LocalResource::class.java)
+
+        fileResource.shouldBeUploadedToSharedCache = true
+        fileResource.visibility = LocalResourceVisibility.PUBLIC
+        fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
+        fileResource.size = stat.len
+        fileResource.timestamp = stat.modificationTime
+        fileResource.type = LocalResourceType.FILE
+        fileResource.visibility = LocalResourceVisibility.PUBLIC
+        return fileResource
+
+    }
+
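+    /** Buffers the action and asks the ResourceManager for a container with the given capability. */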
+    private fun requestContainer(actionData: ActionData, capability: Resource) {
+
+        actionsBuffer.add(actionData)
+        log.info("About to ask container for action ${actionData.id} with mem ${capability.memory} and cores ${capability.virtualCores}. Action buffer size is: ${actionsBuffer.size}")
+
+        // we have an action to schedule, let's request a container
+        val priority: Priority = Records.newRecord(Priority::class.java)
+        priority.priority = 1
+        val containerReq = AMRMClient.ContainerRequest(capability, null, null, priority)
+        rmClient.addContainerRequest(containerReq)
+        log.info("Asked container for action ${actionData.id}")
+
+    }
+
+    override fun onNodesUpdated(updatedNodes: MutableList<NodeReport>?) {
+        log.info("Nodes change. Nothing to report.")
+    }
+
+    override fun onShutdownRequest() {
+        log.error("Shutdown requested.")
+        stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested")
+    }
+
+    override fun getProgress(): Float {
+        // report the fraction of registered actions whose containers have completed
+        if (jobManager.registeredActions.isEmpty()) return 0f
+        return completedContainersAndTaskIds.size.toFloat() / jobManager.registeredActions.size
+    }
+
+    override fun onError(e: Throwable?) {
+        notifier.error("Error on AM", e!!.message!!)
+        stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
+    }
+
+    override fun onContainersCompleted(statuses: MutableList<ContainerStatus>?) {
+        for (status in statuses!!) {
+            if (status.state == ContainerState.COMPLETE) {
+
+                val containerId = status.containerId.containerId
+                val task = containersIdsToTask[containerId]
+                rmClient.releaseAssignedContainer(status.containerId)
+
+                val taskId = task!!.id
+                if (status.exitStatus == 0) {
+
+                    completedContainersAndTaskIds[containerId] = task.id
+                    jobManager.actionComplete(taskId)
+                    notifier.info("Container $containerId Complete with task $taskId with success.")
+                } else {
+                    // TODO: Check the getDiagnostics value and see if appropriate
+                    jobManager.actionFailed(taskId, status.diagnostics)
+                    notifier.error("", "Container $containerId Complete with task $taskId with Failed status code (${status.exitStatus})")
+                }
+            }
+        }
+
+        if (jobManager.outOfActions) {
+            log.info("Finished all tasks successfully! Wow!")
+            jobManager.actionsCount()
+            stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
+        } else {
+            log.info("jobManager.registeredActions.size: ${jobManager.registeredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
+        }
+    }
+
+    private fun stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String) {
+
+        try {
+            rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null)
+        } catch (ex: YarnException) {
+
+            log.error("Failed to unregister application", ex)
+        } catch (e: IOException) {
+            log.error("Failed to unregister application", e)
+        }
+        rmClient.stop()
+        nmClient.stop()
+    }
+
+    companion object {
+        @JvmStatic
+        fun main(args: Array<String>) = AppMasterArgsParser().main(args)
+
+    }
+}
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
new file mode 100644
index 0000000..9b6c02f
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.utilities.ActiveReportListener
+import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.barriers.DistributedBarrier
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.hadoop.fs.*
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.*
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.client.api.YarnClientApplication
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.hadoop.yarn.util.Apps
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+import org.apache.log4j.LogManager
+import org.slf4j.LoggerFactory
+
+import javax.jms.*
+import java.io.File
+import java.io.FileInputStream
+import java.io.IOException
+import java.util.*
+
+import java.lang.System.exit
+
+class Client {
+    private val conf = YarnConfiguration()
+    private var fs: FileSystem? = null
+    private lateinit var consumer: MessageConsumer
+
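+    /** Wraps an HDFS path in a YARN LocalResource record so it can be localized into containers. */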
+    @Throws(IOException::class)
+    private fun setLocalResourceFromPath(path: Path): LocalResource {
+
+        val stat = fs!!.getFileStatus(path)
+        val fileResource = Records.newRecord(LocalResource::class.java)
+        fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
+        fileResource.size = stat.len
+        fileResource.timestamp = stat.modificationTime
+        fileResource.type = LocalResourceType.FILE
+        fileResource.visibility = LocalResourceVisibility.PUBLIC
+        return fileResource
+    }
+
+    @Throws(Exception::class)
+    fun run(opts: AmaOpts, args: Array<String>) {
+
+        LogManager.resetConfiguration()
+        val config = ClusterConfig()
+        config.load(FileInputStream(opts.home + "/amaterasu.properties"))
+
+        // Create yarnClient
+        val yarnClient = YarnClient.createYarnClient()
+        yarnClient.init(conf)
+        yarnClient.start()
+
+        // Create application via yarnClient
+        var app: YarnClientApplication? = null
+        try {
+            app = yarnClient.createApplication()
+        } catch (e: YarnException) {
+            LOGGER.error("Error initializing yarn application with yarn client.", e)
+            exit(1)
+        } catch (e: IOException) {
+            LOGGER.error("Error initializing yarn application with yarn client.", e)
+            exit(2)
+        }
+
+        // Create the HDFS client used below to stage jars and resources
+        try {
+            fs = FileSystem.get(conf)
+        } catch (e: IOException) {
+            LOGGER.error("Eror creating HDFS client isntance.", e)
+            exit(3)
+        }
+
+        val jarPath = Path(config.yarn().hdfsJarsPath())
+        val jarPathQualified = fs!!.makeQualified(jarPath)
+        val distPath = Path.mergePaths(jarPathQualified, Path("/dist/"))
+
+        val appContext = app!!.applicationSubmissionContext
+
+        var newId = ""
+
+        val newIdVal = appContext.applicationId.toString() + "-" + UUID.randomUUID().toString()
+        if (opts.jobId.isEmpty()) {
+            newId = "--new-job-id=" + newIdVal
+        }
+
+
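+        // Shell command that launches the ApplicationMaster JVM inside the AM container,
+        // forwarding this client's arguments plus the generated job id and redirecting
+        // stdout/stderr into YARN's container log directory.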
+        val commands = listOf("env AMA_NODE=" + System.getenv("AMA_NODE") +
+                " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().userName +
+                " \$JAVA_HOME/bin/java" +
+                " -Dscala.usejavacp=false" +
+                " -Xmx2G" +
+                " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
+                joinStrings(args) +
+                newId +
+                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+        // Set up the container launch context for the application master
+        val amContainer = Records.newRecord(ContainerLaunchContext::class.java)
+        amContainer.commands = commands
+
+        // Setup local ama folder on hdfs.
+        try {
+
+            if (!fs!!.exists(jarPathQualified)) {
+                val home = File(opts.home)
+                fs!!.mkdirs(jarPathQualified)
+
+                for (f in home.listFiles()) {
+                    fs!!.copyFromLocalFile(false, true, Path(f.getAbsolutePath()), jarPathQualified)
+                }
+
+                // setup frameworks
+                val frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config)
+                for (group in frameworkFactory.groups()) {
+                    val framework = frameworkFactory.getFramework(group)
+                    for (file in framework.groupResources) {
+                        if (file.exists()) {
+                            val target = Path.mergePaths(distPath, Path(file.path))
+                            fs!!.copyFromLocalFile(false, true, Path(file.path), target)
+                        }
+                    }
+                }
+            }
+
+        } catch (e: IOException) {
+            println("===> error " + e.message + e.stackTrace)
+            LOGGER.error("Error uploading ama folder to HDFS.", e)
+            exit(3)
+        } catch (ne: NullPointerException) {
+            println("===> ne error " + ne.message)
+            LOGGER.error("No files in home dir.", ne)
+            exit(4)
+        }
+
+        // get version of build
+        val version = config.version()
+
+        // get local resources pointers that will be set on the master container env
+        val leaderJarPath = String.format("/bin/leader-%s-all.jar", version)
+        LOGGER.info("Leader Jar path is: {}", leaderJarPath)
+        val mergedPath = Path.mergePaths(jarPath, Path(leaderJarPath))
+
+        // System.out.println("===> path: " + jarPathQualified);
+        LOGGER.info("Leader merged jar path is: {}", mergedPath)
+        var propFile: LocalResource? = null
+        var log4jPropFile: LocalResource? = null
+
+        try {
+            propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, Path("/amaterasu.properties")))
+            log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, Path("/log4j.properties")))
+        } catch (e: IOException) {
+            LOGGER.error("Error initializing yarn local resources.", e)
+            exit(4)
+        }
+
+        // set local resource on master container
+        val localResources = HashMap<String, LocalResource>()
+
+        // making the bin folder's content available to the appMaster
+        val bin = fs!!.listFiles(Path.mergePaths(jarPath, Path("/bin")), true)
+
+        while (bin.hasNext()) {
+            val binFile = bin.next()
+            localResources[binFile.path.name] = setLocalResourceFromPath(binFile.path)
+        }
+
+        localResources["amaterasu.properties"] = propFile!!
+        localResources["log4j.properties"] = log4jPropFile!!
+        amContainer.localResources = localResources
+
+        // Setup CLASSPATH for ApplicationMaster
+        val appMasterEnv = HashMap<String, String>()
+        setupAppMasterEnv(appMasterEnv)
+        appMasterEnv["AMA_CONF_PATH"] = String.format("%s/amaterasu.properties", config.YARN().hdfsJarsPath())
+        amContainer.environment = appMasterEnv
+
+        // Set up resource type requirements for ApplicationMaster
+        val capability = Records.newRecord(Resource::class.java)
+        capability.memory = config.YARN().master().memoryMB()
+        capability.virtualCores = config.YARN().master().cores()
+
+        // Finally, set-up ApplicationSubmissionContext for the application
+        appContext.applicationName = "amaterasu-" + opts.name
+        appContext.amContainerSpec = amContainer
+        appContext.resource = capability
+        appContext.queue = config.YARN().queue()
+        appContext.priority = Priority.newInstance(1)
+
+        // Submit application
+        val appId = appContext.applicationId
+        LOGGER.info("Submitting application {}", appId)
+        try {
+            yarnClient.submitApplication(appContext)
+
+        } catch (e: YarnException) {
+            LOGGER.error("Error submitting application.", e)
+            exit(6)
+        } catch (e: IOException) {
+            LOGGER.error("Error submitting application.", e)
+            exit(7)
+        }
+
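+        // The AM publishes its ActiveMQ broker address under /<new job id>/broker in
+        // ZooKeeper and drops the matching report barrier once the broker is up; waiting
+        // on the barrier here blocks until that address is readable.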
+        val zkClient = CuratorFrameworkFactory.newClient(config.zk(),
+                ExponentialBackoffRetry(1000, 3))
+        zkClient.start()
+
+        val reportBarrier = DistributedBarrier(zkClient, "/$newIdVal-report-barrier")
+        reportBarrier.setBarrier()
+        reportBarrier.waitOnBarrier()
+
+        val address = String(zkClient.data.forPath("/$newIdVal/broker"))
+        println("===> $address")
+        consumer = MessagingClientUtil.setupMessaging(address)
+
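+        // Poll the application report until YARN marks the application FINISHED,
+        // KILLED or FAILED; report messages arrive asynchronously on the consumer
+        // wired up above.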
+        var appReport: ApplicationReport? = null
+        var appState: YarnApplicationState
+
+        do {
+            try {
+                appReport = yarnClient.getApplicationReport(appId)
+            } catch (e: YarnException) {
+                LOGGER.error("Error getting application report.", e)
+                exit(8)
+            } catch (e: IOException) {
+                LOGGER.error("Error getting application report.", e)
+                exit(9)
+            }
+
+            appState = appReport!!.yarnApplicationState
+            if (isAppFinished(appState)) {
+                exit(0)
+                break
+            }
+
+            //LOGGER.info("Application not finished ({})", appReport.getProgress());
+            try {
+                Thread.sleep(100)
+            } catch (e: InterruptedException) {
+                LOGGER.error("Interrupted while waiting for job completion.", e)
+                exit(137)
+            }
+
+        } while (!isAppFinished(appState))
+
+        LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport!!.finalApplicationStatus, appReport.finishTime)
+    }
+
+    private fun isAppFinished(appState: YarnApplicationState): Boolean {
+        return appState == YarnApplicationState.FINISHED ||
+                appState == YarnApplicationState.KILLED ||
+                appState == YarnApplicationState.FAILED
+    }
+
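+    // Assembles the AM container CLASSPATH from the container working directory plus
+    // the cluster's yarn.application.classpath entries (or YARN's defaults).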
+    private fun setupAppMasterEnv(appMasterEnv: MutableMap<String, String>) {
+        Apps.addToEnvironment(appMasterEnv,
+                ApplicationConstants.Environment.CLASSPATH.name,
+                ApplicationConstants.Environment.PWD.`$`() + File.separator + "*", File.pathSeparator)
+
+        for (c in conf.getStrings(
+                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                *YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name,
+                    c.trim(), File.pathSeparator)
+        }
+    }
+
+    companion object {
+
+        private val LOGGER = LoggerFactory.getLogger(Client::class.java)
+
+        @Throws(Exception::class)
+        @JvmStatic
+        fun main(args: Array<String>) = ClientArgsParser(args).main(args)
+
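+        // Joins the arguments with single spaces, keeping a trailing space, which
+        // run() relies on when appending the generated --new-job-id flag.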
+        private fun joinStrings(str: Array<String>): String {
+
+            val builder = StringBuilder()
+            for (s in str) {
+                builder.append(s)
+                builder.append(" ")
+            }
+            return builder.toString()
+
+        }
+    }
+}
\ No newline at end of file
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
similarity index 66%
rename from leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
rename to leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
index b8c29b7..b9f1e67 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
@@ -14,15 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.yarn;
+package org.apache.amaterasu.leader.yarn
 
-public class JobOpts {
-    public String repo = "";
-    public String branch = "master";
-    public String env = "default";
-    public String name = "amaterasu-job";
-    public String jobId = null;
-    public String newJobId = null;
-    public String report ="code";
-    public String home ="";
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.launcher.ArgsParser
+
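+// Entry point options for the YARN client. ArgsParser (leader-common) is presumably a
+// Clikt-style command exposing the repo/branch/env/name/jobId/newJobId/report/home
+// options; run() packs them into AmaOpts and hands off to Client.run together with the
+// raw args so they can be forwarded to the ApplicationMaster.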
+class ClientArgsParser(val args: Array<String>): ArgsParser() {
+
+    override fun run() {
+        val opts = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
+        val client = Client()
+        client.run(opts, args)
+    }
 }
\ No newline at end of file
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
new file mode 100644
index 0000000..7fc89e8
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.common.utils.ActiveNotifier
+
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.yarn.api.records.ContainerId
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync
+
+
+
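+// Relays NodeManager container lifecycle callbacks into the job's ActiveNotifier so
+// container start/stop/status events are reported rather than only logged.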
+class YarnNMCallbackHandler(val notifier: ActiveNotifier) : KLogging(), NMClientAsync.CallbackHandler {
+
+    override fun onStartContainerError(containerId: ContainerId, t: Throwable) {
+        notifier.error("","Container ${containerId.containerId} couldn't start. message ${t.message}")
+    }
+
+    override fun onGetContainerStatusError(containerId: ContainerId, t: Throwable) {
+        notifier.error("","Couldn't get status from container ${containerId.containerId}. message ${t.message}")
+    }
+
+    override fun onContainerStatusReceived(containerId: ContainerId, containerStatus: ContainerStatus) {
+        notifier.info("Container ${containerId.containerId} has status of ${containerStatus.state}")
+    }
+
+    override fun onContainerStarted(containerId: ContainerId, allServiceResponse: Map<String, ByteBuffer>) {
+        notifier.info("Container ${containerId.containerId} Started")
+    }
+
+    override fun onStopContainerError(containerId: ContainerId, t: Throwable) {
+        notifier.error("","Container ${containerId.containerId} has thrown an error.  message ${t.message}")
+    }
+
+    override fun onContainerStopped(containerId: ContainerId) {
+        notifier.info("Container ${containerId.containerId} stopped")
+    }
+
+}
\ No newline at end of file
diff --git a/leader/build.gradle b/leader/build.gradle
index db4e52d..c3034cb 100644
--- a/leader/build.gradle
+++ b/leader/build.gradle
@@ -47,10 +47,10 @@
     compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
     compile group: 'org.apache.curator', name:'curator-test', version:'2.9.1'
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.4'
-    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.4'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
+    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.8'
     compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.4.14.v20181114'
     compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.4.14.v20181114'
     compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.4.14.v20181114'
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
deleted file mode 100644
index 38a9c38..0000000
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn;
-
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-public class ArgsParser {
-    private static Options getOptions() {
-
-        Options options = new Options();
-        options.addOption("r", "repo", true, "The git repo containing the job");
-        options.addOption("b", "branch", true, "The branch to be executed (default is master)");
-        options.addOption("e", "env", true, "The environment to be executed (test, prod, etc. values from the default env are taken if np env specified)");
-        options.addOption("n", "name", true, "The name of the job");
-        options.addOption("i", "job-id", true, "The jobId - should be passed only when resuming a job");
-        options.addOption("j", "new-job-id", true, "The jobId - should never be passed by a user");
-        options.addOption("r", "report", true, "The level of reporting");
-        options.addOption("h", "home", true, "The level of reporting");
-
-        return options;
-    }
-
-    public static JobOpts getJobOpts(String[] args) throws ParseException {
-
-        CommandLineParser parser = new BasicParser();
-        Options options = getOptions();
-        CommandLine cli = parser.parse(options, args);
-
-        JobOpts opts = new JobOpts();
-        if (cli.hasOption("repo")) {
-            opts.repo = cli.getOptionValue("repo");
-        }
-
-        if (cli.hasOption("branch")) {
-            opts.branch = cli.getOptionValue("branch");
-        }
-
-        if (cli.hasOption("env")) {
-            opts.env = cli.getOptionValue("env");
-        }
-
-        if (cli.hasOption("job-id")) {
-            opts.jobId = cli.getOptionValue("job-id");
-        }
-        if (cli.hasOption("new-job-id")) {
-            opts.newJobId = cli.getOptionValue("new-job-id");
-        }
-
-        if (cli.hasOption("report")) {
-            opts.report = cli.getOptionValue("report");
-        }
-
-        if (cli.hasOption("home")) {
-            opts.home = cli.getOptionValue("home");
-        }
-
-        if (cli.hasOption("name")) {
-            opts.name = cli.getOptionValue("name");
-        }
-
-        return opts;
-    }
-}
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
deleted file mode 100644
index 063a070..0000000
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.amaterasu.common.configuration.ClusterConfig;
-import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory;
-import org.apache.amaterasu.leader.common.utilities.ActiveReportListener;
-import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.log4j.LogManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.*;
-
-import static java.lang.System.exit;
-
-public class Client {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(Client.class);
-    private final Configuration conf = new YarnConfiguration();
-    private FileSystem fs;
-
-    private LocalResource setLocalResourceFromPath(Path path) throws IOException {
-
-        FileStatus stat = fs.getFileStatus(path);
-        LocalResource fileResource = Records.newRecord(LocalResource.class);
-        fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path));
-        fileResource.setSize(stat.getLen());
-        fileResource.setTimestamp(stat.getModificationTime());
-        fileResource.setType(LocalResourceType.FILE);
-        fileResource.setVisibility(LocalResourceVisibility.PUBLIC);
-        return fileResource;
-    }
-
-    private void run(JobOpts opts, String[] args) throws Exception {
-
-        LogManager.resetConfiguration();
-        ClusterConfig config = new ClusterConfig();
-        config.load(new FileInputStream(opts.home + "/amaterasu.properties"));
-
-        // Create yarnClient
-        YarnClient yarnClient = YarnClient.createYarnClient();
-        yarnClient.init(conf);
-        yarnClient.start();
-
-        // Create application via yarnClient
-        YarnClientApplication app = null;
-        try {
-            app = yarnClient.createApplication();
-        } catch (YarnException e) {
-            LOGGER.error("Error initializing yarn application with yarn client.", e);
-            exit(1);
-        } catch (IOException e) {
-            LOGGER.error("Error initializing yarn application with yarn client.", e);
-            exit(2);
-        }
-
-        // Setup jars on hdfs
-        try {
-            fs = FileSystem.get(conf);
-        } catch (IOException e) {
-            LOGGER.error("Eror creating HDFS client isntance.", e);
-            exit(3);
-        }
-        Path jarPath = new Path(config.YARNConf().hdfsJarsPath());
-        Path jarPathQualified = fs.makeQualified(jarPath);
-
-        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
-
-        String newId = "";
-        if (opts.jobId == null) {
-            newId = "--new-job-id " + appContext.getApplicationId().toString() + "-" + UUID.randomUUID().toString();
-        }
-
-
-        List<String> commands = Collections.singletonList(
-                "env AMA_NODE=" + System.getenv("AMA_NODE") +
-                        " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().getUserName() +
-                        " $JAVA_HOME/bin/java" +
-                        " -Dscala.usejavacp=false" +
-                        " -Xmx2G" +
-                        " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
-                        joinStrings(args) +
-                        newId +
-                        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-                        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
-        );
-
-
-        // Set up the container launch context for the application master
-        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
-        amContainer.setCommands(commands);
-
-        // Setup local ama folder on hdfs.
-        try {
-
-            System.out.println("===> " + jarPathQualified);
-            if (!fs.exists(jarPathQualified)) {
-                File home = new File(opts.home);
-                fs.mkdirs(jarPathQualified);
-
-                for (File f : home.listFiles()) {
-                    fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), jarPathQualified);
-                }
-
-                // setup frameworks
-                System.out.println("===> setting up frameworks");
-                FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config);
-                for (String group : frameworkFactory.groups()) {
-                    System.out.println("===> setting up " + group);
-                    FrameworkSetupProvider framework = frameworkFactory.getFramework(group);
-
-                    //creating a group folder
-                    Path frameworkPath = Path.mergePaths(jarPathQualified, new Path("/" + framework.getGroupIdentifier()));
-                    System.out.println("===> " + frameworkPath.toString());
-
-                    fs.mkdirs(frameworkPath);
-                    for (File file : framework.getGroupResources()) {
-                        if (file.exists())
-                            fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), frameworkPath);
-                    }
-                }
-            }
-        } catch (IOException e) {
-            System.out.println("===>" + e.getMessage());
-            LOGGER.error("Error uploading ama folder to HDFS.", e);
-            exit(3);
-        } catch (NullPointerException ne) {
-            System.out.println("===>" + ne.getMessage());
-            LOGGER.error("No files in home dir.", ne);
-            exit(4);
-        }
-
-        // get version of build
-        String version = config.version();
-
-        // get local resources pointers that will be set on the master container env
-        String leaderJarPath = String.format("/bin/leader-%s-all.jar", version);
-        LOGGER.info("Leader Jar path is: {}", leaderJarPath);
-        Path mergedPath = Path.mergePaths(jarPath, new Path(leaderJarPath));
-
-        // System.out.println("===> path: " + jarPathQualified);
-        LOGGER.info("Leader merged jar path is: {}", mergedPath);
-        LocalResource leaderJar = null;
-        LocalResource propFile = null;
-        LocalResource log4jPropFile = null;
-
-        try {
-            leaderJar = setLocalResourceFromPath(mergedPath);
-            propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")));
-            log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")));
-        } catch (IOException e) {
-            LOGGER.error("Error initializing yarn local resources.", e);
-            exit(4);
-        }
-
-        // set local resource on master container
-        Map<String, LocalResource> localResources = new HashMap<>();
-        //localResources.put("leader.jar", leaderJar);
-        // making the bin folder's content available to the appMaster
-        RemoteIterator<LocatedFileStatus> bin = fs.listFiles(Path.mergePaths(jarPath, new Path("/bin")), true);
-
-        while (bin.hasNext()){
-            LocatedFileStatus binFile = bin.next();
-            localResources.put(binFile.getPath().getName(), setLocalResourceFromPath(binFile.getPath()));
-        }
-
-        localResources.put("amaterasu.properties", propFile);
-        localResources.put("log4j.properties", log4jPropFile);
-        amContainer.setLocalResources(localResources);
-
-        // Setup CLASSPATH for ApplicationMaster
-        Map<String, String> appMasterEnv = new HashMap<>();
-        setupAppMasterEnv(appMasterEnv);
-        appMasterEnv.put("AMA_CONF_PATH", String.format("%s/amaterasu.properties", config.YARNConf().hdfsJarsPath()));
-        amContainer.setEnvironment(appMasterEnv);
-
-        // Set up resource type requirements for ApplicationMaster
-        Resource capability = Records.newRecord(Resource.class);
-        capability.setMemory(config.YARNConf().master().memoryMB());
-        capability.setVirtualCores(config.YARNConf().master().cores());
-
-        // Finally, set-up ApplicationSubmissionContext for the application
-        appContext.setApplicationName("amaterasu-" + opts.name);
-        appContext.setAMContainerSpec(amContainer);
-        appContext.setResource(capability);
-        appContext.setQueue(config.YARNConf().queue());
-        appContext.setPriority(Priority.newInstance(1));
-
-        // Submit application
-        ApplicationId appId = appContext.getApplicationId();
-        LOGGER.info("Submitting application {}", appId);
-        try {
-            yarnClient.submitApplication(appContext);
-
-        } catch (YarnException e) {
-            LOGGER.error("Error submitting application.", e);
-            exit(6);
-        } catch (IOException e) {
-            LOGGER.error("Error submitting application.", e);
-            exit(7);
-        }
-
-        CuratorFramework client = CuratorFrameworkFactory.newClient(config.zk(),
-                new ExponentialBackoffRetry(1000, 3));
-        client.start();
-
-        String newJobId = newId.replace("--new-job-id ", "");
-        System.out.println("===> /" + newJobId + "-report-barrier");
-        DistributedBarrier reportBarrier = new DistributedBarrier(client, "/" + newJobId + "-report-barrier");
-        reportBarrier.setBarrier();
-        reportBarrier.waitOnBarrier();
-
-        String address = new String(client.getData().forPath("/" + newJobId + "/broker"));
-        System.out.println("===> " + address);
-        setupReportListener(address);
-
-        ApplicationReport appReport = null;
-        YarnApplicationState appState;
-
-        do {
-            try {
-                appReport = yarnClient.getApplicationReport(appId);
-            } catch (YarnException e) {
-                LOGGER.error("Error getting application report.", e);
-                exit(8);
-            } catch (IOException e) {
-                LOGGER.error("Error getting application report.", e);
-                exit(9);
-            }
-            appState = appReport.getYarnApplicationState();
-            if (isAppFinished(appState)) {
-                exit(0);
-                break;
-            }
-            //LOGGER.info("Application not finished ({})", appReport.getProgress());
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                LOGGER.error("Interrupted while waiting for job completion.", e);
-                exit(137);
-            }
-        } while (!isAppFinished(appState));
-
-        LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport.getFinalApplicationStatus(), appReport.getFinishTime());
-    }
-
-    private boolean isAppFinished(YarnApplicationState appState) {
-        return appState == YarnApplicationState.FINISHED ||
-                appState == YarnApplicationState.KILLED ||
-                appState == YarnApplicationState.FAILED;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Client c = new Client();
-
-        JobOpts opts = ArgsParser.getJobOpts(args);
-
-        c.run(opts, args);
-    }
-
-    private static String joinStrings(String[] str) {
-
-        StringBuilder builder = new StringBuilder();
-        for (String s : str) {
-            builder.append(s);
-            builder.append(" ");
-        }
-        return builder.toString();
-
-    }
-
-    private void setupReportListener(String address) throws JMSException {
-
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(address);
-        Connection conn = cf.createConnection();
-        conn.start();
-
-        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        //TODO: move to a const in common
-        Topic destination = session.createTopic("JOB.REPORT");
-
-        MessageConsumer consumer = session.createConsumer(destination);
-        consumer.setMessageListener(new ActiveReportListener());
-
-    }
-
-    private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
-        Apps.addToEnvironment(appMasterEnv,
-                ApplicationConstants.Environment.CLASSPATH.name(),
-                ApplicationConstants.Environment.PWD.$() + File.separator + "*", File.pathSeparator);
-
-        for (String c : conf.getStrings(
-                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-            Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name(),
-                    c.trim(), File.pathSeparator);
-        }
-    }
-}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 234f896..a87ea5d 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -130,8 +130,8 @@
 
     val resources = offer.getResourcesList.asScala
 
-    resources.count(r => r.getName == "cpus" && r.getScalar.getValue >= config.Jobs.Tasks.cpus) > 0 &&
-      resources.count(r => r.getName == "mem" && r.getScalar.getValue >= config.Jobs.Tasks.mem) > 0
+    resources.count(r => r.getName == "cpus" && r.getScalar.getValue >= config.jobs.tasks.cpus) > 0 &&
+      resources.count(r => r.getName == "mem" && r.getScalar.getValue >= config.jobs.tasks.mem) > 0
   }
 
   def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {
@@ -163,7 +163,7 @@
             val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
             writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
 
-            val dataStores = DataLoader.getTaskData(actionData, env).exports
+            val dataStores = DataLoader.getTaskData(actionData, env).getExports
             val writer = new StringWriter()
             yamlMapper.writeValue(writer, dataStores)
             val dataStoresYaml = writer.toString
@@ -284,7 +284,7 @@
                 .setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
                 .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
                 .addResources(createScalarResource("mem", driverConfiguration.getMemory))
-                .addResources(createScalarResource("disk", config.Jobs.repoSize))
+                .addResources(createScalarResource("disk", config.jobs.repoSize))
                 .setSlaveId(offer.getSlaveId)
                 .build()
 
@@ -300,7 +300,7 @@
                 //.setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
                 .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
                 .addResources(createScalarResource("mem", driverConfiguration.getMemory))
-                .addResources(createScalarResource("disk", config.Jobs.repoSize))
+                .addResources(createScalarResource("disk", config.jobs.repoSize))
                 .setSlaveId(offer.getSlaveId)
                 .build()
 
@@ -348,7 +348,7 @@
         branch,
         frameworkId.getValue,
         client,
-        config.Jobs.Tasks.attempts,
+        config.jobs.tasks.attempts,
         new LinkedBlockingQueue[ActionData]()
       )
     }
@@ -357,7 +357,7 @@
       JobLoader.reloadJob(
         frameworkId.getValue,
         client,
-        config.Jobs.Tasks.attempts,
+        config.jobs.tasks.attempts,
         new LinkedBlockingQueue[ActionData]()
       )
 
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
deleted file mode 100644
index 1fe6549..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ /dev/null
@@ -1,483 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn
-
-import java.io.{File, FileInputStream, InputStream}
-import java.net.{InetAddress, ServerSocket}
-import java.nio.ByteBuffer
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-
-import javax.jms.MessageConsumer
-import org.apache.activemq.broker.BrokerService
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.execution.JobManager
-import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
-import org.apache.amaterasu.leader.execution.JobLoader
-import org.apache.amaterasu.leader.utilities.Args
-import org.apache.curator.framework.recipes.barriers.DistributedBarrier
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
-import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.{concurrent, mutable}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
-
-  var capability: Resource = _
-
-  log.info("ApplicationMaster start")
-
-  private var jobManager: JobManager = _
-  private var client: CuratorFramework = _
-  private var config: ClusterConfig = _
-  private var env: String = _
-  private var branch: String = _
-  private var fs: FileSystem = _
-  private var conf: YarnConfiguration = _
-  private var propPath: String = ""
-  private var props: InputStream = _
-  private var jarPath: Path = _
-  private var executorPath: Path = _
-  private var executorJar: LocalResource = _
-  private var propFile: LocalResource = _
-  private var log4jPropFile: LocalResource = _
-  private var nmClient: NMClientAsync = _
-  private var allocListener: YarnRMCallbackHandler = _
-  private var rmClient: AMRMClientAsync[ContainerRequest] = _
-  private var address: String = _
-  private var consumer: MessageConsumer = _
-
-  private val containersIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala
-  private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
-  private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]()
-  private val host: String = InetAddress.getLocalHost.getHostName
-  private val broker: BrokerService = new BrokerService()
-
-
-  def setLocalResourceFromPath(path: Path): LocalResource = {
-
-    val stat = fs.getFileStatus(path)
-    val fileResource = Records.newRecord(classOf[LocalResource])
-
-    fileResource.setShouldBeUploadedToSharedCache(true)
-    fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
-    fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
-    fileResource.setSize(stat.getLen)
-    fileResource.setTimestamp(stat.getModificationTime)
-    fileResource.setType(LocalResourceType.FILE)
-    fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
-    fileResource
-
-  }
-
-  def execute(arguments: Args): Unit = {
-
-    log.info(s"Started AM with args $arguments")
-
-    propPath = System.getenv("PWD") + "/amaterasu.properties"
-    props = new FileInputStream(new File(propPath))
-
-    // no need for hdfs double check (nod to Aaron Rodgers)
-    // jars on HDFS should have been verified by the YARN client
-    conf = new YarnConfiguration()
-    fs = FileSystem.get(conf)
-
-    config = ClusterConfig(props)
-
-    try {
-      initJob(arguments)
-    } catch {
-      case e: Exception => log.error("error initializing ", e.getMessage)
-    }
-
-    // now that the job was initiated, the curator client is Started and we can
-    // register the broker's address
-    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.getJobId}/broker")
-    client.setData().forPath(s"/${jobManager.getJobId}/broker", address.getBytes)
-
-    // once the broker is registered, we can remove the barrier so clients can connect
-    log.info(s"/${jobManager.getJobId}-report-barrier")
-    val barrier = new DistributedBarrier(client, s"/${jobManager.getJobId}-report-barrier")
-    barrier.removeBarrier()
-
-    consumer = MessagingClientUtil.setupMessaging(address)
-
-    log.info(s"Job ${jobManager.getJobId} initiated with ${jobManager.getRegisteredActions.size} actions")
-
-    jarPath = new Path(config.YARNConf.hdfsJarsPath)
-
-    // TODO: change this to read all dist folder and add to exec path
-    executorPath = Path.mergePaths(jarPath, new Path(s"/dist/executor-${config.version}-all.jar"))
-    log.info("Executor jar path is {}", executorPath)
-    executorJar = setLocalResourceFromPath(executorPath)
-    propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")))
-    log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")))
-
-    log.info("Started execute")
-
-    nmClient = new NMClientAsyncImpl(new YarnNMCallbackHandler())
-
-    // Initialize clients to ResourceManager and NodeManagers
-    nmClient.init(conf)
-    nmClient.start()
-
-    // TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
-    allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
-
-    rmClient = startRMClient()
-    val registrationResponse = registerAppMaster("", 0, "")
-    val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
-    log.info("Max mem capability of resources in this cluster " + maxMem)
-    val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
-    log.info("Max vcores capability of resources in this cluster " + maxVCores)
-    log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}")
-
-    // Resource requirements for worker containers
-    this.capability = Records.newRecord(classOf[Resource])
-    val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
-
-    while (!jobManager.getOutOfActions) {
-      val actionData = jobManager.getNextActionData
-      if (actionData != null) {
-
-        val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
-        val driverConfiguration = frameworkProvider.getDriverConfiguration
-
-        var mem: Int = driverConfiguration.getMemory
-        mem = Math.min(mem, maxMem)
-        this.capability.setMemory(mem)
-
-        var cpu = driverConfiguration.getCpus
-        cpu = Math.min(cpu, maxVCores)
-        this.capability.setVirtualCores(cpu)
-
-        askContainer(actionData)
-      }
-    }
-
-    log.info("Finished asking for containers")
-  }
-
-  private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
-    val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this)
-    client.init(conf)
-    client.start()
-    client
-  }
-
-  private def registerAppMaster(host: String, port: Int, url: String) = {
-    // Register with ResourceManager
-    log.info("Registering application")
-    val registrationResponse = rmClient.registerApplicationMaster(host, port, url)
-    log.info("Registered application")
-    registrationResponse
-  }
-
-//  private def setupMessaging(jobId: String): Unit = {
-//
-//    val cf = new ActiveMQConnectionFactory(address)
-//    val conn = cf.createConnection()
-//    conn.start()
-//
-//    val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
-//    //TODO: move to a const in common
-//    val destination = session.createTopic("JOB.REPORT")
-//
-//    val consumer = session.createConsumer(destination)
-//    consumer.setMessageListener(new ActiveReportListener)
-//
-//  }
-
-  private def askContainer(actionData: ActionData): Unit = {
-
-    actionsBuffer.add(actionData)
-    log.info(s"About to ask container for action ${actionData.getId}. Action buffer size is: ${actionsBuffer.size()}")
-
-    // we have an action to schedule, let's request a container
-    val priority: Priority = Records.newRecord(classOf[Priority])
-    priority.setPriority(1)
-    val containerReq = new ContainerRequest(capability, null, null, priority)
-    rmClient.addContainerRequest(containerReq)
-    log.info(s"Asked container for action ${actionData.getId}")
-
-  }
-
-  override def onContainersAllocated(containers: util.List[Container]): Unit = {
-
-    log.info(s"${containers.size()} Containers allocated")
-    for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
-      if (actionsBuffer.isEmpty) {
-        log.warn(s"Why actionBuffer empty and i was called?. Container ids: ${containers.map(c => c.getId.getContainerId)}")
-        return
-      }
-
-      val actionData = actionsBuffer.poll()
-      val containerTask = Future[ActionData] {
-
-        val frameworkFactory = FrameworkProvidersFactory(env, config)
-        val framework = frameworkFactory.getFramework(actionData.getGroupId)
-        val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
-        val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val commands: List[String] = List(runnerProvider.getCommand(jobManager.getJobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
-
-        log.info("Running container id {}.", container.getId.getContainerId)
-        log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
-
-        ctx.setCommands(commands)
-        ctx.setTokens(allTokens)
-
-        val yarnJarPath = new Path(config.YARNConf.hdfsJarsPath)
-
-        //TODO Eyal - Remove the hardcoding of the dist path
-        /*  val resources = mutable.Map[String, LocalResource]()
-          val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
-          while (binaryFileIter.hasNext) {
-            val eachFile = binaryFileIter.next().getPath
-            resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile))
-          }
-          resources("log4j.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties")))
-          resources ("amaterasu.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))*/
-
-        val resources = mutable.Map[String, LocalResource](
-          "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))),
-          "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))),
-          "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))),
-          "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))),
-          "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))),
-          // TODO: Nadav/Eyal all of these should move to the executor resource setup
-          "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))),
-          "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))),
-          "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))),
-          "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
-          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
-
-        //adding the framework and executor resources
-        setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
-        setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, s"${framework.getGroupIdentifier}-${actionData.getTypeId}")
-
-        ctx.setLocalResources(resources)
-
-        ctx.setEnvironment(Map[String, String](
-          "HADOOP_CONF_DIR" -> s"${config.YARNConf.hadoopHomeDir}/conf/",
-          "YARN_CONF_DIR" -> s"${config.YARNConf.hadoopHomeDir}/conf/",
-          "AMA_NODE" -> sys.env("AMA_NODE"),
-          "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
-        ))
-
-        log.info(s"hadoop conf dir is ${config.YARNConf.hadoopHomeDir}/conf/")
-        nmClient.startContainerAsync(container, ctx)
-        actionData
-      }
-
-      containerTask onComplete {
-        case Failure(t) =>
-          log.error(s"launching container Failed", t)
-          askContainer(actionData)
-
-        case Success(requestedActionData) =>
-          jobManager.actionStarted(requestedActionData.getId)
-          containersIdsToTask.put(container.getId.getContainerId, requestedActionData)
-          log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}")
-
-      }
-    }
-  }
-
-  private def allTokens: ByteBuffer = {
-    // creating the credentials for container execution
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    val dob = new DataOutputBuffer
-    credentials.writeTokenStorageToStream(dob)
-
-    // removing the AM->RM token so that containers cannot access it.
-    val iter = credentials.getAllTokens.iterator
-    log.info("Executing with tokens:")
-    for (token <- iter) {
-      log.info(token.toString)
-      if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
-    }
-    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-  }
-
-  private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
-
-    val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath"))
-
-    if (fs.exists(sourcePath)) {
-
-      val files = fs.listFiles(sourcePath, true)
-
-      while (files.hasNext) {
-        val res = files.next()
-        val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "")
-        countainerResources.put(containerPath, setLocalResourceFromPath(res.getPath))
-      }
-    }
-  }
-
-  def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = {
-    import java.io.IOException
-
-    import org.apache.hadoop.yarn.exceptions.YarnException
-    try
-      rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null)
-    catch {
-      case ex: YarnException =>
-        log.error("Failed to unregister application", ex)
-      case e: IOException =>
-        log.error("Failed to unregister application", e)
-    }
-    rmClient.stop()
-    nmClient.stop()
-  }
-
-  override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = {
-
-    for (status <- statuses.asScala) {
-
-      if (status.getState == ContainerState.COMPLETE) {
-
-        val containerId = status.getContainerId.getContainerId
-        val task = containersIdsToTask(containerId)
-        rmClient.releaseAssignedContainer(status.getContainerId)
-
-        val taskId = task.getId
-        if (status.getExitStatus == 0) {
-
-          //completedContainersAndTaskIds.put(containerId, task.id)
-          jobManager.actionComplete(taskId)
-          log.info(s"Container $containerId Complete with task ${taskId} with success.")
-        } else {
-          // TODO: Check the getDiagnostics value and see if appropriate
-          jobManager.actionFailed(taskId, status.getDiagnostics)
-          log.warn(s"Container $containerId Complete with task ${taskId} with Failed status code (${status.getExitStatus})")
-        }
-      }
-    }
-
-    if (jobManager.getOutOfActions) {
-      log.info("Finished all tasks successfully! Wow!")
-      jobManager.actionsCount()
-      stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
-    } else {
-      log.info(s"jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
-    }
-  }
-
-  override def getProgress: Float = {
-    jobManager.getRegisteredActions.size.toFloat / completedContainersAndTaskIds.size
-  }
-
-  override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
-    log.info("Nodes change. Nothing to report.")
-  }
-
-  override def onShutdownRequest(): Unit = {
-    log.error("Shutdown requested.")
-    stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested")
-  }
-
-  override def onError(e: Throwable): Unit = {
-    log.error("Error on AM", e)
-    stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
-  }
-
-  def initJob(args: Args): Unit = {
-
-    this.env = args.env
-    this.branch = args.branch
-    try {
-      val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-      client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
-      client.start()
-    } catch {
-      case e: Exception =>
-        log.error("Error connecting to zookeeper", e)
-        throw e
-    }
-    if (args.jobId != null && !args.jobId.isEmpty) {
-      log.info("resuming job" + args.jobId)
-      jobManager = JobLoader.reloadJob(
-        args.jobId,
-        client,
-        config.Jobs.Tasks.attempts,
-        new LinkedBlockingQueue[ActionData])
-
-    } else {
-      log.info("new job is being created")
-      try {
-
-        jobManager = JobLoader.loadJob(
-          args.repo,
-          args.branch,
-          args.newJobId,
-          client,
-          config.Jobs.Tasks.attempts,
-          new LinkedBlockingQueue[ActionData])
-      } catch {
-        case e: Exception =>
-          log.error("Error creating JobManager.", e)
-          throw e
-      }
-
-    }
-
-    jobManager.start()
-    log.info("Started jobManager")
-  }
-}
-
-object ApplicationMaster extends Logging with App {
-
-
-  val parser = Args.getParser
-  parser.parse(args, Args()) match {
-
-    case Some(arguments: Args) =>
-      val appMaster = new ApplicationMaster()
-
-      appMaster.address = MessagingClientUtil.getBorkerAddress
-      appMaster.broker.addConnector(appMaster.address)
-      appMaster.broker.start()
-
-      log.info(s"broker Started with address ${appMaster.address}")
-      appMaster.execute(arguments)
-
-    case None =>
-  }
-
-
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala
deleted file mode 100644
index 2b4b3ae..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn
-
-import java.nio.ByteBuffer
-import java.util
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.hadoop.yarn.api.records.{ContainerId, ContainerStatus}
-import org.apache.hadoop.yarn.client.api.async.NMClientAsync
-
-
-class YarnNMCallbackHandler extends Logging with NMClientAsync.CallbackHandler {
-
-  override def onStartContainerError(containerId: ContainerId, t: Throwable): Unit = {
-    log.error(s"Container ${containerId.getContainerId} couldn't start.", t)
-  }
-
-  override def onGetContainerStatusError(containerId: ContainerId, t: Throwable): Unit = {
-    log.error(s"Couldn't get status from container ${containerId.getContainerId}.", t)
-  }
-
-  override def onContainerStatusReceived(containerId: ContainerId, containerStatus: ContainerStatus): Unit = {
-    log.info(s"Container ${containerId.getContainerId} has status of ${containerStatus.getState}")
-  }
-
-  override def onContainerStarted(containerId: ContainerId, allServiceResponse: util.Map[String, ByteBuffer]): Unit = {
-    log.info(s"Container ${containerId.getContainerId} Started")
-  }
-
-  override def onStopContainerError(containerId: ContainerId, t: Throwable): Unit = {
-    log.error(s"Container ${containerId.getContainerId} has thrown an error", t)
-  }
-
-  override def onContainerStopped(containerId: ContainerId): Unit = {
-    log.info(s"Container ${containerId.getContainerId} stopped")
-  }
-
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
deleted file mode 100644
index c99fd26..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/YarnRMCallbackHandler.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn
-
-import java.util
-import java.util.Collections
-import java.util.concurrent.ConcurrentHashMap
-
-import com.google.gson.Gson
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.execution.JobManager
-import org.apache.amaterasu.leader.common.utilities.DataLoader
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
-import org.apache.hadoop.yarn.util.Records
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.concurrent
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.{Future, _}
-import scala.util.{Failure, Success}
-
-class YarnRMCallbackHandler(nmClient: NMClientAsync,
-                            jobManager: JobManager,
-                            env: String,
-                            awsEnv: String,
-                            config: ClusterConfig,
-                            executorJar: LocalResource) extends Logging with AMRMClientAsync.CallbackHandler {
-
-
-  val gson:Gson = new Gson()
-  private val containersIdsToTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
-  private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
-  private val failedTasksCounter: concurrent.Map[String, Int] = new ConcurrentHashMap[String, Int].asScala
-
-
-  override def onError(e: Throwable): Unit = {
-    println(s"ERROR: ${e.getMessage}")
-  }
-
-  override def onShutdownRequest(): Unit = {
-    println("Shutdown requested")
-  }
-
-  val MAX_ATTEMPTS_PER_TASK = 3
-
-  override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = {
-    for (status <- statuses.asScala) {
-      if (status.getState == ContainerState.COMPLETE) {
-        val containerId = status.getContainerId.getContainerId
-        val taskId = containersIdsToTaskIds(containerId)
-        if (status.getExitStatus == 0) {
-          completedContainersAndTaskIds.put(containerId, taskId)
-          log.info(s"Container $containerId Complete with task $taskId with success.")
-        } else {
-          log.warn(s"Container $containerId Complete with task $taskId with Failed status code (${status.getExitStatus}.")
-          val failedTries = failedTasksCounter.getOrElse(taskId, 0)
-          if (failedTries < MAX_ATTEMPTS_PER_TASK) {
-            // TODO: notify and ask for a new container
-            log.info("Retrying task")
-          } else {
-            log.error(s"Already tried task $taskId $MAX_ATTEMPTS_PER_TASK times. Time to say Bye-Bye.")
-            // TODO: die already
-          }
-        }
-      }
-    }
-    if (getProgress == 1F) {
-      log.info("Finished all tasks successfully! Wow!")
-    }
-  }
-
-  override def getProgress: Float = {
-    jobManager.getRegisteredActions.size.toFloat / completedContainersAndTaskIds.size
-  }
-
-  override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
-  }
-
-  override def onContainersAllocated(containers: util.List[Container]): Unit = {
-    log.info("containers allocated")
-    for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
-      val containerTask = Future[String] {
-
-        val actionData = jobManager.getNextActionData
-        val taskData = DataLoader.getTaskData(actionData, env)
-        val execData = DataLoader.getExecutorData(env, config)
-
-        val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val command = s"""$awsEnv env AMA_NODE=${sys.env("AMA_NODE")}
-                         | env SPARK_EXECUTOR_URI=http://${sys.env("AMA_NODE")}:${config.webserver.Port}/dist/spark-${config.webserver.sparkVersion}.tgz
-                         | java -cp executor.jar:spark-${config.webserver.sparkVersion}/lib/*
-                         | -Dscala.usejavacp=true
-                         | -Djava.library.path=/usr/lib org.apache.amaterasu.executor.yarn.executors.ActionsExecutorLauncher
-                         | ${jobManager.getJobId} ${config.master} ${actionData.getName} ${gson.toJson(taskData)} ${gson.toJson(execData)}""".stripMargin
-        ctx.setCommands(Collections.singletonList(command))
-
-        ctx.setLocalResources(Map[String, LocalResource] (
-          "executor.jar" -> executorJar
-        ))
-
-        nmClient.startContainerAsync(container, ctx)
-        actionData.getId
-      }
-
-      containerTask onComplete {
-        case Failure(t) => {
-          println(s"launching container Failed: ${t.getMessage}")
-        }
-
-        case Success(actionDataId) => {
-          containersIdsToTaskIds.put(container.getId.getContainerId, actionDataId)
-          println(s"launching container succeeded: ${container.getId}")
-        }
-      }
-    }
-  }
-}
diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader/src/main/scripts/ama-start-mesos.sh
index fde8e91..adc6d9f 100755
--- a/leader/src/main/scripts/ama-start-mesos.sh
+++ b/leader/src/main/scripts/ama-start-mesos.sh
@@ -16,7 +16,7 @@
 #    limitations under the License.
 #
 
-BASEDIR=$(dirname "$0")
+BASEDIR=$(dirname -- "$0")
 
 export AMA_NODE="$(hostname)"
 #pushd $BASEDIR >/dev/null
diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader/src/main/scripts/ama-start-yarn.sh
index fe14f6e..4208d0a 100755
--- a/leader/src/main/scripts/ama-start-yarn.sh
+++ b/leader/src/main/scripts/ama-start-yarn.sh
@@ -16,7 +16,7 @@
 #    limitations under the License.
 #
 
-BASEDIR=$(dirname "$0")
+BASEDIR=$(dirname -- "$0")
 
 export AMA_NODE="$(hostname)"
 
@@ -104,46 +104,40 @@
 export HADOOP_USER_CLASSPATH_FIRST=true
 export YARN_USER_CLASSPATH=${YARN_USER_CLASSPATH}:bin/*
 
-CMD="yarn jar ${BASEDIR}/bin/leader-0.2.0-incubating-rc4-all.jar org.apache.amaterasu.leader.yarn.Client --home ${BASEDIR}"
+CMD="yarn jar ${BASEDIR}/bin/leader-yarn-0.2.0-incubating-rc4-all.jar org.apache.amaterasu.leader.yarn.Client --home=${BASEDIR}"
 
 if [ -n "$REPO" ]; then
     echo "repo is ${REPO}"
-    CMD+=" --repo ${REPO}"
+    CMD+=" --repo=${REPO}"
 fi
 
 if [ -n "$BRANCH" ]; then
     echo "branch is ${BRANCH}"
-    CMD+=" --branch ${BRANCH}"
+    CMD+=" --branch=${BRANCH}"
 fi
 
 if [ -n "$ENV" ]; then
-    CMD+=" --env ${ENV}"
+    CMD+=" --env=${ENV}"
 fi
 
 if [ -n "$NAME" ]; then
-    CMD+=" --name ${NAME}"
+    CMD+=" --name=${NAME}"
 fi
 
 if [ -n "$JOBID" ]; then
-    CMD+=" --job-id ${JOBID}"
+    CMD+=" --job-id=${JOBID}"
 fi
 
 if [ -n "$REPORT" ]; then
-    CMD+=" --report ${REPORT}"
+    CMD+=" --report=${REPORT}"
 fi
 
 if [ -n "$JARPATH" ]; then
-    CMD+=" --jar-path ${JARPATH}"
+    CMD+=" --jar-path=${JARPATH}"
 fi
 
 echo $CMD
 
-if [ ! -f ${BASEDIR}/dist/miniconda.sh ]; then
-    echo "${bold}Fetching miniconda distributable ${NC}"
-    wget https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh -O ${BASEDIR}/dist/miniconda.sh
-fi
-
-
 if [ "$FORCE_BIN" = true ] ; then
     echo "FORCE: Deleting and re-creating /apps/amaterasu folder"
     eval "hdfs dfs -rm -R -skipTrash /apps/amaterasu"
@@ -152,6 +146,7 @@
     #eval "hdfs dfs -copyFromLocal ${BASEDIR}/* /apps/amaterasu/"
 fi
 
+
 eval $CMD | grep "===>"
 kill $SERVER_PID
 
diff --git a/leader/src/main/scripts/amaterasu.properties b/leader/src/main/scripts/amaterasu.properties
index 5f4a05d..ef5242a 100755
--- a/leader/src/main/scripts/amaterasu.properties
+++ b/leader/src/main/scripts/amaterasu.properties
@@ -12,16 +12,20 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-zk=192.168.56.101
+zk=127.0.0.1
 version=0.2.0-incubating-rc4
-master=192.168.56.101
+master=192.168.33.11
 user=root
-mode=mesos
+mode=yarn
 webserver.port=8000
 webserver.root=dist
 spark.version=2.2.1-bin-hadoop2.7
+yarn.queue=default
+yarn.jarspath=hdfs:///apps/amaterasu
 spark.home=/usr/lib/spark
 #spark.home=/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2
+yarn.hadoop.home.dir=/etc/hadoop
 spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
 spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-mesos.libPath=/usr/lib/libmesos.so
+yarn.master.memoryMB=2048
+yarn.worker.memoryMB=2048
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
index 1b8581f..5987b35 100755
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
+++ b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
@@ -31,18 +31,18 @@
 class JobParserTests extends FlatSpec with Matchers {
 
   val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-  val server = new TestingServer(2182, true)
+  val server = new TestingServer(2187, true)
   val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
   client.start()
 
-  val jobId = s"job_${System.currentTimeMillis}"
-  val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
-  val queue = new LinkedBlockingQueue[ActionData]()
+  private val jobId = s"job_${System.currentTimeMillis}"
+  private val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
+  private val queue = new LinkedBlockingQueue[ActionData]()
 
   // this will be performed by the job bootstrapper
   client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
 
-  val job = JobParser.parse(jobId, yaml, queue, client, 1)
+  private val job = JobParser.parse(jobId, yaml, queue, client, 1)
 
   "JobParser" should "parse the simple-maki.yml" in {
 
diff --git a/project/build.properties b/project/build.properties
index c0bab04..1fc4b80 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.2.8
+sbt.version=1.2.8
\ No newline at end of file
diff --git a/sdk/build.gradle b/sdk/build.gradle
index 585b1c0..f299963 100644
--- a/sdk/build.gradle
+++ b/sdk/build.gradle
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 buildscript {
-    ext.kotlin_version = '1.2.71'
+    ext.kotlin_version = '1.3.21'
 
     repositories {
         mavenCentral()
@@ -74,6 +74,10 @@
 
     // spek requires kotlin-reflect, can be omitted if already in the classpath
     testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+
+    testImplementation 'org.junit.platform:junit-platform-runner:1.0.0'
+    testImplementation 'org.junit.platform:junit-platform-launcher:1.0.0'
+    testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.0.0'
 }
 
 sourceSets {
diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
index 40d72c9..57736cc 100644
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
@@ -17,10 +17,8 @@
 package org.apache.amaterasu.sdk.frameworks
 
 import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
 import org.apache.amaterasu.common.logging.Logging
-import java.nio.file.Files
-import java.nio.file.Paths
-import java.nio.file.StandardCopyOption
 
 abstract class RunnerSetupProvider : Logging() {
 
@@ -34,12 +32,7 @@
         return "$jobId/${actionData.name}/${actionData.src}"
     }
 
-    open fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> {
-        val downloadableActionSrcPath = getDownloadableActionSrcPath(jobId, actionData)
-        val actionSrcDistPath = "dist/$downloadableActionSrcPath"
-        Files.copy(Paths.get("repo/src/${actionData.src}"), Paths.get(actionSrcDistPath), StandardCopyOption.REPLACE_EXISTING)
-        return arrayOf(downloadableActionSrcPath)
-    }
+    abstract fun getActionUserResources(jobId: String, actionData: ActionData): Array<String>
 
     fun getActionResources(jobId: String, actionData: ActionData): Array<String> =
             actionFiles.map { f -> "$jobId/${actionData.name}/$f" }.toTypedArray() +
diff --git a/settings.gradle b/settings.gradle
index 14044b3..1f45aab 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -59,3 +59,13 @@
 include 'python-pandas'
 project(':python-pandas').projectDir=file("frameworks/python/pandas_runtime")
 
+File spekProject = file("../../kotlin/spek")
+if (spekProject.exists()) {
+    // Composite build
+    includeBuild(spekProject) {
+        dependencySubstitution {
+            substitute module('org.jetbrains.spek:spek-api') with project(':spek-api')
+            substitute module('org.jetbrains.spek:spek-junit-platform-engine') with project(':spek-junit-platform-engine')
+        }
+    }
+}
\ No newline at end of file