Adding shared performer: Shared performer should be defined on fey configuration file and can be used by any orchestration
diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf
index 1d7e092..435804d 100644
--- a/fey-core/src/main/resources/application.conf
+++ b/fey-core/src/main/resources/application.conf
@@ -96,6 +96,9 @@
   // Which port and host should the restapi bind to
   port = 16666
   urlPath = "0.0.0.0"
+
+  //Path for shared performers json file
+  shared-performers = ""
 }
 
 // Fey akka configuration. Can not be overwritten by user
diff --git a/fey-core/src/main/resources/shared-json-schema-validator.json b/fey-core/src/main/resources/shared-json-schema-validator.json
new file mode 100644
index 0000000..2f5a376
--- /dev/null
+++ b/fey-core/src/main/resources/shared-json-schema-validator.json
@@ -0,0 +1,97 @@
+{
+  "shared-performers": {
+    "type": "array",
+    "items": {
+      "guid": {
+        "type": "string"
+      },
+      "controlAware": {
+        "type": "boolean"
+      },
+      "dispatcher": {
+        "type": "string"
+      },
+      "autoScale": {
+        "type": "object",
+        "lowerBound": {
+          "type": "integer",
+          "minimum": 1
+        },
+        "upperBound": {
+          "type": "integer",
+          "minimum": 1
+        },
+        "backoffThreshold": {
+          "type": "number",
+          "minimum": 0.0
+        },
+        "roundRobin": {
+          "type": "boolean"
+        },
+        "required": [
+          "lowerBound",
+          "upperBound"
+        ]
+      },
+      "schedule": {
+        "type": "integer",
+        "minimum": 0
+      },
+      "backoff": {
+        "type": "integer",
+        "minimum": 0
+      },
+      "source": {
+        "type": "object",
+        "name": {
+          "type": "string",
+          "pattern": "\\w+(\\.jar)"
+        },
+        "location": {
+          "type": "object",
+          "url": {
+            "type": "string",
+            "pattern": "(?i)(^(http|https|file)):\/\/"
+          },
+          "credentials": {
+            "user": {
+              "type": "string"
+            },
+            "password": {
+              "type": "string"
+            },
+            "required": [
+              "user",
+              "password"
+            ]
+          },
+          "required": [
+            "url"
+          ]
+        },
+        "classPath": {
+          "type": "string",
+          "pattern": "\\w+"
+        },
+        "parameters": {
+          "patternProperties": {
+            ".*": {
+              "type": "string"
+            }
+          }
+        },
+        "required": [
+          "name",
+          "classPath",
+          "parameters"
+        ]
+      },
+      "required": [
+        "guid",
+        "schedule",
+        "backoff",
+        "source"
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/CoreSharedPerformers.scala b/fey-core/src/main/scala/org/apache/iota/fey/CoreSharedPerformers.scala
new file mode 100644
index 0000000..57a3ceb
--- /dev/null
+++ b/fey-core/src/main/scala/org/apache/iota/fey/CoreSharedPerformers.scala
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.SupervisorStrategy.Restart
+import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, Props, Terminated}
+import akka.routing._
+import play.api.libs.json.JsObject
+
+import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
+
+protected class CoreSharedPerformers(val sharedJson: List[JsObject]) extends Actor with ActorLogging{
+
+  val monitoring_actor = FEY_MONITOR.actorRef
+  var shared_metadata: Map[String, Performer] = Map.empty[String, Performer]
+
+  override def receive: Receive = {
+
+    case CoreSharedPerformers.PRINT_GLOBAL =>
+      context.actorSelection(s"*") ! FeyGenericActor.PRINT_PATH
+
+    case CoreSharedPerformers.RESTART_SHARED(uuid) =>
+      restartShared(uuid)
+
+    case Terminated(actor) =>
+      monitoring_actor  ! Monitor.TERMINATE(actor.path.toString, Utils.getTimestamp)
+      log.error(s"DEAD Shared performer: ${actor.path.name} . All the ensembles that uses this performer will be restarted")
+      self ! CoreSharedPerformers.RESTART_SHARED(actor.path.name)
+
+    case x => log.warning(s"Message $x not treated by Shared Performers")
+  }
+
+  /**
+    * If any of the shared performer dies, it tries to restart it.
+    * If we could not be restarted, then the terminated message will be received
+    * and Shared Performer is going to throw an Exception to its orchestration
+    * asking it to Restart all the entire orchestration. The restart will then stop all of its
+    * children when call the preStart.
+    */
+  override val supervisorStrategy =
+  OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
+    case e: Exception => Restart
+  }
+
+  /**
+    * Uses the json spec to create the performers
+    */
+  override def preStart() : Unit = {
+    monitoring_actor ! Monitor.START(Utils.getTimestamp)
+
+    shared_metadata = Ensemble.extractPerformers(sharedJson)
+
+    createSharedPerformers()
+  }
+
+  override def postStop() : Unit = {
+    monitoring_actor  ! Monitor.STOP(Utils.getTimestamp)
+  }
+
+  override def postRestart(reason: Throwable): Unit = {
+    monitoring_actor  ! Monitor.RESTART(reason, Utils.getTimestamp)
+    preStart()
+  }
+
+  private def restartShared(uuid: String) = {
+    try {
+      log.warning(s"Restarting Shared Performer ${uuid}")
+      val metadata = shared_metadata.get(uuid)
+      if (metadata.isDefined) {
+        createFeyActor(uuid, metadata.get)
+      } else {
+        log.error(s"Could not restart shared $uuid because metadata is not configured")
+      }
+
+      CoreSharedPerformers.ensemblesUsingShared.keySet.foreach((ensAndShared) => {
+        if (ensAndShared._2 == uuid) {
+          val actor = CoreSharedPerformers.ensemblesUsingShared.get(ensAndShared).get
+          actor ! Ensemble.FORCE_RESTART_ENSEMBLE
+        }
+      })
+    }catch{
+      case e: Exception=>
+        log.error(e, s"Could not restart shared $uuid")
+    }
+  }
+
+  private def createSharedPerformers() = {
+    shared_metadata.foreach((shared) => {
+      try{
+        createFeyActor(shared._1, shared._2)
+      }catch{
+        case e: Exception =>
+          log.error(e, s"Could not created shared performer ${shared._1}")
+      }
+    })
+  }
+
+  private def createFeyActor(performerID: String, performerInfo: Performer) = {
+    val actor: ActorRef = {
+      val actorProps = getPerformer(performerInfo)
+      if (performerInfo.autoScale) {
+
+        val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
+          messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
+
+        val strategy =
+          if (performerInfo.isRoundRobin) {
+            log.info(s"Using Round Robin for performer ${performerID}")
+            RoundRobinPool(1, Some(resizer))
+          } else {
+            log.info(s"Using Smallest mailbox for performer ${performerID}")
+            SmallestMailboxPool(1, Some(resizer))
+          }
+
+        context.actorOf(strategy.props(actorProps), name = performerID)
+      } else {
+        context.actorOf(actorProps, name = performerID)
+      }
+    }
+
+    context.watch(actor)
+    CoreSharedPerformers.activeSharedPerformers.put(performerID, actor)
+  }
+
+  /**
+    * Creates actor props based on JSON configuration
+    *
+    * @param performerInfo Performer object
+    * @return Props of actor based on JSON config
+    */
+  private def getPerformer(performerInfo: Performer): Props = {
+
+    var clazz:Option[Class[FeyGenericActor]] = None
+
+    Utils.loadedJars.synchronized {
+      clazz = Some(loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName))
+    }
+
+    if(clazz.isDefined) {
+      val dispatcher = if (performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
+
+      val actorProps = Props(clazz.get, performerInfo.parameters, performerInfo.backoff,
+        Map.empty, performerInfo.schedule, self.path.name, self.path.name, performerInfo.autoScale)
+
+      // dispatcher has higher priority than controlAware. That means that if both are defined
+      // then the custom dispatcher will be used
+      if (dispatcher != "") {
+        log.info(s"Using dispatcher: $dispatcher")
+        actorProps.withDispatcher(dispatcher)
+      }
+      else if (performerInfo.controlAware) {
+        actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+      } else {
+        actorProps
+      }
+    }else{
+      log.error(s"Could not load class for performer ${performerInfo.uid}")
+      throw new ClassNotFoundException(s"${performerInfo.jarName} -- ${performerInfo.jarLocation}")
+    }
+  }
+
+  /**
+    * Load a clazz instance of FeyGenericActor from a jar
+    *
+    * @param classPath class path
+    * @param jarLocation Full path where to load the jar from
+    * @return clazz instance of FeyGenericActor
+    */
+  private def loadClazzFromJar(classPath: String, jarLocation: String, jarName: String):Class[FeyGenericActor] = {
+    try {
+      Utils.loadActorClassFromJar(jarLocation,classPath,jarName)
+    }catch {
+      case e: Exception =>
+        log.error(e,s"Could not load class $classPath from jar $jarLocation. Please, check the Jar repository path as well the jar name")
+        throw e
+    }
+  }
+}
+
+object CoreSharedPerformers{
+
+  // [sharedID, actorRef]
+  val activeSharedPerformers:HashMap[String,ActorRef] = HashMap.empty[String,ActorRef]
+  // [[ensembleID, sharedID] -> actorRef]
+  val ensemblesUsingShared:HashMap[Tuple2[String,String], ActorRef] = HashMap.empty[Tuple2[String,String], ActorRef]
+
+  case class RESTART_SHARED(shared_id: String)
+  case object PRINT_GLOBAL
+}
\ No newline at end of file
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 04308c3..e2944f0 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -41,6 +41,8 @@
 
     case STOP_PERFORMERS => stopPerformers()
 
+    case FORCE_RESTART_ENSEMBLE => throw new RestartEnsemble(s"Forcing restart of ensemble. Reason: Shared performer DEAD")
+
     case PRINT_ENSEMBLE =>
       val ed = connectors.map(connector => {
         s""" \t ${connector._1} : ${connector._2.mkString("[",",","]")}"""
@@ -198,14 +200,17 @@
         context.watch(actor)
         tmpActors.put(performerID, actor)
         (performerID, actor)
-      }else{
-        // Performer is a global performer and is already created
-        if(GlobalPerformer.activeGlobalPerformers.contains(orchestrationID)
-          && GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.contains(performerID)){
-          (performerID, GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.get(performerID).get)
-        }else {
-          throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the JSON")
-        }
+      }else if(GlobalPerformer.activeGlobalPerformers.contains(orchestrationID)
+        && GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.contains(performerID)){
+        // Performer is a global orchestration performer and is already created
+        (performerID, GlobalPerformer.activeGlobalPerformers.get(orchestrationID).get.get(performerID).get)
+      }else if(CoreSharedPerformers.activeSharedPerformers.contains(performerID)){
+        // Shared performers, level is FeyCore
+        CoreSharedPerformers.ensemblesUsingShared.put((self.path.toString, performerID), self)
+        (performerID, CoreSharedPerformers.activeSharedPerformers.get(performerID).get)
+      }
+      else {
+        throw new IllegalPerformerCreation(s"Performer $performerID is not defined in the JSON")
       }
     }
   }
@@ -354,6 +359,7 @@
     */
   case object STOP_PERFORMERS
   case object PRINT_ENSEMBLE
+  case object FORCE_RESTART_ENSEMBLE
 }
 
 /**
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 14c4cc0..24a6d93 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -31,6 +31,7 @@
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
+import scala.io.Source
 
 protected class FeyCore extends Actor with ActorLogging{
 
@@ -50,6 +51,7 @@
     case START =>
       val jsonReceiverActor: ActorRef = context.actorOf(Props[JsonReceiverActor], name = JSON_RECEIVER_NAME)
       context.watch(jsonReceiverActor)
+      startSharedPerformers()
 
     case ORCHESTRATION_RECEIVED(orchestrationJson, optionFile) =>
       optionFile match {
@@ -70,13 +72,34 @@
 
     case Terminated(actor) => processTerminatedMessage(actor)
 
-    case GetRoutees => //Discard
-
     case x =>
       log.info(s"Received $x")
 
   }
 
+  private def startSharedPerformers() = {
+    var jsonList:List[JsObject] = List.empty
+    if(CONFIG.SHARED_PERFORMER_JSON_PATH.isDefined){
+
+      //reading file
+      val sharedString = Source.fromFile(CONFIG.SHARED_PERFORMER_JSON_PATH.get).getLines().mkString("")
+      val sharedJson = Json.parse(sharedString)
+
+      val result = SchemaValidator.validate(FeyCore.sharedJsonSpec, sharedJson)
+      if (result.isError) {
+        log.error("Incorrect SHARED JSON schema \n" + result.asEither.left.get.toJson.as[List[JsObject]].map(error => {
+          val path = (error \ "instancePath").as[String]
+          val msg = (error \ "msgs").as[List[String]].mkString("\n\t")
+          s"$path \n\tErrors: $msg"
+        }).mkString("\n"))
+      } else {
+        jsonList = (sharedJson \ "shared-performers").as[List[JsObject]]
+      }
+    }
+    val sharedMain: ActorRef = context.actorOf(Props(classOf[CoreSharedPerformers], jsonList), name = SHARED_PERFORMERS_NAME)
+    context.watch(sharedMain)
+  }
+
   private def orchestrationReceivedNoFile(json: JsValue) = {
     val orchGUID = (json \ GUID).as[String]
     log.info(s"Orchestration $orchGUID received")
@@ -355,13 +378,11 @@
 
   final val JSON_RECEIVER_NAME: String = "JSON_RECEIVER"
   final val IDENTIFIER_NAME: String = "FEY_IDENTIFIER"
+  final val SHARED_PERFORMERS_NAME: String = "FEY_SHARED_PERFORMERS"
 
-  /**
-    * Loads the specification for validating a Fey JSON
-    */
-  val jsonSchemaSpec: SchemaType = {
+  val sharedJsonSpec: SchemaType = {
     Json.fromJson[SchemaType](Json.parse(scala.io.Source
-      .fromInputStream(getClass.getResourceAsStream("/fey-json-schema-validator.json"))
+      .fromInputStream(getClass.getResourceAsStream("/shared-json-schema-validator.json"))
       .getLines()
       .mkString(""))).get
   }
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
index e343f2e..26a58c4 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/GlobalPerformer.scala
@@ -32,7 +32,6 @@
 
   val monitoring_actor = FEY_MONITOR.actorRef
   var global_metadata: Map[String, Performer] = Map.empty[String, Performer]
-  var global_performer: Map[String,ActorRef] = Map.empty[String,ActorRef]
 
   override def receive: Receive = {
 
@@ -138,24 +137,32 @@
     * @return Props of actor based on JSON config
     */
   private def getPerformer(performerInfo: Performer): Props = {
+    var clazz:Option[Class[FeyGenericActor]] = None
 
-    val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
-
-    val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
-
-    val actorProps = Props(clazz,
-      performerInfo.parameters, performerInfo.backoff, Map.empty, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
-
-    // dispatcher has higher priority than controlAware. That means that if both are defined
-    // then the custom dispatcher will be used
-    if(dispatcher != ""){
-      log.info(s"Using dispatcher: $dispatcher")
-      actorProps.withDispatcher(dispatcher)
+    Utils.loadedJars.synchronized {
+      clazz = Some(loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName))
     }
-    else if(performerInfo.controlAware){
-      actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+
+    if(clazz.isDefined) {
+      val dispatcher = if (performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
+
+      val actorProps = Props(clazz.get,
+        performerInfo.parameters, performerInfo.backoff, Map.empty, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
+
+      // dispatcher has higher priority than controlAware. That means that if both are defined
+      // then the custom dispatcher will be used
+      if (dispatcher != "") {
+        log.info(s"Using dispatcher: $dispatcher")
+        actorProps.withDispatcher(dispatcher)
+      }
+      else if (performerInfo.controlAware) {
+        actorProps.withDispatcher(CONFIG.CONTROL_AWARE_MAILBOX)
+      } else {
+        actorProps
+      }
     }else{
-      actorProps
+      log.error(s"Could not load class for performer ${performerInfo.uid}")
+      throw new ClassNotFoundException(s"${performerInfo.jarName} -- ${performerInfo.jarLocation}")
     }
   }
 
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 3bfde54..bedec84 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -238,6 +238,7 @@
   var MONITORING_TYPE: String = "COMPLETE"
   var PORT = DEFAULT_PORT
   var URL_PATH = "localhost"
+  var SHARED_PERFORMER_JSON_PATH: Option[String] = None
 
   def loadUserConfiguration(path: String) : Unit = {
     val app = {
@@ -265,6 +266,7 @@
     MONITORING_TYPE = app.getString("monitoring.type").toUpperCase()
     PORT = app.getInt("port")
     URL_PATH = app.getString("urlPath")
+    SHARED_PERFORMER_JSON_PATH = if(app.getString("shared-performers").isEmpty) None else Some(app.getString("shared-performers"))
 
     setLogbackConfiguration()
   }