ESME-360: Finished work on XMPP Consumer component action.
git-svn-id: https://svn.apache.org/repos/asf/esme/branches/akka@1375985 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/build.sbt b/build.sbt
index e3dfbd1..16aebd5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -48,7 +48,7 @@
"org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
"com.typesafe.akka" % "akka-actor" % akkaVersion % "compile->default",
"com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % "compile->default",
- "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
+ "org.apache.camel" % "camel-xmpp" % "2.10.0" % "compile->default",
"javax.servlet" % "servlet-api" % "2.5" % "provided->default",
"org.compass-project" % "compass" % compassVersion % "compile->default",
"org.apache.lucene" % "lucene-core" % luceneVersion % "compile->default",
diff --git a/pom.xml b/pom.xml
index 2b1a75e..8d09437 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,7 +226,7 @@
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-xmpp</artifactId>
- <version>2.8.0</version>
+ <version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.compass-project</groupId>
diff --git a/src/main/scala/bootstrap/liftweb/Boot.scala b/src/main/scala/bootstrap/liftweb/Boot.scala
index f0bd331..aac7c48 100644
--- a/src/main/scala/bootstrap/liftweb/Boot.scala
+++ b/src/main/scala/bootstrap/liftweb/Boot.scala
@@ -251,8 +251,9 @@
ConvDistributor.touch
// ScalaInterpreter.touch
- val sys = ActorSystem("camel")
- val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), "XmppSupervisor")
+
+ val xmppSupervisor = AkkaActorSystem.sys.actorOf(AkkaProps(new XmppSupervisor()), "XmppSupervisor")
+ xmppSupervisor ! XmppSupervisor.Init()
// Initiating popular links and resent messages
val resentPeriod = Props.getLong("stats.resent.period", 1 week)
@@ -300,6 +301,11 @@
private def makeUtf8(req: HTTPRequest) = {req.setCharacterEncoding("UTF-8")}
}
+
+object AkkaActorSystem { // Holder for the single ActorSystem that this patch shares between Boot, UserActor and Action
+ lazy val sys = ActorSystem("camel") // Created on first access; the name "camel" is the root of the "akka://camel/user/..." paths looked up elsewhere in this patch
+}
+
/*
* Set up compass search environment
*/
diff --git a/src/main/scala/org/apache/esme/actor/UserActor.scala b/src/main/scala/org/apache/esme/actor/UserActor.scala
index 3f99c3d..ac906ca 100644
--- a/src/main/scala/org/apache/esme/actor/UserActor.scala
+++ b/src/main/scala/org/apache/esme/actor/UserActor.scala
@@ -31,6 +31,7 @@
import lib._
import akka.actor.{Props => AkkaProps, ActorSystem}
+import bootstrap.liftweb.AkkaActorSystem.sys
import XmppSender._
import java.util.{TimeZone, Calendar}
@@ -64,7 +65,6 @@
val xmppUsr = Props.get("xmpp.user") openOr ""
val xmppPwd = Props.get("xmpp.password") openOr ""
val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
- val sys = ActorSystem("camel")
val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
}
@@ -290,8 +290,7 @@
Distributor.AddMessageToMailbox(id, msg, ResendReason(userId))
case XmppFrom(_) => {
- val sys = ActorSystem("camel")
- sys.actorFor("XmppSupervisor") ! XmppSupervisor.Fetch(td.performId)
+ sys.actorFor("akka://camel/user/XmppSupervisor") ! XmppSupervisor.Fetch(td.performId)
}
diff --git a/src/main/scala/org/apache/esme/actor/XmppReceiver.scala b/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
index 8471062..be72364 100644
--- a/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
+++ b/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
@@ -20,6 +20,10 @@
def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI is: %s".format(uri)); uri}
+ override def preStart() { // Overrides the inherited start-up hook; as added here it only logs that it was invoked
+ logger.info("XmppReceiver - preStart() called")
+ }
+
def receive = {
case msg: CamelMessage => {
messages = (msg.bodyAs[String], System.currentTimeMillis) :: messages
diff --git a/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala b/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
index 62784e8..0334c08 100644
--- a/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
+++ b/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
@@ -11,6 +11,7 @@
sealed trait XmppSupervisorActions
+ case class Init() extends XmppSupervisorActions
case class Fetch(id: Long) extends XmppSupervisorActions
case class Start(id: Long, who: String, usr: User) extends XmppSupervisorActions
case class Stop(id: Long) extends XmppSupervisorActions
@@ -30,7 +31,7 @@
override def preStart() {
- logger.info("preStart() called")
+ logger.info("XmppSupervisor - preStart() called")
xmppHost = Props.get("xmpp.host") openOr ""
xmppPort = Props.get("xmpp.port") openOr ""
@@ -40,20 +41,22 @@
}
def receive = {
+ case Init() => logger.info("XmppSupervisor - Init message received")
case Start(id, who, usr) => {
- logger.info("Start message received. User: %s, who: %s".format(usr, who))
+ logger.info("XmppSupervisor - Start message received. User: %s, who: %s".format(usr, who))
xmppPullActors += (id -> context.actorOf(AkkaProps(new XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, who, usr))))
}
case Stop(id) => {
+ logger.info("XmppSupervisor - Stop message received")
xmppPullActors.get(id).foreach { ref =>
context.stop(ref)
xmppPullActors -= id
}
}
case Fetch(id) => {
- logger.info("Fetch message received")
+ logger.info("XmppSupervisor - Fetch message received")
xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
}
- case _ => logger.info("Unknown message received")
+ case _ => logger.info("XmppSupervisor - Unknown message received")
}
}
diff --git a/src/main/scala/org/apache/esme/model/Action.scala b/src/main/scala/org/apache/esme/model/Action.scala
index e641032..44a27f5 100644
--- a/src/main/scala/org/apache/esme/model/Action.scala
+++ b/src/main/scala/org/apache/esme/model/Action.scala
@@ -35,13 +35,11 @@
import scala.xml.{Text, Node, Elem => XmlElem}
import akka.actor.{Props => AkkaProps, ActorSystem}
+import bootstrap.liftweb.AkkaActorSystem.sys
object Action extends Action with LongKeyedMetaMapper[Action] with Logger {
val logger: Logger = Logger("org.apache.esme.model")
- val sys = ActorSystem("camel")
- val xmppSupervisor = sys.actorFor("XmppSupervisor")
-
override def afterCommit = notifyDistributor _ :: super.afterCommit
@@ -50,7 +48,7 @@
Distributor.PerformTrackingType)
}
-
+
override def create: Action = {
val ap = super.create
ap.createdDate(new Date())
@@ -66,7 +64,7 @@
} else {
SchedulerActor ! SchedulerActor.StopRegular(in.id)
MessagePullActor ! MessagePullActor.StopPullActor(in.id)
- xmppSupervisor ! XmppSupervisor.Stop(in.id)
+ sys.actorFor("akka://camel/user/XmppSupervisor") ! XmppSupervisor.Stop(in.id)
}
}
@@ -194,7 +192,6 @@
*/
class Action extends LongKeyedMapper[Action] {
- import Action.xmppSupervisor
import Action.logger
/**
@@ -230,8 +227,9 @@
}
case XmppFrom(who) => {
User.find(user) match {
- case Full(u) =>
- xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
+ case Full(u) => {
+ sys.actorFor("akka://camel/user/XmppSupervisor") ! XmppSupervisor.Start(id.is, who, u)
+ }
case _ =>
}
}