[IOTA-28] Preparing for Receiver actor
- Creating separate object for fey_core ActorRef
- Updating ORCHESTRATION_RECEIVED to accept Option[File]
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
index eb08d63..637fbf0 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -42,8 +42,9 @@
import FEY_SYSTEM._
- val fey = system.actorOf(FeyCore.props, name = "FEY-CORE")
- fey ! FeyCore.START
+ FEY_CORE_ACTOR
+
+ FEY_CORE_ACTOR.actorRef ! FeyCore.START
val service = system.actorOf(Props[MyServiceActor], name = "FEY_REST_API")
@@ -52,6 +53,11 @@
}
+object FEY_CORE_ACTOR{
+ import FEY_SYSTEM._
+ val actorRef = system.actorOf(FeyCore.props, name = "FEY-CORE")
+}
+
object FEY_MONITOR{
import FEY_SYSTEM._
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 8028f05..4909a0b 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -52,17 +52,15 @@
val jsonReceiverActor: ActorRef = context.actorOf(Props[JsonReceiverActor], name = JSON_RECEIVER_NAME)
context.watch(jsonReceiverActor)
- case ORCHESTRATION_RECEIVED(orchestrationJson, file) =>
- log.info(s"NEW FILE ${file.getAbsolutePath}")
- try{
- processJson(orchestrationJson)
- renameProcessedFile(file, "processed")
- }catch {
- case e: Exception =>
- renameProcessedFile(file, "failed")
- log.error(e, s"JSON not processed ${file.getAbsolutePath}")
+ case ORCHESTRATION_RECEIVED(orchestrationJson, optionFile) =>
+ optionFile match {
+ case Some(file) =>
+ orchestrationReceivedWithFile(orchestrationJson, file)
+ case None =>
+ orchestrationReceivedNoFile(orchestrationJson)
}
+
case STOP_EMPTY_ORCHESTRATION(orchID) =>
log.warning(s"Deleting Empty Orchestration $orchID")
deleteOrchestration(orchID)
@@ -76,6 +74,29 @@
}
+ private def orchestrationReceivedNoFile(json: JsValue) = {
+ val orchGUID = (json \ GUID).as[String]
+ log.info(s"Orchestration $orchGUID received")
+ try{
+ processJson(json)
+ }catch {
+ case e: Exception =>
+ log.error(e, s"JSON for orchestration $orchGUID could not be processed")
+ }
+ }
+
+ private def orchestrationReceivedWithFile(json: JsValue, file: File) = {
+ log.info(s"NEW FILE ${file.getAbsolutePath}")
+ try{
+ processJson(json)
+ renameProcessedFile(file, "processed")
+ }catch {
+ case e: Exception =>
+ renameProcessedFile(file, "failed")
+ log.error(e, s"JSON not processed ${file.getAbsolutePath}")
+ }
+ }
+
private def processTerminatedMessage(actorRef: ActorRef) = {
monitoring_actor ! Monitor.TERMINATE(actorRef.path.toString, Utils.getTimestamp)
log.info(s"TERMINATED ${actorRef.path.name}")
@@ -306,7 +327,8 @@
* @param json
* @param file
*/
- case class ORCHESTRATION_RECEIVED(json: JsValue, file: File)
+ case class ORCHESTRATION_RECEIVED(json: JsValue, file: Option[File])
+
case class STOP_EMPTY_ORCHESTRATION(orchID: String)
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 9a6fb78..36f39fa 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -1,3 +1,4 @@
+
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
index 1664478..3fb5330 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
@@ -29,13 +29,16 @@
import JsonReceiverActor._
val monitoring_actor = FEY_MONITOR.actorRef
- val watchFileTask = new WatchServiceReceiver(self)
- var watchThread = new Thread(watchFileTask, GLOBAL_DEFINITIONS.WATCH_SERVICE_THREAD)
+ var watchFileTask: WatchServiceReceiver = _
+ var watchThread: Thread = _
override def preStart() {
prepareDynamicJarRepo()
processCheckpointFiles()
+ watchFileTask = new WatchServiceReceiver(self)
+ watchThread = new Thread(watchFileTask, GLOBAL_DEFINITIONS.WATCH_SERVICE_THREAD)
+
monitoring_actor ! Monitor.START(Utils.getTimestamp)
watchThread.setDaemon(true)
watchThread.start()
@@ -74,7 +77,7 @@
override def receive: Receive = {
case JSON_RECEIVED(json, file) =>
log.info(s"JSON RECEIVED => ${Json.stringify(json)}")
- context.parent ! FeyCore.ORCHESTRATION_RECEIVED(json, file)
+ context.parent ! FeyCore.ORCHESTRATION_RECEIVED(json, Some(file))
case _ =>
}
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
index 41a6982..0935c5b 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/MyService.scala
@@ -49,7 +49,7 @@
get{
respondWithMediaType(`text/html`) {
complete {
- SYSTEM_ACTORS.fey ! JSON_TREE
+ FEY_CORE_ACTOR.actorRef ! JSON_TREE
Thread.sleep(2000)
val json = IdentifyFeyActors.generateTreeJson()
IdentifyFeyActors.getHTMLTree(json)
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
index b5a2ca9..c616821 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/FeyCoreSpec.scala
@@ -62,7 +62,7 @@
val orchestration_name = "TEST-ACTOR"
"Sending FeyCore.ORCHESTRATION_RECEIVED with CREATE command to FeyCore" should {
- feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test), new File("/tmp/fey/test/json"))
+ feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.create_json_test), None)
s"result in creating an Orchestration child actor with the name '$orchestration_name'" in {
orchestrationref = TestProbe().expectActor(s"$feyPath/$orchestration_name")
}
@@ -85,14 +85,14 @@
"Sending FeyCore.ORCHESTRATION_RECEIVED with UPDATE command to FeyCore" should {
s"result in creating a new Performer child actor with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0002'" in {
- feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_json_test), new File("/tmp/fey/test/json"))
+ feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_json_test), None)
ensemble1Test2ref = TestProbe().expectActor(s"$feyPath/$orchestration_name/MY-ENSEMBLE-0001/TEST-0002")
}
}
"Sending FeyCore.ORCHESTRATION_RECEIVED with UPDATE command and DELETE ensemble to FeyCore" should {
s"result in termination of Ensemble with the name '$orchestration_name/MY-ENSEMBLE-0001'" in {
- feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_delete_json_test), new File("/tmp/fey/test/json"))
+ feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.update_delete_json_test), None)
TestProbe().verifyActorTermination(ensemble1ref)
}
s"result in termination of Performer with the name '$orchestration_name/MY-ENSEMBLE-0001/TEST-0001'" in {
@@ -106,7 +106,7 @@
"Sending FeyCore.ORCHESTRATION_RECEIVED with RECREATE command and same Timestamp to FeyCore" should {
s"result in logging a 'not recreated' message at Warn " in {
EventFilter.warning(pattern = s".*$orchestration_name not recreated.*", occurrences = 1) intercept {
- feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.recreate_timestamp_json_test), new File("/tmp/fey/test/json"))
+ feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.recreate_timestamp_json_test), None)
}
}
}
@@ -121,7 +121,7 @@
"Sending FeyCore.ORCHESTRATION_RECEIVED with DELETE command to FeyCore" should {
s"result in termination of Orchestration with the name '$orchestration_name'" in {
- feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.delete_json_test), new File("/tmp/fey/test/json"))
+ feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.delete_json_test), None)
TestProbe().verifyActorTermination(orchestrationref)
}
"result in sending TERMINATE message to Monitor actor" in {
@@ -140,7 +140,7 @@
"Sending FeyCore.STOP_EMPTY_ORCHESTRATION to FeyCore" should {
s"result in termination of 'TEST-ORCH-2'" in {
- feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.orchestration_test_json), new File("/tmp/fey/test/json"))
+ feyCoreRef ! FeyCore.ORCHESTRATION_RECEIVED(getJSValueFromString(Utils_JSONTest.orchestration_test_json), None)
val ref = TestProbe().expectActor(s"$feyPath/TEST-ORCH-2")
FEY_CACHE.activeOrchestrations should have size(1)
FEY_CACHE.activeOrchestrations should contain key("TEST-ORCH-2")