Merge pull request #49 from roadan/AMATERASU-69

Amaterasu 69
diff --git a/common/build.gradle b/common/build.gradle
index 337c3d9..bd30444 100644
--- a/common/build.gradle
+++ b/common/build.gradle
@@ -34,6 +34,14 @@
     mavenCentral()
 }
 
+junitPlatform {
+    filters {
+        engines {
+            include 'spek'
+        }
+    }
+}
+
 configurations {
     provided
     compile.extendsFrom provided
@@ -46,6 +54,8 @@
     compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.9'
     compile group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
     compile group: 'com.fasterxml.jackson.module', name: 'jackson-module-kotlin', version: '2.9.8'
+    compile group: 'commons-validator', name: 'commons-validator', version: '1.6'
+    compile group: 'software.amazon.awssdk', name: 's3', version: '2.5.23'
 
     compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
     compile "org.jetbrains.kotlin:kotlin-reflect"
@@ -66,12 +76,12 @@
 
     compile group: 'org.apache.maven', name: 'maven-core', version: '3.0.5'
     compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
 
     provided group: 'org.apache.hadoop', name: 'hadoop-yarn-client', version: '2.8.4'
     provided group: 'org.apache.hadoop', name: 'hadoop-common', version: '2.8.4'
     provided group: 'org.apache.hadoop', name: 'hadoop-yarn-api', version: '2.8.4'
     provided group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: '2.8.4'
-    compile group: 'net.liftweb', name: 'lift-json_2.11', version: '3.2.0'
+
     testCompile "gradle.plugin.com.github.maiflai:gradle-scalatest:0.14"
     testRuntime 'org.pegdown:pegdown:1.1.0'
     testCompile 'junit:junit:4.11'
diff --git a/common/src/main/kotlin/org/apache/amaterasu/common/utils/FileUtil.kt b/common/src/main/kotlin/org/apache/amaterasu/common/utils/FileUtil.kt
new file mode 100644
index 0000000..626c7f4
--- /dev/null
+++ b/common/src/main/kotlin/org/apache/amaterasu/common/utils/FileUtil.kt
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.amaterasu.common.utils
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.commons.validator.routines.UrlValidator
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
+import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
+import software.amazon.awssdk.core.sync.ResponseTransformer
+import software.amazon.awssdk.regions.Region
+import software.amazon.awssdk.services.s3.S3Client
+import software.amazon.awssdk.services.s3.model.GetObjectRequest
+import software.amazon.awssdk.services.s3.model.S3Exception
+import java.io.FileNotFoundException
+import java.io.IOException
+import java.net.URL
+import java.nio.file.Paths
+
+/**
+ * Utility for fetching remote action sources over supported URL schemes
+ * (http/https/s3/s3a). Downloads are performed through the AWS SDK v2.
+ *
+ * When [accessKeyId] is non-empty, static credentials are used; otherwise
+ * the EC2 instance-profile credentials provider is consulted.
+ */
+class FileUtil(accessKeyId: String = "", secretAccessKey: String = "") {
+
+    // URL schemes this utility accepts
+    private val schemes = arrayOf("http", "https", "s3", "s3a")
+    private val urlValidator = UrlValidator(schemes)
+
+    private var credentials: AwsCredentialsProvider = if (accessKeyId.isNotEmpty()) {
+        StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey))
+    } else {
+        InstanceProfileCredentialsProvider.builder().build()
+    }
+
+    /**
+     * Downloads [remote] into the working directory, keeping the remote file name.
+     *
+     * @return the local file name, or an empty string if the transfer failed.
+     * @throws IllegalArgumentException if [remote] is not a supported URL.
+     */
+    fun downloadFile(remote: String): String {
+
+        require(isSupportedUrl(remote)) { "unsupported URL: $remote" }
+        val url = URL(remote)
+        var result = ""
+
+        try {
+
+            // e.g. https://s3-ap-southeast-2.amazonaws.com/amaterasu/BugBounty-TestUpload.txt
+            val scheme = url.protocol
+            if (scheme !in schemes) {
+                throw IllegalArgumentException("${url.protocol} not supported")
+            }
+
+            val host = url.host // e.g. s3-ap-southeast-2.amazonaws.com
+            val region: String = if (host == "s3.amazonaws.com") {
+                "us-east-1" // legacy N.Virginia endpoint carries no region infix
+            } else {
+                host.removePrefix("s3-").removeSuffix(".amazonaws.com")
+            }
+
+            val path = url.path.removePrefix("/") // bucket/key...
+            val split = path.split("/")
+            val bucket = split[0]
+            val key = split.subList(1, split.size).joinToString("/")
+
+            val s3 = S3Client.builder()
+                    .credentialsProvider(credentials)
+                    .region(Region.of(region))
+                    .build()
+
+            val request = GetObjectRequest.builder()
+                    .bucket(bucket)
+                    .key(key)
+                    .build()
+
+            // download next to the working directory under the remote file name
+            val target = FilenameUtils.getName(url.file)
+            s3.getObject(request, ResponseTransformer.toFile(Paths.get(target)))
+            result = target
+
+        } catch (e: S3Exception) {
+            System.err.println(e.message)
+        } catch (e: FileNotFoundException) {
+            System.err.println(e.message)
+        } catch (e: IOException) {
+            System.err.println(e.message)
+        }
+        return result
+    }
+
+    /** @return true when [string] is a valid URL with a supported scheme. */
+    fun isSupportedUrl(string: String): Boolean {
+        return urlValidator.isValid(string)
+    }
+
+}
diff --git a/common/src/test/kotlin/org/apache/amaterasu/common/utils/FileTestUtils.kt b/common/src/test/kotlin/org/apache/amaterasu/common/utils/FileTestUtils.kt
new file mode 100644
index 0000000..2e0c5c2
--- /dev/null
+++ b/common/src/test/kotlin/org/apache/amaterasu/common/utils/FileTestUtils.kt
@@ -0,0 +1,25 @@
+//package org.apache.amaterasu.common.utils
+//
+//import org.jetbrains.spek.api.Spek
+//import org.jetbrains.spek.api.dsl.given
+//import org.jetbrains.spek.api.dsl.it
+//import org.jetbrains.spek.api.dsl.on
+//
+//import org.junit.Assert.*
+//import java.io.File
+//
+//class FileUtilTest: Spek({
+//
+//    given("an s3 url") {
+//        val url = "https://s3-ap-southeast-2.amazonaws.com/amaterasu/testfile.txt"
+//        val util =  FileUtil("", "")
+//        on("downloading file from s3") {
+//            val result: String = util.downloadFile(url)
+//            it("is successful") {
+//                val resultFile = File("testfile.txt")
+//                assert(resultFile.exists())
+//            }
+//        }
+//    }
+//
+//})
\ No newline at end of file
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
index 0309399..d0a2442 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/PySparkRunnerProvider.scala
@@ -42,6 +42,7 @@
     Array[String]()
 
   override def getHasExecutor: Boolean = true
