blob: cb95948962cb10cb44afd5ca1733182ca02d196e [file] [log] [blame]
/*
* Copyright [2019] [Apache Software Foundation]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.marvin.executor.manager
import akka.actor.{Actor, ActorLogging}
import akka.util.Timeout
import org.apache.marvin.executor.api.{GenericAPI, GenericAPIFunctions}
import org.apache.marvin.executor.manager.ExecutorManager.{GetMetadata, StopActor}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success}
object ExecutorManager {
case class StopActor(actorName: String)
case class GetMetadata()
}
class ExecutorManager(api: GenericAPIFunctions) extends Actor with ActorLogging {
implicit val ec = ExecutionContext.global
implicit val futureTimeout = Timeout(30 seconds)
override def preStart() = {
log.info(s"Executor Manager enabled and starting!!!")
log.info(s"Executor Manager path ${self.path}")
}
override def receive = {
case StopActor(actorName) =>
if(api.manageableActors.contains(actorName)){
val actorRef = api.manageableActors(actorName)
log.info(s"Actor ${actorRef.path} found. Trying to stop selected actor..")
context.stop(actorRef)
log.info(s"Actor ${actorRef.path} successfully stopped!")
sender ! Success
}else{
log.info(s"Actor related with the key ${actorName} is not a valid manageable actor.")
sender ! Failure
}
case GetMetadata =>
log.info(s"Getting Metadata object from engine ...")
sender ! api.getMetadata
}
}