blob: b27ef218d58f2e7275b927594f496ff7e534f438 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Analysis Engine service wrapper implementation based on
* Active MQ C++ client.
*/
#include "ActiveMQAnalysisEngineService.hpp"
#include "deployCppService.hpp"
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/concurrent/Concurrent.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/util/Integer.h>
#include "uima/xmlwriter.hpp"
#include "uima/xcasdeserializer.hpp"
#include "uima/xmiwriter.hpp"
#include "uima/xmideserializer.hpp"
#include "uima/xmlerror_handler.hpp"
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::exceptions;
using namespace activemq::concurrent;
using namespace uima;
// Trace verbosity levels, declared from least verbose (NONE) to most verbose (FINEST).
enum traceLevels {NONE, INFO, FINE, FINER, FINEST };
// Current trace threshold; LOGINFO compares its level argument against this value.
traceLevels uimacpp_ee_tracelevel=INFO;
// Prefix streamed at the start of every log line: timestamp, thread id, source file and line.
// NOTE(review): no separator is streamed between the thread id and __FILE__.
#define MSGHEADER apr_time_now() << " ThreadId: " << Thread::getId() << __FILE__ << ":" << __LINE__
// Declares a local stringstream named lstr and streams the header followed by x into it.
// Because it declares a variable, it is only expanded inside a braced scope by the macros below.
#define FORMATMSG(x) stringstream lstr; lstr << MSGHEADER << " " << x;
// Formats x and passes it to logMessage() unless level n is greater than uimacpp_ee_tracelevel.
// x is not evaluated when the message is suppressed.
#define LOGINFO(n,x) { if (n > uimacpp_ee_tracelevel) {} else { FORMATMSG(x); logMessage(lstr.str().c_str()); } }
// Formats x and passes it to logError(), regardless of the trace level.
#define LOGERROR(x){ FORMATMSG(x); logError(lstr.str().c_str()); }
// Formats x and passes it to logWarning(), regardless of the trace level.
#define LOGWARN(x) { FORMATMSG(x); logWarning(lstr.str().c_str());}
//===================================================
//AMQConnection
//---------------------------------------------------
// Releases whatever CMS objects a partially constructed AMQConnection has
// created so far and resets the pointers to 0. This is needed because the
// destructor does not run when the constructor exits with an exception.
// Errors raised while closing the connection are ignored: the caller is
// already reporting the original failure.
template <class ConnT, class SessT, class ProdT, class MsgT>
static void amqReleasePartialConnection(ConnT *& pConnection,
                                        SessT *& pSession,
                                        ProdT *& pProducer,
                                        MsgT *& pMessage) {
  delete pMessage;
  pMessage = 0;
  delete pProducer;
  pProducer = 0;
  delete pSession;
  pSession = 0;
  if (pConnection != 0) {
    try {
      pConnection->close();
    } catch (...) {
      //best effort only
    }
    delete pConnection;
    pConnection = 0;
  }
}

// Connects to the broker at aBrokerURL and prepares everything needed to
// send replies: a connection (with this object registered as the default
// exception listener), an auto-acknowledge producer session, a
// non-persistent producer without a fixed destination, and one reusable
// TextMessage. The connection is marked valid only when all steps succeed.
//
// Throws uima::Uima_runtime_error
// (UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE) on any failure. Objects
// created before the failure are released before the exception leaves.
AMQConnection::AMQConnection( const char * aBrokerURL, Monitor * pStatistics) :
    iv_brokerURL(aBrokerURL),
    iv_pConnection(0),
    iv_pConsumerSession(0),
    iv_pConsumer(0),
    iv_inputQueueName(),
    iv_pInputQueue(0),
    iv_pListener(0),
    iv_pProducerSession(0),
    iv_pProducer(0),
    iv_pTextMessage(0),
    iv_replyDestinations(),
    iv_valid(false) {
  try {
    iv_pMonitor = pStatistics;
    LOGINFO(INFO, "AMQConnection() connecting to " + iv_brokerURL);
    // The factory is only needed to create the connection. Keeping it on
    // the stack guarantees it is destroyed even if createConnection() throws.
    ActiveMQConnectionFactory connectionFactory(iv_brokerURL);
    // Create a Connection
    this->iv_pConnection = connectionFactory.createConnection();
    if (this->iv_pConnection == NULL) {
      LOGERROR("AMQConnection() could not create connection.");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() could not create connection to " + iv_brokerURL);
      ErrorInfo errInfo;
      errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }
    //default exception listener
    this->iv_pConnection->setExceptionListener(this);
    // Create a Producer Session
    LOGINFO(FINEST,"AMQConnection() create Producer Session " + iv_brokerURL);
    this->iv_pProducerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE );
    if (this->iv_pProducerSession == NULL) {
      LOGERROR("AMQConnection() createSession() failed.");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() createSession failed." );
      ErrorInfo errInfo;
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }
    // The producer has no fixed destination; one is supplied on every send.
    this->iv_pProducer = this->iv_pProducerSession->createProducer(NULL);
    if (this->iv_pProducer == NULL) {
      LOGERROR("AMQConnection() could not create MessageProducer ");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() create MessageProducer failed.");
      ErrorInfo errInfo;
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }
    this->iv_pProducer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
    //create the TextMessage that is reused for every reply
    this->iv_pTextMessage = this->iv_pProducerSession->createTextMessage();
    if (this->iv_pTextMessage == NULL) {
      LOGERROR("AMQConnection() create textMessage failed. ");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      msg.addParam("AMQConnection() failed to create message.");
      ErrorInfo errInfo;
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }
    this->iv_valid = true;
    //level NONE is never above the threshold, so this is always logged
    LOGINFO(NONE, "AMQConnection() connected successfully to " + iv_brokerURL);
  } catch (cms::CMSException& e) {
    amqReleasePartialConnection(iv_pConnection, iv_pProducerSession,
                                iv_pProducer, iv_pTextMessage);
    LOGERROR("AMQConnection(): " + e.getMessage());
    ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
    msg.addParam(e.getMessage());
    ErrorInfo errInfo;
    errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
    errInfo.setMessage(msg);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  } catch (uima::Exception &) {
    // One of the checks above already logged and raised a specific error;
    // release resources and let it propagate instead of masking it.
    amqReleasePartialConnection(iv_pConnection, iv_pProducerSession,
                                iv_pProducer, iv_pTextMessage);
    throw;
  } catch (...) {
    amqReleasePartialConnection(iv_pConnection, iv_pProducerSession,
                                iv_pProducer, iv_pTextMessage);
    LOGERROR("AMQConnection() failed to create a connection");
    ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
    msg.addParam("AMQConnection() create connection failed");
    ErrorInfo errInfo;
    errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE);
    errInfo.setMessage(msg);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
}
/* destructor
 * Releases the consumer, producer, both sessions, the input queue, the
 * connection (closed first), the reusable text message and every cached
 * reply destination, in that order. Deleting a null pointer is a no-op,
 * so members that were never created need no special handling.
 */