+
 }
 
 object PySparkRunnerProvider {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
index 80afa32..ed1be82 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkScalaRunnerProvider.scala
@@ -61,6 +61,7 @@
 
   override def getHasExecutor: Boolean = true
 
+
 }
 
 object SparkScalaRunnerProvider {
diff --git a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
index 08a6fb3..1ceec51 100644
--- a/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
+++ b/frameworks/spark/dispatcher/src/main/scala/org/apache/amaterasu/frameworks/spark/dispatcher/runners/providers/SparkSubmitScalaRunnerProvider.scala
@@ -26,20 +26,17 @@
   override def getRunnerResources: Array[String] =
     Array[String]()
 
-  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] = {
+  override def getActionUserResources(jobId: String, actionData: ActionData): Array[String] =
     Array[String]()
-  }
 
-  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] = {
-    val util = new ArtifactUtil(List(actionData.repo).asJava, jobId)
-    conf.mode match {
-      case "mesos" => util.getLocalArtifacts(actionData.getArtifact).toArray().map(x => amaDist.toPath.relativize(x.asInstanceOf[File].toPath).toString)
-      case "yarn" => util.getLocalArtifacts(actionData.getArtifact).toArray().map(x => x.asInstanceOf[File].getPath)
-    }
-  }
+
+  override def getActionDependencies(jobId: String, actionData: ActionData): Array[String] =
+    Array[String]()
+
 
   override def getHasExecutor: Boolean = false
 
+
 }
 
 object SparkSubmitScalaRunnerProvider {
diff --git a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
index d07c010..3e8a7b6 100644
--- a/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
+++ b/leader-yarn/src/main/kotlin/org/apache/amaterasu/leader/yarn/ApplicationMaster.kt
@@ -55,7 +55,6 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.YarnException
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier
-import org.apache.hadoop.yarn.util.ConverterUtils
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.zookeeper.CreateMode
@@ -293,6 +292,13 @@
         result["amaterasu.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/amaterasu.properties")))
         result["log4j.properties"] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, Path("/log4j.properties")))
 
+        // getting the action executable
+        val executable = runnerProvider.getActionExecutable(jobManager.jobId, actionData)
+
+        // setting the action executable
+        distributeFile(executable, "${jobManager.jobId}/${actionData.name}/")
+        result[File(executable).name] = createLocalResourceFromPath(Path.mergePaths(yarnJarPath, createDistPath("${jobManager.jobId}/${actionData.name}/$executable")))
+
         result.forEach { println("entry ${it.key} with value ${it.value}") }
         return result.map { x -> x.key.removePrefix("/") to x.value }.toMap()
     }
@@ -355,7 +361,7 @@
 
         fileResource.shouldBeUploadedToSharedCache = true
         fileResource.visibility = LocalResourceVisibility.PUBLIC
