UIMA-1913 replace use of activemq utility functions with apr

git-svn-id: https://svn.apache.org/repos/asf/uima/uimacpp/trunk@1027691 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/utils/ActiveMQAnalysisEngineService.cpp b/src/utils/ActiveMQAnalysisEngineService.cpp
index dd0f648..381c9d0 100644
--- a/src/utils/ActiveMQAnalysisEngineService.cpp
+++ b/src/utils/ActiveMQAnalysisEngineService.cpp
@@ -24,12 +24,10 @@
 #include "ActiveMQAnalysisEngineService.hpp"
 #include "deployCppService.hpp"
 
-#include <activemq/concurrent/Thread.h>
-#include <activemq/concurrent/Runnable.h>
-#include <activemq/concurrent/Concurrent.h>
 #include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQProducer.h>
 #include <activemq/core/ActiveMQConstants.h>
-#include <activemq/util/Integer.h>
+#include <activemq/core/ActiveMQConnection.h>
 
 #include "uima/xmlwriter.hpp"
 #include "uima/xcasdeserializer.hpp"
@@ -38,14 +36,11 @@
 #include "uima/xmlerror_handler.hpp"
 
 using namespace activemq::core;
-using namespace activemq::util;
-using namespace activemq::exceptions;
-using namespace activemq::concurrent;
 using namespace uima;
 
 enum traceLevels {NONE, INFO, FINE, FINER, FINEST };
 traceLevels uimacpp_ee_tracelevel=INFO;
-#define MSGHEADER   apr_time_now() << " ThreadId: " << Thread::getId() << __FILE__ << ":" << __LINE__ 
+#define MSGHEADER   apr_time_now() << " ThreadId: " << apr_os_thread_current() << __FILE__ << ":" << __LINE__ 
 #define FORMATMSG(x)  stringstream lstr; lstr << MSGHEADER << " " << x;
 
 #define LOGINFO(n,x) { if (n > uimacpp_ee_tracelevel) {} else { FORMATMSG(x); logMessage(lstr.str().c_str()); } }
@@ -76,10 +71,19 @@
 //AMQConnection
 //---------------------------------------------------
  ConnectionFactory * AMQConnection::createConnectionFactory(ServiceParameters & params) {
+
+   //encode prefetch size in the broker url as there is no api to set it.
    stringstream str;
-   str << params.getBrokerURL() << "?jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() << endl;
-   ConnectionFactory * pConnFactory = ConnectionFactory::createCMSConnectionFactory(str.str());
-   cout << "AMQConnection()::createConnectionFactory " <<  params.getBrokerURL() << " prefetch=" << params.getPrefetchSize() << endl;
+   str << params.getBrokerURL();
+   if (str.str().find("?") == std::string::npos)
+     str << "?jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() ;
+   else 
+      str << "&jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() ;
+  
+   //str << params.getBrokerURL() << "?consumer.prefetchSize=" << params.getPrefetchSize() << endl;
+   //str << "tcp://127.0.0.1:61616";
+   ConnectionFactory * pConnFactory = ConnectionFactory::createCMSConnectionFactory(params.getBrokerURL());
+   cout << "AMQConnection()::createConnectionFactory " <<  str.str() << endl;
    return pConnFactory;
  };
 
@@ -148,6 +152,7 @@
             stringstream str;
             str << "AMQConnection::initialize() Failed to connect to " << iv_brokerURL
                          << e.getMessage() << ". Retrying..." << endl;
+			cout << e.getMessage() << endl;
             LOGWARN(str.str());
             retrying = true;
             apr_sleep(30000000); //wait 30 seconds to reconnect
@@ -161,7 +166,6 @@
 
       //default exception listener
       this->iv_pConnection->setExceptionListener(this);	
-      
       // Create a Producer Session
       LOGINFO(FINEST,"AMQConnection() create Producer Session " + iv_brokerURL);
       this->iv_pProducerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE );
@@ -192,8 +196,11 @@
           ErrorInfo::unrecoverable);
       }
       this->iv_pProducer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
-
-      //create TextMessage
+	  
+	  //TODO set sendTimeout ?
+      ////((ActiveMQProducer*)this->iv_pProducer)->setSendTimeout(3000);
+      
+	  //create TextMessage
       this->iv_pReplyMessage = this->iv_pProducerSession->createTextMessage();
       if (this->iv_pReplyMessage == NULL) {
         LOGERROR("AMQConnection() create textMessage failed. ");
@@ -278,7 +285,7 @@
 
 /* create a MessageConsumer session and register a MessageListener
    to receive messages from the input queue. */
-  void AMQConnection::createMessageConsumer(string queueName, string selector) {
+  void AMQConnection::createMessageConsumer(string queueName, string selector, int prefetchSize) {
     LOGINFO(FINEST, "AMQConnection::createMessageConsumer() consumer start " + queueName);
     this->iv_inputQueueName = queueName;
     stringstream str;
@@ -327,6 +334,7 @@
     }
 
     iv_selector = selector;
+	this->iv_prefetchSize = prefetchSize;
     if (selector.length() > 0) {
       this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue, iv_selector);
     } else {
@@ -345,7 +353,9 @@
         errInfo.getMessage().getMessageID(),
         ErrorInfo::unrecoverable);
     }
