blob: e2944f00b86ffb9c1663932d26e94706a97fa090 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iota.fey
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
import akka.routing._
import org.apache.iota.fey.JSON_PATH._
import play.api.libs.json.JsObject
import scala.collection.mutable.HashMap
import scala.concurrent.duration._
protected class Ensemble(val orchestrationID: String,
val orchestrationName: String,
val ensembleSpec: JsObject) extends Actor with ActorLogging{
import Ensemble._
val monitoring_actor = FEY_MONITOR.actorRef
var performers_metadata: Map[String, Performer] = Map.empty[String, Performer]
var connectors: Map[String,Array[String]] = Map.empty[String,Array[String]]
var performer: Map[String,ActorRef] = Map.empty[String,ActorRef]
override def receive: Receive = {
case STOP_PERFORMERS => stopPerformers()
case FORCE_RESTART_ENSEMBLE => throw new RestartEnsemble(s"Forcing restart of ensemble. Reason: Shared performer DEAD")
case PRINT_ENSEMBLE =>
val ed = connectors.map(connector => {
s""" \t ${connector._1} : ${connector._2.mkString("[",",","]")}"""
}).mkString("\n")
val nd = performers_metadata.map(performer_metadata => s"${performer_metadata._1}").mkString("[",",","]")
val actors = performer.map(actor => actor._2.path.name).mkString("["," | ","]")
log.info(s"Edges: \n$ed \nNodes: \n\t$nd \nPerformers \n\t$actors")
context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
case Terminated(actor) =>
monitoring_actor ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
log.error(s"DEAD nPerformers ${actor.path.name}")
context.children.foreach{ child =>
context.unwatch(child)
context.stop(child)
}
throw new RestartEnsemble(s"DEAD Performer ${actor.path.name}")
case GetRoutees => //Discard
case x => log.warning(s"Message $x not treated by Ensemble")
}
/**
* If any of the performer dies, it tries to restart it.
* If we could not be restarted, then the terminated message will be received
* and Ensemble is going to throw an Exception to its orchestration
* asking it to Restart the entire Ensemble. The restart will then stop all of its
* children when call the preStart.
*/
override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
case e: Exception => Restart
}
/**
* Uses the json spec to create the performers
*/
override def preStart() : Unit = {
monitoring_actor ! Monitor.START(Utils.getTimestamp)
val connectors_js = (ensembleSpec \ CONNECTIONS).as[List[JsObject]]
val performers_js = (ensembleSpec \ PERFORMERS).as[List[JsObject]]
performers_metadata = extractPerformers(performers_js)
connectors = extractConnections(connectors_js)
performer = createPerformers()
val ed = connectors.map(connector => {
s""" \t ${connector._1} : ${connector._2.mkString("[",",","]")}"""
}).mkString("\n")
val nd = performers_metadata.map(performer => s"${performer._1}").mkString("[",",","]")
val actors = performer.map(actor => actor._2.path.name).mkString("["," | ","]")
log.info(s"Edges: \n$ed \nNodes: \n\t$nd \nPerformers \n\t$actors")
}
override def postStop() : Unit = {
monitoring_actor ! Monitor.STOP(Utils.getTimestamp)
}
override def postRestart(reason: Throwable): Unit = {
monitoring_actor ! Monitor.RESTART(reason, Utils.getTimestamp)
preStart()
}
def getActorsName(): Set[String] = {
performer.map(performer => performer._2.path.name).toSet
}
private def createPerformers(): Map[String,ActorRef] = {
val tmpActors: HashMap[String, ActorRef] = HashMap.empty
connectors.foreach {
case (performerID: String, connectionIDs: Array[String]) =>
try {
createFeyActor(performerID, connectionIDs, tmpActors)
} catch {
/* if the creation fails, it will stop all the actors in the Ensemble */
case e: Exception =>
log.error(e,"During Performer creation")
throw new RestartEnsemble("Not able to create the Performers.")
}
}
//Treat performers that are not part of the connectors
noConnectionsPerformers(tmpActors)
tmpActors.toMap
}
/**
* Creates actors that was not part of an Connection
*
* @param tmpActors
*/
private def noConnectionsPerformers(tmpActors: HashMap[String, ActorRef]) = {
performers_metadata.filter(performer_metadata => !tmpActors.contains(performer_metadata._1)).
foreach(performer_metadata => {
try {
createFeyActor(performer_metadata._1, Array.empty, tmpActors)
} catch {
/* if the creation fails, it will stop all the actors in the Ensemble */
case e: Exception =>
log.error(e, "Problems while creating Performer")
throw new RestartEnsemble("Not able to create the Performer.")
}
})
}
/**
* Create an actorOf for the performer in the Ensemble
*
* @param performerID performer uid
* @param connectionIDs performer connections (connectors)
* @param tmpActors auxiliar map of actorsRef
* @return (performerID, ActorRef of the performer)
*/
private def createFeyActor(performerID: String, connectionIDs: Array[String], tmpActors:HashMap[String, ActorRef]):(String, ActorRef) = {
// performer was already created
if(tmpActors.contains(performerID)){
(performerID, tmpActors.get(performerID).get)
}
else{
val performerInfo = performers_metadata.get(performerID)
if (performerInfo.isDefined) {
val connections: Map[String, ActorRef] = connectionIDs.map(connID => {
createFeyActor(connID, connectors.getOrElse(connID,Array.empty),tmpActors)
}).toMap
var actor: ActorRef = null
val actorProps = getPerformer(performerInfo.get, connections)
if(performerInfo.get.autoScale) {
val resizer = DefaultResizer(lowerBound = performerInfo.get.lowerBound, upperBound = performerInfo.get.upperBound,
messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.get.backoffThreshold, backoffRate = 0.1)
val strategy =
if(performerInfo.get.isRoundRobin) {
log.info(s"Using Round Robin for performer ${performerID}")
RoundRobinPool(1, Some(resizer))
} else {
log.info(s"Using Smallest mailbox for performer ${performerID}")
SmallestMailboxPool(1, Some(resizer))
}
actor = context.actorOf(strategy.props(actorProps), name = performerID)
}else{
actor = context.actorOf(actorProps, name = performerID)
}
context.watch(actor)
tmpActors.put(performerID, actor)
(performerID, actor)
}else if(GlobalPerformer.activeGlobalPerformers.contains(orchestrationID)
&& GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.contains(performerID)){
// Performer is a global orchestration performer and is already created
(performerID, GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.get(performerID).get)
}else if(CoreSharedPerformers.activeSharedPerformers.contains(performerID)){
// Shared performers, level is FeyCore
CoreSharedPerformers.ensemblesUsingShared.put((self.path.toString, performerID), self)
(performerID, CoreSharedPerformers.activeSharedPerformers.get(performerID).get)
}
else {
throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the JSON")
}
}
}
/**
* Creates actor props based on JSON configuration
*
* @param performerInfo Performer object
* @param connections connections
* @return Props of actor based on JSON config
*/
private def getPerformer(performerInfo: Performer, connections: Map[String, ActorRef]): Props = {
var clazz:Option[Class[FeyGenericActor]] = None
Utils.loadedJars.synchronized {
clazz = Some(loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName))
}
if(clazz.isDefined) {
val dispatcher = if (performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
val actorProps = Props(clazz.get,
performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
// dispatcher has higher priority than controlAware. That means that if both are defined
// then the custom dispatcher will be used
if (dispatcher != "") {
log.info(s"Using dispatcher: $dispatcher")
actorProps.withDispatcher(dispatcher)
}
else if (performerInfo.controlAware) {
actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
} else {
actorProps
}
}else{
log.error(s"Could not load class for performer ${performerInfo.uid}")
throw new ClassNotFoundException(s"${performerInfo.jarName} -- ${performerInfo.jarLocation}")
}
}
/**
* Load a clazz instance of FeyGenericActor from a jar
*
* @param classPath class path
* @param jarLocation Full path where to load the jar from
* @return clazz instance of FeyGenericActor
*/
private def loadClazzFromJar(classPath: String, jarLocation: String, jarName: String):Class[FeyGenericActor] = {
try {
Utils.loadActorClassFromJar(jarLocation,classPath,jarName)
}catch {
case e: Exception =>
log.error(e,s"Could not load class $classPath from jar $jarLocation. Please, check the Jar repository path as well the jar name")
throw e
}
}
override val toString = {
val ed = connectors.map(connector => {
s""" \t ${connector._1} : ${connector._2.mkString("[",",","]")}"""
}).mkString("\n")
val nd = performers_metadata.map(performer => s"${performer._1}").mkString("[",",","]")
val actors = performer.map(actor => actor._2.path.name).mkString("["," | ","]")
s"Edges: \n$ed \nNodes: \n\t$nd \nPerformer \n\t$actors"
}
def stopPerformers(): Unit = {
performer.foreach(actor => actor._2 ! PoisonPill)
}
}
object Ensemble {
/**
* Extract a map from the connectors json
*
* @param connectors connectors json
* @return map of connectors
*/
def extractConnections(connectors: List[JsObject]): Map[String,Array[String]] = {
connectors.flatMap(connector => {
connector.keys.map(key => {
(key, (connector \ key).as[List[String]].toArray)
}).toMap
}).toMap
}
/**
* Extratc a map from the performers json
*
* @param performers performers json
* @return Map of performers
*/
def extractPerformers(performers: List[JsObject]): Map[String, Performer] = {
performers.map(performer => {
val id: String= (performer \ GUID).as[String]
val schedule: Int = (performer \ SCHEDULE).as[Int]
val backoff: Int = (performer \ BACKOFF).as[Int]
val autoScale: Boolean = if (performer.keys.contains(PERFORMER_AUTO_SCALE)) true else false
val lowerBound: Int = if (autoScale) (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_LOWER_BOUND).as[Int] else 0
val upperBound: Int = if (autoScale) (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_UPPER_BOUND).as[Int] else 0
if(lowerBound > upperBound){
throw new IllegalArgumentException(" Could not define performer. Autoscale param: Lower bound greater than upper bound")
}
val threshold: Double = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_BACKOFF_THRESHOLD)) {
(performer \ PERFORMER_AUTO_SCALE \ PERFORMER_BACKOFF_THRESHOLD).as[Double]
}
else {
0.3
}
val roundRobin: Boolean = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_ROUND_ROBIN)) {
(performer \ PERFORMER_AUTO_SCALE \ PERFORMER_ROUND_ROBIN).as[Boolean]
}
else {
false
}
val jarName: String = (performer \ SOURCE \ SOURCE_NAME).as[String]
val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
val controlAware:Boolean = if (performer.keys.contains(CONTROL_AWARE)) (performer \ CONTROL_AWARE).as[Boolean] else false
val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY
val dispatcher: String = if (performer.keys.contains(PERFORMER_DISPATCHER)) (performer \ PERFORMER_DISPATCHER).as[String] else ""
(id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond,
autoScale, lowerBound, upperBound,threshold, roundRobin,controlAware, location, dispatcher))
}).toMap
}
/**
* returns a map of the params
*
* @param params params json
* @return map of params where key is the json key and value is the json value for the key
*/
private def getMapOfParams(params: JsObject):Map[String,String] = {
params.keys.map(key => {
(key.toString, (params \ key).as[String])
}).toMap
}
/**
* Message that send the START message to all of the Performers
*/
case object STOP_PERFORMERS
case object PRINT_ENSEMBLE
case object FORCE_RESTART_ENSEMBLE
}
/**
* Holds the performer information
*
* @param uid
* @param jarName
* @param classPath
* @param parameters
* @param schedule
* @param backoff
* @param autoScale
* @param lowerBound
* @param upperBound
* @param backoffThreshold
* @param isRoundRobin
* @param controlAware
* @param jarLocation
* @param dispatcher
*/
case class Performer(uid: String, jarName: String,
classPath: String, parameters: Map[String,String],
schedule: FiniteDuration, backoff: FiniteDuration,
autoScale: Boolean, lowerBound: Int, upperBound: Int,
backoffThreshold: Double, isRoundRobin: Boolean, controlAware: Boolean,
jarLocation: String, dispatcher: String)