FeyGenericActor Tests
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index f1c94e1..9a6fb78 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -51,7 +51,7 @@
*/
@volatile private var scheduler: Cancellable = null
@volatile private var endBackoff: Long = 0
- private val monitoring_actor = FEY_MONITOR.actorRef
+ private[fey] val monitoring_actor = FEY_MONITOR.actorRef
override final def receive: Receive = {
@@ -140,6 +140,25 @@
}
/**
+ * Check state of scheduler
+ * @return true if scheduller is running
+ */
+ final def isShedulerRunning():Boolean = {
+ if(scheduler != null && !scheduler.isCancelled){
+ true
+ }else{
+ false
+ }
+ }
+
+ /**
+ * get endBackoff
+ * @return
+ */
+ final def getEndBackoff(): Long = {
+ endBackoff
+ }
+ /**
* Called by the scheduler.
*/
def execute(): Unit = {
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
index 0b3ea56..f968804 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/EnsembleSpec.scala
@@ -37,7 +37,7 @@
override val monitoring_actor = monitor.ref
}), parent.ref, (ensembleJson \ JSON_PATH.GUID).as[String])
- val ensembleState = ensembleRef.underlyingActor
+ var ensembleState = ensembleRef.underlyingActor
var simplePerformerRef: ActorRef = _
@@ -93,7 +93,7 @@
}
s"result in Performer 'TEST-0004' restarted" in {
val newPerformer = TestProbe().expectActor(s"${parent.ref.path}/${(ensembleJson \ JSON_PATH.GUID).as[String]}/TEST-0004")
- newPerformer.compareTo(simplePerformerRef) should not be(0)
+ newPerformer should not equal(simplePerformerRef)
}
"result in two paths added to IdentifyFeyActors.actorsPath" in{
globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
@@ -213,13 +213,14 @@
monitor.expectMsgClass(classOf[Monitor.STOP])
monitor.expectMsgClass(classOf[Monitor.RESTART])
monitor.expectMsgClass(classOf[Monitor.START])
+ ensembleState = ensembleRef.underlyingActor
}
"keep ensemble actorRef when restarted" in {
- TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}").compareTo(advEnsembleRef) should be(0)
+ TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}") should equal(advEnsembleRef)
}
"stop and start the performer with a new reference" in{
- TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-SCHEDULER").compareTo(scheduleRef) should not be(0)
- TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-PARAMS").compareTo(paramsRef) should not be(0)
+ TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-SCHEDULER") should not equal(scheduleRef)
+ TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-PARAMS") should not equal(paramsRef)
scheduleRef = TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-SCHEDULER")
paramsRef = TestProbe().expectActor(s"${parent.ref.path}/${(advEnsembleJson \ JSON_PATH.GUID).as[String]}/PERFORMER-PARAMS")
}
@@ -266,6 +267,7 @@
var backParamsRef: ActorRef = _
var backScheduleRef: ActorRef = _
val backprocessParamsTB = TestProbe("BACKOFF")
+ val routee = """$a"""
s"creating Ensemble with Backoff performer" should {
s"result in creation of Ensemble actor " in {
@@ -292,8 +294,8 @@
"not process messages during the backoff period" in{
backScheduleRef ! ((schedulerScheduleTB.ref, system.deadLetters, generalScheduleTB.ref))
backParamsRef ! ((system.deadLetters, backprocessParamsTB.ref, generalParamsTB.ref))
- backprocessParamsTB.expectMsg("PROCESS - EXECUTE - akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$a")
- backprocessParamsTB.expectNoMsg(1000.milliseconds)
+ backprocessParamsTB.expectMsg(s"PROCESS - EXECUTE - ${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee")
+ backprocessParamsTB.expectNoMsg(990.milliseconds)
}
}
@@ -302,10 +304,10 @@
globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
Thread.sleep(500)
IdentifyFeyActors.actorsPath should have size(4)
- IdentifyFeyActors.actorsPath should contain("akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005")
- IdentifyFeyActors.actorsPath should contain("akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005/PERFORMER-PARAMS")
- IdentifyFeyActors.actorsPath should contain("akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER")
- IdentifyFeyActors.actorsPath should contain("akka://FEY-TEST/system/ENSEMBLE-1/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$a")
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005")
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-PARAMS")
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER")
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/MY-ENSEMBLE-0005/PERFORMER-SCHEDULER/$routee")
}
}
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorSpec.scala
new file mode 100644
index 0000000..3b5a117
--- /dev/null
+++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorSpec.scala
@@ -0,0 +1,188 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.{ActorRef, Props}
+import akka.testkit.{EventFilter, TestActorRef, TestProbe}
+
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+
+class FeyGenericActorSpec extends BaseAkkaSpec{
+
+ val parent = TestProbe("GENERIC-PARENT")
+ val monitor = TestProbe("MONITOR-GENERIC")
+ val connectTB = TestProbe("CONNECT_TO")
+ val genericRef: TestActorRef[FeyGenericActorTest] = TestActorRef[FeyGenericActorTest]( Props(new FeyGenericActorTest(Map.empty, 1.seconds,
+ Map("CONNECTED" -> connectTB.ref),300.milliseconds,"MY-ORCH", "MY-ORCH", false){
+ override private[fey] val monitoring_actor = monitor.ref
+ }),parent.ref, "GENERIC-TEST")
+ var genericState:FeyGenericActorTest = genericRef.underlyingActor
+ val path = genericRef.path.toString
+ var latestBackoff: Long = 0
+
+ "Creating a GenericActor with Schedule time defined" should {
+ "result in scheduler started" in{
+ genericState.isShedulerRunning() should be(true)
+ }
+ "result in onStart method called" in {
+ genericState.started should be(true)
+ }
+ "result in START message sent to Monitor" in{
+ monitor.expectMsgClass(classOf[Monitor.START])
+ }
+ "result in one active actor" in {
+ globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+ Thread.sleep(500)
+ IdentifyFeyActors.actorsPath should have size(1)
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/GENERIC-TEST")
+ }
+ }
+
+ "Backoff of GenericActor" should {
+ "be zero until the first PROCESS message" in {
+ genericState.getEndBackoff() should equal(0)
+ }
+ "change when first PROCESS message was received" in{
+ genericRef ! FeyGenericActor.PROCESS("TEST1")
+ latestBackoff = genericState.getEndBackoff()
+ latestBackoff should not equal(0)
+ connectTB.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+ }
+ }
+
+ "Sending PROCESS message to GenericActor" should{
+ "call processMessage method" in{
+ genericState.processed should be(true)
+ }
+ }
+
+ "customReceive method" should {
+ "process any non treated message" in{
+ genericRef ! "TEST_CUSTOM"
+ genericState.count should equal(1)
+ }
+ }
+
+ "Sending PROCESS message to GenericActor" should {
+ "be discarded when backoff is enabled" in{
+ genericRef ! FeyGenericActor.PROCESS("DISCARDED")
+ latestBackoff should equal(genericState.getEndBackoff())
+ genericRef ! FeyGenericActor.PROCESS("DISCARDED2")
+ connectTB.expectNoMsg(100.milliseconds)
+ }
+ "be processed when backoff has finished" in{
+ while(latestBackoff >= System.nanoTime()){}
+ genericRef ! FeyGenericActor.PROCESS("SHOULD BE PROCESSED")
+ connectTB.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+ latestBackoff = genericState.getEndBackoff()
+ }
+ }
+
+ "Calling startBackoff" should {
+ "set endBackoff with time now" in {
+ genericState.getEndBackoff() > latestBackoff
+ }
+ }
+
+ "Calling propagateMessage" should {
+ "send message to connectTo actors" in {
+ genericState.propagateMessage("PROPAGATE")
+ connectTB.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+ }
+ }
+
+ "Scheduler component" should{
+ "call execute() method" in {
+ genericState.executing should be(true)
+ }
+ }
+
+ "Sending EXCEPTION(IllegalArgumentException) message to GenericActor" should{
+ "Throw IllegalArgumentException" in{
+ EventFilter[IllegalArgumentException](occurrences = 1) intercept {
+ genericRef ! FeyGenericActor.EXCEPTION(new IllegalArgumentException("Testing"))
+ }
+ }
+ "Result in restart of the actor with sequence of Monitoring: STOP -> RESTART -> START" in{
+ monitor.expectMsgClass(classOf[Monitor.STOP])
+ monitor.expectMsgClass(classOf[Monitor.RESTART])
+ //Restart does not change the actorRef but does change the object inside the ActorReference
+ genericState = genericRef.underlyingActor
+ monitor.expectMsgClass(classOf[Monitor.START])
+ }
+ "call onStart method" in {
+ genericState.started should be(true)
+ }
+ "call onRestart method" in {
+ Thread.sleep(100)
+ genericState.restarted should be(true)
+ }
+ "restart scheduler" in {
+ genericState.isShedulerRunning() should be(true)
+ }
+ }
+
+ "Sending STOP to GenericActor" should{
+ "terminate GenericActor" in{
+ genericRef ! FeyGenericActor.STOP
+ TestProbe().verifyActorTermination(genericRef)
+ TestProbe().notExpectActor(path)
+ }
+ "call onStop method" in {
+ genericState.stopped should be(true)
+ }
+ "cancel scheduler" in{
+ genericState.isShedulerRunning() should be(false)
+ }
+ "send STOP - TERMINATE message to Monitor" in{
+ monitor.expectMsgClass(classOf[Monitor.STOP])
+ }
+ "result in no active actors" in {
+ globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+ Thread.sleep(500)
+ IdentifyFeyActors.actorsPath shouldBe empty
+ }
+ }
+
+
+ val connectTB2 = TestProbe("CONNECT2_TO")
+ var generic2Ref: TestActorRef[FeyGenericActorTest] = _
+ var generic2State:FeyGenericActorTest = _
+
+ "Creating GenericActor with schedule anc backoff equal to zero" should{
+ "not start a scheduler" in{
+ generic2Ref = TestActorRef[FeyGenericActorTest]( Props(new FeyGenericActorTest(Map.empty, 0.seconds,
+ Map("CONNECTED" -> connectTB2.ref),0.milliseconds,"MY-ORCH-2", "MY-ORCH-2", false)),parent.ref, "GENERIC-TEST-2")
+ generic2State = generic2Ref.underlyingActor
+ generic2State.isShedulerRunning() should be(false)
+ }
+ "result in one active actor" in {
+ globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(parent.ref.path.toString)
+ Thread.sleep(500)
+ IdentifyFeyActors.actorsPath should have size(1)
+ IdentifyFeyActors.actorsPath should contain(s"${parent.ref.path}/GENERIC-TEST-2")
+ }
+ "result in no discarded PROCESS messages" in {
+ generic2Ref ! FeyGenericActor.PROCESS("NOT-DISCARDED")
+ connectTB2.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+ generic2Ref ! FeyGenericActor.PROCESS("NOT-DISCARDED2")
+ connectTB2.expectMsgClass(classOf[FeyGenericActor.PROCESS[String]])
+ }
+ }
+}
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorTest.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorTest.scala
new file mode 100644
index 0000000..4e248ec
--- /dev/null
+++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyGenericActorTest.scala
@@ -0,0 +1,68 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iota.fey
+
+import akka.actor.ActorRef
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.concurrent.duration.FiniteDuration
+
+class FeyGenericActorTest(override val params: Map[String,String] = Map.empty,
+ override val backoff: FiniteDuration = 1.minutes,
+ override val connectTo: Map[String,ActorRef] = Map.empty,
+ override val schedulerTimeInterval: FiniteDuration = 2.seconds,
+ override val orchestrationName: String = "",
+ override val orchestrationID: String = "",
+ override val autoScale: Boolean = false) extends FeyGenericActor {
+
+ var count = 0
+ var started = false
+ var processed = false
+ var executing = false
+ var stopped = false
+ var restarted = false
+
+ override def onStart(): Unit = {
+ started = true
+ }
+
+ override def processMessage[T](message: T, sender: ActorRef): Unit = {
+ processed = true
+ log.info(s"Processing message ${message.toString}")
+ propagateMessage(s"PROPAGATING FROM ${self.path.name} - Message: ${message.toString}")
+ startBackoff()
+ }
+
+ override def execute(): Unit = {
+ log.info(s"Executing action in ${self.path.name}")
+ executing = true
+ }
+
+ override def customReceive: Receive = {
+ case "TEST_CUSTOM" => count+=1
+ }
+
+ override def onStop(): Unit = {
+ log.info(s"Actor ${self.path.name} stopped.")
+ stopped = true
+ }
+
+ override def onRestart(reason: Throwable): Unit = {
+ restarted = true
+ }
+}
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala
index 8719f36..fff4bab 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/OrchestrationSpec.scala
@@ -68,7 +68,7 @@
TestProbe().expectActor(s"${orchRef.path}/${(ensemble2 \ JSON_PATH.GUID).as[String]}/TEST-0001")
TestProbe().expectActor(s"${orchRef.path}/${(ensemble1 \ JSON_PATH.GUID).as[String]}/TEST-0001")
}
- s"result in two entries in Orchestration.ensembles matching the craeted ensembles" in {
+ s"result in two entries in Orchestration.ensembles matching the created ensembles" in {
orchState.ensembles should have size(2)
orchState.ensembles should contain key((ensemble1 \ JSON_PATH.GUID).as[String])
orchState.ensembles should contain key((ensemble2 \ JSON_PATH.GUID).as[String])