
Fey Engine

Fey is an Akka-based framework that facilitates the definition of Fey actors, each actor implementing a Fey component. Fey actors extend a generic Fey actor, FeyGenericActor, and override generic functions, like onStart and execute, to define an autonomous computation. Each Fey actor should be provided through a .jar, which the Fey engine loads to access the actor's functionality.

The actors defined by a set of .jar files are referenced via a JSON configuration file that we call an Orchestration. The Orchestration defines which actors to use, what building properties of each actor to set, what unique parameters to pass into each actor, what message passing relationships the actors have with each other, and any time-based scheduler to be implemented by each actor. In essence, an Orchestration defines what will be computed by the Fey engine. The .jar files and the Orchestration files are stored in locations that the Fey engine is told of, through its configuration, when it starts up. The Fey engine manages the lifecycle of the actors, called Performers, defined by the Orchestrations. This includes the creation, execution, deletion, scalability and fault tolerance of the component actors. Note that each actor can specify, in the Orchestration file, that it wants Fey to automatically scale its instances upwards or downwards based on the actor's incoming message load.

In summary, the Fey engine manages Orchestrations of asynchronous networks of continuously operating actors that interact with each other through message passing. If you have a pre-defined set of .jar files, then programming the Fey engine is simply a matter of defining Orchestrations via JSON files. Orchestration files can be added, updated and deleted, and the Fey engine will manage the Orchestration life cycle.

The Fey engine can be used at different levels of compute infrastructure. For example, it can run on low-cost devices such as a Raspberry Pi, on a standalone server or in a Mesos cluster. Remote invocation of actors makes it possible for Orchestrations running in one location to interact with Orchestrations running in another, where services not available locally can be accessed.

Prerequisites

  1. Java SDK (Software Development Kit) 1.8+
  2. Scala 2.11
  3. SBT 0.13.11 (http://www.scala-sbt.org/)

Architecture

Fey is composed of Akka actors. Each of them is described below:

  1. Fey-Core: Main actor of the system. It is the highest ancestor of all other actors and is created right after Fey starts.
  2. Directory-Watcher: Actor responsible for monitoring the JSON directory and notifying Fey-Core when a new file event happens.
  3. Fey-Identifier: Actor responsible for sending the Identity message to each one of the active actors. Routees are not affected by this actor.
  4. Orchestration-ID: A new instance of Orchestration is created every time we process a JSON with a new Orchestration GUID. It is responsible for managing the Ensembles.
  5. Ensemble-ID: A new instance of Ensemble is created under the corresponding Orchestration every time the JSON specifies a new Ensemble GUID.
  6. Performer-ID: A new instance is created for each Performer inside the Ensemble. Each Performer belongs to an Ensemble and also has a GUID.

The figure below shows the actor hierarchy for Fey. In this example, Fey has only two Orchestrations running, in which each Orchestration has two Ensembles, and each Ensemble has its own Performers.

[Figure: Fey actor hierarchy]

Each actor path follows one of these patterns:

  • Orchestration - /FEY-CORE/ORCHESTRATION-GUID/
  • Ensemble - /FEY-CORE/ORCHESTRATION-GUID/ENSEMBLE-GUID/
  • Performer - /FEY-CORE/ORCHESTRATION-GUID/ENSEMBLE-GUID/PERFORMER-GUID

Orchestrations manage their Ensembles, and Ensembles manage their Performers.
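
As a rough illustration of the path patterns above, the sketch below builds each path from its GUIDs. The object `FeyPaths` and its methods are hypothetical helpers written for this README, not part of Fey, and the trailing slash shown in the patterns is omitted.

```scala
// Hypothetical helper (not part of Fey): builds actor paths from GUIDs,
// following the /FEY-CORE/ORCHESTRATION-GUID/ENSEMBLE-GUID/PERFORMER-GUID pattern.
object FeyPaths {
  private val Root = "/FEY-CORE"

  def orchestration(orchestrationGuid: String): String =
    s"$Root/$orchestrationGuid"

  def ensemble(orchestrationGuid: String, ensembleGuid: String): String =
    s"${orchestration(orchestrationGuid)}/$ensembleGuid"

  def performer(orchestrationGuid: String, ensembleGuid: String, performerGuid: String): String =
    s"${ensemble(orchestrationGuid, ensembleGuid)}/$performerGuid"
}
```

For instance, `FeyPaths.performer("MYORCHESTRATION01", "ENSEMBLE01", "REDIS")` yields `/FEY-CORE/MYORCHESTRATION01/ENSEMBLE01/REDIS`.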

When started, Fey processes things in the following order:

  1. If checkpoint functionality is enabled, it processes the JSON files in the checkpoint directory (see checkpointing below).
  2. It processes the JSON files in the Fey JSON repository directory.
  3. Finally, it starts monitoring for update and create events in the Fey JSON repository.

Active Actors

Fey offers a REST API end point that displays all active actors. It uses the Fey-Identifier actor, which sends an Identity message to the actors. Some actors may take longer than others to answer back with their identity, which can make it seem like those actors are not active because they are not present in the response.

The REST API binds to localhost on port 16666, and the end point should be called with:

http://localhost:16666/fey/activeactors

Below is a sample of what you might see:

[Figure: sample response from the active actors end point]

Running Fey

You can specify one argument when running Fey:

  1. Fey config file: Path to the configuration file to be used by Fey. It is an optional argument; if it is not defined, the default configuration will be used.

You just need to execute the Fey .jar:

java -jar fey.jar FEY_CONFIG_FILE

Fey will process all the JSON files in the JSON directory specified in the configuration file, and then it will start monitoring that directory for file events. Note: If checkpointing is enabled, all files in the checkpoint directory will be processed before the ones in the JSON directory.

Fey Configuration

The Fey configuration file can optionally include one or more of the following properties:

| Property | Type | Description | Default |
|---|---|---|---|
| enable-checkpoint | Boolean | Keeps track of the latest Orchestrations running so that, if Fey stops, it can restart from the checkpoint. When checkpointing is enabled, Fey adds .processed to all processed files and starts all Orchestrations in the checkpoint directory. If checkpointing is disabled, no checkpoint is created. | true |
| checkpoint-directory | String | Path where Fey should keep the checkpoint files | /tmp/fey/checkpoint |
| json-repository | String | Path that will be monitored by Fey in order to get the JSON files | ~/feyJSONRepo |
| json-extension | String | Extension of the files that should be processed by Fey | .json |
| jar-repository | String | Path where Fey should look for the GenericActor jars to be loaded from | ~/feyJarRepo |
| log-level | String | Log level for Fey | DEBUG |
| log-appender | String | Enables or disables the appender based on user configuration. Accepts 3 options: FILE, STDOUT or FILE_STDOUT | STDOUT |
| auto-scale.messages-per-resize | Integer | Affects only Performers that are able to auto scale. Defines the volume of messages that will trigger a resize | 500 |
| dynamic-jar-population | Object | For details, see Dynamic Jar Population | |

Fey Logging

Fey uses logback.xml to configure its logs. By default, Fey appends the logs to STDOUT. You can change the configuration to log to a file, or to both. If you save the log to a file, the default location is ${HOME}/.fey/logs/. Fey uses a Rolling File Appender in which each log file has a maximum size of one megabyte (1MB), and it keeps at most 30 log files.

In Fey, the default log level is DEBUG for the entire system. The logging options that Fey exposes through its configuration are log-level and log-appender.

Dynamic Jar Population

Fey offers the possibility of downloading the jar to be used by a Performer before starting it. This means that the jar does not need to be in the jar repo; it will be downloaded when requested.

The jars downloaded on demand will be stored in the directory specified in Fey configuration dynamic-jar-population.downloaded-repository. The default value is ${HOME}"/.fey/jars".

Fey will download the jar only when it is not available in the downloaded-repository. By default, Fey will not clean up the downloaded-repository every time it is launched. If you want to force that clean-up, set the Fey configuration property dynamic-jar-population.force-pull to true.

dynamic-jar-population{
  downloaded-repository = ${HOME}"/.fey/jars"
  force-pull = false
}

In order to tell Fey to download a jar, you have to specify its location in the JSON, inside the source property of the Performer.

"location": {
  "url": "https://my-remote-location",
  "credentials":{
    "user": "USERNAME_REPO",
    "password": "PASSWORD_REPO"
  }
}

location is an optional property. If it is present, the url property must be defined as well. The credentials property is also optional.

You can use environment variables to define your credentials. Fey will first try to resolve the user and password values by looking them up as environment variables. If no such variable exists, the value itself will be used.

For example, if your user property is MYNAME, Fey will look for an environment variable called MYNAME. If it finds one, the value of the environment variable will be used. If it does not, the literal value MYNAME will be used.
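
The lookup rule described above can be sketched as follows. This is a minimal illustration, not Fey's actual code: the object `CredentialResolver`, its `resolve` method and the injectable `env` map are assumptions made for the example.

```scala
// Hypothetical sketch of the credential lookup rule (not Fey's implementation).
object CredentialResolver {
  // Returns the value of the environment variable named `raw` when it exists,
  // otherwise returns `raw` itself. `env` defaults to the process environment
  // and can be replaced in tests.
  def resolve(raw: String, env: Map[String, String] = sys.env): String =
    env.getOrElse(raw, raw)
}
```

With `env = Map("MYNAME" -> "alice")`, resolving `"MYNAME"` gives `"alice"`; with an empty environment it gives `"MYNAME"`.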

JSON Configuration

Fey gets its instructions to start the generic actors through a well-defined JSON schema. For Fey, each JSON specifies an Orchestration, which in turn defines the Ensembles and their Performers.

The Orchestration specification is defined at the root of the JSON, and requires the following JSON properties:

| Property | Type | Description |
|---|---|---|
| guid | String | Globally unique Orchestration identifier. If this property changes its value, Fey will consider it a new Orchestration. |
| timestamp | String | Orchestration timestamp. Holds the timestamp for the last action in the Orchestration. |
| name | String | Orchestration name. |
| command | String | See details in Orchestration Commands. |
| ensembles | Array[Object] | Holds all the Ensembles that this Orchestration will be running. See Ensemble for more details. |

Ensemble

An Orchestration can have one or more Ensembles. Each Ensemble must define the following properties:

| Property | Type | Description |
|---|---|---|
| guid | String | Must be unique inside the Orchestration |
| command | String | See details in Ensemble Commands. |
| performers | Array[Object] | See details in Ensemble Performers. |
| connections | Array[Object] | See details in Ensemble Connections. |

Orchestration Commands

Fey drives the Orchestration based on a set of 4 commands:

  1. CREATE: Fey will check if there is an Orchestration with the same guid running. If there isn't, Fey will create the Orchestration, its Ensembles and their Performers, and start them. If there is already an Orchestration running, Fey will log a WARN, and nothing will be created or updated.
  2. UPDATE: Fey will check if there is already an Orchestration with the same guid running. If there is, Fey will check the command for each one of the Ensembles and execute the respective action. If there isn't, Fey will log a WARN, and nothing will happen. Please see Ensemble Commands for a list of available commands.
  3. DELETE: If there is an Orchestration with the same guid running, Fey will stop all of the actors and delete the Orchestration.
  4. RECREATE: The recreate command does not care whether the Orchestration exists or not. It will always try to delete the Orchestration and then create a new one based on the JSON specification.
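
The four commands above amount to a small decision table, sketched below. This is only an illustration of the described behavior, not Fey's code: `OrchestrationCommands`, the `Action` names and `decide` are hypothetical. Two cases are assumptions because the text does not cover them: DELETE on an Orchestration that is not running is treated as a no-op, and an unknown command raises an exception.

```scala
// Hypothetical decision table for Orchestration commands (not Fey's implementation).
object OrchestrationCommands {
  sealed trait Action
  case object CreateAll extends Action             // create Orchestration, Ensembles and Performers
  case object ApplyEnsembleCommands extends Action // evaluate each Ensemble's own command
  case object DeleteAll extends Action             // stop all actors, delete the Orchestration
  case object DeleteThenCreate extends Action      // RECREATE: always delete, then create
  case object WarnAndIgnore extends Action         // log a WARN, change nothing
  case object Ignore extends Action                // assumption: nothing to do

  def decide(command: String, isRunning: Boolean): Action = command match {
    case "CREATE"   => if (isRunning) WarnAndIgnore else CreateAll
    case "UPDATE"   => if (isRunning) ApplyEnsembleCommands else WarnAndIgnore
    case "DELETE"   => if (isRunning) DeleteAll else Ignore // assumption for the not-running case
    case "RECREATE" => DeleteThenCreate
    case other      => throw new IllegalArgumentException(s"Unknown Orchestration command: $other")
  }
}
```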

Ensemble Commands

If the Orchestration command is UPDATE, Fey will check if the Orchestration is running and then for each one of the Ensembles it will check the specific command:

  1. CREATE: Creates a new Ensemble if there isn't one running yet.
  2. UPDATE: Deletes the Ensemble and starts a new one using the new configuration.
  3. DELETE: Deletes the Ensemble.
  4. NONE: The Ensemble will not change.
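
To make the effect of each Ensemble command concrete, the sketch below models the running Ensembles of one Orchestration as a map from Ensemble GUID to its configuration. Everything here (`EnsembleCommands`, `step`, the use of a plain string for the configuration) is a hypothetical simplification. What UPDATE does when the Ensemble is not yet running is not stated above; the sketch assumes it simply starts the Ensemble.

```scala
// Hypothetical model of Ensemble commands (not Fey's implementation).
object EnsembleCommands {
  // `running` maps each running Ensemble GUID to the configuration it was started with.
  def step(running: Map[String, String],
           guid: String,
           command: String,
           newConfig: String): Map[String, String] =
    command match {
      // Only creates the Ensemble when none with this GUID is running yet.
      case "CREATE" => if (running.contains(guid)) running else running + (guid -> newConfig)
      // Deletes the Ensemble and starts a new one with the new configuration.
      case "UPDATE" => (running - guid) + (guid -> newConfig)
      case "DELETE" => running - guid
      case "NONE"   => running
      case other    => throw new IllegalArgumentException(s"Unknown Ensemble command: $other")
    }
}
```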

Ensemble Performers

For Fey, each Performer represents a Generic Actor which should have the following properties:

| Property | Type | Description |
|---|---|---|
| guid | String | Must be a unique ID inside the Ensemble |
| controlAware | Boolean | Optional property. Tells whether the actor should use a control-aware mailbox, so that Control messages have higher priority than the others. If not specified, the actor will use the default mailbox. |
| autoScale | Integer | Optional property. Tells whether the actor should be a load balanced actor. If zero or not specified, the actor will be started without load balancing. If greater than zero, the actor will be started with load balancing, and the specified number is the maximum number of replicated actors. For example, if the value is 10, the actor will be load balanced and can scale up to 10 replicas. |
| schedule | Integer | Defines the time interval in milliseconds for the actor scheduler. If zero, no scheduler will be started. |
| backoff | Integer | Defines the time window in milliseconds that the actor should back off after receiving a PROCESS message. (See Handling Backoff for more details.) |
| source | Object | Defines the information used by Fey to load the GenericActor. See Source Property for details. |
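
The autoScale rule can be summarized in one small function. This is a sketch for illustration only: `AutoScaleSetting` and `maxReplicas` are hypothetical names, and treating a negative value like zero is an assumption, since the table only covers zero, missing and positive values.

```scala
// Hypothetical reading of the autoScale property (not Fey's implementation).
object AutoScaleSetting {
  // None      => the actor is started without load balancing
  // Some(max) => the actor is load balanced and can scale up to `max` replicas
  def maxReplicas(autoScale: Option[Int]): Option[Int] =
    autoScale.filter(_ > 0) // assumption: negative values behave like zero
}
```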

Source Property

The source property of a Performer holds the information needed to load the actor from a .jar. Each Performer has only one source property, and it should contain the following information:

| Property | Type | Description |
|---|---|---|
| name | String | Name of the jar that contains the Generic Actor. This jar must be present in the specified jar repo. The jar name is not case sensitive. |
| classPath | String | Class path for the GenericActor class inside the .jar. It should include the package as well. |
| parameters | Object | Contains any additional information that will be used by the GenericActor. It will be passed to the actor as a HashMap[String,String] in which the key is the property name and the value is the property value. It can contain as many properties as you want. |
| location | Object | Optional. Please see Dynamic Jar Population for more details. |

Ensemble Connections

The Connections property of an Ensemble defines the connection between the Performers. See connectTo constructor parameter for more details about how this information is used.

An object inside the Connections property obeys the following pattern:

  1. Property name: GUID of the Performer that will be connected to the Performers listed in the property value.
  2. Property value: Array of Performer IDs that will be connected to the property name.

The Performer IDs must be defined in the performers property of the Ensemble in order to be used inside the Connections.
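
A quick way to check the rule above before handing a JSON to Fey is to compare the IDs used in the connections against the declared Performers. The sketch below does that on already-parsed data; `ConnectionCheck` and `undefinedPerformers` are hypothetical names, and the JSON parsing step is left out.

```scala
// Hypothetical pre-flight check (not part of Fey): finds connection IDs
// that do not correspond to any Performer declared in the Ensemble.
object ConnectionCheck {
  def undefinedPerformers(performers: Set[String],
                          connections: Seq[Map[String, Seq[String]]]): Set[String] = {
    // Collect both the property names (sources) and the property values (targets).
    val referenced = connections
      .flatMap(_.toSeq)
      .flatMap { case (from, to) => from +: to }
    referenced.toSet -- performers
  }
}
```

An empty result means every ID in the connections is a declared Performer.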

Sample JSON

This JSON specifies an Orchestration that has only one Ensemble. The Ensemble defines the Redis and ZMQ Performers, each loaded from its own .jar, and then maps the Redis Performer's output messages to the ZMQ Performer.

{  
  "guid":"MYORCHESTRATION01",
  "command":"CREATE",
  "timestamp":"213263914535",
  "name":"ORCHESTRATION SAMPLE JSON CONFIG",
  "ensembles":[  
    {  
      "guid":"ENSEMBLE01",
      "command":"NONE",
      "performers":[  
        {  
          "guid":"REDIS",
          "schedule":1000,
          "backoff":0,
          "source":{  
            "name":"fey-redis.jar",
            "classPath":"com.fey.RedisConnector",
            "parameters":{  
              "server":"localhost",
              "keys":"{\"keys\":[\"key1\",\"key2\"]}",
              "port":"1234"
            }
          }
        },
        {  
          "guid":"ZMQ",
          "schedule":0,
          "autoScale":3,
          "backoff":0,
          "source":{  
            "name":"fey-zmq.jar",
            "classPath":"com.fey.ZMQPublisher",
            "parameters":{  
              "server":"localhost",
              "topic":"mytopic",
              "port":"1235"
            }
          }
        }
      ],
      "connections":[  
        {  
          "REDIS":[  
            "ZMQ"
          ]
        }
      ]
    }
  ]
}

Fey Checkpoint

Fey will keep track of the latest version of each Orchestration running. This can be disabled through the configuration file. When checkpointing is enabled, Fey will process the files in the JSON directory and, for each one, add a file extension that reflects its status:

  1. .processed: Means that the file had the correct JSON schema and was able to be processed by Fey.
  2. .failed: Means that something was wrong in the JSON and it could not be parsed by Fey.

All running Orchestrations have their JSON file stored in the checkpoint directory. If the Fey system restarts, Fey will restart all Orchestrations in the checkpoint directory prior to starting new ones that have appeared in the JSON directory.

If checkpoint is not enabled no extension will be added to the JSON files and no checkpoint will be kept. This option is useful for the developers of Fey performers when in development mode.
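
The naming rule can be sketched as below. `CheckpointNaming` and `markedName` are hypothetical names, and the sketch assumes the status extension is appended to the existing file name (for example, my-orchestration.json becomes my-orchestration.json.processed).

```scala
// Hypothetical sketch of the status extension rule (not Fey's implementation).
object CheckpointNaming {
  def markedName(fileName: String, parsedOk: Boolean, checkpointEnabled: Boolean): String =
    if (!checkpointEnabled) fileName           // no extension is added without checkpointing
    else if (parsedOk) s"$fileName.processed"  // valid JSON schema, processed by Fey
    else s"$fileName.failed"                   // JSON could not be parsed
}
```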

Developing New Performers

All actors (called Performers in Fey) used by Fey must extend the FeyGenericActor abstract class. In order to create your own Fey Generic Actor, you will need to add Fey as a provided library to your project and create a new class that extends FeyGenericActor.

Constructor

The Generic Actor offers the following constructor parameters:

| Name | Type | Description |
|---|---|---|
| params | Map[String,String] | Holds all the extra configuration that is going to be used by the actor. For example: ("port" -> "1234") |
| backoff | FiniteDuration | Time interval to back off after processing the PROCESS message. Will always be greater than or equal to zero. |
| connectTo | Map[String,ActorRef] | Holds all the actors that your actor is supposed to propagate a message to. Maps the actor's ID to the ActorRef. |
| schedulerTimeInterval | FiniteDuration | Time interval for the scheduler. If it is zero, no scheduler will be started for the actor. |
| orchestrationName | String | Name of the Orchestration to which the actor belongs |
| orchestrationID | String | ID of the Orchestration to which the actor belongs |
| autoScale | Boolean | True means that this actor will be replicated in order to obtain scalability, so you should be aware that onStart will be called for each replica. |

When starting a new Performer, Fey will give the actor all the configuration specified in the Orchestration's JSON.

The GenericActor must override all the constructor parameters, in order, and cannot define any extra ones.

The reason for these restrictions is that Fey loads your actor from the .jar and generates the actor reference by passing in the list of constructor parameters in order. If these restrictions are not obeyed, Fey will throw an IllegalArgumentException during the creation of the actor because it could not find a matching constructor on the class.

class MyGenericActor(override val params: Map[String, String] = Map.empty,
               override val backoff: FiniteDuration = 1.minutes,
               override val connectTo: Map[String, ActorRef] = Map.empty,
               override val schedulerTimeInterval: FiniteDuration = 30.seconds,
               override val orchestrationName: String = "",
               override val orchestrationID: String = "",
               override val autoScale: Boolean = false) extends FeyGenericActor {}

Life-cycle Actor Hooks

Fey's generic actor declares final overrides of the actor life-cycle hooks preStart, postStop, preRestart and postRestart. It does, however, offer the user the ability to execute additional commands by overriding the following methods:

  • onStart: Will be called as part of the actor's life-cycle preStart, right after the decision of whether to start a scheduler is made (see Scheduler for more details). Be careful when using this method for an auto scaling Performer, since it will be called for every routee (Akka terminology). So, if you are doing something like binding to a port in the onStart method of the Performer, other routees may not be able to bind to the same port. (See Auto Scaling for more details.)
  • onStop: Will be called as part of the actor's life-cycle postStop, after stopping the scheduler. Normally, this method is used to "clean up" the actor, for example by closing connections.
  • onRestart: Will be called as part of the actor's life-cycle postRestart. When the actor is restarted, all of its children will be stopped and postStop is going to be called, followed by preStart.

Messages

Fey's generic actor declares a final override of the actor's receive method. It gives you a complementary receive that is called when a message could not be handled by the generic receiver. The generic actor handles the following messages:

  1. PRINT-PATH: Logs the actor's path.
  2. STOP: Stops the actor itself.
  3. PROCESS(message: T): Generic typed message that should be used for the communication between generic actors. This message checks whether the backoff is enabled and, if it is not, calls the user-overridable processMessage method. (See Processing Messages for more details.)
  4. EXCEPTION(reason: Throwable): Throws the Throwable so the actor's parent can handle it.

All other messages that are not handled by the default receive will be passed to the user-overridable customReceive: Receive method.

Keep in mind that even if your actor can handle messages other than the default ones, the main communication between generic actors should happen through the PROCESS message.

Propagating Messages

Fey works with the concept that an actor communicates with the actors it connects to by sending PROCESS messages. With that in mind, the generic actor offers a final generic typed method (propagateMessage(message:T)) that sends a PROCESS message to each one of the actors in the connectTo parameter.

If you don't want to propagate the message to all of the actors in connectTo, you should implement a different propagate method.

Processing Messages

PROCESS[T](message: T) is the global message to be used when communicating with other Fey actors. The actor can receive any type of message through it.

After receiving the PROCESS message, the actor checks whether the backoff is enabled. If it is enabled, nothing happens and the message is not processed. If it is not enabled, the actor calls the user-overridable processMessage[T](message: T, sender: ActorRef) method.

The default implementation of processMessage[T](message: T, sender: ActorRef) logs the message being processed, calls the propagate method and then starts the backoff by calling the startBackoff method (see Handling Backoff).

You can override this method to handle only the types of message that you are expecting and to execute some action when a message is received. In the example below, the actor only handles PROCESS messages of type Int or String, and starts the backoff if the message is of type Int.

override def processMessage[T](message:T, sender: ActorRef): Unit = {
 message match {
   case msg:String =>
     log.info(s"Processing String $msg")
   case msg:Int =>
     log.info(s"Processing Int $msg -> ${msg+1}")
     startBackoff()
   case x => log.info("Type not handled")
 }
}

Handling Backoff

A lot of use cases require the Performer (actor) to stop processing messages for a time interval after some specific action has happened. The generic actor offers a built-in backoff that is used only by the PROCESS message.

Every time you need the actor to back off after an action, you should call the startBackoff method. The startBackoff method uses the constructor parameter backoff and sets an internal state of the actor, called endBackoff, to the time at which the actor should start processing messages again. The endBackoff internal state is checked every time the actor gets a PROCESS message.

Note: Be careful when calling startBackoff. Make sure it is only called from the flow that handles the PROCESS message.
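
The backoff mechanism described above can be pictured with the standalone sketch below. It is not the FeyGenericActor code: the class `BackoffGate`, the method `shouldProcess` and the injectable `now` clock are assumptions made for the example. Only the names startBackoff, backoff and endBackoff come from the text.

```scala
import scala.concurrent.duration.FiniteDuration

// Hypothetical stand-in for the backoff state kept by the generic actor.
// `now` returns the current time in nanoseconds and can be replaced in tests.
class BackoffGate(backoff: FiniteDuration, now: () => Long = () => System.nanoTime()) {
  // Start with the window already over so the first PROCESS message is handled.
  private var endBackoff: Long = now()

  // Opens a new backoff window that lasts `backoff` from the current time.
  def startBackoff(): Unit = {
    endBackoff = now() + backoff.toNanos
  }

  // Checked on every PROCESS message: true once the window is over.
  // The subtraction keeps the comparison valid even if nanoTime wraps around.
  def shouldProcess: Boolean = now() - endBackoff >= 0
}
```

With a backoff of zero, calling startBackoff leaves shouldProcess true, which matches the rule that a zero backoff never blocks processing.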

Scheduler

The generic actor is able to start and control one scheduler. The scheduler is started through the preStart life-cycle hook, which checks the constructor parameter schedulerTimeInterval: if it is non-zero, Fey starts a system.scheduler that executes every schedulerTimeInterval; if it is zero, no scheduler is started.

Once started, the scheduler will call the user-overridable execute() method every schedulerTimeInterval. If the actor dies or gets restarted, the scheduler will be cancelled and, in the case of a restart, started again.

Note: You can start as many schedulers as you want to inside your generic actor, but just the one started during the creation of the actor by Fey will be monitored, i.e., will be stopped and started as necessary.

Auto Scaling

Fey implements Akka load balancing functionality (we call this auto scaling) using Router actors with the SmallestMailboxPool strategy and the DefaultResizer. When starting the actor, Fey looks at the JSON configuration and checks whether the Performer is an auto scaled Performer. For more information about Routers, please visit Akka's webpage.

Example of a Fey Generic Actor

import akka.actor.ActorRef
import org.zeromq.ZMQ
import scala.concurrent.duration._
// FeyGenericActor comes from the Fey library (added to the project as a provided dependency).

class MyGenericActor(override val params: Map[String, String] = Map.empty,
               override val backoff: FiniteDuration = 1.minutes,
               override val connectTo: Map[String, ActorRef] = Map.empty,
               override val schedulerTimeInterval: FiniteDuration = 30.seconds,
               override val orchestrationName: String = "",
               override val orchestrationID: String = "",
               override val autoScale: Boolean = false) extends FeyGenericActor {

  override def onStart() = {
    log.info(s"STARTING ${self.path.name}")
    // Bind a ZMQ publisher socket to the port passed in through the Orchestration JSON
    val ZMQcontext = ZMQ.context(1)
    val publisher = ZMQcontext.socket(ZMQ.PUB)
    publisher.bind(s"tcp://localhost:${params("port")}")
  }

  var count = 0
  
  override def execute() = {
    count +=1
    propagateMessage(count)
  }

  override def processMessage[T](message: T, sender: ActorRef): Unit = {
    message match {
      case msg: String =>
        log.info(s"Processing String $msg")
      case msg: Int =>
        log.info(s"Processing Int $msg -> ${msg + 1}")
        startBackoff()
      case _ => log.info("Type not handled")
    }
  }

  override def customReceive(): Receive ={
    case x => log.warning(s"Untreated message $x")
  }

}