-    
+    //set prefetch size
+	//no api to set prefetchsize so encode in broker url;
+
     LOGINFO(FINEST, "AMQConnection::createMessageConsumer() " + queueName + " successful.");
   }
 
@@ -359,7 +369,6 @@
                        << iv_brokerURL << " is broken. Reconnecting ... " << ex.getMessage() << endl;
     LOGWARN(str.str());
   }
- 
 
   TextMessage * AMQConnection::getTextMessage() {
     if (this->iv_pReplyMessage == NULL) {
@@ -423,7 +432,9 @@
   }
 
   void AMQConnection::sendMessage(const Destination * cmsReplyTo) {
-    LOGINFO(FINEST, "AMQConnection::sendMessage() to " + cmsReplyTo->toProviderString());
+    LOGINFO(FINEST, "AMQConnection::sendMessage() to " +
+ ((Queue*)cmsReplyTo)->getQueueName() );
+    //LOGINFO(FINEST, "AMQConnection::sendMessage() to " + cmsReplyTo->toProviderString());
     if (this->iv_pProducer == NULL) {
       LOGERROR("AMQConnection::getTextMessage Invalid message producer. ");
       ErrorInfo errInfo;
@@ -435,11 +446,11 @@
         ErrorInfo::unrecoverable);
     }
     this->iv_pProducer->send(cmsReplyTo,this->iv_pReplyMessage);
-
     //cout << "producer->send elapsed time " << (apr_time_now() - stime) << endl;
     this->iv_pReplyMessage->clearBody();
     this->iv_pReplyMessage->clearProperties();
-    LOGINFO(4, "AMQConnection::sendMessage() successful to " + cmsReplyTo->toProviderString());
+    LOGINFO(4, "AMQConnection::sendMessage() successful to " + ((Queue*)cmsReplyTo)->getQueueName());
+	//LOGINFO(4, "AMQConnection::sendMessage() successful to " + cmsReplyTo->toProviderString());
   }
 
 // must be called to start receiving messages 
@@ -524,7 +535,7 @@
         apr_sleep(30000000); //wait 30 seconds to reconnect
         //cout << "AMQConnection::reconnect() calling initialize >>>" << endl;
         this->initialize();
-        this->createMessageConsumer(this->iv_inputQueueName,this->iv_selector);
+        this->createMessageConsumer(this->iv_inputQueueName,this->iv_selector, this->iv_prefetchSize);
         this->start();
         stringstream str;
         str << "AMQConnection::reconnect() Successfully reconnected to >>> " <<  iv_brokerURL << endl;
@@ -773,12 +784,19 @@
            LOGERROR("AMQListener::validateRequest " + errmsg);
            valid=false;
         } 
-        string text = textMessage->getText().c_str();
-        if (text.length() == 0) {
-          errmsg = "There is no payload data. Nothing to process.";
-          LOGERROR("AMQListener::validateRequest " + errmsg);
-          valid = false;  
-        }
+		
+		try {
+          string text = textMessage->getText().c_str();
+          if (text.length() == 0) {
+            errmsg = "There is no payload data. Nothing to process.";
+            LOGERROR("AMQListener::validateRequest " + errmsg);
+            valid = false;  
+          }
+		} catch (CMSException& e) {
+			errmsg = e.getMessage();
+			LOGERROR("AMQListener::validateRequest " + e.getMessage());
+			valid = false;
+		}
       }
     }
   }
@@ -918,7 +936,8 @@
         int payload = textMessage->getIntProperty("Payload");
         //get the text in the payload 
         string text = textMessage->getText().c_str();
-
+        UnicodeString utext(text.c_str());
+		text = UnicodeStringRef(utext).asUTF8();
         astr.str("");
         astr << "Payload: " << payload << " Content: " << text ;
         LOGINFO(FINER, astr.str());
@@ -930,6 +949,7 @@
 
         //reset the CAS
         iv_pCas->reset();
+        ios::openmode mode = ios::binary;
         stringstream xmlstr;  
         //deserialize payload data into the CAS,
         //call AE process method and serialize
@@ -969,6 +989,7 @@
           LOGINFO(FINEST, "AMQListener::handleRequest() calling serialize.");
           XmiWriter xmiwriter(*iv_pCas, true, &sharedData);
           xmiwriter.write(xmlstr);
