UIMA-1913 replace use of activemq utility functions with apr
git-svn-id: https://svn.apache.org/repos/asf/uima/uimacpp/trunk@1027691 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/utils/ActiveMQAnalysisEngineService.cpp b/src/utils/ActiveMQAnalysisEngineService.cpp
index dd0f648..381c9d0 100644
--- a/src/utils/ActiveMQAnalysisEngineService.cpp
+++ b/src/utils/ActiveMQAnalysisEngineService.cpp
@@ -24,12 +24,10 @@
#include "ActiveMQAnalysisEngineService.hpp"
#include "deployCppService.hpp"
-#include <activemq/concurrent/Thread.h>
-#include <activemq/concurrent/Runnable.h>
-#include <activemq/concurrent/Concurrent.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQProducer.h>
#include <activemq/core/ActiveMQConstants.h>
-#include <activemq/util/Integer.h>
+#include <activemq/core/ActiveMQConnection.h>
#include "uima/xmlwriter.hpp"
#include "uima/xcasdeserializer.hpp"
@@ -38,14 +36,11 @@
#include "uima/xmlerror_handler.hpp"
using namespace activemq::core;
-using namespace activemq::util;
-using namespace activemq::exceptions;
-using namespace activemq::concurrent;
using namespace uima;
enum traceLevels {NONE, INFO, FINE, FINER, FINEST };
traceLevels uimacpp_ee_tracelevel=INFO;
-#define MSGHEADER apr_time_now() << " ThreadId: " << Thread::getId() << __FILE__ << ":" << __LINE__
+#define MSGHEADER apr_time_now() << " ThreadId: " << apr_os_thread_current() << __FILE__ << ":" << __LINE__
#define FORMATMSG(x) stringstream lstr; lstr << MSGHEADER << " " << x;
#define LOGINFO(n,x) { if (n > uimacpp_ee_tracelevel) {} else { FORMATMSG(x); logMessage(lstr.str().c_str()); } }
@@ -76,10 +71,19 @@
//AMQConnection
//---------------------------------------------------
ConnectionFactory * AMQConnection::createConnectionFactory(ServiceParameters & params) {
+
+ //encode prefetch size in the broker url as there is no api to set it.
stringstream str;
- str << params.getBrokerURL() << "?jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() << endl;
- ConnectionFactory * pConnFactory = ConnectionFactory::createCMSConnectionFactory(str.str());
- cout << "AMQConnection()::createConnectionFactory " << params.getBrokerURL() << " prefetch=" << params.getPrefetchSize() << endl;
+ str << params.getBrokerURL();
+ if (str.str().find("?") == std::string::npos)
+ str << "?jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() ;
+ else
+ str << "&jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() ;
+
+ //str << params.getBrokerURL() << "?consumer.prefetchSize=" << params.getPrefetchSize() << endl;
+ //str << "tcp://127.0.0.1:61616";
+ ConnectionFactory * pConnFactory = ConnectionFactory::createCMSConnectionFactory(params.getBrokerURL());
+ cout << "AMQConnection()::createConnectionFactory " << str.str() << endl;
return pConnFactory;
};
@@ -148,6 +152,7 @@
stringstream str;
str << "AMQConnection::initialize() Failed to connect to " << iv_brokerURL
<< e.getMessage() << ". Retrying..." << endl;
+ cout << e.getMessage() << endl;
LOGWARN(str.str());
retrying = true;
apr_sleep(30000000); //wait 30 seconds to reconnect
@@ -161,7 +166,6 @@
//default exception listener
this->iv_pConnection->setExceptionListener(this);
-
// Create a Producer Session
LOGINFO(FINEST,"AMQConnection() create Producer Session " + iv_brokerURL);
this->iv_pProducerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE );
@@ -192,8 +196,11 @@
ErrorInfo::unrecoverable);
}
this->iv_pProducer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
-
- //create TextMessage
+
+ //TODO set sendTimeout ?
+ ////((ActiveMQProducer*)this->iv_pProducer)->setSendTimeout(3000);
+
+ //create TextMessage
this->iv_pReplyMessage = this->iv_pProducerSession->createTextMessage();
if (this->iv_pReplyMessage == NULL) {
LOGERROR("AMQConnection() create textMessage failed. ");
@@ -278,7 +285,7 @@
/* create a MessageConsumer session and register a MessageListener
to receive messages from the input queue. */
- void AMQConnection::createMessageConsumer(string queueName, string selector) {
+ void AMQConnection::createMessageConsumer(string queueName, string selector, int prefetchSize) {
LOGINFO(FINEST, "AMQConnection::createMessageConsumer() consumer start " + queueName);
this->iv_inputQueueName = queueName;
stringstream str;
@@ -327,6 +334,7 @@
}
iv_selector = selector;
+ this->iv_prefetchSize = prefetchSize;
if (selector.length() > 0) {
this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue, iv_selector);
} else {
@@ -345,7 +353,9 @@
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
-
+ //set prefetch size
+ //no api to set prefetchsize so encode in broker url;
+
LOGINFO(FINEST, "AMQConnection::createMessageConsumer() " + queueName + " successful.");
}
@@ -359,7 +369,6 @@
<< iv_brokerURL << " is broken. Reconnecting ... " << ex.getMessage() << endl;
LOGWARN(str.str());
}
-
TextMessage * AMQConnection::getTextMessage() {
if (this->iv_pReplyMessage == NULL) {
@@ -423,7 +432,9 @@
}
void AMQConnection::sendMessage(const Destination * cmsReplyTo) {
- LOGINFO(FINEST, "AMQConnection::sendMessage() to " + cmsReplyTo->toProviderString());
+ LOGINFO(FINEST, "AMQConnection::sendMessage() to " +
+ ((Queue*)cmsReplyTo)->getQueueName() );
+ //LOGINFO(FINEST, "AMQConnection::sendMessage() to " + cmsReplyTo->toProviderString());
if (this->iv_pProducer == NULL) {
LOGERROR("AMQConnection::getTextMessage Invalid message producer. ");
ErrorInfo errInfo;
@@ -435,11 +446,11 @@
ErrorInfo::unrecoverable);
}
this->iv_pProducer->send(cmsReplyTo,this->iv_pReplyMessage);
-
//cout << "producer->send elapsed time " << (apr_time_now() - stime) << endl;
this->iv_pReplyMessage->clearBody();
this->iv_pReplyMessage->clearProperties();
- LOGINFO(4, "AMQConnection::sendMessage() successful to " + cmsReplyTo->toProviderString());
+ LOGINFO(4, "AMQConnection::sendMessage() successful to " + ((Queue*)cmsReplyTo)->getQueueName());
+ //LOGINFO(4, "AMQConnection::sendMessage() successful to " + cmsReplyTo->toProviderString());
}
// must be called to start receiving messages
@@ -524,7 +535,7 @@
apr_sleep(30000000); //wait 30 seconds to reconnect
//cout << "AMQConnection::reconnect() calling initialize >>>" << endl;
this->initialize();
- this->createMessageConsumer(this->iv_inputQueueName,this->iv_selector);
+ this->createMessageConsumer(this->iv_inputQueueName,this->iv_selector, this->iv_prefetchSize);
this->start();
stringstream str;
str << "AMQConnection::reconnect() Successfully reconnected to >>> " << iv_brokerURL << endl;
@@ -773,12 +784,19 @@
LOGERROR("AMQListener::validateRequest " + errmsg);
valid=false;
}
- string text = textMessage->getText().c_str();
- if (text.length() == 0) {
- errmsg = "There is no payload data. Nothing to process.";
- LOGERROR("AMQListener::validateRequest " + errmsg);
- valid = false;
- }
+
+ try {
+ string text = textMessage->getText().c_str();
+ if (text.length() == 0) {
+ errmsg = "There is no payload data. Nothing to process.";
+ LOGERROR("AMQListener::validateRequest " + errmsg);
+ valid = false;
+ }
+ } catch (CMSException& e) {
+ errmsg = e.getMessage();
+ LOGERROR("AMQListener::validateRequest " + e.getMessage());
+ valid = false;
+ }
}
}
}
@@ -918,7 +936,8 @@
int payload = textMessage->getIntProperty("Payload");
//get the text in the payload
string text = textMessage->getText().c_str();
-
+ UnicodeString utext(text.c_str());
+ text = UnicodeStringRef(utext).asUTF8();
astr.str("");
astr << "Payload: " << payload << " Content: " << text ;
LOGINFO(FINER, astr.str());
@@ -930,6 +949,7 @@
//reset the CAS
iv_pCas->reset();
+ ios::openmode mode = ios::binary;
stringstream xmlstr;
//deserialize payload data into the CAS,
//call AE process method and serialize
@@ -969,6 +989,7 @@
LOGINFO(FINEST, "AMQListener::handleRequest() calling serialize.");
XmiWriter xmiwriter(*iv_pCas, true, &sharedData);
xmiwriter.write(xmlstr);
+ //cout << "SERIALIZED CAS " << xmlstr.str() << endl;
timeToSerializeCAS = apr_time_now() - startSerialize;
LOGINFO(FINEST, "AMQListener::handleRequest() done processing CAS.");
}
@@ -1149,12 +1170,13 @@
LOGINFO(INFO,str.str());
if (cmsReplyTo != NULL) {
- str << " to " << cmsReplyTo->toProviderString();
+ str << " to " << ((Queue*)cmsReplyTo)->getQueueName();
} else {
str << " to " << request->getStringProperty("MessageFrom")
<< " at " << serverURI;
}
str << " Message text: " << msgContent;
+ //std::cout << "****Reply" << msgContent << std::endl;
LOGINFO(FINEST,"PRINT Reply message:\n" + str.str());
//send
@@ -1218,6 +1240,7 @@
//create connection factory
LOGINFO(FINEST,"AMQAnalysisEngineService::initialize() Create connection factory");
this->iv_pConnFact = AMQConnection::createConnectionFactory(params);
+
if (iv_pConnFact == NULL) {
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQAnalysisEngineService::initialize() Failed to create connection factory.");
@@ -1318,7 +1341,7 @@
this->iv_listeners[i] = newListener;
//cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create message consumer " << endl;
//create MessageConsumer session and register Listener
- this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector);
+ this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector, params.getPrefetchSize());
}
iv_pMonitor->setNumberOfInstances(iv_numInstances);
@@ -1360,7 +1383,7 @@
}
this->iv_listeners[this->iv_numInstances] = newListener;
iv_pgetMetaConnection->createMessageConsumer(iv_inputQueueName,
- getmeta_selector);
+ getmeta_selector,0);
this->iv_pMonitor->setGetMetaListenerId(iv_numInstances);
//cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() done" << endl;
} catch (uima::Exception e) {
@@ -1511,7 +1534,8 @@
break;
}
}
- Thread::sleep(1000);
+ //Thread::sleep(1000);
+ apr_sleep(1000000);
}
}
diff --git a/src/utils/ActiveMQAnalysisEngineService.hpp b/src/utils/ActiveMQAnalysisEngineService.hpp
index 3d990d6..83b420a 100644
--- a/src/utils/ActiveMQAnalysisEngineService.hpp
+++ b/src/utils/ActiveMQAnalysisEngineService.hpp
@@ -84,6 +84,7 @@
MessageListener * iv_pListener;
TextMessage * iv_pReceivedMessage;
string iv_selector;
+ int iv_prefetchSize;
//producer session
Session * iv_pProducerSession;
@@ -93,7 +94,6 @@
void initialize();
public:
-
static ConnectionFactory * createConnectionFactory(ServiceParameters & params);
/** Establish connection to the broker and create a Message Producer session.
@@ -103,7 +103,7 @@
/** Creates a MessageConsumer session and registers a listener.
Caller owns the listener. */
//void createMessageConsumer(string aQueueName);
- void createMessageConsumer(string aQueueName, string selector);
+ void createMessageConsumer(string aQueueName, string selector, int prefetchSize);
/** destructor */
~AMQConnection();
diff --git a/src/utils/deployCppService.cpp b/src/utils/deployCppService.cpp
index a2596ab..7a22455 100644
--- a/src/utils/deployCppService.cpp
+++ b/src/utils/deployCppService.cpp
@@ -25,7 +25,7 @@
#include "deployCppService.hpp"
#include <activemq/core/ActiveMQConsumer.h>
using namespace activemq::exceptions;
-using namespace std;
+
int shutdown_service;
//===================================================
//Main
@@ -47,7 +47,6 @@
cout << __FILE__ << " Starting the UIMA C++ Remote Service using ActiveMQ broker." << std::endl;
// cout << "-----------------------------------------------------\n";
// _CrtSetBreakAlloc(1988);
-
try {
if (argc < 3) {
printUsage();
@@ -85,7 +84,7 @@
aeService.startProcessingThreads();
/*start receiving messages*/
- cout << __FILE__ << " Start receiving messages " << endl;
+ cout << __FILE__ << __LINE__ << " Start receiving messages " << endl;
aeService.start();
cout << __FILE__ << " UIMA C++ Service " << serviceDesc.getQueueName() << " at " <<
@@ -193,6 +192,7 @@
//cout << "-----------------------------------------------------\n";
cout << __FILE__ " UIMA C++ Remote Service terminated:\n";
//cout << "=====================================================\n";
+
exit(0);
} //main