blob: e529749d2b3e0a72405bb46218b9893c6d48ca17 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.examples.akka
import akka.actor.{ActorRef, Actor, Props, ActorSystem}
import akka.pattern._
import akka.util.Timeout
import akka.util.duration._
import collection.mutable.Map
import collection.SortedSet
import org.apache.servicemix.examples.akka.Stats.Metric
import java.util.concurrent.atomic.AtomicBoolean
/**
* A statistics engine that generates basic descriptive statistics (count, average and standard deviation) for a
* stream of key,value pairs. Use Stats(<ActorSystem>) to start the engine and return an actor that you can interact
* with using these two messages:
* - Input(key, value) to submit another value for calculation
* - Report() to request an overall report for all scores
*/
object Stats {
case class Report()
case class Input(val key: String, val value: Int)
case class Metric[T](val metric: String, val key: String, val result: T)
def apply(system: ActorSystem) : ActorRef = system.actorOf(Props[Stats], name = "stats")
}
class Stats extends Actor {
import Stats._
val average = context.actorOf(Props[Average], "average")
val stddev = context.actorOf(Props(new StdDev(average)), "stddev")
val batcher = context.actorOf(Props(new Batcher(Seq(average, stddev))), "batcher")
val metrics = Map.empty[String, Map[String, Any]].withDefaultValue(Map.empty[String, Any])
val changes = new AtomicBoolean(false)
def receive = {
case Input(key, value) => batcher ! (key, value);
case Metric(metric, key, result) => {
metrics(key) = metrics(key) + (metric -> result)
changes.set(true)
}
case Report() => {
if (changes.getAndSet(false)) {
val results = SortedSet(metrics.keys.toArray:_*).map { key =>
val results = metrics(key)
"%s,%d,%.4f,%.4f".format(key, results.getOrElse("count", 0),
results.getOrElse("avg", Double.NaN),
results.getOrElse("stddev", Double.NaN))
}
sender ! ("key,count,average,stddev" +: results.toSeq).mkString(sys.props("line.separator"))
}
}
}
}
class Batcher(val stats: Seq[ActorRef]) extends Actor {
val batches = Map.empty[String, Seq[Int]].withDefaultValue(Seq())
def receive = {
case (key: String, value: Int) => {
val batch = value +: batches(key)
batches(key) = batch
for (stat <- stats) stat.tell((key, batch), sender)
sender ! Metric("count", key, batch.size)
}
}
}
class Average extends Actor {
def receive = {
case (key: String, items: Seq[Int]) => sender ! Metric("avg", key, avg(items))
case items: Seq[Int] => sender ! avg(items)
}
def avg(items: scala.Seq[Int]): Double = {
items.foldLeft(0)(_ + _).toDouble / items.size
}
}
class StdDev(val average: ActorRef) extends Actor {
val sum_of_squares = context.actorOf(Props[SumOfSquares], "sum_of_squares")
implicit val timeout = Timeout(1 seconds)
def receive = {
case (key: String, items: Seq[Int]) if items.size > 1 =>
val original = sender
ask(average, items).onSuccess {
case avg : Double => {
ask(sum_of_squares, (items, avg)).onSuccess {
case ss : Double => {
original ! Metric("stddev", key, math.sqrt(ss / items.size - 1))
}
}
}
}
}
}
class SumOfSquares extends Actor {
def receive = {
case (items: Seq[Int], avg: Double) =>
sender ! items.map(item => math.pow(item.toDouble - avg, 2)).foldLeft(0d)(_ + _)
}
}