Merge branch 'IOTA-23' of https://github.com/shiv4nsh/incubator-iota
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 579fbca..79952dd 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -36,8 +36,26 @@
"type": "string"
},
"autoScale":{
- "type":"integer",
- "minimum":0
+ "type": "object",
+ "lowerBound" : {
+ "type":"integer",
+ "minimum":1
+ },
+ "upperBound" : {
+ "type":"integer",
+ "minimum":1
+ },
+ "backoffThreshold" :{
+ "type":"number",
+ "minimum":0.0
+ },
+ "roundRobin":{
+ "type": "boolean"
+ },
+ "required":[
+ "lowerBound",
+ "upperBound"
+ ]
},
"schedule":{
"type":"integer",
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index afa39f2..a606217 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -19,7 +19,7 @@
import akka.actor.SupervisorStrategy.Restart
import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
-import akka.routing.{ActorRefRoutee, DefaultResizer, GetRoutees, SmallestMailboxPool}
+import akka.routing._
import org.apache.iota.fey.JSON_PATH._
import play.api.libs.json.JsObject
@@ -171,13 +171,21 @@
var actor:ActorRef = null
val actorProps = getPerformer(performerInfo, connections)
- if(performerInfo.autoScale > 0) {
+ if(performerInfo.autoScale) {
- val resizer = DefaultResizer(lowerBound = 1, upperBound = performerInfo.autoScale,
- messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = 0.4)
- val smallestMailBox = SmallestMailboxPool(1, Some(resizer))
+ val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
+ messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
- actor = context.actorOf(smallestMailBox.props(actorProps), name = performerID)
+ val strategy =
+ if(performerInfo.isRoundRobin) {
+ log.info(s"Using Round Robin for performer ${performerID}")
+ RoundRobinPool(1, Some(resizer))
+ } else {
+ log.info(s"Using Smallest mailbox for performer ${performerID}")
+ SmallestMailboxPool(1, Some(resizer))
+ }
+
+ actor = context.actorOf(strategy.props(actorProps), name = performerID)
}else{
actor = context.actorOf(actorProps, name = performerID)
@@ -204,11 +212,10 @@
val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
- val autoScale = if(performerInfo.autoScale > 0) true else false
val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
val actorProps = Props(clazz,
- performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, autoScale)
+ performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
// dispatcher has higher priority than controlAware. That means that if both are defined
// then the custom dispatcher will be used
@@ -282,7 +289,19 @@
val id: String= (performer \ GUID).as[String]
val schedule: Int = (performer \ SCHEDULE).as[Int]
val backoff: Int = (performer \ BACKOFF).as[Int]
- val autoScale: Int = if (performer.keys.contains(PERFORMER_AUTO_SCALE)) (performer \ PERFORMER_AUTO_SCALE).as[Int] else 0
+
+ val autoScale: Boolean = if (performer.keys.contains(PERFORMER_AUTO_SCALE)) true else false
+ val lowerBound: Int = if (autoScale) (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_LOWER_BOUND).as[Int] else 0
+ val upperBound: Int = if (autoScale) (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_UPPER_BOUND).as[Int] else 0
+ if(lowerBound > upperBound){
+ throw new IllegalArgumentException(" Could not define performer. Autoscale param: Lower bound greater than upper bound")
+ }
+ val threshold: Double = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_BACKOFF_THRESHOLD))
+ (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_BACKOFF_THRESHOLD).as[Double] else 0.3
+ val roundRobin: Boolean = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_ROUND_ROBIN))
+ (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_ROUND_ROBIN).as[Boolean] else false
+
+
val jarName: String = (performer \ SOURCE \ SOURCE_NAME).as[String]
val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
@@ -290,7 +309,8 @@
val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY
val dispatcher: String = if (performer.keys.contains(PERFORMER_DISPATCHER)) (performer \ PERFORMER_DISPATCHER).as[String] else ""
- (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond, autoScale,controlAware, location, dispatcher))
+ (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond,
+ autoScale, lowerBound, upperBound,threshold, roundRobin,controlAware, location, dispatcher))
}).toMap
}
@@ -316,18 +336,24 @@
/**
* Holds the performer information
*
- * @param uid performer uid
- * @param jarName performer jar name
- * @param classPath performer class path
- * @param parameters performer params
- * @param schedule performer schedule interval
- * @param backoff performer backoff interval
- * @param autoScale if actor was started as a router and can autoscala
- * @param controlAware if the actor uses a controlAware mailbox
- * @param jarLocation download jar
- * @param dispatcher Akka dispatcher that the actor is using
+ * @param uid
+ * @param jarName
+ * @param classPath
+ * @param parameters
+ * @param schedule
+ * @param backoff
+ * @param autoScale
+ * @param lowerBound
+ * @param upperBound
+ * @param backoffThreshold
+ * @param isRoundRobin
+ * @param controlAware
+ * @param jarLocation
+ * @param dispatcher
*/
case class Performer(uid: String, jarName: String,
- classPath: String, parameters: Map[String,String],
- schedule: FiniteDuration, backoff: FiniteDuration,
- autoScale: Int, controlAware: Boolean, jarLocation: String, dispatcher: String)
+ classPath: String, parameters: Map[String,String],
+ schedule: FiniteDuration, backoff: FiniteDuration,
+ autoScale: Boolean, lowerBound: Int, upperBound: Int,
+ backoffThreshold: Double, isRoundRobin: Boolean, controlAware: Boolean,
+ jarLocation: String, dispatcher: String)
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 8ac18cf..ff4e574 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -190,6 +190,10 @@
val ORCHESTRATION_NAME = "name"
val ORCHESTRATION_TIMESTAMP = "timestamp"
val PERFORMER_AUTO_SCALE = "autoScale"
+ val PERFORMER_LOWER_BOUND = "lowerBound"
+ val PERFORMER_UPPER_BOUND = "upperBound"
+ val PERFORMER_BACKOFF_THRESHOLD = "backoffThreshold"
+ val PERFORMER_ROUND_ROBIN = "roundRobin"
val CONTROL_AWARE = "controlAware"
val JAR_LOCATION = "location"
val JAR_LOCATION_URL = "url"
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
index e7e3554..ae7ce50 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
@@ -68,7 +68,11 @@
performer.controlAware should equal(false)
performer.jarName should equal((performerSpec \ SOURCE \ SOURCE_NAME).as[String])
performer.jarLocation should equal(CONFIG.JAR_REPOSITORY)
- performer.autoScale should equal(0)
+ performer.autoScale should equal(false)
+ performer.lowerBound should equal(0)
+ performer.upperBound should equal(0)
+ performer.isRoundRobin should equal(false)
+ performer.backoffThreshold should equal(0.3)
performer.backoff should equal((performerSpec \ BACKOFF).as[Int].millisecond)
performer.classPath should equal((performerSpec \ SOURCE \ SOURCE_CLASSPATH).as[String])
performer.uid should equal((performerSpec \ GUID).as[String])
@@ -268,6 +272,9 @@
var backScheduleRef: ActorRef = _
val backprocessParamsTB = TestProbe("BACKOFF")
val routee = """$a"""
+ val routee2 = """$b"""
+ val routee3 = """$c"""
+ val routee4 = """$d"""
s"creating Ensemble with Backoff performer" should {
s"result in creation of Ensemble actor " in {
@@ -286,8 +293,10 @@
s"create 'PERFORMER-PARAMS' with backoff time equal to 1 second" in{
backEnsembleState.performers_metadata.get("PERFORMER-PARAMS").get.backoff should equal(1000.millisecond)
}
- s"create 'PERFORMER-SCHEDUKE' with autoScale equal to true" in{
- backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.autoScale should equal(2)
+ s"create 'PERFORMER-SCHEDULER' with autoScale equal to true" in {
+ backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.autoScale should equal(true)
+ backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.lowerBound should equal(4)
+ backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.upperBound should equal(6)
}
}
s"Performer with backoff enabled" should {
@@ -303,11 +312,14 @@
"result in router and routees created" in {
globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
Thread.sleep(500)
- IdentifyFeyActors.actorsPath should have size(4)
+ IdentifyFeyActors.actorsPath should have size(7)
IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005")
IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-PARAMS")
IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER")
IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee")
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee2")
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee3")
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee4")
}
}
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
index 70d3ddc..106a406 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
@@ -333,7 +333,10 @@
"guid": "PERFORMER-SCHEDULER",
"schedule": 200,
"backoff": 0,
- "autoScale": 2,
+ "autoScale": {
+ "lowerBound" : 4,
+ "upperBound" : 6
+ },
"source": {
"name": "fey-test-actor.jar",
"classPath": "org.apache.iota.fey.TestActor",
diff --git a/project/Build.scala b/project/Build.scala
index 4444c59..2036523 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -17,6 +17,7 @@
import sbt._
import sbt.Keys._
+import sbtassembly.AssemblyPlugin.autoImport._
object ModuleDependencies {
@@ -53,22 +54,24 @@
id = "fey-stream",
base = file("./performers/stream"),
settings = BasicSettings ++ StreambuildSettings ++ Seq(
- libraryDependencies ++= ModuleDependencies.StreamDependencies
-
- ))
+ libraryDependencies ++= ModuleDependencies.StreamDependencies,
+ assemblyJarName in assembly := "fey_stream.jar"
+ )
+ )
lazy val zmq = Project(
id = "fey-zmq",
base = file("./performers/zmq"),
settings = BasicSettings ++ ZMQbuildSettings ++ Seq(
- libraryDependencies ++= ModuleDependencies.ZMQDependencies
+ libraryDependencies ++= ModuleDependencies.ZMQDependencies,
+ assemblyJarName in assembly := "fey_zmq.jar"
))
lazy val virtual_sensor = Project(
id = "fey-virtual-sensor",
base = file("./performers/virtual_sensor"),
settings = BasicSettings ++ VirtualSensorbuildSettings ++ Seq(
- libraryDependencies ++= ModuleDependencies.VirtualSensorDependencies
+ libraryDependencies ++= ModuleDependencies.VirtualSensorDependencies,
+ assemblyJarName in assembly := "fey-virtual-sensor.jar"
))
-
}
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index c2013b1..8817df7 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -65,7 +65,7 @@
//All tests need to be execute sequentially
parallelExecution in Test := false,
testOptions in Test += Tests.Cleanup( () => {
- println("CLenning up")
+ print("\nCLeaning up")
removeAll("/tmp/fey/test")
def removeAll(path: String) = {
def getRecursively(f: File): Seq[File] =