Merge branch 'master' into AMATERASU-36-mesos
diff --git a/.gitignore b/.gitignore
index eaefdef..fce2c13 100755
--- a/.gitignore
+++ b/.gitignore
@@ -52,4 +52,4 @@
repo/**
#python
-.zip
\ No newline at end of file
+*.zip
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/Export.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/Export.scala
deleted file mode 100644
index 8db81bc..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/Export.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.dataobjects
-
-case class Export(dataset: String, format: String)
diff --git a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/JobData.scala b/common/src/main/scala/org/apache/amaterasu/common/dataobjects/JobData.scala
deleted file mode 100755
index acf5c4a..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/dataobjects/JobData.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.dataobjects
-
-import org.joda.time.DateTime
-
-case class JobData(src: String, branch: String = "master", id: String, timeCreated: DateTime, startTime: DateTime, endTime: DateTime)
\ No newline at end of file
diff --git a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala b/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
deleted file mode 100644
index 9813296..0000000
--- a/common/src/main/scala/org/apache/amaterasu/common/utils/FileUtils.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.utils
-
-import java.io.File
-
-object FileUtils {
-
- def getAllFiles(dir: File): Array[File] = {
- val these = dir.listFiles
- these ++ these.filter(_.isDirectory).flatMap(getAllFiles)
- }
-
-}
diff --git a/executor/build.gradle b/executor/build.gradle
deleted file mode 100644
index 443fc38..0000000
--- a/executor/build.gradle
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
- id 'com.github.johnrengelman.shadow' version '1.2.4'
- id 'com.github.maiflai.scalatest' version '0.22'
- id 'scala'
- id 'java'
-}
-
-shadowJar {
- zip64 true
-}
-
-//sourceCompatibility = 1.8
-//targetCompatibility = 1.8
-
-repositories {
- maven {
- url "https://plugins.gradle.org/m2/"
- }
- mavenCentral()
-}
-
-test {
- maxParallelForks = 1
- forkEvery = 1
-}
-
-configurations {
- provided
-}
-
-sourceSets {
- main.compileClasspath += configurations.provided
- test.compileClasspath += configurations.provided
- test.runtimeClasspath += configurations.provided
-}
-
-dependencies {
-
- compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.8'
- compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
- compile group: 'io.netty', name: 'netty-all', version: '4.0.42.Final'
- compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
- compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
- compile group: 'org.reflections', name: 'reflections', version: '0.9.10'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.8'
-
- compile('com.jcabi:jcabi-aether:0.10.1') {
- exclude group: 'org.jboss.netty'
- }
-
-
- compile project(':common')
- compile project(':amaterasu-sdk')
-
-
-
-}
-
-sourceSets {
- test {
- resources.srcDirs += [file('src/test/resources')]
- }
-
- // this is done so Scala will compile before Java
- main {
- scala {
- srcDirs = ['src/main/scala', 'src/main/java']
- }
- java {
- srcDirs = []
- }
- }
-}
-
-test {
-
- maxParallelForks = 1
-}
-
-task copyToHome(type: Copy) {
- dependsOn shadowJar
- from 'build/libs'
- into '../build/amaterasu/dist'
- from 'build/resources/main'
- into '../build/amaterasu/dist'
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala b/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
deleted file mode 100644
index 5f78bc2..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/common/executors/ProvidersFactory.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.common.executors
-
-import java.io.ByteArrayOutputStream
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.dataobjects.ExecData
-import org.apache.amaterasu.common.execution.actions.Notifier
-import org.apache.amaterasu.sdk.{AmaterasuRunner, RunnersProvider}
-import org.reflections.Reflections
-
-import scala.collection.JavaConversions._
-
-//TODO: Check if we can use this in the YARN impl
-class ProvidersFactory {
-
- var providers: Map[String, RunnersProvider] = _
-
- def getRunner(groupId: String, id: String): Option[AmaterasuRunner] = {
- val provider = providers.get(groupId)
- provider match {
- case Some(provider) => Some(provider.getRunner(id))
- case None => None
- }
- }
-}
-
-object ProvidersFactory {
-
- def apply(data: ExecData,
- jobId: String,
- outStream: ByteArrayOutputStream,
- notifier: Notifier,
- executorId: String,
- hostName: String,
- propFile: String = null): ProvidersFactory = {
-
- val result = new ProvidersFactory()
- val reflections = new Reflections(getClass.getClassLoader)
- val runnerTypes = reflections.getSubTypesOf(classOf[RunnersProvider]).toSet
- val config = if (propFile != null) {
- import java.io.FileInputStream
- ClusterConfig.apply(new FileInputStream(propFile))
- } else {
- new ClusterConfig()
- }
-
- result.providers = runnerTypes.map(r => {
-
- val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[RunnersProvider]
-
- provider.init(data, jobId, outStream, notifier, executorId, config, hostName)
- notifier.info(s"a provider for group ${provider.getGroupIdentifier} was created")
- (provider.getGroupIdentifier, provider)
- }).toMap
-
- result
- }
-
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
deleted file mode 100755
index f204db8..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosActionsExecutor.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.mesos.executors
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-
-class MesosActionsExecutor extends Logging with Executor {
-
- var master: String = _
- var executorDriver: ExecutorDriver = _
- var jobId: String = _
- var actionName: String = _
- // var sparkScalaRunner: SparkScalaRunner = _
- // var pySparkRunner: PySparkRunner = _
- var notifier: MesosNotifier = _
- var providersFactory: ProvidersFactory = _
-
- val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
-
-
- override def shutdown(driver: ExecutorDriver) = {
-
- }
-
- override def killTask(driver: ExecutorDriver, taskId: TaskID) = ???
-
- override def disconnected(driver: ExecutorDriver) = ???
-
- override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo) = {
- this.executorDriver = driver
- }
-
- override def error(driver: ExecutorDriver, message: String) = {
-
- val status = TaskStatus.newBuilder
- .setData(ByteString.copyFromUtf8(message))
- .setState(TaskState.TASK_ERROR).build()
-
- driver.sendStatusUpdate(status)
-
- }
-
- override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]) = ???
-
- override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = {
-
- this.executorDriver = driver
- val data = mapper.readValue(new ByteArrayInputStream(executorInfo.getData.toByteArray), classOf[ExecData])
-
- // this is used to resolve the spark drier address
- val hostName = slaveInfo.getHostname
- notifier = new MesosNotifier(driver)
- notifier.info(s"Executor ${executorInfo.getExecutorId.getValue} registered")
- val outStream = new ByteArrayOutputStream()
- providersFactory = ProvidersFactory(data, jobId, outStream, notifier, executorInfo.getExecutorId.getValue, hostName, "./amaterasu.properties")
-
- }
-
- override def launchTask(driver: ExecutorDriver, taskInfo: TaskInfo): Unit = {
-
-
- notifier.info(s"launching task: ${taskInfo.getTaskId.getValue}")
- log.debug(s"launching task: $taskInfo")
- val status = TaskStatus.newBuilder
- .setTaskId(taskInfo.getTaskId)
- .setState(TaskState.TASK_STARTING).build()
-
- notifier.info(s"container info: ${taskInfo.getContainer.getDocker.getImage}, ${taskInfo.getContainer.getType}")
-
- driver.sendStatusUpdate(status)
-
- val task = Future {
-
- val taskData = mapper.readValue(new ByteArrayInputStream(taskInfo.getData.toByteArray), classOf[TaskData])
-
- val status = TaskStatus.newBuilder
- .setTaskId(taskInfo.getTaskId)
- .setState(TaskState.TASK_RUNNING).build()
- driver.sendStatusUpdate(status)
- val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
- runner match {
- case Some(r) => r.executeSource(taskData.getSrc, actionName, taskData.getExports)
- case None =>
- notifier.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
- None
- }
-
- }
-
- task onComplete {
-
- case Failure(t) =>
- println(s"launching task Failed: ${t.getMessage}")
- System.exit(1)
-
- case Success(ts) =>
-
- driver.sendStatusUpdate(TaskStatus.newBuilder()
- .setTaskId(taskInfo.getTaskId)
- .setState(TaskState.TASK_FINISHED).build())
- notifier.info(s"Complete task: ${taskInfo.getTaskId.getValue}")
-
- }
-
- }
-
-}
-
-object MesosActionsExecutor extends Logging {
-
- def main(args: Array[String]) {
- System.loadLibrary("mesos")
- log.debug("Starting a new ActionExecutor")
-
- val executor = new MesosActionsExecutor
- executor.jobId = args(0)
- executor.master = args(1)
- executor.actionName = args(2)
-
- val driver = new MesosExecutorDriver(executor)
- driver.run()
- }
-
-}
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala b/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
deleted file mode 100755
index b256386..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/mesos/executors/MesosNotifier.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.mesos.executors
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-import org.apache.amaterasu.common.execution.actions.{Notification, Notifier}
-import org.apache.mesos.ExecutorDriver
-
-
-class MesosNotifier(driver: ExecutorDriver) extends Notifier {
-
- private val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
-
- override def success(line: String): Unit = {
-
-
- getLog.info(s"successfully executed line: $line")
-
- val notification = new Notification(line, "", NotificationType.Success, NotificationLevel.Code)
- val msg = mapper.writeValueAsBytes(notification)
-
- driver.sendFrameworkMessage(msg)
-
- }
-
- override def error(line: String, message: String): Unit = {
-
- getLog.error(s"Error executing line: $line message: $message")
-
- val notification = new Notification(line, message, NotificationType.Error, NotificationLevel.Code)
- val msg = mapper.writeValueAsBytes(notification)
-
- driver.sendFrameworkMessage(msg)
-
- }
-
- override def info(message: String): Unit = {
-
- getLog.info(message)
-
- val notification = new Notification("", message, NotificationType.Info, NotificationLevel.Execution)
- val msg = mapper.writeValueAsBytes(notification)
-
- driver.sendFrameworkMessage(msg)
- }
-}
\ No newline at end of file
diff --git a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala b/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
deleted file mode 100644
index cda6351..0000000
--- a/executor/src/main/scala/org/apache/amaterasu/executor/yarn/executors/ActionsExecutor.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.executor.yarn.executors
-
-import java.io.ByteArrayOutputStream
-import java.net.{InetAddress, URLDecoder}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.dataobjects.{ExecData, TaskData}
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.common.utils.ActiveNotifier
-import org.apache.amaterasu.executor.common.executors.ProvidersFactory
-
-import scala.collection.JavaConverters._
-
-
-class ActionsExecutor extends Logging {
-
- var master: String = _
- var jobId: String = _
- var actionName: String = _
- var taskData: TaskData = _
- var execData: ExecData = _
- var providersFactory: ProvidersFactory = _
-
- def execute(): Unit = {
- val runner = providersFactory.getRunner(taskData.getGroupId, taskData.getTypeId)
- runner match {
- case Some(r) => {
- try {
- r.executeSource(taskData.getSrc, actionName, taskData.getExports)
- log.info("Completed action")
- System.exit(0)
- } catch {
- case e: Exception => {
- log.error("Exception in execute source", e)
- System.exit(100)
- }
- }
- }
- case None =>
- log.error("", s"Runner not found for group: ${taskData.getGroupId}, type ${taskData.getTypeId}. Please verify the tasks")
- System.exit(101)
- }
- }
-}
-
-object ActionsExecutorLauncher extends Logging with App {
-
- val hostName = InetAddress.getLocalHost.getHostName
-
- println(s"Hostname resolved to: $hostName")
- val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
-
- log.info("Starting actions executor")
-
- val jobId = this.args(0)
- val master = this.args(1)
- val actionName = this.args(2)
- val notificationsAddress = this.args(6)
-
- log.info("parsing task data")
- val taskData = mapper.readValue(URLDecoder.decode(this.args(3), "UTF-8"), classOf[TaskData])
- log.info("parsing executor data")
- val execData = mapper.readValue(URLDecoder.decode(this.args(4), "UTF-8"), classOf[ExecData])
- val taskIdAndContainerId = this.args(5)
-
- val actionsExecutor: ActionsExecutor = new ActionsExecutor
- actionsExecutor.master = master
- actionsExecutor.actionName = actionName
- actionsExecutor.taskData = taskData
- actionsExecutor.execData = execData
-
- log.info("Setup executor")
- val baos = new ByteArrayOutputStream()
- val notifier = new ActiveNotifier(notificationsAddress)
-
- actionsExecutor.providersFactory = ProvidersFactory(execData, jobId, baos, notifier, taskIdAndContainerId, hostName, propFile = "./amaterasu.properties")
- actionsExecutor.execute()
-}
diff --git a/executor/src/test/resources/codegen.pyc b/executor/src/test/resources/codegen.pyc
deleted file mode 100644
index ab32046..0000000
--- a/executor/src/test/resources/codegen.pyc
+++ /dev/null
Binary files differ
diff --git a/frameworks/python/dispatcher/build.gradle b/frameworks/python/dispatcher/build.gradle
index b1cbd39..35ab5aa 100644
--- a/frameworks/python/dispatcher/build.gradle
+++ b/frameworks/python/dispatcher/build.gradle
@@ -71,7 +71,8 @@
dependencies {
compile project(':common')
compile project(':amaterasu-sdk')
- compile project(':leader')
+ compile project(':leader-common')
+
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect"
compile 'com.uchuhimo:konf:0.11'
diff --git a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
index 9d83c65..d5a691e 100644
--- a/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
+++ b/frameworks/python/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/python/dispatcher/PythonSetupProvider.kt
@@ -30,18 +30,14 @@
private var conf: ClusterConfig? = null
private var runnerProviders: Map<String, RunnerSetupProvider> = mapOf()
- override val groupIdentifier: String
- get() = "python"
- override val groupResources: Array<File>
- get() = arrayOf()
+ override val groupIdentifier: String = "python"
+ override val groupResources: List<File> = listOf()
override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration {
return DriverConfiguration(conf!!.taskMem(), 1) //TODO: this should be configured on env level
}
- override val environmentVariables: Map<String, String>
- get() = mapOf()
- override val configurationItems: Array<String>
- get() = arrayOf()
+ override val environmentVariables: Map<String, String> = mapOf()
+ override val configurationItems: List<String> = listOf()
override fun init(env: String, conf: ClusterConfig) {
this.env = env
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
index e3e503f..7a5a42d 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/SparkSetupProvider.kt
@@ -54,11 +54,11 @@
}
}
- override val groupResources: Array<File> by lazy {
+ override val groupResources: List<File> by lazy {
when (conf.mode()) {
- "mesos" -> arrayOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runtime-${conf.version()}.jar"))
- "yarn" -> arrayOf(File("spark-runtime-${conf.version()}.jar"), File("executor-${conf.version()}-all.jar"), File(conf.spark().home()))
- else -> arrayOf()
+ "mesos" -> listOf(File("spark-${conf.webserver().sparkVersion()}.tgz"), File("spark-runtime-${conf.version()}-all.jar"))
+ "yarn" -> listOf(File("spark-runtime-${conf.version()}-all.jar"), File(conf.spark().home()))
+ else -> listOf()
}
}
@@ -67,7 +67,7 @@
}
override val groupIdentifier: String = "spark"
- override val configurationItems = arrayOf("sparkProperties", "sparkOptions")
+ override val configurationItems = listOf("sparkProperties", "sparkOptions")
override fun getDriverConfiguration(configManager: ConfigManager): DriverConfiguration {
val sparkOptions: Map<String, Any> = configManager.config["sparkOptions"]
diff --git a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
index f413fd1..a1b7a24 100644
--- a/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
+++ b/frameworks/spark/dispatcher/src/main/kotlin/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.kt
@@ -37,7 +37,7 @@
SparkCommandLineHelper.getOptions(sparkOptions) + " " +
SparkCommandLineHelper.getProperties(sparkProperties) + " " +
master +
- " --jars spark-runtime-${conf.version()}.jar >&1"
+ " --jars spark-runtime-${conf.version()}-all.jar >&1"
}
override fun getActionUserResources(jobId: String, actionData: ActionData): Array<String> = arrayOf()
diff --git a/frameworks/spark/runtime/build.gradle b/frameworks/spark/runtime/build.gradle
index 3bfd7d4..1f2659d 100644
--- a/frameworks/spark/runtime/build.gradle
+++ b/frameworks/spark/runtime/build.gradle
@@ -54,8 +54,7 @@
}
dependencies {
-
- compile project(':executor')
+ compile project(':common')
provided('org.apache.spark:spark-repl_2.11:2.2.1')
provided('org.apache.spark:spark-core_2.11:2.2.1')
@@ -81,7 +80,8 @@
}
task copyToHome(type: Copy) {
- dependsOn jar
+ dependsOn shadowJar
+
from 'build/libs'
into '../../../build/amaterasu/dist'
from 'build/resources/main'
diff --git a/leader-common/build.gradle b/leader-common/build.gradle
index 928a444..b2734cf 100644
--- a/leader-common/build.gradle
+++ b/leader-common/build.gradle
@@ -34,7 +34,6 @@
plugins {
id "com.github.johnrengelman.shadow" version "2.0.4"
- id 'scala'
}
apply plugin: 'kotlin'
@@ -66,12 +65,10 @@
}
dependencies {
- compile 'org.scala-lang:scala-library:2.11.8'
compile project(':common')
compile project(':amaterasu-sdk')
- compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.6.3'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
@@ -87,6 +84,16 @@
compile group: 'com.beust', name: 'klaxon', version: '5.0.1'
compile group: 'com.github.ajalt', name: 'clikt', version: '1.6.0'
+ compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.4.14.v20181114'
+ compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.4.14.v20181114'
+ compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.4.14.v20181114'
+ compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.4.14.v20181114'
+ compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.4.14.v20181114'
+ compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0'
+
+ compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
+ //compile group: 'io.ktor', name: 'ktor-client-core', version: '0.9.5'
+
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
compile "org.jetbrains.kotlin:kotlin-reflect"
@@ -116,3 +123,8 @@
compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}
+
+task copyToHome(type: Copy) {
+ from 'src/main/scripts'
+ into '../build/amaterasu/'
+}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
index 0a2863b..0f63e1d 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobLoader.kt
@@ -90,7 +90,7 @@
val status = ActionStatus.valueOf(String(client.data.forPath("/$jobId/$task")))
if (status == ActionStatus.Queued || status == ActionStatus.Started) {
- jobManager.reQueueAction(task.substring(task.indexOf("task-") + 5))
+ jobManager.requeueAction(task.substring(task.indexOf("task-") + 5))
}
}
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
index 1853e8c..3c77ba2 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/JobManager.kt
@@ -72,7 +72,7 @@
return nextAction
}
- fun reQueueAction(actionId: String) {
+ fun requeueAction(actionId: String) {
log.info("requeing action $actionId")
registeredActions.forEach { log.info("key ${it.key}") }
@@ -124,14 +124,14 @@
val action = registeredActions[actionId]
val id = action!!.handleFailure(message)
- if (!id.isEmpty())
+ if (id.isNotEmpty())
registeredActions[id]?.execute()
//delete all future actions
cancelFutureActions(action)
}
- fun cancelFutureActions(action: Action) {
+ private fun cancelFutureActions(action: Action) {
if (action.data.status != ActionStatus.Failed)
action.announceCanceled()
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
similarity index 75%
rename from leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
rename to leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
index 48f619a..18a2fc8 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworls/FrameworkProvidersFactory.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.kt
@@ -14,24 +14,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.amaterasu.leader.common.execution.frameworls
+package org.apache.amaterasu.leader.common.execution.frameworks
import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.common.logging.KLogging
import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
import org.reflections.Reflections
class FrameworkProvidersFactory(val env: String, val config: ClusterConfig) : KLogging() {
- var providers: Map<String, FrameworkSetupProvider>
+ val providers: MutableMap<String, FrameworkSetupProvider> = mutableMapOf()
+
+ val groups: List<String> by lazy { providers.keys.toList() }
+
+ fun getFramework(groupId: String): FrameworkSetupProvider = providers.getValue(groupId)
init {
- val reflections = Reflections(ClassLoader::class.java)
+ val reflections = Reflections(this::class.java.classLoader)
val runnerTypes = reflections.getSubTypesOf(FrameworkSetupProvider::class.java)
-
- providers = runnerTypes.map {
+ val loadedProviders = runnerTypes.map {
val provider = it.newInstance()
@@ -41,5 +43,7 @@
provider.groupIdentifier to provider
}.toMap()
+
+ providers.putAll(loadedProviders)
}
}
\ No newline at end of file
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
index 6bda13c..f73d717 100644
--- a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/ActiveReportListener.kt
@@ -41,7 +41,7 @@
else -> println("===> Unknown message")
}
- private fun printNotification(notification: Notification) = when (notification.notType) {
+ fun printNotification(notification: Notification) = when (notification.notType) {
NotificationType.Info ->
println("===> ${notification.msg} ".brightWhite().bold())
diff --git a/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServer.kt b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServer.kt
new file mode 100644
index 0000000..5b6563f
--- /dev/null
+++ b/leader-common/src/main/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServer.kt
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.utilities
+
+import org.apache.log4j.BasicConfigurator
+import org.apache.log4j.Level
+import org.eclipse.jetty.server.Server
+import org.eclipse.jetty.server.ServerConnector
+import org.eclipse.jetty.server.handler.DefaultHandler
+import org.eclipse.jetty.server.handler.HandlerList
+import org.eclipse.jetty.server.handler.ResourceHandler
+import org.jsoup.Jsoup
+import org.jsoup.select.Elements
+import org.apache.log4j.Logger
+import org.eclipse.jetty.util.log.StdErrLog
+
+class HttpServer {
+ lateinit var server: Server
+
+ fun start(port: String, serverRoot: String) {
+ BasicConfigurator.configure()
+ initLogging()
+
+ server = Server()
+ val connector = ServerConnector(server)
+ connector.port = port.toInt()
+ server.addConnector(connector)
+
+ val handler = ResourceHandler()
+ handler.isDirectoriesListed = true
+ handler.welcomeFiles = arrayOf("index.html")
+ handler.resourceBase = serverRoot
+ val handlers = HandlerList()
+ handlers.handlers = arrayOf(handler, DefaultHandler())
+
+ server.handler = handlers
+ server.start()
+ }
+
+ private fun initLogging() {
+ System.setProperty("org.eclipse.jetty.util.log.class", StdErrLog::javaClass.name)
+ Logger.getLogger("org.eclipse.jetty").level = Level.ALL
+ Logger.getLogger("org.eclipse.jetty.websocket").level = Level.OFF
+ }
+
+ /**
+ * Method: getFilesInDirectory
+ * Description: provides a list of files in the given directory URL.
+ * @Params: amaNode: Hostname of the URL, port: Port # of the host, directory: destination directory to fetch files
+ * Note: Should the files in URL root be fetched, provide an empty value to directory.
+ */
+ fun getFilesInDirectory(amaNode: String, port: String, directory: String = ""): Array<String> {
+
+ val htmlDoc = Jsoup.connect("http://$amaNode:$port/$directory").get()
+ val files: Elements = htmlDoc.body().select("a")
+ return files.map { it.attr("href") }
+ .filter { !it.contains("..") }
+ .map { it.replace("/", "") }.toTypedArray()
+ }
+
+ fun stop() {
+ if (::server.isInitialized) {
+ server.stop()
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala b/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
deleted file mode 100644
index a748690..0000000
--- a/leader-common/src/main/scala/org/apache/amaterasu/leader/common/execution/frameworks/FrameworkProvidersFactory.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.common.execution.frameworks
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
-import org.reflections.Reflections
-
-import scala.collection.JavaConversions._
-
-class FrameworkProvidersFactory {
- var providers: Map[String, FrameworkSetupProvider] = _
-
- def groups: Array[String] = {
- providers.keys.toArray
- }
-
- def getFramework(groupId: String): FrameworkSetupProvider = {
- providers(groupId)
- }
-
-}
-
-object FrameworkProvidersFactory extends Logging {
-
- def apply(env: String, config: ClusterConfig): FrameworkProvidersFactory = {
-
- val result = new FrameworkProvidersFactory()
-
- val reflections = new Reflections(getClass.getClassLoader)
- val runnerTypes = reflections.getSubTypesOf(classOf[FrameworkSetupProvider]).toSet
-
- result.providers = runnerTypes.map(r => {
-
- val provider = Manifest.classType(r).runtimeClass.newInstance.asInstanceOf[FrameworkSetupProvider]
-
- provider.init(env, config)
- log.info(s"a provider for group ${provider.getGroupIdentifier} was created")
- log.info(s"config = $config")
- (provider.getGroupIdentifier, provider)
-
- }).toMap
-
- result
- }
-}
\ No newline at end of file
diff --git a/leader/src/main/scripts/amaterasu.properties b/leader-common/src/main/scripts/amaterasu.properties
similarity index 100%
rename from leader/src/main/scripts/amaterasu.properties
rename to leader-common/src/main/scripts/amaterasu.properties
diff --git a/leader/src/main/scripts/log4j.properties b/leader-common/src/main/scripts/log4j.properties
similarity index 100%
rename from leader/src/main/scripts/log4j.properties
rename to leader-common/src/main/scripts/log4j.properties
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
index a7b2e47..35c4b0e 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/configuration/ConfigManagerTests.kt
@@ -16,7 +16,6 @@
*/
package org.apache.amaterasu.leader.common.configuration
-import com.uchuhimo.konf.source.yaml.toYaml
import org.apache.amaterasu.common.configuration.ConfigManager
import org.apache.amaterasu.common.configuration.Job
import org.jetbrains.spek.api.Spek
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt
new file mode 100644
index 0000000..c7c3d99
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/ActionStatusTests.kt
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.configuration.enums.ActionStatus
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+
+class ActionStatusTests : Spek({
+
+ // setting up a testing zookeeper server (curator TestServer)
+ val retryPolicy = ExponentialBackoffRetry(1000, 3)
+ val server = TestingServer(2181, true)
+ val jobId = "job_${System.currentTimeMillis()}"
+
+
+ given("an action") {
+
+ val data = ActionData(ActionStatus.Pending, "test_action", "start.scala", "", "spark", "scala", "0000001", hashMapOf(), mutableListOf())
+ val queue = LinkedBlockingQueue<ActionData>()
+
+ val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+ client.start()
+
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+ val action = SequentialAction(data.name, data.src, "", data.groupId, data.typeId, mapOf(), jobId, queue, client, 1)
+
+ it("should queue it's ActionData int the job queue when executed") {
+ action.execute()
+ assertEquals(queue.peek().name, data.name)
+ assertEquals(queue.peek().src, data.src)
+ }
+
+ it("should also create a sequential znode for the task with the value of Queued") {
+ val taskStatus = client.data.forPath("/$jobId/task-0000000000")
+
+ assertNotNull(taskStatus)
+ assertEquals(String(taskStatus), "Queued")
+
+ }
+ }
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt
index 3b9a229..db36abf 100644
--- a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/GitUtilTests.kt
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.amaterasu.leader.common.dsl
import org.jetbrains.spek.api.Spek
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt
new file mode 100644
index 0000000..af3c9d9
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobExecutionTests.kt
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
+
+class JobExecutionTests : Spek({
+ val retryPolicy = ExponentialBackoffRetry(1000, 3)
+ val server = TestingServer(2183, true)
+ val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+ client.start()
+
+ val jobId = "job_${System.currentTimeMillis()}"
+
+ val yaml = this::class.java.getResource("/simple-maki.yml").readText()
+ val queue = LinkedBlockingQueue<ActionData>()
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+
+ given("a job parsed in to a JobManager") {
+ val job = JobParser.parse(jobId, yaml, queue, client, 1)
+
+ it("queue the first action when the JobManager.start method is called ") {
+ job.start()
+
+ assertEquals(queue.peek().name, "start")
+
+ // making sure that the status is reflected in zk
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+ assertEquals(String(actionStatus), "Queued")
+ }
+
+ it("return the start action when calling getNextAction and dequeue it") {
+ assertEquals(job.nextActionData!!.name, "start")
+ assertEquals(queue.size, 0)
+ }
+
+ it("also changes the status of start to started") {
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+ assertEquals(String(actionStatus), "Started")
+ }
+
+ it("is not out of actions when an action is still Pending") {
+ assertFalse { job.outOfActions }
+ }
+
+ it("will requeue a task when calling JobManager.requeueAction") {
+ job.requeueAction("0000000000")
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+ assertEquals(String(actionStatus), "Queued")
+ }
+
+ it("will restart the task") {
+ val data = job.nextActionData
+ assertEquals(data!!.name, "start")
+
+ // making sure that the status is reflected in zk
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+ assertEquals(String(actionStatus), "Started")
+ }
+
+ it("will mark the action as Complete when the actionComplete method is called") {
+ job.actionComplete("0000000000")
+ // making sure that the status is reflected in zk
+ val actionStatus = client.data.forPath("/$jobId/task-0000000000")
+
+ assertEquals(String(actionStatus), "Complete")
+ }
+
+ on("completion of start, the next action step2 is queued") {
+ assertEquals(queue.peek().name, "step2")
+ }
+
+ it("should also be registered as queued in zk") {
+ val actionStatus = client.data.forPath("/$jobId/task-0000000001")
+ assertEquals(String(actionStatus), "Queued")
+ }
+
+ it("is marked as Started when JobManager.nextActionData is called") {
+ val data = job.nextActionData
+ assertEquals(data!!.name, "step2")
+ }
+
+ it("will start an error action JobManager.actionFailed is called") {
+ job.actionFailed("0000000001", "test failure")
+
+ assertEquals(queue.peek().name, "error-action")
+ }
+
+ it("marks the error action as Complete when the actionComplete method is called"){
+ job.actionComplete("0000000001-error")
+
+ // making sure that the status is reflected in zk
+ val actionStatus = client.data.forPath("/$jobId/task-0000000001-error")
+ assertEquals(String(actionStatus) ,"Complete")
+ }
+
+ it("will be out of actions when all actions have been executed"){
+ assertTrue { job.outOfActions }
+ }
+ }
+
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt
new file mode 100644
index 0000000..9e4f53b
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/dsl/JobParserTests.kt
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.dsl
+
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.test.TestingServer
+import org.apache.zookeeper.CreateMode
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import java.util.concurrent.LinkedBlockingQueue
+import kotlin.test.assertEquals
+
+class JobParserTests : Spek({
+
+ val retryPolicy = ExponentialBackoffRetry(1000, 3)
+ val server = TestingServer(2187, true)
+ val client = CuratorFrameworkFactory.newClient(server.connectString, retryPolicy)
+ client.start()
+
+ val jobId = "job_${System.currentTimeMillis()}"
+ val queue = LinkedBlockingQueue<ActionData>()
+
+ // this will be performed by the job bootstrapper
+ client.create().withMode(CreateMode.PERSISTENT).forPath("/$jobId")
+
+ given("a simple-maki.yaml that is passed to a JobParser") {
+
+ val yaml = this::class.java.getResource("/simple-maki.yml").readText()
+ val job = JobParser.parse(jobId, yaml, queue, client, 1)
+
+ it("parse the job details correctly") {
+ assertEquals(job.name, "amaterasu-test")
+ }
+
+ it("also have two actions in the right order") {
+
+ assertEquals(job.registeredActions.size, 3)
+
+ assertEquals(job.registeredActions["0000000000"]!!.data.name, "start")
+ assertEquals(job.registeredActions["0000000001"]!!.data.name, "step2")
+ assertEquals(job.registeredActions["0000000001-error"]!!.data.name, "error-action")
+
+ }
+
+ it("also also parse action 'config' successfully") {
+ assertEquals(job.registeredActions["0000000000"]!!.data.config, "start-cfg.yaml")
+ assertEquals(job.registeredActions["0000000001"]!!.data.config, "step2-cfg.yaml")
+ assertEquals(job.registeredActions["0000000001-error"]!!.data.config, "error-cfg.yaml")
+ }
+ }
+})
\ No newline at end of file
diff --git a/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServerTests.kt b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServerTests.kt
new file mode 100644
index 0000000..c79a819
--- /dev/null
+++ b/leader-common/src/test/kotlin/org/apache/amaterasu/leader/common/utilities/HttpServerTests.kt
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.common.utilities
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jsoup.Jsoup
+import kotlin.test.assertEquals
+
+class HttpServerTests : Spek({
+ val resources = this::class.java.getResource("/jetty-test-data.txt").path.removeSuffix("/jetty-test-data.txt")
+
+ given("an Amaterasu Web server is started") {
+ val server = HttpServer()
+ server.start("8002", resources)
+
+ it("serve content and stop successfully") {
+ lateinit var data: String
+ try {
+ val html = Jsoup.connect("http://localhost:8002/jetty-test-data.txt").get().select("body").text()
+ data = html.toString()
+ } finally {
+ server.stop()
+ }
+ assertEquals(data, "This is a test file to download from Jetty webserver")
+ }
+ }
+
+ given("an Amaterasu Web server is started pointing to a sub dir with two files") {
+ val server = HttpServer()
+ server.start("8001", "$resources/dist")
+
+ it("list the files correctly") {
+ try {
+ val urls = server.getFilesInDirectory("localhost", "8001", "")
+ assertEquals(urls.size, 2)
+ } finally {
+ server.stop()
+ }
+ }
+ }
+})
\ No newline at end of file
diff --git a/leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt b/leader-common/src/test/resources/dist/jetty-fileserver-test-file-1.txt
similarity index 100%
rename from leader/src/test/resources/dist/jetty-fileserver-test-file-1.txt
rename to leader-common/src/test/resources/dist/jetty-fileserver-test-file-1.txt
diff --git a/leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt b/leader-common/src/test/resources/dist/jetty-fileserver-test-file-2.txt
similarity index 100%
rename from leader/src/test/resources/dist/jetty-fileserver-test-file-2.txt
rename to leader-common/src/test/resources/dist/jetty-fileserver-test-file-2.txt
diff --git a/leader/src/test/resources/jetty-test-data.txt b/leader-common/src/test/resources/jetty-test-data.txt
similarity index 100%
rename from leader/src/test/resources/jetty-test-data.txt
rename to leader-common/src/test/resources/jetty-test-data.txt
diff --git a/leader/src/test/resources/simple-maki.yml b/leader-common/src/test/resources/simple-maki.yml
similarity index 100%
rename from leader/src/test/resources/simple-maki.yml
rename to leader-common/src/test/resources/simple-maki.yml
diff --git a/leader-mesos/build.gradle b/leader-mesos/build.gradle
index cb08d6a..e0ade9f 100644
--- a/leader-mesos/build.gradle
+++ b/leader-mesos/build.gradle
@@ -14,7 +14,93 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+buildscript {
+ repositories {
+ mavenCentral()
+ maven {
+ url 'http://repository.jetbrains.com/all'
+ }
+ maven {
+ url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots"
+ }
+ }
+ dependencies {
+ classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
+ classpath 'org.junit.platform:junit-platform-gradle-plugin:1.0.0'
+ }
+}
+plugins {
+ id "com.github.johnrengelman.shadow" version "2.0.4"
+}
+apply plugin: 'kotlin'
+apply plugin: 'org.junit.platform.gradle.plugin'
+
+junitPlatform {
+ filters {
+ engines {
+ include 'spek'
+ }
+ }
+}
+
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+shadowJar {
+ zip64 true
+}
+
+repositories {
+ maven { url "https://plugins.gradle.org/m2/" }
+ maven { url 'http://repository.jetbrains.com/all' }
+ maven { url "https://jetbrains.jfrog.io/jetbrains/spek-snapshots" }
+ maven { url "http://dl.bintray.com/jetbrains/spek" }
+ maven { url "http://oss.jfrog.org/artifactory/oss-snapshot-local" }
+
+ mavenCentral()
+ jcenter()
+}
+
+dependencies {
+ compile project(':leader-common')
+ compile project(':amaterasu-sdk')
+
+ compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version"
+ compile "org.jetbrains.kotlin:kotlin-reflect"
+ compile group: 'org.jetbrains.kotlinx', name: 'kotlinx-coroutines-core', version: '1.1.1'
+
+ testCompile 'org.jetbrains.spek:spek-api:1.1.5'
+ testCompile "org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version"
+ testRuntime 'org.jetbrains.spek:spek-junit-platform-engine:1.1.5'
+
+ // Spek requires kotlin-reflect, can be omitted if already in the classpath
+ testRuntimeOnly "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version"
+}
+
+task copyToHomeRoot(type: Copy) {
+ from 'src/main/scripts'
+ into '../build/amaterasu/'
+}
+
+task copyToHomeBin(type: Copy) {
+ dependsOn shadowJar
+ from 'build/libs'
+ into '../build/amaterasu/bin'
+}
+
+task copyToHome() {
+ dependsOn copyToHomeRoot
+ dependsOn copyToHomeBin
+}
+
+compileKotlin{
+ kotlinOptions.jvmTarget = "1.8"
+}
+
+compileTestKotlin {
+ kotlinOptions.jvmTarget = "1.8"
+}
diff --git a/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/Client.kt b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/Client.kt
new file mode 100644
index 0000000..d2209cf
--- /dev/null
+++ b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/Client.kt
@@ -0,0 +1,8 @@
+package org.apache.amaterasu.leader.mesos
+
+object Client {
+
+ @Throws(Exception::class)
+ @JvmStatic
+ fun main(args: Array<String>) = ClientArgsParser().main(args)
+}
\ No newline at end of file
diff --git a/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/ClientArgsParser.kt b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/ClientArgsParser.kt
new file mode 100644
index 0000000..6b9a73e
--- /dev/null
+++ b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/ClientArgsParser.kt
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.mesos
+
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.leader.common.launcher.AmaOpts
+import org.apache.amaterasu.leader.common.launcher.ArgsParser
+import org.apache.log4j.LogManager
+import org.apache.mesos.MesosSchedulerDriver
+import org.apache.mesos.Protos
+import java.io.FileInputStream
+
+class ClientArgsParser : ArgsParser() {
+
+ override fun run() {
+
+ val opts = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
+
+ val config = ClusterConfig.apply(FileInputStream("${opts.home}/amaterasu.properties"))
+ val resume = opts.jobId.isNotEmpty()
+
+ LogManager.resetConfiguration()
+
+ val frameworkBuilder = Protos.FrameworkInfo.newBuilder()
+ .setName("${opts.name} - Amaterasu Job")
+ .setFailoverTimeout(config.timeout())
+ .setUser(config.user())
+
+ if (resume) {
+ frameworkBuilder.setId(Protos.FrameworkID.newBuilder().setValue(opts.jobId))
+ }
+
+ val framework = frameworkBuilder.build()
+
+ val masterAddress = "${config.master()}:${config.masterPort()}"
+
+ val scheduler = JobScheduler(
+ opts.repo,
+ opts.branch,
+ opts.userName,
+ opts.password,
+ opts.env,
+ resume,
+ config,
+ opts.report,
+ opts.home
+ )
+
+ val driver = MesosSchedulerDriver(scheduler, framework, masterAddress)
+
+ driver.run()
+
+ }
+}
+
diff --git a/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt
new file mode 100644
index 0000000..f19d0fa
--- /dev/null
+++ b/leader-mesos/src/main/kotlin/org/apache/amaterasu/leader/mesos/JobScheduler.kt
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.leader.mesos
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import com.fasterxml.jackson.module.kotlin.readValue
+import com.fasterxml.jackson.module.kotlin.registerKotlinModule
+import org.apache.amaterasu.common.configuration.ClusterConfig
+import org.apache.amaterasu.common.configuration.ConfigManager
+import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.execution.actions.Notification
+import org.apache.amaterasu.common.execution.actions.enums.NotificationLevel
+import org.apache.amaterasu.common.execution.actions.enums.NotificationType
+import org.apache.amaterasu.common.logging.KLogging
+import org.apache.amaterasu.leader.common.execution.JobLoader
+import org.apache.amaterasu.leader.common.execution.JobManager
+import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
+import org.apache.amaterasu.leader.common.utilities.ActiveReportListener
+import org.apache.amaterasu.leader.common.utilities.DataLoader
+import org.apache.amaterasu.leader.common.utilities.HttpServer
+import org.apache.amaterasu.sdk.frameworks.FrameworkSetupProvider
+import org.apache.amaterasu.sdk.frameworks.RunnerSetupProvider
+import org.apache.commons.io.FileUtils
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.CuratorFrameworkFactory
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.log4j.LogManager
+import org.apache.mesos.Protos
+import org.apache.mesos.Scheduler
+import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.protobuf.ByteString
+import java.io.File
+import java.nio.file.Path
+import java.nio.file.Paths
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.LinkedBlockingQueue
+
+class JobScheduler(private val src: String,
+ private val branch: String,
+ private val username: String,
+ private val password: String,
+ private val env: String,
+ private val resume: Boolean,
+ private val config: ClusterConfig,
+ private val report: String,
+ private val home: String) : Scheduler, KLogging() {
+
+ private var client: CuratorFramework
+ private val server = HttpServer()
+ private val listener = ActiveReportListener()
+ private lateinit var jobManager: JobManager
+ private val mapper = ObjectMapper().registerKotlinModule()
+ private var reportLevel: NotificationLevel
+ private val offersToTaskIds = ConcurrentHashMap<String, String>()
+ private val taskIdsToActions = ConcurrentHashMap<Protos.TaskID, String>()
+ private lateinit var frameworkFactory: FrameworkProvidersFactory
+ private lateinit var configManager: ConfigManager
+ private val yamlMapper = ObjectMapper(YAMLFactory()).registerKotlinModule()
+
+ private val jarFile = File(this::class.java.protectionDomain.codeSource.location.path)
+ private val amaDist = File("${File(jarFile.parent).parent}/dist")
+
+
+ init {
+ LogManager.resetConfiguration()
+ server.start(config.webserver().Port(), "$home/${config.webserver().Root()}")
+
+ reportLevel = NotificationLevel.valueOf(report.capitalize())
+ val retryPolicy = ExponentialBackoffRetry(1000, 3)
+ client = CuratorFrameworkFactory.newClient(config.zk(), retryPolicy)
+ client.start()
+ }
+
+ /**
+ * creates a working dir for a job
+     * @param jobId: the id of the job (will be used as the job's directory name as well)
+ */
+ private fun createJobDir(jobId: String) {
+ val amaHome = File(jarFile.parent).parent
+ val jobDir = "$amaHome/dist/$jobId/"
+
+ val dir = File(jobDir)
+ if (!dir.exists()) {
+ dir.mkdir()
+ }
+ }
+
+ override fun registered(driver: SchedulerDriver?, frameworkId: Protos.FrameworkID?, masterInfo: Protos.MasterInfo?) {
+ jobManager = if (!resume) {
+ JobLoader.loadJob(src,
+ branch,
+ frameworkId!!.value,
+ username,
+ password,
+ client,
+ config.jobs().tasks().attempts(),
+ LinkedBlockingQueue<ActionData>())
+ } else {
+ JobLoader.reloadJob(frameworkId!!.value,
+ username,
+ password,
+ client,
+ config.jobs().tasks().attempts(),
+ LinkedBlockingQueue<ActionData>())
+ }
+
+ jobManager.start()
+ createJobDir(jobManager.jobId)
+ }
+
+ override fun disconnected(driver: SchedulerDriver?) {}
+
+ override fun reregistered(driver: SchedulerDriver?, masterInfo: Protos.MasterInfo?) {}
+
+ override fun error(driver: SchedulerDriver?, message: String?) {
+ log.error(message)
+ }
+
+ override fun slaveLost(driver: SchedulerDriver?, slaveId: Protos.SlaveID?) {}
+
+ override fun executorLost(driver: SchedulerDriver?, executorId: Protos.ExecutorID?, slaveId: Protos.SlaveID?, status: Int) {}
+
+ override fun statusUpdate(driver: SchedulerDriver?, status: Protos.TaskStatus?) {
+ status?.let {
+
+ val actionName = taskIdsToActions[it.taskId]
+ when (it.state) {
+ Protos.TaskState.TASK_STARTING -> log.info("Task starting ...")
+ Protos.TaskState.TASK_RUNNING -> {
+ jobManager.actionStarted(it.taskId.value)
+                    listener.printNotification(Notification("", "created container for $actionName", NotificationType.Info, NotificationLevel.Execution))
+ }
+ Protos.TaskState.TASK_FINISHED -> {
+ jobManager.actionComplete(it.taskId.value)
+                    listener.printNotification(Notification("", "Container ${it.executorId.value} completed task ${it.taskId.value} successfully.", NotificationType.Info, NotificationLevel.Execution))
+ }
+ Protos.TaskState.TASK_FAILED,
+ Protos.TaskState.TASK_KILLED,
+ Protos.TaskState.TASK_ERROR,
+ Protos.TaskState.TASK_LOST -> {
+ jobManager.actionFailed(it.taskId.value, it.message)
+ listener.printNotification(Notification("", "error launching container with ${it.message} in ${it.data.toStringUtf8()}", NotificationType.Error, NotificationLevel.Execution))
+ }
+                else -> log.warn("Received unexpected task state: " + it.state)
+ }
+ }
+ }
+
+ override fun frameworkMessage(driver: SchedulerDriver?, executorId: Protos.ExecutorID?, slaveId: Protos.SlaveID?, data: ByteArray?) {
+
+ data?.let {
+ val notification: Notification = mapper.readValue(it)
+
+ when (reportLevel) {
+ NotificationLevel.Code -> listener.printNotification(notification)
+ NotificationLevel.Execution ->
+ if (notification.notLevel != NotificationLevel.Code)
+ listener.printNotification(notification)
+ }
+ }
+ }
+
+ override fun resourceOffers(driver: SchedulerDriver?, offers: MutableList<Protos.Offer>?) {
+ offers?.forEach {
+ when {
+ validateOffer(it) -> synchronized(jobManager) {
+ val actionData = jobManager.nextActionData
+
+ // if we got a new action to process
+ actionData?.let { ad ->
+
+ frameworkFactory = FrameworkProvidersFactory(env, config)
+ val items = frameworkFactory.providers.values.flatMap { x -> x.configurationItems }
+ configManager = ConfigManager(env, "repo", items)
+
+ val taskId = Protos.TaskID.newBuilder().setValue(actionData.id).build()
+ taskIdsToActions[taskId] = actionData.name
+
+ createTaskConfiguration(ad)
+ listener.printNotification(Notification("", "looking for ${actionData.groupId} provider", NotificationType.Info, NotificationLevel.Execution))
+ val frameworkProvider = frameworkFactory.providers[actionData.groupId]
+ listener.printNotification(Notification("", "found $frameworkProvider provider", NotificationType.Info, NotificationLevel.Execution))
+
+ val runnerProvider = frameworkProvider!!.getRunnerProvider(actionData.typeId)
+
+ listener.printNotification(Notification("", "provider ${runnerProvider::class.qualifiedName}", NotificationType.Info, NotificationLevel.Execution))
+ val execData = DataLoader.getExecutorDataBytes(env, config)
+ val executorId = taskId.value + "-" + UUID.randomUUID()
+
+ val envConf = configManager.getActionConfiguration(ad.name, ad.config)
+ val commandStr = runnerProvider.getCommand(jobManager.jobId, actionData, envConf, executorId, "")
+ listener.printNotification(Notification("", "container command $commandStr", NotificationType.Info, NotificationLevel.Execution))
+
+ val command = Protos.CommandInfo
+ .newBuilder()
+ .setValue(commandStr)
+
+ // setting the container's resources
+ val resources = setupContainerResources(frameworkProvider, runnerProvider, actionData)
+ resources.forEach { r ->
+ command.addUris(Protos.CommandInfo.URI.newBuilder()
+ .setValue(r)
+ .setExecutable(false)
+ .setExtract(true)
+ .build())
+ }
+
+ // setting the container's executable
+ val executable: Path = getExecutable(runnerProvider, actionData)
+ command.addUris(Protos.CommandInfo.URI.newBuilder()
+ .setValue(toServingUri(executable.toString()))
+ .setExecutable(false)
+ .setExtract(false)
+ .build())
+
+ // setting the processes environment variables
+ val envVarsList = frameworkProvider.environmentVariables.map { e ->
+ Protos.Environment.Variable.newBuilder()
+ .setName(e.key)
+ .setValue(e.value)
+ .build()
+ }
+ command.setEnvironment(Protos.Environment.newBuilder().addAllVariables(envVarsList))
+
+ val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
+
+ var actionTask: Protos.TaskInfo = if (runnerProvider.hasExecutor) {
+ val executor = Protos.ExecutorInfo
+ .newBuilder()
+ .setData(ByteString.copyFrom(execData))
+ .setName(taskId.value)
+ .setExecutorId(Protos.ExecutorID.newBuilder().setValue(executorId))
+ .setCommand(command)
+ .build()
+
+ Protos.TaskInfo.newBuilder()
+ .setName(taskId.value)
+ .setTaskId(taskId)
+ .setExecutor(executor)
+ .setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
+ .addResources(createScalarResource("cpus", driverConfiguration.cpus.toDouble()))
+ .addResources(createScalarResource("mem", driverConfiguration.memory.toDouble()))
+ .addResources(createScalarResource("disk", config.jobs().repoSize().toDouble()))
+ .setSlaveId(it.slaveId)
+ .build()
+ //slavesExecutors.put(offer.getSlaveId.getValue, executor)
+ } else {
+ Protos.TaskInfo.newBuilder()
+ .setName(taskId.value)
+ .setTaskId(taskId)
+ .setCommand(command)
+ .addResources(createScalarResource("cpus", driverConfiguration.cpus.toDouble()))
+ .addResources(createScalarResource("mem", driverConfiguration.memory.toDouble()))
+ .addResources(createScalarResource("disk", config.jobs().repoSize().toDouble()))
+ .setSlaveId(it.slaveId)
+ .build()
+ }
+
+ listener.printNotification(Notification("", "requesting container for ${actionData.name}", NotificationType.Info, NotificationLevel.Execution))
+ driver?.launchTasks(Collections.singleton(it.id), listOf(actionTask))
+
+ } ?: run {
+ if (jobManager.outOfActions) {
+ log.info("framework ${jobManager.jobId} execution finished")
+
+ val repo = File("repo/")
+ repo.delete()
+
+ server.stop()
+ driver?.declineOffer(it.id)
+ driver?.stop()
+ System.exit(0)
+ } else {
+                        log.info("Declining offer, insufficient resources")
+ driver!!.declineOffer(it.id)
+ }
+ }
+ }
+
+ }
+ }
+ }
+
+ private fun getExecutable(runnerProvider: RunnerSetupProvider, actionData: ActionData): Path {
+ // setting up action executable
+ val sourcePath = File(runnerProvider.getActionExecutable(jobManager.jobId, actionData))
+ val executable: Path = if (actionData.hasArtifact) {
+ val relativePath = amaDist.toPath().root.relativize(sourcePath.toPath())
+ relativePath.subpath(amaDist.toPath().nameCount, relativePath.nameCount)
+ } else {
+ val dest = File("dist/${jobManager.jobId}/$sourcePath")
+ FileUtils.copyFile(sourcePath, dest)
+ Paths.get(jobManager.jobId, sourcePath.toPath().toString())
+ }
+ return executable
+ }
+
+ override fun offerRescinded(driver: SchedulerDriver?, offerId: Protos.OfferID?) {
+ offerId?.let {
+ val actionId = offersToTaskIds[it.value]
+ jobManager.requeueAction(actionId!!)
+ }
+ }
+
+ private fun validateOffer(offer: Protos.Offer): Boolean {
+ val resources = offer.resourcesList
+
+ return resources.count { it.name == "cpus" && it.scalar.value >= config.jobs().tasks().cpus() } > 0 &&
+ resources.count { it.name == "mem" && it.scalar.value >= config.jobs().tasks().mem() } > 0
+ }
+
+ private fun createTaskConfiguration(actionData: ActionData) {
+
+ // setting up the configuration files for the container
+ val envYaml = configManager.getActionConfigContent(actionData.name, actionData.config)
+ writeConfigFile(envYaml, jobManager.jobId, actionData.name, "env.yaml")
+
+ val dataStores = DataLoader.getTaskData(actionData, env).exports
+ val dataStoresYaml = yamlMapper.writeValueAsString(dataStores)
+ writeConfigFile(dataStoresYaml, jobManager.jobId, actionData.name, "datastores.yaml")
+
+ val datasets = DataLoader.getDatasets(env)
+ writeConfigFile(datasets, jobManager.jobId, actionData.name, "datasets.yaml")
+
+ writeConfigFile("jobId: ${jobManager.jobId}\nactionName: ${actionData.name}", jobManager.jobId, actionData.name, "runtime.yaml")
+
+ }
+
+ private fun writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String) {
+ val jarFile = File(this::class.java.protectionDomain.codeSource.location.path)
+ val amaHome = File(jarFile.parent).parent
+ val envLocation = "$amaHome/dist/$jobId/$actionName/"
+
+ val dir = File(envLocation)
+ if (!dir.exists()) {
+ dir.mkdirs()
+ }
+
+ File("$envLocation/$fileName").writeText(configuration)
+ }
+
+ private fun setupContainerResources(framework: FrameworkSetupProvider, runnerProvider: RunnerSetupProvider, actionData: ActionData): List<String> {
+ val result = mutableListOf<String>()
+
+ result.addAll(framework.groupResources.map { toServingUri(it.name) })
+ result.addAll(runnerProvider.runnerResources.map { toServingUri(it) })
+
+ val actionDeps = runnerProvider.getActionDependencies(jobManager.jobId, actionData).map {
+ FileUtils.copyFile(File(it), File("dist/$it"))
+ toServingUri(it)
+ }
+ result.addAll(actionDeps)
+
+ val actionResources = runnerProvider.getActionResources(jobManager.jobId, actionData).map { toServingUri(it) }
+ result.addAll(actionResources
+ )
+ return result
+ }
+
+ private fun toServingUri(file: String): String = "http://${System.getenv("AMA_NODE")}:${config.webserver().Port()}/$file"
+
+ private fun createScalarResource(name: String, value: Double): Protos.Resource {
+ return Protos.Resource.newBuilder()
+ .setName(name)
+ .setType(Protos.Value.Type.SCALAR)
+ .setScalar(Protos.Value.Scalar.newBuilder().setValue(value)).build()
+ }
+}
\ No newline at end of file
diff --git a/leader/src/main/scripts/ama-start-mesos.sh b/leader-mesos/src/main/scripts/ama-start-mesos.sh
similarity index 98%
rename from leader/src/main/scripts/ama-start-mesos.sh
rename to leader-mesos/src/main/scripts/ama-start-mesos.sh
index f12ad5c..5b9e59c 100755
--- a/leader/src/main/scripts/ama-start-mesos.sh
+++ b/leader-mesos/src/main/scripts/ama-start-mesos.sh
@@ -108,7 +108,7 @@
done
echo "repo: ${REPO} "
-CMD="java -cp ${CP} -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.MesosJobLauncher --home ${BASEDIR}"
+CMD="java -cp ${CP} -Djava.library.path=/usr/lib org.apache.amaterasu.leader.mesos.Client --home ${BASEDIR}"
if [ -n "$REPO" ]; then
CMD+=" --repo ${REPO}"
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index e89345b..b94135b 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -136,8 +136,8 @@
val items = mutableListOf<FrameworkSetupProvider>()
- for (p in frameworkFactory.providers().values()) {
- items.add(p)
+ for (provider in frameworkFactory.providers.values) {
+ items.add(provider)
}
val configItems = items.flatMap { it.configurationItems.asIterable() }
configManager = ConfigManager(env, "repo", configItems)
@@ -178,7 +178,7 @@
private fun initJob(opts: AmaOpts) {
this.env = opts.env
- frameworkFactory = FrameworkProvidersFactory.apply(env, config)
+ frameworkFactory = FrameworkProvidersFactory(env, config)
try {
val retryPolicy = ExponentialBackoffRetry(1000, 3)
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
index 0a0e8d7..ba5b3e6 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/Client.kt
@@ -138,8 +138,8 @@
}
// setup frameworks
- val frameworkFactory = FrameworkProvidersFactory.apply(opts.env, config)
- for (group in frameworkFactory.groups()) {
+ val frameworkFactory = FrameworkProvidersFactory(opts.env, config)
+ for (group in frameworkFactory.groups) {
val framework = frameworkFactory.getFramework(group)
for (file in framework.groupResources) {
if (file.exists())
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
index b9f1e67..bc3c321 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ClientArgsParser.kt
@@ -22,7 +22,7 @@
class ClientArgsParser(val args: Array<String>): ArgsParser() {
override fun run() {
- var opts = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
+ val opts = AmaOpts(repo, branch, env, name, jobId, newJobId, report, home)
val client = Client()
client.run(opts, args)
}
diff --git a/leader/src/main/scripts/ama-start-yarn.sh b/leader-yarn/src/main/scripts/ama-start-yarn.sh
similarity index 100%
rename from leader/src/main/scripts/ama-start-yarn.sh
rename to leader-yarn/src/main/scripts/ama-start-yarn.sh
diff --git a/leader/build.gradle b/leader/build.gradle
deleted file mode 100644
index fd545c6..0000000
--- a/leader/build.gradle
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-plugins {
- id "com.github.johnrengelman.shadow" version "1.2.4"
- id 'com.github.maiflai.scalatest' version '0.22'
- id 'scala'
- id 'org.jetbrains.kotlin.jvm'
- id 'java'
-}
-
-sourceCompatibility = 1.8
-targetCompatibility = 1.8
-
-shadowJar {
- zip64 true
-}
-
-repositories {
- maven {
- url "https://plugins.gradle.org/m2/"
- }
- mavenCentral()
-}
-
-dependencies {
- compile 'org.scala-lang:scala-library:2.11.8'
- compile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.8'
-
- compile project(':common')
- compile project(':leader-common')
- compile project(':amaterasu-sdk')
- compile group: 'com.github.scopt', name: 'scopt_2.11', version: '3.3.0'
- compile group: 'com.github.nscala-time', name: 'nscala-time_2.11', version: '2.2.0'
- compile group: 'org.apache.curator', name: 'curator-framework', version: '2.13.0'
- compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.11', version: '2.9.4'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.8'
- compile group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: '2.9.8'
- compile group: 'org.eclipse.jetty', name: 'jetty-plus', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty', name: 'jetty-http', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty', name: 'jetty-io', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty', name: 'jetty-servlet', version: '9.4.14.v20181114'
- compile group: 'org.eclipse.jetty.toolchain', name: 'jetty-test-helper', version: '4.0'
- compile group: 'org.yaml', name: 'snakeyaml', version: '1.23'
- compile group: 'commons-cli', name: 'commons-cli', version: '1.2'
- compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
- compile group: 'org.scala-lang.modules', name: 'scala-async_2.11', version: '0.9.6'
- compile group: 'org.jsoup', name: 'jsoup', version: '1.10.2'
- compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
- compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
- compile "org.jetbrains.kotlin:kotlin-reflect"
-
- testCompile project(':common')
- testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
- testRuntime 'org.pegdown:pegdown:1.1.0'
- testCompile 'junit:junit:4.11'
- testCompile 'org.scalatest:scalatest_2.11:3.0.2'
- testCompile 'org.scala-lang:scala-library:2.11.8'
- testCompile 'org.apache.curator:curator-test:2.13.0'
-
-}
-
-
-sourceSets {
- test {
- resources.srcDirs += [file('src/test/resources')]
- }
-
- // this is done so Scala will compile before Java
- main {
- kotlin {
- srcDirs = ['src/main/kotlin']
- }
- scala {
- srcDirs = ['src/main/kotlin','src/main/java', 'src/main/scala']
- }
- java {
- srcDirs = ['src/main/java']
- }
- }
-}
-
-test {
- maxParallelForks = 1
-}
-
-task copyToHomeRoot(type: Copy) {
- from 'src/main/scripts'
- into '../build/amaterasu/'
-}
-
-task copyToHomeBin(type: Copy) {
- dependsOn shadowJar
- from 'build/libs'
- into '../build/amaterasu/bin'
-}
-
-task copyToHome() {
- dependsOn copyToHomeRoot
- dependsOn copyToHomeBin
-}
-
-compileKotlin{
- kotlinOptions.jvmTarget = "1.8"
-}
-
-compileTestKotlin {
- kotlinOptions.jvmTarget = "1.8"
-}
-
-compileTestScala {
- dependsOn compileScala
-}
-
-compileScala {
- dependsOn compileJava
- classpath += files(compileJava.destinationDir) + files(compileKotlin.destinationDir)
-}
-
-compileJava {
- dependsOn compileKotlin
-}
\ No newline at end of file
diff --git a/leader/src/main/resources/log4j.properties b/leader/src/main/resources/log4j.properties
deleted file mode 100644
index 7ba668f..0000000
--- a/leader/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Root logger option
-log4j.rootLogger=DEBUG, stdout, file
-
-# Redirect log messages to console
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
deleted file mode 100755
index 98d2afa..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/MesosJobLauncher.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.leader.mesos.schedulers.JobScheduler
-import org.apache.amaterasu.leader.utilities.{Args, BaseJobLauncher}
-import org.apache.log4j.LogManager
-import org.apache.mesos.Protos.FrameworkID
-import org.apache.mesos.{MesosSchedulerDriver, Protos}
-
-/**
- * The JobLauncher allows the execution of a single job, without creating a full
- * Amaterasu cluster (no cluster scheduler).
- */
-object MesosJobLauncher extends BaseJobLauncher {
-
- override def run(arguments: Args, config: ClusterConfig, resume: Boolean): Unit = {
- LogManager.resetConfiguration()
- val frameworkBuilder = Protos.FrameworkInfo.newBuilder()
- .setName(s"${arguments.name} - Amaterasu Job")
- .setFailoverTimeout(config.timeout)
- .setUser(config.user)
-
- // TODO: test this
- if (resume) {
- frameworkBuilder.setId(FrameworkID.newBuilder().setValue(arguments.jobId))
- }
-
- val framework = frameworkBuilder.build()
-
- val masterAddress = s"${config.master}:${config.masterPort}"
-
- val scheduler = JobScheduler(
- arguments.repo,
- arguments.branch,
- arguments.username,
- arguments.password,
- arguments.env,
- resume,
- config,
- arguments.report,
- arguments.home
- )
-
- val driver = new MesosSchedulerDriver(scheduler, framework, masterAddress)
-
- log.debug(s"Connecting to master on: $masterAddress")
- driver.run()
- }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
deleted file mode 100755
index 2adff07..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/executors/JobExecutor.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos.executors
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.mesos.Protos._
-import org.apache.mesos.{Executor, ExecutorDriver}
-
-object JobExecutor extends Logging with Executor {
-
- override def shutdown(driver: ExecutorDriver): Unit = {}
-
- override def disconnected(driver: ExecutorDriver): Unit = {}
-
- override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {}
-
- override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): Unit = {}
-
- override def error(driver: ExecutorDriver, message: String): Unit = {}
-
- override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]): Unit = {}
-
- override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo, frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = {}
-
- override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
-
- //val data = mapper.readValue(task.getData.toStringUtf8, JobData.getClass)
-
- }
-
- def main(args: Array[String]) {
-
- val repo = args(0)
-
- }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
deleted file mode 100755
index 13b2413..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/AmaterasuScheduler.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos.schedulers
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.mesos.Protos.{Resource, Value}
-import org.apache.mesos.Scheduler
-
-trait AmaterasuScheduler extends Logging with Scheduler {
-
- def createScalarResource(name: String, value: Double): Resource = {
- Resource.newBuilder
- .setName(name)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(value)).build()
- }
-
-}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
deleted file mode 100755
index 866d9e2..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ /dev/null
@@ -1,526 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos.schedulers
-
-import java.io.{File, PrintWriter, StringWriter}
-import java.nio.file.{Files, Path, Paths, StandardCopyOption}
-import java.util
-import java.util.{Collections, UUID}
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
-
-import com.fasterxml.jackson.databind.ObjectMapper
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
-import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.amaterasu.common.configuration.{ClusterConfig, ConfigManager}
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.common.execution.actions.Notification
-import org.apache.amaterasu.common.execution.actions.enums.{NotificationLevel, NotificationType}
-import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
-import org.apache.amaterasu.leader.common.execution.frameworks.FrameworkProvidersFactory
-import org.apache.amaterasu.leader.common.utilities.DataLoader
-import org.apache.amaterasu.leader.utilities.HttpServer
-import org.apache.commons.io.FileUtils
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.log4j.LogManager
-import org.apache.mesos.Protos.CommandInfo.URI
-import org.apache.mesos.Protos.Environment.Variable
-import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Protos, SchedulerDriver}
-
-import scala.collection.JavaConverters._
-import scala.collection.concurrent
-import scala.collection.concurrent.TrieMap
-
-/**
- * The JobScheduler is a Mesos implementation. It is in charge of scheduling the execution of
- * Amaterasu actions for a specific job
- */
-class JobScheduler extends AmaterasuScheduler {
-
- /*private val props: Properties = new Properties(new File(""))
- private val version = props.getProperty("version")
- println(s"===> version $version")*/
- LogManager.resetConfiguration()
- private var frameworkFactory: FrameworkProvidersFactory = _
- private var configManager: ConfigManager = _
- private var jobManager: JobManager = _
- private var client: CuratorFramework = _
- private var config: ClusterConfig = _
- private var src: String = _
- private var env: String = _
- private var branch: String = _
- private var userName: String = _
- private var password: String = _
- private var resume: Boolean = false
- private var reportLevel: NotificationLevel = _
-
- private val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
- private val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
-
- val slavesExecutors = new TrieMap[String, ExecutorInfo]
- private var awsEnv: String = ""
-
- // this map holds the following structure:
- // slaveId
- // |
- // +-> taskId, actionStatus)
- private val executionMap: concurrent.Map[String, concurrent.Map[String, ActionStatus]] = new ConcurrentHashMap[String, concurrent.Map[String, ActionStatus]].asScala
- private val lock = new ReentrantLock()
- private val offersToTaskIds: concurrent.Map[String, String] = new ConcurrentHashMap[String, String].asScala
- private val taskIdsToActions: concurrent.Map[Protos.TaskID, String] = new ConcurrentHashMap[Protos.TaskID, String].asScala
-
- private val mapper = new ObjectMapper()
- mapper.registerModule(DefaultScalaModule)
-
- private val yamlMapper = new ObjectMapper(new YAMLFactory())
- yamlMapper.registerModule(DefaultScalaModule)
-
- def error(driver: SchedulerDriver, message: String): Unit = {
- log.error(s"===> $message")
- }
-
- def executorLost(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, status: Int) {}
-
- def slaveLost(driver: SchedulerDriver, slaveId: SlaveID) {}
-
- def disconnected(driver: SchedulerDriver) {}
-
- def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
-
- val notification = mapper.readValue(data, classOf[Notification])
-
- reportLevel match {
- case NotificationLevel.Code => printNotification(notification)
- case NotificationLevel.Execution =>
- if (notification.getNotLevel != NotificationLevel.Code)
- printNotification(notification)
- case _ =>
- }
-
- }
-
- def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
-
- val actionName = taskIdsToActions(status.getTaskId)
- status.getState match {
- case TaskState.TASK_STARTING => log.info("Task starting ...")
- case TaskState.TASK_RUNNING => {
- jobManager.actionStarted(status.getTaskId.getValue)
- printNotification(new Notification("", s"created container for $actionName created", NotificationType.Info, NotificationLevel.Execution))
-
- }
- case TaskState.TASK_FINISHED => {
- jobManager.actionComplete(status.getTaskId.getValue)
- printNotification(new Notification("", s"Container ${status.getExecutorId.getValue} Complete with task ${status.getTaskId.getValue} with success.", NotificationType.Info, NotificationLevel.Execution))
- }
- case TaskState.TASK_FAILED |
- TaskState.TASK_KILLED |
- TaskState.TASK_ERROR |
- TaskState.TASK_LOST => {
- jobManager.actionFailed(status.getTaskId.getValue, status.getMessage)
- printNotification(new Notification("", s"error launching container with ${status.getMessage} in ${status.getData.toStringUtf8}", NotificationType.Error, NotificationLevel.Execution))
-
- }
- case _ => log.warn("WTF? just got unexpected task state: " + status.getState)
- }
-
- }
-
- def validateOffer(offer: Offer): Boolean = {
-
- val resources = offer.getResourcesList.asScala
-
- resources.count(r => r.getName == "cpus" && r.getScalar.getValue >= config.jobs.tasks.cpus) > 0 &&
- resources.count(r => r.getName == "mem" && r.getScalar.getValue >= config.jobs.tasks.mem) > 0
- }
-
- def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = {
-
- val actionId = offersToTaskIds(offerId.getValue)
- jobManager.reQueueAction(actionId)
-
- }
-
- def resourceOffers(driver: SchedulerDriver, offers: util.List[Offer]): Unit = {
-
- println(jobManager.toString)
-
- for (offer <- offers.asScala) {
-
- if (validateOffer(offer)) {
-
- log.info(s"Accepting offer, id=${offer.getId}")
-
- // this is done to avoid the processing the same action
- // multiple times
- lock.lock()
-
- try {
- val actionData = jobManager.getNextActionData
- if (actionData != null) {
-
- frameworkFactory = FrameworkProvidersFactory(env, config)
- val items = frameworkFactory.providers.values.flatMap(_.getConfigurationItems).toList.asJava
- configManager = new ConfigManager(env, "repo", items)
-
- val taskId = Protos.TaskID.newBuilder().setValue(actionData.getId).build()
- taskIdsToActions.put(taskId, actionData.getName)
- // setting up the configuration files for the container
- val envYaml = configManager.getActionConfigContent(actionData.getName, actionData.getConfig)
- writeConfigFile(envYaml, jobManager.getJobId, actionData.getName, "env.yaml")
-
- val envConf = configManager.getActionConfiguration(actionData.getName, actionData.getConfig)
- val dataStores = DataLoader.getTaskData(actionData, env).getExports
- val writer = new StringWriter()
- yamlMapper.writeValue(writer, dataStores)
- val dataStoresYaml = writer.toString
- writeConfigFile(dataStoresYaml, jobManager.getJobId, actionData.getName, "datastores.yaml")
-
- writeConfigFile(s"jobId: ${jobManager.getJobId}\nactionName: ${actionData.getName}", jobManager.getJobId, actionData.getName, "runtime.yaml")
-
- val datasets = DataLoader.getDatasets(env)
- writeConfigFile(datasets, jobManager.getJobId, actionData.getName, "datasets.yaml")
- offersToTaskIds.put(offer.getId.getValue, taskId.getValue)
-
- // atomically adding a record for the slave, I'm storing all the actions
- // on a slave level to efficiently handle slave loses
- executionMap.putIfAbsent(offer.getSlaveId.toString, new ConcurrentHashMap[String, ActionStatus].asScala)
-
- val slaveActions = executionMap(offer.getSlaveId.toString)
- slaveActions.put(taskId.getValue, ActionStatus.Started)
-
- val frameworkProvider = frameworkFactory.providers(actionData.getGroupId)
-
- val runnerProvider = frameworkProvider.getRunnerProvider(actionData.getTypeId)
-
- printNotification(new Notification("", s"provider ${runnerProvider.getClass.getName}", NotificationType.Info, NotificationLevel.Execution))
- // searching for an executor that already exist on the slave, if non exist
- // we create a new one
- var executor: ExecutorInfo = null
-
- // val slaveId = offer.getSlaveId.getValue
- // slavesExecutors.synchronized {
-
- val execData = DataLoader.getExecutorDataBytes(env, config)
- val executorId = taskId.getValue + "-" + UUID.randomUUID()
- //creating the command
-
- // // TODO: move this into the runner provider somehow
- // if(!actionData.getSrc.isEmpty){
- // copy(get(s"repo/src/${actionData.getSrc}"), get(s"dist/${jobManager.getJobId}/${actionData.getName}/${actionData.getSrc}"), REPLACE_EXISTING)
- // }
- val commandStr = runnerProvider.getCommand(jobManager.getJobId, actionData, envConf, executorId, "")
- printNotification(new Notification("", s"container command $commandStr", NotificationType.Info, NotificationLevel.Execution))
-
- val command = CommandInfo
- .newBuilder
- .setValue(commandStr)
- // .addUris(URI.newBuilder
- // .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/executor-${config.version}-all.jar")
- // .setExecutable(false)
- // .setExtract(false)
- // .build())
-
- // Getting framework (group) resources
- log.info(s"===> groupResources: ${frameworkProvider.getGroupResources}")
- frameworkProvider.getGroupResources.foreach(f => command.addUris(URI.newBuilder
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/${f.getName}")
- .setExecutable(false)
- .setExtract(true)
- .build()
- ))
-
- // Getting runner resources
- runnerProvider.getRunnerResources.foreach(r => {
- command.addUris(URI.newBuilder
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
- .setExecutable(false)
- .setExtract(false)
- .build())
- })
-
- // Getting action dependencies
- runnerProvider.getActionDependencies(jobManager.getJobId, actionData).foreach(r => {
-
- FileUtils.copyFile(new File(r), new File(s"dist/$r"))
- command.addUris(URI.newBuilder
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
- .setExecutable(false)
- .setExtract(false)
- .build())
- })
-
- // Getting action specific resources
- runnerProvider.getActionResources(jobManager.getJobId, actionData).foreach(r => command.addUris(URI.newBuilder
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$r")
- .setExecutable(false)
- .setExtract(false)
- .build()))
-
- // setting up action executable
- val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
- var executable: Path = null
- if (actionData.getHasArtifact) {
- val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
- executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
- } else {
- val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
- FileUtils.copyFile(sourcePath, dest)
- executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
- }
-
- println(s"===> executable $executable")
- command.addUris(URI.newBuilder
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/$executable")
- .setExecutable(false)
- .setExtract(false)
- .build())
-
- command
- .addUris(URI.newBuilder()
- .setValue(s"http://${sys.env("AMA_NODE")}:${config.webserver.Port}/amaterasu.properties")
- .setExecutable(false)
- .setExtract(false)
- .build())
-
- // setting the processes environment variables
- val envVarsList = frameworkProvider.getEnvironmentVariables.asScala.toList.map(x => Variable.newBuilder().setName(x._1).setValue(x._2).build()).asJava
- command.setEnvironment(Environment.newBuilder().addAllVariables(envVarsList))
-
- executor = ExecutorInfo
- .newBuilder
- .setData(ByteString.copyFrom(execData))
- .setName(taskId.getValue)
- .setExecutorId(ExecutorID.newBuilder().setValue(executorId))
- .setCommand(command)
-
- .build()
-
- slavesExecutors.put(offer.getSlaveId.getValue, executor)
-
-
- val driverConfiguration = frameworkProvider.getDriverConfiguration(configManager)
-
- var actionTask: TaskInfo = null
-
- if (runnerProvider.getHasExecutor) {
- actionTask = TaskInfo
- .newBuilder
- .setName(taskId.getValue)
- .setTaskId(taskId)
- .setExecutor(executor)
-
- .setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
- .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
- .addResources(createScalarResource("mem", driverConfiguration.getMemory))
- .addResources(createScalarResource("disk", config.jobs.repoSize))
- .setSlaveId(offer.getSlaveId)
- .build()
-
- //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
- }
- else {
- actionTask = TaskInfo
- .newBuilder
- .setName(taskId.getValue)
- .setTaskId(taskId)
- .setCommand(command)
-
- //.setData(ByteString.copyFrom(DataLoader.getTaskDataBytes(actionData, env)))
- .addResources(createScalarResource("cpus", driverConfiguration.getCpus))
- .addResources(createScalarResource("mem", driverConfiguration.getMemory))
- .addResources(createScalarResource("disk", config.jobs.repoSize))
- .setSlaveId(offer.getSlaveId)
- .build()
-
- //driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
- }
-
- printNotification(new Notification("", s"requesting container for ${actionData.getName}", NotificationType.Info, NotificationLevel.Execution))
- driver.launchTasks(Collections.singleton(offer.getId), List(actionTask).asJava)
-
- }
- else if (jobManager.getOutOfActions) {
- log.info(s"framework ${jobManager.getJobId} execution finished")
-
- val repo = new File("repo/")
- repo.delete()
-
- HttpServer.stop()
- driver.declineOffer(offer.getId)
- driver.stop()
- sys.exit()
- }
- else {
- log.info("Declining offer, no action ready for execution")
- driver.declineOffer(offer.getId)
- }
- }
- finally {
- lock.unlock()
- }
- }
- else {
- log.info("Declining offer, no sufficient resources")
- driver.declineOffer(offer.getId)
- }
-
- }
-
- }
-
- def registered(driver: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo): Unit = {
-
- if (!resume) {
-
- jobManager = JobLoader.loadJob(
- src,
- branch,
- frameworkId.getValue,
- userName,
- password,
- client,
- config.jobs.tasks.attempts,
- new LinkedBlockingQueue[ActionData]()
- )
- }
- else {
-
- JobLoader.reloadJob(
- frameworkId.getValue,
- userName,
- password,
- client,
- config.jobs.tasks.attempts,
- new LinkedBlockingQueue[ActionData]()
- )
-
- }
-
-
-
-
- jobManager.start()
-
- createJobDir(jobManager.getJobId)
-
- }
-
- def reregistered(driver: SchedulerDriver, masterInfo: Protos.MasterInfo) {}
-
- def printNotification(notification: Notification): Unit = {
-
- var color = Console.WHITE
-
- notification.getNotType match {
-
- case NotificationType.Info =>
- color = Console.WHITE
- println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
- case NotificationType.Success =>
- color = Console.GREEN
- println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
- case NotificationType.Error =>
- color = Console.RED
- println(s"$color${Console.BOLD}===> ${notification.getLine} ${Console.RESET}")
- println(s"$color${Console.BOLD}===> ${notification.getMsg} ${Console.RESET}")
-
- }
-
- }
-
- private def createJobDir(jobId: String): Unit = {
- val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
- val amaHome = new File(jarFile.getParent).getParent
- val jobDir = s"$amaHome/dist/$jobId/"
-
- val dir = new File(jobDir)
- if (!dir.exists()) {
- dir.mkdir()
- }
- }
-
- /**
- * This function creates an action specific env.yml file int the dist folder with the following path:
- * dist/{jobId}/{actionName}/env.yml to be added to the container
- *
- * @param configuration A YAML string to be written to the env file
- * @param jobId the jobId
- * @param actionName the name of the action
- */
- def writeConfigFile(configuration: String, jobId: String, actionName: String, fileName: String): Unit = {
- val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
- val amaHome = new File(jarFile.getParent).getParent
- val envLocation = s"$amaHome/dist/$jobId/$actionName/"
-
- val dir = new File(envLocation)
- if (!dir.exists()) {
- dir.mkdirs()
- }
-
- new PrintWriter(s"$envLocation/$fileName") {
- write(configuration)
- close
- }
- }
-}
-
-object JobScheduler {
-
- def apply(src: String,
- branch: String,
- username: String,
- password: String,
- env: String,
- resume: Boolean,
- config: ClusterConfig,
- report: String,
- home: String): JobScheduler = {
-
- LogManager.resetConfiguration()
- val scheduler = new JobScheduler()
-
- HttpServer.start(config.webserver.Port, s"$home/${config.webserver.Root}")
-
- if (sys.env.get("AWS_ACCESS_KEY_ID").isDefined &&
- sys.env.get("AWS_SECRET_ACCESS_KEY").isDefined) {
-
- scheduler.awsEnv = s"env AWS_ACCESS_KEY_ID=${sys.env("AWS_ACCESS_KEY_ID")} env AWS_SECRET_ACCESS_KEY=${sys.env("AWS_SECRET_ACCESS_KEY")}"
- }
-
- scheduler.resume = resume
- scheduler.src = src
- scheduler.branch = branch
- scheduler.userName = username
- scheduler.password = password
- scheduler.env = env
- scheduler.reportLevel = NotificationLevel.valueOf(report.capitalize)
-
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- scheduler.client = CuratorFrameworkFactory.newClient(config.zk, retryPolicy)
- scheduler.client.start()
- scheduler.config = config
-
- scheduler
-
- }
-
-}
\ No newline at end of file
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
deleted file mode 100644
index 8364456..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/Args.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.utilities
-
-case class Args(
- repo: String = "",
- branch: String = "master",
- env: String = "default",
- name: String = "amaterasu-job",
- jobId: String = null,
- report: String = "code",
- home: String = "",
- newJobId: String = "",
- username: String = "",
- password: String = ""
- ) {
- def toCmdString: String = {
- var cmd = s""" --repo $repo --branch $branch --env $env --name $name --report $report --home $home"""
- if (!username.isEmpty && !password.isEmpty) {
- cmd += s" --user-name $username --password $password"
- }
- if(jobId != null && !jobId.isEmpty) {
- cmd += s" --job-id $jobId"
- }
- cmd
- }
-
- override def toString: String = {
- toCmdString
- }
-}
-
-object Args {
- def getParser: scopt.OptionParser[Args] = {
- val pack = this.getClass.getPackage
- new scopt.OptionParser[Args]("amaterasu job") {
-
- head("amaterasu job", if(pack == null) "DEVELOPMENT" else pack.getImplementationVersion)
-
- opt[String]('r', "repo") action { (x, c) =>
- c.copy(repo = x)
- } text "The git repo containing the job"
-
- opt[String]('b', "branch") action { (x, c) =>
- c.copy(branch = x)
- } text "The branch to be executed (default is master)"
-
- opt[String]('e', "env") action { (x, c) =>
- c.copy(env = x)
- } text "The environment to be executed (test, prod, etc. values from the default env are taken if np env specified)"
-
- opt[String]('n', "name") action { (x, c) =>
- c.copy(name = x)
- } text "The name of the job"
-
- opt[String]('i', "job-id") action { (x, c) =>
- c.copy(jobId = x)
- } text "The jobId - should be passed only when resuming a job"
-
- opt[String]('j', "new-job-id") action { (x, c) =>
- c.copy(newJobId = x)
- } text "A new jobId - should never be passed by a user"
-
- opt[String]('r', "report") action { (x, c) =>
- c.copy(report = x)
- } text "The level of reporting"
-
- opt[String]('h', "home") action { (x, c) =>
- c.copy(home = x)
- }
-
- opt[String]('u', "user-name") action { (x, c) =>
- c.copy(username = x)
- }
-
- opt[String]('p', "password") action { (x, c) =>
- c.copy(password = x)
- }
- }
- }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
deleted file mode 100644
index 38c90c7..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/BaseJobLauncher.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.utilities
-
-import java.io.FileInputStream
-
-import org.apache.amaterasu.common.configuration.ClusterConfig
-import org.apache.amaterasu.common.logging.Logging
-
-
-abstract class BaseJobLauncher extends Logging with App {
-
- def run(args: Args, config: ClusterConfig, resume: Boolean): Unit = ???
-
- val parser = Args.getParser
- parser.parse(args, Args()) match {
-
- case Some(arguments: Args) =>
-
- val config = ClusterConfig(new FileInputStream(s"${arguments.home}/amaterasu.properties"))
- val resume = arguments.jobId != null
-
- run(arguments, config, resume)
-
- case None =>
- // arguments are bad, error message will have been displayed
- }
-}
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala b/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
deleted file mode 100644
index 4aacd38..0000000
--- a/leader/src/main/scala/org/apache/amaterasu/leader/utilities/HttpServer.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.utilities
-
-import org.apache.amaterasu.common.logging.Logging
-import org.apache.log4j.{BasicConfigurator, Level, Logger}
-import org.eclipse.jetty.server.handler._
-import org.eclipse.jetty.server.{Server, ServerConnector}
-import org.eclipse.jetty.util.log.StdErrLog
-import org.jsoup.Jsoup
-import org.jsoup.select.Elements
-
-import scala.collection.JavaConverters._
-import scala.io.{BufferedSource, Source}
-
-/**
- * Implementation of Jetty Web server to server Amaterasu libraries and other distribution files
- */
-object HttpServer extends Logging {
-
- var server: Server = _
-
- def start(port: String, serverRoot: String): Unit = {
-
- BasicConfigurator.configure()
- initLogging()
-
- server = new Server()
- val connector = new ServerConnector(server)
- connector.setPort(port.toInt)
- server.addConnector(connector)
-
- val handler = new ResourceHandler()
- handler.setDirectoriesListed(true)
- handler.setWelcomeFiles(Array[String]("index.html"))
- handler.setResourceBase(serverRoot)
- val handlers = new HandlerList()
- handlers.setHandlers(Array(handler, new DefaultHandler()))
-
- server.setHandler(handlers)
- server.start()
-
- }
-
- def stop() {
- if (server == null) throw new IllegalStateException("Server not Started")
-
- server.stop()
- server = null
- }
-
- def initLogging(): Unit = {
- System.setProperty("org.eclipse.jetty.util.log.class", classOf[StdErrLog].getName)
- Logger.getLogger("org.eclipse.jetty").setLevel(Level.ALL)
- Logger.getLogger("org.eclipse.jetty.websocket").setLevel(Level.OFF)
- }
-
- /*
- Method: getFilesInDirectory
- Description: provides a list of files in the given directory URL.
- @Params: amaNode: Hostname of the URL, port: Port # of the host, directory: destination directory to fetch files
- Note: Should the files in URL root be fetched, provide an empty value to directory.
- */
- def getFilesInDirectory(amaNode: String, port: String, directory: String = ""): Array[String] = {
- println("http://" + amaNode + ":" + port + "/" + directory)
- val html: BufferedSource = Source.fromURL("http://" + amaNode + ":" + port + "/" + directory)
- println(html)
- val htmlDoc = Jsoup.parse(html.mkString)
- val htmlElement: Elements = htmlDoc.body().select("a")
- val files = htmlElement.asScala
- val fileNames = files.map(url => url.attr("href")).filter(file => !file.contains("..")).map(name => name.replace("/", "")).toArray
- fileNames
- }
-}
\ No newline at end of file
diff --git a/leader/src/test/resources/amaterasu.properties b/leader/src/test/resources/amaterasu.properties
deleted file mode 100755
index 96e3724..0000000
--- a/leader/src/test/resources/amaterasu.properties
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-zk=127.0.0.1
-version=0.2.0-incubating-rc4
-master=10.0.0.31
-user=root
-mode=yarn
-webserver.port=8000
-webserver.root=dist
-spark.version=2.2.1-bin-hadoop2.7
-yarn.queue=default
-yarn.jarspath=hdfs:///apps/amaterasu
-spark.home=/usr/lib/spark
-#spark.home=/opt/cloudera/parcels/SPARK2-2.1.0.cloudera2-1.cdh5.7.0.p0.171658/lib/spark2
-yarn.hadoop.home.dir=/etc/hadoop
-spark.opts.spark.yarn.am.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-spark.opts.spark.driver.extraJavaOptions="-Dhdp.version=2.6.1.0-129"
-yarn.master.memoryMB=2048
-yarn.worker.memoryMB=2048
-mesos.libPath=/usr/local/lib/libmesos.dylib
\ No newline at end of file
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
deleted file mode 100755
index ca9186c..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/ActionStatusTests.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.execution.actions.SequentialAction
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{DoNotDiscover, FlatSpec, Matchers}
-
-import scala.collection.JavaConverters._
-
-class ActionStatusTests extends FlatSpec with Matchers {
-
- // setting up a testing zookeeper server (curator TestServer)
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- val server = new TestingServer(2181, true)
- val jobId = s"job_${System.currentTimeMillis}"
- val data = new ActionData(ActionStatus.Pending, "test_action", "start.scala", "", "spark","scala", "0000001", new util.HashMap() , List[String]().asJava)
-
- "an Action" should "queue it's ActionData int the job queue when executed" in {
-
- val queue = new LinkedBlockingQueue[ActionData]()
- // val config = ClusterConfig()
-
- val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
- val action = new SequentialAction(data.getName, data.getSrc, "", data.getGroupId, data.getTypeId, Map.empty[String, String].asJava, jobId, queue, client, 1)
-
- action.execute()
- queue.peek().getName should be(data.getName)
- queue.peek().getSrc should be(data.getSrc)
-
- }
-
- it should "also create a sequential znode for the task with the value of Queued" in {
-
- val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- val taskStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-
- taskStatus should not be null
- new String(taskStatus) should be("Queued")
-
- }
-
-}
\ No newline at end of file
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
deleted file mode 100755
index 28b1ce3..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobExecutionTests.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.dsl.JobParser
-import org.apache.amaterasu.leader.common.execution.actions.Action
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobExecutionTests extends FlatSpec with Matchers {
-
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- val server = new TestingServer(2183, true)
- val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- val jobId = s"job_${System.currentTimeMillis}"
- val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
- val queue = new LinkedBlockingQueue[ActionData]()
-
- // this will be performed by the job bootstraper
-
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
- // client.setData().forPath(s"/$jobId/src",src.getBytes)
- // client.setData().forPath(s"/$jobId/branch", branch.getBytes)
-
-
- val job = JobParser.parse(jobId, yaml, queue, client, 1)
-
- "a job" should "queue the first action when the JobManager.start method is called " in {
-
- job.start
-
- queue.peek.getName should be("start")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
- new String(actionStatus) should be("Queued")
-
- }
-
- it should "return the start action when calling getNextAction and dequeue it" in {
-
- job.getNextActionData.getName should be("start")
- queue.size should be(0)
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
- new String(actionStatus) should be("Started")
- }
-
- it should "not be out of actions when an action is still Pending" in {
- job.getOutOfActions should be(false)
- }
-
- it should "be requeued correctly" in {
- job.reQueueAction("0000000000")
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
- new String(new String(actionStatus)) should be("Queued")
- }
-
- it should "should be able to restart" in {
-
- val data = job.getNextActionData
-
- data.getName should be("start")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
- new String(actionStatus) should be("Started")
- }
-
- it should "be marked as Complete when the actionComplete method is called" in {
-
- job.actionComplete("0000000000")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000000")
-
- new String(new String(actionStatus)) should be("Complete")
-
- }
-
- "the next step2 job" should "be Queued as a result of the completion" in {
-
- queue.peek.getName should be("step2")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
- new String(actionStatus) should be("Queued")
-
- }
-
- it should "be marked as Started when JobManager.getNextActionData is called" in {
-
- val data = job.getNextActionData
-
- data.getName should be("step2")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001")
- new String(actionStatus) should be("Started")
- }
-
- it should "be marked as Failed when JobManager.actionFailed is called" in {
-
- job.actionFailed("0000000001", "test failure")
- queue.peek.getName should be("error-action")
- }
-
- "an ErrorAction" should "be queued if one exist" in {
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
- new String(actionStatus) should be("Queued")
-
- // and returned by getNextActionData
- val data = job.getNextActionData
-
- }
-
- it should "be marked as Complete when the actionComplete method is called" in {
-
- job.actionComplete("0000000001-error")
-
- // making sure that the status is reflected in zk
- val actionStatus = client.getData.forPath(s"/$jobId/task-0000000001-error")
-
- new String(new String(actionStatus)) should be("Complete")
-
- }
-
- it should " be out of actions when all actions have been executed" in {
-
- job.getOutOfActions should be(true)
- }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
deleted file mode 100755
index 5987b35..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobParserTests.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.dsl.JobParser
-import org.apache.curator.framework.CuratorFrameworkFactory
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobParserTests extends FlatSpec with Matchers {
-
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- val server = new TestingServer(2187, true)
- val client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- private val jobId = s"job_${System.currentTimeMillis}"
- private val yaml = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
- private val queue = new LinkedBlockingQueue[ActionData]()
-
- // this will be performed by the job bootstrapper
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
-
- private val job = JobParser.parse(jobId, yaml, queue, client, 1)
-
- "JobParser" should "parse the simple-maki.yml" in {
-
- job.getName should be("amaterasu-test")
-
- }
-
- //TODO: I suspect this test is not indicative, and that order is assured need to verify this
- it should "also have two actions in the right order" in {
-
- job.getRegisteredActions.size should be(3)
-
- job.getRegisteredActions.get("0000000000").data.getName should be("start")
- job.getRegisteredActions.get("0000000001").data.getName should be("step2")
- job.getRegisteredActions.get("0000000001-error").data.getName should be("error-action")
-
- }
-
- it should "Action 'config' is parsed successfully" in {
-
- job.getRegisteredActions.size should be(3)
-
- job.getRegisteredActions.get("0000000000").data.getConfig should be("start-cfg.yaml")
- job.getRegisteredActions.get("0000000001").data.getConfig should be("step2-cfg.yaml")
- job.getRegisteredActions.get("0000000001-error").data.getConfig should be("error-cfg.yaml")
-
- }
-
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala b/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
deleted file mode 100755
index 235eccb..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/common/execution/JobRestoreTests.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.common.execution
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import org.apache.amaterasu.common.configuration.enums.ActionStatus
-import org.apache.amaterasu.common.dataobjects.ActionData
-import org.apache.amaterasu.leader.common.execution.{JobLoader, JobManager}
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.test.TestingServer
-import org.apache.zookeeper.CreateMode
-import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import scala.io.Source
-
-class JobRestoreTests extends FlatSpec with Matchers with BeforeAndAfterEach {
-
- val retryPolicy = new ExponentialBackoffRetry(1000, 3)
- val server = new TestingServer(2184, true)
- var client: CuratorFramework = null
-
- val jobId = s"job_${System.currentTimeMillis}"
- val maki = Source.fromURL(getClass.getResource("/simple-maki.yml")).mkString
- val queue = new LinkedBlockingQueue[ActionData]()
-
- var manager: JobManager = null
-
- client = CuratorFrameworkFactory.newClient(server.getConnectString, retryPolicy)
- client.start()
-
- override def beforeEach(): Unit = {
-
- // creating the jobs znode and storing the source repo and branch
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId")
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/src", "".getBytes)
- client.create().withMode(CreateMode.PERSISTENT).forPath(s"/$jobId/branch", "".getBytes)
-
- manager = JobLoader.createJobManager(maki, jobId, client, 3, queue)
-
- }
-
- override def afterEach(): Unit = {
-
- client.delete().deletingChildrenIfNeeded().forPath(s"/$jobId")
-
- }
-
- "a restored job" should "have all Queued actions in the executionQueue" in {
-
- // setting task-0000000002 as Queued
- client.setData().forPath(s"/${jobId}/task-0000000002", ActionStatus.Queued.toString.getBytes)
-
- JobLoader.restoreJobState(manager, jobId, client)
-
- queue.peek.getName should be("start")
- }
-
- "a restored job" should "have all Started actions in the executionQueue" in {
-
- // setting task-0000000002 as Queued
- client.setData().forPath(s"/${jobId}/task-0000000002", ActionStatus.Started.toString.getBytes)
-
- JobLoader.restoreJobState(manager, jobId, client)
-
- queue.peek.getName should be("start")
- }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala b/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
deleted file mode 100755
index 1e9b48b..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/integration/GitTests.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.integration
-
-import org.apache.amaterasu.leader.common.dsl.GitUtil
-import org.scalatest.{FlatSpec, Matchers}
-
-import scala.reflect.io.Path
-
-class GitTests extends FlatSpec with Matchers {
-
- "GitUtil.cloneRepo" should "clone the sample job git repo" in {
-
- val path = Path("repo")
- path.deleteRecursively()
-
- GitUtil.cloneRepo("https://github.com/shintoio/amaterasu-job-sample.git", "master", "", "")
-
- val exists = new java.io.File("repo/maki.yml").exists
- exists should be(true)
- }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala b/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala
deleted file mode 100755
index 0397b87..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/leader/mesos/MesosTestUtil.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.leader.mesos
-
-import java.util
-import java.util.UUID
-
-import org.apache.mesos.Protos._
-
-
-object MesosTestUtil {
- def createOffer(mem: Double, disk: Double, cpus: Int): Offer = {
-
- val resources = new util.ArrayList[Resource]()
- resources.add(createScalarResource("mem", mem))
- resources.add(createScalarResource("disk", disk))
- resources.add(createScalarResource("cpus", cpus))
-
- Offer.newBuilder()
- .setId(OfferID.newBuilder().setValue(UUID.randomUUID.toString))
- .setFrameworkId(FrameworkID.newBuilder().setValue("Amaterasu"))
- .setSlaveId(SlaveID.newBuilder().setValue(UUID.randomUUID.toString))
- .setHostname("localhost")
- .addAllResources(resources)
-
- .build()
-
- }
-
- def createScalarResource(name: String, value: Double): org.apache.mesos.Protos.Resource = {
-
- org.apache.mesos.Protos.Resource.newBuilder().setName(name).setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder.setValue(value).build()).build()
-
- }
-}
diff --git a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala b/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
deleted file mode 100644
index 7eba1da..0000000
--- a/leader/src/test/scala/org/apache/amaterasu/utilities/HttpServerTests.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.amaterasu.utilities
-
-
-import java.io.File
-
-import org.scalatest.{FlatSpec, Matchers}
-
-
-class HttpServerTests extends FlatSpec with Matchers {
-
- // this is an ugly hack, getClass.getResource("/").getPath should have worked but
- // stopped working when we moved to gradle :(
- val resources: String = new File(getClass.getResource("/simple-maki.yml").getPath).getParent
-
- // "Jetty Web server" should "start HTTP server, serve content and stop successfully" in {
- //
- // var data = ""
- // try {
- // HttpServer.start("8000", resources)
- // val html = Source.fromURL("http://localhost:8000/jetty-test-data.txt")
- // data = html.mkString
- // }
- // finally {
- // HttpServer.stop()
- // }
- // data should equal("This is a test file to download from Jetty webserver")
- // }
-
-// "Jetty File server with '/' as root" should "start HTTP server, serve content and stop successfully" in {
-//
-// var urlCount: Int = 0
-// println("resource location" + resources)
-// try {
-// HttpServer.start("8000", resources)
-// val urls = HttpServer.getFilesInDirectory("127.0.0.1", "8000", "dist")
-// urls.foreach(println)
-// urlCount = urls.length
-// } catch {
-// case e: Exception => println(s"++++>> ${e.getMessage}")
-// }
-// finally {
-// HttpServer.stop()
-// }
-// urlCount should equal(2)
-// }
-//
-// "Jetty File server with 'dist' as root" should "start HTTP server, serve content and stop successfully" in {
-// var data = ""
-// var urlCount: Int = 0
-// println("resource location" + resources)
-// try {
-// HttpServer.start("8000", resources + "/dist")
-// val urls = HttpServer.getFilesInDirectory("localhost", "8000", "")
-// urls.foreach(println)
-// urlCount = urls.length
-// }
-// finally {
-// HttpServer.stop()
-// }
-// urlCount should equal(2)
-// }
-}
diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
index f68922e..6b0859f 100644
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/FrameworkSetupProvider.kt
@@ -26,11 +26,11 @@
val groupIdentifier: String
- val groupResources: Array<File>
+ val groupResources: List<File>
val environmentVariables: Map<String, String>
- val configurationItems: Array<String>
+ val configurationItems: List<String>
fun init(env: String, conf: ClusterConfig)
diff --git a/settings.gradle b/settings.gradle
index 9e757dc..cc5161b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,8 +16,8 @@
*/
// Core
-include 'leader'
-project(':leader')
+//include 'leader'
+//project(':leader')
include 'leader-common'
project(':leader-common')
@@ -32,9 +32,6 @@
include 'common'
project(':common')
-include 'executor'
-project(':executor')
-
include 'sdk'
findProject(':sdk')?.name = 'amaterasu-sdk'