Merge pull request #1 from apache/develop

Pull
diff --git a/engine-executor/build.sbt b/engine-executor/build.sbt
index dc6f5ea..edd13b1 100644
--- a/engine-executor/build.sbt
+++ b/engine-executor/build.sbt
@@ -49,6 +49,7 @@
 libraryDependencies += "com.github.java-json-tools" % "json-schema-validator" % "2.2.8"
 libraryDependencies += "io.jvm.uuid" %% "scala-uuid" % "0.2.3"
 libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.232"
+libraryDependencies += "com.microsoft.azure" % "azure-storage" % "5.0.0"
 
 dependencyOverrides ++= Seq(
   "io.netty" %% "netty" % "3.7.0",
diff --git a/engine-executor/src/main/scala/META-INF/MANIFEST.MF b/engine-executor/src/main/scala/META-INF/MANIFEST.MF
new file mode 100644
index 0000000..4fc5179
--- /dev/null
+++ b/engine-executor/src/main/scala/META-INF/MANIFEST.MF
@@ -0,0 +1,3 @@
+Manifest-Version: 1.0

+Main-Class: org.apache.marvin.executor.EngineExecutorApp

+

diff --git a/engine-executor/src/main/scala/org/marvin/artifact/manager/ArtifactAZSaver.scala b/engine-executor/src/main/scala/org/marvin/artifact/manager/ArtifactAZSaver.scala
new file mode 100644
index 0000000..7030c7a
--- /dev/null
+++ b/engine-executor/src/main/scala/org/marvin/artifact/manager/ArtifactAZSaver.scala
@@ -0,0 +1,111 @@
+package org.apache.marvin.artifact.manager
+
+import org.apache.marvin.model.EngineMetadata
+import java.io.{File, FileInputStream}
+
+import org.apache.hadoop.fs.Path
+import org.apache.marvin.artifact.manager.ArtifactSaver.{SaveToLocal, SaveToRemote}
+import akka.Done
+import akka.actor.{Actor, ActorLogging}
+import com.microsoft.azure.storage.CloudStorageAccount
+import com.microsoft.azure.storage.StorageException
+import com.microsoft.azure.storage.blob.CloudBlobClient
+
+/**
+ * Actor that copies engine artifacts between the local file system and the Azure
+ * Blob container `metadata.azContainerName` (container names must be lower case).
+ * Replies `Done` to every SaveToLocal / SaveToRemote, even when validation fails.
+ */
+case class ArtifactAZSaver(metadata: EngineMetadata) extends Actor with ActorLogging {
+  var azClient: CloudBlobClient = _
+
+  /** Builds the blob client from `metadata.azConnectionString`. */
+  override def preStart(): Unit = {
+    log.info(s"${this.getClass().getCanonicalName} actor initialized...")
+    val account = CloudStorageAccount.parse(metadata.azConnectionString)
+    azClient = account.createCloudBlobClient
+
+    log.info("Azure client initialized...")
+  }
+
+  /**
+   * Computes the local file path ("localPath") and the remote blob name
+   * ("remotePath") of an artifact for the given protocol.
+   */
+  def generatePaths(artifactName: String, protocol: String): Map[String, Path] = {
+    // Drop one leading "/" if present. A value without it used to be left as
+    // null, which produced blob names starting with the literal text "null/".
+    val artifactsRemotePath = metadata.artifactsRemotePath.stripPrefix("/")
+    Map(
+      "localPath" -> new Path(s"${metadata.artifactsLocalPath}/${metadata.name}/$artifactName"),
+      "remotePath" -> new Path(s"${artifactsRemotePath}/${metadata.name}/${metadata.version}/$artifactName/$protocol")
+    )
+  }
+
+  /**
+   * Remote check: the configured container exists (`path` is not consulted).
+   * Local check: `path` exists on the local file system.
+   */
+  def validatePath(path: Path, isRemote: Boolean): Boolean = {
+    if (isRemote) {
+      azClient.getContainerReference(metadata.azContainerName).exists()
+    } else {
+      new java.io.File(path.toString).exists
+    }
+  }
+
+  override def receive: Receive = {
+    case SaveToLocal(artifactName, protocol) =>
+      log.info("Receive message and starting to working...")
+
+      val uris = generatePaths(artifactName, protocol)
+      val localToSave = new File(uris("localPath").toString)
+
+      // Only the existence of the container is verified here, not of the blob.
+      if (validatePath(uris("remotePath"), true)) {
+        log.info(s"Copying files from ${metadata.azContainerName}: ${uris("remotePath")} to ${uris("localPath")}")
+
+        val azBlobContainer = azClient.getContainerReference(metadata.azContainerName)
+        val blob = azBlobContainer.getBlockBlobReference(uris("remotePath").toString)
+
+        // Download the blob named uris("remotePath") into the local artifact file.
+        blob.downloadToFile(localToSave.getAbsolutePath)
+
+        log.info(s"File ${uris("localPath")} saved!")
+      }
+      else {
+        log.error(s"Invalid protocol: ${protocol}, save process canceled!")
+      }
+
+      sender ! Done
+
+    case SaveToRemote(artifactName, protocol) =>
+      log.info("Receive message and starting to working...")
+      val uris = generatePaths(artifactName, protocol)
+      val fileToUpload = new File(uris("localPath").toString)
+
+      // Upload only when the local artifact file exists.
+      if (validatePath(uris("localPath"), false)) {
+        log.info(s"Copying files from ${uris("localPath")} to ${metadata.azContainerName}: ${uris("remotePath")}")
+
+        val azBlobContainer = azClient.getContainerReference(metadata.azContainerName)
+        azBlobContainer.createIfNotExists()
+        val blob = azBlobContainer.getBlockBlobReference(uris("remotePath").toString)
+
+        // Upload the local file as the blob named uris("remotePath").
+        val sourceStream = new FileInputStream(fileToUpload)
+        try blob.upload(sourceStream, fileToUpload.length)
+        finally sourceStream.close()
+        log.info(s"File ${uris("localPath")} saved!")
+      }
+      else {
+        log.error(s"Invalid protocol: ${protocol}, save process canceled!")
+      }
+
+      sender ! Done
+
+    case _ =>
+      log.warning("Received a bad format message...")
+  }
+}
+
diff --git a/engine-executor/src/main/scala/org/marvin/artifact/manager/ArtifactSaver.scala b/engine-executor/src/main/scala/org/marvin/artifact/manager/ArtifactSaver.scala
index 840768f..110d246 100644
--- a/engine-executor/src/main/scala/org/marvin/artifact/manager/ArtifactSaver.scala
+++ b/engine-executor/src/main/scala/org/marvin/artifact/manager/ArtifactSaver.scala
@@ -30,6 +30,7 @@
       case "FS" => return Props(new ArtifactFSSaver(metadata))
       case "HDFS" => return Props(new ArtifactHdfsSaver(metadata))
       case "S3" => return Props(new ArtifactS3Saver(metadata))
+      case "AZ" => return Props(new ArtifactAZSaver(metadata))
       case _ => throw new MarvinEExecutorException(s"Can not recognize ArtifactManagerType from EngineMetadata")
     }
   }