AMQConnection::~AMQConnection() {
  delete this->iv_pConsumer;
  this->iv_pConsumer = NULL;

  delete this->iv_pProducer;
  this->iv_pProducer = NULL;

  delete this->iv_pConsumerSession;
  this->iv_pConsumerSession = NULL;

  delete this->iv_pProducerSession;
  this->iv_pProducerSession = NULL;

  delete this->iv_pInputQueue;
  this->iv_pInputQueue = NULL;

  if (this->iv_pConnection != NULL) {
    // close() may fail, e.g. when the broker is already gone. An exception
    // must never leave a destructor, so report it and carry on.
    try {
      this->iv_pConnection->close();
    } catch (cms::CMSException & e) {
      LOGWARN("AMQConnection::~AMQConnection() close failed: " + e.getMessage());
    } catch (...) {
      LOGWARN("AMQConnection::~AMQConnection() close failed.");
    }
    delete this->iv_pConnection;
    this->iv_pConnection = NULL;
  }

  delete this->iv_pTextMessage;
  this->iv_pTextMessage = NULL;

  //destinations
  map<string, cms::Destination*>::iterator ite;
  for (ite = iv_replyDestinations.begin(); ite != iv_replyDestinations.end(); ++ite) {
    delete ite->second;
  }
  // do not leave dangling pointers behind in the map
  iv_replyDestinations.clear();
}
/* Creates the consumer session on this connection, opens the input queue
 * and registers the listener that receives its messages.
 *
 * queueName  name of the input queue; stored as the input queue name.
 * listener   receives the messages; owned by the caller.
 * prefetch   requested prefetch size. One is added before it is passed to
 *            the broker as the consumer.prefetchSize destination option.
 *
 * Throws uima::Uima_runtime_error if a consumer session already exists or
 * if the session, queue or consumer cannot be created. Delivery does not
 * begin until start() is called.
 */
