Merge branch 'IOTA-29' of https://github.com/shiv4nsh/incubator-iota
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 0e075e3..4db5f98 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -163,21 +163,21 @@
*/
private def createFeyActor(performerID: String, connectionIDs: Array[String], tmpActors:HashMap[String, ActorRef]):(String, ActorRef) = {
if(!tmpActors.contains(performerID)){
- val performerInfo = performers_metadata.getOrElse(performerID, null)
- if (Option(performerInfo).isDefined) {
+ val performerInfo = performers_metadata.get(performerID)
+ if (performerInfo.isDefined) {
val connections: Map[String, ActorRef] = connectionIDs.map(connID => {
createFeyActor(connID, connectors.getOrElse(connID,Array.empty),tmpActors)
}).toMap
var actor: ActorRef = null
- val actorProps = getPerformer(performerInfo, connections)
- if(performerInfo.autoScale) {
+ val actorProps = getPerformer(performerInfo.get, connections)
+ if(performerInfo.get.autoScale) {
- val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
- messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
+ val resizer = DefaultResizer(lowerBound = performerInfo.get.lowerBound, upperBound = performerInfo.get.upperBound,
+ messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.get.backoffThreshold, backoffRate = 0.1)
val strategy =
- if(performerInfo.isRoundRobin) {
+ if(performerInfo.get.isRoundRobin) {
log.info(s"Using Round Robin for performer ${performerID}")
RoundRobinPool(1, Some(resizer))
} else {
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 970a99f..d5be525 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -50,7 +50,7 @@
/**
* Keeps reference to the cancellable
*/
- @volatile private var scheduler: Cancellable = null
+ @volatile private var scheduler: Option[Cancellable] = None // None while no scheduled-task handle is held
@volatile private var endBackoff: Long = 0
private[fey] val monitoring_actor = FEY_MONITOR.actorRef
@@ -111,8 +111,8 @@
* Stops the scheduler
*/
private final def stopScheduler() = {
- if (Option(scheduler).isDefined) {
- scheduler.cancel()
- scheduler = null
- }
+ // Cancel the pending task, if any, then reset the field to None (never null):
+ // the field is an Option and is dereferenced via isDefined/isEmpty elsewhere.
+ scheduler.foreach(_.cancel())
+ scheduler = None
}
@@ -129,14 +129,14 @@
* The time interval to be used is the one passed to the constructor
*/
private final def startScheduler() = {
- if(Option(scheduler).isEmpty && schedulerTimeInterval.toNanos != 0){
- scheduler = context.system.scheduler.schedule(1.seconds, schedulerTimeInterval){
+ if (scheduler.isEmpty && schedulerTimeInterval.toNanos != 0) { // skip when a handle is already stored or the interval is zero
+ scheduler = Option(context.system.scheduler.schedule(1.seconds, schedulerTimeInterval) { // keep the returned handle so it can be cancelled later
try{
execute()
}catch{
case e: Exception => self ! EXCEPTION(e)
}
- }
+ })
}
}
@@ -146,7 +146,7 @@
- * @return true if scheduller is running
+ * @return true if the scheduler is running
*/
final def isShedulerRunning():Boolean = {
- if(Option(scheduler).isDefined && !scheduler.isCancelled){
+ if(scheduler.isDefined && !scheduler.get.isCancelled){
true
}else{
false
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
index 4e0c3a4..98fa3e8 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
@@ -57,7 +57,7 @@
eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.STOP, timestamp, info))
case RESTART(reason, timestamp) =>
- logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason)
+ logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", Option(reason))
eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.RESTART, timestamp, reason.getMessage))
case TERMINATE(actorPath, timestamp, info) =>
@@ -75,7 +75,7 @@
Monitor.simpleEvents.put(sender().path.toString, ('O',timestamp))
case RESTART(reason, timestamp) =>
- logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason)
+ logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", Option(reason))
Monitor.simpleEvents.put(sender().path.toString, ('R',timestamp))
case TERMINATE(actorPath, timestamp, info) =>
@@ -87,9 +87,9 @@
case _ =>
}
- def logInfo(path:String, event:String, timestamp: Long, info:String, reason:Throwable = null): Unit = {
- if(Option(reason).isDefined){
- log.error(reason, s"$event | $timestamp | $path | $info")
- }else{
- log.info(s"$event | $timestamp | $path | $info")
- }
+ /** Logs the event at error level when a failure cause is supplied, otherwise at info level. */
+ def logInfo(path: String, event: String, timestamp: Long, info: String, reason: Option[Throwable] = None): Unit = { // default is None, never null: a null Option throws as soon as it is inspected
+   reason match {
+     case Some(cause) => log.error(cause, s"$event | $timestamp | $path | $info")
+     case None => log.info(s"$event | $timestamp | $path | $info")
+   }
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
index 902e5f2..bbcb50d 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
@@ -29,25 +29,27 @@
protected class Trie(systemName: String){
- private val root: TrieNode = TrieNode(systemName, ArrayBuffer.empty, ArrayBuffer.empty)
+ private val DEFAULT_TRIE_NODE = TrieNode(systemName, ArrayBuffer.empty, ArrayBuffer.empty) // built from systemName with two empty buffers
+
+ private val root: Option[TrieNode] = Option(DEFAULT_TRIE_NODE) // wraps the node above, so access through `root` reaches that same instance
var elements: Int = 0
def append(path: String, event: Monitor.MonitorEvent = null): Unit = {
- append(path.replaceFirst("akka://","").split("/"),root,1,event)
+ append(path.replaceFirst("akka://", "").split("/"), root, 1, event) // drop the "akka://" prefix, split on "/", and begin matching at segment 1
}
- @tailrec private def append(path: Array[String], root: TrieNode, index: Int, event: Monitor.MonitorEvent): Unit = {
- if (Option(root).isDefined && index < path.length) {
- var nextRoot = root.children.filter(child => child.path == path(index))
- if(nextRoot.isEmpty){
- nextRoot = ArrayBuffer(TrieNode(path(index), ArrayBuffer.empty, ArrayBuffer.empty))
- root.children += nextRoot(0)
- elements += 1
- }
- if (Option(event).isDefined && index == path.length - 1) {
- nextRoot(0).events += event
- }
- append(path, nextRoot(0), index + 1, event)
- }
- }
+ /** Walks `path` from `index`, creating any missing child along the way.
+   * A non-null `event` is attached to the node matching the last path segment.
+   */
+ @tailrec private def append(path: Array[String], root: Option[TrieNode], index: Int, event: Monitor.MonitorEvent): Unit = {
+   root match {
+     case Some(node) if index < path.length =>
+       val existing = node.children.find(_.path == path(index))
+       val child = existing.getOrElse(TrieNode(path(index), ArrayBuffer.empty, ArrayBuffer.empty))
+       if (existing.isEmpty) { node.children += child; elements += 1 } // only newly created nodes are counted
+       if (Option(event).isDefined && index == path.length - 1) child.events += event
+       append(path, Some(child), index + 1, event)
+     case _ => // no node, or every segment consumed: nothing to insert
+   }
+ }
@@ -55,13 +57,13 @@
recHasPath(root, path.replaceFirst("akka://","").split("/"),1)
}
- @tailrec private def recHasPath(root: TrieNode, path: Array[String], index: Int): Boolean = {
- if (Option(root).isDefined && index < path.length) {
- var nextRoot = root.children.filter(child => child.path == path(index))
+ @tailrec private def recHasPath(root: Option[TrieNode], path: Array[String], index: Int): Boolean = {
+ if (root.isDefined && index < path.length) {
+ var nextRoot = root.fold(DEFAULT_TRIE_NODE)(identity).children.filter(child => child.path == path(index))
if(nextRoot.isEmpty){
false
}else{
- recHasPath(nextRoot(0), path, index + 1)
+ recHasPath(nextRoot.headOption, path, index + 1)
}
}else{
true
@@ -72,16 +74,16 @@
recGetNode(root, path.replaceFirst("akka://","").split("/"),1)
}
- @tailrec private def recGetNode(root: TrieNode, path: Array[String], index: Int): Option[TrieNode]= {
- if (Option(root).isDefined && index < path.length) {
- var nextRoot = root.children.filter(child => child.path == path(index))
+ @tailrec private def recGetNode(root: Option[TrieNode], path: Array[String], index: Int): Option[TrieNode] = {
+ if (root.isDefined && index < path.length) {
+ var nextRoot = root.fold(DEFAULT_TRIE_NODE)(identity).children.filter(child => child.path == path(index))
if(nextRoot.isEmpty){
None
}else{
if(path.length - 1 == index){
Some(nextRoot(0))
}else {
- recGetNode(nextRoot(0), path, index + 1)
+ recGetNode(nextRoot.headOption, path, index + 1)
}
}
}else{
@@ -91,8 +93,8 @@
def removeAllNodes(): Unit = {
- var index = 0
- while(index < root.children.length){
- root.children.remove(index)
- index += 1
- }
+ // Empty the root's child buffer in a single call. The previous loop removed the
+ // element at `index` and then advanced `index`; since the buffer shifts left after
+ // each removal, every other child was skipped and survived, even though
+ // `elements` is reset to zero right below.
+ root.foreach(_.children.clear())
elements = 0
@@ -107,31 +109,31 @@
}
- def getRootChildren():ArrayBuffer[TrieNode] = {
- root.children
+ def getRootChildren(): ArrayBuffer[TrieNode] = { // hands back the root's own child buffer, not a copy
+ root.getOrElse(DEFAULT_TRIE_NODE).children
}
- private def getObject(root: TrieNode, parent: TrieNode):JsObject = {
- if (Option(root).isDefined) {
- Json.obj("name" -> root.path,
- "parent" -> (if (Option(parent).isDefined) parent.path else "null"),
- "children" -> root.children.map(getObject(_, root))
- )
- }else{
- Json.obj()
- }
- }
+ /** Renders a node as an object with "name", "parent" and "children"; an absent node
+   * yields an empty object and an absent parent is written as the string "null". */
+ private def getObject(root: Option[TrieNode], parent: Option[TrieNode]): JsObject = {
+   root.fold(Json.obj()) { node =>
+     Json.obj("name" -> node.path,
+       "parent" -> parent.fold("null")(_.path),
+       "children" -> node.children.map(child => getObject(Option(child), root))
+     )
+   }
+ }
- private def getObjectEvent(root: TrieNode, parent: TrieNode):JsObject = {
- if (Option(root).isDefined) {
- Json.obj("name" -> root.path,
- "parent" -> (if (Option(parent).isDefined) parent.path else "null"),
- "events" -> root.events.map(event => {
+ private def getObjectEvent(root: Option[TrieNode], parent: Option[TrieNode]):JsObject = {
+ if (root.isDefined) {
+ Json.obj("name" -> root.fold(DEFAULT_TRIE_NODE)(identity).path,
+ "parent" -> (if (parent.isDefined) parent.fold(DEFAULT_TRIE_NODE)(identity).path else "null"),
+ "events" -> root.fold(DEFAULT_TRIE_NODE)(identity).events.map(event => {
Json.obj("type" -> event.event,
"timestamp" -> event.timestamp,
"info" -> event.info
)
}),
- "children" -> root.children.map(getObjectEvent(_, root))
+ "children" -> root.fold(DEFAULT_TRIE_NODE)(identity).children.map(a=>getObjectEvent(Option(a), root))
)
}else{
Json.obj()