diff --git a/engine-executor/src/main/scala/org/marvin/model/Metadata.scala b/engine-executor/src/main/scala/org/marvin/model/Metadata.scala
index 65bcfe8..b53516f 100644
--- a/engine-executor/src/main/scala/org/marvin/model/Metadata.scala
+++ b/engine-executor/src/main/scala/org/marvin/model/Metadata.scala
@@ -25,6 +25,8 @@
                           artifactsRemotePath:String,
                           artifactManagerType:String,
                           s3BucketName:String,
+                          azConnectionString:String,
+                          azContainerName:String,
                           pipelineActions: List[String],
                           onlineActionTimeout:Double,
                           metricsTimeout:Double=10000,
diff --git a/engine-executor/src/test/scala/org/marvin/artifact/manager/ArtifactSaverTest.scala b/engine-executor/src/test/scala/org/marvin/artifact/manager/ArtifactSaverTest.scala
index 3d949c7..bb9a133 100644
--- a/engine-executor/src/test/scala/org/marvin/artifact/manager/ArtifactSaverTest.scala
+++ b/engine-executor/src/test/scala/org/marvin/artifact/manager/ArtifactSaverTest.scala
@@ -24,7 +24,8 @@
   "A engineMetadata with artifactsSaverType as HDFS" should {
     "return Props with actorClass ArtifactHdfsSaver" in {
       val props = ArtifactSaver.build(new EngineMetadata("name",
-        "version", "engineType", null, "artifactsRemotePath", "HDFS", "marvin-artifact-bucket", List("acquisitor"),
+        "version", "engineType", null, "artifactsRemotePath", "HDFS", "marvin-artifact-bucket","",
+        "",List("acquisitor"),
         3000, 3000, 3000, 3000, Option(3000), 3000, "testHost"))
       assert(props.actorClass().toString == "class org.apache.marvin.artifact.manager.ArtifactHdfsSaver")
     }
@@ -33,7 +34,8 @@
   "A engineMetadata with artifactsSaverType as S3" should {
     "return Props with actorClass ArtifactS3Saver" in {
       val props = ArtifactSaver.build(new EngineMetadata("name",
-        "version", "engineType", null, "artifactsRemotePath", "S3", "marvin-artifact-bucket", List("acquisitor"),
+        "version", "engineType", null, "artifactsRemotePath", "S3", "marvin-artifact-bucket", "",
+        "",List("acquisitor"),
         3000, 3000, 3000, 3000, Option(3000), 3000, "testHost"))
       assert(props.actorClass().toString == "class org.apache.marvin.artifact.manager.ArtifactS3Saver")
     }
@@ -42,7 +44,8 @@
   "A engineMetadata with artifactsSaverType as FS" should {
     "return Props with actorClass ArtifactFSSaver" in {
       val props = ArtifactSaver.build(new EngineMetadata("name",
-        "version", "engineType", null, "artifactsRemotePath", "fs", "marvin-artifact-bucket", List("acquisitor"),
+        "version", "engineType", null, "artifactsRemotePath", "fs", "marvin-artifact-bucket", "",
+        "",List("acquisitor"),
         3000, 3000, 3000, 3000, Option(3000), 3000, "testHost"))
       assert(props.actorClass().toString == "class org.apache.marvin.artifact.manager.ArtifactFSSaver")
     }
diff --git a/engine-executor/src/test/scala/org/marvin/executor/api/GenericAPITest.scala b/engine-executor/src/test/scala/org/marvin/executor/api/GenericAPITest.scala
index bc49df8..8a7c7d8 100644
--- a/engine-executor/src/test/scala/org/marvin/executor/api/GenericAPITest.scala
+++ b/engine-executor/src/test/scala/org/marvin/executor/api/GenericAPITest.scala
@@ -98,6 +98,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -130,6 +132,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -163,6 +167,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -201,6 +207,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -295,6 +303,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -729,6 +739,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -804,6 +816,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -879,6 +893,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -954,6 +970,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -1029,6 +1047,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
@@ -1105,6 +1125,8 @@
         artifactsRemotePath = "",
         artifactManagerType = "",
         s3BucketName = "",
+        azConnectionString = "",
+        azContainerName = "",
         batchActionTimeout = 50,
         engineType = "",
         hdfsHost = "",
diff --git a/engine-executor/src/test/scala/org/marvin/fixtures/MetadataMock.scala b/engine-executor/src/test/scala/org/marvin/fixtures/MetadataMock.scala
index 6748b9c..7756249 100644
--- a/engine-executor/src/test/scala/org/marvin/fixtures/MetadataMock.scala
+++ b/engine-executor/src/test/scala/org/marvin/fixtures/MetadataMock.scala
@@ -34,6 +34,8 @@
       artifactsRemotePath = "",
       artifactManagerType = "HDFS",
       s3BucketName = "marvin-artifact-bucket",
+      azConnectionString = "",
+      azContainerName = "",
       batchActionTimeout = 2000,
       engineType = "python",
       hdfsHost = "",
diff --git a/engine-executor/src/test/scala/org/marvin/util/ProtocolUtilTest.scala b/engine-executor/src/test/scala/org/marvin/util/ProtocolUtilTest.scala
index a1eb793..0b1cb87 100644
--- a/engine-executor/src/test/scala/org/marvin/util/ProtocolUtilTest.scala
+++ b/engine-executor/src/test/scala/org/marvin/util/ProtocolUtilTest.scala
@@ -74,6 +74,8 @@
           artifactsRemotePath = "",
           artifactManagerType = "HDFS",
           s3BucketName = "marvin-artifact-bucket",
+          azConnectionString = "",
+          azContainerName = "",
           batchActionTimeout = 100,
           engineType = "python",
           hdfsHost = "",