blob: 426614efd6edab6850244a0fa73a294ce0179989 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iota.fey
import java.nio.file.{Files, Paths}
import akka.actor.{ActorRef, PoisonPill, Props}
import akka.testkit.{EventFilter, TestProbe}
import scala.concurrent.duration.DurationInt
class FeyCoreSpec extends BaseAkkaSpec {
val monitor = TestProbe()
val feyCoreRef = system.actorOf(Props(new FeyCore{
override val monitoring_actor = monitor.ref
}), "FEY-CORE")
val feyPath = feyCoreRef.path.toString
"Creating FeyCore" should {
s"result in creating a child actor with the name '${FeyCore.IDENTIFIER_NAME}'" in {
TestProbe().expectActor(s"/user/FEY-CORE/${FeyCore.IDENTIFIER_NAME}")
}
"result in sending START message to Monitor actor" in {
monitor.expectMsgClass(1.seconds, classOf[Monitor.START])
}
}
"Sending FeyCore.START to FeyCore" should {
s"result in creating a child actor with the name '${FeyCore.JSON_RECEIVER_NAME}'" in {
feyCoreRef ! FeyCore.START
TestProbe().expectActor(s"$feyPath/${FeyCore.JSON_RECEIVER_NAME}")
}
s"result in starting ${GLOBAL_DEFINITIONS.WATCH_SERVICE_THREAD} Thread" in {
TestProbe().isThreadRunning(GLOBAL_DEFINITIONS.WATCH_SERVICE_THREAD) should equal(true)
}
}
var ensemble1ref:ActorRef = _
var ensemble2ref:ActorRef = _
var ensemble1Test1ref:ActorRef = _
var ensemble1Test2ref:ActorRef = _
var ensemble2Test1ref:ActorRef = _
var orchestrationref:ActorRef = _
val orchestration_name = "TEST-ACTOR"
"Sending FeyCore.ORCHESTRATION_RECEIVED with CREATE command to FeyCore" should {
s"result in creating an Orchestration child actor with the name '$orchestration_name'" in {
feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test), None)
orchestrationref = TestProbe().expectActor(s"$feyPath/$orchestration_name")
}
s"result in creating an Ensemble child actor with the name '$orchestration_name/MY-ENSEMBLE-0001'" in {
ensemble1ref = TestProbe().expectActor(s"$feyPath/$orchestration_name/MY-ENSEMBLE-0001")
}
s"result in creating an Ensemble child actor with the name '$orchestration_name/MY-ENSEMBLE-0002'" in {
ensemble2ref = TestProbe().expectActor(s"$feyPath/$orchestration_name/MY-ENSEMBLE-0002")
}
s"result in creating a Performer child actor with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0001'" in {
ensemble1Test1ref = TestProbe().expectActor(s"$feyPath/$orchestration_name/MY-ENSEMBLE-0001/TEST-0001")
}
s"result in creating a Performer child actor with the name '$orchestration_name/MY-ENSEMBLE-0002/TEST-0001'" in {
ensemble2Test1ref = TestProbe().expectActor(s"$feyPath/$orchestration_name/MY-ENSEMBLE-0002/TEST-0001")
}
s"result in new entry to FEY_CACHE.activeOrchestrations with key '$orchestration_name'" in {
FEY_CACHE.activeOrchestrations should contain key(orchestration_name)
}
}
"Sending FeyCore.ORCHESTRATION_RECEIVED with UPDATE command to FeyCore" should {
s"result in creating a new Performer child actor with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0002'" in {
feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_json_test), None)
ensemble1Test2ref = TestProbe().expectActor(s"$feyPath/$orchestration_name/MY-ENSEMBLE-0001/TEST-0002")
}
}
"Sending FeyCore.ORCHESTRATION_RECEIVED with UPDATE command and DELETE ensemble to FeyCore" should {
s"result in termination of Ensemble with the name '$orchestration_name/MY-ENSEMBLE-0001'" in {
feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_delete_json_test), None)
TestProbe().verifyActorTermination(ensemble1ref)
}
s"result in termination of Performer with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0001'" in {
TestProbe().notExpectActor(ensemble1Test1ref.path.toString)
}
s"result in termination of Performer with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0002'" in {
TestProbe().notExpectActor(ensemble1Test2ref.path.toString)
}
}
"Sending FeyCore.ORCHESTRATION_RECEIVED with RECREATE command and same Timestamp to FeyCore" should {
s"result in logging a 'not recreated' message at Warn " in {
EventFilter.warning(pattern = s".*$orchestration_name not recreated.*", occurrences = 1) intercept {
feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.recreate_timestamp_json_test), None)
}
}
}
"Sending FeyCore.JSON_TREE to FeyCore" should {
s"result in logging a 6 path messages at Info " in {
EventFilter.info(pattern = s"^akka://.*/user/.*", occurrences = 7) intercept {
feyCoreRef ! FeyCore.JSON_TREE
}
}
}
"Sending FeyCore.ORCHESTRATION_RECEIVED with DELETE command to FeyCore" should {
s"result in termination of Orchestration with the name '$orchestration_name'" in {
feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.delete_json_test), None)
TestProbe().verifyActorTermination(orchestrationref)
}
"result in sending TERMINATE message to Monitor actor" in {
monitor.expectMsgClass(1.seconds, classOf[Monitor.TERMINATE])
}
s"result in termination of Ensemble with the name '$orchestration_name/MY-ENSEMBLE-0002'" in {
TestProbe().notExpectActor(ensemble2ref.path.toString)
}
s"result in termination of Performer with the name '$orchestration_name/MY-ENSEMBLE-0002/TEST-0001'" in {
TestProbe().notExpectActor(ensemble2Test1ref.path.toString)
}
s"result in removing key '$orchestration_name' at FEY_CACHE.activeOrchestrations" in {
FEY_CACHE.activeOrchestrations should not contain key(orchestration_name)
}
}
"Sending FeyCore.STOP_EMPTY_ORCHESTRATION to FeyCore" should {
s"result in termination of 'TEST-ORCH-2'" in {
feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.orchestration_test_json), None)
val ref = TestProbe().expectActor(s"$feyPath/TEST-ORCH-2")
FEY_CACHE.activeOrchestrations should have size(1)
FEY_CACHE.activeOrchestrations should contain key("TEST-ORCH-2")
feyCoreRef ! FeyCore.STOP_EMPTY_ORCHESTRATION("TEST-ORCH-2")
TestProbe().verifyActorTermination(ref)
}
s"result in sending Terminate message to Monitor actor" in{
monitor.expectMsgClass(1.seconds, classOf[Monitor.TERMINATE])
}
s"result in empty FEY_CACHE.activeOrchestrations" in {
FEY_CACHE.activeOrchestrations shouldBe empty
}
}
var receiverRef:ActorRef = _
var receiverEnsenble:ActorRef = _
var receiverOrch:ActorRef = _
val receiverJSON = getJSValueFromString(Utils_JSONTest.generic_receiver_json)
val receiverOrchName = (receiverJSON \ JSON_PATH.GUID).as[String]
"Sending FeyCore.ORCHESTRATION_RECEIVED with CREATE command to FeyCore of a GenericReceiverActor" should {
s"result in creating an Orchestration child actor with the name '$receiverOrchName'" in {
feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(receiverJSON, None)
receiverOrch = TestProbe().expectActor(s"$feyPath/$receiverOrchName")
}
s"result in creating an Ensemble child actor with the name '$receiverOrchName/RECEIVER-ENSEMBLE'" in {
receiverEnsenble = TestProbe().expectActor(s"$feyPath/$receiverOrchName/RECEIVER-ENSEMBLE")
}
s"result in creating a Performer child actor with the name '$receiverOrchName/RECEIVER-ENSEMBLE/MY_RECEIVER_PERFORMER'" in {
receiverRef = TestProbe().expectActor(s"$feyPath/$receiverOrchName/RECEIVER-ENSEMBLE/MY_RECEIVER_PERFORMER")
}
s"result in new entry to FEY_CACHE.activeOrchestrations with key '$receiverOrchName'" in {
FEY_CACHE.activeOrchestrations should contain key(receiverOrchName)
}
}
"Sending PROCESS message to the Receiver Performer" should {
"Send FeyCore.ORCHESTRATION_RECEIVED to FeyCore" in {
receiverRef ! FeyGenericActor.PROCESS(Utils_JSONTest.json_for_receiver_test)
TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER", FEY_SYSTEM.system)
}
s"result in creating an Orchestration child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER'" in {
TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER", FEY_SYSTEM.system)
}
s"result in creating an Ensemble child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0001'" in {
TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0001", FEY_SYSTEM.system)
}
s"result in creating an Ensemble child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0002'" in {
TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0002", FEY_SYSTEM.system)
}
s"result in creating a Performer child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0002/TEST-0001'" in {
TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0002/TEST-0001", FEY_SYSTEM.system)
}
s"result in creating a Performer child actor with the name 'RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0001/TEST-0001'" in {
TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER/MY-ENSEMBLE-REC-0001/TEST-0001", FEY_SYSTEM.system)
}
s"result in one new entry to FEY_CACHE.activeOrchestrations with key 'RECEIVED-BY-ACTOR-RECEIVER'" in {
FEY_CACHE.activeOrchestrations should have size(2)
FEY_CACHE.activeOrchestrations should contain key(receiverOrchName)
FEY_CACHE.activeOrchestrations should contain key("RECEIVED-BY-ACTOR-RECEIVER")
}
}
"Sending PROCESS message to the Receiver Performer with command DELETE" should {
"STOP running orchestration" in {
val ref = TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER", FEY_SYSTEM.system)
receiverRef ! FeyGenericActor.PROCESS(Utils_JSONTest.json_for_receiver_test_delete)
TestProbe().verifyActorTermination(ref)
}
s"result in one entry in FEY_CACHE.activeOrchestrations" in {
FEY_CACHE.activeOrchestrations should have size(1)
FEY_CACHE.activeOrchestrations should contain key(receiverOrchName)
}
}
"Sending PROCESS message to Receiver with checkpoint enabled" should {
"Save received JSON to checkpoint dir" in {
CONFIG.CHEKPOINT_ENABLED = true
receiverRef ! FeyGenericActor.PROCESS(Utils_JSONTest.json_for_receiver_test)
TestProbe().expectActorInSystem(s"${FEY_CORE_ACTOR.actorRef.path}/RECEIVED-BY-ACTOR-RECEIVER", FEY_SYSTEM.system)
Files.exists(Paths.get(s"${CONFIG.CHECKPOINT_DIR}/RECEIVED-BY-ACTOR-RECEIVER.json")) should be(true)
CONFIG.CHEKPOINT_ENABLED = false
}
}
val global_orch_name = "GLOBAL-PERFORMER"
var globalRef:ActorRef= null
"Sending FeyCore.ORCHESTRATION_RECEIVED with CREATE AND GLOBAL performer command to FeyCore" should {
s"result in creating an Orchestration child actor with the name '$global_orch_name'" in {
feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.global_perf_test), None)
orchestrationref = TestProbe().expectActor(s"$feyPath/$global_orch_name")
}
s"result in creating an Ensemble child actor with the name '$global_orch_name/ENS-GLOBAL'" in {
ensemble1ref = TestProbe().expectActor(s"$feyPath/$global_orch_name/ENS-GLOBAL")
}
s"result in creating a global Performer child actor with the name '$global_orch_name/GLOBAL_MANAGER/GLOBAL-TEST'" in {
globalRef = TestProbe().expectActor(s"$feyPath/$global_orch_name/GLOBAL_MANAGER/GLOBAL-TEST")
}
s"result in creating a Performer child actor with the name '$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER'" in {
ensemble2Test1ref = TestProbe().expectActor(s"$feyPath/$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER")
}
s"result in new entry to FEY_CACHE.activeOrchestrations with key '$global_orch_name'" in {
FEY_CACHE.activeOrchestrations should contain key(global_orch_name)
}
s"result in one global actor created for orchestration" in {
GlobalPerformer.activeGlobalPerformers should have size(1)
GlobalPerformer.activeGlobalPerformers should contain key(global_orch_name)
}
s"result in globa metadata add to table" in {
ORCHESTRATION_CACHE.orchestration_globals should have size(1)
ORCHESTRATION_CACHE.orchestration_globals should contain key(global_orch_name)
}
s"result in right running actors" in {
globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(feyCoreRef.path.toString)
Thread.sleep(500)
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/GLOBAL_MANAGER")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/GLOBAL_MANAGER/GLOBAL-TEST")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER")
}
}
"Stopping Global actor" should {
"result in sending logging error" in {
EventFilter.error(pattern = s".*DEAD Global Performer.*", occurrences = 1) intercept {
globalRef ! PoisonPill
}
}
"result in orchestration restarted" in {
TestProbe().expectActor(s"$feyPath/$global_orch_name/GLOBAL_MANAGER/GLOBAL-TEST")
TestProbe().expectActor(s"$feyPath/$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER")
TestProbe().expectActor(s"$feyPath/$global_orch_name/ENS-GLOBAL")
}
"all previous actors restarted" in {
val routee = """$a"""
val routee2 = """$b"""
val routee3 = """$c"""
val routee4 = """$d"""
globalIdentifierRef ! IdentifyFeyActors.IDENTIFY_TREE(s"${feyCoreRef.path.toString}/$global_orch_name")
Thread.sleep(500)
IdentifyFeyActors.actorsPath should have size (8)
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/GLOBAL_MANAGER")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/GLOBAL_MANAGER/GLOBAL-TEST")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/ENS-GLOBAL")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER/$routee")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER/$routee2")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER/$routee3")
IdentifyFeyActors.actorsPath should contain(s"$feyPath/$global_orch_name/ENS-GLOBAL/PERFORMER-SCHEDULER/$routee4")
}
}
"Stopping orchestration with global performer" should {
"result in sending TERMINATE message to Monitor actor" in {
orchestrationref ! PoisonPill
monitor.expectMsgClass(1.seconds, classOf[Monitor.TERMINATE])
}
"result in no global actors for orchestration" in {
GlobalPerformer.activeGlobalPerformers should have size(0)
ORCHESTRATION_CACHE.orchestration_globals should have size(0)
ORCHESTRATION_CACHE.orchestration_metadata should not contain key(global_orch_name)
}
}
"Stopping FeyCore" should {
"result in sending STOP message to Monitor actor" in {
feyCoreRef ! PoisonPill
monitor.expectMsgClass(1.seconds, classOf[Monitor.STOP])
TestProbe().verifyActorTermination(receiverRef)
}
}
//TODO: Test restart
//TODO: Test checkpoint
}