ESME-360: Finished work on XMPP Consumer component action.

git-svn-id: https://svn.apache.org/repos/asf/esme/branches/akka@1375985 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/build.sbt b/build.sbt
index e3dfbd1..16aebd5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -48,7 +48,7 @@
     "org.scalaz" %% "scalaz-core" % scalazVersion % "compile->default",
     "com.typesafe.akka" % "akka-actor" % akkaVersion % "compile->default",
     "com.typesafe.akka" % "akka-camel" % "2.1-20120701-002745" % "compile->default",
-    "org.apache.camel" % "camel-xmpp" % "2.8.0" % "compile->default",
+    "org.apache.camel" % "camel-xmpp" % "2.10.0" % "compile->default",
     "javax.servlet" % "servlet-api" % "2.5" % "provided->default",
     "org.compass-project" % "compass" % compassVersion % "compile->default",
     "org.apache.lucene" % "lucene-core" % luceneVersion % "compile->default",
diff --git a/pom.xml b/pom.xml
index 2b1a75e..8d09437 100644
--- a/pom.xml
+++ b/pom.xml
@@ -226,7 +226,7 @@
         <dependency>

             <groupId>org.apache.camel</groupId>

             <artifactId>camel-xmpp</artifactId>

-            <version>2.8.0</version>

+            <version>2.10.0</version>

         </dependency>

         <dependency>

             <groupId>org.compass-project</groupId>

diff --git a/src/main/scala/bootstrap/liftweb/Boot.scala b/src/main/scala/bootstrap/liftweb/Boot.scala
index f0bd331..aac7c48 100644
--- a/src/main/scala/bootstrap/liftweb/Boot.scala
+++ b/src/main/scala/bootstrap/liftweb/Boot.scala
@@ -251,8 +251,9 @@
     ConvDistributor.touch
     // ScalaInterpreter.touch
 
-    val sys = ActorSystem("camel")
-    val xmppSupervisor = sys.actorOf(AkkaProps(new XmppSupervisor()), "XmppSupervisor")
+
+    val xmppSupervisor = AkkaActorSystem.sys.actorOf(AkkaProps(new XmppSupervisor()), "XmppSupervisor")
+    xmppSupervisor ! XmppSupervisor.Init()
 
     // Initiating popular links and resent messages
     val resentPeriod = Props.getLong("stats.resent.period", 1 week)
@@ -300,6 +301,11 @@
   private def makeUtf8(req: HTTPRequest) = {req.setCharacterEncoding("UTF-8")}
 }
 
+
+object AkkaActorSystem { // Holder for an ActorSystem meant to be imported and shared rather than re-created at each call site.
+  lazy val sys = ActorSystem("camel") // built on first access; the system name "camel" is the root of the akka://camel/... actor paths
+}
+
 /*
 * Set up compass search environment
 */
diff --git a/src/main/scala/org/apache/esme/actor/UserActor.scala b/src/main/scala/org/apache/esme/actor/UserActor.scala
index 3f99c3d..ac906ca 100644
--- a/src/main/scala/org/apache/esme/actor/UserActor.scala
+++ b/src/main/scala/org/apache/esme/actor/UserActor.scala
@@ -31,6 +31,7 @@
 import lib._
 
 import akka.actor.{Props => AkkaProps, ActorSystem}
+import bootstrap.liftweb.AkkaActorSystem.sys
 import XmppSender._
 
 import java.util.{TimeZone, Calendar}
@@ -64,7 +65,6 @@
   val xmppUsr = Props.get("xmpp.user") openOr ""
   val xmppPwd = Props.get("xmpp.password") openOr ""
   val xmppServiceName = Props.get("xmpp.serviceName") openOr ""
-  val sys = ActorSystem("camel")
   val XmppSender = sys.actorOf(AkkaProps(new XmppSender(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName)), "XmppSender")
 }
 
@@ -290,8 +290,7 @@
               Distributor.AddMessageToMailbox(id, msg, ResendReason(userId))
 
             case XmppFrom(_) => {
-              val sys = ActorSystem("camel")
-              sys.actorFor("XmppSupervisor") ! XmppSupervisor.Fetch(td.performId)
+              sys.actorFor("akka://camel/user/XmppSupervisor") ! XmppSupervisor.Fetch(td.performId)
             }
 
 
diff --git a/src/main/scala/org/apache/esme/actor/XmppReceiver.scala b/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
index 8471062..be72364 100644
--- a/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
+++ b/src/main/scala/org/apache/esme/actor/XmppReceiver.scala
@@ -20,6 +20,10 @@
 
   def endpointUri = {val uri = "xmpp://%s@%s:%s/%s?password=%s" format (esmeUsr, esmeSrv, esmePort, participant, esmePwd); logger.info("XMPP URI is: %s".format(uri)); uri}
 
