blob: 970a99f15ed47cb3fa1f318da20295cf0e12cc94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iota.fey
import akka.actor.{Actor, ActorLogging, ActorRef, Cancellable}
import akka.routing.GetRoutees
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Defines the generic actor for Fey integration
* @param params Map of key value pairs that will be used to configure the actor
* @param backoff Backoff interval
* @param connectTo List of ActorRef that are connected to this actor
* (If this actor is A, and it is connect to B and C, so the network would be A -> B,C)
* @param schedulerTimeInterval When this value is different of zero,
* a scheduler will be started calling the execute method
* @param orchestrationName
* @param orchestrationID
*/
abstract class FeyGenericActor(val params: Map[String,String] = Map.empty,
val backoff: FiniteDuration = 1.minutes,
val connectTo: Map[String,ActorRef] = Map.empty,
val schedulerTimeInterval: FiniteDuration = 2.seconds,
val orchestrationName: String = "",
val orchestrationID: String = "",
val autoScale: Boolean = false)
extends Actor with ActorLogging{
import FeyGenericActor._
/**
* Keeps reference to the cancellable
*/
@volatile private var scheduler: Cancellable = null
@volatile private var endBackoff: Long = 0
private[fey] val monitoring_actor = FEY_MONITOR.actorRef
override final def receive: Receive = {
case PRINT_PATH =>
log.info(s"** ${self.path} **")
case STOP =>
context.stop(self)
case PROCESS(message) =>
if(System.nanoTime() >= endBackoff) {
processMessage(message, sender())
}
// In case
case EXCEPTION(reason) => throw reason
case GetRoutees => //Discard
//Not treated messages will be pass over to the receiveComplement
case x => customReceive(x)
}
override final def preRestart(reason: Throwable, message: Option[Any]): Unit = {
context.children foreach { child =>
context.unwatch(child)
context.stop(child)
}
postStop()
}
override final def preStart(): Unit = {
monitoring_actor ! Monitor.START(Utils.getTimestamp, startMonitorInfo)
onStart()
startScheduler()
}
override final def postStop(): Unit = {
monitoring_actor ! Monitor.STOP(Utils.getTimestamp, stopMonitorInfo)
log.info(s"STOPPED actor ${self.path.name}")
stopScheduler()
onStop()
}
override final def postRestart(reason: Throwable): Unit = {
monitoring_actor ! Monitor.RESTART(reason, Utils.getTimestamp)
log.info(s"RESTARTED Actor ${self.path.name}")
preStart()
onRestart(reason)
}
def onRestart(reason: Throwable): Unit = {
log.info("RESTARTED method")
}
/**
* Stops the scheduler
*/
private final def stopScheduler() = {
if (Option(scheduler).isDefined) {
scheduler.cancel()
scheduler = null
}
}
/**
* Enables the backoff.
* Actor will drop the PROCESS messages that are sent during the backoff period time.
*/
final def startBackoff(): Unit = {
this.endBackoff = System.nanoTime() + this.backoff.toNanos
}
/**
* start the sheduled task after 1 second
* The time interval to be used is the one passed to the constructor
*/
private final def startScheduler() = {
if(Option(scheduler).isEmpty && schedulerTimeInterval.toNanos != 0){
scheduler = context.system.scheduler.schedule(1.seconds, schedulerTimeInterval){
try{
execute()
}catch{
case e: Exception => self ! EXCEPTION(e)
}
}
}
}
/**
* Check state of scheduler
*
* @return true if scheduller is running
*/
final def isShedulerRunning():Boolean = {
if(Option(scheduler).isDefined && !scheduler.isCancelled){
true
}else{
false
}
}
/**
* get endBackoff
*
* @return
*/
final def getEndBackoff(): Long = {
endBackoff
}
/**
* Called by the scheduler.
*/
def execute(): Unit = {
log.info(s"Executing action in ${self.path.name}")
}
/**
* Called every time actors receives the PROCESS message.
* The default implementation propagates the message to the connected actors
* and fires up the backoff
*
* @param message message to be processed
* @tparam T Any
*/
def processMessage[T](message: T, sender: ActorRef): Unit = {
log.info(s"Processing message ${message.toString}")
propagateMessage(s"PROPAGATING FROM ${self.path.name} - Message: ${message.toString}")
startBackoff()
}
/**
* This method should be called to propagate the message
* to the actors that are linked.
* Call this method in the end of the method processMessage
*
* @param message message to be propagated
* @tparam T Any
*/
final def propagateMessage[T](message: T): Unit = {
connectTo.foreach(linkedActor => {
linkedActor._2 ! PROCESS(message)
})
}
/**
* Method called after actor has received the START message.
* All the necessary configurations will be ready to be used.
*/
def onStart(): Unit = {
log.info(s"Actor ${self.path.name} started.")
}
/**
* Method called after actor has been stopped.
* Any scheduler that might have been running will be already canceled.
*/
def onStop(): Unit = {
log.info(s"Actor ${self.path.name} stopped.")
}
/**
* Any message that was not processed by the default actor receiver
* will be passed to this method.
*
* @return
*/
def customReceive: Receive
/**
* Used to set a info message when sending Stop monitor events
*
* @return String info
*/
def stopMonitorInfo:String = "Stopped"
/**
* Used to set a info message when sending Start monitor events
*
* @return String info
*/
def startMonitorInfo:String = "Started"
}
object FeyGenericActor {
/**
* Stops the actor
*/
case object STOP
/**
* Default message to execution an action when it is received.
*
* @param message message to be processed
* @tparam T Any
*/
case class PROCESS[T](message: T)
/**
* Message sent to the actor when an exception happens in the scheduler
*
* @param reason
*/
case class EXCEPTION(reason: Throwable)
/**
* Prints the path of the actor
*/
case object PRINT_PATH
}