Merge branch 'IOTA-29' of https://github.com/shiv4nsh/incubator-iota
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index 0e075e3..4db5f98 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -163,21 +163,21 @@
     */
   private def createFeyActor(performerID: String, connectionIDs: Array[String], tmpActors:HashMap[String, ActorRef]):(String, ActorRef) = {
     if(!tmpActors.contains(performerID)){
-      val performerInfo = performers_metadata.getOrElse(performerID, null)
-      if (Option(performerInfo).isDefined) {
+      val performerInfo = performers_metadata.get(performerID)
+      if (performerInfo.isDefined) {
         val connections: Map[String, ActorRef] = connectionIDs.map(connID => {
           createFeyActor(connID, connectors.getOrElse(connID,Array.empty),tmpActors)
         }).toMap
 
         var actor: ActorRef = null
-        val actorProps = getPerformer(performerInfo, connections)
-        if(performerInfo.autoScale) {
+        val actorProps = getPerformer(performerInfo.get, connections)
+        if(performerInfo.get.autoScale) {
 
-          val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
-            messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
+          val resizer = DefaultResizer(lowerBound = performerInfo.get.lowerBound, upperBound = performerInfo.get.upperBound,
+            messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.get.backoffThreshold, backoffRate = 0.1)
 
           val strategy =
-            if(performerInfo.isRoundRobin) {
+            if(performerInfo.get.isRoundRobin) {
                log.info(s"Using Round Robin for performer ${performerID}")
                RoundRobinPool(1, Some(resizer))
              } else {
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 970a99f..d5be525 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -50,7 +50,7 @@
   /**
     * Keeps reference to the cancellable
     */
-  @volatile private var scheduler: Cancellable = null
+  @volatile private var scheduler: Option[Cancellable] = None
   @volatile private var endBackoff: Long = 0
   private[fey] val monitoring_actor = FEY_MONITOR.actorRef
 
@@ -111,8 +111,8 @@
     * Stops the scheduler
     */
   private final def stopScheduler() = {
-    if (Option(scheduler).isDefined) {
-      scheduler.cancel()
-      scheduler = null
+    scheduler.foreach { running =>
+      running.cancel()
+      scheduler = None
     }
   }
@@ -129,14 +129,14 @@
     * The time interval to be used is the one passed to the constructor
     */
   private final def startScheduler() = {
-    if(Option(scheduler).isEmpty && schedulerTimeInterval.toNanos != 0){
-      scheduler = context.system.scheduler.schedule(1.seconds, schedulerTimeInterval){
+    if (scheduler.isEmpty && schedulerTimeInterval.toNanos != 0) {
+      scheduler = Option(context.system.scheduler.schedule(1.seconds, schedulerTimeInterval) {
         try{
           execute()
         }catch{
           case e: Exception => self ! EXCEPTION(e)
         }
-      }
+      })
     }
   }
 
@@ -146,7 +146,7 @@
     * @return true if scheduller is running
     */
   final def isShedulerRunning():Boolean = {
-    if(Option(scheduler).isDefined && !scheduler.isCancelled){
+    if(scheduler.isDefined && !scheduler.get.isCancelled){
       true
     }else{
       false
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
index 4e0c3a4..98fa3e8 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
@@ -57,7 +57,7 @@
       eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.STOP, timestamp, info))
 
     case RESTART(reason, timestamp) =>
-      logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason)
+      logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", Option(reason))
       eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.RESTART, timestamp, reason.getMessage))
 
     case TERMINATE(actorPath, timestamp, info) =>
@@ -75,7 +75,7 @@
       Monitor.simpleEvents.put(sender().path.toString, ('O',timestamp))
 
     case RESTART(reason, timestamp) =>
-      logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason)
+      logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", Option(reason))
       Monitor.simpleEvents.put(sender().path.toString, ('R',timestamp))
 
     case TERMINATE(actorPath, timestamp, info) =>
@@ -87,9 +87,9 @@
     case _ =>
   }
 
-  def logInfo(path:String, event:String, timestamp: Long, info:String, reason:Throwable = null): Unit = {
-    if(Option(reason).isDefined){
-      log.error(reason, s"$event | $timestamp | $path | $info")
+  def logInfo(path: String, event: String, timestamp: Long, info: String, reason: Option[Throwable] = None): Unit = {
+    if (reason.isDefined) {
+      log.error(reason.get, s"$event | $timestamp | $path | $info")
     }else{
       log.info(s"$event | $timestamp | $path | $info")
     }
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
index 902e5f2..bbcb50d 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
@@ -29,25 +29,27 @@
 
 protected class Trie(systemName: String){
 
-  private val root: TrieNode = TrieNode(systemName, ArrayBuffer.empty, ArrayBuffer.empty)
+  private val DEFAULT_TRIE_NODE = TrieNode(systemName, ArrayBuffer.empty, ArrayBuffer.empty)
+
+  private val root: Option[TrieNode] = Option(DEFAULT_TRIE_NODE)
   var elements: Int = 0
 
   def append(path: String, event: Monitor.MonitorEvent = null): Unit = {
-    append(path.replaceFirst("akka://","").split("/"),root,1,event)
+    append(path.replaceFirst("akka://", "").split("/"), root, 1, event)
   }
 
-  @tailrec private def append(path: Array[String], root: TrieNode, index: Int, event: Monitor.MonitorEvent): Unit = {
-    if (Option(root).isDefined && index < path.length) {
-      var nextRoot = root.children.filter(child => child.path == path(index))
+  @tailrec private def append(path: Array[String], root: Option[TrieNode], index: Int, event: Monitor.MonitorEvent): Unit = {
+    if (root.isDefined && index < path.length) {
+      var nextRoot = root.fold(DEFAULT_TRIE_NODE)(identity).children.filter(child => child.path == path(index))
       if(nextRoot.isEmpty){
         nextRoot = ArrayBuffer(TrieNode(path(index), ArrayBuffer.empty, ArrayBuffer.empty))
-        root.children += nextRoot(0)
+        root.fold(DEFAULT_TRIE_NODE)(identity).children += nextRoot(0)
         elements += 1
       }
       if (Option(event).isDefined && index == path.length - 1) {
         nextRoot(0).events += event
       }
-      append(path, nextRoot(0), index + 1, event)
+      append(path, nextRoot.headOption, index + 1, event)
     }
   }
 
@@ -55,13 +57,13 @@
     recHasPath(root, path.replaceFirst("akka://","").split("/"),1)
   }
 
-  @tailrec private def recHasPath(root: TrieNode, path: Array[String], index: Int): Boolean = {
-    if (Option(root).isDefined && index < path.length) {
-      var nextRoot = root.children.filter(child => child.path == path(index))
+  @tailrec private def recHasPath(root: Option[TrieNode], path: Array[String], index: Int): Boolean = {
+    if (root.isDefined && index < path.length) {
+      var nextRoot = root.fold(DEFAULT_TRIE_NODE)(identity).children.filter(child => child.path == path(index))
       if(nextRoot.isEmpty){
         false
       }else{
-        recHasPath(nextRoot(0), path, index + 1)
+        recHasPath(nextRoot.headOption, path, index + 1)
       }
     }else{
       true
@@ -72,16 +74,16 @@
     recGetNode(root, path.replaceFirst("akka://","").split("/"),1)
   }
 
-  @tailrec private def recGetNode(root: TrieNode, path: Array[String], index: Int): Option[TrieNode]= {
-    if (Option(root).isDefined && index < path.length) {
-      var nextRoot = root.children.filter(child => child.path == path(index))
+  @tailrec private def recGetNode(root: Option[TrieNode], path: Array[String], index: Int): Option[TrieNode] = {
+    if (root.isDefined && index < path.length) {
+      var nextRoot = root.fold(DEFAULT_TRIE_NODE)(identity).children.filter(child => child.path == path(index))
       if(nextRoot.isEmpty){
         None
       }else{
         if(path.length - 1 == index){
             Some(nextRoot(0))
         }else {
-          recGetNode(nextRoot(0), path, index + 1)
+          recGetNode(nextRoot.headOption, path, index + 1)
         }
       }
     }else{
@@ -91,8 +93,8 @@
 
   def removeAllNodes(): Unit = {
-    var index = 0
-    while(index < root.children.length){
-      root.children.remove(index)
-      index += 1
-    }
+    // Empty the buffer in one step. Removing at an advancing index skipped
+    // every other child, so roughly half of the nodes used to survive.
+    root.foreach { node =>
+      node.children.clear()
+    }
     elements = 0
@@ -107,31 +109,31 @@
   }
 
   def getRootChildren():ArrayBuffer[TrieNode] = {
-    root.children
+    root.fold(DEFAULT_TRIE_NODE)(identity).children
   }
 
-  private def getObject(root: TrieNode, parent: TrieNode):JsObject = {
-    if (Option(root).isDefined) {
-     Json.obj("name" -> root.path,
-       "parent" -> (if (Option(parent).isDefined) parent.path else "null"),
-        "children" -> root.children.map(getObject(_, root))
+  private def getObject(root: Option[TrieNode], parent: Option[TrieNode]): JsObject = {
+    if (root.isDefined) {
+      Json.obj("name" -> root.fold(DEFAULT_TRIE_NODE)(identity).path,
+       "parent" -> (if (parent.isDefined) parent.fold(DEFAULT_TRIE_NODE)(identity).path else "null"),
+        "children" -> root.fold(DEFAULT_TRIE_NODE)(identity).children.map(a => getObject(Option(a), root))
      )
     }else{
       Json.obj()
     }
   }
 
-  private def getObjectEvent(root: TrieNode, parent: TrieNode):JsObject = {
-    if (Option(root).isDefined) {
-      Json.obj("name" -> root.path,
-        "parent" -> (if (Option(parent).isDefined) parent.path else "null"),
-        "events" -> root.events.map(event => {
+  private def getObjectEvent(root: Option[TrieNode], parent: Option[TrieNode]):JsObject = {
+    if (root.isDefined) {
+      Json.obj("name" -> root.fold(DEFAULT_TRIE_NODE)(identity).path,
+        "parent" -> (if (parent.isDefined) parent.fold(DEFAULT_TRIE_NODE)(identity).path else "null"),
+        "events" -> root.fold(DEFAULT_TRIE_NODE)(identity).events.map(event => {
           Json.obj("type" -> event.event,
           "timestamp" -> event.timestamp,
             "info" -> event.info
           )
         }),
-        "children" -> root.children.map(getObjectEvent(_, root))
+        "children" -> root.fold(DEFAULT_TRIE_NODE)(identity).children.map(a=>getObjectEvent(Option(a), root))
       )
     }else{
       Json.obj()