+  override def preStart() { // Start-up hook: chain to super first so start-up work done by a parent trait's preStart is not skipped.
+    super.preStart(); logger.info("XmppReceiver - preStart() called") // NOTE(review): class header not visible in this hunk; presumably an akka.camel Consumer — confirm
+  }
+
   def receive = {
     case msg: CamelMessage => {
       messages = (msg.bodyAs[String], System.currentTimeMillis) :: messages
diff --git a/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala b/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
index 62784e8..0334c08 100644
--- a/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
+++ b/src/main/scala/org/apache/esme/actor/XmppSupervisor.scala
@@ -11,6 +11,7 @@
 
 
   sealed trait XmppSupervisorActions
+  case class Init() extends XmppSupervisorActions
   case class Fetch(id: Long) extends XmppSupervisorActions
   case class Start(id: Long, who: String, usr: User) extends XmppSupervisorActions
   case class Stop(id: Long) extends XmppSupervisorActions
@@ -30,7 +31,7 @@
 
 
   override def preStart() {
-    logger.info("preStart() called")
+    logger.info("XmppSupervisor - preStart() called")
 
     xmppHost = Props.get("xmpp.host") openOr ""
     xmppPort = Props.get("xmpp.port") openOr ""
@@ -40,20 +41,22 @@
   }
 
   def receive = {
+    case Init() => logger.info("XmppSupervisor - Init message received") // Init carries no data and is only logged here
     case Start(id, who, usr) => {
-      logger.info("Start message received. User: %s, who: %s".format(usr, who))
+      logger.info("XmppSupervisor - Start message received. User: %s, who: %s".format(usr, who)) // next: create a child XmppReceiver and remember it under id
       xmppPullActors += (id -> context.actorOf(AkkaProps(new XmppReceiver(xmppHost, xmppPort.toInt, xmppUsr, xmppPwd, xmppServiceName, who, usr))))
     }
     case Stop(id) => {
+      logger.info("XmppSupervisor - Stop message received") // next: stop and forget the receiver stored under id, if there is one
       xmppPullActors.get(id).foreach { ref =>
         context.stop(ref)
         xmppPullActors -= id
       }
     }
     case Fetch(id) => {
-      logger.info("Fetch message received")
+      logger.info("XmppSupervisor - Fetch message received") // next: forward FetchMessages to the receiver stored under id, if there is one
       xmppPullActors.get(id).foreach(ref => ref ! FetchMessages)
     }
-    case _ => logger.info("Unknown message received")
+    case _ => logger.info("XmppSupervisor - Unknown message received") // catch-all: anything else is logged and otherwise ignored
   }
 }
diff --git a/src/main/scala/org/apache/esme/model/Action.scala b/src/main/scala/org/apache/esme/model/Action.scala
index e641032..44a27f5 100644
--- a/src/main/scala/org/apache/esme/model/Action.scala
+++ b/src/main/scala/org/apache/esme/model/Action.scala
@@ -35,13 +35,11 @@
 import scala.xml.{Text, Node, Elem => XmlElem}
 
 import akka.actor.{Props => AkkaProps, ActorSystem}
+import bootstrap.liftweb.AkkaActorSystem.sys
 
 object Action extends Action with LongKeyedMetaMapper[Action] with Logger {
 
   val logger: Logger = Logger("org.apache.esme.model")
-  val sys = ActorSystem("camel")
-  val xmppSupervisor = sys.actorFor("XmppSupervisor")
-
 
   override def afterCommit = notifyDistributor _ :: super.afterCommit
 
@@ -50,7 +48,7 @@
                                                 Distributor.PerformTrackingType)
   
   }
-  
+
   override def create: Action = {
     val ap = super.create
     ap.createdDate(new Date())
@@ -66,7 +64,7 @@
     } else {
       SchedulerActor ! SchedulerActor.StopRegular(in.id)
       MessagePullActor ! MessagePullActor.StopPullActor(in.id)
-      xmppSupervisor ! XmppSupervisor.Stop(in.id)
+      sys.actorFor("akka://camel/user/XmppSupervisor") ! XmppSupervisor.Stop(in.id)
     }
   }
   
@@ -194,7 +192,6 @@
  */
 class Action extends LongKeyedMapper[Action] {
 
-  import Action.xmppSupervisor
   import Action.logger
 
   /**
@@ -230,8 +227,9 @@
       }
       case XmppFrom(who) => {
         User.find(user) match {
-          case Full(u) =>
-            xmppSupervisor ! XmppSupervisor.Start(id.is, who, u)
+          case Full(u) => {
+            sys.actorFor("akka://camel/user/XmppSupervisor") ! XmppSupervisor.Start(id.is, who, u)
+          }
           case _ =>
         }
       }