blob: 2100cbd19e960f0902d887c195977c687800f8bb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.iota.fey
import akka.actor.{Actor, ActorLogging}
import akka.event.{DiagnosticLoggingAdapter, Logging}
import scala.collection.mutable.ArrayBuffer
/**
* Created by barbaragomes on 7/8/16.
*/
protected class Monitor(eventsStore: Trie) extends Actor with ActorLogging {
import Monitor._
//val log: DiagnosticLoggingAdapter = Logging(this)
//log.mdc(Map("fileName" -> "monitor_events"))
override def preStart(): Unit = {
if(CONFIG.MONITORING_ENABLED) {
if (CONFIG.MONITORING_TYPE == "SIMPLE") {
context.become(simple)
} else {
context.become(complete)
}
}
}
override def postStop(): Unit = {
//log.clearMDC()
}
private def complete: Receive = {
case START(timestamp, info) =>
logInfo(sender().path.toString, EVENTS.START, timestamp, info)
eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.START, timestamp, info))
case STOP(timestamp, info) =>
logInfo(sender().path.toString, EVENTS.STOP, timestamp, info)
eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.STOP, timestamp, info))
case RESTART(reason, timestamp) =>
logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason)
eventsStore.append(sender().path.toString,MonitorEvent(EVENTS.RESTART, timestamp, reason.getMessage))
case TERMINATE(actorPath, timestamp, info) =>
logInfo(actorPath, EVENTS.TERMINATE, timestamp, info)
eventsStore.append(actorPath,MonitorEvent(EVENTS.TERMINATE, timestamp, info))
}
private def simple: Receive = {
case START(timestamp, info) =>
logInfo(sender().path.toString, EVENTS.START, timestamp, info)
Monitor.simpleEvents.put(sender().path.toString, ('S',timestamp))
case STOP(timestamp, info) =>
logInfo(sender().path.toString, EVENTS.STOP, timestamp, info)
Monitor.simpleEvents.put(sender().path.toString, ('O',timestamp))
case RESTART(reason, timestamp) =>
logInfo(sender().path.toString, EVENTS.RESTART, timestamp, "", reason)
Monitor.simpleEvents.put(sender().path.toString, ('R',timestamp))
case TERMINATE(actorPath, timestamp, info) =>
logInfo(actorPath, EVENTS.TERMINATE, timestamp, info)
Monitor.simpleEvents.put(actorPath, ('T',timestamp))
}
override def receive: Receive = {
case _ =>
}
def logInfo(path:String, event:String, timestamp: Long, info:String, reason:Throwable = null) = {
if(reason != null){
log.error(reason, s"$event | $timestamp | $path | $info")
}else{
log.info(s"$event | $timestamp | $path | $info")
}
}
}
protected object Monitor{
// Monitoring Messages
case class START(timestamp: Long, info: String = "")
case class STOP(timestamp: Long, info: String = "")
case class RESTART(reason: Throwable, timestamp: Long)
case class TERMINATE(actorPath: String, timestamp: Long, info: String = "")
// Stores Monitoring event
case class MonitorEvent(event: String, timestamp: Long, info: String)
/**
* Contains the lifecycle events for actors in Fey
*/
val events: Trie = new Trie("FEY-MANAGEMENT-SYSTEM")
val simpleEvents:scala.collection.mutable.HashMap[String,(Char, Long)] = scala.collection.mutable.HashMap.empty
//Static HTML content from d3
val html = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/eventsTable.html"), "UTF-8")
.getLines()
.mkString("\n")
def getHTMLevents: String = {
html.replace("$EVENTS_TABLE_CONTENT", mapEventsToRows(events.getRootChildren(), "").mkString("\n"))
}
def mapEventsToRows(actors: ArrayBuffer[TrieNode], prefix:String): ArrayBuffer[String] = {
actors.map(actor => {
val currentPath = if (prefix == "/user/FEY-CORE") actor.path else s"$prefix/${actor.path}"
val events = actor.events.map(event => {
getTableLine(currentPath, event.timestamp, event.event, event.info)
})
mapEventsToRows(actor.children, currentPath) ++ events
}).flatten
}
def getSimpleHTMLEvents: String = {
val content = simpleEvents.map(event => {
event._2._1 match {
case 'S' => getTableLine(event._1, event._2._2, "START", "")
case 'O' => getTableLine(event._1, event._2._2, "STOP", "")
case 'R' => getTableLine(event._1, event._2._2, "RESTART", "")
case 'T' => getTableLine(event._1, event._2._2, "TERMINATE", "")
}
}).mkString("\n")
html.replace("$EVENTS_TABLE_CONTENT", content)
}
private def getTableLine(path: String,timestamp: Long, event: String, info: String):String = {
s"<tr><td>$path</td><td>$event</td><td>$info</td><td>$timestamp</td></tr>"
}
}
/**
* Events Name
*/
object EVENTS {
val START = "START"
val STOP = "STOP"
val TERMINATE = "TERMINATE"
val RESTART = "RESTART"
}