void AMQConnection::createMessageConsumer(string queueName,
    MessageListener * listener,
    int prefetch) {
  LOGINFO(FINEST, "AMQConnection::createMessageConsumer() consumer start " + queueName);
  this->iv_inputQueueName = queueName;
  //we add one to prefetch size to get the same behavior as the
  //Spring listener used in UIMA Java SDK which calls the
  //ActiveMQMessageConsumer.receive() api.
  //create endpoint destination: the prefetch size travels as a destination
  //option, appended with '&' if the name already carries options.
  stringstream destination;
  destination << queueName
              << (queueName.find('?') == string::npos ? "?" : "&")
              << "consumer.prefetchSize=" << prefetch+1;
  if (this->iv_pConsumerSession != NULL || iv_pConsumer != NULL) {
    LOGERROR("AMQConnection::createMessageConsumer() A session already exists. ");
    ErrorInfo errInfo;
    errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "A session already exists."));
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
  this->iv_pConsumerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE );
  if (this->iv_pConsumerSession == NULL) {
    LOGERROR("AMQConnection() createSession failed.");
    ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
    msg.addParam("AMQConnection() createSession failed." );
    ErrorInfo errInfo;
    errInfo.setMessage(msg);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
  // Use the destination string that carries the prefetch option; the plain
  // queue name stays in iv_inputQueueName.
  this->iv_pInputQueue = this->iv_pConsumerSession->createQueue(destination.str());
  if (this->iv_pInputQueue == NULL) {
    LOGERROR("AMQConnection::createMessageConsumer() createQueue failed. " + queueName);
    stringstream reason;
    reason << "AMQConnection::createMessageConsumer() createQueue failed. " << queueName << endl;
    ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
    msg.addParam(reason.str());
    ErrorInfo errInfo;
    errInfo.setMessage(msg);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
  this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue);
  if (this->iv_pConsumer == NULL) {
    LOGERROR("AMQConnection::createMessageConsumer() createConsumer failed. " + queueName);
    ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
    msg.addParam("AMQConnection::createMessageConsumer() createConsumer() failed.");
    ErrorInfo errInfo;
    errInfo.setMessage(msg);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
  //register listener
  this->iv_pConsumer->setMessageListener(listener); //caller owns listener
  this->iv_pListener = listener;
  LOGINFO(FINEST, "AMQConnection::createMessageConsumer() " + queueName + " successful.");
}
// Registers el as the exception listener of the underlying CMS connection.
// The listener is not owned by this object; the caller keeps ownership.
void AMQConnection::setExceptionListener(ExceptionListener * el) {
  iv_pConnection->setExceptionListener(el);
}
void AMQConnection::onException(const CMSException & ex) {
//mark endpoint as broken.
this->iv_valid = false;
//log that connection is invalid.
LOGWARN("AMQConnection()::onException() Connection to " + iv_brokerURL
+ " may be broken. " + ex.getMessage());
}
// Returns the reusable TextMessage of this connection after clearing its
// properties and body. The message stays owned by the connection.
// Throws uima::Uima_runtime_error if no message object exists.
TextMessage * AMQConnection::getTextMessage() {
  TextMessage * const pMessage = iv_pTextMessage;
  if (NULL == pMessage) {
    LOGERROR("AMQConnection::getTextMessage() failed. ");
    ErrorInfo failure;
    failure.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "TextMessage could not be created."));
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       failure.getMessage(),
                       failure.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
  // hand out a clean message: drop whatever the previous send left behind
  pMessage->clearProperties();
  pMessage->clearBody();
  return pMessage;
}
// Sends the connection's reusable TextMessage to the queue called
// queuename, then clears the message body and properties. Destinations are
// created on first use and cached per queue name.
// Throws uima::Uima_runtime_error if there is no producer or if the
// destination cannot be created.
void AMQConnection::sendMessage(string queuename) {
  LOGINFO(FINEST, "AMQConnection::sendMessage() to " + queuename);
  if (NULL == iv_pProducer) {
    LOGERROR("AMQConnection::sendMessage() Invalid Message producer. ");
    ErrorInfo failure;
    failure.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "No message producer."));
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       failure.getMessage(),
                       failure.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }

  //look up destination queue, creating and caching it when it is not known yet
  cms::Destination * target;
  map<string,Destination*>::iterator cached = iv_replyDestinations.find(queuename);
  if (cached == iv_replyDestinations.end()) {
    target = iv_pProducerSession->createQueue(queuename);
    if (target != NULL) {
      iv_replyDestinations[queuename] = target;
    }
  } else {
    target = cached->second;
  }

  if (NULL == target) {
    LOGERROR("AMQConnection::sendMessage() invalid destination " + queuename);
    ErrorMessage text(UIMA_MSG_ID_LOG_ERROR);
    text.addParam("AMQConnection::sendMessage() invalid destination.");
    ErrorInfo failure;
    failure.setMessage(text);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       failure.getMessage(),
                       failure.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }

  iv_pProducer->send(target, iv_pTextMessage);
  // the message object is reused, so wipe it right after sending
  iv_pTextMessage->clearBody();
  iv_pTextMessage->clearProperties();
  LOGINFO(FINEST, "AMQConnection::sendMessage() successful to " + queuename);
}
// Sends the connection's reusable TextMessage to the given reply
// destination, then clears the message body and properties.
// cmsReplyTo is used as supplied and is not cached by this method.
// Throws uima::Uima_runtime_error if there is no producer.
void AMQConnection::sendMessage(const Destination * cmsReplyTo) {
  LOGINFO(FINEST, "AMQConnection::sendMessage() to " + cmsReplyTo->toProviderString());
  if (this->iv_pProducer == NULL) {
    // report the function that actually failed
    LOGERROR("AMQConnection::sendMessage() Invalid message producer. ");
    ErrorInfo errInfo;
    errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "A MessageProducer does not exist."));
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       errInfo.getMessage(),
                       errInfo.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
  this->iv_pProducer->send(cmsReplyTo,this->iv_pTextMessage);
  // the message object is reused, so wipe it right after sending
  this->iv_pTextMessage->clearBody();
  this->iv_pTextMessage->clearProperties();
  LOGINFO(FINEST, "AMQConnection::sendMessage() successful to " + cmsReplyTo->toProviderString());
}
// must be called to start receiving messages
void AMQConnection::start() {
if (this->iv_pConnection != NULL) {
this->iv_pConnection->start();
LOGINFO(0,"AMQConnection::start() Start receiving messages.");
} else {
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQConnection::start() failed. A connection does not exist.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
}
//stops receiving messages
void AMQConnection::stop() {
LOGINFO(0,"AMQConnection::stop(). ");
if (this->iv_pConnection != NULL) {
this->iv_pConnection->stop();
} else {
LOGERROR("AMQConnection::stop() invalid connection. ");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("stop() invalid connection.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
}
//===================================================
//AMQConnectionsCache
//---------------------------------------------------
// Creates an empty cache. stats is stored and handed to every connection
// this cache creates.
AMQConnectionsCache::AMQConnectionsCache(Monitor * stats) {
  this->iv_pMonitor = stats;
}
// Destroys every connection held by the cache.
AMQConnectionsCache::~AMQConnectionsCache() {
  map<string,AMQConnection*>::iterator entry = iv_connections.begin();
  while (entry != iv_connections.end()) {
    AMQConnection * cached = entry->second;
    if (cached != NULL) {
      delete cached;
    }
    entry++;
  }
}
//Returns the cached connection for brokerURL. When none is cached yet, a
//new connection to the broker is established, added to the cache and
//returned. A cached connection that is no longer valid is destroyed and
//replaced by a fresh one. Any CMS or other exception raised on the way is
//logged and reported as uima::Uima_runtime_error.
AMQConnection * AMQConnectionsCache::getConnection(string brokerURL) {
  LOGINFO(FINE,"AMQConnectionCache::getConnection() looking up connection to "
          + brokerURL );
  AMQConnection * result = NULL;
  try {
    map<string,AMQConnection*>::iterator pos = iv_connections.find(brokerURL);
    if (pos != iv_connections.end()) {
      //cache hit
      result = pos->second;
      if (result == NULL) {
        LOGERROR("AMQConnectionCache::getConnection() could not connect to "
                 + brokerURL);
      } else if (!result->isValid()) {
        //if not a valid connection, reconnect
        LOGWARN("AMQConnectionCache::getEndPoint() Existing connection invalid. Reconnecting to " + brokerURL );
        delete result;
        iv_connections.erase(brokerURL);
        result = new AMQConnection(brokerURL.c_str(), iv_pMonitor);
        if (result != NULL) {
          LOGWARN("AMQConnectionCache::getConnection() reconnected to " +
                  brokerURL);
          iv_connections[brokerURL] = result;
        } else {
          LOGERROR("AMQConnectionCache::getConnection() could not connect to "
                   + brokerURL );
        }
      }
    } else {
      //cache miss
      LOGINFO(FINE,"AMQConnectionsCache::getConnection() create new connection to " +
              brokerURL);
      result = new AMQConnection(brokerURL.c_str(), iv_pMonitor);
      if (result != NULL) {
        iv_connections[brokerURL] = result;
      } else {
        LOGERROR("AMQConnectionCache::getConnection Could not create a endpoint connection to " +
                 brokerURL);
      }
    }
    return result;
  } catch (cms::CMSException & cmsError) {
    LOGERROR("AMQConnectionCache::getConnection() " + cmsError.getMessage());
    ErrorMessage text(UIMA_MSG_ID_LOG_ERROR);
    text.addParam("AMQConnectionCache::getConnection() " + cmsError.getMessage());
    ErrorInfo failure;
    failure.setMessage(text);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       failure.getMessage(),
                       failure.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  } catch (...) {
    LOGERROR("AMQConnectionCache::getConnection() Unknown Exception. ");
    ErrorMessage text(UIMA_MSG_ID_LOG_ERROR);
    text.addParam("AMQConnectionCache::getConnection Unknown Exception");
    ErrorInfo failure;
    failure.setMessage(text);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       failure.getMessage(),
                       failure.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
}
//===================================================
//Listener that process UIMA requests.
//---------------------------------------------------
// Builds a listener bound to one connection, one analysis engine and one
// CAS. The broker URL and input queue name are copied from the connection.
// The engine's metadata is rendered to XML once, here, and kept as a UTF-8
// string so GETMETA requests can be answered without regenerating it.
AMQListener::AMQListener(int id,
                         AMQConnection * connection,
                         AnalysisEngine * ae,
                         CAS * cas,
                         Monitor * stats) :
    iv_id(id),
    iv_pConnection(connection),
    iv_pEngine(ae),
    iv_pCas(cas),
    //iv_pMonitor(stats),
    iv_replyToConnections(stats),
    iv_timeLastRequestCompleted(0),
    iv_busy(false),
    iv_count(0),
    iv_aeDescriptor(),
    iv_brokerURL(connection->getBrokerURL()),
    iv_inputQueueName(connection->getInputQueueName()) {
  this->iv_pMonitor = stats;

  //get AE descriptor as XML to use when processing GETMETA requests.
  const AnalysisEngineMetaData & metaData = iv_pEngine->getAnalysisEngineMetaData();
  icu::UnicodeString descriptorXml;
  descriptorXml.insert(0, "<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
  iv_pEngine->getAnnotatorContext().getTaeSpecifier().toXMLBuffer(metaData,
                                                                  false,
                                                                  descriptorXml);
  //keep the UTF-8 form of the XML
  iv_aeDescriptor = UnicodeStringRef(descriptorXml.getBuffer(),
                                     descriptorXml.length()).asUTF8();
}
// Nothing is released explicitly here; the body is intentionally empty.
AMQListener::~AMQListener() {}
/*
* Receive a TextMessage and examine the header properties
* to determine the type of request and payload. Processes
* one request at a time and is blocked till the requestis
* handled and the response sent.
*/
void AMQListener::onMessage( const Message* message ) {
apr_time_t startTime = apr_time_now();
apr_time_t endTime;
apr_time_t startSerialize;
apr_time_t startDeserialize;
apr_time_t startAnnotatorProcess;
apr_time_t timeToDeserializeCAS = 0;
apr_time_t timeToSerializeCAS = 0;
apr_time_t timeInAnalytic = 0;
apr_time_t timeIdle = apr_time_now() - iv_timeLastRequestCompleted;
const TextMessage* textMessage=0;
int command = 0;
stringstream astr;
astr << "*****Message#: " << ++iv_count << "*****" << endl;
LOGINFO(INFO,astr.str());
astr.seekp(0);
try {
iv_busy=true;
textMessage = dynamic_cast< const TextMessage* >( message );
if (textMessage==0) {
LOGERROR("AMQListener::onMessage() invalid pointer to TextMessage");
endTime = apr_time_now();
iv_pMonitor->processingComplete(0,false,endTime-startTime);
this->iv_timeLastRequestCompleted = apr_time_now();
return;
}
if (textMessage->propertyExists("MessageFrom")) {
LOGINFO(FINER,"Received from " + textMessage->getStringProperty("MessageFrom"));
} else {
LOGERROR("AMQListener::onMessage() ERROR MessageFrom not set.");
endTime = apr_time_now();
iv_pMonitor->processingComplete(0,false,endTime-startTime);
this->iv_timeLastRequestCompleted = apr_time_now();
return;
}
if (textMessage->propertyExists("Command") ){
command = textMessage->getIntProperty("Command");
} else {
LOGERROR("AMQListener::onMessage() Required property Command not set.");
endTime = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle, endTime-startTime,
"Required property 'Command' not set." ,true);
endTime = apr_time_now();
iv_pMonitor->processingComplete(0,false,endTime-startTime);
this->iv_timeLastRequestCompleted = apr_time_now();
return;
}
astr.seekp(0);
astr << "Received request Command: " << command ;
if (textMessage->propertyExists("CasReference")) {
astr << " CasReference " << textMessage->getStringProperty("CasReference");
}
LOGINFO(INFO,astr.str());
astr.seekp(0);
if (command == PROCESS_CAS_COMMAND) { //process CAS
//get the payload property
if (!textMessage->propertyExists("Payload")) {
LOGERROR("AMQListener::onMessage() Required property Payload not set.");
endTime = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle,endTime-startTime,
"Required property 'Command' not set." ,true);
endTime = apr_time_now();
iv_pMonitor->processingComplete(command,false,endTime-startTime);
this->iv_timeLastRequestCompleted = apr_time_now();
return;
}
int payload = textMessage->getIntProperty("Payload");
//get the text in the payload
string text = textMessage->getText();
if (text.length() == 0) {
LOGERROR("AMQListener::onMessage() There is no payload data. Nothing to process.");
text = "There is no payload data. Nothing to process.";
endTime = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle, endTime-startTime,
text ,true);
endTime=apr_time_now();
iv_pMonitor->processingComplete(command,false,endTime-startTime);
this->iv_timeLastRequestCompleted = apr_time_now();
return;
}
astr << "Payload: " << payload;
LOGINFO(FINER, astr.str());
LOGINFO(FINER, " Content: " + text);
astr.seekp(0);
//InputSource
MemBufInputSource memIS((XMLByte const *)text.c_str(),
text.length(),
"sysID");
//reset the CAS
iv_pCas->reset();
stringstream xmlstr;
//deserialize payload data into the CAS,
//call AE process method and serialize
//the CAS which will be sent with the
//response.
if (payload == XCAS_PAYLOAD) {
startDeserialize = apr_time_now();
XCASDeserializer deserializer;
deserializer.deserialize(memIS, *iv_pCas);
startAnnotatorProcess=apr_time_now();
timeToDeserializeCAS = startAnnotatorProcess-startDeserialize;
iv_pEngine->process(*iv_pCas);
startSerialize=apr_time_now();
timeInAnalytic = startSerialize - startAnnotatorProcess;
XCASWriter xcaswriter(*iv_pCas, true);
xcaswriter.write(xmlstr);
timeToSerializeCAS = apr_time_now() - startSerialize;
} else if (payload == XMI_PAYLOAD) {
//deserialize incoming xmi CAS data.
startDeserialize = apr_time_now();
XmiSerializationSharedData sharedData;
XmiDeserializer deserializer;
deserializer.deserialize(memIS,*iv_pCas,sharedData);
startAnnotatorProcess=apr_time_now();
timeToDeserializeCAS = startAnnotatorProcess-startDeserialize;
iv_pEngine->process(*iv_pCas);
startSerialize=apr_time_now();
timeInAnalytic = startSerialize - startAnnotatorProcess;
//serialize CAS
XmiWriter xmiwriter(*iv_pCas, true, &sharedData);
xmiwriter.write(xmlstr);
timeToSerializeCAS = apr_time_now() - startSerialize;
} else {
xmlstr << "Invalid Payload " << payload;
LOGERROR("AMQListener::onMessage() " + xmlstr.str());
endTime=apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle,endTime-startTime,xmlstr.str() ,true);
endTime=apr_time_now();
iv_pMonitor->processingComplete(command,false,endTime-startTime);
this->iv_timeLastRequestCompleted = apr_time_now();
return;
}
//done with this CAS.
iv_pCas->reset();
//record end time
endTime = apr_time_now();
//send reply
LOGINFO(FINER,"AnalysisEngine::process() completed successfully. Sending reply.");
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle, endTime-startTime,
xmlstr.str(),false);
endTime=apr_time_now();
iv_pMonitor->processingComplete(command,true,endTime-startTime,
timeToDeserializeCAS, timeInAnalytic, timeToSerializeCAS);
LOGINFO(FINE,"Process CAS finished.");
} else if (command == GET_META_COMMAND ) { //get Meta
LOGINFO(FINE, "Process getMeta request start.");
endTime = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle, endTime-startTime,this->iv_aeDescriptor,false);
endTime=apr_time_now();
iv_pMonitor->processingComplete(command,true,endTime-startTime);
LOGINFO(FINE,"Process getMeta request finished.");
} else if (command == CPC_COMMAND ) { //CPC
LOGINFO(FINE, "Processing CollectionProcessComplete request start");
iv_pEngine->collectionProcessComplete();
endTime = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle,endTime-startTime,"CPC completed.",false);
endTime=apr_time_now();
iv_pMonitor->processingComplete(command,true,endTime-startTime);
LOGINFO(FINE, "Processing CollectionProcessComplete request finished.");
} else {
endTime = apr_time_now();
stringstream str;
str << " Invalid Request " << command << endl;
LOGERROR(str.str());
sendResponse(textMessage,
timeToDeserializeCAS,
timeToSerializeCAS,
timeInAnalytic,
timeIdle,
endTime-startTime,str.str(),true);
endTime = apr_time_now();
iv_pMonitor->processingComplete(0,false,endTime-startTime);
}
iv_busy=false;
iv_timeLastRequestCompleted = apr_time_now();
} catch (XMLException& e) {
stringstream str;
str << "AMQListener::onMessage() XMLException." << e.getMessage();
LOGERROR(str.str());
endTime = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle, endTime-startTime,str.str(),true);
endTime = apr_time_now();
iv_pMonitor->processingComplete(command,false,endTime-startTime);
iv_timeLastRequestCompleted = apr_time_now();
iv_busy=false;
} catch (CMSException& e) {
LOGERROR("AMQListener::onMessage()" + e.getMessage());
endTime = apr_time_now();
iv_pMonitor->processingComplete(0,false,endTime-startTime);
iv_timeLastRequestCompleted = apr_time_now();
iv_busy=false;
} catch (uima::Exception e) {
LOGERROR("AMQListener::onMessage() UIMA Exception " + e.asString());
endTime = apr_time_now();
sendResponse(textMessage, timeToDeserializeCAS,
timeToSerializeCAS, timeInAnalytic,
timeIdle,endTime-startTime,e.asString(),true);
endTime = apr_time_now();
iv_pMonitor->processingComplete(command,false,endTime-startTime);
iv_timeLastRequestCompleted = apr_time_now();
iv_busy=false;
} catch(...) {
LOGERROR("AMQListener::onMessage() Unknown exception ");
//TODO: log / shurdown ?}
endTime = apr_time_now();
iv_pMonitor->processingComplete(command,false,endTime-startTime);
iv_timeLastRequestCompleted = apr_time_now();
iv_busy=false;
}
iv_timeLastRequestCompleted = apr_time_now();
}
void AMQListener::sendResponse(const TextMessage * request,
apr_time_t timeToDeserialize,
apr_time_t timeToSerialize,
apr_time_t timeInAnalytic,
apr_time_t idleTime,
apr_time_t elapsedTime,
string msgContent, bool isExceptionMsg) {
//TODO retry
string serverURI;
AMQConnection * replyTo=NULL;
TextMessage * reply=NULL;
try {
const Destination * cmsReplyTo = request->getCMSReplyTo();
if (cmsReplyTo != NULL) {
reply = this->iv_pConnection->getTextMessage();
} else {
LOGINFO(2, "AMQListener::sendResponse() start");
if (!request->propertyExists("ServerURI") ) {
LOGERROR("AMQListener::sendResponse() ServerURI header property does not exist.");
return;
}
if (!request->propertyExists("MessageFrom") ) {
LOGERROR("AMQListener::sendResponse() MessageFrom header property does not exist.");
return;
}
//get handle to a connection
string tmp = request->getStringProperty("ServerURI");
LOGINFO(FINEST,"replyTo ServerURI: " + tmp);
//special handling when protocol is http.
//HTTP protocol not supported by ActiveMQ C++ client.
//replace the http broker URL with the broker URL of
//the queue this listener is attached to.
if (tmp.find("http:") != string::npos) {
serverURI=this->iv_brokerURL;
LOGINFO(FINER,"HTTP reply address: " + tmp);
} else { //send reply via tcp
size_t begpos = tmp.find("tcp:",0);
tmp = tmp.substr(begpos);
size_t endpos = tmp.find_first_of(",");
if (begpos == string::npos) {
LOGERROR("AMQListener::sendResponse() Could not find tcp URL in ServerURI header property.");
return;
}
if (endpos == string::npos) {
serverURI = tmp;
} else {
serverURI = tmp.substr(0, endpos);
}
}
LOGINFO(FINER,"ReplyTo BrokerURL " + serverURI);
//look up the endpoint connection
replyTo = this->iv_replyToConnections.getConnection(serverURI);
if (replyTo == NULL) {
LOGERROR("Could not get a connection to " + serverURI);
return;
}
//get message object
reply = replyTo->getTextMessage();
}
if (reply == NULL) {
LOGERROR("AMQListener::sendResponse() invalid textMessage object " );
return;
}
//construct the reply message
reply->setStringProperty("MessageFrom", this->iv_pConnection->getInputQueueName() );
reply->setStringProperty("ServerURI", this->iv_brokerURL);
reply->setIntProperty("MessageType",RESPONSE);
reply->setLongProperty("TimeInService", elapsedTime*1000);
reply->setLongProperty("IdleTime", idleTime*1000);
if (request->propertyExists("CasReference") ) {
reply->setStringProperty("CasReference", request->getStringProperty("CasReference"));
}
if (request->propertyExists("Command") ) {
reply->setIntProperty("Command", request->getIntProperty("Command"));
if (request->getIntProperty("Command") == PROCESS_CAS_COMMAND) {
reply->setLongProperty("TimeToSerializeCAS", timeToSerialize*1000);
reply->setLongProperty("TimeToDeserializeCAS", timeToDeserialize*1000);
reply->setLongProperty("TimeInAnalytic", timeInAnalytic*1000);
reply->setLongProperty("TimeInProcessCAS", timeInAnalytic*1000);
}
}
if (isExceptionMsg) {
reply->setIntProperty("Payload",EXC_PAYLOAD);
} else {
if (request->propertyExists("Payload") ) {
reply->setIntProperty("Payload",request->getIntProperty("Payload"));
} else {
reply->setIntProperty("Payload",NO_PAYLOAD);
}
}
//cargo
reply->setText(msgContent);
//log the reply message content
stringstream str;
str << "Sending Reply Command: " << reply->getIntProperty("Command") << " MessageType: " << reply->getIntProperty("MessageType") << " ";
if (reply->propertyExists("CasReference") ) {
str << "CasReference: " << reply->getStringProperty("CasReference");
}
LOGINFO(INFO,str.str());
if (cmsReplyTo != NULL) {
str << " to " << cmsReplyTo->toProviderString();
} else {
str << " to " << request->getStringProperty("MessageFrom")
<< " at " << serverURI;
}
str << " Message text: " << msgContent;
LOGINFO(FINEST,"PRINT Reply message:\n" + str.str());
//send
if (cmsReplyTo != NULL) {
//cout << "cmsReplyTo=" << cmsReplyTo->toProviderString() << endl;
iv_pConnection->sendMessage(cmsReplyTo);
} else {
replyTo->sendMessage(request->getStringProperty("MessageFrom"));
}
LOGINFO(FINER,"AMQListener::sendResponse DONE");
} catch (CMSException& ex ) {
LOGERROR("AMQListener::onMessage()" + ex.getMessage());
} catch (...) {
LOGERROR("AMQListener::onMessage() UnExpected error sending reply.");
}
}
//===================================================
//AMQAnalysisEngineService
//---------------------------------------------------
// Tears the service down by delegating to cleanup().
AMQAnalysisEngineService::~AMQAnalysisEngineService() {
  this->cleanup();
}
// Copies the deployment settings out of desc (instance count, broker URL,
// queue name, AE descriptor, prefetch size, initial FS heap size), stores
// the monitor and runs initialize(). A CMSException escaping initialize()
// is rethrown as uima::Uima_runtime_error carrying the CMS message text.
AMQAnalysisEngineService::AMQAnalysisEngineService(ServiceParameters & desc,
                                                   Monitor * stats) :
    //iv_pMonitor(stats),
    iv_numInstances(desc.getNumberOfInstances()),
    iv_brokerURL(desc.getBrokerURL()),
    iv_inputQueueName(desc.getQueueName()),
    iv_aeDescriptor(desc.getAEDescriptor()),
    iv_prefetchSize(desc.getPrefetchSize()),
    iv_initialFSHeapSize(desc.getInitialFSHeapSize()),
    iv_vecpConnections(),
    iv_vecpAnalysisEngines(),
    iv_vecpCas(),
    iv_listeners() {
  try {
    this->iv_pMonitor = stats;
    this->initialize();
  } catch (CMSException & cmsError) {
    ErrorMessage text(UIMA_MSG_ID_LOG_ERROR);
    text.addParam(cmsError.getMessage());
    ErrorInfo failure;
    failure.setMessage(text);
    UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                       UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                       failure.getMessage(),
                       failure.getMessage().getMessageID(),
                       ErrorInfo::unrecoverable);
  }
}
void AMQAnalysisEngineService::initialize() {
try {
//create a connection for each instance
for (int i=0; i < iv_numInstances; i++) {
//create a connection to MQ Broker
AMQConnection * newConnection = new AMQConnection(this->iv_brokerURL.c_str(), iv_pMonitor);
if (newConnection == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create ActiveMQ endpoint connection.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
newConnection->setExceptionListener(this);
this->iv_vecpConnections.push_back(newConnection);
}
if (!uima::ResourceManager::hasInstance()) {
uima::ResourceManager::createInstance("ActiveMQAnalysisEngineService");
}
ErrorInfo errInfo;
UnicodeString ustr(this->iv_aeDescriptor.c_str());
UnicodeString ufn = ResourceManager::resolveFilename(ustr,ustr);
//create a AnalysisEngine and CAS for each instance
for (int i=0; i < iv_numInstances; i++) {
//create AE
AnalysisEngine * pEngine = uima::TextAnalysisEngine::createTextAnalysisEngine((UnicodeStringRef(ufn).asUTF8().c_str()),
errInfo);
if (pEngine) {
LOGINFO(0,"AMQAnalysisEngineService::initialize() AnalysisEngine initialization successful.");
} else {
LOGERROR("AMQAnalysisEngineService::initializer() could not create AE" + errInfo.asString());
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQListener::initialize() create AE failed. " + errInfo.getMessage().asString() );
ErrorInfo errInfo;
errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE),
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
this->iv_vecpAnalysisEngines.push_back(pEngine);
//initial FSHeap size
if (this->iv_initialFSHeapSize > 0) {
pEngine->getAnnotatorContext().
assignValue(UIMA_ENGINE_CONFIG_OPTION_FSHEAP_PAGESIZE,this->iv_initialFSHeapSize);
}
//create CAS
CAS * cas = pEngine->newCAS();
if (cas == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create CAS");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQListener::initialize() create CAS failed.");
ErrorInfo errInfo;
errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE),
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
this->iv_vecpCas.push_back(cas);
}
//create listeners and register these
for (int i=0; i < iv_numInstances; i++) {
AMQListener * newListener = new AMQListener(i,iv_vecpConnections.at(i),
iv_vecpAnalysisEngines.at(i), iv_vecpCas.at(i),
iv_pMonitor);
if (newListener == NULL) {
LOGERROR("AMQAnalysisEngineService::initialize() Could not create listener.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AnalysisEngineServcie::initialize() Could not create listener.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
this->iv_listeners[i] = newListener;
//create MessageConsumer session and register Listener
this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,
newListener,this->iv_prefetchSize);
}
iv_pMonitor->setNumberOfInstances(iv_numInstances);
} catch (uima::Exception e) {
cout << __LINE__ << "got a uima exception " << endl;
LOGERROR("AMQAnalysisEngineService::initialize() " + e.asString());
throw e;
}
}
void AMQAnalysisEngineService::setTraceLevel(int level) {
if (level < 0) {
uimacpp_ee_tracelevel=NONE;
} else if (level == 0) {
uimacpp_ee_tracelevel = INFO;
} else if (level == 1) {
uimacpp_ee_tracelevel = FINE;
} else if (level == 2) {
uimacpp_ee_tracelevel = FINER;
} else if (level > 2) {
uimacpp_ee_tracelevel = FINEST;
} else {
uimacpp_ee_tracelevel = INFO;
}
cout << "tracelevel=" << uimacpp_ee_tracelevel << endl;
}
void AMQAnalysisEngineService::onException( const CMSException& ex ) {
LOGERROR("CMS Exception occured. Shutting down the service." + ex.getMessage());
cerr << "Broken connection. Stopped receiving messages." << endl;
stop();
this->iv_pMonitor->shutdown();
}
// Message callback of the service object itself. It only records that a
// message arrived; the message content is not inspected.
void AMQAnalysisEngineService::onMessage( const Message* message ) {
  static_cast<void>(message);  // intentionally unused
  LOGINFO(0,"AMQAnalsisEngineService::onMessage() Got a message.");
}
void AMQAnalysisEngineService::start() {
for (size_t i=0; i < iv_vecpConnections.size(); i++) {
cout << "Starting listener " << i << endl;
AMQConnection * connection = iv_vecpConnections.at(i);
if (connection != NULL) {
connection->start();
} else {
LOGERROR("AMQAnalysisServiceEngine::start() Invalid connection object.");
ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
msg.addParam("AMQAnalysisServiceEngine::start() Connection does not exist.");
ErrorInfo errInfo;
errInfo.setMessage(msg);
UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
errInfo.getMessage(),
errInfo.getMessage().getMessageID(),
ErrorInfo::unrecoverable);
}
}
//update the start time
iv_pMonitor->setStartTime();
}
// Stops message delivery by calling stop() on every connection held in
// iv_vecpConnections.
//
// Returns 0 in all cases where it returns.
// Throws uima::Uima_runtime_error (unrecoverable) as soon as a NULL entry is
// found; connections after that entry are not stopped.
int AMQAnalysisEngineService::stop() {
  //TODO: let listeners finish processing first
  //stop messages notification
  for (size_t i=0; i < iv_vecpConnections.size(); i++) {
    AMQConnection * connection = iv_vecpConnections.at(i);
    if (connection != NULL) {
      connection->stop();
    } else {
      LOGERROR("AMQAnalysisEngineService::stop() Connection object is NULL.");
      ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR);
      // The exception text names stop(); it previously carried the text of
      // start(), which pointed at the wrong operation.
      msg.addParam("AMQAnalysisEngineService::stop() Connection does not exist.");
      ErrorInfo errInfo;
      errInfo.setMessage(msg);
      UIMA_EXC_THROW_NEW(uima::Uima_runtime_error,
                         UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE,
                         errInfo.getMessage(),
                         errInfo.getMessage().getMessageID(),
                         ErrorInfo::unrecoverable);
    }
  }
  return 0;
}
// Destroys every resource owned by the service: connections (stopped first),
// analysis engines, CAS objects and listeners. All containers are emptied
// once their contents have been deleted.
//
// For each index the order is connection, engine, CAS; listeners are deleted
// last. The three vectors are bounds-checked individually, so the method is
// safe even when they hold different numbers of entries. Each pointer is
// reset to NULL right after deletion so that a later call cannot delete it
// again if this call is cut short by an exception.
//
// A CMSException raised while stopping a connection is logged and ends the
// cleanup early; it is not propagated.
void AMQAnalysisEngineService::cleanup() {
  // Destroy resources.
  try {
    const size_t numConnections = iv_vecpConnections.size();
    const size_t numEngines = iv_vecpAnalysisEngines.size();
    const size_t numCas = iv_vecpCas.size();

    // Walk as far as the longest of the three vectors.
    size_t numSlots = numConnections;
    if (numEngines > numSlots) {
      numSlots = numEngines;
    }
    if (numCas > numSlots) {
      numSlots = numCas;
    }

    for (size_t i=0; i < numSlots; i++) {
      if (i < numConnections && iv_vecpConnections[i] != NULL) {
        iv_vecpConnections[i]->stop();
        delete iv_vecpConnections[i];
        iv_vecpConnections[i] = NULL;
      }
      if (i < numEngines) {
        delete iv_vecpAnalysisEngines[i];  // deleting NULL is a no-op
        iv_vecpAnalysisEngines[i] = NULL;
      }
      if (i < numCas) {
        delete iv_vecpCas[i];
        iv_vecpCas[i] = NULL;
      }
    }
    iv_vecpAnalysisEngines.clear();
    iv_vecpCas.clear();
    iv_vecpConnections.clear();

    for (map<int, AMQListener*>::iterator ite = iv_listeners.begin();
         ite != iv_listeners.end(); ++ite) {
      delete ite->second;
      ite->second = NULL;
    }
    iv_listeners.clear();
  } catch (CMSException& e) {
    LOGERROR("AMQAnalysisEngineService::cleanup() " + e.getMessage());
  }
}
//===================================================
//CommonUtils
//---------------------------------------------------
// Forwards an error message to the monitor's logError().
void CommonUtils::logError(string msg) {
  this->iv_pMonitor->logError(msg);
}
// Forwards a warning message to the monitor's logWarning().
void CommonUtils::logWarning(string msg) {
  this->iv_pMonitor->logWarning(msg);
}
// Forwards an informational message to the monitor's logMessage().
void CommonUtils::logMessage(string msg) {
  this->iv_pMonitor->logMessage(msg);
}