blob: 0eb3a1bd4fc229954b5db2be69952cfd59ed2474 [file] [log] [blame]
/*
* Copyright [2017] [B2W Digital]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.marvin.artifact.manager
import java.io.File
import akka.Done
import akka.actor.{Actor, ActorLogging}
import com.amazonaws.services.s3.model.GetObjectRequest
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.apache.hadoop.fs.Path
import org.marvin.artifact.manager.ArtifactSaver.{SaveToLocal, SaveToRemote}
import org.marvin.model.EngineMetadata
/**
  * Actor that copies engine artifacts between the local filesystem and an Amazon S3 bucket.
  *
  * Handles two messages:
  *  - `SaveToLocal(artifactName, protocol)`: downloads the artifact from S3 to the local path.
  *  - `SaveToRemote(artifactName, protocol)`: uploads the local artifact to S3.
  *
  * After processing either message the actor replies to the sender with `Done`, whether or not
  * a copy actually took place (a failed validation is only logged). Any other message is logged
  * as a warning and receives no reply.
  *
  * @param metadata engine metadata providing the bucket name, the local and remote artifact
  *                 base paths, and the engine name and version used to build the paths
  */
class ArtifactS3Saver(metadata: EngineMetadata) extends Actor with ActorLogging {

  // Assigned in preStart(), i.e. after construction, hence a var.
  var s3Client: AmazonS3 = _

  /** Builds the S3 client for the region named by the `AWS_DEFAULT_REGION` environment variable. */
  override def preStart(): Unit = {
    log.info(s"${this.getClass().getCanonicalName} actor initialized...")
    // Create the S3 client with the builder's default credential configuration;
    // only the region is set explicitly, taken from the environment.
    s3Client = AmazonS3ClientBuilder.standard.withRegion(System.getenv("AWS_DEFAULT_REGION")).build
    log.info("Amazon S3 client initialized...")
  }

  /**
    * Builds the local file path and the remote object key for an artifact.
    *
    * @param artifactName name of the artifact
    * @param protocol     protocol identifier; used as the last segment of the remote path only
    * @return a map with two entries:
    *         `"localPath"`  -> `<artifactsLocalPath>/<engine name>/<artifactName>` and
    *         `"remotePath"` -> `<artifactsRemotePath>/<engine name>/<engine version>/<artifactName>/<protocol>`
    */
  def generatePaths(artifactName: String, protocol: String): Map[String, Path] = {
    // Drop one leading "/" if present. A remote path configured WITHOUT a leading slash is
    // used unchanged; it must never end up as null, which would be interpolated as the
    // literal text "null" at the start of the object key.
    val artifactsRemotePath: String = metadata.artifactsRemotePath.stripPrefix("/")

    Map(
      "localPath" -> new Path(s"${metadata.artifactsLocalPath}/${metadata.name}/$artifactName"),
      "remotePath" -> new Path(s"$artifactsRemotePath/${metadata.name}/${metadata.version}/$artifactName/$protocol")
    )
  }

  /**
    * Checks whether the given path exists.
    *
    * @param path     path to check; its string form is used as the S3 object key or the local file name
    * @param isRemote `true` to look the object up in the configured S3 bucket,
    *                 `false` to check the local filesystem
    * @return `true` if the object/file exists
    */
  def validatePath(path: Path, isRemote: Boolean): Boolean = {
    if (isRemote) s3Client.doesObjectExist(metadata.s3BucketName, path.toString)
    else new File(path.toString).exists
  }

  override def receive: Receive = {
    case SaveToLocal(artifactName, protocol) =>
      log.info("Receive message and starting to working...")
      val uris = generatePaths(artifactName, protocol)
      val localPath = uris("localPath")
      val remotePath = uris("remotePath")
      val localToSave = new File(localPath.toString)

      // The remote key embeds the protocol, so a missing object is reported as an invalid protocol.
      if (validatePath(remotePath, isRemote = true)) {
        log.info(s"Copying files from ${metadata.s3BucketName}: $remotePath to $localPath")
        // Download the object stored under remotePath into the local file.
        s3Client.getObject(new GetObjectRequest(metadata.s3BucketName, remotePath.toString), localToSave)
        log.info(s"File $localPath saved!")
      } else {
        log.error(s"Invalid protocol: $protocol, save process canceled!")
      }

      // Reply in both the success and the validation-failure case.
      sender() ! Done

    case SaveToRemote(artifactName, protocol) =>
      log.info("Receive message and starting to working...")
      val uris = generatePaths(artifactName, protocol)
      val localPath = uris("localPath")
      val remotePath = uris("remotePath")
      val fileToUpload = new File(localPath.toString)

      // NOTE(review): this checks that the LOCAL file exists; the local path does not contain
      // the protocol, so the "Invalid protocol" message below is misleading when the file is
      // simply missing. Message text left unchanged in case log consumers match on it.
      if (validatePath(localPath, isRemote = false)) {
        log.info(s"Copying files from $localPath to ${metadata.s3BucketName}: $remotePath")
        // Upload the local artifact to the bucket under the key remotePath.
        s3Client.putObject(metadata.s3BucketName, remotePath.toString, fileToUpload)
        log.info(s"File $localPath saved!")
      } else {
        log.error(s"Invalid protocol: $protocol, save process canceled!")
      }

      // Reply in both the success and the validation-failure case.
      sender() ! Done

    case _ =>
      log.warning("Received a bad format message...")
  }
}