Merge branch 'IOTA-29' of https://github.com/shiv4nsh/incubator-iota
diff --git a/fey-core/src/main/resources/application.conf b/fey-core/src/main/resources/application.conf
index c0eaea7..2dabef0 100644
--- a/fey-core/src/main/resources/application.conf
+++ b/fey-core/src/main/resources/application.conf
@@ -13,7 +13,6 @@
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
-//
// Defines the default configuration for Fey
// User can override this configuration by creating a configuration file
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
index 34bbc29..4298a7e 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Application.scala
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.iota.fey
import akka.actor.{ActorSystem, Props}
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala b/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
index c66be1c..663ffdf 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/CheckpointProcessor.scala
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
index a606217..4db5f98 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Ensemble.scala
@@ -163,21 +163,21 @@
*/
private def createFeyActor(performerID: String, connectionIDs: Array[String], tmpActors:HashMap[String, ActorRef]):(String, ActorRef) = {
if(!tmpActors.contains(performerID)){
- val performerInfo = performers_metadata.getOrElse(performerID, null)
- if(performerInfo != null){
+ val performerInfo = performers_metadata.get(performerID)
+ if (performerInfo.isDefined) {
val connections: Map[String, ActorRef] = connectionIDs.map(connID => {
createFeyActor(connID, connectors.getOrElse(connID,Array.empty),tmpActors)
}).toMap
- var actor:ActorRef = null
- val actorProps = getPerformer(performerInfo, connections)
- if(performerInfo.autoScale) {
+ var actor: ActorRef = null
+ val actorProps = getPerformer(performerInfo.get, connections)
+ if(performerInfo.get.autoScale) {
- val resizer = DefaultResizer(lowerBound = performerInfo.lowerBound, upperBound = performerInfo.upperBound,
- messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.backoffThreshold, backoffRate = 0.1)
+ val resizer = DefaultResizer(lowerBound = performerInfo.get.lowerBound, upperBound = performerInfo.get.upperBound,
+ messagesPerResize = CONFIG.MESSAGES_PER_RESIZE, backoffThreshold = performerInfo.get.backoffThreshold, backoffRate = 0.1)
val strategy =
- if(performerInfo.isRoundRobin) {
+ if(performerInfo.get.isRoundRobin) {
log.info(s"Using Round Robin for performer ${performerID}")
RoundRobinPool(1, Some(resizer))
} else {
@@ -204,6 +204,7 @@
/**
* Creates actor props based on JSON configuration
+ *
* @param performerInfo Performer object
* @param connections connections
* @return Props of actor based on JSON config
@@ -271,11 +272,11 @@
* @return map of connectors
*/
def extractConnections(connectors: List[JsObject]): Map[String,Array[String]] = {
- connectors.map(connector => {
+ connectors.flatMap(connector => {
connector.keys.map(key => {
(key, (connector \ key).as[List[String]].toArray)
}).toMap
- }).flatten.toMap
+ }).toMap
}
/**
@@ -296,12 +297,18 @@
if(lowerBound > upperBound){
throw new IllegalArgumentException(" Could not define performer. Autoscale param: Lower bound greater than upper bound")
}
- val threshold: Double = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_BACKOFF_THRESHOLD))
- (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_BACKOFF_THRESHOLD).as[Double] else 0.3
- val roundRobin: Boolean = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_ROUND_ROBIN))
- (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_ROUND_ROBIN).as[Boolean] else false
-
-
+ val threshold: Double = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_BACKOFF_THRESHOLD)) {
+ (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_BACKOFF_THRESHOLD).as[Double]
+ }
+ else {
+ 0.3
+ }
+ val roundRobin: Boolean = if (autoScale && (performer \ PERFORMER_AUTO_SCALE).as[JsObject].keys.contains(PERFORMER_ROUND_ROBIN)) {
+ (performer \ PERFORMER_AUTO_SCALE \ PERFORMER_ROUND_ROBIN).as[Boolean]
+ }
+ else {
+ false
+ }
val jarName: String = (performer \ SOURCE \ SOURCE_NAME).as[String]
val classPath: String = (performer \ SOURCE \ SOURCE_CLASSPATH).as[String]
val params:Map[String,String] = getMapOfParams((performer \ SOURCE \ SOURCE_PARAMS).as[JsObject])
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
index 3018d03..9c2b61d 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyCore.scala
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -18,25 +17,23 @@
package org.apache.iota.fey
-import java.nio.file.Paths
import java.io.File
-import scala.concurrent.duration._
-import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
-import Utils._
import akka.actor.SupervisorStrategy._
-import play.api.libs.json._
-import JSON_PATH._
+import akka.actor.{Actor, ActorLogging, ActorRef, OneForOneStrategy, PoisonPill, Props, Terminated}
import akka.routing.GetRoutees
-import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES}
import com.eclipsesource.schema._
+import org.apache.iota.fey.JSON_PATH._
+import org.apache.iota.fey.Orchestration.{CREATE_ENSEMBLES, DELETE_ENSEMBLES, UPDATE_ENSEMBLES}
+import org.apache.iota.fey.Utils._
+import play.api.libs.json._
import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
protected class FeyCore extends Actor with ActorLogging{
import FeyCore._
- import CONFIG._
val monitoring_actor = FEY_MONITOR.actorRef
@@ -371,4 +368,4 @@
}
sealed case class OrchestrationInformation(ensembleSpecJson: List[JsObject], orchestrationID: String,
- orchestrationName: String, orchestrationTimestamp: String)
\ No newline at end of file
+ orchestrationName: String, orchestrationTimestamp: String)
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
index 36f39fa..5114c95 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActor.scala
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -45,12 +44,13 @@
extends Actor with ActorLogging{
+
import FeyGenericActor._
/**
* Keeps reference to the cancellable
*/
- @volatile private var scheduler: Cancellable = null
+ @volatile private var scheduler: Option[Cancellable] = None
@volatile private var endBackoff: Long = 0
private[fey] val monitoring_actor = FEY_MONITOR.actorRef
@@ -111,9 +111,9 @@
* Stops the scheduler
*/
private final def stopScheduler() = {
- if (scheduler != null) {
- scheduler.cancel()
- scheduler = null
+ if (scheduler.isDefined) {
+ scheduler.get.cancel()
+ scheduler = None
}
}
/**
@@ -129,23 +129,24 @@
* The time interval to be used is the one passed to the constructor
*/
private final def startScheduler() = {
- if(scheduler == null && schedulerTimeInterval.toNanos != 0){
- scheduler = context.system.scheduler.schedule(1.seconds, schedulerTimeInterval){
+ if (scheduler.isEmpty && schedulerTimeInterval.toNanos != 0) {
+ scheduler = Option(context.system.scheduler.schedule(1.seconds, schedulerTimeInterval) {
try{
execute()
}catch{
case e: Exception => self ! EXCEPTION(e)
}
- }
+ })
}
}
/**
* Check state of scheduler
+ *
* @return true if scheduller is running
*/
final def isShedulerRunning():Boolean = {
- if(scheduler != null && !scheduler.isCancelled){
+ if(scheduler.isDefined && !scheduler.get.isCancelled){
true
}else{
false
@@ -154,6 +155,7 @@
/**
* get endBackoff
+ *
* @return
*/
final def getEndBackoff(): Long = {
@@ -170,6 +172,7 @@
* Called every time actors receives the PROCESS message.
* The default implementation propagates the message to the connected actors
* and fires up the backoff
+ *
* @param message message to be processed
* @tparam T Any
*/
@@ -220,12 +223,14 @@
/**
* Used to set a info message when sending Stop monitor events
+ *
* @return String info
*/
def stopMonitorInfo:String = "Stopped"
/**
* Used to set a info message when sending Start monitor events
+ *
* @return String info
*/
def startMonitorInfo:String = "Started"
@@ -263,3 +268,4 @@
+
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala
index 7361d01..e3b8001 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyGenericActorReceiver.scala
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -173,7 +172,7 @@
}catch{
case e: Exception =>
- if(outputStream != null) {
+ if(Option(outputStream).isDefined) {
outputStream.close()
(new File(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")).delete()
}
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/FeyUIService.scala b/fey-core/src/main/scala/org/apache/iota/fey/FeyUIService.scala
index cb48192..46dd832 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/FeyUIService.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/FeyUIService.scala
@@ -38,10 +38,11 @@
class FeyUIService(urlPath: String, port: Int) extends NettyServerComponents with BuiltInComponents {
+ val THREAD_SLEEP_TIME = 2000
lazy val router = Router.from {
case GET(p"/fey/activeactors") => Action {
FEY_CORE_ACTOR.actorRef ! JSON_TREE
- Thread.sleep(2000)
+ Thread.sleep(THREAD_SLEEP_TIME)
val json = IdentifyFeyActors.generateTreeJson()
val jsonTree: String = IdentifyFeyActors.getHTMLTree(json)
Results.Ok(jsonTree).as("text/html")
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
index 6781f01..c9e1925 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiver.scala
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -146,7 +145,7 @@
}catch{
case e: Exception =>
- if(outputStream != null) {
+ if (Option(outputStream).isDefined) {
outputStream.close()
(new File(s"${CONFIG.DYNAMIC_JAR_REPO}/$jarName")).delete()
}
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
index 3fb5330..875009b 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/JsonReceiverActor.scala
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
index 2100cbd..340958f 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Monitor.scala
@@ -57,7 +57,7 @@
eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.STOP, timestamp, info))
case RESTART(reason, timestamp) =>
- logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason)
+ logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", Option(reason))
eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.RESTART, timestamp, reason.getMessage))
case TERMINATE(actorPath, timestamp, info) =>
@@ -75,7 +75,7 @@
Monitor.simpleEvents.put(sender().path.toString, ('O',timestamp))
case RESTART(reason, timestamp) =>
- logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason)
+ logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", Option(reason))
Monitor.simpleEvents.put(sender().path.toString, ('R',timestamp))
case TERMINATE(actorPath, timestamp, info) =>
@@ -87,9 +87,9 @@
case _ =>
}
- def logInfo(path:String, event:String, timestamp: Long, info:String, reason:Throwable = null) = {
- if(reason != null){
- log.error(reason, s"$event | $timestamp | $path | $info")
+ def logInfo(path: String, event: String, timestamp: Long, info: String, reason: Option[Throwable] = None): Unit = {
+ if (reason.isDefined) {
+ log.error(reason.get, s"$event | $timestamp | $path | $info")
}else{
log.info(s"$event | $timestamp | $path | $info")
}
@@ -123,13 +123,13 @@
}
def mapEventsToRows(actors: ArrayBuffer[TrieNode], prefix:String): ArrayBuffer[String] = {
- actors.map(actor => {
+ actors.flatMap(actor => {
val currentPath = if (prefix == "/user/FEY-CORE") actor.path else s"$prefix/${actor.path}"
val events = actor.events.map(event => {
getTableLine(currentPath, event.timestamp, event.event, event.info)
})
mapEventsToRows(actor.children, currentPath) ++ events
- }).flatten
+ })
}
def getSimpleHTMLEvents: String = {
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
index d247279..1e47a80 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Orchestration.scala
@@ -75,7 +75,7 @@
}
}
- override def postStop() = {
+ override def postStop(): Unit = {
monitoring_actor ! Monitor.STOP(Utils.getTimestamp)
log.info(s"STOPPED ${self.path.name}")
}
@@ -250,4 +250,4 @@
*/
val orchestration_metadata: HashMap[String, Map[String,JsObject]] = HashMap.empty[String, Map[String,JsObject]]
val orchestration_name: HashMap[String, String] = HashMap.empty
-}
\ No newline at end of file
+}
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
index 4de3da7..0e2a087 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/TrieNode.scala
@@ -17,7 +17,7 @@
package org.apache.iota.fey
-import play.api.libs.json.{JsArray, JsObject, JsValue, Json}
+import play.api.libs.json.{JsObject, JsValue, Json}
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
@@ -29,25 +29,27 @@
protected class Trie(systemName: String){
- private val root: TrieNode = TrieNode(systemName, ArrayBuffer.empty, ArrayBuffer.empty)
+ private val DEFAULT_TRIE_NODE = TrieNode(systemName, ArrayBuffer.empty, ArrayBuffer.empty)
+
+ private val root: Option[TrieNode] = Option(DEFAULT_TRIE_NODE)
var elements: Int = 0
def append(path: String, event: Monitor.MonitorEvent = null): Unit = {
- append(path.replaceFirst("akka://","").split("/"),root,1,event)
+ append(path.replaceFirst("akka://", "").split("/"), root, 1, event)
}
- @tailrec private def append(path: Array[String], root: TrieNode, index: Int, event: Monitor.MonitorEvent): Unit = {
- if(root != null && index < path.length){
- var nextRoot = root.children.filter(child => child.path == path(index))
+ @tailrec private def append(path: Array[String], root: Option[TrieNode], index: Int, event: Monitor.MonitorEvent): Unit = {
+ if (root.isDefined && index < path.length) {
+ var nextRoot = root.fold(DEFAULT_TRIE_NODE)(identity).children.filter(child => child.path == path(index))
if(nextRoot.isEmpty){
nextRoot = ArrayBuffer(TrieNode(path(index), ArrayBuffer.empty, ArrayBuffer.empty))
- root.children += nextRoot(0)
+ root.fold(DEFAULT_TRIE_NODE)(identity).children += nextRoot(0)
elements += 1
}
- if(event != null && index == path.length - 1){
+ if (Option(event).isDefined && index == path.length - 1) {
nextRoot(0).events += event
}
- append(path, nextRoot(0),index+1, event)
+ append(path, nextRoot.headOption, index + 1, event)
}
}
@@ -55,13 +57,13 @@
recHasPath(root, path.replaceFirst("akka://","").split("/"),1)
}
- @tailrec private def recHasPath(root: TrieNode, path: Array[String], index: Int): Boolean = {
- if(root != null && index < path.length) {
- var nextRoot = root.children.filter(child => child.path == path(index))
+ @tailrec private def recHasPath(root: Option[TrieNode], path: Array[String], index: Int): Boolean = {
+ if (root.isDefined && index < path.length) {
+ var nextRoot = root.fold(DEFAULT_TRIE_NODE)(identity).children.filter(child => child.path == path(index))
if(nextRoot.isEmpty){
false
}else{
- recHasPath(nextRoot(0), path, index + 1)
+ recHasPath(nextRoot.headOption, path, index + 1)
}
}else{
true
@@ -72,16 +74,16 @@
recGetNode(root, path.replaceFirst("akka://","").split("/"),1)
}
- @tailrec private def recGetNode(root: TrieNode, path: Array[String], index: Int): Option[TrieNode]= {
- if(root != null && index < path.length) {
- var nextRoot = root.children.filter(child => child.path == path(index))
+ @tailrec private def recGetNode(root: Option[TrieNode], path: Array[String], index: Int): Option[TrieNode] = {
+ if (root.isDefined && index < path.length) {
+ var nextRoot = root.fold(DEFAULT_TRIE_NODE)(identity).children.filter(child => child.path == path(index))
if(nextRoot.isEmpty){
None
}else{
if(path.length - 1 == index){
Some(nextRoot(0))
}else {
- recGetNode(nextRoot(0), path, index + 1)
+ recGetNode(nextRoot.headOption, path, index + 1)
}
}
}else{
@@ -91,47 +93,47 @@
def removeAllNodes(): Unit = {
var index = 0
- while(index < root.children.length){
- root.children.remove(index)
+ while (index < root.fold(DEFAULT_TRIE_NODE)(identity).children.length) {
+ root.fold(DEFAULT_TRIE_NODE)(identity).children.remove(index)
index += 1
}
elements = 0
}
def print:JsValue = {
- getObject(root, null)
+ getObject(root, None)
}
def printWithEvents:JsValue = {
- getObjectEvent(root, null)
+ getObjectEvent(root, None)
}
def getRootChildren():ArrayBuffer[TrieNode] = {
- root.children
+ root.fold(DEFAULT_TRIE_NODE)(identity).children
}
- private def getObject(root: TrieNode, parent: TrieNode):JsObject = {
- if(root != null) {
- Json.obj("name" -> root.path,
- "parent" -> (if(parent != null) parent.path else "null"),
- "children" -> root.children.map(getObject(_, root))
+ private def getObject(root: Option[TrieNode], parent: Option[TrieNode]): JsObject = {
+ if (root.isDefined) {
+ Json.obj("name" -> root.fold(DEFAULT_TRIE_NODE)(identity).path,
+ "parent" -> (if (parent.isDefined) parent.fold(DEFAULT_TRIE_NODE)(identity).path else "null"),
+ "children" -> root.fold(DEFAULT_TRIE_NODE)(identity).children.map(a => getObject(Option(a), root))
)
}else{
Json.obj()
}
}
- private def getObjectEvent(root: TrieNode, parent: TrieNode):JsObject = {
- if(root != null) {
- Json.obj("name" -> root.path,
- "parent" -> (if(parent != null) parent.path else "null"),
- "events" -> root.events.map(event => {
+ private def getObjectEvent(root: Option[TrieNode], parent: Option[TrieNode]):JsObject = {
+ if (root.isDefined) {
+ Json.obj("name" -> root.fold(DEFAULT_TRIE_NODE)(identity).path,
+ "parent" -> (if (parent.isDefined) parent.fold(DEFAULT_TRIE_NODE)(identity).path else "null"),
+ "events" -> root.fold(DEFAULT_TRIE_NODE)(identity).events.map(event => {
Json.obj("type" -> event.event,
"timestamp" -> event.timestamp,
"info" -> event.info
)
}),
- "children" -> root.children.map(getObjectEvent(_, root))
+ "children" -> root.fold(DEFAULT_TRIE_NODE)(identity).children.map(a=>getObjectEvent(Option(a), root))
)
}else{
Json.obj()
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
index ff4e574..fda1a30 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/Utils.scala
@@ -211,6 +211,9 @@
val CONSOLE_APPENDER = "FEY-CONSOLE"
val CONTROL_AWARE_MAILBOX = "akka.fey-dispatchers.control-aware-dispatcher"
+ val DEFAULT_MESSAGE = 500
+ val DEFAULT_PORT = 8080
+
var CHECKPOINT_DIR = ""
var JSON_REPOSITORY = ""
var JSON_EXTENSION = ""
@@ -218,13 +221,13 @@
var CHEKPOINT_ENABLED = true
var LOG_LEVEL = ""
var LOG_APPENDER = ""
- var MESSAGES_PER_RESIZE:Int = 500
+ var MESSAGES_PER_RESIZE: Int = DEFAULT_MESSAGE
var DYNAMIC_JAR_REPO = ""
var DYNAMIC_JAR_FORCE_PULL = false
var CUSTOM_DISPATCHERS: ConfigValue = null
var MONITORING_ENABLED: Boolean = true
var MONITORING_TYPE: String = "COMPLETE"
- var PORT = 8080
+ var PORT = DEFAULT_PORT
var URL_PATH = "localhost"
def loadUserConfiguration(path: String) : Unit = {
@@ -325,4 +328,4 @@
case class IllegalPerformerCreation(message:String) extends Exception(message)
case class NetworkNotDefined(message:String) extends Exception(message)
case class CommandNotRecognized(message:String) extends Exception(message)
-case class RestartEnsemble(message:String) extends Exception(message)
\ No newline at end of file
+case class RestartEnsemble(message:String) extends Exception(message)
diff --git a/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala b/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
index a07a8d2..df78c51 100644
--- a/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
+++ b/fey-core/src/main/scala/org/apache/iota/fey/WatchServiceReceiver.scala
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
diff --git a/fey-core/src/test/scala/org/apache/iota/fey/MonitorSpec.scala b/fey-core/src/test/scala/org/apache/iota/fey/MonitorSpec.scala
index 3008518..43499ef 100644
--- a/fey-core/src/test/scala/org/apache/iota/fey/MonitorSpec.scala
+++ b/fey-core/src/test/scala/org/apache/iota/fey/MonitorSpec.scala
@@ -122,7 +122,7 @@
}
"result in logging error when throwable is specified" in {
EventFilter[IllegalArgumentException](occurrences = 1) intercept {
- monitorRef.underlyingActor.logInfo("path", "STOP", 1, "info", new IllegalArgumentException("Test"))
+ monitorRef.underlyingActor.logInfo("path", "STOP", 1, "info", Option(new IllegalArgumentException("Test")))
}
}
}
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
index ce5164d..3ba4f1a 100644
--- a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Application.scala
@@ -24,16 +24,16 @@
object Application extends App {
-// println("Starting")
-//
-// implicit val system = ActorSystem("STREAM-RUN")
-//
-// val timestamp = system.actorOf(Props(classOf[Timestamp], Map.empty, 1.minutes, Map.empty, 1.seconds, "", "", false), name = "TIMESTAMP")
-//
-// timestamp ! PROCESS("Stream it")
-//
-// val heartbeat = system.actorOf(Props(classOf[Heartbeat], Map.empty, 1.minutes, Map.empty, 1.seconds, "", "", false), name = "HEARTBEAT")
-//
-// heartbeat ! PROCESS("Stream it")
+ /* print("Starting")
+ implicit val system = ActorSystem("STREAM-RUN")
+
+ val timestamp = system.actorOf(Props(classOf[Timestamp], Map.empty, 1.minutes, Map.empty, 1.seconds, "", "", false), name = "TIMESTAMP")
+
+ timestamp ! PROCESS("Stream it")
+
+ val heartbeat = system.actorOf(Props(classOf[Heartbeat], Map.empty, 1.minutes, Map.empty, 1.seconds, "", "", false), name = "HEARTBEAT")
+
+ heartbeat ! PROCESS("Stream it")
+*/
}
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
index cf2e87d..9409c63 100644
--- a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Heartbeat.scala
@@ -1,18 +1,19 @@
-
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// ee the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.iota.fey.performer
@@ -52,4 +53,4 @@
propagateMessage("alive")
}
-}
\ No newline at end of file
+}
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomDouble.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomDouble.scala
index 5f04791..e73c80c 100644
--- a/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomDouble.scala
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomDouble.scala
@@ -1,18 +1,19 @@
-
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// ee the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.iota.fey.performer
@@ -53,4 +54,4 @@
propagateMessage(rd)
}
-}
\ No newline at end of file
+}
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomInteger.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomInteger.scala
index a694111..c104f87 100644
--- a/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomInteger.scala
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomInteger.scala
@@ -1,18 +1,19 @@
-
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// ee the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.iota.fey.performer
@@ -53,4 +54,4 @@
propagateMessage(ri)
}
-}
\ No newline at end of file
+}
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomUUID.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomUUID.scala
index be3f3cf..c0e3493 100644
--- a/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomUUID.scala
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/RandomUUID.scala
@@ -1,18 +1,19 @@
-
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// ee the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.iota.fey.performer
@@ -53,4 +54,4 @@
propagateMessage(uuid)
}
-}
\ No newline at end of file
+}
diff --git a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
index c138118..b0dfcb9 100644
--- a/performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
+++ b/performers/stream/src/main/scala/org/apache/iota/fey/performer/Timestamp.scala
@@ -1,18 +1,19 @@
-
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// ee the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.iota.fey.performer
@@ -53,4 +54,4 @@
propagateMessage(ts)
}
-}
\ No newline at end of file
+}
diff --git a/performers/virtual_sensor/src/main/scala/org/apache/iota/fey/performer/Sensor.scala b/performers/virtual_sensor/src/main/scala/org/apache/iota/fey/performer/Sensor.scala
index c80669c..e87cb2f 100644
--- a/performers/virtual_sensor/src/main/scala/org/apache/iota/fey/performer/Sensor.scala
+++ b/performers/virtual_sensor/src/main/scala/org/apache/iota/fey/performer/Sensor.scala
@@ -1,29 +1,31 @@
-
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// ee the License for the specific language governing permissions and
-// limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.iota.fey.performer
import akka.actor.ActorRef
+import org.apache.commons.math3.distribution.NormalDistribution
import org.apache.iota.fey.FeyGenericActor
+import org.joda.time.DateTime
+import play.api.libs.json.{JsObject, JsValue, Json}
import scala.collection.immutable.Map
import scala.concurrent.duration._
-import org.apache.commons.math3.distribution.NormalDistribution
-import org.joda.time.DateTime
-import play.api.libs.json.{JsObject, JsValue, Json}
+import scala.util.Try
class Sensor(override val params: Map[String, String] = Map.empty,
@@ -38,8 +40,8 @@
var name = "sensor"
var expected_value: Double = 70.0
var sigma: Double = 1.0
- var exceptions: JsValue = null
- var output: NormalDistribution = null
+ var exceptions: Option[JsValue] = None
+ var output: Option[NormalDistribution] = None
var sensor_type = "environmental"
// wav vibration
var vib = ""
@@ -87,13 +89,13 @@
val ts = java.lang.System.currentTimeMillis().toString
var out = ""
if (!isException) {
- output = new NormalDistribution(expected_value, sigma)
+ output = Try(new NormalDistribution(expected_value, sigma)).toOption
sound = normal_sound
vib = normal_vib
}
sensor_type match {
- case "environmental" => out = s"""$lrn|$ts|{"x":${output.sample()}}"""
+ case "environmental" => out = s"""$lrn|$ts|{"x":${output.fold(0.0D)(x => x.sample())}}"""
case "vibration" => out = s"""$lrn|$ts|{"blob":$vib}"""
case "wav" => out = s"""$lrn|$ts|{"wav":$sound}"""
}
@@ -105,7 +107,7 @@
var efile = ""
var ev = 0.0
val date = DateTime.now
- val all_exceptions = exceptions.as[List[JsObject]]
+ val all_exceptions = exceptions.fold(List[JsObject]())(value => value.as[List[JsObject]])
all_exceptions.foreach(x => {
try {
val st = (x \ "start_time").as[Array[Int]]
@@ -120,7 +122,7 @@
sound = exception_sound
} else {
ev = (x \ "expected_value").as[Double]
- output = new NormalDistribution(ev, sigma)
+ output = Option(new NormalDistribution(ev, sigma))
}
true
}
@@ -152,7 +154,7 @@
if (params.contains("exceptions")) {
val p = params("exceptions")
try {
- exceptions = Json.parse(p)
+ exceptions = Option(Json.parse(p))
} catch {
case e: Exception => log.error(s"Invalid JSON defining exception $p")
}
@@ -167,4 +169,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
index d99a826..c745bbb 100644
--- a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/Application.scala
@@ -22,7 +22,7 @@
object Application extends App {
- //println("Starting")
+ //print("Starting")
//implicit val system = ActorSystem("ZMQ-RUN")
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
index d7dbf90..856f91c 100644
--- a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQPublisher.scala
@@ -32,24 +32,27 @@
override val autoScale: Boolean = false) extends FeyGenericActor {
//-------default params----------
- var port: Int = 5559
+ val DEFAULT_PORT = 5559
+ var port: Int = DEFAULT_PORT
var target: String = "localhost"
+ val DEFAULT_LINGER = 200
+ val DEFAULT_HMW = 10
//-------class vars-------------------
var ctx: ZMQ.Context = null
var pub: ZMQ.Socket = null
var count: Int = 0
- override def onStart = {
+ override def onStart: Unit = {
log.info("Starting ZMQ Publisher")
try {
- _params_check()
+ checkParams()
ctx = ZMQ.context(1)
pub = ctx.socket(ZMQ.PUB)
- pub.setLinger(200)
- pub.setHWM(10)
+ pub.setLinger(DEFAULT_LINGER)
+ pub.setHWM(DEFAULT_HMW)
pub.connect("tcp://" + target + ":" + port)
}
catch {
@@ -57,23 +60,23 @@
}
}
- override def onStop = {
+ override def onStop: Unit = {
pub.disconnect("tcp://" + target + ":" + port)
}
- override def onRestart(reason: Throwable) = {
+ override def onRestart(reason: Throwable): Unit = {
// Called after actor is up and running - after self restart
try {
- if (pub != null) {
+ if (Option(pub).isDefined) {
pub.close()
}
- if (ctx != null) {
+ if (Option(ctx).isDefined) {
ctx.close()
}
ctx = ZMQ.context(1)
pub = ctx.socket(ZMQ.PUB)
- pub.setLinger(200)
- pub.setHWM(10)
+ pub.setLinger(DEFAULT_LINGER)
+ pub.setHWM(DEFAULT_HMW)
pub.connect("tcp://" + target + ":" + port)
}
catch {
@@ -87,7 +90,7 @@
case x => log.debug(s"Untreated $x")
}
- override def execute() = {
+ override def execute(): Unit = {
log.debug(s"Msg count: $count")
}
@@ -96,7 +99,7 @@
message match {
case message: String =>
// Assuming each String message has only point data
- _zmq_send(s"$message")
+ sendZMQ(s"$message")
// case message: Map[String, (String,String,String,String)] =>
// val formatted_msgs: Array[String] = message.map(point => _format_messages(point._2)).toArray
@@ -106,20 +109,20 @@
}
}
- def _format_messages(fields: (String, String, String, String)): String = {
+ def formatMessages(fields: (String, String, String, String)): String = {
// The tuple has the following elements: lrn, timestamp, value, type
// And we have to create a message with the format:
// DATA|cloud|lrn|timestamp|{"<type>" : <value>}
- "DATA|cloud|" + fields._1 + "|" + fields._2 + "|" + s"""{"${fields._3}":"${fields._4}"}"""
+ s"""DATA|cloud| ${fields._1}|${fields._2}|{${fields._3}:${fields._4}}"""
}
- def _zmq_send(Message: String) = {
- log.debug(s"messsage =$Message")
+ def sendZMQ(Message: String): Unit = {
+ log.debug(s"message =$Message")
pub.send(Message)
count += 1
}
- def _params_check() = {
+ def checkParams(): Unit = {
if (params.contains("zmq_port")) {
port = params("zmq_port").toInt
}
@@ -132,3 +135,4 @@
+
diff --git a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
index c7d0381..299c59e 100644
--- a/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
+++ b/performers/zmq/src/main/scala/org/apache/iota/fey/performer/ZMQSubscriber.scala
@@ -31,7 +31,8 @@
override val autoScale: Boolean = false) extends FeyGenericActor {
//-------default params----------
- var port: Int = 5563
+ val DEFAULT_PORT = 5563
+ var port: Int = DEFAULT_PORT
var target: String = "localhost"
val topic_filter: String = "DATA"
@@ -40,16 +41,14 @@
var pub: ZMQ.Socket = null
var count: Int = 0
- override def onStart = {
+ override def onStart: Unit = {
log.info("Starting ZMQ Subscriber")
try {
-
- _params_check()
+ checkParams()
// Prepare our context and subscriber
ctx = ZMQ.context(1)
val subscriber = ctx.socket(ZMQ.SUB)
-
subscriber.bind(s"tcp://$target:$port")
subscriber.subscribe(topic_filter.getBytes())
while (true) {
@@ -66,7 +65,7 @@
}
}
- override def onStop = {
+ override def onStop: Unit = {
pub.disconnect("tcp://" + target + ":" + port)
pub.close()
ctx.close()
@@ -74,13 +73,13 @@
ctx = null
}
- override def onRestart(reason: Throwable) = {
+ override def onRestart(reason: Throwable): Unit = {
// Called after actor is up and running - after self restart
try {
- if (pub != null) {
+ if (Option(pub).isDefined) {
pub.close()
}
- if (ctx != null) {
+ if (Option(ctx).isDefined) {
ctx.close()
}
}
@@ -95,9 +94,7 @@
case x => log.debug(s"Untreated $x")
}
- override def execute() = {
- log.debug(s"Msg count: $count")
- }
+ override def execute(): Unit = log.debug(s"Msg count: $count")
override def processMessage[T](message: T, sender: ActorRef): Unit = {
message match {
@@ -105,14 +102,14 @@
}
}
- def _format_messages(fields: (String, String, String, String)): String = {
+ def formatMessages(fields: (String, String, String, String)): String = {
// The tuple has the following elements: lrn, timestamp, value, type
// And we have to create a message with the format:
// DATA|cloud|lrn|timestamp|{"<type>" : <value>}
- "DATA|cloud|" + fields._1 + "|" + fields._2 + "|" + s"""{"${fields._3}":"${fields._4}"}"""
+ s"""DATA|cloud|${fields._1}|${fields._2}|{${fields._3}:${fields._4}}"""
}
- def _params_check() = {
+ def checkParams(): Unit = {
if (params.contains("zmq_port")) {
port = params("zmq_port").toInt
}
@@ -121,4 +118,4 @@
}
}
-}
\ No newline at end of file
+}
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 7e3596f..545e432 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -1,117 +1,136 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ -->
<scalastyle>
- <name>Scalastyle standard configuration</name>
- <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
- <parameters>
- <parameter name="maxFileLength"><![CDATA[800]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
- <parameters>
- <parameter name="header"><![CDATA[// Copyright (C) 2011-2012 the original author or authors.
-// See the LICENCE.txt file distributed with this work for additional
-// information regarding copyright ownership.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
- <parameters>
- <parameter name="maxLineLength"><![CDATA[160]]></parameter>
- <parameter name="tabSize"><![CDATA[4]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
- <parameters>
- <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
- <parameters>
- <parameter name="maxParameters"><![CDATA[8]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
- <parameters>
- <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[println]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
- <parameters>
- <parameter name="maxTypes"><![CDATA[30]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
- <parameters>
- <parameter name="maximum"><![CDATA[10]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
- <parameters>
- <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
- <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
- <parameters>
- <parameter name="maxLength"><![CDATA[50]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
- <parameters>
- <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
- <parameters>
- <parameter name="maxMethods"><![CDATA[30]]></parameter>
- </parameters>
- </check>
- <check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
- <check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
+ <name>Scalastyle standard configuration</name>
+ <check level="warning" class="org.scalastyle.file.FileTabChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.FileLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxFileLength"><![CDATA[800]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.file.HeaderMatchesChecker" enabled="true">
+ <parameters>
+ <parameter name="header"><![CDATA[/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.SpacesAfterPlusChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.WhitespaceEndOfLineChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.SpacesBeforePlusChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.FileLineLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLineLength"><![CDATA[160]]></parameter>
+ <parameter name="tabSize"><![CDATA[4]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ClassNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[[A-Z][A-Za-z]*]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.PackageObjectNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[a-z][A-Za-z]*$]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true">
+ <parameters>
+ <parameter name="illegalImports"><![CDATA[sun._,java.awt._]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
+ <parameters>
+ <parameter name="maxParameters"><![CDATA[8]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MagicNumberChecker" enabled="true">
+ <parameters>
+ <parameter name="ignore"><![CDATA[-1,0,1,2,3]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceBeforeLeftBracketChecker"
+ enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoWhitespaceAfterLeftBracketChecker"
+ enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.ReturnChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NullChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoCloneChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.NoFinalizeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.CovariantEqualsChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.StructuralTypeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[println]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NumberOfTypesChecker" enabled="true">
+ <parameters>
+ <parameter name="maxTypes"><![CDATA[30]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.CyclomaticComplexityChecker" enabled="true">
+ <parameters>
+ <parameter name="maximum"><![CDATA[10]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.UppercaseLChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.SimplifyBooleanExpressionChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.scalariform.IfBraceChecker" enabled="true">
+ <parameters>
+ <parameter name="singleLineAllowed"><![CDATA[true]]></parameter>
+ <parameter name="doubleLineAllowed"><![CDATA[false]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodLengthChecker" enabled="true">
+ <parameters>
+ <parameter name="maxLength"><![CDATA[50]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true">
+ <parameters>
+ <parameter name="regex"><![CDATA[^[a-z][A-Za-z0-9]*$]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.NumberOfMethodsInTypeChecker" enabled="true">
+ <parameters>
+ <parameter name="maxMethods"><![CDATA[30]]></parameter>
+ </parameters>
+ </check>
+ <check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
+ <check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
</scalastyle>
\ No newline at end of file