blob: 160e051793c1fbf8bd5881b10a6911f25504b3b9 [file] [log] [blame]
/*
* Copyright [2019] [Apache Software Foundation]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.marvin.executor.actions
import java.time.LocalDateTime
import java.util.NoSuchElementException
import akka.Done
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.marvin.artifact.manager.ArtifactSaver
import org.apache.marvin.artifact.manager.ArtifactSaver.SaveToRemote
import org.apache.marvin.exception.MarvinEExecutorException
import org.apache.marvin.executor.actions.PipelineAction.{PipelineExecute, PipelineExecutionStatus}
import org.apache.marvin.executor.proxies.BatchActionProxy
import org.apache.marvin.executor.proxies.EngineProxy.{ExecuteBatch, Reload}
import org.apache.marvin.model._
import org.apache.marvin.util.{JsonUtil, LocalCache}
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Success
/** Messages understood by the [[PipelineAction]] actor. */
object PipelineAction {

  /**
   * Request to run the pipeline.
   *
   * @param protocol key under which the execution is cached and which is passed on to every action
   * @param params   payload handed to each batch action when it is executed
   */
  case class PipelineExecute(protocol: String, params: String)

  /**
   * Request for the cached state of a pipeline execution.
   *
   * @param protocol key of the execution to look up
   */
  case class PipelineExecutionStatus(protocol: String)
}
/**
 * Actor that runs every action listed in `metadata.pipelineActions`, in order, for a given
 * protocol, and records the state of each run in a local cache.
 *
 * Handled messages:
 *  - `PipelineExecute(protocol, params)`: runs the pipeline (blocking this actor until every
 *    action has replied) and stores a `Working`, then `Finished` or `Failed` execution entry
 *    under `protocol`.
 *  - `PipelineExecutionStatus(protocol)`: replies to the sender with the JSON form of the cached
 *    entry, or with a `Status.Failure` wrapping a `MarvinEExecutorException` when none exists.
 *  - `Done`: logged only.
 *  - anything else: logged as a warning.
 *
 * @param metadata engine description providing the pipeline action names, the per-action
 *                 metadata map and the pipeline timeout (read as milliseconds)
 */
class PipelineAction(metadata: EngineMetadata) extends Actor with ActorLogging {
  implicit val ec = context.dispatcher
  var artifactSaver: ActorRef = _
  var cache: LocalCache[BatchExecution] = _

  /** Creates the artifact saver child and a fresh execution cache. */
  override def preStart(): Unit = {
    artifactSaver = context.actorOf(ArtifactSaver.build(metadata), name = "artifactSaver")
    cache = new LocalCache[BatchExecution](maximumSize = 10000L, defaultTTL = 30.days)
  }

  override def receive: Receive = {
    case PipelineExecute(protocol, params) =>
      executePipeline(protocol, params)

    case PipelineExecutionStatus(protocol) =>
      replyWithStatus(protocol)

    case Done =>
      log.info("Work Done!")

    case _ =>
      log.warning(s"Not valid message !!")
  }

  /**
   * Runs all pipeline actions sequentially for `protocol`.
   *
   * For each action a dedicated `BatchActionProxy` child is created, asked to `Reload` and then
   * to `ExecuteBatch`, and stopped again. Afterwards the action's artifacts are handed to the
   * artifact saver; those saves are not awaited, so the `Finished` entry may be written before
   * they complete.
   *
   * Any exception marks the execution as `Failed` and is rethrown to the supervisor.
   */
  private def executePipeline(protocol: String, params: String): Unit = {
    implicit val futureTimeout: Timeout = Timeout(metadata.pipelineTimeout.milliseconds)

    log.info(s"Starting to process pipeline process with. Protocol: [$protocol] and Params: [$params].")
    cache.save(protocol, new BatchExecution("pipeline", protocol, LocalDateTime.now, Working))

    try {
      for (actionName <- metadata.pipelineActions) {
        val engineActionMetadata = metadata.actionsMap(actionName)
        val proxy: ActorRef = context.actorOf(
          Props(new BatchActionProxy(engineActionMetadata)),
          name = actionName.concat("Actor"))

        try {
          // Each step must finish before the next one starts, hence the blocking waits.
          Await.result(proxy ? Reload(protocol), futureTimeout.duration)
          Await.result(proxy ? ExecuteBatch(protocol, params), futureTimeout.duration)
        } finally {
          // Stop the proxy on failure as well, so a timed-out action does not leave a
          // child running under this fixed name.
          context stop proxy
        }

        val futures: ListBuffer[Future[Done]] = ListBuffer[Future[Done]]()
        for (artifactName <- engineActionMetadata.artifactsToPersist) {
          futures += (artifactSaver ? SaveToRemote(artifactName, protocol)).mapTo[Done]
        }

        if (futures.nonEmpty) Future.sequence(futures).onComplete {
          case Success(response) =>
            log.info(s"All artifacts from [$actionName] were saved with success!! [$response]")
          case scala.util.Failure(cause) =>
            // Without this branch a failed save would only surface as a MatchError.
            log.error(cause, s"Failed to save artifacts from [$actionName] for protocol [$protocol].")
        }
      }
    } catch {
      case e: Exception =>
        cache.save(protocol, new BatchExecution("pipeline", protocol, LocalDateTime.now, Failed))
        // NOTE(review): if the supervisor restarts this actor, preStart builds a new cache and
        // the Failed entry stored above is lost - confirm the supervision strategy in use.
        throw e
    }

    cache.save(protocol, new BatchExecution("pipeline", protocol, LocalDateTime.now, Finished))
  }

  /**
   * Replies to the current sender with the cached execution for `protocol` as JSON, or with a
   * `Status.Failure` when the lookup raises `NoSuchElementException`.
   */
  private def replyWithStatus(protocol: String): Unit = {
    log.info(s"Getting pipeline execution status to protocol $protocol.")
    try {
      sender() ! JsonUtil.toJson(cache.load(protocol).get)
    } catch {
      case _: NoSuchElementException =>
        sender() ! akka.actor.Status.Failure(new MarvinEExecutorException(s"Protocol $protocol not found!"))
    }
  }
}