Merge pull request #49 from roadan/AMATERASU-69
Amaterasu 69
diff --git a/common/build.gradle b/common/build.gradle
index 337c3d9..bd30444 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -34,6 +34,14 @@
mavenCentral()
}
+junitPlatform {
+ filters {
+ engines {
+ include 'spek'
+ }
+ }
+}
+
configurations {
provided
compile.extendsFrom provided
@@ -46,6 +54,8 @@
compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.9'
compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.9.8'
+ compile group: 'commons-validator', name: 'commons-validator', version: '1.6'
+ compile group: 'software.amazon.awssdk', name: 's3', version: '2.5.23'
compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
compile "org.jetbrains.kotlin:kotlin-reflect"
@@ -66,12 +76,13 @@
compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+ compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
provided group: 'org.apache.hadoop', name: 'hadoop-yarn-client', version: '2.8.4'
provided group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.8.4'
provided group: 'org.apache.hadoop', name: 'hadoop-yarn-api', version: '2.8.4'
provided group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.8.4'
- compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+
testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
testRuntime 'org.pegdown:pegdown:1.1.0'
testCompile 'junit:junit:4.11'
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/utils/FileUtil.kt b/common/src/main/kotlin/org/apache/amaterasu/common/utils/FileUtil.kt
new file mode 100644
index 0000000..626c7f4
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/utils/FileUtil.kt
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.common.utils
+
+import org.apache.commons.io.FilenameUtils
+import software.amazon.awssdk.services.s3.S3Client
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
+import org.apache.commons.validator.routines.UrlValidator
+import org.jets3t.service.S3ServiceException
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider
+import software.amazon.awssdk.core.sync.ResponseTransformer
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.s3.model.GetObjectRequest
+import java.io.FileNotFoundException
+import java.io.IOException
+import java.lang.IllegalArgumentException
+import java.net.URL
+import java.nio.file.Paths
+
+class FileUtil(accessKeyId: String = "", secretAccessKey: String = "") {
+
+ private val schemes = arrayOf("http", "https", "s3", "s3a")
+ private val urlValidator = UrlValidator(schemes)
+
+ private var credentials: AwsCredentialsProvider = if (accessKeyId.isNotEmpty()) {
+ StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey))
+ } else {
+ InstanceProfileCredentialsProvider.builder().build()
+ }
+
+ fun downloadFile(remote: String): String {
+
+ assert(isSupportedUrl(remote))
+ val url = URL(remote)
+ var result = ""
+
+ try {
+
+ // https://s3-ap-southeast-2.amazonaws.com/amaterasu/BugBounty-TestUpload.txt
+ val scheme = url.protocol //http
+ if (scheme !in schemes) {
+ throw IllegalArgumentException("${url.protocol} not supported")
+ }
+
+ val host = url.host // s3-ap-southeast-2.amazonaws.com
+ val region: String = if (host == "s3.amazonaws.com") {
+ "us-east-1" //N.Virginia
+ } else {
+            host.removePrefix("s3-").removePrefix("s3.").removeSuffix(".amazonaws.com")
+ }
+
+ val path = url.path.removePrefix("/") // /amaterasu/testfile.txt
+ val split = path.split("/")
+ val bucket = split[0]
+ val key = split.subList(1, split.size).joinToString("/")
+
+ val s3 = S3Client.builder()
+ .credentialsProvider(credentials)
+ .region(Region.of(region))
+ .build()
+
+ val request = GetObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .build()
+
+ s3.getObject(request, ResponseTransformer.toFile(Paths.get(FilenameUtils.getName(URL(remote).file))))
+ result = FilenameUtils.getName(URL(remote).file)
+
+        } catch (e: software.amazon.awssdk.services.s3.model.S3Exception) {
+ System.err.println(e.message)
+ } catch (e: FileNotFoundException) {
+ System.err.println(e.message)
+ } catch (e: IOException) {
+ System.err.println(e.message)
+ }
+ return result
+ }
+
+ fun isSupportedUrl(string: String): Boolean {
+ return urlValidator.isValid(string)
+ }
+
+}
\ No newline at end of file
diff --git a/common/src/test/kotlin/org/apache/amaterasu/common/utils/FileTestUtils.kt b/common/src/test/kotlin/org/apache/amaterasu/common/utils/FileTestUtils.kt
new file mode 100644
index 0000000..2e0c5c2
--- /dev/null
+++ b/common/src/test/kotlin/org/apache/amaterasu/common/utils/FileTestUtils.kt
@@ -0,0 +1,25 @@
+//package org.apache.amaterasu.common.utils
+//
+//import org.jetbrains.spek.api.Spek
+//import org.jetbrains.spek.api.dsl.given
+//import org.jetbrains.spek.api.dsl.it
+//import org.jetbrains.spek.api.dsl.on
+//
+//import org.junit.Assert.*
+//import java.io.File
+//
+//class FileUtilTest: Spek({
+//
+// given("an s3 url") {
+// val url = "https://s3-ap-southeast-2.amazonaws.com/amaterasu/testfile.txt"
+// val util = FileUtil("", "")
+// on("downloading file from s3") {
+//            val result: String = util.downloadFile(url)
+// it("is successful") {
+// val resultFile = File("testfile.txt")
+// assert(resultFile.exists())
+// }
+// }
+// }
+//
+//})
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 0309399..d0a2442 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -42,6 +42,7 @@
Array[String]()
override def getHasExecutor: Boolean = true
+
}
object PySparkRunnerProvider {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
index 80afa32..ed1be82 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
@@ -61,6 +61,7 @@
override def getHasExecutor: Boolean = true
+
}
object SparkScalaRunnerProvider {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
index 08a6fb3..1ceec51 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
@@ -26,20 +26,17 @@
override def getRunnerResources: Array[String] =
Array[String]()
- override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = {
+ override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] =
Array[String]()
- }
- override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = {
- val util = new ArtifactUtil(List(actionData.repo).asJava, jobId)
- conf.mode match {
- case "mesos" => util.getLocalArtifacts(actionData.getArtifact).toArray().map(x => amaDist.toPath.relativize(x.asInstanceOf[File].toPath).toString)
- case "yarn" => util.getLocalArtifacts(actionData.getArtifact).toArray().map(x => x.asInstanceOf[File].getPath)
- }
- }
+
+ override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
+ Array[String]()
+
override def getHasExecutor: Boolean = false
+
}
object SparkSubmitScalaRunnerProvider {
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index d07c010..3e8a7b6 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -55,7 +55,6 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.YarnException
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
-import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.util.Records
import org.apache.zookeeper.CreateMode
@@ -293,6 +292,13 @@
result["amaterasu.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/amaterasu.properties")))
result["log4j.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/log4j.properties")))
+ // getting the action executable
+ val executable = runnerProvider.getActionExecutable(jobManager.jobId, actionData)
+
+ // setting the action executable
+ distributeFile(executable, "${jobManager.jobId}/${actionData.name}/")
+ result[File(executable).name] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$executable")))
+
result.forEach { println("entry ${it.key} with value ${it.value}") }
return result.map { x -> x.key.removePrefix("/") to x.value }.toMap()
}
@@ -355,7 +361,7 @@
fileResource.shouldBeUploadedToSharedCache = true
fileResource.visibility = LocalResourceVisibility.PUBLIC
- fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
+ fileResource.resource = URL.fromPath(path)
fileResource.size = stat.len
fileResource.timestamp = stat.modificationTime
fileResource.type = LocalResourceType.FILE
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 6ede8d8..464e3bf 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -17,6 +17,7 @@
package org.apache.amaterasu.leader.mesos.schedulers
import java.io.{File, PrintWriter, StringWriter}
+import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util
import java.util.UUID
import java.util.concurrent.locks.ReentrantLock
@@ -36,6 +37,7 @@
import org.apache.amaterasu.leader.common.utilities.DataLoader
import org.apache.amaterasu.leader.execution.JobLoader
import org.apache.amaterasu.leader.utilities.HttpServer
+import org.apache.commons.io.FileUtils
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.log4j.LogManager
@@ -70,6 +72,9 @@
private var resume: Boolean = false
private var reportLevel: NotificationLevel = _
+ private val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
+ private val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
+
val slavesExecutors = new TrieMap[String, ExecutorInfo]
private var awsEnv: String = ""
@@ -97,15 +102,15 @@
def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
- val notification = mapper.readValue(data, classOf[Notification])
+ val notification = mapper.readValue(data, classOf[Notification])
- reportLevel match {
- case NotificationLevel.Code => printNotification(notification)
- case NotificationLevel.Execution =>
- if (notification.getNotLevel != NotificationLevel.Code)
- printNotification(notification)
- case _ =>
- }
+ reportLevel match {
+ case NotificationLevel.Code => printNotification(notification)
+ case NotificationLevel.Execution =>
+ if (notification.getNotLevel != NotificationLevel.Code)
+ printNotification(notification)
+ case _ =>
+ }
}
@@ -243,6 +248,25 @@
.setExtract(false)
.build()))
+ // setting up action executable
+ val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
+ var executable: Path = null
+ if (actionData.getHasArtifact) {
+ val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
+ executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
+ } else {
+ val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
+ FileUtils.moveFile(sourcePath, dest)
+ executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
+ }
+
+ println(s"===> executable $executable")
+ command.addUris(URI.newBuilder
+ .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$executable")
+ .setExecutable(false)
+ .setExtract(false)
+ .build())
+
command
.addUris(URI.newBuilder()
.setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
index d8b94e7..9af488e 100644
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
@@ -17,6 +17,8 @@
package org.apache.amaterasu.sdk.frameworks
import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.utils.ArtifactUtil
+import org.apache.amaterasu.common.utils.FileUtil
abstract class RunnerSetupProvider {
@@ -34,6 +36,28 @@
abstract fun getActionDependencies(jobId: String, actionData: ActionData): Array<String>
+ fun getActionExecutable(jobId: String, actionData: ActionData): String {
+
+ // if the action is artifact based
+ return if (actionData.hasArtifact) {
+
+ val util = ArtifactUtil(listOf(actionData.repo), jobId)
+ util.getLocalArtifacts(actionData.artifact).first().path
+
+ } else {
+
+ //action src can be URL based, so we first check if it needs to be downloaded
+ val fileUtil = FileUtil()
+
+ if (fileUtil.isSupportedUrl(actionData.src)) {
+ fileUtil.downloadFile(actionData.src)
+ } else {
+ //"repo/src/${actionData.name}/${actionData.src}"
+ "repo/src/${actionData.src}"
+ }
+ }
+ }
+
abstract val hasExecutor: Boolean
- get
+ get
}
\ No newline at end of file