Merge branch 'IOTA-23' of https://github.com/shiv4nsh/incubator-iota
diff --git a/fey-core/src/main/resources/fey-json-schema-validator.json b/fey-core/src/main/resources/fey-json-schema-validator.json
index 579fbca..79952dd 100644
--- a/fey-core/src/main/resources/fey-json-schema-validator.json
+++ b/fey-core/src/main/resources/fey-json-schema-validator.json
@@ -36,8 +36,26 @@
             "type": "string"
           },
           "autoScale":{
-            "type":"integer",
-            "minimum":0
+            "type": "object",
+            "lowerBound" : {
+              "type":"integer",
+              "minimum":1
+            },
+            "upperBound" : {
+              "type":"integer",
+              "minimum":1
+            },
+            "backoffThreshold" :{
+              "type":"number",
+              "minimum":0.0
+            },
+            "roundRobin":{
+              "type": "boolean"
+            },
+            "required":[
+              "lowerBound",
+              "upperBound"
+            ]
           },
           "schedule":{
             "type":"integer",
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index afa39f2..a606217 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -19,7 +19,7 @@
 
 import akka.actor.SupervisorStrategy.Restart
 import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
-import akka.routing.{ActorRefRoutee, DefaultResizer, GetRoutees, SmallestMailboxPool}
+import akka.routing._
 import org.apache.iota.fey.JSON_PATH._
 import play.api.libs.json.JsObject
 
@@ -171,13 +171,21 @@
 
         var actor:ActorRef = null
         val actorProps = getPerformer(performerInfo, connections)
-        if(performerInfo.autoScale > 0) {
+        if(performerInfo.autoScale) {
 
-          val resizer = DefaultResizer(lowerBound = 1, upperBound = performerInfo.autoScale,
-            messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = 0.4)
-          val smallestMailBox = SmallestMailboxPool(1, Some(resizer))
+          val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
+            messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
 
-          actor = context.actorOf(smallestMailBox.props(actorProps), name = performerID)
+          val strategy =
+            if(performerInfo.isRoundRobin) {
+               log.info(s"Using Round Robin for performer ${performerID}")
+               RoundRobinPool(1, Some(resizer))
+             } else {
+               log.info(s"Using Smallest mailbox for performer ${performerID}")
+               SmallestMailboxPool(1, Some(resizer))
+             }
+
+          actor = context.actorOf(strategy.props(actorProps), name = performerID)
 
         }else{
           actor = context.actorOf(actorProps, name = performerID)
@@ -204,11 +212,10 @@
 
     val clazz = loadClazzFromJar(performerInfo.classPath, s"${performerInfo.jarLocation}/${performerInfo.jarName}", performerInfo.jarName)
 
-    val autoScale = if(performerInfo.autoScale > 0) true else false
     val dispatcher = if(performerInfo.dispatcher != "") s"fey-custom-dispatchers.${performerInfo.dispatcher}" else ""
 
     val actorProps = Props(clazz,
-      performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, autoScale)
+      performerInfo.parameters, performerInfo.backoff, connections, performerInfo.schedule, orchestrationName, orchestrationID, performerInfo.autoScale)
 
     // dispatcher has higher priority than controlAware. That means that if both are defined
     // then the custom dispatcher will be used
@@ -282,7 +289,19 @@
       val id: String= (performer \ GUID).as[String]
       val schedule: Int = (performer \ SCHEDULE).as[Int]
       val backoff: Int = (performer \ BACKOFF).as[Int]
-      val autoScale: Int = if (performer.keys.contains(PERFORMER_AUTO_SCALE)) (performer \ PERFORMER_AUTO_SCALE).as[Int] else 0
+
+      val autoScale: Boolean = if (performer.keys.contains(PERFORMER_AUTO_SCALE)) true else false
+      val lowerBound: Int = if (autoScale) (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_LOWER_BOUND).as[Int] else 0
+      val upperBound: Int = if (autoScale) (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_UPPER_BOUND).as[Int] else 0
+      if(lowerBound > upperBound){
+        throw new IllegalArgumentException(" Could not define performer. Autoscale param: Lower bound greater than upper bound")
+      }
+      val threshold: Double = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_BACKOFF_THRESHOLD))
+        (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_BACKOFF_THRESHOLD).as[Double] else 0.3
+      val roundRobin: Boolean = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_ROUND_ROBIN))
+        (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_ROUND_ROBIN).as[Boolean] else false
+
+
       val jarName: String = (performer \ SOURCE \ SOURCE_NAME).as[String]
       val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
       val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
