yarn container resources are using the new SDK API
diff --git a/build.gradle b/build.gradle
index dc63d02..7b79da4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 buildscript {
-    ext.kotlin_version = '1.3.0'
+    ext.kotlin_version = '1.3.21'
 
     repositories {
         mavenCentral()
diff --git a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
index 5fca169..31d696c 100755
--- a/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
+++ b/common/src/main/scala/org/apache/amaterasu/common/configuration/ClusterConfig.scala
@@ -92,7 +92,7 @@
   }
 
 
-  val YARN = new YARN()
+  val yarn = new YARN()
 
   class Spark {
     var home: String = ""
@@ -125,7 +125,7 @@
     }
   }
 
-  object Jobs {
+  class Jobs {
 
     var cpus: Double = 1
     var mem: Long = 1024
@@ -137,10 +137,10 @@
       if (props.containsKey("jobs.mem")) mem = props.getProperty("jobs.mem").toLong
       if (props.containsKey("jobs.repoSize")) repoSize = props.getProperty("jobs.repoSize").toLong
 
-      Tasks.load(props)
+      tasks.load(props)
     }
 
-    object Tasks {
+    class Tasks {
 
       var attempts: Int = 3
       var cpus: Int = 1
@@ -155,6 +155,8 @@
       }
     }
 
+    val tasks = new Tasks()
+
  }
+
+  val jobs = new Jobs()
 
   object AWS {
@@ -214,9 +216,11 @@
     Jar = this.getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
     JarName = Paths.get(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath).getFileName.toString
 
-    Jobs.load(props)
+    jobs.load(props)
+
     Webserver.load(props)
-    YARN.load(props)
+    yarn.load(props)
     spark.load(props)
 
     distLocation match {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
index 7104e28..5b3e842 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.scala
@@ -68,7 +68,7 @@
 
   override def getEnvironmentVariables: util.Map[String, String] = conf.mode match {
     case "mesos" => Map[String, String]("SPARK_HOME" ->s"spark-${conf.Webserver.sparkVersion}","SPARK_HOME_DOCKER" -> "/opt/spark/")
-    case "yarn" => Map[String, String]("SPARK_HOME" -> "spark")
+    case "yarn" => Map[String, String]("SPARK_HOME" -> "spark-${conf.Webserver.sparkVersion}")
     case _ => Map[String, String]()
   }
 
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 3499791..7ee53d1 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -79,6 +79,7 @@
     runtime group: 'org.apache.activemq', name: 'activemq-kahadb-store', version: '5.15.3'
     compile group: 'com.andreapivetta.kolor', name: 'kolor', version: '0.0.2'
     compile group: 'com.beust', name: 'klaxon', version: '5.0.1'
+    compile group: 'com.github.ajalt', name: 'clikt', version: '1.6.0'
 
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
     compile "org.jetbrains.kotlin:kotlin-reflect"
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
new file mode 100644
index 0000000..413f2cd
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.dsl.GitUtil
+import org.apache.amaterasu.leader.common.dsl.JobParser
+import org.apache.curator.framework.CuratorFramework
+import org.apache.zookeeper.CreateMode
+import java.util.concurrent.BlockingQueue
+
+object JobLoader : KLogging() {
+
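+    /**
+     * Loads a new job: the source repo and branch are stored under the job's
+     * znode in ZooKeeper, the repo is cloned and the maki is parsed into a JobManager.
+     */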
+    fun loadJob(src: String, branch: String, jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+        // creating the jobs znode and storing the source repo and branch
+        client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+        client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId/repo", src.toByteArray())
+        client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId/branch", branch.toByteArray())
+
+        val maki: String = loadMaki(src, branch)
+
+        return createJobManager(maki, jobId, client, attempts, actionsQueue)
+
+    }
+
+    fun createJobManager(maki: String, jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+        return JobParser.parse(
+                jobId,
+                maki,
+                actionsQueue,
+                client,
+                attempts
+        )
+    }
+
+    fun loadMaki(src: String, branch: String): String {
+
+        // cloning the git repo
+        log.debug("getting repo: $src, for branch $branch")
+        GitUtil.cloneRepo(src, branch)
+
+        // parsing the maki.yaml and creating a JobManager to
+        // coordinate the workflow based on the file
+        return JobParser.loadMakiFile()
+    }
+
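+    /**
+     * Reloads an existing job: the repo and branch are read back from ZooKeeper,
+     * the JobManager is recreated and queued or started actions are re-queued.
+     */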
+    fun reloadJob(jobId: String, client: CuratorFramework, attempts: Int, actionsQueue: BlockingQueue<ActionData>): JobManager {
+
+        val src = String(client.data.forPath("/$jobId/repo"))
+        val branch = String(client.data.forPath("/$jobId/branch"))
+
+        val maki: String = loadMaki(src, branch)
+
+        val jobManager: JobManager = createJobManager(maki, jobId, client, attempts, actionsQueue)
+        restoreJobState(jobManager, jobId, client)
+
+        jobManager.start()
+        return jobManager
+    }
+
+    fun restoreJobState(jobManager: JobManager, jobId: String, client: CuratorFramework) {
+
+        val tasks = client.children.forPath("/$jobId").filter { it.startsWith("task") }
+
+        for (task in tasks) {
+
+            // znode data is a byte array; decode it before resolving the enum value
+            val status = ActionStatus.valueOf(String(client.data.forPath("/$jobId/$task")))
+            if (status == ActionStatus.Queued || status == ActionStatus.Started) {
+                jobManager.reQueueAction(task.substring(task.indexOf("task-") + 5))
+            }
+
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
new file mode 100644
index 0000000..cf3e10d
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.execution.frameworks
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.reflections.Reflections
+
+class FrameworkProvidersFactory(val env: String, val config: ClusterConfig) : KLogging() {
+
+    val providers: Map<String, FrameworkSetupProvider>
+
+    init {
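+        // scan the classpath for FrameworkSetupProvider implementations and
+        // initialize one provider per framework group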
+        val reflections = Reflections(ClassLoader::class.java)
+        val runnerTypes = reflections.getSubTypesOf(FrameworkSetupProvider::class.java)
+
+        providers = runnerTypes.map {
+
+            val provider = it.newInstance()
+
+            provider.init(env, config)
+            log.info("a provider for group ${provider.groupIdentifier} was created")
+
+            provider.groupIdentifier to provider
+
+        }.toMap()
+    }
+
+    fun groups(): Set<String> = providers.keys
+
+    fun getFramework(groupId: String): FrameworkSetupProvider = providers.getValue(groupId)
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt
new file mode 100644
index 0000000..132bd77
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/AmaOpts.kt
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.launcher
+
+data class AmaOpts(
+        var repo: String = "",
+        var branch: String = "master",
+        var env: String = "default",
+        var name: String = "amaterasu-job",
+        var jobId: String = "",
+        var newJobId: String = "",
+        var report: String = "code",
+        var home: String = "") {
+
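+    /** Renders the options back into a command-line argument string. */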
+    fun toCmdString(): String {
+
+        var cmd = " --repo $repo --branch $branch --env $env --name $name --report $report --home $home"
+        if (jobId.isNotEmpty()) {
+            cmd += " --job-id $jobId"
+        }
+        return cmd
+    }
+
+    override fun toString(): String {
+        return toCmdString()
+    }
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt
new file mode 100644
index 0000000..330ccbc
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/launcher/ArgsParser.kt
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.launcher
+
+import com.github.ajalt.clikt.core.CliktCommand
+import com.github.ajalt.clikt.parameters.options.default
+import com.github.ajalt.clikt.parameters.options.option
+import com.github.ajalt.clikt.parameters.options.prompt
+
+abstract class ArgsParser : CliktCommand() {
+
+    private val repo: String by option(help = "The git repo containing the job").prompt("Please provide an Amaterasu Repo")
+    private val branch: String by option(help = "The branch to be executed (default is master)").default("master")
+    private val env: String by option(help = "The environment to be executed (test, prod, etc. values from the default env are taken if no env is specified)").default("default")
+    private val name: String by option(help = "The name of the job").default("amaterasu-job")
+    private val jobId: String by option("--job-id", help = "The jobId - should be passed only when resuming a job").default("")
+    private val newJobId: String by option("--new-job-id", help = "The jobId - should never be passed by a user").default("")
+    private val report: String by option(help = "The level of reporting").default("code")
+    private val home: String by option(help = "The Amaterasu home directory").default("")
+
+    // a getter, so the clikt option delegates are only read after parsing
+    val opts: AmaOpts
+        get() = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
+}
\ No newline at end of file
diff --git a/leader-yarn/build.gradle b/leader-yarn/build.gradle
index 5f05943..f44a7e8 100644
--- a/leader-yarn/build.gradle
+++ b/leader-yarn/build.gradle
@@ -14,6 +14,84 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+buildscript {
 
+    repositories {
+        mavenCentral()
+        maven {
+            url 'http://repository.jetbrains.com/all'
+        }
+        maven {
+            url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+        }
+    }
 
+    dependencies {
+        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+        classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+    }
+}
 
+plugins {
+    id "com.github.johnrengelman.shadow" version "2.0.4"
+    id 'scala'
+}
+
+apply plugin: 'kotlin'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+    filters {
+        engines {
+            include 'spek'
+        }
+    }
+}
+
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+repositories {
+    maven { url "https://plugins.gradle.org/m2/" }
+    maven { url 'http://repository.jetbrains.com/all' }
+    maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+    maven { url "http://dl.bintray.com/jetbrains/spek" }
+    maven { url "http://oss.jfrog.org/artifactory/oss-snapshot-local" }
+
+    mavenCentral()
+    jcenter()
+}
+
+dependencies {
+    compile project(':leader-common')
+    compile project(':amaterasu-sdk')
+
+    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+    compile "org.jetbrains.kotlin:kotlin-reflect"
+    compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.1.1'
+
+}
+
+task copyToHomeRoot(type: Copy) {
+    from 'src/main/scripts'
+    into '../build/amaterasu/'
+}
+
+task copyToHomeBin(type: Copy) {
+    dependsOn shadowJar
+    from 'build/libs'
+    into '../build/amaterasu/bin'
+}
+
+task copyToHome() {
+    dependsOn copyToHomeRoot
+    dependsOn copyToHomeBin
+}
+
+compileKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
+
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt
similarity index 60%
copy from leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
copy to leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt
index b8c29b7..6af59d8 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/AppMasterArgsParser.kt
@@ -14,15 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.yarn;
+package org.apache.amaterasu.leader.yarn
 
-public class JobOpts {
-    public String repo = "";
-    public String branch = "master";
-    public String env = "default";
-    public String name = "amaterasu-job";
-    public String jobId = null;
-    public String newJobId = null;
-    public String report ="code";
-    public String home ="";
-}
\ No newline at end of file
+import org.apache.amaterasu.leader.common.launcher.ArgsParser
+import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+
+class AppMasterArgsParser : ArgsParser() {
+
+    override fun run() {
+
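+        // the embedded broker is started before the job executes so that
+        // containers can report status back to the leader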
+        val appMaster = ApplicationMaster()
+        appMaster.address = MessagingClientUtil.borkerAddress
+        appMaster.broker.addConnector(appMaster.address)
+        appMaster.broker.start()
+
+        appMaster.execute(opts)
+    }
+}
+
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
new file mode 100644
index 0000000..92e5858
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.activemq.broker.BrokerService
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.execution.JobLoader
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.barriers.DistributedBarrier
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.yarn.api.records.*
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+import org.apache.zookeeper.CreateMode
+import java.io.File
+import java.io.FileInputStream
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.LinkedBlockingQueue
+import javax.jms.MessageConsumer
+
+import kotlinx.coroutines.*
+import org.apache.hadoop.io.DataOutputBuffer
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
+import java.nio.ByteBuffer
+
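+/**
+ * The Amaterasu YARN ApplicationMaster: requests a container per action and
+ * launches it with the resources resolved through the frameworks' SDK providers.
+ */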
+class ApplicationMaster : KLogging(), AMRMClientAsync.CallbackHandler {
+    override fun onNodesUpdated(updatedNodes: MutableList<NodeReport>?) {
+        // no node-level bookkeeping is needed for Amaterasu containers
+    }
+
+    override fun onShutdownRequest() {
+        log.info("Shutdown requested by the ResourceManager")
+    }
+
+    // polled on every AM-RM heartbeat, so it must return rather than throw
+    override fun getProgress(): Float = 0.0f
+
+    override fun onError(e: Throwable?) {
+        log.error("Error on the AMRM client", e)
+    }
+
+    override fun onContainersCompleted(statuses: MutableList<ContainerStatus>?) {
+        statuses?.forEach { log.info("Container ${it.containerId} completed with exit status ${it.exitStatus}") }
+    }
+
+    lateinit var address: String
+
+    val broker: BrokerService = BrokerService()
+    private val conf = YarnConfiguration()
+    private val fs = FileSystem.get(conf)
+    private val propPath = System.getenv("PWD") + "/amaterasu.properties"
+    private val props = FileInputStream(File(propPath))
+    private val config = ClusterConfig.apply(props)
+    private val nmClient: NMClientAsync = NMClientAsyncImpl(YarnNMCallbackHandler())
+    private val actionsBuffer = ConcurrentLinkedQueue<ActionData>()
+
+    private lateinit var jobManager: JobManager
+    private lateinit var client: CuratorFramework
+    private lateinit var env: String
+    private lateinit var consumer: MessageConsumer
+    private lateinit var rmClient: AMRMClientAsync<AMRMClient.ContainerRequest>
+
+    fun execute(opts: AmaOpts) {
+
+        try {
+            initJob(opts)
+        } catch (e: Exception) {
+            log.error("Error initializing the job", e)
+        }
+
+        // now that the job was initiated, the curator client is Started and we can
+        // register the broker's address
+        client.create().withMode(CreateMode.PERSISTENT).forPath("/${jobManager.jobId}/broker")
+        client.setData().forPath("/${jobManager.jobId}/broker", address.toByteArray())
+
+        // once the broker is registered, we can remove the barrier so clients can connect
+        log.info("removing barrier at /${jobManager.jobId}-report-barrier")
+        val barrier = DistributedBarrier(client, "/${jobManager.jobId}-report-barrier")
+        barrier.removeBarrier()
+
+        consumer = MessagingClientUtil.setupMessaging(address)
+
+        // Initialize clients to ResourceManager and NodeManagers
+        nmClient.init(conf)
+        nmClient.start()
+
+        rmClient = startRMClient()
+
+        val registrationResponse = rmClient.registerApplicationMaster("", 0, "")
+        val maxMem = registrationResponse.maximumResourceCapability.memory
+        val maxVCores = registrationResponse.maximumResourceCapability.virtualCores
+
+        val frameworkFactory = FrameworkProvidersFactory(env, config)
+
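+        // main scheduling loop: for each pending action, request a container
+        // sized by the framework's driver configuration, capped at the
+        // cluster's maximum memory and vcores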
+        while (!jobManager.outOfActions) {
+            val capability = Records.newRecord(Resource::class.java)
+
+            val actionData = jobManager.nextActionData
+            if (actionData != null) {
+
+                val frameworkProvider = frameworkFactory.getFramework(actionData.groupId)
+                val driverConfiguration = frameworkProvider.driverConfiguration
+
+                var mem: Int = driverConfiguration.memory
+                mem = Math.min(mem, maxMem)
+                capability.memory = mem
+
+                var cpu = driverConfiguration.cpus
+                cpu = Math.min(cpu, maxVCores)
+                capability.virtualCores = cpu
+
+                requestContainer(actionData, capability)
+            }
+        }
+    }
+
+    private fun initJob(opts: AmaOpts) {
+
+        this.env = opts.env
+
+        try {
+            val retryPolicy = ExponentialBackoffRetry(1000, 3)
+            client = CuratorFrameworkFactory.newClient(config.zk(), retryPolicy)
+            client.start()
+        } catch (e: Exception) {
+            log.error("Error connecting to zookeeper", e)
+            throw e
+        }
+
+        if (opts.jobId.isNotEmpty()) {
+            log.info("resuming job" + opts.jobId)
+            jobManager = JobLoader.reloadJob(
+                    opts.jobId,
+                    client,
+                    config.jobs().tasks().attempts(),
+                    LinkedBlockingQueue<ActionData>())
+
+        } else {
+            log.info("new job is being created")
+            try {
+
+                jobManager = JobLoader.loadJob(
+                        opts.repo,
+                        opts.branch,
+                        opts.newJobId,
+                        client,
+                        config.jobs().tasks().attempts(),
+                        LinkedBlockingQueue<ActionData>())
+            } catch (e: Exception) {
+                log.error("Error creating JobManager.", e)
+                throw e
+            }
+
+        }
+
+        jobManager.start()
+        log.info("Started jobManager")
+    }
+
+    override fun onContainersAllocated(containers: MutableList<Container>?): Unit = runBlocking<Unit> {
+        containers?.let {
+            for (container in it) {
+                if (actionsBuffer.isNotEmpty()) {
+                    val actionData = actionsBuffer.poll()
+                    launch {
+                        val frameworkFactory = FrameworkProvidersFactory(env, config)
+                        val framework = frameworkFactory.getFramework(actionData.groupId)
+                        val runnerProvider = framework.getRunnerProvider(actionData.typeId)
+                        val ctx = Records.newRecord(ContainerLaunchContext::class.java)
+                        val commands: List<String> = listOf(runnerProvider.getCommand(jobManager.jobId, actionData, env, "${actionData.id}-${container.id.containerId}", address))
+
+                        ctx.commands = commands
+                        ctx.tokens = allTokens()
+                        ctx.localResources = setupContainerResources(framework, runnerProvider)
+
+                        nmClient.startContainerAsync(container, ctx)
+
+                    }
+                }
+            }
+        }
+    }
+
+    private fun allTokens(): ByteBuffer {
+        // creating the credentials for container execution
+        val credentials = UserGroupInformation.getCurrentUser().credentials
+        val dob = DataOutputBuffer()
+        credentials.writeTokenStorageToStream(dob)
+
+        // removing the AM->RM token so that containers cannot access it.
+        val iter = credentials.allTokens.iterator()
+        log.info("Executing with tokens:")
+        for (token in iter) {
+            log.info(token.toString())
+            if (token.kind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+        }
+        return ByteBuffer.wrap(dob.data, 0, dob.length)
+    }
+
+    /**
+     * Creates the map of resources to be copied into the container
+     * @param framework the FrameworkSetupProvider for the action
+     * @param runnerProvider the action's RunnerSetupProvider
+     */
+    private fun setupContainerResources(framework: FrameworkSetupProvider, runnerProvider: RunnerSetupProvider): Map<String, LocalResource> {
+
+        val yarnJarPath = Path(config.yarn().hdfsJarsPath())
+
+        // Getting framework (group) resources
+        val result = framework.groupResources.map { it.path to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it.path))) }.toMap().toMutableMap()
+
+        // Getting runner resources
+        result.putAll(runnerProvider.runnerResources.map { it to createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath(it))) }.toMap())
+
+        // Adding the Amaterasu configuration files
+        result["amaterasu.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/amaterasu.properties")))
+        result["log4j.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/log4j.properties")))
+
+        return result
+    }
+
+    private fun createDistPath(path: String): Path = Path.mergePaths(Path("dist/"), Path(path))
+
+    private fun startRMClient(): AMRMClientAsync<AMRMClient.ContainerRequest> {
+        val client = AMRMClientAsync.createAMRMClientAsync<AMRMClient.ContainerRequest>(1000, this)
+        client.init(conf)
+        client.start()
+        return client
+    }
+
+    private fun createLocalResourceFromPath(path: Path): LocalResource {
+
+        val stat = fs.getFileStatus(path)
+        val fileResource = Records.newRecord(LocalResource::class.java)
+
+        fileResource.shouldBeUploadedToSharedCache = true
+        fileResource.visibility = LocalResourceVisibility.PUBLIC
+        fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
+        fileResource.size = stat.len
+        fileResource.timestamp = stat.modificationTime
+        fileResource.type = LocalResourceType.FILE
+        return fileResource
+
+    }
+
+    private fun requestContainer(actionData: ActionData, capability: Resource) {
+
+        actionsBuffer.add(actionData)
+        log.info("About to ask container for action ${actionData.id}. Action buffer size is: ${actionsBuffer.size}")
+
+        // we have an action to schedule, let's request a container
+        val priority: Priority = Records.newRecord(Priority::class.java)
+        priority.priority = 1
+        val containerReq = AMRMClient.ContainerRequest(capability, null, null, priority)
+        rmClient.addContainerRequest(containerReq)
+        log.info("Asked container for action ${actionData.id}")
+
+    }
+
+    companion object {
+        @JvmStatic
+        fun main(args: Array<String>) = AppMasterArgsParser().main(args)
+
+    }
+}
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
new file mode 100644
index 0000000..48fb67b
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.activemq.ActiveMQConnectionFactory
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.utilities.ActiveReportListener
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.framework.recipes.barriers.DistributedBarrier
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.hadoop.fs.*
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.*
+import org.apache.hadoop.yarn.client.api.YarnClient
+import org.apache.hadoop.yarn.client.api.YarnClientApplication
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.YarnException
+import org.apache.hadoop.yarn.util.Apps
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.util.Records
+import org.apache.log4j.LogManager
+import org.slf4j.LoggerFactory
+
+import javax.jms.*
+import java.io.File
+import java.io.FileInputStream
+import java.io.IOException
+import java.util.*
+
+import java.lang.System.exit
+
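+/**
+ * YARN submission client: uploads the Amaterasu distribution and framework
+ * resources to HDFS, submits the ApplicationMaster and streams job reports
+ * back over the report topic.
+ */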
+class Client {
+    private val conf = YarnConfiguration()
+    private var fs: FileSystem? = null
+
+    @Throws(IOException::class)
+    private fun setLocalResourceFromPath(path: Path): LocalResource {
+
+        val stat = fs!!.getFileStatus(path)
+        val fileResource = Records.newRecord(LocalResource::class.java)
+        fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
+        fileResource.size = stat.len
+        fileResource.timestamp = stat.modificationTime
+        fileResource.type = LocalResourceType.FILE
+        fileResource.visibility = LocalResourceVisibility.PUBLIC
+        return fileResource
+    }
+
+    @Throws(Exception::class)
+    fun run(opts: AmaOpts, args: Array<String>) {
+
+        LogManager.resetConfiguration()
+        val config = ClusterConfig()
+        config.load(FileInputStream(opts.home + "/amaterasu.properties"))
+
+        // Create yarnClient
+        val yarnClient = YarnClient.createYarnClient()
+        yarnClient.init(conf)
+        yarnClient.start()
+
+        // Create application via yarnClient
+        var app: YarnClientApplication? = null
+        try {
+            app = yarnClient.createApplication()
+        } catch (e: YarnException) {
+            LOGGER.error("Error initializing yarn application with yarn client.", e)
+            exit(1)
+        } catch (e: IOException) {
+            LOGGER.error("Error initializing yarn application with yarn client.", e)
+            exit(2)
+        }
+
+        // Setup jars on hdfs
+        try {
+            fs = FileSystem.get(conf)
+        } catch (e: IOException) {
+            LOGGER.error("Eror creating HDFS client isntance.", e)
+            exit(3)
+        }
+
+        val jarPath = Path(config.yarn().hdfsJarsPath())
+        val jarPathQualified = fs!!.makeQualified(jarPath)
+
+        val appContext = app!!.applicationSubmissionContext
+
+        var newId = ""
+        if (opts.jobId.isEmpty()) {
+            newId = "--new-job-id " + appContext.applicationId.toString() + "-" + UUID.randomUUID().toString()
+        }
+
+
+        val commands = listOf("env AMA_NODE=" + System.getenv("AMA_NODE") +
+                " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().userName +
+                " \$JAVA_HOME/bin/java" +
+                " -Dscala.usejavacp=false" +
+                " -Xmx2G" +
+                " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
+                joinStrings(args) +
+                newId +
+                " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+                " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+
+        // Set up the container launch context for the application master
+        val amContainer = Records.newRecord(ContainerLaunchContext::class.java)
+        amContainer.commands = commands
+
+        // Setup local ama folder on hdfs.
+        try {
+
+            println("===> $jarPathQualified")
+            if (!fs!!.exists(jarPathQualified)) {
+                val home = File(opts.home)
+                fs!!.mkdirs(jarPathQualified)
+
+                for (f in home.listFiles()) {
+                    fs!!.copyFromLocalFile(false, true, Path(f.getAbsolutePath()), jarPathQualified)
+                }
+
+                // setup frameworks
+                println("===> setting up frameworks")
+                val frameworkFactory = FrameworkProvidersFactory(opts.env, config)
+                for (group in frameworkFactory.groups()) {
+                    println("===> setting up $group")
+                    val framework = frameworkFactory.getFramework(group)
+
+                    //creating a group folder
+                    val frameworkPath = Path.mergePaths(jarPathQualified, Path("/" + framework.groupIdentifier))
+                    println("===> $frameworkPath")
+
+                    fs!!.mkdirs(frameworkPath)
+                    for (file in framework.groupResources) {
+                        if (file.exists())
+                            fs!!.copyFromLocalFile(false, true, Path(file.absolutePath), frameworkPath)
+                    }
+                }
+            }
+
+        } catch (e: IOException) {
+            println("===>" + e.message)
+            LOGGER.error("Error uploading ama folder to HDFS.", e)
+            exit(3)
+        } catch (ne: NullPointerException) {
+            println("===>" + ne.message)
+            LOGGER.error("No files in home dir.", ne)
+            exit(4)
+        }
+
+        // get version of build
+        val version = config.version()
+
+        // get local resources pointers that will be set on the master container env
+        val leaderJarPath = String.format("/bin/leader-%s-all.jar", version)
+        LOGGER.info("Leader Jar path is: {}", leaderJarPath)
+        val mergedPath = Path.mergePaths(jarPath, Path(leaderJarPath))
+
+        // System.out.println("===> path: " + jarPathQualified);
+        LOGGER.info("Leader merged jar path is: {}", mergedPath)
+        var leaderJar: LocalResource? = null
+        var propFile: LocalResource? = null
+        var log4jPropFile: LocalResource? = null
+
+        try {
+            leaderJar = setLocalResourceFromPath(mergedPath)
+            propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, Path("/amaterasu.properties")))
+            log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, Path("/log4j.properties")))
+        } catch (e: IOException) {
+            LOGGER.error("Error initializing yarn local resources.", e)
+            exit(4)
+        }
+
+        // set local resource on master container
+        val localResources = HashMap<String, LocalResource>()
+        //localResources.put("leader.jar", leaderJar);
+        // making the bin folder's content available to the appMaster
+        val bin = fs!!.listFiles(Path.mergePaths(jarPath, Path("/bin")), true)
+
+        while (bin.hasNext()) {
+            val binFile = bin.next()
+            localResources[binFile.path.name] = setLocalResourceFromPath(binFile.path)
+        }
+
+        localResources["amaterasu.properties"] = propFile!!
+        localResources["log4j.properties"] = log4jPropFile!!
+        amContainer.localResources = localResources
+
+        // Setup CLASSPATH for ApplicationMaster
+        val appMasterEnv = HashMap<String, String>()
+        setupAppMasterEnv(appMasterEnv)
+        appMasterEnv["AMA_CONF_PATH"] = String.format("%s/amaterasu.properties", config.YARN().hdfsJarsPath())
+        amContainer.environment = appMasterEnv
+
+        // Set up resource type requirements for ApplicationMaster
+        val capability = Records.newRecord(Resource::class.java)
+        capability.memory = config.yarn().master().memoryMB()
+        capability.virtualCores = config.yarn().master().cores()
+
+        // Finally, set-up ApplicationSubmissionContext for the application
+        appContext.applicationName = "amaterasu-" + opts.name
+        appContext.amContainerSpec = amContainer
+        appContext.resource = capability
+        appContext.queue = config.yarn().queue()
+        appContext.priority = Priority.newInstance(1)
+
+        // Submit application
+        val appId = appContext.applicationId
+        LOGGER.info("Submitting application {}", appId)
+        try {
+            yarnClient.submitApplication(appContext)
+
+        } catch (e: YarnException) {
+            LOGGER.error("Error submitting application.", e)
+            exit(6)
+        } catch (e: IOException) {
+            LOGGER.error("Error submitting application.", e)
+            exit(7)
+        }
+
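+        // the report barrier stays up until the ApplicationMaster publishes its
+        // broker address under /<jobId>/broker in ZooKeeper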
+        val client = CuratorFrameworkFactory.newClient(config.zk(),
+                ExponentialBackoffRetry(1000, 3))
+        client.start()
+
+        val newJobId = newId.replace("--new-job-id ", "")
+        println("===> /$newJobId-report-barrier")
+        val reportBarrier = DistributedBarrier(client, "/$newJobId-report-barrier")
+        reportBarrier.setBarrier()
+        reportBarrier.waitOnBarrier()
+
+        val address = String(client.data.forPath("/$newJobId/broker"))
+        println("===> $address")
+        setupReportListener(address)
+
+        var appReport: ApplicationReport? = null
+        var appState: YarnApplicationState
+
+        do {
+            try {
+                appReport = yarnClient.getApplicationReport(appId)
+            } catch (e: YarnException) {
+                LOGGER.error("Error getting application report.", e)
+                exit(8)
+            } catch (e: IOException) {
+                LOGGER.error("Error getting application report.", e)
+                exit(9)
+            }
+
+            appState = appReport!!.yarnApplicationState
+            if (isAppFinished(appState)) {
+                exit(0)
+                break
+            }
+            //LOGGER.info("Application not finished ({})", appReport.getProgress());
+            try {
+                Thread.sleep(100)
+            } catch (e: InterruptedException) {
+                LOGGER.error("Interrupted while waiting for job completion.", e)
+                exit(137)
+            }
+
+        } while (!isAppFinished(appState))
+
+        LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport!!.finalApplicationStatus, appReport.finishTime)
+    }
+
+    private fun isAppFinished(appState: YarnApplicationState): Boolean {
+        return appState == YarnApplicationState.FINISHED ||
+                appState == YarnApplicationState.KILLED ||
+                appState == YarnApplicationState.FAILED
+    }
+
+    @Throws(JMSException::class)
+    private fun setupReportListener(address: String) {
+
+        val cf = ActiveMQConnectionFactory(address)
+        val conn = cf.createConnection()
+        conn.start()
+
+        val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
+
+        //TODO: move to a const in common
+        val destination = session.createTopic("JOB.REPORT")
+
+        val consumer = session.createConsumer(destination)
+        consumer.messageListener = ActiveReportListener()
+
+    }
+
+    private fun setupAppMasterEnv(appMasterEnv: MutableMap<String, String>) {
+        Apps.addToEnvironment(appMasterEnv,
+                ApplicationConstants.Environment.CLASSPATH.name,
+                ApplicationConstants.Environment.PWD.`$`() + File.separator + "*", File.pathSeparator)
+
+        for (c in conf.getStrings(
+                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                *YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name,
+                    c.trim { it <= ' ' }, File.pathSeparator)
+        }
+    }
+
+    companion object {
+
+        private val LOGGER = LoggerFactory.getLogger(Client::class.java)
+
+        @Throws(Exception::class)
+        @JvmStatic
+        fun main(args: Array<String>) = ClientArgsParser(args).main(args)
+
+        private fun joinStrings(str: Array<String>): String {
+
+            val builder = StringBuilder()
+            for (s in str) {
+                builder.append(s)
+                builder.append(" ")
+            }
+            return builder.toString()
+
+        }
+    }
+}
\ No newline at end of file
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
similarity index 69%
rename from leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
rename to leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
index b8c29b7..df3aed9 100644
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/JobOpts.java
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.yarn;
+package org.apache.amaterasu.leader.yarn
 
-public class JobOpts {
-    public String repo = "";
-    public String branch = "master";
-    public String env = "default";
-    public String name = "amaterasu-job";
-    public String jobId = null;
-    public String newJobId = null;
-    public String report ="code";
-    public String home ="";
+import org.apache.amaterasu.leader.common.launcher.ArgsParser
+
+class ClientArgsParser(val args: Array<String>): ArgsParser() {
+
+    override fun run() {
+        val client = Client()
+        client.run(opts, args)
+    }
 }
\ No newline at end of file
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
new file mode 100644
index 0000000..0702f01
--- /dev/null
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/YarnNMCallbackHandler.kt
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.yarn
+
+import org.apache.amaterasu.common.logging.KLogging
+
+import java.nio.ByteBuffer
+
+import org.apache.hadoop.yarn.api.records.ContainerId
+import org.apache.hadoop.yarn.api.records.ContainerStatus
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync
+
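+/** Logs the NodeManager lifecycle callbacks for Amaterasu action containers. */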
+class YarnNMCallbackHandler : KLogging(), NMClientAsync.CallbackHandler {
+
+    override fun onStartContainerError(containerId: ContainerId, t: Throwable) {
+        log.error("Container ${containerId.containerId} couldn't start.", t)
+    }
+
+    override fun onGetContainerStatusError(containerId: ContainerId, t: Throwable) {
+        log.error("Couldn't get status from container ${containerId.containerId}.", t)
+    }
+
+    override fun onContainerStatusReceived(containerId: ContainerId, containerStatus: ContainerStatus) {
+        log.info("Container ${containerId.containerId} has status of ${containerStatus.state}")
+    }
+
+    override fun onContainerStarted(containerId: ContainerId, allServiceResponse: Map<String, ByteBuffer>) {
+        log.info("Container ${containerId.containerId} Started")
+    }
+
+    override fun onStopContainerError(containerId: ContainerId, t: Throwable) {
+        log.error("Container ${containerId.containerId} has thrown an error", t)
+    }
+
+    override fun onContainerStopped(containerId: ContainerId) {
+        log.info("Container ${containerId.containerId} stopped")
+    }
+
+}
\ No newline at end of file
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
deleted file mode 100644
index 38a9c38..0000000
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/ArgsParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn;
-
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-public class ArgsParser {
-    private static Options getOptions() {
-
-        Options options = new Options();
-        options.addOption("r", "repo", true, "The git repo containing the job");
-        options.addOption("b", "branch", true, "The branch to be executed (default is master)");
-        options.addOption("e", "env", true, "The environment to be executed (test, prod, etc. values from the default env are taken if np env specified)");
-        options.addOption("n", "name", true, "The name of the job");
-        options.addOption("i", "job-id", true, "The jobId - should be passed only when resuming a job");
-        options.addOption("j", "new-job-id", true, "The jobId - should never be passed by a user");
-        options.addOption("r", "report", true, "The level of reporting");
-        options.addOption("h", "home", true, "The level of reporting");
-
-        return options;
-    }
-
-    public static JobOpts getJobOpts(String[] args) throws ParseException {
-
-        CommandLineParser parser = new BasicParser();
-        Options options = getOptions();
-        CommandLine cli = parser.parse(options, args);
-
-        JobOpts opts = new JobOpts();
-        if (cli.hasOption("repo")) {
-            opts.repo = cli.getOptionValue("repo");
-        }
-
-        if (cli.hasOption("branch")) {
-            opts.branch = cli.getOptionValue("branch");
-        }
-
-        if (cli.hasOption("env")) {
-            opts.env = cli.getOptionValue("env");
-        }
-
-        if (cli.hasOption("job-id")) {
-            opts.jobId = cli.getOptionValue("job-id");
-        }
-        if (cli.hasOption("new-job-id")) {
-            opts.newJobId = cli.getOptionValue("new-job-id");
-        }
-
-        if (cli.hasOption("report")) {
-            opts.report = cli.getOptionValue("report");
-        }
-
-        if (cli.hasOption("home")) {
-            opts.home = cli.getOptionValue("home");
-        }
-
-        if (cli.hasOption("name")) {
-            opts.name = cli.getOptionValue("name");
-        }
-
-        return opts;
-    }
-}
diff --git a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java b/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
deleted file mode 100644
index 3b0e8c2..0000000
--- a/leader/src/main/java/org/apache/amaterasu/leader/yarn/Client.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.amaterasu.common.configuration.ClusterConfig;
-import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory;
-import org.apache.amaterasu.leader.common.utilities.ActiveReportListener;
-import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.log4j.LogManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.*;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.*;
-
-import static java.lang.System.exit;
-
-public class Client {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(Client.class);
-    private final Configuration conf = new YarnConfiguration();
-    private FileSystem fs;
-
-    private LocalResource setLocalResourceFromPath(Path path) throws IOException {
-
-        FileStatus stat = fs.getFileStatus(path);
-        LocalResource fileResource = Records.newRecord(LocalResource.class);
-        fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path));
-        fileResource.setSize(stat.getLen());
-        fileResource.setTimestamp(stat.getModificationTime());
-        fileResource.setType(LocalResourceType.FILE);
-        fileResource.setVisibility(LocalResourceVisibility.PUBLIC);
-        return fileResource;
-    }
-
-    private void run(JobOpts opts, String[] args) throws Exception {
-
-        LogManager.resetConfiguration();
-        ClusterConfig config = new ClusterConfig();
-        config.load(new FileInputStream(opts.home + "/amaterasu.properties"));
-
-        // Create yarnClient
-        YarnClient yarnClient = YarnClient.createYarnClient();
-        yarnClient.init(conf);
-        yarnClient.start();
-
-        // Create application via yarnClient
-        YarnClientApplication app = null;
-        try {
-            app = yarnClient.createApplication();
-        } catch (YarnException e) {
-            LOGGER.error("Error initializing yarn application with yarn client.", e);
-            exit(1);
-        } catch (IOException e) {
-            LOGGER.error("Error initializing yarn application with yarn client.", e);
-            exit(2);
-        }
-
-        // Setup jars on hdfs
-        try {
-            fs = FileSystem.get(conf);
-        } catch (IOException e) {
-            LOGGER.error("Eror creating HDFS client isntance.", e);
-            exit(3);
-        }
-        Path jarPath = new Path(config.YARN().hdfsJarsPath());
-        Path jarPathQualified = fs.makeQualified(jarPath);
-
-        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
-
-        String newId = "";
-        if (opts.jobId == null) {
-            newId = "--new-job-id " + appContext.getApplicationId().toString() + "-" + UUID.randomUUID().toString();
-        }
-
-
-        List<String> commands = Collections.singletonList(
-                "env AMA_NODE=" + System.getenv("AMA_NODE") +
-                        " env HADOOP_USER_NAME=" + UserGroupInformation.getCurrentUser().getUserName() +
-                        " $JAVA_HOME/bin/java" +
-                        " -Dscala.usejavacp=false" +
-                        " -Xmx2G" +
-                        " org.apache.amaterasu.leader.yarn.ApplicationMaster " +
-                        joinStrings(args) +
-                        newId +
-                        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-                        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
-        );
-
-
-        // Set up the container launch context for the application master
-        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
-        amContainer.setCommands(commands);
-
-        // Setup local ama folder on hdfs.
-        try {
-
-            System.out.println("===> " + jarPathQualified);
-            if (!fs.exists(jarPathQualified)) {
-                File home = new File(opts.home);
-                fs.mkdirs(jarPathQualified);
-
-                for (File f : home.listFiles()) {
-                    fs.copyFromLocalFile(false, true, new Path(f.getAbsolutePath()), jarPathQualified);
-                }
-
-                // setup frameworks
-                System.out.println("===> setting up frameworks");
-                FrameworkProvidersFactory frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config);
-                for (String group : frameworkFactory.groups()) {
-                    System.out.println("===> setting up " + group);
-                    FrameworkSetupProvider framework = frameworkFactory.getFramework(group);
-
-                    //creating a group folder
-                    Path frameworkPath = Path.mergePaths(jarPathQualified, new Path("/" + framework.getGroupIdentifier()));
-                    System.out.println("===> " + frameworkPath.toString());
-
-                    fs.mkdirs(frameworkPath);
-                    for (File file : framework.getGroupResources()) {
-                        if (file.exists())
-                            fs.copyFromLocalFile(false, true, new Path(file.getAbsolutePath()), frameworkPath);
-                    }
-                }
-            }
-        } catch (IOException e) {
-            System.out.println("===>" + e.getMessage());
-            LOGGER.error("Error uploading ama folder to HDFS.", e);
-            exit(3);
-        } catch (NullPointerException ne) {
-            System.out.println("===>" + ne.getMessage());
-            LOGGER.error("No files in home dir.", ne);
-            exit(4);
-        }
-
-        // get version of build
-        String version = config.version();
-
-        // get local resources pointers that will be set on the master container env
-        String leaderJarPath = String.format("/bin/leader-%s-all.jar", version);
-        LOGGER.info("Leader Jar path is: {}", leaderJarPath);
-        Path mergedPath = Path.mergePaths(jarPath, new Path(leaderJarPath));
-
-        // System.out.println("===> path: " + jarPathQualified);
-        LOGGER.info("Leader merged jar path is: {}", mergedPath);
-        LocalResource leaderJar = null;
-        LocalResource propFile = null;
-        LocalResource log4jPropFile = null;
-
-        try {
-            leaderJar = setLocalResourceFromPath(mergedPath);
-            propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")));
-            log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")));
-        } catch (IOException e) {
-            LOGGER.error("Error initializing yarn local resources.", e);
-            exit(4);
-        }
-
-        // set local resource on master container
-        Map<String, LocalResource> localResources = new HashMap<>();
-        //localResources.put("leader.jar", leaderJar);
-        // making the bin folder's content available to the appMaster
-        RemoteIterator<LocatedFileStatus> bin = fs.listFiles(Path.mergePaths(jarPath, new Path("/bin")), true);
-
-        while (bin.hasNext()){
-            LocatedFileStatus binFile = bin.next();
-            localResources.put(binFile.getPath().getName(), setLocalResourceFromPath(binFile.getPath()));
-        }
-
-        localResources.put("amaterasu.properties", propFile);
-        localResources.put("log4j.properties", log4jPropFile);
-        amContainer.setLocalResources(localResources);
-
-        // Setup CLASSPATH for ApplicationMaster
-        Map<String, String> appMasterEnv = new HashMap<>();
-        setupAppMasterEnv(appMasterEnv);
-        appMasterEnv.put("AMA_CONF_PATH", String.format("%s/amaterasu.properties", config.YARN().hdfsJarsPath()));
-        amContainer.setEnvironment(appMasterEnv);
-
-        // Set up resource type requirements for ApplicationMaster
-        Resource capability = Records.newRecord(Resource.class);
-        capability.setMemory(config.YARN().master().memoryMB());
-        capability.setVirtualCores(config.YARN().master().cores());
-
-        // Finally, set-up ApplicationSubmissionContext for the application
-        appContext.setApplicationName("amaterasu-" + opts.name);
-        appContext.setAMContainerSpec(amContainer);
-        appContext.setResource(capability);
-        appContext.setQueue(config.YARN().queue());
-        appContext.setPriority(Priority.newInstance(1));
-
-        // Submit application
-        ApplicationId appId = appContext.getApplicationId();
-        LOGGER.info("Submitting application {}", appId);
-        try {
-            yarnClient.submitApplication(appContext);
-
-        } catch (YarnException e) {
-            LOGGER.error("Error submitting application.", e);
-            exit(6);
-        } catch (IOException e) {
-            LOGGER.error("Error submitting application.", e);
-            exit(7);
-        }
-
-        CuratorFramework client = CuratorFrameworkFactory.newClient(config.zk(),
-                new ExponentialBackoffRetry(1000, 3));
-        client.start();
-
-        String newJobId = newId.replace("--new-job-id ", "");
-        System.out.println("===> /" + newJobId + "-report-barrier");
-        DistributedBarrier reportBarrier = new DistributedBarrier(client, "/" + newJobId + "-report-barrier");
-        reportBarrier.setBarrier();
-        reportBarrier.waitOnBarrier();
-
-        String address = new String(client.getData().forPath("/" + newJobId + "/broker"));
-        System.out.println("===> " + address);
-        setupReportListener(address);
-
-        ApplicationReport appReport = null;
-        YarnApplicationState appState;
-
-        do {
-            try {
-                appReport = yarnClient.getApplicationReport(appId);
-            } catch (YarnException e) {
-                LOGGER.error("Error getting application report.", e);
-                exit(8);
-            } catch (IOException e) {
-                LOGGER.error("Error getting application report.", e);
-                exit(9);
-            }
-            appState = appReport.getYarnApplicationState();
-            if (isAppFinished(appState)) {
-                exit(0);
-                break;
-            }
-            //LOGGER.info("Application not finished ({})", appReport.getProgress());
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                LOGGER.error("Interrupted while waiting for job completion.", e);
-                exit(137);
-            }
-        } while (!isAppFinished(appState));
-
-        LOGGER.info("Application {} finished with state {}-{} at {}", appId, appState, appReport.getFinalApplicationStatus(), appReport.getFinishTime());
-    }
-
-    private boolean isAppFinished(YarnApplicationState appState) {
-        return appState == YarnApplicationState.FINISHED ||
-                appState == YarnApplicationState.KILLED ||
-                appState == YarnApplicationState.FAILED;
-    }
-
-    public static void main(String[] args) throws Exception {
-        Client c = new Client();
-
-        JobOpts opts = ArgsParser.getJobOpts(args);
-
-        c.run(opts, args);
-    }
-
-    private static String joinStrings(String[] str) {
-
-        StringBuilder builder = new StringBuilder();
-        for (String s : str) {
-            builder.append(s);
-            builder.append(" ");
-        }
-        return builder.toString();
-
-    }
-
-    private void setupReportListener(String address) throws JMSException {
-
-        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(address);
-        Connection conn = cf.createConnection();
-        conn.start();
-
-        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-        //TODO: move to a const in common
-        Topic destination = session.createTopic("JOB.REPORT");
-
-        MessageConsumer consumer = session.createConsumer(destination);
-        consumer.setMessageListener(new ActiveReportListener());
-
-    }
-
-    private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
-        Apps.addToEnvironment(appMasterEnv,
-                ApplicationConstants.Environment.CLASSPATH.name(),
-                ApplicationConstants.Environment.PWD.$() + File.separator + "*", File.pathSeparator);
-
-        for (String c : conf.getStrings(
-                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
-            Apps.addToEnvironment(appMasterEnv, ApplicationConstants.Environment.CLASSPATH.name(),
-                    c.trim(), File.pathSeparator);
-        }
-    }
-}
\ No newline at end of file
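For context on what is being deleted: Client.java submits the ApplicationMaster and then synchronizes with it over ZooKeeper, setting a Curator DistributedBarrier, waiting for the AM to lift it once the report broker is registered, and finally reading the broker address. A minimal Kotlin sketch of that handshake, assuming the same ZooKeeper layout (the function name is illustrative, not part of this patch):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.barriers.DistributedBarrier
import org.apache.curator.retry.ExponentialBackoffRetry

fun awaitBrokerAddress(zkConnect: String, jobId: String): String {
    val client = CuratorFrameworkFactory.newClient(zkConnect, ExponentialBackoffRetry(1000, 3))
    client.start()

    // Block until the AM removes the barrier after registering its broker address
    // (mirrors reportBarrier.setBarrier()/waitOnBarrier() in the removed Client).
    val barrier = DistributedBarrier(client, "/$jobId-report-barrier")
    barrier.setBarrier()
    barrier.waitOnBarrier()

    // The AM publishes the ActiveMQ broker address under /<jobId>/broker.
    return String(client.data.forPath("/$jobId/broker"))
}
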
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
index 771eb15..979709a 100644
--- a/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/yarn/ApplicationMaster.scala
@@ -1,483 +1,483 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.yarn
-
-import java.io.{File, FileInputStream, InputStream}
-import java.net.{InetAddress, ServerSocket}
-import java.nio.ByteBuffer
-import java.util
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-
-import javax.jms.MessageConsumer
-import org.apache.activemq.broker.BrokerService
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.leader.common.execution.JobManager
-import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
-import org.apache.amaterasu.leader.execution.JobLoader
-import org.apache.amaterasu.leader.utilities.Args
-import org.apache.curator.framework.recipes.barriers.DistributedBarrier
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.DataOutputBuffer
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
-import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
-import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-import scala.collection.{concurrent, mutable}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
-
-  var capability: Resource = _
-
-  log.info("ApplicationMaster start")
-
-  private var jobManager: JobManager = _
-  private var client: CuratorFramework = _
-  private var config: ClusterConfig = _
-  private var env: String = _
-  private var branch: String = _
-  private var fs: FileSystem = _
-  private var conf: YarnConfiguration = _
-  private var propPath: String = ""
-  private var props: InputStream = _
-  private var jarPath: Path = _
-  private var executorPath: Path = _
-  private var executorJar: LocalResource = _
-  private var propFile: LocalResource = _
-  private var log4jPropFile: LocalResource = _
-  private var nmClient: NMClientAsync = _
-  private var allocListener: YarnRMCallbackHandler = _
-  private var rmClient: AMRMClientAsync[ContainerRequest] = _
-  private var address: String = _
-  private var consumer: MessageConsumer = _
-
-  private val containersIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala
-  private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
-  private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]()
-  private val host: String = InetAddress.getLocalHost.getHostName
-  private val broker: BrokerService = new BrokerService()
-
-
-  def setLocalResourceFromPath(path: Path): LocalResource = {
-
-    val stat = fs.getFileStatus(path)
-    val fileResource = Records.newRecord(classOf[LocalResource])
-
-    fileResource.setShouldBeUploadedToSharedCache(true)
-    fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
-    fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
-    fileResource.setSize(stat.getLen)
-    fileResource.setTimestamp(stat.getModificationTime)
-    fileResource.setType(LocalResourceType.FILE)
-    fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
-    fileResource
-
-  }
-
-  def execute(arguments: Args): Unit = {
-
-    log.info(s"Started AM with args $arguments")
-
-    propPath = System.getenv("PWD") + "/amaterasu.properties"
-    props = new FileInputStream(new File(propPath))
-
-    // no need for hdfs double check (nod to Aaron Rodgers)
-    // jars on HDFS should have been verified by the YARN client
-    conf = new YarnConfiguration()
-    fs = FileSystem.get(conf)
-
-    config = ClusterConfig(props)
-
-    try {
-      initJob(arguments)
-    } catch {
-      case e: Exception => log.error("error initializing ", e.getMessage)
-    }
-
-    // now that the job was initiated, the curator client is started and we can
-    // register the broker's address
-    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.getJobId}/broker")
-    client.setData().forPath(s"/${jobManager.getJobId}/broker", address.getBytes)
-
-    // once the broker is registered, we can remove the barrier so clients can connect
-    log.info(s"/${jobManager.getJobId}-report-barrier")
-    val barrier = new DistributedBarrier(client, s"/${jobManager.getJobId}-report-barrier")
-    barrier.removeBarrier()
-
-    consumer = MessagingClientUtil.setupMessaging(address)
-
-    log.info(s"Job ${jobManager.getJobId} initiated with ${jobManager.getRegisteredActions.size} actions")
-
-    jarPath = new Path(config.YARN.hdfsJarsPath)
-
-    // TODO: change this to read all dist folder and add to exec path
-    executorPath = Path.mergePaths(jarPath, new Path(s"/dist/executor-${config.version}-all.jar"))
-    log.info("Executor jar path is {}", executorPath)
-    executorJar = setLocalResourceFromPath(executorPath)
-    propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")))
-    log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")))
-
-    log.info("Started execute")
-
-    nmClient = new NMClientAsyncImpl(new YarnNMCallbackHandler())
-
-    // Initialize clients to ResourceManager and NodeManagers
-    nmClient.init(conf)
-    nmClient.start()
-
-    // TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
-    allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
-
-    rmClient = startRMClient()
-    val registrationResponse = registerAppMaster("", 0, "")
-    val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
-    log.info("Max mem capability of resources in this cluster " + maxMem)
-    val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
-    log.info("Max vcores capability of resources in this cluster " + maxVCores)
-    log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}")
-
-    // Resource requirements for worker containers
-    this.capability = Records.newRecord(classOf[Resource])
-    val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
-
-    while (!jobManager.getOutOfActions) {
-      val actionData = jobManager.getNextActionData
-      if (actionData != null) {
-
-        val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
-        val driverConfiguration = frameworkProvider.getDriverConfiguration
-
-        var mem: Int = driverConfiguration.getMemory
-        mem = Math.min(mem, maxMem)
-        this.capability.setMemory(mem)
-
-        var cpu = driverConfiguration.getCpus
-        cpu = Math.min(cpu, maxVCores)
-        this.capability.setVirtualCores(cpu)
-
-        askContainer(actionData)
-      }
-    }
-
-    log.info("Finished asking for containers")
-  }
-
-  private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
-    val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this)
-    client.init(conf)
-    client.start()
-    client
-  }
-
-  private def registerAppMaster(host: String, port: Int, url: String) = {
-    // Register with ResourceManager
-    log.info("Registering application")
-    val registrationResponse = rmClient.registerApplicationMaster(host, port, url)
-    log.info("Registered application")
-    registrationResponse
-  }
-
-//  private def setupMessaging(jobId: String): Unit = {
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *      http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//package org.apache.amaterasu.leader.yarn
 //
-//    val cf = new ActiveMQConnectionFactory(address)
-//    val conn = cf.createConnection()
-//    conn.start()
+//import java.io.{File, FileInputStream, InputStream}
+//import java.net.{InetAddress, ServerSocket}
+//import java.nio.ByteBuffer
+//import java.util
+//import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
 //
-//    val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
-//    //TODO: move to a const in common
-//    val destination = session.createTopic("JOB.REPORT")
+//import javax.jms.MessageConsumer
+//import org.apache.activemq.broker.BrokerService
+//import org.apache.amaterasu.common.configuration.ClusterConfig
+//import org.apache.amaterasu.common.dataobjects.ActionData
+//import org.apache.amaterasu.common.logging.Logging
+//import org.apache.amaterasu.leader.common.execution.JobManager
+//import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+//import org.apache.amaterasu.leader.common.utilities.MessagingClientUtil
+//import org.apache.amaterasu.leader.execution.JobLoader
+//import org.apache.amaterasu.leader.utilities.Args
+//import org.apache.curator.framework.recipes.barriers.DistributedBarrier
+//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+//import org.apache.curator.retry.ExponentialBackoffRetry
+//import org.apache.hadoop.fs.{FileSystem, Path}
+//import org.apache.hadoop.io.DataOutputBuffer
+//import org.apache.hadoop.security.UserGroupInformation
+//import org.apache.hadoop.yarn.api.records._
+//import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+//import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl
+//import org.apache.hadoop.yarn.client.api.async.{AMRMClientAsync, NMClientAsync}
+//import org.apache.hadoop.yarn.conf.YarnConfiguration
+//import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
+//import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
+//import org.apache.zookeeper.CreateMode
 //
-//    val consumer = session.createConsumer(destination)
-//    consumer.setMessageListener(new ActiveReportListener)
+//import scala.collection.JavaConversions._
+//import scala.collection.JavaConverters._
+//import scala.collection.{concurrent, mutable}
+//import scala.concurrent.ExecutionContext.Implicits.global
+//import scala.concurrent.Future
+//import scala.util.{Failure, Success}
+//
+//class ApplicationMaster extends Logging with AMRMClientAsync.CallbackHandler {
+//
+//  var capability: Resource = _
+//
+//  log.info("ApplicationMaster start")
+//
+//  private var jobManager: JobManager = _
+//  private var client: CuratorFramework = _
+//  private var config: ClusterConfig = _
+//  private var env: String = _
+//  private var branch: String = _
+//  private var fs: FileSystem = _
+//  private var conf: YarnConfiguration = _
+//  private var propPath: String = ""
+//  private var props: InputStream = _
+//  private var jarPath: Path = _
+//  private var executorPath: Path = _
+//  private var executorJar: LocalResource = _
+//  private var propFile: LocalResource = _
+//  private var log4jPropFile: LocalResource = _
+//  private var nmClient: NMClientAsync = _
+//  private var allocListener: YarnRMCallbackHandler = _
+//  private var rmClient: AMRMClientAsync[ContainerRequest] = _
+//  private var address: String = _
+//  private var consumer: MessageConsumer = _
+//
+//  private val containersIdsToTask: concurrent.Map[Long, ActionData] = new ConcurrentHashMap[Long, ActionData].asScala
+//  private val completedContainersAndTaskIds: concurrent.Map[Long, String] = new ConcurrentHashMap[Long, String].asScala
+//  private val actionsBuffer: java.util.concurrent.ConcurrentLinkedQueue[ActionData] = new java.util.concurrent.ConcurrentLinkedQueue[ActionData]()
+//  private val host: String = InetAddress.getLocalHost.getHostName
+//  private val broker: BrokerService = new BrokerService()
+//
+//
+//  def setLocalResourceFromPath(path: Path): LocalResource = {
+//
+//    val stat = fs.getFileStatus(path)
+//    val fileResource = Records.newRecord(classOf[LocalResource])
+//
+//    fileResource.setShouldBeUploadedToSharedCache(true)
+//    fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
+//    fileResource.setResource(ConverterUtils.getYarnUrlFromPath(path))
+//    fileResource.setSize(stat.getLen)
+//    fileResource.setTimestamp(stat.getModificationTime)
+//    fileResource.setType(LocalResourceType.FILE)
+//    fileResource.setVisibility(LocalResourceVisibility.PUBLIC)
+//    fileResource
 //
 //  }
-
-  private def askContainer(actionData: ActionData): Unit = {
-
-    actionsBuffer.add(actionData)
-    log.info(s"About to ask container for action ${actionData.getId}. Action buffer size is: ${actionsBuffer.size()}")
-
-    // we have an action to schedule, let's request a container
-    val priority: Priority = Records.newRecord(classOf[Priority])
-    priority.setPriority(1)
-    val containerReq = new ContainerRequest(capability, null, null, priority)
-    rmClient.addContainerRequest(containerReq)
-    log.info(s"Asked container for action ${actionData.getId}")
-
-  }
-
-  override def onContainersAllocated(containers: util.List[Container]): Unit = {
-
-    log.info(s"${containers.size()} Containers allocated")
-    for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
-      if (actionsBuffer.isEmpty) {
-        log.warn(s"Why actionBuffer empty and i was called?. Container ids: ${containers.map(c => c.getId.getContainerId)}")
-        return
-      }
-
-      val actionData = actionsBuffer.poll()
-      val containerTask = Future[ActionData] {
-
-        val frameworkFactory = FrameworkProvidersFactory(env, config)
-        val framework = frameworkFactory.getFramework(actionData.getGroupId)
-        val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
-        val ctx = Records.newRecord(classOf[ContainerLaunchContext])
-        val commands: List[String] = List(runnerProvider.getCommand(jobManager.getJobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
-
-        log.info("Running container id {}.", container.getId.getContainerId)
-        log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
-
-        ctx.setCommands(commands)
-        ctx.setTokens(allTokens)
-
-        val yarnJarPath = new Path(config.YARN.hdfsJarsPath)
-
-        //TODO Eyal - Remove the hardcoding of the dist path
-        /*  val resources = mutable.Map[String, LocalResource]()
-          val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
-          while (binaryFileIter.hasNext) {
-            val eachFile = binaryFileIter.next().getPath
-            resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile))
-          }
-          resources("log4j.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties")))
-          resources ("amaterasu.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))*/
-
-        val resources = mutable.Map[String, LocalResource](
-          "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))),
-          "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))),
-          "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))),
-          "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))),
-          "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))),
-          // TODO: Nadav/Eyal all of these should move to the executor resource setup
-          "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))),
-          "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))),
-          "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))),
-          "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
-          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
-
-        //adding the framework and executor resources
-        setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
-        setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, s"${framework.getGroupIdentifier}-${actionData.getTypeId}")
-
-        ctx.setLocalResources(resources)
-
-        ctx.setEnvironment(Map[String, String](
-          "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
-          "YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
-          "AMA_NODE" -> sys.env("AMA_NODE"),
-          "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
-        ))
-
-        log.info(s"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
-        nmClient.startContainerAsync(container, ctx)
-        actionData
-      }
-
-      containerTask onComplete {
-        case Failure(t) =>
-          log.error(s"launching container Failed", t)
-          askContainer(actionData)
-
-        case Success(requestedActionData) =>
-          jobManager.actionStarted(requestedActionData.getId)
-          containersIdsToTask.put(container.getId.getContainerId, requestedActionData)
-          log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}")
-
-      }
-    }
-  }
-
-  private def allTokens: ByteBuffer = {
-    // creating the credentials for container execution
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    val dob = new DataOutputBuffer
-    credentials.writeTokenStorageToStream(dob)
-
-    // removing the AM->RM token so that containers cannot access it.
-    val iter = credentials.getAllTokens.iterator
-    log.info("Executing with tokens:")
-    for (token <- iter) {
-      log.info(token.toString)
-      if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
-    }
-    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
-  }
-
-  private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
-
-    val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath"))
-
-    if (fs.exists(sourcePath)) {
-
-      val files = fs.listFiles(sourcePath, true)
-
-      while (files.hasNext) {
-        val res = files.next()
-        val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "")
-        countainerResources.put(containerPath, setLocalResourceFromPath(res.getPath))
-      }
-    }
-  }
-
-  def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = {
-    import java.io.IOException
-
-    import org.apache.hadoop.yarn.exceptions.YarnException
-    try
-      rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null)
-    catch {
-      case ex: YarnException =>
-        log.error("Failed to unregister application", ex)
-      case e: IOException =>
-        log.error("Failed to unregister application", e)
-    }
-    rmClient.stop()
-    nmClient.stop()
-  }
-
-  override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = {
-
-    for (status <- statuses.asScala) {
-
-      if (status.getState == ContainerState.COMPLETE) {
-
-        val containerId = status.getContainerId.getContainerId
-        val task = containersIdsToTask(containerId)
-        rmClient.releaseAssignedContainer(status.getContainerId)
-
-        val taskId = task.getId
-        if (status.getExitStatus == 0) {
-
-          //completedContainersAndTaskIds.put(containerId, task.id)
-          jobManager.actionComplete(taskId)
-          log.info(s"Container $containerId Complete with task ${taskId} with success.")
-        } else {
-          // TODO: Check the getDiagnostics value and see if appropriate
-          jobManager.actionFailed(taskId, status.getDiagnostics)
-          log.warn(s"Container $containerId Complete with task ${taskId} with Failed status code (${status.getExitStatus})")
-        }
-      }
-    }
-
-    if (jobManager.getOutOfActions) {
-      log.info("Finished all tasks successfully! Wow!")
-      jobManager.actionsCount()
-      stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
-    } else {
-      log.info(s"jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
-    }
-  }
-
-  override def getProgress: Float = {
-    completedContainersAndTaskIds.size.toFloat / jobManager.getRegisteredActions.size
-  }
-
-  override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
-    log.info("Nodes change. Nothing to report.")
-  }
-
-  override def onShutdownRequest(): Unit = {
-    log.error("Shutdown requested.")
-    stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested")
-  }
-
-  override def onError(e: Throwable): Unit = {
-    log.error("Error on AM", e)
-    stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
-  }
-
-  def initJob(args: Args): Unit = {
-
-    this.env = args.env
-    this.branch = args.branch
-    try {
-      val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-      client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
-      client.start()
-    } catch {
-      case e: Exception =>
-        log.error("Error connecting to zookeeper", e)
-        throw e
-    }
-    if (args.jobId != null && !args.jobId.isEmpty) {
-      log.info("resuming job" + args.jobId)
-      jobManager = JobLoader.reloadJob(
-        args.jobId,
-        client,
-        config.Jobs.Tasks.attempts,
-        new LinkedBlockingQueue[ActionData])
-
-    } else {
-      log.info("new job is being created")
-      try {
-
-        jobManager = JobLoader.loadJob(
-          args.repo,
-          args.branch,
-          args.newJobId,
-          client,
-          config.Jobs.Tasks.attempts,
-          new LinkedBlockingQueue[ActionData])
-      } catch {
-        case e: Exception =>
-          log.error("Error creating JobManager.", e)
-          throw e
-      }
-
-    }
-
-    jobManager.start()
-    log.info("Started jobManager")
-  }
-}
-
-object ApplicationMaster extends Logging with App {
-
-
-  val parser = Args.getParser
-  parser.parse(args, Args()) match {
-
-    case Some(arguments: Args) =>
-      val appMaster = new ApplicationMaster()
-
-      appMaster.address = MessagingClientUtil.getBorkerAddress
-      appMaster.broker.addConnector(appMaster.address)
-      appMaster.broker.start()
-
-      log.info(s"broker Started with address ${appMaster.address}")
-      appMaster.execute(arguments)
-
-    case None =>
-  }
-
-
-}
+//
+//  def execute(arguments: Args): Unit = {
+//
+//    log.info(s"Started AM with args $arguments")
+//
+//    propPath = System.getenv("PWD") + "/amaterasu.properties"
+//    props = new FileInputStream(new File(propPath))
+//
+//    // no need for hdfs double check (nod to Aaron Rodgers)
+//    // jars on HDFS should have been verified by the YARN client
+//    conf = new YarnConfiguration()
+//    fs = FileSystem.get(conf)
+//
+//    config = ClusterConfig(props)
+//
+//    try {
+//      initJob(arguments)
+//    } catch {
+//      case e: Exception => log.error("Error initializing job", e)
+//    }
+//
+//    // now that the job was initiated, the curator client is started and we can
+//    // register the broker's address
+//    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/${jobManager.getJobId}/broker")
+//    client.setData().forPath(s"/${jobManager.getJobId}/broker", address.getBytes)
+//
+//    // once the broker is registered, we can remove the barrier so clients can connect
+//    log.info(s"/${jobManager.getJobId}-report-barrier")
+//    val barrier = new DistributedBarrier(client, s"/${jobManager.getJobId}-report-barrier")
+//    barrier.removeBarrier()
+//
+//    consumer = MessagingClientUtil.setupMessaging(address)
+//
+//    log.info(s"Job ${jobManager.getJobId} initiated with ${jobManager.getRegisteredActions.size} actions")
+//
+//    jarPath = new Path(config.YARN.hdfsJarsPath)
+//
+//    // TODO: change this to read all dist folder and add to exec path
+//    executorPath = Path.mergePaths(jarPath, new Path(s"/dist/executor-${config.version}-all.jar"))
+//    log.info("Executor jar path is {}", executorPath)
+//    executorJar = setLocalResourceFromPath(executorPath)
+//    propFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/amaterasu.properties")))
+//    log4jPropFile = setLocalResourceFromPath(Path.mergePaths(jarPath, new Path("/log4j.properties")))
+//
+//    log.info("Started execute")
+//
+//    nmClient = new NMClientAsyncImpl(new YarnNMCallbackHandler())
+//
+//    // Initialize clients to ResourceManager and NodeManagers
+//    nmClient.init(conf)
+//    nmClient.start()
+//
+//    // TODO: awsEnv currently set to empty string. should be changed to read values from (where?).
+//    allocListener = new YarnRMCallbackHandler(nmClient, jobManager, env, awsEnv = "", config, executorJar)
+//
+//    rmClient = startRMClient()
+//    val registrationResponse = registerAppMaster("", 0, "")
+//    val maxMem = registrationResponse.getMaximumResourceCapability.getMemory
+//    log.info("Max mem capability of resources in this cluster " + maxMem)
+//    val maxVCores = registrationResponse.getMaximumResourceCapability.getVirtualCores
+//    log.info("Max vcores capability of resources in this cluster " + maxVCores)
+//    log.info(s"Created jobManager. jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}")
+//
+//    // Resource requirements for worker containers
+//    this.capability = Records.newRecord(classOf[Resource])
+//    val frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+//
+//    while (!jobManager.getOutOfActions) {
+//      val actionData = jobManager.getNextActionData
+//      if (actionData != null) {
+//
+//        val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
+//        val driverConfiguration = frameworkProvider.getDriverConfiguration
+//
+//        var mem: Int = driverConfiguration.getMemory
+//        mem = Math.min(mem, maxMem)
+//        this.capability.setMemory(mem)
+//
+//        var cpu = driverConfiguration.getCpus
+//        cpu = Math.min(cpu, maxVCores)
+//        this.capability.setVirtualCores(cpu)
+//
+//        askContainer(actionData)
+//      }
+//    }
+//
+//    log.info("Finished asking for containers")
+//  }
+//
+//  private def startRMClient(): AMRMClientAsync[ContainerRequest] = {
+//    val client = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](1000, this)
+//    client.init(conf)
+//    client.start()
+//    client
+//  }
+//
+//  private def registerAppMaster(host: String, port: Int, url: String) = {
+//    // Register with ResourceManager
+//    log.info("Registering application")
+//    val registrationResponse = rmClient.registerApplicationMaster(host, port, url)
+//    log.info("Registered application")
+//    registrationResponse
+//  }
+//
+////  private def setupMessaging(jobId: String): Unit = {
+////
+////    val cf = new ActiveMQConnectionFactory(address)
+////    val conn = cf.createConnection()
+////    conn.start()
+////
+////    val session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE)
+////    //TODO: move to a const in common
+////    val destination = session.createTopic("JOB.REPORT")
+////
+////    val consumer = session.createConsumer(destination)
+////    consumer.setMessageListener(new ActiveReportListener)
+////
+////  }
+//
+//  private def askContainer(actionData: ActionData): Unit = {
+//
+//    actionsBuffer.add(actionData)
+//    log.info(s"About to ask container for action ${actionData.getId}. Action buffer size is: ${actionsBuffer.size()}")
+//
+//    // we have an action to schedule, let's request a container
+//    val priority: Priority = Records.newRecord(classOf[Priority])
+//    priority.setPriority(1)
+//    val containerReq = new ContainerRequest(capability, null, null, priority)
+//    rmClient.addContainerRequest(containerReq)
+//    log.info(s"Asked container for action ${actionData.getId}")
+//
+//  }
+//
+//  override def onContainersAllocated(containers: util.List[Container]): Unit = {
+//
+//    log.info(s"${containers.size()} Containers allocated")
+//    for (container <- containers.asScala) { // Launch container by create ContainerLaunchContext
+//      if (actionsBuffer.isEmpty) {
+//        log.warn(s"Why actionBuffer empty and i was called?. Container ids: ${containers.map(c => c.getId.getContainerId)}")
+//        return
+//      }
+//
+//      val actionData = actionsBuffer.poll()
+//      val containerTask = Future[ActionData] {
+//
+//        val frameworkFactory = FrameworkProvidersFactory(env, config)
+//        val framework = frameworkFactory.getFramework(actionData.getGroupId)
+//        val runnerProvider = framework.getRunnerProvider(actionData.getTypeId)
+//        val ctx = Records.newRecord(classOf[ContainerLaunchContext])
+//        val commands: List[String] = List(runnerProvider.getCommand(jobManager.getJobId, actionData, env, s"${actionData.getId}-${container.getId.getContainerId}", address))
+//
+//        log.info("Running container id {}.", container.getId.getContainerId)
+//        log.info("Running container id {} with command '{}'", container.getId.getContainerId, commands.last)
+//
+//        ctx.setCommands(commands)
+//        ctx.setTokens(allTokens)
+//
+//        val yarnJarPath = new Path(config.YARN.hdfsJarsPath)
+//
+//        //TODO Eyal - Remove the hardcoding of the dist path
+//        /*  val resources = mutable.Map[String, LocalResource]()
+//          val binaryFileIter = fs.listFiles(new Path(s"${config.YARN.hdfsJarsPath}/dist"), false)
+//          while (binaryFileIter.hasNext) {
+//            val eachFile = binaryFileIter.next().getPath
+//            resources (eachFile.getName) = setLocalResourceFromPath(fs.makeQualified(eachFile))
+//          }
+//          resources("log4j.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/log4j.properties")))
+//          resources ("amaterasu.properties") = setLocalResourceFromPath(fs.makeQualified(new Path(s"${config.YARN.hdfsJarsPath}/amaterasu.properties")))*/
+//
+//        val resources = mutable.Map[String, LocalResource](
+//          "executor.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/executor-${config.version}-all.jar"))),
+//          "spark-runner.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runner-${config.version}-all.jar"))),
+//          "spark-runtime.jar" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path(s"/dist/spark-runtime-${config.version}.jar"))),
+//          "amaterasu.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/amaterasu.properties"))),
+//          "log4j.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/log4j.properties"))),
+//          // TODO: Nadav/Eyal all of these should move to the executor resource setup
+//          "miniconda.sh" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/miniconda.sh"))),
+//          "codegen.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/codegen.py"))),
+//          "runtime.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/runtime.py"))),
+//          "spark-version-info.properties" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark-version-info.properties"))),
+//          "spark_intp.py" -> setLocalResourceFromPath(Path.mergePaths(yarnJarPath, new Path("/dist/spark_intp.py"))))
+//
+//        //adding the framework and executor resources
+//        setupResources(yarnJarPath, framework.getGroupIdentifier, resources, framework.getGroupIdentifier)
+//        setupResources(yarnJarPath, s"${framework.getGroupIdentifier}/${actionData.getTypeId}", resources, s"${framework.getGroupIdentifier}-${actionData.getTypeId}")
+//
+//        ctx.setLocalResources(resources)
+//
+//        ctx.setEnvironment(Map[String, String](
+//          "HADOOP_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
+//          "YARN_CONF_DIR" -> s"${config.YARN.hadoopHomeDir}/conf/",
+//          "AMA_NODE" -> sys.env("AMA_NODE"),
+//          "HADOOP_USER_NAME" -> UserGroupInformation.getCurrentUser.getUserName
+//        ))
+//
+//        log.info(s"hadoop conf dir is ${config.YARN.hadoopHomeDir}/conf/")
+//        nmClient.startContainerAsync(container, ctx)
+//        actionData
+//      }
+//
+//      containerTask onComplete {
+//        case Failure(t) =>
+//          log.error(s"launching container Failed", t)
+//          askContainer(actionData)
+//
+//        case Success(requestedActionData) =>
+//          jobManager.actionStarted(requestedActionData.getId)
+//          containersIdsToTask.put(container.getId.getContainerId, requestedActionData)
+//          log.info(s"launching container succeeded: ${container.getId.getContainerId}; task: ${requestedActionData.getId}")
+//
+//      }
+//    }
+//  }
+//
+//  private def allTokens: ByteBuffer = {
+//    // creating the credentials for container execution
+//    val credentials = UserGroupInformation.getCurrentUser.getCredentials
+//    val dob = new DataOutputBuffer
+//    credentials.writeTokenStorageToStream(dob)
+//
+//    // removing the AM->RM token so that containers cannot access it.
+//    val iter = credentials.getAllTokens.iterator
+//    log.info("Executing with tokens:")
+//    for (token <- iter) {
+//      log.info(token.toString)
+//      if (token.getKind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
+//    }
+//    ByteBuffer.wrap(dob.getData, 0, dob.getLength)
+//  }
+//
+//  private def setupResources(yarnJarPath: Path, frameworkPath: String, countainerResources: mutable.Map[String, LocalResource], resourcesPath: String): Unit = {
+//
+//    val sourcePath = Path.mergePaths(yarnJarPath, new Path(s"/$resourcesPath"))
+//
+//    if (fs.exists(sourcePath)) {
+//
+//      val files = fs.listFiles(sourcePath, true)
+//
+//      while (files.hasNext) {
+//        val res = files.next()
+//        val containerPath = res.getPath.toUri.getPath.replace("/apps/amaterasu/", "")
+//        countainerResources.put(containerPath, setLocalResourceFromPath(res.getPath))
+//      }
+//    }
+//  }
+//
+//  def stopApplication(finalApplicationStatus: FinalApplicationStatus, appMessage: String): Unit = {
+//    import java.io.IOException
+//
+//    import org.apache.hadoop.yarn.exceptions.YarnException
+//    try
+//      rmClient.unregisterApplicationMaster(finalApplicationStatus, appMessage, null)
+//    catch {
+//      case ex: YarnException =>
+//        log.error("Failed to unregister application", ex)
+//      case e: IOException =>
+//        log.error("Failed to unregister application", e)
+//    }
+//    rmClient.stop()
+//    nmClient.stop()
+//  }
+//
+//  override def onContainersCompleted(statuses: util.List[ContainerStatus]): Unit = {
+//
+//    for (status <- statuses.asScala) {
+//
+//      if (status.getState == ContainerState.COMPLETE) {
+//
+//        val containerId = status.getContainerId.getContainerId
+//        val task = containersIdsToTask(containerId)
+//        rmClient.releaseAssignedContainer(status.getContainerId)
+//
+//        val taskId = task.getId
+//        if (status.getExitStatus == 0) {
+//
+//          //completedContainersAndTaskIds.put(containerId, task.id)
+//          jobManager.actionComplete(taskId)
+//          log.info(s"Container $containerId Complete with task ${taskId} with success.")
+//        } else {
+//          // TODO: Check the getDiagnostics value and see if appropriate
+//          jobManager.actionFailed(taskId, status.getDiagnostics)
+//          log.warn(s"Container $containerId Complete with task ${taskId} with Failed status code (${status.getExitStatus})")
+//        }
+//      }
+//    }
+//
+//    if (jobManager.getOutOfActions) {
+//      log.info("Finished all tasks successfully! Wow!")
+//      jobManager.actionsCount()
+//      stopApplication(FinalApplicationStatus.SUCCEEDED, "SUCCESS")
+//    } else {
+//      log.info(s"jobManager.registeredActions.size: ${jobManager.getRegisteredActions.size}; completedContainersAndTaskIds.size: ${completedContainersAndTaskIds.size}")
+//    }
+//  }
+//
+//  override def getProgress: Float = {
+//    completedContainersAndTaskIds.size.toFloat / jobManager.getRegisteredActions.size
+//  }
+//
+//  override def onNodesUpdated(updatedNodes: util.List[NodeReport]): Unit = {
+//    log.info("Nodes change. Nothing to report.")
+//  }
+//
+//  override def onShutdownRequest(): Unit = {
+//    log.error("Shutdown requested.")
+//    stopApplication(FinalApplicationStatus.KILLED, "Shutdown requested")
+//  }
+//
+//  override def onError(e: Throwable): Unit = {
+//    log.error("Error on AM", e)
+//    stopApplication(FinalApplicationStatus.FAILED, "Error on AM")
+//  }
+//
+//  def initJob(args: Args): Unit = {
+//
+//    this.env = args.env
+//    this.branch = args.branch
+//    try {
+//      val retryPolicy = new ExponentialBackoffRetry(1000, 3)
+//      client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
+//      client.start()
+//    } catch {
+//      case e: Exception =>
+//        log.error("Error connecting to zookeeper", e)
+//        throw e
+//    }
+//    if (args.jobId != null && !args.jobId.isEmpty) {
+//      log.info("resuming job" + args.jobId)
+//      jobManager = JobLoader.reloadJob(
+//        args.jobId,
+//        client,
+//        config.Jobs.Tasks.attempts,
+//        new LinkedBlockingQueue[ActionData])
+//
+//    } else {
+//      log.info("new job is being created")
+//      try {
+//
+//        jobManager = JobLoader.loadJob(
+//          args.repo,
+//          args.branch,
+//          args.newJobId,
+//          client,
+//          config.Jobs.Tasks.attempts,
+//          new LinkedBlockingQueue[ActionData])
+//      } catch {
+//        case e: Exception =>
+//          log.error("Error creating JobManager.", e)
+//          throw e
+//      }
+//
+//    }
+//
+//    jobManager.start()
+//    log.info("Started jobManager")
+//  }
+//}
+//
+//object ApplicationMaster extends Logging with App {
+//
+//
+//  val parser = Args.getParser
+//  parser.parse(args, Args()) match {
+//
+//    case Some(arguments: Args) =>
+//      val appMaster = new ApplicationMaster()
+//
+//      appMaster.address = MessagingClientUtil.getBorkerAddress
+//      appMaster.broker.addConnector(appMaster.address)
+//      appMaster.broker.start()
+//
+//      log.info(s"broker Started with address ${appMaster.address}")
+//      appMaster.execute(arguments)
+//
+//    case None =>
+//  }
+//
+//
+//}
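One detail worth flagging in the AM code commented out above: allTokens serializes the credentials into the DataOutputBuffer before removing the AM->RM token, so the removal never affects the buffer actually shipped to containers. A minimal Kotlin sketch with the steps in the safe order (the function name is illustrative, not part of this patch):

import java.nio.ByteBuffer
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier

fun containerTokens(): ByteBuffer {
    val credentials = UserGroupInformation.getCurrentUser().credentials

    // Drop the AM->RM token first so worker containers never receive it.
    val iter = credentials.allTokens.iterator()
    while (iter.hasNext()) {
        if (iter.next().kind == AMRMTokenIdentifier.KIND_NAME) iter.remove()
    }

    // Only then serialize what remains for the ContainerLaunchContext.
    val dob = DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    return ByteBuffer.wrap(dob.data, 0, dob.length)
}
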
diff --git a/sdk/build.gradle b/sdk/build.gradle
index 585b1c0..3cc9227 100644
--- a/sdk/build.gradle
+++ b/sdk/build.gradle
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 buildscript {
-    ext.kotlin_version = '1.2.71'
+    ext.kotlin_version = '1.3.21'
 
     repositories {
         mavenCentral()