-        fileResource.resource = ConverterUtils.getYarnUrlFromPath(path)
+        fileResource.resource = URL.fromPath(path)
         fileResource.size = stat.len
         fileResource.timestamp = stat.modificationTime
         fileResource.type = LocalResourceType.FILE
diff --git a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
index 6ede8d8..464e3bf 100755
--- a/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
+++ b/leader/src/main/scala/org/apache/amaterasu/leader/mesos/schedulers/JobScheduler.scala
@@ -17,6 +17,7 @@
 package org.apache.amaterasu.leader.mesos.schedulers
 
 import java.io.{File, PrintWriter, StringWriter}
+import java.nio.file.{Files, Path, Paths, StandardCopyOption}
 import java.util
 import java.util.UUID
 import java.util.concurrent.locks.ReentrantLock
@@ -36,6 +37,7 @@
 import org.apache.amaterasu.leader.common.utilities.DataLoader
 import org.apache.amaterasu.leader.execution.JobLoader
 import org.apache.amaterasu.leader.utilities.HttpServer
+import org.apache.commons.io.FileUtils
 import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.log4j.LogManager
@@ -70,6 +72,9 @@
   private var resume: Boolean = false
   private var reportLevel: NotificationLevel = _
 
+  private val jarFile = new File(this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath)
+  private val amaDist = new File(s"${new File(jarFile.getParent).getParent}/dist")
+
   val slavesExecutors = new TrieMap[String, ExecutorInfo]
   private var awsEnv: String = ""
 
@@ -97,15 +102,15 @@
 
   def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID, slaveId: SlaveID, data: Array[Byte]): Unit = {
 
-        val notification = mapper.readValue(data, classOf[Notification])
+    val notification = mapper.readValue(data, classOf[Notification])
 
-        reportLevel match {
-          case NotificationLevel.Code => printNotification(notification)
-          case NotificationLevel.Execution =>
-            if (notification.getNotLevel != NotificationLevel.Code)
-              printNotification(notification)
-          case _ =>
-        }
+    reportLevel match {
+      case NotificationLevel.Code => printNotification(notification)
+      case NotificationLevel.Execution =>
+        if (notification.getNotLevel != NotificationLevel.Code)
+          printNotification(notification)
+      case _ =>
+    }
 
   }
 
@@ -243,6 +248,25 @@
               .setExtract(false)
               .build()))
 
+            // setting up action executable
+            val sourcePath = new File(runnerProvider.getActionExecutable(jobManager.getJobId, actionData))
+            var executable: Path = null
+            if (actionData.getHasArtifact) {
+              val relativePath = amaDist.toPath.getRoot.relativize(sourcePath.toPath)
+              executable = relativePath.subpath(amaDist.toPath.getNameCount, relativePath.getNameCount)
+            } else {
+              val dest = new File(s"dist/${jobManager.getJobId}/${sourcePath.toString}")
+              FileUtils.moveFile(sourcePath, dest)
+              executable = Paths.get(jobManager.getJobId, sourcePath.toPath.toString)
+            }
+
+            println(s"===> executable $executable")
+            command.addUris(URI.newBuilder
+              .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/$executable")
+              .setExecutable(false)
+              .setExtract(false)
+              .build())
+
             command
               .addUris(URI.newBuilder()
                 .setValue(s"http://${sys.env("AMA_NODE")}:${config.Webserver.Port}/miniconda.sh") //TODO: Nadav needs to clean this on the executor side
diff --git a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
index d8b94e7..9af488e 100644
--- a/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
+++ b/sdk/src/main/kotlin/org/apache/amaterasu/sdk/frameworks/RunnerSetupProvider.kt
@@ -17,6 +17,8 @@
 package org.apache.amaterasu.sdk.frameworks
 
 import org.apache.amaterasu.common.dataobjects.ActionData
+import org.apache.amaterasu.common.utils.ArtifactUtil
+import org.apache.amaterasu.common.utils.FileUtil
 
 abstract class RunnerSetupProvider {
 
@@ -34,6 +36,28 @@
 
     abstract fun getActionDependencies(jobId: String, actionData: ActionData): Array<String>
 
+    fun getActionExecutable(jobId: String, actionData: ActionData): String {
+
+        // if the action is artifact based
+        return if (actionData.hasArtifact) {
+
+            val util = ArtifactUtil(listOf(actionData.repo), jobId)
+            util.getLocalArtifacts(actionData.artifact).first().path
+
+        } else {
+
+            //action src can be URL based, so we first check if it needs to be downloaded
+            val fileUtil = FileUtil()
+
+            if (fileUtil.isSupportedUrl(actionData.src)) {
+                fileUtil.downloadFile(actionData.src)
+            } else {
+                 //"repo/src/${actionData.name}/${actionData.src}"
+                 "repo/src/${actionData.src}"
+            }
+        }
+    }
+
     abstract val hasExecutor: Boolean
-       get
+        get
 }
\ No newline at end of file