@@ -290,7 +309,8 @@
       val location: String = if ( (performer \ SOURCE).as[JsObject].keys.contains(JAR_LOCATION) ) CONFIG.DYNAMIC_JAR_REPO else CONFIG.JAR_REPOSITORY
       val dispatcher: String = if (performer.keys.contains(PERFORMER_DISPATCHER)) (performer \ PERFORMER_DISPATCHER).as[String] else ""
 
-      (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond, autoScale,controlAware, location, dispatcher))
+      (id, new Performer(id, jarName, classPath, params, schedule.millisecond, backoff.millisecond,
+        autoScale, lowerBound, upperBound,threshold, roundRobin,controlAware, location, dispatcher))
     }).toMap
   }
 
@@ -316,18 +336,24 @@
 /**
   * Holds the performer information
   *
-  * @param uid performer uid
-  * @param jarName performer jar name
-  * @param classPath performer class path
-  * @param parameters performer params
-  * @param schedule performer schedule interval
-  * @param backoff performer backoff interval
-  * @param autoScale if actor was started as a router and can autoscala
-  * @param controlAware if the actor uses a controlAware mailbox
-  * @param jarLocation download jar
-  * @param dispatcher Akka dispatcher that the actor is using
+  * @param uid
+  * @param jarName
+  * @param classPath
+  * @param parameters
+  * @param schedule
+  * @param backoff
+  * @param autoScale
+  * @param lowerBound
+  * @param upperBound
+  * @param backoffThreshold
+  * @param isRoundRobin
+  * @param controlAware
+  * @param jarLocation
+  * @param dispatcher
   */
 case class Performer(uid: String, jarName: String,
-                classPath: String, parameters: Map[String,String],
-                schedule: FiniteDuration, backoff: FiniteDuration,
-                autoScale: Int, controlAware: Boolean, jarLocation: String, dispatcher: String)
+                     classPath: String, parameters: Map[String,String],
+                     schedule: FiniteDuration, backoff: FiniteDuration,
+                     autoScale: Boolean, lowerBound: Int, upperBound: Int,
+                     backoffThreshold: Double, isRoundRobin: Boolean, controlAware: Boolean,
+                     jarLocation: String, dispatcher: String)
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index 8ac18cf..ff4e574 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -190,6 +190,10 @@
   val ORCHESTRATION_NAME = "name"
   val ORCHESTRATION_TIMESTAMP = "timestamp"
   val PERFORMER_AUTO_SCALE = "autoScale"
+  val PERFORMER_LOWER_BOUND = "lowerBound"
+  val PERFORMER_UPPER_BOUND = "upperBound"
+  val PERFORMER_BACKOFF_THRESHOLD = "backoffThreshold"
+  val PERFORMER_ROUND_ROBIN = "roundRobin"
   val CONTROL_AWARE = "controlAware"
   val JAR_LOCATION = "location"
   val JAR_LOCATION_URL = "url"
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
index e7e3554..ae7ce50 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
@@ -68,7 +68,11 @@
       performer.controlAware should equal(false)
       performer.jarName should equal((performerSpec \ SOURCE \ SOURCE_NAME).as[String])
       performer.jarLocation should equal(CONFIG.JAR_REPOSITORY)
-      performer.autoScale should equal(0)
+      performer.autoScale should equal(false)
+      performer.lowerBound should equal(0)
+      performer.upperBound should equal(0)
+      performer.isRoundRobin should equal(false)
+      performer.backoffThreshold should equal(0.3)
       performer.backoff should equal((performerSpec \ BACKOFF).as[Int].millisecond)
       performer.classPath should equal((performerSpec \ SOURCE \ SOURCE_CLASSPATH).as[String])
       performer.uid should equal((performerSpec \ GUID).as[String])
