blob: c09eb28708f72f1272f0b852eaa9f1ad96cfa4a3 [file] [log] [blame]
package org.apache.esme.actor
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.esme.actor.Distributor.AddMessageToMailbox
import org.apache.esme.model._
import net.liftweb.http.ActorWatcher
import net.liftweb.util._
import Helpers._
import net.liftweb.common._
import net.liftweb.actor._
object PopStatsActor extends LiftActor {
import scala.collection.mutable.Map
private val actors: Map[StatParam,Map[Long,PopStatsActor]] = Map()
protected def now = System.currentTimeMillis
protected def messageHandler = {
case StartStats(what, period, refresh) =>
if (!actors.contains(what)) {
actors(what) = Map()
}
val stat = actors(what)
if(!stat.contains(period)) {
val statActor = new PopStatsActor(period, refresh)
stat(period) = statActor
statActor ! StartUp
}
case StopStats(what, period) => // TODO: not used
case TopStats(what, n, period) =>
(for (stat <- actors.get(what);
availableActor <- stat.get(period)) yield availableActor) match {
case Some(statActor) => forwardMessageTo(Top(n), statActor)
case _ => reply(Nil)
}
case IncrStats(what, hitItem) =>
for (stat <- actors.get(what);
statActor <- stat.values)
statActor ! Hit(hitItem)
}
// do nothing
def touch {
}
case object StartUp
case object ByeBye
case class Top(n: Int)
case class Hit(id: Long)
case object Expire
case class StartStats(what: StatParam, period: Long, refresh: Long)
case class StopStats(what: StatParam, period: Long)
case class TopStats(what: StatParam, n: Byte, period: Long)
case class IncrStats(what: StatParam, hitItem: Long)
private class PopStatsActor(period: Long,
refreshInterval: Long) extends LiftActor {
private var queue: List[StatEvent] = List()
private var stats: Map[Long,Int] = Map()
private var running = true
protected def messageHandler = {
case StartUp => ActorPing.schedule(this, Expire, refreshInterval)
case ByeBye =>
running = false
case Hit(id) =>
queue ::= StatEvent(id, now)
stats += (id -> (stats.getOrElse(id,0) + 1))
case Top(n) =>
val topList = stats.toList.sortWith{
case ((_,freq1),(_,freq2)) =>
freq2 < freq1
}.take(n)
reply(topList)
case Expire => {
val (live, expired) = queue.partition(_.when + period > now)
expired.foreach(stats -= _.id)
ActorPing.schedule(this, Expire, refreshInterval)
live
}
}
case class StatEvent(id: Long, when: Long)
}
}
sealed trait StatParam
case object ResendStat extends StatParam
case object LinkClickedStat extends StatParam