removed deprecated leader and executor projects
diff --git a/executor/build.gradle b/executor/build.gradle
deleted file mode 100644
index 443fc38..0000000
--- a/executor/build.gradle
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
- id 'com.github.johnrengelman.shadow' version '1.2.4'
- id 'com.github.maiflai.scalatest' version '0.22'
- id 'scala'
- id 'java'
-}
-
-shadowJar {
- zip64 true
-}
-
-//sourceCompatibility = 1.8
-//targetCompatibility = 1.8
-
-repositories {
- maven {
- url "https://plugins.gradle.org/m2/"
- }
- mavenCentral()
-}
-
-test {
- maxParallelForks = 1
- forkEvery = 1
-}
-
-configurations {
- provided
-}
-
-sourceSets {
- main.compileClasspath += configurations.provided
- test.compileClasspath += configurations.provided
- test.runtimeClasspath += configurations.provided
-}
-
-dependencies {
-
- compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
- compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
- compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
- compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
- compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
- compile group: 'org.reflections', name: 'reflections', version: '0.9.10'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.8'
-
- compile('com.jcabi:jcabi-aether:0.10.1') {
- exclude group: 'org.jboss.netty'
- }
-
-
- compile project(':common')
- compile project(':amaterasu-sdk')
-
-
-
-}
-
-sourceSets {
- test {
- resources.srcDirs += [file('src/test/resources')]
- }
-
- // this is done so Scala will compile before Java
- main {
- scala {
- srcDirs = ['src/main/scala', 'src/main/java']
- }
- java {
- srcDirs = []
- }
- }
-}
-
-test {
-
- maxParallelForks = 1
-}
-
-task copyToHome(type: Copy) {
- dependsOn shadowJar
- from 'build/libs'
- into '../build/amaterasu/dist'
- from 'build/resources/main'
- into '../build/amaterasu/dist'
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
deleted file mode 100644
index 5f78bc2..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.common.executors
-
-import java.io.ByteArrayOutputStream
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
-import org.reflections.Reflections
-
-import scala.collection.JavaConversions._
-
-//TODO: Check if we can use this in the YARN impl
-class ProvidersFactory {
-
- var providers: Map[String, RunnersProvider] = _
-
- def getRunner(groupId: String, id: String): Option[AmaterasuRunner] = {
- val provider = providers.get(groupId)
- provider match {
- case Some(provider) => Some(provider.getRunner(id))
- case None => None
- }
- }
-}
-
-object ProvidersFactory {
-
- def apply(data: ExecData,
- jobId: String,
- outStream: ByteArrayOutputStream,
- notifier: Notifier,
- executorId: String,
- hostName: String,
- propFile: String = null): ProvidersFactory = {
-
- val result = new ProvidersFactory()
- val reflections = new Reflections(getClass.getClassLoader)
- val runnerTypes = reflections.getSubTypesOf(classOf[RunnersProvider]).toSet
- val config = if (propFile != null) {
- import java.io.FileInputStream
- ClusterConfig.apply(new FileInputStream(propFile))
- } else {
- new ClusterConfig()
- }
-
- result.providers = runnerTypes.map(r => {
-
- val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider]
-
- provider.init(data, jobId, outStream, notifier, executorId, config, hostName)
- notifier.info(s"a provider for group ${provider.getGroupIdentifier} was created")
- (provider.getGroupIdentifier, provider)
- }).toMap
-
- result
- }
-
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
deleted file mode 100755
index f204db8..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.mesos.executors
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-class MesosActionsExecutor extends Logging with Executor {
-
- var master: String = _
- var executorDriver: ExecutorDriver = _
- var jobId: String = _
- var actionName: String = _
- // var sparkScalaRunner: SparkScalaRunner = _
- // var pySparkRunner: PySparkRunner = _
- var notifier: MesosNotifier = _
- var providersFactory: ProvidersFactory = _
-
- val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
-
-
- override def shutdown(driver: ExecutorDriver) = {
-
- }
-
- override def killTask(driver: ExecutorDriver, taskId: TaskID) = ???
-
- override def disconnected(driver: ExecutorDriver) = ???
-
- override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo) = {
- this.executorDriver = driver
- }
-
- override def error(driver: ExecutorDriver, message: String) = {
-
- val status = TaskStatus.newBuilder
- .setData(ByteString.copyFromUtf8(message))
- .setState(TaskState.TASK_ERROR).build()
-
- driver.sendStatusUpdate(status)
-
- }
-
- override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]) = ???
-
- override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = {
-
- this.executorDriver = driver
- val data = mapper.readValue(new ByteArrayInputStream(executorInfo.getData.toByteArray), classOf[ExecData])
-
- // this is used to resolve the spark drier address
- val hostName = slaveInfo.getHostname
- notifier = new MesosNotifier(driver)
- notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered")
- val outStream = new ByteArrayOutputStream()
- providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, "./amaterasu.properties")
-
- }
-
- override def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo): Unit = {
-
-
- notifier.info(s"launching task: ${taskInfo.getTaskId.getValue}")
- log.debug(s"launching task: $taskInfo")
- val status = TaskStatus.newBuilder
- .setTaskId(taskInfo.getTaskId)
- .setState(TaskState.TASK_STARTING).build()
-
- notifier.info(s"container info: ${taskInfo.getContainer.getDocker.getImage}, ${taskInfo.getContainer.getType}")
-
- driver.sendStatusUpdate(status)
-
- val task = Future {
-
- val taskData = mapper.readValue(new ByteArrayInputStream(taskInfo.getData.toByteArray), classOf[TaskData])
-
- val status = TaskStatus.newBuilder
- .setTaskId(taskInfo.getTaskId)
- .setState(TaskState.TASK_RUNNING).build()
- driver.sendStatusUpdate(status)
- val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
- runner match {
- case Some(r) => r.executeSource(taskData.getSrc, actionName, taskData.getExports)
- case None =>
- notifier.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
- None
- }
-
- }
-
- task onComplete {
-
- case Failure(t) =>
- println(s"launching task Failed: ${t.getMessage}")
- System.exit(1)
-
- case Success(ts) =>
-
- driver.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(taskInfo.getTaskId)
- .setState(TaskState.TASK_FINISHED).build())
- notifier.info(s"Complete task: ${taskInfo.getTaskId.getValue}")
-
- }
-
- }
-
-}
-
-object MesosActionsExecutor extends Logging {
-
- def main(args: Array[String]) {
- System.loadLibrary("mesos")
- log.debug("Starting a new ActionExecutor")
-
- val executor = new MesosActionsExecutor
- executor.jobId = args(0)
- executor.master = args(1)
- executor.actionName = args(2)
-
- val driver = new MesosExecutorDriver(executor)
- driver.run()
- }
-
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
deleted file mode 100755
index b256386..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.mesos.executors
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-import org.apache.amaterasu.common.execution.actions.{Notification, Notifier}
-import org.apache.mesos.ExecutorDriver
-
-
-class MesosNotifier(driver: ExecutorDriver) extends Notifier {
-
- private val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
-
- override def success(line: String): Unit = {
-
-
- getLog.info(s"successfully executed line: $line")
-
- val notification = new Notification(line, "", NotificationType.Success, NotificationLevel.Code)
- val msg = mapper.writeValueAsBytes(notification)
-
- driver.sendFrameworkMessage(msg)
-
- }
-
- override def error(line: String, message: String): Unit = {
-
- getLog.error(s"Error executing line: $line message: $message")
-
- val notification = new Notification(line, message, NotificationType.Error, NotificationLevel.Code)
- val msg = mapper.writeValueAsBytes(notification)
-
- driver.sendFrameworkMessage(msg)
-
- }
-
- override def info(message: String): Unit = {
-
- getLog.info(message)
-
- val notification = new Notification("", message, NotificationType.Info, NotificationLevel.Execution)
- val msg = mapper.writeValueAsBytes(notification)
-
- driver.sendFrameworkMessage(msg)
- }
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
deleted file mode 100644
index cda6351..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.yarn.executors
-
-import java.io.ByteArrayOutputStream
-import java.net.{InetAddress, URLDecoder}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.utils.ActiveNotifier
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-
-import scala.collection.JavaConverters._
-
-
-class ActionsExecutor extends Logging {
-
- var master: String = _
- var jobId: String = _
- var actionName: String = _
- var taskData: TaskData = _
- var execData: ExecData = _
- var providersFactory: ProvidersFactory = _
-
- def execute(): Unit = {
- val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
- runner match {
- case Some(r) => {
- try {
- r.executeSource(taskData.getSrc, actionName, taskData.getExports)
- log.info("Completed action")
- System.exit(0)
- } catch {
- case e: Exception => {
- log.error("Exception in execute source", e)
- System.exit(100)
- }
- }
- }
- case None =>
- log.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
- System.exit(101)
- }
- }
-}
-
-object ActionsExecutorLauncher extends Logging with App {
-
- val hostName = InetAddress.getLocalHost.getHostName
-
- println(s"Hostname resolved to: $hostName")
- val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
-
- log.info("Starting actions executor")
-
- val jobId = this.args(0)
- val master = this.args(1)
- val actionName = this.args(2)
- val notificationsAddress = this.args(6)
-
- log.info("parsing task data")
- val taskData = mapper.readValue(URLDecoder.decode(this.args(3), "UTF-8"), classOf[TaskData])
- log.info("parsing executor data")
- val execData = mapper.readValue(URLDecoder.decode(this.args(4), "UTF-8"), classOf[ExecData])
- val taskIdAndContainerId = this.args(5)
-
- val actionsExecutor: ActionsExecutor = new ActionsExecutor
- actionsExecutor.master = master
- actionsExecutor.actionName = actionName
- actionsExecutor.taskData = taskData
- actionsExecutor.execData = execData
-
- log.info("Setup executor")
- val baos = new ByteArrayOutputStream()
- val notifier = new ActiveNotifier(notificationsAddress)
-
- actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties")
- actionsExecutor.execute()
-}
diff --git a/executor/src/test/resources/codegen.pyc b/executor/src/test/resources/codegen.pyc
deleted file mode 100644
index ab32046..0000000
--- a/executor/src/test/resources/codegen.pyc
+++ /dev/null
Binary files differ
diff --git a/frameworks/python/dispatcher/build.gradle b/frameworks/python/dispatcher/build.gradle
index b1cbd39..35ab5aa 100644
--- a/frameworks/python/dispatcher/build.gradle
+++ b/frameworks/python/dispatcher/build.gradle
@@ -71,7 +71,8 @@
dependencies {
compile project(':common')
compile project(':amaterasu-sdk')
- compile project(':leader')
+ compile project(':leader-common')
+
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect"
compile 'com.uchuhimo:konf:0.11'
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
index 394dd06..d5a691e 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
@@ -30,18 +30,14 @@
private var conf: ClusterConfig? = null
private var runnerProviders: Map<String, RunnerSetupProvider> = mapOf()
- override val groupIdentifier: String
- get() = "python"
- override val groupResources: List<File>
- get() = listOf()
+ override val groupIdentifier: String = "python"
+ override val groupResources: List<File> = listOf()
override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration {
return DriverConfiguration(conf!!.taskMem(), 1) //TODO: this should be configured on env level
}
- override val environmentVariables: Map<String, String>
- get() = mapOf()
- override val configurationItems: List<String>
- get() = listOf()
+ override val environmentVariables: Map<String, String> = mapOf()
+ override val configurationItems: List<String> = listOf()
override fun init(env: String, conf: ClusterConfig) {
this.env = env
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
index 0a2863b..0f63e1d 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
@@ -90,7 +90,7 @@
val status = ActionStatus.valueOf(String(client.data.forPath("/$jobId/$task")))
if (status == ActionStatus.Queued || status == ActionStatus.Started) {
- jobManager.reQueueAction(task.substring(task.indexOf("task-") + 5))
+ jobManager.requeueAction(task.substring(task.indexOf("task-") + 5))
}
}
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
index 9d117ec..3c77ba2 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
@@ -72,7 +72,7 @@
return nextAction
}
- fun reQueueAction(actionId: String) {
+ fun requeueAction(actionId: String) {
log.info("requeing action $actionId")
registeredActions.forEach { log.info("key ${it.key}") }
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt
new file mode 100644
index 0000000..c7c3d99
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+
+class ActionStatusTests : Spek({
+
+ // setting up a testing zookeeper server (curator TestServer)
+ val retryPolicy = ExponentialBackoffRetry(1000, 3)
+ val server = TestingServer(2181, true)
+ val jobId = "job_${System.currentTimeMillis()}"
+
+
+ given("an action") {
+
+ val data = ActionData(ActionStatus.Pending, "test_action", "start.scala", "", "spark", "scala", "0000001", hashMapOf(), mutableListOf())
+ val queue = LinkedBlockingQueue<ActionData>()
+
+ val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+ client.start()
+
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+ val action = SequentialAction(data.name, data.src, "", data.groupId, data.typeId, mapOf(), jobId, queue, client, 1)
+
+ it("should queue it's ActionData int the job queue when executed") {
+ action.execute()
+ assertEquals(queue.peek().name, data.name)
+ assertEquals(queue.peek().src, data.src)
+ }
+
+ it("should also create a sequential znode for the task with the value of Queued") {
+ val taskStatus = client.data.forPath("/$jobId/task-0000000000")
+
+ assertNotNull(taskStatus)
+ assertEquals(String(taskStatus), "Queued")
+
+ }
+ }
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt
new file mode 100644
index 0000000..af3c9d9
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class JobExecutionTests : Spek({
+ val retryPolicy = ExponentialBackoffRetry(1000, 3)
+ val server = TestingServer(2183, true)
+ val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+ client.start()
+
+ val jobId = "job_${System.currentTimeMillis()}"
+
+ val yaml = this::class.java.getResource("/simple-maki.yml").readText()
+ val queue = LinkedBlockingQueue<ActionData>()
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+
+ given("a job parsed in to a JobManager") {
+ val job = JobParser.parse(jobId, yaml, queue, client, 1)
+
+ it("queue the first action when the JobManager.start method is called ") {
+ job.start()
+
+ assertEquals(queue.peek().name, "start")
+
+ // making sure that the status is reflected in zk
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+ assertEquals(String(actionStatus), "Queued")
+ }
+
+ it("return the start action when calling getNextAction and dequeue it") {
+ assertEquals(job.nextActionData!!.name, "start")
+ assertEquals(queue.size, 0)
+ }
+
+ it("also changes the status of start to started") {
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+ assertEquals(String(actionStatus), "Started")
+ }
+
+ it("is not out of actions when an action is still Pending") {
+ assertFalse { job.outOfActions }
+ }
+
+ it("will requeue a task when calling JobManager.requeueAction") {
+ job.requeueAction("0000000000")
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+ assertEquals(String(actionStatus), "Queued")
+ }
+
+ it("will restart the task") {
+ val data = job.nextActionData
+ assertEquals(data!!.name, "start")
+
+ // making sure that the status is reflected in zk
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+ assertEquals(String(actionStatus), "Started")
+ }
+
+ it("will mark the action as Complete when the actionComplete method is called") {
+ job.actionComplete("0000000000")
+ // making sure that the status is reflected in zk
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+
+ assertEquals(String(actionStatus), "Complete")
+ }
+
+ on("completion of start, the next action step2 is queued") {
+ assertEquals(queue.peek().name, "step2")
+ }
+
+ it("should also be registered as queued in zk") {
+ val actionStatus = client.data.forPath("/$jobId/task-0000000001")
+ assertEquals(String(actionStatus), "Queued")
+ }
+
+ it("is marked as Started when JobManager.nextActionData is called") {
+ val data = job.nextActionData
+ assertEquals(data!!.name, "step2")
+ }
+
+ it("will start an error action JobManager.actionFailed is called") {
+ job.actionFailed("0000000001", "test failure")
+
+ assertEquals(queue.peek().name, "error-action")
+ }
+
+ it("marks the error action as Complete when the actionComplete method is called"){
+ job.actionComplete("0000000001-error")
+
+ // making sure that the status is reflected in zk
+ val actionStatus = client.data.forPath("/$jobId/task-0000000001-error")
+ assertEquals(String(actionStatus) ,"Complete")
+ }
+
+ it("will be out of actions when all actions have been executed"){
+ assertTrue { job.outOfActions }
+ }
+ }
+
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt
new file mode 100644
index 0000000..9e4f53b
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+
+class JobParserTests : Spek({
+
+ val retryPolicy = ExponentialBackoffRetry(1000, 3)
+ val server = TestingServer(2187, true)
+ val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+ client.start()
+
+ val jobId = "job_${System.currentTimeMillis()}"
+ val queue = LinkedBlockingQueue<ActionData>()
+
+ // this will be performed by the job bootstrapper
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+
+ given("a simple-maki.yaml that is passed to a JobParser") {
+
+ val yaml = this::class.java.getResource("/simple-maki.yml").readText()
+ val job = JobParser.parse(jobId, yaml, queue, client, 1)
+
+ it("parse the job details correctly") {
+ assertEquals(job.name, "amaterasu-test")
+ }
+
+ it("also have two actions in the right order") {
+
+ assertEquals(job.registeredActions.size, 3)
+
+ assertEquals(job.registeredActions["0000000000"]!!.data.name, "start")
+ assertEquals(job.registeredActions["0000000001"]!!.data.name, "step2")
+ assertEquals(job.registeredActions["0000000001-error"]!!.data.name, "error-action")
+
+ }
+
+ it("also also parse action 'config' successfully") {
+ assertEquals(job.registeredActions["0000000000"]!!.data.config, "start-cfg.yaml")
+ assertEquals(job.registeredActions["0000000001"]!!.data.config, "step2-cfg.yaml")
+ assertEquals(job.registeredActions["0000000001-error"]!!.data.config, "error-cfg.yaml")
+ }
+ }
+})
\ No newline at end of file
diff --git a/leader/src/test/resources/simple-maki.yml b/leader-common/src/test/resources/simple-maki.yml
similarity index 100%
rename from leader/src/test/resources/simple-maki.yml
rename to leader-common/src/test/resources/simple-maki.yml
diff --git a/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt
index ef40003..f19d0fa 100644
--- a/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt
+++ b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt
@@ -314,7 +314,7 @@
override fun offerRescinded(driver: SchedulerDriver?, offerId: Protos.OfferID?) {
offerId?.let {
val actionId = offersToTaskIds[it.value]
- jobManager.reQueueAction(actionId!!)
+ jobManager.requeueAction(actionId!!)
}
}
diff --git a/leader/build.gradle b/leader/build.gradle
deleted file mode 100644
index fd545c6..0000000
--- a/leader/build.gradle
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
- id "com.github.johnrengelman.shadow" version "1.2.4"
- id 'com.github.maiflai.scalatest' version '0.22'
- id 'scala'
- id 'org.jetbrains.kotlin.jvm'
- id 'java'
-}
-
-sourceCompatibility = 1.8
-targetCompatibility = 1.8
-
-shadowJar {
- zip64 true
-}
-
-repositories {
- maven {
- url "https://plugins.gradle.org/m2/"
- }
- mavenCentral()
-}
-
-dependencies {
- compile 'org.scala-lang:scala-library:2.11.8'
- compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-
- compile project(':common')
- compile project(':leader-common')
- compile project(':amaterasu-sdk')
- compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0'
- compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
- compile group: 'org.apache.curator', name: 'curator-framework', version: '2.13.0'
- compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.4'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.8'
- compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0'
- compile group: 'org.yaml', name: 'snakeyaml', version: '1.23'
- compile group: 'commons-cli', name: 'commons-cli', version: '1.2'
- compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
- compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6'
- compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
- compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
- compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
- compile "org.jetbrains.kotlin:kotlin-reflect"
-
- testCompile project(':common')
- testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
- testRuntime 'org.pegdown:pegdown:1.1.0'
- testCompile 'junit:junit:4.11'
- testCompile 'org.scalatest:scalatest_2.11:3.0.2'
- testCompile 'org.scala-lang:scala-library:2.11.8'
- testCompile 'org.apache.curator:curator-test:2.13.0'
-
-}
-
-
-sourceSets {
- test {
- resources.srcDirs += [file('src/test/resources')]
- }
-
- // this is done so Scala will compile before Java
- main {
- kotlin {
- srcDirs = ['src/main/kotlin']
- }
- scala {
- srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
- }
- java {
- srcDirs = ['src/main/java']
- }
- }
-}
-
-test {
- maxParallelForks = 1
-}
-
-task copyToHomeRoot(type: Copy) {
- from 'src/main/scripts'
- into '../build/amaterasu/'
-}
-
-task copyToHomeBin(type: Copy) {
- dependsOn shadowJar
- from 'build/libs'
- into '../build/amaterasu/bin'
-}
-
-task copyToHome() {
- dependsOn copyToHomeRoot
- dependsOn copyToHomeBin
-}
-
-compileKotlin{
- kotlinOptions.jvmTarget = "1.8"
-}
-
-compileTestKotlin {
- kotlinOptions.jvmTarget = "1.8"
-}
-
-compileTestScala {
- dependsOn compileScala
-}
-
-compileScala {
- dependsOn compileJava
- classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir)
-}
-
-compileJava {
- dependsOn compileKotlin
-}
\ No newline at end of file
diff --git a/leader/src/main/resources/log4j.properties b/leader/src/main/resources/log4j.properties
deleted file mode 100644
index 7ba668f..0000000
--- a/leader/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Root logger option
-log4j.rootLogger=DEBUG, stdout, file
-
-# Redirect log messages to console
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
deleted file mode 100755
index 0473c58..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.amaterasu.leader.mesos
-//
-//import org.apache.amaterasu.common.configuration.ClusterConfig
-//import org.apache.amaterasu.leader.mesos.schedulers.JobScheduler
-//import org.apache.amaterasu.leader.utilities.{Args, BaseJobLauncher}
-//import org.apache.log4j.LogManager
-//import org.apache.mesos.Protos.FrameworkID
-//import org.apache.mesos.{MesosSchedulerDriver, Protos}
-//
-///**
-// * The JobLauncher allows the execution of a single job, without creating a full
-// * Amaterasu cluster (no cluster scheduler).
-// */
-//object MesosJobLauncher extends BaseJobLauncher {
-//
-// override def run(arguments: Args, config: ClusterConfig, resume: Boolean): Unit = {
-// LogManager.resetConfiguration()
-// val frameworkBuilder = Protos.FrameworkInfo.newBuilder()
-// .setName(s"${arguments.name} - Amaterasu Job")
-// .setFailoverTimeout(config.timeout)
-// .setUser(config.user)
-//
-// // TODO: test this
-// if (resume) {
-// frameworkBuilder.setId(FrameworkID.newBuilder().setValue(arguments.jobId))
-// }
-//
-// val framework = frameworkBuilder.build()
-//
-// val masterAddress = s"${config.master}:${config.masterPort}"
-//
-// val scheduler = JobScheduler(
-// arguments.repo,
-// arguments.branch,
-// arguments.username,
-// arguments.password,
-// arguments.env,
-// resume,
-// config,
-// arguments.report,
-// arguments.home
-// )
-//
-// val driver = new MesosSchedulerDriver(scheduler, framework, masterAddress)
-//
-// log.debug(s"Connecting to master on: $masterAddress")
-// driver.run()
-// }
-//}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
deleted file mode 100755
index 2adff07..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos.executors
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.mesos.Protos._
-import org.apache.mesos.{Executor, ExecutorDriver}
-
-object JobExecutor extends Logging with Executor {
-
- override def shutdown(driver: ExecutorDriver): Unit = {}
-
- override def disconnected(driver: ExecutorDriver): Unit = {}
-
- override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {}
-
- override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): Unit = {}
-
- override def error(driver: ExecutorDriver, message: String): Unit = {}
-
- override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]): Unit = {}
-
- override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = {}
-
- override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
-
- //val data = mapper.readValue(task.getData.toStringUtf8, JobData.getClass)
-
- }
-
- def main(args: Array[String]) {
-
- val repo = args(0)
-
- }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
deleted file mode 100755
index 13b2413..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos.schedulers
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.mesos.Protos.{Resource, Value}
-import org.apache.mesos.Scheduler
-
-trait AmaterasuScheduler extends Logging with Scheduler {
-
- def createScalarResource(name: String, value: Double): Resource = {
- Resource.newBuilder
- .setName(name)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(value)).build()
- }
-
-}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
deleted file mode 100755
index 5f05468..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ /dev/null
@@ -1,525 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.amaterasu.leader.mesos.schedulers
-//
-//import java.io.{File, PrintWriter, StringWriter}
-//import java.nio.file.{Files, Path, Paths, StandardCopyOption}
-//import java.util
-//import java.util.{Collections, UUID}
-//import java.util.concurrent.locks.ReentrantLock
-//import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-//
-//import com.fasterxml.jackson.databind.ObjectMapper
-//import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-//import com.fasterxml.jackson.module.scala.DefaultScalaModule
-//import org.apache.amaterasu.common.configuration.{ClusterConfig, ConfigManager}
-//import org.apache.amaterasu.common.configuration.enums.ActionStatus
-//import org.apache.amaterasu.common.dataobjects.ActionData
-//import org.apache.amaterasu.common.execution.actions.Notification
-//import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-//import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
-//import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
-//import org.apache.amaterasu.leader.common.utilities.DataLoader
-//import org.apache.amaterasu.leader.utilities.HttpServer
-//import org.apache.commons.io.FileUtils
-//import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-//import org.apache.curator.retry.ExponentialBackoffRetry
-//import org.apache.log4j.LogManager
-//import org.apache.mesos.Protos.CommandInfo.URI
-//import org.apache.mesos.Protos.Environment.Variable
-//import org.apache.mesos.Protos._
-//import org.apache.mesos.protobuf.ByteString
-//import org.apache.mesos.{Protos, SchedulerDriver}
-//
-//import scala.collection.JavaConverters._
-//import scala.collection.concurrent
-//import scala.collection.concurrent.TrieMap
-//
-///**
-// * The JobScheduler is a Mesos implementation. It is in charge of scheduling the execution of
-// * Amaterasu actions for a specific job
-// */
-//class JobScheduler extends AmaterasuScheduler {
-//
-// /*private val props: Properties = new Properties(new File(""))
-// private val version = props.getProperty("version")
-// println(s"===> version $version")*/
-// LogManager.resetConfiguration()
-// private var frameworkFactory: FrameworkProvidersFactory = _
-// private var configManager: ConfigManager = _
-// private var jobManager: JobManager = _
-// private var client: CuratorFramework = _
-// private var config: ClusterConfig = _
-// private var src: String = _
-// private var env: String = _
-// private var branch: String = _
-// private var userName: String = _
-// private var password: String = _
-// private var resume: Boolean = false
-// private var reportLevel: NotificationLevel = _
-//
-// private val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-// private val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
-//
-// val slavesExecutors = new TrieMap[String, ExecutorInfo]
-// private var awsEnv: String = ""
-//
-// // this map holds the following structure:
-// // slaveId
-// // |
-// // +-> taskId, actionStatus)
-// private val executionMap: concurrent.Map[String, concurrent.Map[String, ActionStatus]] = new ConcurrentHashMap[String, concurrent.Map[String, ActionStatus]].asScala
-// private val lock = new ReentrantLock()
-// private val offersToTaskIds: concurrent.Map[String, String] = new ConcurrentHashMap[String, String].asScala
-// private val taskIdsToActions: concurrent.Map[Protos.TaskID, String] = new ConcurrentHashMap[Protos.TaskID, String].asScala
-//
-// private val mapper = new ObjectMapper()
-// mapper.registerModule(DefaultScalaModule)
-//
-// private val yamlMapper = new ObjectMapper(new YAMLFactory())
-// yamlMapper.registerModule(DefaultScalaModule)
-//
-// def error(driver: SchedulerDriver, message: String): Unit = {
-// log.error(s"===> $message")
-// }
-//
-// def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {}
-//
-// def slaveLost(driver: SchedulerDriver, slaveId: SlaveID) {}
-//
-// def disconnected(driver: SchedulerDriver) {}
-//
-// def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
-//
-// val notification = mapper.readValue(data, classOf[Notification])
-//
-// reportLevel match {
-// case NotificationLevel.Code => printNotification(notification)
-// case NotificationLevel.Execution =>
-// if (notification.getNotLevel != NotificationLevel.Code)
-// printNotification(notification)
-// case _ =>
-// }
-//
-// }
-//
-// def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
-//
-// val actionName = taskIdsToActions(status.getTaskId)
-// status.getState match {
-// case TaskState.TASK_STARTING => log.info("Task starting ...")
-// case TaskState.TASK_RUNNING => {
-// jobManager.actionStarted(status.getTaskId.getValue)
-// printNotification(new Notification("", s"created container for $actionName created", NotificationType.Info, NotificationLevel.Execution))
-//
-// }
-// case TaskState.TASK_FINISHED => {
-// jobManager.actionComplete(status.getTaskId.getValue)
-// printNotification(new Notification("", s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution))
-// }
-// case TaskState.TASK_FAILED |
-// TaskState.TASK_KILLED |
-// TaskState.TASK_ERROR |
-// TaskState.TASK_LOST => {
-// jobManager.actionFailed(status.getTaskId.getValue, status.getMessage)
-// printNotification(new Notification("", s"error launching container with ${status.getMessage} in ${status.getData.toStringUtf8}", NotificationType.Error, NotificationLevel.Execution))
-//
-// }
-// case _ => log.warn("WTF? just got unexpected task state: " + status.getState)
-// }
-//
-// }
-//
-// def validateOffer(offer: Offer): Boolean = {
-//
-// val resources = offer.getResourcesList.asScala
-//
-// resources.count(r => r.getName == "cpus" && r.getScalar.getValue >= config.jobs.tasks.cpus) > 0 &&
-// resources.count(r => r.getName == "mem" && r.getScalar.getValue >= config.jobs.tasks.mem) > 0
-// }
-//
-// def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {
-//
-// val actionId = offersToTaskIds(offerId.getValue)
-// jobManager.reQueueAction(actionId)
-//
-// }
-//
-// def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = {
-//
-// println(jobManager.toString)
-//
-// for (offer <- offers.asScala) {
-//
-// if (validateOffer(offer)) {
-//
-// log.info(s"Accepting offer, id=${offer.getId}")
-//
-// // this is done to avoid the processing the same action
-// // multiple times
-// lock.lock()
-//
-// try {
-// val actionData = jobManager.getNextActionData
-// if (actionData != null) {
-//
-// frameworkFactory = FrameworkProvidersFactory(env, config)
-// val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
-// configManager = new ConfigManager(env, "repo", items)
-//
-// val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
-// taskIdsToActions.put(taskId, actionData.getName)
-// // setting up the configuration files for the container
-// val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
-// writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
-//
-// val envConf = configManager.getActionConfiguration(actionData.getName, actionData.getConfig)
-// val dataStores = DataLoader.getTaskData(actionData, env).getExports
-// val writer = new StringWriter()
-// yamlMapper.writeValue(writer, dataStores)
-// val dataStoresYaml = writer.toString
-// writeConfigFile(dataStoresYaml, jobManager.getJobId, actionData.getName, "datastores.yaml")
-//
-// writeConfigFile(s"jobId: ${jobManager.getJobId}\nactionName: ${actionData.getName}", jobManager.getJobId, actionData.getName, "runtime.yaml")
-//
-// val datasets = DataLoader.getDatasets(env)
-// writeConfigFile(datasets, jobManager.getJobId, actionData.getName, "datasets.yaml")
-// offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
-//
-// // atomically adding a record for the slave, I'm storing all the actions
-// // on a slave level to efficiently handle slave loses
-// executionMap.putIfAbsent(offer.getSlaveId.toString, new ConcurrentHashMap[String, ActionStatus].asScala)
-//
-// val slaveActions = executionMap(offer.getSlaveId.toString)
-// slaveActions.put(taskId.getValue, ActionStatus.Started)
-//
-// val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
-//
-// val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId)
-//
-// printNotification(new Notification("", s"provider ${runnerProvider.getClass.getName}", NotificationType.Info, NotificationLevel.Execution))
-// // searching for an executor that already exist on the slave, if non exist
-// // we create a new one
-// var executor: ExecutorInfo = null
-//
-// // val slaveId = offer.getSlaveId.getValue
-// // slavesExecutors.synchronized {
-//
-// val execData = DataLoader.getExecutorDataBytes(env, config)
-// val executorId = taskId.getValue + "-" + UUID.randomUUID()
-// //creating the command
-//
-// // // TODO: move this into the runner provider somehow
-// // if(!actionData.getSrc.isEmpty){
-// // copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
-// // }
-// val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "")
-// printNotification(new Notification("", s"container command $commandStr", NotificationType.Info, NotificationLevel.Execution))
-//
-// val command = CommandInfo
-// .newBuilder
-// .setValue(commandStr)
-// // .addUris(URI.newBuilder
-// // .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/executor-${config.version}-all.jar")
-// // .setExecutable(false)
-// // .setExtract(false)
-// // .build())
-//
-// // Getting framework (group) resources
-// log.info(s"===> groupResources: ${frameworkProvider.getGroupResources}")
-// frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
-// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/${f.getName}")
-// .setExecutable(false)
-// .setExtract(true)
-// .build()
-// ))
-//
-// // Getting runner resources
-// runnerProvider.getRunnerResources.foreach(r => {
-// command.addUris(URI.newBuilder
-// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
-// .setExecutable(false)
-// .setExtract(false)
-// .build())
-// })
-//
-// // Getting action dependencies
-// runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
-//
-// FileUtils.copyFile(new File(r), new File(s"dist/$r"))
-// command.addUris(URI.newBuilder
-// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
-// .setExecutable(false)
-// .setExtract(false)
-// .build())
-// })
-//
-// // Getting action specific resources
-// runnerProvider.getActionResources(jobManager.getJobId, actionData).foreach(r => command.addUris(URI.newBuilder
-// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
-// .setExecutable(false)
-// .setExtract(false)
-// .build()))
-//
-// // setting up action executable
-// val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
-// var executable: Path = null
-// if (actionData.getHasArtifact) {
-// val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
-// executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
-// } else {
-// val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
-// FileUtils.copyFile(sourcePath, dest)
-// executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
-// }
-//
-// println(s"===> executable $executable")
-// command.addUris(URI.newBuilder
-// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$executable")
-// .setExecutable(false)
-// .setExtract(false)
-// .build())
-//
-// command
-// .addUris(URI.newBuilder()
-// .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/amaterasu.properties")
-// .setExecutable(false)
-// .setExtract(false)
-// .build())
-//
-// // setting the processes environment variables
-// val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x => Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
-// command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
-//
-// executor = ExecutorInfo
-// .newBuilder
-// .setData(ByteString.copyFrom(execData))
-// .setName(taskId.getValue)
-// .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
-// .setCommand(command)
-// .build()
-//
-// slavesExecutors.put(offer.getSlaveId.getValue, executor)
-//
-//
-// val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
-//
-// var actionTask: TaskInfo = null
-//
-// if (runnerProvider.getHasExecutor) {
-// actionTask = TaskInfo
-// .newBuilder
-// .setName(taskId.getValue)
-// .setTaskId(taskId)
-// .setExecutor(executor)
-//
-// .setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
-// .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
-// .addResources(createScalarResource("mem", driverConfiguration.getMemory))
-// .addResources(createScalarResource("disk", config.jobs.repoSize))
-// .setSlaveId(offer.getSlaveId)
-// .build()
-//
-// //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
-// }
-// else {
-// actionTask = TaskInfo
-// .newBuilder
-// .setName(taskId.getValue)
-// .setTaskId(taskId)
-// .setCommand(command)
-//
-// //.setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
-// .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
-// .addResources(createScalarResource("mem", driverConfiguration.getMemory))
-// .addResources(createScalarResource("disk", config.jobs.repoSize))
-// .setSlaveId(offer.getSlaveId)
-// .build()
-//
-// //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
-// }
-//
-// printNotification(new Notification("", s"requesting container for ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
-// driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
-//
-// }
-// else if (jobManager.getOutOfActions) {
-// log.info(s"framework ${jobManager.getJobId} execution finished")
-//
-// val repo = new File("repo/")
-// repo.delete()
-//
-// HttpServer.stop()
-// driver.declineOffer(offer.getId)
-// driver.stop()
-// sys.exit()
-// }
-// else {
-// log.info("Declining offer, no action ready for execution")
-// driver.declineOffer(offer.getId)
-// }
-// }
-// finally {
-// lock.unlock()
-// }
-// }
-// else {
-// log.info("Declining offer, no sufficient resources")
-// driver.declineOffer(offer.getId)
-// }
-//
-// }
-//
-// }
-//
-// def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {
-//
-// if (!resume) {
-//
-// jobManager = JobLoader.loadJob(
-// src,
-// branch,
-// frameworkId.getValue,
-// userName,
-// password,
-// client,
-// config.jobs.tasks.attempts,
-// new LinkedBlockingQueue[ActionData]()
-// )
-// }
-// else {
-//
-// JobLoader.reloadJob(
-// frameworkId.getValue,
-// userName,
-// password,
-// client,
-// config.jobs.tasks.attempts,
-// new LinkedBlockingQueue[ActionData]()
-// )
-//
-// }
-//
-//
-//
-//
-// jobManager.start()
-//
-// createJobDir(jobManager.getJobId)
-//
-// }
-//
-// def reregistered(driver: SchedulerDriver, masterInfo: Protos.MasterInfo) {}
-//
-// def printNotification(notification: Notification): Unit = {
-//
-// var color = Console.WHITE
-//
-// notification.getNotType match {
-//
-// case NotificationType.Info =>
-// color = Console.WHITE
-// println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
-// case NotificationType.Success =>
-// color = Console.GREEN
-// println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
-// case NotificationType.Error =>
-// color = Console.RED
-// println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
-// println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
-//
-// }
-//
-// }
-//
-// private def createJobDir(jobId: String): Unit = {
-// val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-// val amaHome = new File(jarFile.getParent).getParent
-// val jobDir = s"$amaHome/dist/$jobId/"
-//
-// val dir = new File(jobDir)
-// if (!dir.exists()) {
-// dir.mkdir()
-// }
-// }
-//
-// /**
-// * This function creates an action specific env.yml file int the dist folder with the following path:
-// * dist/{jobId}/{actionName}/env.yml to be added to the container
-// *
-// * @param configuration A YAML string to be written to the env file
-// * @param jobId the jobId
-// * @param actionName the name of the action
-// */
-// def writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String): Unit = {
-// val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
-// val amaHome = new File(jarFile.getParent).getParent
-// val envLocation = s"$amaHome/dist/$jobId/$actionName/"
-//
-// val dir = new File(envLocation)
-// if (!dir.exists()) {
-// dir.mkdirs()
-// }
-//
-// new PrintWriter(s"$envLocation/$fileName") {
-// write(configuration)
-// close
-// }
-// }
-//}
-//
-//object JobScheduler {
-//
-// def apply(src: String,
-// branch: String,
-// username: String,
-// password: String,
-// env: String,
-// resume: Boolean,
-// config: ClusterConfig,
-// report: String,
-// home: String): JobScheduler = {
-//
-// LogManager.resetConfiguration()
-// val scheduler = new JobScheduler()
-//
-// HttpServer.start(config.webserver.Port, s"$home/${config.webserver.Root}")
-//
-// if (sys.env.get("AWS_ACCESS_KEY_ID").isDefined &&
-// sys.env.get("AWS_SECRET_ACCESS_KEY").isDefined) {
-//
-// scheduler.awsEnv = s"env AWS_ACCESS_KEY_ID=${sys.env("AWS_ACCESS_KEY_ID")} env AWS_SECRET_ACCESS_KEY=${sys.env("AWS_SECRET_ACCESS_KEY")}"
-// }
-//
-// scheduler.resume = resume
-// scheduler.src = src
-// scheduler.branch = branch
-// scheduler.userName = username
-// scheduler.password = password
-// scheduler.env = env
-// scheduler.reportLevel = NotificationLevel.valueOf(report.capitalize)
-//
-// val retryPolicy = new ExponentialBackoffRetry(1000, 3)
-// scheduler.client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
-// scheduler.client.start()
-// scheduler.config = config
-//
-// scheduler
-//
-// }
-//
-//}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
deleted file mode 100644
index 8364456..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.utilities
-
-case class Args(
- repo: String = "",
- branch: String = "master",
- env: String = "default",
- name: String = "amaterasu-job",
- jobId: String = null,
- report: String = "code",
- home: String = "",
- newJobId: String = "",
- username: String = "",
- password: String = ""
- ) {
- def toCmdString: String = {
- var cmd = s""" --repo $repo --branch $branch --env $env --name $name --report $report --home $home"""
- if (!username.isEmpty && !password.isEmpty) {
- cmd += s" --user-name $username --password $password"
- }
- if(jobId != null && !jobId.isEmpty) {
- cmd += s" --job-id $jobId"
- }
- cmd
- }
-
- override def toString: String = {
- toCmdString
- }
-}
-
-object Args {
- def getParser: scopt.OptionParser[Args] = {
- val pack = this.getClass.getPackage
- new scopt.OptionParser[Args]("amaterasu job") {
-
- head("amaterasu job", if(pack == null) "DEVELOPMENT" else pack.getImplementationVersion)
-
- opt[String]('r', "repo") action { (x, c) =>
- c.copy(repo = x)
- } text "The git repo containing the job"
-
- opt[String]('b', "branch") action { (x, c) =>
- c.copy(branch = x)
- } text "The branch to be executed (default is master)"
-
- opt[String]('e', "env") action { (x, c) =>
- c.copy(env = x)
- } text "The environment to be executed (test, prod, etc. values from the default env are taken if np env specified)"
-
- opt[String]('n', "name") action { (x, c) =>
- c.copy(name = x)
- } text "The name of the job"
-
- opt[String]('i', "job-id") action { (x, c) =>
- c.copy(jobId = x)
- } text "The jobId - should be passed only when resuming a job"
-
- opt[String]('j', "new-job-id") action { (x, c) =>
- c.copy(newJobId = x)
- } text "A new jobId - should never be passed by a user"
-
- opt[String]('r', "report") action { (x, c) =>
- c.copy(report = x)
- } text "The level of reporting"
-
- opt[String]('h', "home") action { (x, c) =>
- c.copy(home = x)
- }
-
- opt[String]('u', "user-name") action { (x, c) =>
- c.copy(username = x)
- }
-
- opt[String]('p', "password") action { (x, c) =>
- c.copy(password = x)
- }
- }
- }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
deleted file mode 100644
index 38c90c7..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.utilities
-
-import java.io.FileInputStream
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.logging.Logging
-
-
-abstract class BaseJobLauncher extends Logging with App {
-
- def run(args: Args, config: ClusterConfig, resume: Boolean): Unit = ???
-
- val parser = Args.getParser
- parser.parse(args, Args()) match {
-
- case Some(arguments: Args) =>
-
- val config = ClusterConfig(new FileInputStream(s"${arguments.home}/amaterasu.properties"))
- val resume = arguments.jobId != null
-
- run(arguments, config, resume)
-
- case None =>
- // arguments are bad, error message will have been displayed
- }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
deleted file mode 100644
index 0d41bb7..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements. See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package org.apache.amaterasu.leader.utilities
-//
-//import org.apache.amaterasu.common.logging.Logging
-//import org.apache.log4j.{BasicConfigurator, Level, Logger}
-//import org.eclipse.jetty.server.handler._
-//import org.eclipse.jetty.server.{Server, ServerConnector}
-//import org.eclipse.jetty.util.log.StdErrLog
-//import org.jsoup.Jsoup
-//import org.jsoup.select.Elements
-//
-//import scala.collection.JavaConverters._
-//import scala.io.{BufferedSource, Source}
-//
-///**
-// * Implementation of Jetty Web server to server Amaterasu libraries and other distribution files
-// */
-//object HttpServer extends Logging {
-//
-// var server: Server = _
-//
-// def start(port: String, serverRoot: String): Unit = {
-//
-// BasicConfigurator.configure()
-// initLogging()
-//
-// server = new Server()
-// val connector = new ServerConnector(server)
-// connector.setPort(port.toInt)
-// server.addConnector(connector)
-//
-// val handler = new ResourceHandler()
-// handler.setDirectoriesListed(true)
-// handler.setWelcomeFiles(Array[String]("index.html"))
-// handler.setResourceBase(serverRoot)
-// val handlers = new HandlerList()
-// handlers.setHandlers(Array(handler, new DefaultHandler()))
-//
-// server.setHandler(handlers)
-// server.start()
-//
-// }
-//
-// def stop() {
-// if (server == null) throw new IllegalStateException("Server not Started")
-//
-// server.stop()
-// server = null
-// }
-//
-// def initLogging(): Unit = {
-// System.setProperty("org.eclipse.jetty.util.log.class", classOf[StdErrLog].getName)
-// Logger.getLogger("org.eclipse.jetty").setLevel(Level.ALL)
-// Logger.getLogger("org.eclipse.jetty.websocket").setLevel(Level.OFF)
-// }
-//
-// /*
-// Method: getFilesInDirectory
-// Description: provides a list of files in the given directory URL.
-// @Params: amaNode: Hostname of the URL, port: Port # of the host, directory: destination directory to fetch files
-// Note: Should the files in URL root be fetched, provide an empty value to directory.
-// */
-// def getFilesInDirectory(amaNode: String, port: String, directory: String = ""): Array[String] = {
-// //println("http://" + amaNode + ":" + port + "/" + directory)
-// val html: BufferedSource = Source.fromURL("http://" + amaNode + ":" + port + "/" + directory)
-// println(html)
-// val htmlDoc = Jsoup.parse(html.mkString)
-// val htmlElement: Elements = htmlDoc.body().select("a")
-// val files = htmlElement.asScala
-// val fileNames = files.map(url => url.attr("href")).filter(file => !file.contains("..")).map(name => name.replace("/", "")).toArray
-// fileNames
-// }
-//}
\ No newline at end of file
diff --git a/leader/src/test/resources/amaterasu.properties b/leader/src/test/resources/amaterasu.properties
deleted file mode 100755
index 96e3724..0000000
--- a/leader/src/test/resources/amaterasu.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-zk=127.0.0.1
-version=0.2.0-incubating-rc4
-master=10.0.0.31
-user=root
-mode=yarn
-webserver.port=8000
-webserver.root=dist
-spark.version=2.2.1-bin-hadoop2.7
-yarn.queue=default
-yarn.jarspath=hdfs:///apps/amaterasu
-spark.home=/usr/lib/spark
-#spark.home=/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2
-yarn.hadoop.home.dir=/etc/hadoop
-spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-yarn.master.memoryMB=2048
-yarn.worker.memoryMB=2048
-mesos.libPath=/usr/local/lib/libmesos.dylib
\ No newline at end of file
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
deleted file mode 100755
index ca9186c..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-
-class ActionStatusTests extends FlatSpec with Matchers {
-
- // setting up a testing zookeeper server (curator TestServer)
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- val server = new TestingServer(2181, true)
- val jobId = s"job_${System.currentTimeMillis}"
- val data = new ActionData(ActionStatus.Pending, "test_action", "start.scala", "", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava)
-
- "an Action" should "queue it's ActionData int the job queue when executed" in {
-
- val queue = new LinkedBlockingQueue[ActionData]()
- // val config = ClusterConfig()
-
- val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
- val action = new SequentialAction(data.getName, data.getSrc, "", data.getGroupId, data.getTypeId, Map.empty[String, String].asJava, jobId, queue, client, 1)
-
- action.execute()
- queue.peek().getName should be(data.getName)
- queue.peek().getSrc should be(data.getSrc)
-
- }
-
- it should "also create a sequential znode for the task with the value of Queued" in {
-
- val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- val taskStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-
- taskStatus should not be null
- new String(taskStatus) should be("Queued")
-
- }
-
-}
\ No newline at end of file
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
deleted file mode 100755
index 28b1ce3..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.dsl.JobParser
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobExecutionTests extends FlatSpec with Matchers {
-
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- val server = new TestingServer(2183, true)
- val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- val jobId = s"job_${System.currentTimeMillis}"
- val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
- val queue = new LinkedBlockingQueue[ActionData]()
-
- // this will be performed by the job bootstraper
-
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
- // client.setData().forPath(s"/$jobId/src",src.getBytes)
- // client.setData().forPath(s"/$jobId/branch", branch.getBytes)
-
-
- val job = JobParser.parse(jobId, yaml, queue, client, 1)
-
- "a job" should "queue the first action when the JobManager.start method is called " in {
-
- job.start
-
- queue.peek.getName should be("start")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
- new String(actionStatus) should be("Queued")
-
- }
-
- it should "return the start action when calling getNextAction and dequeue it" in {
-
- job.getNextActionData.getName should be("start")
- queue.size should be(0)
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
- new String(actionStatus) should be("Started")
- }
-
- it should "not be out of actions when an action is still Pending" in {
- job.getOutOfActions should be(false)
- }
-
- it should "be requeued correctly" in {
- job.reQueueAction("0000000000")
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
- new String(new String(actionStatus)) should be("Queued")
- }
-
- it should "should be able to restart" in {
-
- val data = job.getNextActionData
-
- data.getName should be("start")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
- new String(actionStatus) should be("Started")
- }
-
- it should "be marked as Complete when the actionComplete method is called" in {
-
- job.actionComplete("0000000000")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-
- new String(new String(actionStatus)) should be("Complete")
-
- }
-
- "the next step2 job" should "be Queued as a result of the completion" in {
-
- queue.peek.getName should be("step2")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
- new String(actionStatus) should be("Queued")
-
- }
-
- it should "be marked as Started when JobManager.getNextActionData is called" in {
-
- val data = job.getNextActionData
-
- data.getName should be("step2")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
- new String(actionStatus) should be("Started")
- }
-
- it should "be marked as Failed when JobManager.actionFailed is called" in {
-
- job.actionFailed("0000000001", "test failure")
- queue.peek.getName should be("error-action")
- }
-
- "an ErrorAction" should "be queued if one exist" in {
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
- new String(actionStatus) should be("Queued")
-
- // and returned by getNextActionData
- val data = job.getNextActionData
-
- }
-
- it should "be marked as Complete when the actionComplete method is called" in {
-
- job.actionComplete("0000000001-error")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
-
- new String(new String(actionStatus)) should be("Complete")
-
- }
-
- it should " be out of actions when all actions have been executed" in {
-
- job.getOutOfActions should be(true)
- }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
deleted file mode 100755
index 5987b35..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.dsl.JobParser
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobParserTests extends FlatSpec with Matchers {
-
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- val server = new TestingServer(2187, true)
- val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- private val jobId = s"job_${System.currentTimeMillis}"
- private val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
- private val queue = new LinkedBlockingQueue[ActionData]()
-
- // this will be performed by the job bootstrapper
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-
- private val job = JobParser.parse(jobId, yaml, queue, client, 1)
-
- "JobParser" should "parse the simple-maki.yml" in {
-
- job.getName should be("amaterasu-test")
-
- }
-
- //TODO: I suspect this test is not indicative, and that order is assured need to verify this
- it should "also have two actions in the right order" in {
-
- job.getRegisteredActions.size should be(3)
-
- job.getRegisteredActions.get("0000000000").data.getName should be("start")
- job.getRegisteredActions.get("0000000001").data.getName should be("step2")
- job.getRegisteredActions.get("0000000001-error").data.getName should be("error-action")
-
- }
-
- it should "Action 'config' is parsed successfully" in {
-
- job.getRegisteredActions.size should be(3)
-
- job.getRegisteredActions.get("0000000000").data.getConfig should be("start-cfg.yaml")
- job.getRegisteredActions.get("0000000001").data.getConfig should be("step2-cfg.yaml")
- job.getRegisteredActions.get("0000000001-error").data.getConfig should be("error-cfg.yaml")
-
- }
-
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
deleted file mode 100755
index 235eccb..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobRestoreTests extends FlatSpec with Matchers with BeforeAndAfterEach {
-
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- val server = new TestingServer(2184, true)
- var client: CuratorFramework = null
-
- val jobId = s"job_${System.currentTimeMillis}"
- val maki = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
- val queue = new LinkedBlockingQueue[ActionData]()
-
- var manager: JobManager = null
-
- client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- override def beforeEach(): Unit = {
-
- // creating the jobs znode and storing the source repo and branch
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/src", "".getBytes)
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/branch", "".getBytes)
-
- manager = JobLoader.createJobManager(maki, jobId, client, 3, queue)
-
- }
-
- override def afterEach(): Unit = {
-
- client.delete().deletingChildrenIfNeeded().forPath(s"/$jobId")
-
- }
-
- "a restored job" should "have all Queued actions in the executionQueue" in {
-
- // setting task-0000000002 as Queued
- client.setData().forPath(s"/${jobId}/task-0000000002", ActionStatus.Queued.toString.getBytes)
-
- JobLoader.restoreJobState(manager, jobId, client)
-
- queue.peek.getName should be("start")
- }
-
- "a restored job" should "have all Started actions in the executionQueue" in {
-
- // setting task-0000000002 as Queued
- client.setData().forPath(s"/${jobId}/task-0000000002", ActionStatus.Started.toString.getBytes)
-
- JobLoader.restoreJobState(manager, jobId, client)
-
- queue.peek.getName should be("start")
- }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
deleted file mode 100755
index 1e9b48b..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.integration
-
-import org.apache.amaterasu.leader.common.dsl.GitUtil
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.reflect.io.Path
-
-class GitTests extends FlatSpec with Matchers {
-
- "GitUtil.cloneRepo" should "clone the sample job git repo" in {
-
- val path = Path("repo")
- path.deleteRecursively()
-
- GitUtil.cloneRepo("https://github.com/shintoio/amaterasu-job-sample.git", "master", "", "")
-
- val exists = new java.io.File("repo/maki.yml").exists
- exists should be(true)
- }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala b/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala
deleted file mode 100755
index 0397b87..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos
-
-import java.util
-import java.util.UUID
-
-import org.apache.mesos.Protos._
-
-
-object MesosTestUtil {
- def createOffer(mem: Double, disk: Double, cpus: Int): Offer = {
-
- val resources = new util.ArrayList[Resource]()
- resources.add(createScalarResource("mem", mem))
- resources.add(createScalarResource("disk", disk))
- resources.add(createScalarResource("cpus", cpus))
-
- Offer.newBuilder()
- .setId(OfferID.newBuilder().setValue(UUID.randomUUID.toString))
- .setFrameworkId(FrameworkID.newBuilder().setValue("Amaterasu"))
- .setSlaveId(SlaveID.newBuilder().setValue(UUID.randomUUID.toString))
- .setHostname("localhost")
- .addAllResources(resources)
-
- .build()
-
- }
-
- def createScalarResource(name: String, value: Double): org.apache.mesos.Protos.Resource = {
-
- org.apache.mesos.Protos.Resource.newBuilder().setName(name).setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder.setValue(value).build()).build()
-
- }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
deleted file mode 100644
index 7eba1da..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.utilities
-
-
-import java.io.File
-
-import org.scalatest.{FlatSpec, Matchers}
-
-
-class HttpServerTests extends FlatSpec with Matchers {
-
- // this is an ugly hack, getClass.getResource("/").getPath should have worked but
- // stopped working when we moved to gradle :(
- val resources: String = new File(getClass.getResource("/simple-maki.yml").getPath).getParent
-
- // "Jetty Web server" should "start HTTP server, serve content and stop successfully" in {
- //
- // var data = ""
- // try {
- // HttpServer.start("8000", resources)
- // val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt")
- // data = html.mkString
- // }
- // finally {
- // HttpServer.stop()
- // }
- // data should equal("This is a test file to download from Jetty webserver")
- // }
-
-// "Jetty File server with '/' as root" should "start HTTP server, serve content and stop successfully" in {
-//
-// var urlCount: Int = 0
-// println("resource location" + resources)
-// try {
-// HttpServer.start("8000", resources)
-// val urls = HttpServer.getFilesInDirectory("127.0.0.1", "8000", "dist")
-// urls.foreach(println)
-// urlCount = urls.length
-// } catch {
-// case e: Exception => println(s"++++>> ${e.getMessage}")
-// }
-// finally {
-// HttpServer.stop()
-// }
-// urlCount should equal(2)
-// }
-//
-// "Jetty File server with 'dist' as root" should "start HTTP server, serve content and stop successfully" in {
-// var data = ""
-// var urlCount: Int = 0
-// println("resource location" + resources)
-// try {
-// HttpServer.start("8000", resources + "/dist")
-// val urls = HttpServer.getFilesInDirectory("localhost", "8000", "")
-// urls.foreach(println)
-// urlCount = urls.length
-// }
-// finally {
-// HttpServer.stop()
-// }
-// urlCount should equal(2)
-// }
-}
diff --git a/settings.gradle b/settings.gradle
index 9e757dc..076924e 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,8 +16,8 @@
*/
// Core
-include 'leader'
-project(':leader')
+//include 'leader'
+//project(':leader')
include 'leader-common'
project(':leader-common')