@@ -268,6 +272,9 @@
   var backScheduleRef: ActorRef = _
   val backprocessParamsTB = TestProbe("BACKOFF")
   val routee = """$a"""
+  val routee2 = """$b"""
+  val routee3 = """$c"""
+  val routee4 = """$d"""
 
   s"creating Ensemble with Backoff performer" should {
     s"result in creation of Ensemble actor " in {
@@ -286,8 +293,10 @@
     s"create 'PERFORMER-PARAMS' with backoff time equal to 1 second" in{
       backEnsembleState.performers_metadata.get("PERFORMER-PARAMS").get.backoff should  equal(1000.millisecond)
     }
-    s"create 'PERFORMER-SCHEDUKE' with autoScale equal to true" in{
-      backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.autoScale should  equal(2)
+    s"create 'PERFORMER-SCHEDULER' with autoScale equal to true" in {
+      backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.autoScale should equal(true)
+      backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.lowerBound should equal(4)
+      backEnsembleState.performers_metadata.get("PERFORMER-SCHEDULER").get.upperBound should equal(6)
     }
   }
   s"Performer with backoff enabled" should {
@@ -303,11 +312,14 @@
     "result in router and routees created" in {
       globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
       Thread.sleep(500)
-      IdentifyFeyActors.actorsPath should have size(4)
+      IdentifyFeyActors.actorsPath should have size(7)
       IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005")
       IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-PARAMS")
       IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER")
       IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee2")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee3")
+      IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee4")
     }
   }
 
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
index 70d3ddc..106a406 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/Utils_JSONTest.scala
@@ -333,7 +333,10 @@
            "guid": "PERFORMER-SCHEDULER",
            "schedule": 200,
            "backoff": 0,
-           "autoScale": 2,
+           "autoScale": {
+            "lowerBound" : 4,
+            "upperBound" : 6
+           },
            "source": {
              "name": "fey-test-actor.jar",
              "classPath": "org.apache.iota.fey.TestActor",
diff --git a/project/Build.scala b/project/Build.scala
index 4444c59..2036523 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -17,6 +17,7 @@
 
 import sbt._
 import sbt.Keys._
+import sbtassembly.AssemblyPlugin.autoImport._
 
 object ModuleDependencies {
 
@@ -53,22 +54,24 @@
     id = "fey-stream",
     base = file("./performers/stream"),
     settings = BasicSettings ++ StreambuildSettings ++ Seq(
-      libraryDependencies ++= ModuleDependencies.StreamDependencies
-
-    ))
+      libraryDependencies ++= ModuleDependencies.StreamDependencies,
+      assemblyJarName in assembly := "fey_stream.jar"
+    )
+  )
 
    lazy val zmq = Project(
     id = "fey-zmq",
     base = file("./performers/zmq"),
     settings = BasicSettings ++ ZMQbuildSettings ++ Seq(
-      libraryDependencies ++= ModuleDependencies.ZMQDependencies
+      libraryDependencies ++= ModuleDependencies.ZMQDependencies,
+        assemblyJarName in assembly := "fey_zmq.jar"
     ))
 
   lazy val virtual_sensor = Project(
     id = "fey-virtual-sensor",
     base = file("./performers/virtual_sensor"),
     settings = BasicSettings ++ VirtualSensorbuildSettings ++ Seq(
-      libraryDependencies ++= ModuleDependencies.VirtualSensorDependencies
+      libraryDependencies ++= ModuleDependencies.VirtualSensorDependencies,
+      assemblyJarName in assembly := "fey-virtual-sensor.jar"
     ))
-
 }
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index c2013b1..8817df7 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -65,7 +65,7 @@
     //All tests need to be execute sequentially
     parallelExecution in Test := false,
     testOptions in Test += Tests.Cleanup( () => {
-      println("CLenning up")
+      print("\nCLeaning up")
       removeAll("/tmp/fey/test")
       def removeAll(path: String) = {
         def getRecursively(f: File): Seq[File] =