Merge branch 'master' into AMATERASU-36-mesos
diff --git a/.gitignore b/.gitignore
index eaefdef..fce2c13 100755
--- a/.gitignore
+++ b/.gitignore
@@ -52,4 +52,4 @@
 repo/**
 
 #python
-.zip
\ No newline at end of file
+*.zip
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/Export.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/Export.scala
deleted file mode 100644
index 8db81bc..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/Export.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.dataobjects
-
-case class Export(dataset: String, format: String)
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/JobData.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/JobData.scala
deleted file mode 100755
index acf5c4a..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/JobData.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.dataobjects
-
-import org.joda.time.DateTime
-
-case class JobData(src: String, branch: String = "master", id: String, timeCreated: DateTime, startTime: DateTime, endTime: DateTime)
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
deleted file mode 100644
index 9813296..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.utils
-
-import java.io.File
-
-object FileUtils {
-
-  def getAllFiles(dir: File): Array[File] = {
-    val these = dir.listFiles
-    these ++ these.filter(_.isDirectory).flatMap(getAllFiles)
-  }
-
-}
diff --git a/executor/build.gradle b/executor/build.gradle
deleted file mode 100644
index 443fc38..0000000
--- a/executor/build.gradle
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
-    id 'com.github.johnrengelman.shadow' version '1.2.4'
-    id 'com.github.maiflai.scalatest' version '0.22'
-    id 'scala'
-    id 'java'
-}
-
-shadowJar {
-    zip64 true
-}
-
-//sourceCompatibility = 1.8
-//targetCompatibility = 1.8
-
-repositories {
-    maven {
-        url "https://plugins.gradle.org/m2/"
-    }
-    mavenCentral()
-}
-
-test {
-    maxParallelForks = 1
-    forkEvery = 1
-}
-
-configurations {
-    provided
-}
-
-sourceSets {
-    main.compileClasspath += configurations.provided
-    test.compileClasspath += configurations.provided
-    test.runtimeClasspath += configurations.provided
-}
-
-dependencies {
-
-    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
-    compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-    compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
-    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
-    compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
-    compile group: 'org.reflections', name: 'reflections', version: '0.9.10'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
-    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.8'
-
-    compile('com.jcabi:jcabi-aether:0.10.1') {
-        exclude group: 'org.jboss.netty'
-    }
-
-
-    compile project(':common')
-    compile project(':amaterasu-sdk')
-
-
-
-}
-
-sourceSets {
-    test {
-        resources.srcDirs += [file('src/test/resources')]
-    }
-
-    // this is done so Scala will compile before Java
-    main {
-        scala {
-            srcDirs = ['src/main/scala', 'src/main/java']
-        }
-        java {
-            srcDirs = []
-        }
-    }
-}
-
-test {
-
-    maxParallelForks = 1
-}
-
-task copyToHome(type: Copy) {
-    dependsOn shadowJar
-    from 'build/libs'
-    into '../build/amaterasu/dist'
-    from 'build/resources/main'
-    into '../build/amaterasu/dist'
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
deleted file mode 100644
index 5f78bc2..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.common.executors
-
-import java.io.ByteArrayOutputStream
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
-import org.reflections.Reflections
-
-import scala.collection.JavaConversions._
-
-//TODO: Check if we can use this in the YARN impl
-class ProvidersFactory {
-
-  var providers: Map[String, RunnersProvider] = _
-
-  def getRunner(groupId: String, id: String): Option[AmaterasuRunner] = {
-    val provider = providers.get(groupId)
-    provider match {
-      case Some(provider) => Some(provider.getRunner(id))
-      case None => None
-    }
-  }
-}
-
-object ProvidersFactory {
-
-  def apply(data: ExecData,
-            jobId: String,
-            outStream: ByteArrayOutputStream,
-            notifier: Notifier,
-            executorId: String,
-            hostName: String,
-            propFile: String = null): ProvidersFactory = {
-
-    val result = new ProvidersFactory()
-    val reflections = new Reflections(getClass.getClassLoader)
-    val runnerTypes = reflections.getSubTypesOf(classOf[RunnersProvider]).toSet
-    val config = if (propFile != null) {
-      import java.io.FileInputStream
-      ClusterConfig.apply(new FileInputStream(propFile))
-    } else {
-      new ClusterConfig()
-    }
-
-    result.providers = runnerTypes.map(r => {
-
-      val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider]
-
-      provider.init(data, jobId, outStream, notifier, executorId, config, hostName)
-      notifier.info(s"a provider for group ${provider.getGroupIdentifier} was created")
-      (provider.getGroupIdentifier, provider)
-    }).toMap
-
-    result
-  }
-
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
deleted file mode 100755
index f204db8..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.mesos.executors
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-class MesosActionsExecutor extends Logging with Executor {
-
-  var master: String = _
-  var executorDriver: ExecutorDriver = _
-  var jobId: String = _
-  var actionName: String = _
-  //  var sparkScalaRunner: SparkScalaRunner = _
-  //  var pySparkRunner: PySparkRunner = _
-  var notifier: MesosNotifier = _
-  var providersFactory: ProvidersFactory = _
-
-  val mapper = new ObjectMapper()
-  mapper.registerModule(DefaultScalaModule)
-
-
-  override def shutdown(driver: ExecutorDriver) = {
-
-  }
-
-  override def killTask(driver: ExecutorDriver, taskId: TaskID) = ???
-
-  override def disconnected(driver: ExecutorDriver) = ???
-
-  override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo) = {
-    this.executorDriver = driver
-  }
-
-  override def error(driver: ExecutorDriver, message: String) = {
-
-    val status = TaskStatus.newBuilder
-      .setData(ByteString.copyFromUtf8(message))
-      .setState(TaskState.TASK_ERROR).build()
-
-    driver.sendStatusUpdate(status)
-
-  }
-
-  override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]) = ???
-
-  override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = {
-
-    this.executorDriver = driver
-    val data = mapper.readValue(new ByteArrayInputStream(executorInfo.getData.toByteArray), classOf[ExecData])
-
-    // this is used to resolve the spark drier address
-    val hostName = slaveInfo.getHostname
-    notifier = new MesosNotifier(driver)
-    notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered")
-    val outStream = new ByteArrayOutputStream()
-    providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, "./amaterasu.properties")
-
-  }
-
-  override def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo): Unit = {
-
-
-    notifier.info(s"launching task: ${taskInfo.getTaskId.getValue}")
-    log.debug(s"launching task: $taskInfo")
-    val status = TaskStatus.newBuilder
-      .setTaskId(taskInfo.getTaskId)
-      .setState(TaskState.TASK_STARTING).build()
-
-    notifier.info(s"container info: ${taskInfo.getContainer.getDocker.getImage}, ${taskInfo.getContainer.getType}")
-
-    driver.sendStatusUpdate(status)
-
-    val task = Future {
-
-      val taskData = mapper.readValue(new ByteArrayInputStream(taskInfo.getData.toByteArray), classOf[TaskData])
-
-      val status = TaskStatus.newBuilder
-        .setTaskId(taskInfo.getTaskId)
-        .setState(TaskState.TASK_RUNNING).build()
-      driver.sendStatusUpdate(status)
-      val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
-      runner match {
-        case Some(r) => r.executeSource(taskData.getSrc, actionName, taskData.getExports)
-        case None =>
-          notifier.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
-          None
-      }
-
-    }
-
-    task onComplete {
-
-      case Failure(t) =>
-        println(s"launching task Failed: ${t.getMessage}")
-        System.exit(1)
-
-      case Success(ts) =>
-
-        driver.sendStatusUpdate(TaskStatus.newBuilder()
-          .setTaskId(taskInfo.getTaskId)
-          .setState(TaskState.TASK_FINISHED).build())
-        notifier.info(s"Complete task: ${taskInfo.getTaskId.getValue}")
-
-    }
-
-  }
-
-}
-
-object MesosActionsExecutor extends Logging {
-
-  def main(args: Array[String]) {
-    System.loadLibrary("mesos")
-    log.debug("Starting a new ActionExecutor")
-
-    val executor = new MesosActionsExecutor
-    executor.jobId = args(0)
-    executor.master = args(1)
-    executor.actionName = args(2)
-
-    val driver = new MesosExecutorDriver(executor)
-    driver.run()
-  }
-
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
deleted file mode 100755
index b256386..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.mesos.executors
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-import org.apache.amaterasu.common.execution.actions.{Notification, Notifier}
-import org.apache.mesos.ExecutorDriver
-
-
-class MesosNotifier(driver: ExecutorDriver) extends Notifier {
-
-  private val mapper = new ObjectMapper()
-  mapper.registerModule(DefaultScalaModule)
-
-  override def success(line: String): Unit = {
-
-
-    getLog.info(s"successfully executed line: $line")
-
-    val notification = new Notification(line, "", NotificationType.Success, NotificationLevel.Code)
-    val msg = mapper.writeValueAsBytes(notification)
-
-    driver.sendFrameworkMessage(msg)
-
-  }
-
-  override def error(line: String, message: String): Unit = {
-
-    getLog.error(s"Error executing line: $line message: $message")
-
-    val notification = new Notification(line, message, NotificationType.Error, NotificationLevel.Code)
-    val msg = mapper.writeValueAsBytes(notification)
-
-    driver.sendFrameworkMessage(msg)
-
-  }
-
-  override def info(message: String): Unit = {
-
-    getLog.info(message)
-
-    val notification = new Notification("", message, NotificationType.Info, NotificationLevel.Execution)
-    val msg = mapper.writeValueAsBytes(notification)
-
-    driver.sendFrameworkMessage(msg)
-  }
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
deleted file mode 100644
index cda6351..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.yarn.executors
-
-import java.io.ByteArrayOutputStream
-import java.net.{InetAddress, URLDecoder}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.utils.ActiveNotifier
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-
-import scala.collection.JavaConverters._
-
-
-class ActionsExecutor extends Logging {
-
-  var master: String = _
-  var jobId: String = _
-  var actionName: String = _
-  var taskData: TaskData = _
-  var execData: ExecData = _
-  var providersFactory: ProvidersFactory = _
-
-  def execute(): Unit = {
-    val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
-    runner match {
-      case Some(r) => {
-        try {
-          r.executeSource(taskData.getSrc, actionName, taskData.getExports)
-          log.info("Completed action")
-          System.exit(0)
-        } catch {
-          case e: Exception => {
-            log.error("Exception in execute source", e)
-            System.exit(100)
-          }
-        }
-      }
-      case None =>
-        log.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
-        System.exit(101)
-    }
-  }
-}
-
-object ActionsExecutorLauncher extends Logging with App {
-
-  val hostName = InetAddress.getLocalHost.getHostName
-
-  println(s"Hostname resolved to: $hostName")
-  val mapper = new ObjectMapper()
-  mapper.registerModule(DefaultScalaModule)
-
-  log.info("Starting actions executor")
-
-  val jobId = this.args(0)
-  val master = this.args(1)
-  val actionName = this.args(2)
-  val notificationsAddress = this.args(6)
-
-  log.info("parsing task data")
-  val taskData = mapper.readValue(URLDecoder.decode(this.args(3), "UTF-8"), classOf[TaskData])
-  log.info("parsing executor data")
-  val execData = mapper.readValue(URLDecoder.decode(this.args(4), "UTF-8"), classOf[ExecData])
-  val taskIdAndContainerId = this.args(5)
-
-  val actionsExecutor: ActionsExecutor = new ActionsExecutor
-  actionsExecutor.master = master
-  actionsExecutor.actionName = actionName
-  actionsExecutor.taskData = taskData
-  actionsExecutor.execData = execData
-
-  log.info("Setup executor")
-  val baos = new ByteArrayOutputStream()
-  val notifier = new ActiveNotifier(notificationsAddress)
-
-  actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties")
-  actionsExecutor.execute()
-}
diff --git a/executor/src/test/resources/codegen.pyc b/executor/src/test/resources/codegen.pyc
deleted file mode 100644
index ab32046..0000000
--- a/executor/src/test/resources/codegen.pyc
+++ /dev/null
Binary files differ
diff --git a/frameworks/python/dispatcher/build.gradle b/frameworks/python/dispatcher/build.gradle
index b1cbd39..35ab5aa 100644
--- a/frameworks/python/dispatcher/build.gradle
+++ b/frameworks/python/dispatcher/build.gradle
@@ -71,7 +71,8 @@
 dependencies {
     compile project(':common')
     compile project(':amaterasu-sdk')
-    compile project(':leader')
+    compile project(':leader-common')
+    
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
     compile "org.jetbrains.kotlin:kotlin-reflect"
     compile 'com.uchuhimo:konf:0.11'
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
index 9d83c65..d5a691e 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
@@ -30,18 +30,14 @@
     private var conf: ClusterConfig? = null
     private var runnerProviders: Map<String, RunnerSetupProvider> = mapOf()
 
-    override val groupIdentifier: String
-        get() = "python"
-    override val groupResources: Array<File>
-        get() = arrayOf()
+    override val groupIdentifier: String = "python"
+    override val groupResources: List<File> = listOf()
 
     override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration {
         return DriverConfiguration(conf!!.taskMem(), 1) //TODO: this should be configured on env level
     }
-    override val environmentVariables: Map<String, String>
-        get() = mapOf()
-    override val configurationItems: Array<String>
-        get() = arrayOf()
+    override val environmentVariables: Map<String, String> = mapOf()
+    override val configurationItems: List<String> = listOf()
 
     override fun init(env: String, conf: ClusterConfig) {
         this.env = env
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index e3e503f..7a5a42d 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -54,11 +54,11 @@
         }
     }
 
-    override val groupResources: Array<File> by lazy {
+    override val groupResources: List<File> by lazy {
         when (conf.mode()) {
-            "mesos" -> arrayOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runtime-${conf.version()}.jar"))
-            "yarn" -> arrayOf(File("spark-runtime-${conf.version()}.jar"), File("executor-${conf.version()}-all.jar"), File(conf.spark().home()))
-            else -> arrayOf()
+            "mesos" -> listOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runtime-${conf.version()}-all.jar"))
+            "yarn" -> listOf(File("spark-runtime-${conf.version()}-all.jar"),  File(conf.spark().home()))
+            else -> listOf()
         }
     }
 
@@ -67,7 +67,7 @@
     }
 
     override val groupIdentifier: String = "spark"
-    override val configurationItems = arrayOf("sparkProperties", "sparkOptions")
+    override val configurationItems = listOf("sparkProperties", "sparkOptions")
 
     override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration {
         val sparkOptions: Map<String, Any> = configManager.config["sparkOptions"]
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
index f413fd1..a1b7a24 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -37,7 +37,7 @@
                 SparkCommandLineHelper.getOptions(sparkOptions) + " " +
                 SparkCommandLineHelper.getProperties(sparkProperties) + " " +
                 master +
-                " --jars spark-runtime-${conf.version()}.jar >&1"
+                " --jars spark-runtime-${conf.version()}-all.jar >&1"
     }
 
     override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf()
diff --git a/frameworks/spark/runtime/build.gradle b/frameworks/spark/runtime/build.gradle
index 3bfd7d4..1f2659d 100644
--- a/frameworks/spark/runtime/build.gradle
+++ b/frameworks/spark/runtime/build.gradle
@@ -54,8 +54,7 @@
 }
 
 dependencies {
-
-    compile project(':executor')
+    compile project(':common')
     provided('org.apache.spark:spark-repl_2.11:2.2.1')
     provided('org.apache.spark:spark-core_2.11:2.2.1')
 
@@ -81,7 +80,8 @@
 }
 
 task copyToHome(type: Copy) {
-    dependsOn jar
+    dependsOn shadowJar
+
     from 'build/libs'
     into '../../../build/amaterasu/dist'
     from 'build/resources/main'
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 928a444..b2734cf 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -34,7 +34,6 @@
 
 plugins {
     id "com.github.johnrengelman.shadow" version "2.0.4"
-    id 'scala'
 }
 
 apply plugin: 'kotlin'
@@ -66,12 +65,10 @@
 }
 
 dependencies {
-    compile 'org.scala-lang:scala-library:2.11.8'
 
     compile project(':common')
     compile project(':amaterasu-sdk')
 
-    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
@@ -87,6 +84,16 @@
     compile group: 'com.beust', name: 'klaxon', version: '5.0.1'
     compile group: 'com.github.ajalt', name: 'clikt', version: '1.6.0'
 
+    compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.4.14.v20181114'
+    compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.4.14.v20181114'
+    compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.4.14.v20181114'
+    compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.4.14.v20181114'
+    compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.4.14.v20181114'
+    compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0'
+
+    compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
+    //compile group: 'io.ktor', name: 'ktor-client-core', version: '0.9.5'
+
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
     compile "org.jetbrains.kotlin:kotlin-reflect"
 
@@ -116,3 +123,8 @@
 compileTestKotlin {
     kotlinOptions.jvmTarget = "1.8"
 }
+
+task copyToHome(type: Copy) {
+    from 'src/main/scripts'
+    into '../build/amaterasu/'
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
index 0a2863b..0f63e1d 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
@@ -90,7 +90,7 @@
 
             val status = ActionStatus.valueOf(String(client.data.forPath("/$jobId/$task")))
             if (status == ActionStatus.Queued || status == ActionStatus.Started) {
-                jobManager.reQueueAction(task.substring(task.indexOf("task-") + 5))
+                jobManager.requeueAction(task.substring(task.indexOf("task-") + 5))
             }
 
         }
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
index 1853e8c..3c77ba2 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
@@ -72,7 +72,7 @@
             return nextAction
         }
 
-    fun reQueueAction(actionId: String) {
+    fun requeueAction(actionId: String) {
 
         log.info("requeing action $actionId")
         registeredActions.forEach { log.info("key ${it.key}") }
@@ -124,14 +124,14 @@
 
         val action = registeredActions[actionId]
         val id = action!!.handleFailure(message)
-        if (!id.isEmpty())
+        if (id.isNotEmpty())
             registeredActions[id]?.execute()
 
         //delete all future actions
         cancelFutureActions(action)
     }
 
-    fun cancelFutureActions(action: Action) {
+    private fun cancelFutureActions(action: Action) {
 
         if (action.data.status != ActionStatus.Failed)
             action.announceCanceled()
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
similarity index 75%
rename from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
rename to leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
index 48f619a..18a2fc8 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
@@ -14,24 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.amaterasu.leader.common.execution.frameworls
+package org.apache.amaterasu.leader.common.execution.frameworks
 
 import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.configuration.ConfigManager
 import org.apache.amaterasu.common.logging.KLogging
 import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
 import org.reflections.Reflections
 
 class FrameworkProvidersFactory(val env: String, val config: ClusterConfig) : KLogging() {
 
-    var  providers: Map<String, FrameworkSetupProvider>
+    val providers: MutableMap<String, FrameworkSetupProvider> = mutableMapOf()
+
+    val groups: List<String> by lazy { providers.keys.toList() }
+
+    fun getFramework(groupId: String): FrameworkSetupProvider = providers.getValue(groupId)
 
     init {
-        val reflections =  Reflections(ClassLoader::class.java)
+        val reflections =  Reflections(this::class.java.classLoader)
         val runnerTypes = reflections.getSubTypesOf(FrameworkSetupProvider::class.java)
 
-
-        providers = runnerTypes.map  {
+        val loadedProviders = runnerTypes.map {
 
             val provider = it.newInstance()
 
@@ -41,5 +43,7 @@
             provider.groupIdentifier to provider
 
         }.toMap()
+
+        providers.putAll(loadedProviders)
     }
 }
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
index 6bda13c..f73d717 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
@@ -41,7 +41,7 @@
         else -> println("===> Unknown message")
     }
 
-    private fun printNotification(notification: Notification) = when (notification.notType) {
+    fun printNotification(notification: Notification) = when (notification.notType) {
 
         NotificationType.Info ->
             println("===> ${notification.msg} ".brightWhite().bold())
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServer.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServer.kt
new file mode 100644
index 0000000..5b6563f
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServer.kt
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.utilities
+
+import org.apache.log4j.BasicConfigurator
+import org.apache.log4j.Level
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.ServerConnector
+import org.eclipse.jetty.server.handler.DefaultHandler
+import org.eclipse.jetty.server.handler.HandlerList
+import org.eclipse.jetty.server.handler.ResourceHandler
+import org.jsoup.Jsoup
+import org.jsoup.select.Elements
+import org.apache.log4j.Logger
+import org.eclipse.jetty.util.log.StdErrLog
+
+class HttpServer {
+    lateinit var server: Server
+
+    fun start(port: String, serverRoot: String) {
+        BasicConfigurator.configure()
+        initLogging()
+
+        server = Server()
+        val connector = ServerConnector(server)
+        connector.port = port.toInt()
+        server.addConnector(connector)
+
+        val handler = ResourceHandler()
+        handler.isDirectoriesListed = true
+        handler.welcomeFiles = arrayOf("index.html")
+        handler.resourceBase = serverRoot
+        val handlers = HandlerList()
+        handlers.handlers = arrayOf(handler, DefaultHandler())
+
+        server.handler = handlers
+        server.start()
+    }
+
+    private fun initLogging() {
+        System.setProperty("org.eclipse.jetty.util.log.class", StdErrLog::javaClass.name)
+        Logger.getLogger("org.eclipse.jetty").level = Level.ALL
+        Logger.getLogger("org.eclipse.jetty.websocket").level = Level.OFF
+    }
+
+    /**
+     * Method: getFilesInDirectory
+     * Description: provides a list of files in the given directory URL.
+     * @Params: amaNode: Hostname of the URL, port: Port # of the host, directory: destination directory to fetch files
+     * Note: Should the files in URL root be fetched, provide an empty value to directory.
+     */
+    fun getFilesInDirectory(amaNode: String, port: String, directory: String = ""): Array<String> {
+
+        val htmlDoc = Jsoup.connect("http://$amaNode:$port/$directory").get()
+        val files: Elements = htmlDoc.body().select("a")
+        return files.map { it.attr("href") }
+                .filter { !it.contains("..") }
+                .map { it.replace("/", "") }.toTypedArray()
+    }
+
+    fun stop() {
+        if (::server.isInitialized) {
+            server.stop()
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
deleted file mode 100644
index a748690..0000000
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.common.execution.frameworks
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
-import org.reflections.Reflections
-
-import scala.collection.JavaConversions._
-
-class FrameworkProvidersFactory {
-  var providers: Map[String, FrameworkSetupProvider] = _
-
-  def groups: Array[String] = {
-    providers.keys.toArray
-  }
-
-  def getFramework(groupId: String): FrameworkSetupProvider = {
-    providers(groupId)
-  }
-
-}
-
-object FrameworkProvidersFactory extends Logging {
-
-  def apply(env: String, config: ClusterConfig): FrameworkProvidersFactory = {
-
-    val result = new FrameworkProvidersFactory()
-
-    val reflections = new Reflections(getClass.getClassLoader)
-    val runnerTypes = reflections.getSubTypesOf(classOf[FrameworkSetupProvider]).toSet
-
-    result.providers = runnerTypes.map(r => {
-
-      val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[FrameworkSetupProvider]
-
-      provider.init(env, config)
-      log.info(s"a provider for group ${provider.getGroupIdentifier} was created")
-      log.info(s"config = $config")
-      (provider.getGroupIdentifier, provider)
-
-    }).toMap
-
-    result
-  }
-}
\ No newline at end of file
diff --git a/leader/src/main/scripts/amaterasu.properties b/leader-common/src/main/scripts/amaterasu.properties
similarity index 100%
rename from leader/src/main/scripts/amaterasu.properties
rename to leader-common/src/main/scripts/amaterasu.properties
diff --git a/leader/src/main/scripts/log4j.properties b/leader-common/src/main/scripts/log4j.properties
similarity index 100%
rename from leader/src/main/scripts/log4j.properties
rename to leader-common/src/main/scripts/log4j.properties
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
index a7b2e47..35c4b0e 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
@@ -16,7 +16,6 @@
  */
 package org.apache.amaterasu.leader.common.configuration
 
-import com.uchuhimo.konf.source.yaml.toYaml
 import org.apache.amaterasu.common.configuration.ConfigManager
 import org.apache.amaterasu.common.configuration.Job
 import org.jetbrains.spek.api.Spek
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt
new file mode 100644
index 0000000..c7c3d99
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+
+class ActionStatusTests : Spek({
+
+    // setting up a testing zookeeper server (curator TestServer)
+    val retryPolicy = ExponentialBackoffRetry(1000, 3)
+    val server = TestingServer(2181, true)
+    val jobId = "job_${System.currentTimeMillis()}"
+
+
+    given("an action") {
+
+        val data = ActionData(ActionStatus.Pending, "test_action", "start.scala", "", "spark", "scala", "0000001", hashMapOf(), mutableListOf())
+        val queue = LinkedBlockingQueue<ActionData>()
+
+        val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+        client.start()
+
+        client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+        val action = SequentialAction(data.name, data.src, "", data.groupId, data.typeId, mapOf(), jobId, queue, client, 1)
+
+        it("should queue it's ActionData int the job queue when executed") {
+            action.execute()
+            assertEquals(queue.peek().name, data.name)
+            assertEquals(queue.peek().src, data.src)
+        }
+
+        it("should also create a sequential znode for the task with the value of Queued") {
+            val taskStatus = client.data.forPath("/$jobId/task-0000000000")
+
+            assertNotNull(taskStatus)
+            assertEquals(String(taskStatus), "Queued")
+
+        }
+    }
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt
index 3b9a229..db36abf 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.amaterasu.leader.common.dsl
 
 import org.jetbrains.spek.api.Spek
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt
new file mode 100644
index 0000000..af3c9d9
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class JobExecutionTests : Spek({
+    val retryPolicy = ExponentialBackoffRetry(1000, 3)
+    val server = TestingServer(2183, true)
+    val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+    client.start()
+
+    val jobId = "job_${System.currentTimeMillis()}"
+
+    val yaml = this::class.java.getResource("/simple-maki.yml").readText()
+    val queue = LinkedBlockingQueue<ActionData>()
+    client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+
+    given("a job parsed in to a JobManager") {
+        val job = JobParser.parse(jobId, yaml, queue, client, 1)
+
+        it("queue the first action when the JobManager.start method is called ") {
+            job.start()
+
+            assertEquals(queue.peek().name, "start")
+
+            // making sure that the status is reflected in zk
+            val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+            assertEquals(String(actionStatus), "Queued")
+        }
+
+        it("return the start action when calling getNextAction and dequeue it") {
+            assertEquals(job.nextActionData!!.name, "start")
+            assertEquals(queue.size, 0)
+        }
+
+        it("also changes the status of start to started") {
+            val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+            assertEquals(String(actionStatus), "Started")
+        }
+
+        it("is not out of actions when an action is still Pending") {
+            assertFalse { job.outOfActions }
+        }
+
+        it("will requeue a task when calling JobManager.requeueAction") {
+            job.requeueAction("0000000000")
+            val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+            assertEquals(String(actionStatus), "Queued")
+        }
+
+        it("will restart the task") {
+            val data = job.nextActionData
+            assertEquals(data!!.name, "start")
+
+            // making sure that the status is reflected in zk
+            val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+            assertEquals(String(actionStatus), "Started")
+        }
+
+        it("will mark the action as Complete when the actionComplete method is called") {
+            job.actionComplete("0000000000")
+            // making sure that the status is reflected in zk
+            val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+
+            assertEquals(String(actionStatus), "Complete")
+        }
+
+        on("completion of start, the next action step2 is queued") {
+            assertEquals(queue.peek().name, "step2")
+        }
+
+        it("should also be registered as queued in zk") {
+            val actionStatus = client.data.forPath("/$jobId/task-0000000001")
+            assertEquals(String(actionStatus), "Queued")
+        }
+
+        it("is marked as Started when JobManager.nextActionData is called") {
+            val data = job.nextActionData
+            assertEquals(data!!.name, "step2")
+        }
+
+        it("will start an error action JobManager.actionFailed is called") {
+            job.actionFailed("0000000001", "test failure")
+
+            assertEquals(queue.peek().name, "error-action")
+        }
+
+        it("marks the error action as Complete when the actionComplete method is called"){
+            job.actionComplete("0000000001-error")
+
+            // making sure that the status is reflected in zk
+            val actionStatus = client.data.forPath("/$jobId/task-0000000001-error")
+            assertEquals(String(actionStatus) ,"Complete")
+        }
+
+        it("will be out of actions when all actions have been executed"){
+            assertTrue { job.outOfActions }
+        }
+    }
+
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt
new file mode 100644
index 0000000..9e4f53b
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+
+class JobParserTests : Spek({
+
+    val retryPolicy = ExponentialBackoffRetry(1000, 3)
+    val server = TestingServer(2187, true)
+    val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+    client.start()
+
+    val jobId = "job_${System.currentTimeMillis()}"
+    val queue = LinkedBlockingQueue<ActionData>()
+
+    // this will be performed by the job bootstrapper
+    client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+
+    given("a simple-maki.yaml that is passed to a JobParser") {
+
+        val yaml = this::class.java.getResource("/simple-maki.yml").readText()
+        val job = JobParser.parse(jobId, yaml, queue, client, 1)
+
+        it("parse the job details correctly") {
+            assertEquals(job.name, "amaterasu-test")
+        }
+
+        it("also have two actions in the right order") {
+
+            assertEquals(job.registeredActions.size, 3)
+
+            assertEquals(job.registeredActions["0000000000"]!!.data.name, "start")
+            assertEquals(job.registeredActions["0000000001"]!!.data.name, "step2")
+            assertEquals(job.registeredActions["0000000001-error"]!!.data.name, "error-action")
+
+        }
+
+        it("also also parse action 'config' successfully") {
+            assertEquals(job.registeredActions["0000000000"]!!.data.config, "start-cfg.yaml")
+            assertEquals(job.registeredActions["0000000001"]!!.data.config, "step2-cfg.yaml")
+            assertEquals(job.registeredActions["0000000001-error"]!!.data.config, "error-cfg.yaml")
+        }
+    }
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServerTests.kt
new file mode 100644
index 0000000..c79a819
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServerTests.kt
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.utilities
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jsoup.Jsoup
+import kotlin.test.assertEquals
+
+class HttpServerTests : Spek({
+    val resources = this::class.java.getResource("/jetty-test-data.txt").path.removeSuffix("/jetty-test-data.txt")
+
+    given("an Amaterasu Web server is started") {
+        val server = HttpServer()
+        server.start("8002", resources)
+
+        it("serve content and stop successfully") {
+            lateinit var data: String
+            try {
+                val html = Jsoup.connect("http://localhost:8002/jetty-test-data.txt").get().select("body").text()
+                data = html.toString()
+            } finally {
+                server.stop()
+            }
+            assertEquals(data, "This is a test file to download from Jetty webserver")
+        }
+    }
+
+    given("an Amaterasu Web server is started pointing to a sub dir with two files") {
+        val server = HttpServer()
+        server.start("8001", "$resources/dist")
+
+        it("list the files correctly") {
+            try {
+                val urls = server.getFilesInDirectory("localhost", "8001", "")
+                assertEquals(urls.size, 2)
+            } finally {
+                server.stop()
+            }
+        }
+    }
+})
\ No newline at end of file
diff --git a/leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt b/leader-common/src/test/resources/dist/jetty-fileserver-test-file-1.txt
similarity index 100%
rename from leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt
rename to leader-common/src/test/resources/dist/jetty-fileserver-test-file-1.txt
diff --git a/leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt b/leader-common/src/test/resources/dist/jetty-fileserver-test-file-2.txt
similarity index 100%
rename from leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt
rename to leader-common/src/test/resources/dist/jetty-fileserver-test-file-2.txt
diff --git a/leader/src/test/resources/jetty-test-data.txt b/leader-common/src/test/resources/jetty-test-data.txt
similarity index 100%
rename from leader/src/test/resources/jetty-test-data.txt
rename to leader-common/src/test/resources/jetty-test-data.txt
diff --git a/leader/src/test/resources/simple-maki.yml b/leader-common/src/test/resources/simple-maki.yml
similarity index 100%
rename from leader/src/test/resources/simple-maki.yml
rename to leader-common/src/test/resources/simple-maki.yml
diff --git a/leader-mesos/build.gradle b/leader-mesos/build.gradle
index cb08d6a..e0ade9f 100644
--- a/leader-mesos/build.gradle
+++ b/leader-mesos/build.gradle
@@ -14,7 +14,93 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+buildscript {
 
+    repositories {
+        mavenCentral()
+        maven {
+            url 'http://repository.jetbrains.com/all'
+        }
+        maven {
+            url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+        }
+    }
 
+    dependencies {
+        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+        classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+    }
+}
 
+plugins {
+    id "com.github.johnrengelman.shadow" version "2.0.4"
+}
 
+apply plugin: 'kotlin'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+    filters {
+        engines {
+            include 'spek'
+        }
+    }
+}
+
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+shadowJar {
+    zip64 true
+}
+
+repositories {
+    maven { url "https://plugins.gradle.org/m2/" }
+    maven { url 'http://repository.jetbrains.com/all' }
+    maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+    maven { url "http://dl.bintray.com/jetbrains/spek" }
+    maven { url "http://oss.jfrog.org/artifactory/oss-snapshot-local" }
+
+    mavenCentral()
+    jcenter()
+}
+
+dependencies {
+    compile project(':leader-common')
+    compile project(':amaterasu-sdk')
+
+    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+    compile "org.jetbrains.kotlin:kotlin-reflect"
+    compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.1.1'
+
+    testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+    testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+    testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+    // Spek requires kotlin-reflect, can be omitted if already in the classpath
+    testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+}
+
+task copyToHomeRoot(type: Copy) {
+    from 'src/main/scripts'
+    into '../build/amaterasu/'
+}
+
+task copyToHomeBin(type: Copy) {
+    dependsOn shadowJar
+    from 'build/libs'
+    into '../build/amaterasu/bin'
+}
+
+task copyToHome() {
+    dependsOn copyToHomeRoot
+    dependsOn copyToHomeBin
+}
+
+compileKotlin{
+    kotlinOptions.jvmTarget = "1.8"
+}
+
+compileTestKotlin {
+    kotlinOptions.jvmTarget = "1.8"
+}
diff --git a/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/Client.kt b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/Client.kt
new file mode 100644
index 0000000..d2209cf
--- /dev/null
+++ b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/Client.kt
@@ -0,0 +1,8 @@
+package org.apache.amaterasu.leader.mesos
+
+object Client {
+
+    @Throws(Exception::class)
+    @JvmStatic
+    fun main(args: Array<String>) = ClientArgsParser().main(args)
+}
\ No newline at end of file
diff --git a/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/ClientArgsParser.kt b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/ClientArgsParser.kt
new file mode 100644
index 0000000..6b9a73e
--- /dev/null
+++ b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/ClientArgsParser.kt
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.mesos
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.launcher.ArgsParser
+import org.apache.log4j.LogManager
+import org.apache.mesos.MesosSchedulerDriver
+import org.apache.mesos.Protos
+import java.io.FileInputStream
+
+class ClientArgsParser : ArgsParser() {
+
+    override fun run() {
+
+        val opts = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
+
+        val config = ClusterConfig.apply(FileInputStream("${opts.home}/amaterasu.properties"))
+        val resume = opts.jobId.isNotEmpty()
+
+        LogManager.resetConfiguration()
+
+        val frameworkBuilder = Protos.FrameworkInfo.newBuilder()
+                .setName("${opts.name} - Amaterasu Job")
+                .setFailoverTimeout(config.timeout())
+                .setUser(config.user())
+
+        if (resume) {
+            frameworkBuilder.setId(Protos.FrameworkID.newBuilder().setValue(opts.jobId))
+        }
+
+        val framework = frameworkBuilder.build()
+
+        val masterAddress = "${config.master()}:${config.masterPort()}"
+
+        val scheduler = JobScheduler(
+                opts.repo,
+                opts.branch,
+                opts.userName,
+                opts.password,
+                opts.env,
+                resume,
+                config,
+                opts.report,
+                opts.home
+        )
+
+        val driver = MesosSchedulerDriver(scheduler, framework, masterAddress)
+
+        driver.run()
+
+    }
+}
+
diff --git a/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt
new file mode 100644
index 0000000..f19d0fa
--- /dev/null
+++ b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.mesos
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.readValue
+import com.fasterxml.jackson.module.kotlin.registerKotlinModule
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.ConfigManager
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.execution.actions.Notification
+import org.apache.amaterasu.common.execution.actions.enums.NotificationLevel
+import org.apache.amaterasu.common.execution.actions.enums.NotificationType
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.execution.JobLoader
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.utilities.ActiveReportListener
+import org.apache.amaterasu.leader.common.utilities.DataLoader
+import org.apache.amaterasu.leader.common.utilities.HttpServer
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.commons.io.FileUtils
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.log4j.LogManager
+import org.apache.mesos.Protos
+import org.apache.mesos.Scheduler
+import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.protobuf.ByteString
+import java.io.File
+import java.nio.file.Path
+import java.nio.file.Paths
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.LinkedBlockingQueue
+
+class JobScheduler(private val src: String,
+                   private val branch: String,
+                   private val username: String,
+                   private val password: String,
+                   private val env: String,
+                   private val resume: Boolean,
+                   private val config: ClusterConfig,
+                   private val report: String,
+                   private val home: String) : Scheduler, KLogging() {
+
+    private var client: CuratorFramework
+    private val server = HttpServer()
+    private val listener = ActiveReportListener()
+    private lateinit var jobManager: JobManager
+    private val mapper = ObjectMapper().registerKotlinModule()
+    private var reportLevel: NotificationLevel
+    private val offersToTaskIds = ConcurrentHashMap<String, String>()
+    private val taskIdsToActions = ConcurrentHashMap<Protos.TaskID, String>()
+    private lateinit var frameworkFactory: FrameworkProvidersFactory
+    private lateinit var configManager: ConfigManager
+    private val yamlMapper = ObjectMapper(YAMLFactory()).registerKotlinModule()
+
+    private val jarFile = File(this::class.java.protectionDomain.codeSource.location.path)
+    private val amaDist = File("${File(jarFile.parent).parent}/dist")
+
+
+    init {
+        LogManager.resetConfiguration()
+        server.start(config.webserver().Port(), "$home/${config.webserver().Root()}")
+
+        reportLevel = NotificationLevel.valueOf(report.capitalize())
+        val retryPolicy = ExponentialBackoffRetry(1000, 3)
+        client = CuratorFrameworkFactory.newClient(config.zk(), retryPolicy)
+        client.start()
+    }
+
+    /**
+     * creates a working dir for a job
+     * @param jobId: the id of the job (will be used as the jobs directory name as well)
+     */
+    private fun createJobDir(jobId: String) {
+        val amaHome = File(jarFile.parent).parent
+        val jobDir = "$amaHome/dist/$jobId/"
+
+        val dir = File(jobDir)
+        if (!dir.exists()) {
+            dir.mkdir()
+        }
+    }
+
+    override fun registered(driver: SchedulerDriver?, frameworkId: Protos.FrameworkID?, masterInfo: Protos.MasterInfo?) {
+        jobManager = if (!resume) {
+            JobLoader.loadJob(src,
+                    branch,
+                    frameworkId!!.value,
+                    username,
+                    password,
+                    client,
+                    config.jobs().tasks().attempts(),
+                    LinkedBlockingQueue<ActionData>())
+        } else {
+            JobLoader.reloadJob(frameworkId!!.value,
+                    username,
+                    password,
+                    client,
+                    config.jobs().tasks().attempts(),
+                    LinkedBlockingQueue<ActionData>())
+        }
+
+        jobManager.start()
+        createJobDir(jobManager.jobId)
+    }
+
+    override fun disconnected(driver: SchedulerDriver?) {}
+
+    override fun reregistered(driver: SchedulerDriver?, masterInfo: Protos.MasterInfo?) {}
+
+    override fun error(driver: SchedulerDriver?, message: String?) {
+        log.error(message)
+    }
+
+    override fun slaveLost(driver: SchedulerDriver?, slaveId: Protos.SlaveID?) {}
+
+    override fun executorLost(driver: SchedulerDriver?, executorId: Protos.ExecutorID?, slaveId: Protos.SlaveID?, status: Int) {}
+
+    override fun statusUpdate(driver: SchedulerDriver?, status: Protos.TaskStatus?) {
+        status?.let {
+
+            val actionName = taskIdsToActions[it.taskId]
+            when (it.state) {
+                Protos.TaskState.TASK_STARTING -> log.info("Task starting ...")
+                Protos.TaskState.TASK_RUNNING -> {
+                    jobManager.actionStarted(it.taskId.value)
+                    listener.printNotification(Notification("", "created container for $actionName created", NotificationType.Info, NotificationLevel.Execution))
+                }
+                Protos.TaskState.TASK_FINISHED -> {
+                    jobManager.actionComplete(it.taskId.value)
+                    listener.printNotification(Notification("", "Container ${it.executorId.value} Complete with task ${it.taskId.value} with success.", NotificationType.Info, NotificationLevel.Execution))
+                }
+                Protos.TaskState.TASK_FAILED,
+                Protos.TaskState.TASK_KILLED,
+                Protos.TaskState.TASK_ERROR,
+                Protos.TaskState.TASK_LOST -> {
+                    jobManager.actionFailed(it.taskId.value, it.message)
+                    listener.printNotification(Notification("", "error launching container with ${it.message} in ${it.data.toStringUtf8()}", NotificationType.Error, NotificationLevel.Execution))
+                }
+                else -> log.warn("WTF? just got unexpected task state: " + it.state)
+            }
+        }
+    }
+
+    override fun frameworkMessage(driver: SchedulerDriver?, executorId: Protos.ExecutorID?, slaveId: Protos.SlaveID?, data: ByteArray?) {
+
+        data?.let {
+            val notification: Notification = mapper.readValue(it)
+
+            when (reportLevel) {
+                NotificationLevel.Code -> listener.printNotification(notification)
+                NotificationLevel.Execution ->
+                    if (notification.notLevel != NotificationLevel.Code)
+                        listener.printNotification(notification)
+            }
+        }
+    }
+
+    override fun resourceOffers(driver: SchedulerDriver?, offers: MutableList<Protos.Offer>?) {
+        offers?.forEach {
+            when {
+                validateOffer(it) -> synchronized(jobManager) {
+                    val actionData = jobManager.nextActionData
+
+                    // if we got a new action to process
+                    actionData?.let { ad ->
+
+                        frameworkFactory = FrameworkProvidersFactory(env, config)
+                        val items = frameworkFactory.providers.values.flatMap { x -> x.configurationItems }
+                        configManager = ConfigManager(env, "repo", items)
+
+                        val taskId = Protos.TaskID.newBuilder().setValue(actionData.id).build()
+                        taskIdsToActions[taskId] = actionData.name
+
+                        createTaskConfiguration(ad)
+                        listener.printNotification(Notification("", "looking for ${actionData.groupId} provider", NotificationType.Info, NotificationLevel.Execution))
+                        val frameworkProvider = frameworkFactory.providers[actionData.groupId]
+                        listener.printNotification(Notification("", "found $frameworkProvider provider", NotificationType.Info, NotificationLevel.Execution))
+
+                        val runnerProvider = frameworkProvider!!.getRunnerProvider(actionData.typeId)
+
+                        listener.printNotification(Notification("", "provider ${runnerProvider::class.qualifiedName}", NotificationType.Info, NotificationLevel.Execution))
+                        val execData = DataLoader.getExecutorDataBytes(env, config)
+                        val executorId = taskId.value + "-" + UUID.randomUUID()
+
+                        val envConf = configManager.getActionConfiguration(ad.name, ad.config)
+                        val commandStr = runnerProvider.getCommand(jobManager.jobId, actionData, envConf, executorId, "")
+                        listener.printNotification(Notification("", "container command $commandStr", NotificationType.Info, NotificationLevel.Execution))
+
+                        val command = Protos.CommandInfo
+                                .newBuilder()
+                                .setValue(commandStr)
+
+                        // setting the container's resources
+                        val resources = setupContainerResources(frameworkProvider, runnerProvider, actionData)
+                        resources.forEach { r ->
+                            command.addUris(Protos.CommandInfo.URI.newBuilder()
+                                    .setValue(r)
+                                    .setExecutable(false)
+                                    .setExtract(true)
+                                    .build())
+                        }
+
+                        // setting the container's executable
+                        val executable: Path = getExecutable(runnerProvider, actionData)
+                        command.addUris(Protos.CommandInfo.URI.newBuilder()
+                                .setValue(toServingUri(executable.toString()))
+                                .setExecutable(false)
+                                .setExtract(false)
+                                .build())
+
+                        // setting the processes environment variables
+                        val envVarsList = frameworkProvider.environmentVariables.map { e ->
+                            Protos.Environment.Variable.newBuilder()
+                                    .setName(e.key)
+                                    .setValue(e.value)
+                                    .build()
+                        }
+                        command.setEnvironment(Protos.Environment.newBuilder().addAllVariables(envVarsList))
+
+                        val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
+
+                        var actionTask: Protos.TaskInfo = if (runnerProvider.hasExecutor) {
+                            val executor = Protos.ExecutorInfo
+                                    .newBuilder()
+                                    .setData(ByteString.copyFrom(execData))
+                                    .setName(taskId.value)
+                                    .setExecutorId(Protos.ExecutorID.newBuilder().setValue(executorId))
+                                    .setCommand(command)
+                                    .build()
+
+                            Protos.TaskInfo.newBuilder()
+                                    .setName(taskId.value)
+                                    .setTaskId(taskId)
+                                    .setExecutor(executor)
+                                    .setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
+                                    .addResources(createScalarResource("cpus", driverConfiguration.cpus.toDouble()))
+                                    .addResources(createScalarResource("mem", driverConfiguration.memory.toDouble()))
+                                    .addResources(createScalarResource("disk", config.jobs().repoSize().toDouble()))
+                                    .setSlaveId(it.slaveId)
+                                    .build()
+                            //slavesExecutors.put(offer.getSlaveId.getValue, executor)
+                        } else {
+                            Protos.TaskInfo.newBuilder()
+                                    .setName(taskId.value)
+                                    .setTaskId(taskId)
+                                    .setCommand(command)
+                                    .addResources(createScalarResource("cpus", driverConfiguration.cpus.toDouble()))
+                                    .addResources(createScalarResource("mem", driverConfiguration.memory.toDouble()))
+                                    .addResources(createScalarResource("disk", config.jobs().repoSize().toDouble()))
+                                    .setSlaveId(it.slaveId)
+                                    .build()
+                        }
+
+                        listener.printNotification(Notification("", "requesting container for ${actionData.name}", NotificationType.Info, NotificationLevel.Execution))
+                        driver?.launchTasks(Collections.singleton(it.id), listOf(actionTask))
+
+                    } ?: run {
+                        if (jobManager.outOfActions) {
+                            log.info("framework ${jobManager.jobId} execution finished")
+
+                            val repo = File("repo/")
+                            repo.delete()
+
+                            server.stop()
+                            driver?.declineOffer(it.id)
+                            driver?.stop()
+                            System.exit(0)
+                        } else {
+                            log.info("Declining offer, no sufficient resources")
+                            driver!!.declineOffer(it.id)
+                        }
+                    }
+                }
+
+            }
+        }
+    }
+
+    private fun getExecutable(runnerProvider: RunnerSetupProvider, actionData: ActionData): Path {
+        // setting up action executable
+        val sourcePath = File(runnerProvider.getActionExecutable(jobManager.jobId, actionData))
+        val executable: Path = if (actionData.hasArtifact) {
+            val relativePath = amaDist.toPath().root.relativize(sourcePath.toPath())
+            relativePath.subpath(amaDist.toPath().nameCount, relativePath.nameCount)
+        } else {
+            val dest = File("dist/${jobManager.jobId}/$sourcePath")
+            FileUtils.copyFile(sourcePath, dest)
+            Paths.get(jobManager.jobId, sourcePath.toPath().toString())
+        }
+        return executable
+    }
+
+    override fun offerRescinded(driver: SchedulerDriver?, offerId: Protos.OfferID?) {
+        offerId?.let {
+            val actionId = offersToTaskIds[it.value]
+            jobManager.requeueAction(actionId!!)
+        }
+    }
+
+    private fun validateOffer(offer: Protos.Offer): Boolean {
+        val resources = offer.resourcesList
+
+        return resources.count { it.name == "cpus" && it.scalar.value >= config.jobs().tasks().cpus() } > 0 &&
+                resources.count { it.name == "mem" && it.scalar.value >= config.jobs().tasks().mem() } > 0
+    }
+
+    private fun createTaskConfiguration(actionData: ActionData) {
+
+        // setting up the configuration files for the container
+        val envYaml = configManager.getActionConfigContent(actionData.name, actionData.config)
+        writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml")
+
+        val dataStores = DataLoader.getTaskData(actionData, env).exports
+        val dataStoresYaml = yamlMapper.writeValueAsString(dataStores)
+        writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml")
+
+        val datasets = DataLoader.getDatasets(env)
+        writeConfigFile(datasets, jobManager.jobId, actionData.name, "datasets.yaml")
+
+        writeConfigFile("jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml")
+
+    }
+
+    private fun writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String) {
+        val jarFile = File(this::class.java.protectionDomain.codeSource.location.path)
+        val amaHome = File(jarFile.parent).parent
+        val envLocation = "$amaHome/dist/$jobId/$actionName/"
+
+        val dir = File(envLocation)
+        if (!dir.exists()) {
+            dir.mkdirs()
+        }
+
+        File("$envLocation/$fileName").writeText(configuration)
+    }
+
+    private fun setupContainerResources(framework: FrameworkSetupProvider, runnerProvider: RunnerSetupProvider, actionData: ActionData): List<String> {
+        val result = mutableListOf<String>()
+
+        result.addAll(framework.groupResources.map { toServingUri(it.name) })
+        result.addAll(runnerProvider.runnerResources.map { toServingUri(it) })
+
+        val actionDeps = runnerProvider.getActionDependencies(jobManager.jobId, actionData).map {
+            FileUtils.copyFile(File(it), File("dist/$it"))
+            toServingUri(it)
+        }
+        result.addAll(actionDeps)
+
+        val actionResources = runnerProvider.getActionResources(jobManager.jobId, actionData).map { toServingUri(it) }
+        result.addAll(actionResources
+        )
+        return result
+    }
+
+    private fun toServingUri(file: String): String = "http://${System.getenv("AMA_NODE")}:${config.webserver().Port()}/$file"
+
+    private fun createScalarResource(name: String, value: Double): Protos.Resource {
+        return Protos.Resource.newBuilder()
+                .setName(name)
+                .setType(Protos.Value.Type.SCALAR)
+                .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)).build()
+    }
+}
\ No newline at end of file
diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader-mesos/src/main/scripts/ama-start-mesos.sh
similarity index 98%
rename from leader/src/main/scripts/ama-start-mesos.sh
rename to leader-mesos/src/main/scripts/ama-start-mesos.sh
index f12ad5c..5b9e59c 100755
--- a/leader/src/main/scripts/ama-start-mesos.sh
+++ b/leader-mesos/src/main/scripts/ama-start-mesos.sh
@@ -108,7 +108,7 @@
 done
 
 echo "repo: ${REPO} "
-CMD="java -cp ${CP} -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.MesosJobLauncher --home ${BASEDIR}"
+CMD="java -cp ${CP} -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.Client --home ${BASEDIR}"
 
 if [ -n "$REPO" ]; then
     CMD+=" --repo ${REPO}"
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index e89345b..b94135b 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -136,8 +136,8 @@
 
         val items = mutableListOf<FrameworkSetupProvider>()
 
-        for (p in frameworkFactory.providers().values()) {
-            items.add(p)
+        for (provider in frameworkFactory.providers.values) {
+            items.add(provider)
         }
         val configItems = items.flatMap { it.configurationItems.asIterable() }
         configManager = ConfigManager(env, "repo", configItems)
@@ -178,7 +178,7 @@
     private fun initJob(opts: AmaOpts) {
 
         this.env = opts.env
-        frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+        frameworkFactory = FrameworkProvidersFactory(env, config)
 
         try {
             val retryPolicy = ExponentialBackoffRetry(1000, 3)
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
index 0a0e8d7..ba5b3e6 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
@@ -138,8 +138,8 @@
                 }
 
                 // setup frameworks
-                val frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config)
-                for (group in frameworkFactory.groups()) {
+                val frameworkFactory = FrameworkProvidersFactory(opts.env, config)
+                for (group in frameworkFactory.groups) {
                     val framework = frameworkFactory.getFramework(group)
                     for (file in framework.groupResources) {
                         if (file.exists())
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
index b9f1e67..bc3c321 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
@@ -22,7 +22,7 @@
 class ClientArgsParser(val args: Array<String>): ArgsParser() {
 
     override fun run() {
-        var opts = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
+        val opts = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
         val client = Client()
         client.run(opts, args)
     }
diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader-yarn/src/main/scripts/ama-start-yarn.sh
similarity index 100%
rename from leader/src/main/scripts/ama-start-yarn.sh
rename to leader-yarn/src/main/scripts/ama-start-yarn.sh
diff --git a/leader/build.gradle b/leader/build.gradle
deleted file mode 100644
index fd545c6..0000000
--- a/leader/build.gradle
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
-    id "com.github.johnrengelman.shadow" version "1.2.4"
-    id 'com.github.maiflai.scalatest' version '0.22'
-    id 'scala'
-    id 'org.jetbrains.kotlin.jvm'
-    id 'java'
-}
-
-sourceCompatibility = 1.8
-targetCompatibility = 1.8
-
-shadowJar {
-    zip64 true
-}
-
-repositories {
-    maven {
-        url "https://plugins.gradle.org/m2/"
-    }
-    mavenCentral()
-}
-
-dependencies {
-    compile 'org.scala-lang:scala-library:2.11.8'
-    compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-
-    compile project(':common')
-    compile project(':leader-common')
-    compile project(':amaterasu-sdk')
-    compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0'
-    compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
-    compile group: 'org.apache.curator', name: 'curator-framework', version: '2.13.0'
-    compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.4'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
-    compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
-    compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.8'
-    compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.4.14.v20181114'
-    compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.4.14.v20181114'
-    compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.4.14.v20181114'
-    compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.4.14.v20181114'
-    compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.4.14.v20181114'
-    compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0'
-    compile group: 'org.yaml', name: 'snakeyaml', version: '1.23'
-    compile group: 'commons-cli', name: 'commons-cli', version: '1.2'
-    compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
-    compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6'
-    compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
-    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
-    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
-    compile "org.jetbrains.kotlin:kotlin-reflect"
-
-    testCompile project(':common')
-    testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
-    testRuntime 'org.pegdown:pegdown:1.1.0'
-    testCompile 'junit:junit:4.11'
-    testCompile 'org.scalatest:scalatest_2.11:3.0.2'
-    testCompile 'org.scala-lang:scala-library:2.11.8'
-    testCompile 'org.apache.curator:curator-test:2.13.0'
-
-}
-
-
-sourceSets {
-    test {
-        resources.srcDirs += [file('src/test/resources')]
-    }
-
-    // this is done so Scala will compile before Java
-    main {
-        kotlin {
-            srcDirs = ['src/main/kotlin']
-        }
-        scala {
-            srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
-        }
-        java {
-            srcDirs = ['src/main/java']
-        }
-    }
-}
-
-test {
-    maxParallelForks = 1
-}
-
-task copyToHomeRoot(type: Copy) {
-    from 'src/main/scripts'
-    into '../build/amaterasu/'
-}
-
-task copyToHomeBin(type: Copy) {
-    dependsOn shadowJar
-    from 'build/libs'
-    into '../build/amaterasu/bin'
-}
-
-task copyToHome() {
-    dependsOn copyToHomeRoot
-    dependsOn copyToHomeBin
-}
-
-compileKotlin{
-    kotlinOptions.jvmTarget = "1.8"
-}
-
-compileTestKotlin {
-    kotlinOptions.jvmTarget = "1.8"
-}
-
-compileTestScala {
-    dependsOn compileScala
-}
-
-compileScala {
-    dependsOn compileJava
-    classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir)
-}
-
-compileJava {
-    dependsOn compileKotlin
-}
\ No newline at end of file
diff --git a/leader/src/main/resources/log4j.properties b/leader/src/main/resources/log4j.properties
deleted file mode 100644
index 7ba668f..0000000
--- a/leader/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-# Root logger option
-log4j.rootLogger=DEBUG, stdout, file
-
-# Redirect log messages to console
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
deleted file mode 100755
index 98d2afa..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.leader.mesos.schedulers.JobScheduler
-import org.apache.amaterasu.leader.utilities.{Args, BaseJobLauncher}
-import org.apache.log4j.LogManager
-import org.apache.mesos.Protos.FrameworkID
-import org.apache.mesos.{MesosSchedulerDriver, Protos}
-
-/**
-  * The JobLauncher allows the execution of a single job, without creating a full
-  * Amaterasu cluster (no cluster scheduler).
-  */
-object MesosJobLauncher extends BaseJobLauncher {
-
-  override def run(arguments: Args, config: ClusterConfig, resume: Boolean): Unit = {
-    LogManager.resetConfiguration()
-    val frameworkBuilder = Protos.FrameworkInfo.newBuilder()
-      .setName(s"${arguments.name} - Amaterasu Job")
-      .setFailoverTimeout(config.timeout)
-      .setUser(config.user)
-
-    // TODO: test this
-    if (resume) {
-      frameworkBuilder.setId(FrameworkID.newBuilder().setValue(arguments.jobId))
-    }
-
-    val framework = frameworkBuilder.build()
-
-    val masterAddress = s"${config.master}:${config.masterPort}"
-
-    val scheduler = JobScheduler(
-      arguments.repo,
-      arguments.branch,
-      arguments.username,
-      arguments.password,
-      arguments.env,
-      resume,
-      config,
-      arguments.report,
-      arguments.home
-    )
-
-    val driver = new MesosSchedulerDriver(scheduler, framework, masterAddress)
-
-    log.debug(s"Connecting to master on: $masterAddress")
-    driver.run()
-  }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
deleted file mode 100755
index 2adff07..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos.executors
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.mesos.Protos._
-import org.apache.mesos.{Executor, ExecutorDriver}
-
-object JobExecutor extends Logging with Executor {
-
-  override def shutdown(driver: ExecutorDriver): Unit = {}
-
-  override def disconnected(driver: ExecutorDriver): Unit = {}
-
-  override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {}
-
-  override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): Unit = {}
-
-  override def error(driver: ExecutorDriver, message: String): Unit = {}
-
-  override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]): Unit = {}
-
-  override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = {}
-
-  override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
-
-    //val data = mapper.readValue(task.getData.toStringUtf8, JobData.getClass)
-
-  }
-
-  def main(args: Array[String]) {
-
-    val repo = args(0)
-
-  }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
deleted file mode 100755
index 13b2413..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos.schedulers
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.mesos.Protos.{Resource, Value}
-import org.apache.mesos.Scheduler
-
-trait AmaterasuScheduler extends Logging with Scheduler {
-
-  def createScalarResource(name: String, value: Double): Resource = {
-    Resource.newBuilder
-      .setName(name)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(value)).build()
-  }
-
-}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
deleted file mode 100755
index 866d9e2..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos.schedulers
-
-import java.io.{File, PrintWriter, StringWriter}
-import java.nio.file.{Files, Path, Paths, StandardCopyOption}
-import java.util
-import java.util.{Collections, UUID}
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.configuration.{ClusterConfig, ConfigManager}
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.execution.actions.Notification
-import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
-import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.common.utilities.DataLoader
-import org.apache.amaterasu.leader.utilities.HttpServer
-import org.apache.commons.io.FileUtils
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.log4j.LogManager
-import org.apache.mesos.Protos.CommandInfo.URI
-import org.apache.mesos.Protos.Environment.Variable
-import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Protos, SchedulerDriver}
-
-import scala.collection.JavaConverters._
-import scala.collection.concurrent
-import scala.collection.concurrent.TrieMap
-
-/**
-  * The JobScheduler is a Mesos implementation. It is in charge of scheduling the execution of
-  * Amaterasu actions for a specific job
-  */
-class JobScheduler extends AmaterasuScheduler {
-
-  /*private val props: Properties = new Properties(new File(""))
-  private val version = props.getProperty("version")
-  println(s"===> version  $version")*/
-  LogManager.resetConfiguration()
-  private var frameworkFactory: FrameworkProvidersFactory = _
-  private var configManager: ConfigManager = _
-  private var jobManager: JobManager = _
-  private var client: CuratorFramework = _
-  private var config: ClusterConfig = _
-  private var src: String = _
-  private var env: String = _
-  private var branch: String = _
-  private var userName: String = _
-  private var password: String = _
-  private var resume: Boolean = false
-  private var reportLevel: NotificationLevel = _
-
-  private val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-  private val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
-
-  val slavesExecutors = new TrieMap[String, ExecutorInfo]
-  private var awsEnv: String = ""
-
-  // this map holds the following structure:
-  // slaveId
-  //  |
-  //  +-> taskId, actionStatus)
-  private val executionMap: concurrent.Map[String, concurrent.Map[String, ActionStatus]] = new ConcurrentHashMap[String, concurrent.Map[String, ActionStatus]].asScala
-  private val lock = new ReentrantLock()
-  private val offersToTaskIds: concurrent.Map[String, String] = new ConcurrentHashMap[String, String].asScala
-  private val taskIdsToActions: concurrent.Map[Protos.TaskID, String] = new ConcurrentHashMap[Protos.TaskID, String].asScala
-
-  private val mapper = new ObjectMapper()
-  mapper.registerModule(DefaultScalaModule)
-
-  private val yamlMapper = new ObjectMapper(new YAMLFactory())
-  yamlMapper.registerModule(DefaultScalaModule)
-
-  def error(driver: SchedulerDriver, message: String): Unit = {
-    log.error(s"===> $message")
-  }
-
-  def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {}
-
-  def slaveLost(driver: SchedulerDriver, slaveId: SlaveID) {}
-
-  def disconnected(driver: SchedulerDriver) {}
-
-  def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
-
-    val notification = mapper.readValue(data, classOf[Notification])
-
-    reportLevel match {
-      case NotificationLevel.Code => printNotification(notification)
-      case NotificationLevel.Execution =>
-        if (notification.getNotLevel != NotificationLevel.Code)
-          printNotification(notification)
-      case _ =>
-    }
-
-  }
-
-  def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
-
-    val actionName = taskIdsToActions(status.getTaskId)
-    status.getState match {
-      case TaskState.TASK_STARTING => log.info("Task starting ...")
-      case TaskState.TASK_RUNNING => {
-        jobManager.actionStarted(status.getTaskId.getValue)
-        printNotification(new Notification("", s"created container for $actionName created", NotificationType.Info, NotificationLevel.Execution))
-
-      }
-      case TaskState.TASK_FINISHED => {
-        jobManager.actionComplete(status.getTaskId.getValue)
-        printNotification(new Notification("", s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution))
-      }
-      case TaskState.TASK_FAILED |
-           TaskState.TASK_KILLED |
-           TaskState.TASK_ERROR |
-           TaskState.TASK_LOST => {
-        jobManager.actionFailed(status.getTaskId.getValue, status.getMessage)
-        printNotification(new Notification("", s"error launching container with ${status.getMessage} in ${status.getData.toStringUtf8}", NotificationType.Error, NotificationLevel.Execution))
-
-      }
-      case _ => log.warn("WTF? just got unexpected task state: " + status.getState)
-    }
-
-  }
-
-  def validateOffer(offer: Offer): Boolean = {
-
-    val resources = offer.getResourcesList.asScala
-
-    resources.count(r => r.getName == "cpus" && r.getScalar.getValue >= config.jobs.tasks.cpus) > 0 &&
-      resources.count(r => r.getName == "mem" && r.getScalar.getValue >= config.jobs.tasks.mem) > 0
-  }
-
-  def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {
-
-    val actionId = offersToTaskIds(offerId.getValue)
-    jobManager.reQueueAction(actionId)
-
-  }
-
-  def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = {
-
-    println(jobManager.toString)
-
-    for (offer <- offers.asScala) {
-
-      if (validateOffer(offer)) {
-
-        log.info(s"Accepting offer, id=${offer.getId}")
-
-        // this is done to avoid the processing the same action
-        // multiple times
-        lock.lock()
-
-        try {
-          val actionData = jobManager.getNextActionData
-          if (actionData != null) {
-
-            frameworkFactory = FrameworkProvidersFactory(env, config)
-            val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
-            configManager = new ConfigManager(env, "repo", items)
-
-            val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
-            taskIdsToActions.put(taskId, actionData.getName)
-            // setting up the configuration files for the container
-            val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
-            writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
-
-            val envConf = configManager.getActionConfiguration(actionData.getName, actionData.getConfig)
-            val dataStores = DataLoader.getTaskData(actionData, env).getExports
-            val writer = new StringWriter()
-            yamlMapper.writeValue(writer, dataStores)
-            val dataStoresYaml = writer.toString
-            writeConfigFile(dataStoresYaml, jobManager.getJobId, actionData.getName, "datastores.yaml")
-
-            writeConfigFile(s"jobId: ${jobManager.getJobId}\nactionName: ${actionData.getName}", jobManager.getJobId, actionData.getName, "runtime.yaml")
-
-            val datasets = DataLoader.getDatasets(env)
-            writeConfigFile(datasets, jobManager.getJobId, actionData.getName, "datasets.yaml")
-            offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
-
-            // atomically adding a record for the slave, I'm storing all the actions
-            // on a slave level to efficiently handle slave loses
-            executionMap.putIfAbsent(offer.getSlaveId.toString, new ConcurrentHashMap[String, ActionStatus].asScala)
-
-            val slaveActions = executionMap(offer.getSlaveId.toString)
-            slaveActions.put(taskId.getValue, ActionStatus.Started)
-
-            val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
-
-            val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId)
-
-            printNotification(new Notification("", s"provider ${runnerProvider.getClass.getName}", NotificationType.Info, NotificationLevel.Execution))
-            // searching for an executor that already exist on the slave, if non exist
-            // we create a new one
-            var executor: ExecutorInfo = null
-
-            //            val slaveId = offer.getSlaveId.getValue
-            //            slavesExecutors.synchronized {
-
-            val execData = DataLoader.getExecutorDataBytes(env, config)
-            val executorId = taskId.getValue + "-" + UUID.randomUUID()
-            //creating the command
-
-            //                          // TODO: move this into the runner provider somehow
-            //                          if(!actionData.getSrc.isEmpty){
-            //                            copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
-            //                          }
-            val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "")
-            printNotification(new Notification("", s"container command $commandStr", NotificationType.Info, NotificationLevel.Execution))
-
-            val command = CommandInfo
-              .newBuilder
-              .setValue(commandStr)
-            //              .addUris(URI.newBuilder
-            //                .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/executor-${config.version}-all.jar")
-            //                .setExecutable(false)
-            //                .setExtract(false)
-            //                .build())
-
-            // Getting framework (group) resources
-            log.info(s"===> groupResources: ${frameworkProvider.getGroupResources}")
-            frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
-              .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/${f.getName}")
-              .setExecutable(false)
-              .setExtract(true)
-              .build()
-            ))
-
-            // Getting runner resources
-            runnerProvider.getRunnerResources.foreach(r => {
-              command.addUris(URI.newBuilder
-                .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
-                .setExecutable(false)
-                .setExtract(false)
-                .build())
-            })
-
-            // Getting action dependencies
-            runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
-
-              FileUtils.copyFile(new File(r), new File(s"dist/$r"))
-              command.addUris(URI.newBuilder
-                .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
-                .setExecutable(false)
-                .setExtract(false)
-                .build())
-            })
-
-            // Getting action specific resources
-            runnerProvider.getActionResources(jobManager.getJobId, actionData).foreach(r => command.addUris(URI.newBuilder
-              .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
-              .setExecutable(false)
-              .setExtract(false)
-              .build()))
-
-            // setting up action executable
-            val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
-            var executable: Path = null
-            if (actionData.getHasArtifact) {
-              val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
-              executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
-            } else {
-              val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
-              FileUtils.copyFile(sourcePath, dest)
-              executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
-            }
-
-            println(s"===> executable $executable")
-            command.addUris(URI.newBuilder
-              .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$executable")
-              .setExecutable(false)
-              .setExtract(false)
-              .build())
-
-            command
-              .addUris(URI.newBuilder()
-                .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/amaterasu.properties")
-                .setExecutable(false)
-                .setExtract(false)
-                .build())
-
-            // setting the processes environment variables
-            val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x => Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
-            command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
-
-            executor = ExecutorInfo
-              .newBuilder
-              .setData(ByteString.copyFrom(execData))
-              .setName(taskId.getValue)
-              .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
-              .setCommand(command)
-
-              .build()
-
-            slavesExecutors.put(offer.getSlaveId.getValue, executor)
-
-
-            val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
-
-            var actionTask: TaskInfo = null
-
-            if (runnerProvider.getHasExecutor) {
-              actionTask = TaskInfo
-                .newBuilder
-                .setName(taskId.getValue)
-                .setTaskId(taskId)
-                .setExecutor(executor)
-
-                .setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
-                .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
-                .addResources(createScalarResource("mem", driverConfiguration.getMemory))
-                .addResources(createScalarResource("disk", config.jobs.repoSize))
-                .setSlaveId(offer.getSlaveId)
-                .build()
-
-              //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
-            }
-            else {
-              actionTask = TaskInfo
-                .newBuilder
-                .setName(taskId.getValue)
-                .setTaskId(taskId)
-                .setCommand(command)
-
-                //.setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
-                .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
-                .addResources(createScalarResource("mem", driverConfiguration.getMemory))
-                .addResources(createScalarResource("disk", config.jobs.repoSize))
-                .setSlaveId(offer.getSlaveId)
-                .build()
-
-              //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
-            }
-
-            printNotification(new Notification("", s"requesting container for ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
-            driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
-
-          }
-          else if (jobManager.getOutOfActions) {
-            log.info(s"framework ${jobManager.getJobId} execution finished")
-
-            val repo = new File("repo/")
-            repo.delete()
-
-            HttpServer.stop()
-            driver.declineOffer(offer.getId)
-            driver.stop()
-            sys.exit()
-          }
-          else {
-            log.info("Declining offer, no action ready for execution")
-            driver.declineOffer(offer.getId)
-          }
-        }
-        finally {
-          lock.unlock()
-        }
-      }
-      else {
-        log.info("Declining offer, no sufficient resources")
-        driver.declineOffer(offer.getId)
-      }
-
-    }
-
-  }
-
-  def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {
-
-    if (!resume) {
-
-      jobManager = JobLoader.loadJob(
-        src,
-        branch,
-        frameworkId.getValue,
-        userName,
-        password,
-        client,
-        config.jobs.tasks.attempts,
-        new LinkedBlockingQueue[ActionData]()
-      )
-    }
-    else {
-
-      JobLoader.reloadJob(
-        frameworkId.getValue,
-        userName,
-        password,
-        client,
-        config.jobs.tasks.attempts,
-        new LinkedBlockingQueue[ActionData]()
-      )
-
-    }
-
-
-
-
-    jobManager.start()
-
-    createJobDir(jobManager.getJobId)
-
-  }
-
-  def reregistered(driver: SchedulerDriver, masterInfo: Protos.MasterInfo) {}
-
-  def printNotification(notification: Notification): Unit = {
-
-    var color = Console.WHITE
-
-    notification.getNotType match {
-
-      case NotificationType.Info =>
-        color = Console.WHITE
-        println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
-      case NotificationType.Success =>
-        color = Console.GREEN
-        println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
-      case NotificationType.Error =>
-        color = Console.RED
-        println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
-        println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
-
-    }
-
-  }
-
-  private def createJobDir(jobId: String): Unit = {
-    val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-    val amaHome = new File(jarFile.getParent).getParent
-    val jobDir = s"$amaHome/dist/$jobId/"
-
-    val dir = new File(jobDir)
-    if (!dir.exists()) {
-      dir.mkdir()
-    }
-  }
-
-  /**
-    * This function creates an action specific env.yml file int the dist folder with the following path:
-    * dist/{jobId}/{actionName}/env.yml to be added to the container
-    *
-    * @param configuration A YAML string to be written to the env file
-    * @param jobId         the jobId
-    * @param actionName    the name of the action
-    */
-  def writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String): Unit = {
-    val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-    val amaHome = new File(jarFile.getParent).getParent
-    val envLocation = s"$amaHome/dist/$jobId/$actionName/"
-
-    val dir = new File(envLocation)
-    if (!dir.exists()) {
-      dir.mkdirs()
-    }
-
-    new PrintWriter(s"$envLocation/$fileName") {
-      write(configuration)
-      close
-    }
-  }
-}
-
-object JobScheduler {
-
-  def apply(src: String,
-            branch: String,
-            username: String,
-            password: String,
-            env: String,
-            resume: Boolean,
-            config: ClusterConfig,
-            report: String,
-            home: String): JobScheduler = {
-
-    LogManager.resetConfiguration()
-    val scheduler = new JobScheduler()
-
-    HttpServer.start(config.webserver.Port, s"$home/${config.webserver.Root}")
-
-    if (sys.env.get("AWS_ACCESS_KEY_ID").isDefined &&
-      sys.env.get("AWS_SECRET_ACCESS_KEY").isDefined) {
-
-      scheduler.awsEnv = s"env AWS_ACCESS_KEY_ID=${sys.env("AWS_ACCESS_KEY_ID")} env AWS_SECRET_ACCESS_KEY=${sys.env("AWS_SECRET_ACCESS_KEY")}"
-    }
-
-    scheduler.resume = resume
-    scheduler.src = src
-    scheduler.branch = branch
-    scheduler.userName = username
-    scheduler.password = password
-    scheduler.env = env
-    scheduler.reportLevel = NotificationLevel.valueOf(report.capitalize)
-
-    val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-    scheduler.client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
-    scheduler.client.start()
-    scheduler.config = config
-
-    scheduler
-
-  }
-
-}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
deleted file mode 100644
index 8364456..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.utilities
-
-case class Args(
-                 repo: String = "",
-                 branch: String = "master",
-                 env: String = "default",
-                 name: String = "amaterasu-job",
-                 jobId: String = null,
-                 report: String = "code",
-                 home: String = "",
-                 newJobId: String = "",
-                 username: String = "",
-                 password: String = ""
-               ) {
-  def toCmdString: String = {
-    var cmd = s""" --repo $repo --branch $branch --env $env --name $name --report $report --home $home"""
-    if (!username.isEmpty && !password.isEmpty) {
-      cmd += s" --user-name $username --password $password"
-    }
-    if(jobId != null && !jobId.isEmpty) {
-      cmd += s" --job-id $jobId"
-    }
-    cmd
-  }
-
-  override def toString: String = {
-    toCmdString
-  }
-}
-
-object Args {
-  def getParser: scopt.OptionParser[Args] = {
-    val pack = this.getClass.getPackage
-    new scopt.OptionParser[Args]("amaterasu job") {
-
-      head("amaterasu job", if(pack == null) "DEVELOPMENT" else pack.getImplementationVersion)
-
-      opt[String]('r', "repo") action { (x, c) =>
-        c.copy(repo = x)
-      } text "The git repo containing the job"
-
-      opt[String]('b', "branch") action { (x, c) =>
-        c.copy(branch = x)
-      } text "The branch to be executed (default is master)"
-
-      opt[String]('e', "env") action { (x, c) =>
-        c.copy(env = x)
-      } text "The environment to be executed (test, prod, etc. values from the default env are taken if np env specified)"
-
-      opt[String]('n', "name") action { (x, c) =>
-        c.copy(name = x)
-      } text "The name of the job"
-
-       opt[String]('i', "job-id") action { (x, c) =>
-        c.copy(jobId = x)
-      } text "The jobId - should be passed only when resuming a job"
-
-      opt[String]('j', "new-job-id") action { (x, c) =>
-        c.copy(newJobId = x)
-      } text "A new jobId - should never be passed by a user"
-
-      opt[String]('r', "report") action { (x, c) =>
-        c.copy(report = x)
-      }  text "The level of reporting"
-
-      opt[String]('h', "home") action { (x, c) =>
-        c.copy(home = x)
-      }
-
-      opt[String]('u', "user-name") action { (x, c) =>
-        c.copy(username = x)
-      }
-
-      opt[String]('p', "password") action { (x, c) =>
-        c.copy(password = x)
-      }
-    }
-  }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
deleted file mode 100644
index 38c90c7..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.utilities
-
-import java.io.FileInputStream
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.logging.Logging
-
-
-abstract class BaseJobLauncher extends Logging with App {
-
-  def run(args: Args, config: ClusterConfig, resume: Boolean): Unit = ???
-
-  val parser = Args.getParser
-  parser.parse(args, Args()) match {
-
-    case Some(arguments: Args) =>
-
-      val config = ClusterConfig(new FileInputStream(s"${arguments.home}/amaterasu.properties"))
-      val resume = arguments.jobId != null
-
-      run(arguments, config, resume)
-
-    case None =>
-    // arguments are bad, error message will have been displayed
-  }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
deleted file mode 100644
index 4aacd38..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.utilities
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.log4j.{BasicConfigurator, Level, Logger}
-import org.eclipse.jetty.server.handler._
-import org.eclipse.jetty.server.{Server, ServerConnector}
-import org.eclipse.jetty.util.log.StdErrLog
-import org.jsoup.Jsoup
-import org.jsoup.select.Elements
-
-import scala.collection.JavaConverters._
-import scala.io.{BufferedSource, Source}
-
-/**
-  * Implementation of Jetty Web server to server Amaterasu libraries and other distribution files
-  */
-object HttpServer extends Logging {
-
-  var server: Server = _
-
-  def start(port: String, serverRoot: String): Unit = {
-
-    BasicConfigurator.configure()
-    initLogging()
-
-    server = new Server()
-    val connector = new ServerConnector(server)
-    connector.setPort(port.toInt)
-    server.addConnector(connector)
-
-    val handler = new ResourceHandler()
-    handler.setDirectoriesListed(true)
-    handler.setWelcomeFiles(Array[String]("index.html"))
-    handler.setResourceBase(serverRoot)
-    val handlers = new HandlerList()
-    handlers.setHandlers(Array(handler, new DefaultHandler()))
-
-    server.setHandler(handlers)
-    server.start()
-
-  }
-
-  def stop() {
-    if (server == null) throw new IllegalStateException("Server not Started")
-
-    server.stop()
-    server = null
-  }
-
-  def initLogging(): Unit = {
-    System.setProperty("org.eclipse.jetty.util.log.class", classOf[StdErrLog].getName)
-    Logger.getLogger("org.eclipse.jetty").setLevel(Level.ALL)
-    Logger.getLogger("org.eclipse.jetty.websocket").setLevel(Level.OFF)
-  }
-
-  /*
-  Method: getFilesInDirectory
-  Description: provides a list of files in the given directory URL.
-  @Params: amaNode: Hostname of the URL, port: Port # of the host, directory: destination directory to fetch files
-  Note: Should the files in URL root be fetched, provide an empty value to directory.
-   */
-  def getFilesInDirectory(amaNode: String, port: String, directory: String = ""): Array[String] = {
-    println("http://" + amaNode + ":" + port + "/" + directory)
-    val html: BufferedSource = Source.fromURL("http://" + amaNode + ":" + port + "/" + directory)
-    println(html)
-    val htmlDoc = Jsoup.parse(html.mkString)
-    val htmlElement: Elements = htmlDoc.body().select("a")
-    val files = htmlElement.asScala
-    val fileNames = files.map(url => url.attr("href")).filter(file => !file.contains("..")).map(name => name.replace("/", "")).toArray
-    fileNames
-  }
-}
\ No newline at end of file
diff --git a/leader/src/test/resources/amaterasu.properties b/leader/src/test/resources/amaterasu.properties
deleted file mode 100755
index 96e3724..0000000
--- a/leader/src/test/resources/amaterasu.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one or more
-#  contributor license agreements.  See the NOTICE file distributed with
-#  this work for additional information regarding copyright ownership.
-#  The ASF licenses this file to You under the Apache License, Version 2.0
-#  (the "License"); you may not use this file except in compliance with
-#  the License.  You may obtain a copy of the License at
-#
-#       http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-zk=127.0.0.1
-version=0.2.0-incubating-rc4
-master=10.0.0.31
-user=root
-mode=yarn
-webserver.port=8000
-webserver.root=dist
-spark.version=2.2.1-bin-hadoop2.7
-yarn.queue=default
-yarn.jarspath=hdfs:///apps/amaterasu
-spark.home=/usr/lib/spark
-#spark.home=/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2
-yarn.hadoop.home.dir=/etc/hadoop
-spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-yarn.master.memoryMB=2048
-yarn.worker.memoryMB=2048
-mesos.libPath=/usr/local/lib/libmesos.dylib
\ No newline at end of file
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
deleted file mode 100755
index ca9186c..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-
-class ActionStatusTests extends FlatSpec with Matchers {
-
-  // setting up a testing zookeeper server (curator TestServer)
-  val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-  val server = new TestingServer(2181, true)
-  val jobId = s"job_${System.currentTimeMillis}"
-  val data = new ActionData(ActionStatus.Pending, "test_action", "start.scala", "", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava)
-
-  "an Action" should "queue it's ActionData int the job queue when executed" in {
-
-    val queue = new LinkedBlockingQueue[ActionData]()
-    // val config = ClusterConfig()
-
-    val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
-    client.start()
-
-    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-    val action = new SequentialAction(data.getName, data.getSrc, "", data.getGroupId, data.getTypeId, Map.empty[String, String].asJava, jobId, queue, client, 1)
-
-    action.execute()
-    queue.peek().getName should be(data.getName)
-    queue.peek().getSrc should be(data.getSrc)
-
-  }
-
-  it should "also create a sequential znode for the task with the value of Queued" in {
-
-    val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
-    client.start()
-
-    val taskStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-
-    taskStatus should not be null
-    new String(taskStatus) should be("Queued")
-
-  }
-
-}
\ No newline at end of file
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
deleted file mode 100755
index 28b1ce3..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.dsl.JobParser
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobExecutionTests extends FlatSpec with Matchers {
-
-  val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-  val server = new TestingServer(2183, true)
-  val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
-  client.start()
-
-  val jobId = s"job_${System.currentTimeMillis}"
-  val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
-  val queue = new LinkedBlockingQueue[ActionData]()
-
-  // this will be performed by the job bootstraper
-
-  client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-  //  client.setData().forPath(s"/$jobId/src",src.getBytes)
-  //  client.setData().forPath(s"/$jobId/branch", branch.getBytes)
-
-
-  val job = JobParser.parse(jobId, yaml, queue, client, 1)
-
-  "a job" should "queue the first action when the JobManager.start method is called " in {
-
-    job.start
-
-    queue.peek.getName should be("start")
-
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-    new String(actionStatus) should be("Queued")
-
-  }
-
-  it should "return the start action when calling getNextAction and dequeue it" in {
-
-    job.getNextActionData.getName should be("start")
-    queue.size should be(0)
-
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-    new String(actionStatus) should be("Started")
-  }
-
-  it should "not be out of actions when an action is still Pending" in {
-    job.getOutOfActions should be(false)
-  }
-
-  it should "be requeued correctly" in {
-    job.reQueueAction("0000000000")
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-    new String(new String(actionStatus)) should be("Queued")
-  }
-
-  it should "should be able to restart" in {
-
-    val data = job.getNextActionData
-
-    data.getName should be("start")
-
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-    new String(actionStatus) should be("Started")
-  }
-
-  it should "be marked as Complete when the actionComplete method is called" in {
-
-    job.actionComplete("0000000000")
-
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-
-    new String(new String(actionStatus)) should be("Complete")
-
-  }
-
-  "the next step2 job" should "be Queued as a result of the completion" in {
-
-    queue.peek.getName should be("step2")
-
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
-    new String(actionStatus) should be("Queued")
-
-  }
-
-  it should "be marked as Started when JobManager.getNextActionData is called" in {
-
-    val data = job.getNextActionData
-
-    data.getName should be("step2")
-
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
-    new String(actionStatus) should be("Started")
-  }
-
-  it should "be marked as Failed when JobManager.actionFailed is called" in {
-
-    job.actionFailed("0000000001", "test failure")
-    queue.peek.getName should be("error-action")
-  }
-
-  "an ErrorAction" should "be queued if one exist" in {
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
-    new String(actionStatus) should be("Queued")
-
-    // and returned by getNextActionData
-    val data = job.getNextActionData
-
-  }
-
-  it should "be marked as Complete when the actionComplete method is called" in {
-
-    job.actionComplete("0000000001-error")
-
-    // making sure that the status is reflected in zk
-    val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
-
-    new String(new String(actionStatus)) should be("Complete")
-
-  }
-
-  it should " be out of actions when all actions have been executed" in {
-
-    job.getOutOfActions should be(true)
-  }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
deleted file mode 100755
index 5987b35..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.dsl.JobParser
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobParserTests extends FlatSpec with Matchers {
-
-  val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-  val server = new TestingServer(2187, true)
-  val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
-  client.start()
-
-  private val jobId = s"job_${System.currentTimeMillis}"
-  private val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
-  private val queue = new LinkedBlockingQueue[ActionData]()
-
-  // this will be performed by the job bootstrapper
-  client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-
-  private val job = JobParser.parse(jobId, yaml, queue, client, 1)
-
-  "JobParser" should "parse the simple-maki.yml" in {
-
-    job.getName should be("amaterasu-test")
-
-  }
-
-  //TODO: I suspect this test is not indicative, and that order is assured need to verify this
-  it should "also have two actions in the right order" in {
-
-    job.getRegisteredActions.size should be(3)
-
-    job.getRegisteredActions.get("0000000000").data.getName should be("start")
-    job.getRegisteredActions.get("0000000001").data.getName should be("step2")
-    job.getRegisteredActions.get("0000000001-error").data.getName should be("error-action")
-
-  }
-
-  it should "Action 'config' is parsed successfully" in {
-
-    job.getRegisteredActions.size should be(3)
-
-    job.getRegisteredActions.get("0000000000").data.getConfig should be("start-cfg.yaml")
-    job.getRegisteredActions.get("0000000001").data.getConfig should be("step2-cfg.yaml")
-    job.getRegisteredActions.get("0000000001-error").data.getConfig should be("error-cfg.yaml")
-
-  }
-
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
deleted file mode 100755
index 235eccb..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobRestoreTests extends FlatSpec with Matchers with BeforeAndAfterEach {
-
-  val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-  val server = new TestingServer(2184, true)
-  var client: CuratorFramework = null
-
-  val jobId = s"job_${System.currentTimeMillis}"
-  val maki = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
-  val queue = new LinkedBlockingQueue[ActionData]()
-
-  var manager: JobManager = null
-
-  client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
-  client.start()
-
-  override def beforeEach(): Unit = {
-
-    // creating the jobs znode and storing the source repo and branch
-    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/src", "".getBytes)
-    client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/branch", "".getBytes)
-
-    manager = JobLoader.createJobManager(maki, jobId, client, 3, queue)
-
-  }
-
-  override def afterEach(): Unit = {
-
-    client.delete().deletingChildrenIfNeeded().forPath(s"/$jobId")
-
-  }
-
-  "a restored job" should "have all Queued actions in the executionQueue" in {
-
-    // setting task-0000000002 as Queued
-    client.setData().forPath(s"/${jobId}/task-0000000002", ActionStatus.Queued.toString.getBytes)
-
-    JobLoader.restoreJobState(manager, jobId, client)
-
-    queue.peek.getName should be("start")
-  }
-
-  "a restored job" should "have all Started actions in the executionQueue" in {
-
-    // setting task-0000000002 as Queued
-    client.setData().forPath(s"/${jobId}/task-0000000002", ActionStatus.Started.toString.getBytes)
-
-    JobLoader.restoreJobState(manager, jobId, client)
-
-    queue.peek.getName should be("start")
-  }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
deleted file mode 100755
index 1e9b48b..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.integration
-
-import org.apache.amaterasu.leader.common.dsl.GitUtil
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.reflect.io.Path
-
-class GitTests extends FlatSpec with Matchers {
-
-  "GitUtil.cloneRepo" should "clone the sample job git repo" in {
-
-    val path = Path("repo")
-    path.deleteRecursively()
-
-    GitUtil.cloneRepo("https://github.com/shintoio/amaterasu-job-sample.git", "master", "", "")
-
-    val exists = new java.io.File("repo/maki.yml").exists
-    exists should be(true)
-  }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala b/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala
deleted file mode 100755
index 0397b87..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos
-
-import java.util
-import java.util.UUID
-
-import org.apache.mesos.Protos._
-
-
-object MesosTestUtil {
-  def createOffer(mem: Double, disk: Double, cpus: Int): Offer = {
-
-    val resources = new util.ArrayList[Resource]()
-    resources.add(createScalarResource("mem", mem))
-    resources.add(createScalarResource("disk", disk))
-    resources.add(createScalarResource("cpus", cpus))
-
-    Offer.newBuilder()
-      .setId(OfferID.newBuilder().setValue(UUID.randomUUID.toString))
-      .setFrameworkId(FrameworkID.newBuilder().setValue("Amaterasu"))
-      .setSlaveId(SlaveID.newBuilder().setValue(UUID.randomUUID.toString))
-      .setHostname("localhost")
-      .addAllResources(resources)
-
-      .build()
-
-  }
-
-  def createScalarResource(name: String, value: Double): org.apache.mesos.Protos.Resource = {
-
-    org.apache.mesos.Protos.Resource.newBuilder().setName(name).setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder.setValue(value).build()).build()
-
-  }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
deleted file mode 100644
index 7eba1da..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.utilities
-
-
-import java.io.File
-
-import org.scalatest.{FlatSpec, Matchers}
-
-
-class HttpServerTests extends FlatSpec with Matchers {
-
-  // this is an ugly hack, getClass.getResource("/").getPath should have worked but
-  // stopped working when we moved to gradle :(
-  val resources: String = new File(getClass.getResource("/simple-maki.yml").getPath).getParent
-
-  //  "Jetty Web server" should "start HTTP server, serve content and stop successfully" in {
-  //
-  //    var data = ""
-  //    try {
-  //      HttpServer.start("8000", resources)
-  //      val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt")
-  //      data = html.mkString
-  //    }
-  //    finally {
-  //      HttpServer.stop()
-  //    }
-  //    data should equal("This is a test file to download from Jetty webserver")
-  //  }
-
-//  "Jetty File server with '/' as root" should "start HTTP server, serve content and stop successfully" in {
-//
-//    var urlCount: Int = 0
-//    println("resource location" + resources)
-//    try {
-//      HttpServer.start("8000", resources)
-//      val urls = HttpServer.getFilesInDirectory("127.0.0.1", "8000", "dist")
-//      urls.foreach(println)
-//      urlCount = urls.length
-//    } catch {
-//      case e: Exception => println(s"++++>> ${e.getMessage}")
-//    }
-//    finally {
-//      HttpServer.stop()
-//    }
-//    urlCount should equal(2)
-//  }
-//
-//  "Jetty File server with 'dist' as root" should "start HTTP server, serve content and stop successfully" in {
-//    var data = ""
-//    var urlCount: Int = 0
-//    println("resource location" + resources)
-//    try {
-//      HttpServer.start("8000", resources + "/dist")
-//      val urls = HttpServer.getFilesInDirectory("localhost", "8000", "")
-//      urls.foreach(println)
-//      urlCount = urls.length
-//    }
-//    finally {
-//      HttpServer.stop()
-//    }
-//    urlCount should equal(2)
-//  }
-}
diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
index f68922e..6b0859f 100644
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
@@ -26,11 +26,11 @@
 
     val groupIdentifier: String
 
-    val groupResources: Array<File>
+    val groupResources: List<File>
 
     val environmentVariables: Map<String, String>
 
-    val configurationItems: Array<String>
+    val configurationItems: List<String>
 
     fun init(env: String, conf: ClusterConfig)
 
diff --git a/settings.gradle b/settings.gradle
index 9e757dc..cc5161b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,8 +16,8 @@
  */
 
 // Core
-include 'leader'
-project(':leader')
+//include 'leader'
+//project(':leader')
 
 include 'leader-common'
 project(':leader-common')
@@ -32,9 +32,6 @@
 include 'common'
 project(':common')
 
-include 'executor'
-project(':executor')
-
 include 'sdk'
 findProject(':sdk')?.name = 'amaterasu-sdk'