+          //cout << "SERIALIZED CAS " << xmlstr.str() << endl;
           timeToSerializeCAS = apr_time_now() - startSerialize;
           LOGINFO(FINEST, "AMQListener::handleRequest() done processing CAS.");
         }
@@ -1149,12 +1170,13 @@
        LOGINFO(INFO,str.str());
 
        if (cmsReplyTo != NULL) {
-         str << " to " << cmsReplyTo->toProviderString();
+         str << " to " << ((Queue*)cmsReplyTo)->getQueueName();
        } else {
          str << " to " << request->getStringProperty("MessageFrom") 
            << " at " << serverURI;
        }
        str << " Message text: " << msgContent;
+       //std::cout << "****Reply" << msgContent << std::endl;
        LOGINFO(FINEST,"PRINT Reply message:\n" + str.str());	
 
        //send
@@ -1218,6 +1240,7 @@
       //create connection factory
       LOGINFO(FINEST,"AMQAnalysisEngineService::initialize() Create connection factory");
       this->iv_pConnFact = AMQConnection::createConnectionFactory(params);
+	  
       if (iv_pConnFact == NULL) {
         ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
         msg.addParam("AMQAnalysisEngineService::initialize() Failed to create connection factory.");
@@ -1318,7 +1341,7 @@
         this->iv_listeners[i] = newListener;
         //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create message consumer " << endl;
         //create MessageConsumer session and register Listener       
-        this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector);  
+        this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector, params.getPrefetchSize());  
      } 
      
      iv_pMonitor->setNumberOfInstances(iv_numInstances);
@@ -1360,7 +1383,7 @@
       }
       this->iv_listeners[this->iv_numInstances] = newListener; 
       iv_pgetMetaConnection->createMessageConsumer(iv_inputQueueName, 
-                      getmeta_selector); 
+                      getmeta_selector,0); 
       this->iv_pMonitor->setGetMetaListenerId(iv_numInstances);
       //cout <<  __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() done" << endl;
     } catch (uima::Exception e) {
@@ -1511,7 +1534,8 @@
         break;
       }
     }
-    Thread::sleep(1000);
+    //Thread::sleep(1000);
+	apr_sleep(1000000);
   }
 }
 
diff --git a/src/utils/ActiveMQAnalysisEngineService.hpp b/src/utils/ActiveMQAnalysisEngineService.hpp
index 3d990d6..83b420a 100644
--- a/src/utils/ActiveMQAnalysisEngineService.hpp
+++ b/src/utils/ActiveMQAnalysisEngineService.hpp
@@ -84,6 +84,7 @@
   MessageListener * iv_pListener;
   TextMessage * iv_pReceivedMessage;
   string iv_selector;
+  int iv_prefetchSize;
 	
   //producer session
   Session * iv_pProducerSession;
@@ -93,7 +94,6 @@
   
   void initialize();
 public:
-
   static ConnectionFactory * createConnectionFactory(ServiceParameters & params);
   
 	/** Establish connection to the broker and create a Message Producer session. 
@@ -103,7 +103,7 @@
   /** Creates a MessageConsumer session and registers a listener. 
       Caller owns the listener. */
 	//void createMessageConsumer(string aQueueName); 
-  void createMessageConsumer(string aQueueName, string selector); 
+  void createMessageConsumer(string aQueueName, string selector, int prefetchSize); 
  	
   /** destructor */
 	~AMQConnection();
diff --git a/src/utils/deployCppService.cpp b/src/utils/deployCppService.cpp
index a2596ab..7a22455 100644
--- a/src/utils/deployCppService.cpp
+++ b/src/utils/deployCppService.cpp
@@ -25,7 +25,7 @@
 #include "deployCppService.hpp"
 #include <activemq/core/ActiveMQConsumer.h>
 using namespace activemq::exceptions;
-using namespace std;
+
 int shutdown_service;
 //===================================================
 //Main
@@ -47,7 +47,6 @@
   cout << __FILE__ << " Starting the UIMA C++ Remote Service using ActiveMQ broker." << std::endl;
  // cout << "-----------------------------------------------------\n";
   // _CrtSetBreakAlloc(1988);
-
   try {
     if (argc < 3) {
       printUsage();
@@ -85,7 +84,7 @@
     aeService.startProcessingThreads();
 
     /*start receiving messages*/ 
-    cout << __FILE__ << " Start receiving messages " << endl;
+    cout << __FILE__ << __LINE__  << " Start receiving messages " << endl;
     aeService.start();
 
     cout << __FILE__ << " UIMA C++ Service " << serviceDesc.getQueueName() << " at " <<
@@ -193,6 +192,7 @@
   //cout << "-----------------------------------------------------\n";    
   cout << __FILE__ " UIMA C++ Remote Service terminated:\n";
   //cout << "=====================================================\n";    
+
    exit(0);
 
 }  //main