| /* |
| |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| |
| * Analysis Engine service wrapper implementation based on |
| * Active MQ C++ client. |
| */ |
| |
| #include "ActiveMQAnalysisEngineService.hpp" |
| #include "deployCppService.hpp" |
| |
| #include <activemq/core/ActiveMQConnectionFactory.h> |
| #include <activemq/core/ActiveMQProducer.h> |
| #include <activemq/core/ActiveMQConstants.h> |
| #include <activemq/core/ActiveMQConnection.h> |
| |
| #include "uima/xmlwriter.hpp" |
| #include "uima/xcasdeserializer.hpp" |
| #include "uima/xmiwriter.hpp" |
| #include "uima/xmideserializer.hpp" |
| #include "uima/xmlerror_handler.hpp" |
| |
| using namespace activemq::core; |
| using namespace uima; |
| |
| enum traceLevels {NONE, INFO, FINE, FINER, FINEST }; |
| traceLevels uimacpp_ee_tracelevel=INFO; |
| #define MSGHEADER apr_time_now() << " ThreadId: " << apr_os_thread_current() << __FILE__ << ":" << __LINE__ |
| #define FORMATMSG(x) stringstream lstr; lstr << MSGHEADER << " " << x; |
| |
| #define LOGINFO(n,x) { if (n > uimacpp_ee_tracelevel) {} else { FORMATMSG(x); logMessage(lstr.str().c_str()); } } |
| #define LOGERROR(x){ FORMATMSG(x); logError(lstr.str().c_str()); } |
| #define LOGWARN(x) { FORMATMSG(x); logWarning(lstr.str().c_str());} |
| |
| static void listener_signal_handler(int signum) { |
| stringstream str; |
| str << __FILE__ << __LINE__ << " Received Signal: " << signum; |
| cerr << str.str() << endl; |
| } |
| |
| |
| //================================================================= |
| // |
| //Message processing function executed by message handling threads. |
| //----------------------------------------------------------------- |
| static void* APR_THREAD_FUNC handleMessages(apr_thread_t *thd, void *data) { |
| //cout << __FILE__ << __LINE__ << Thread::getId() << " handleMessages start " << endl; |
| AMQListener * handler = (AMQListener*) data; |
| handler->receiveAndProcessMessages(thd); |
| apr_thread_exit(thd, APR_SUCCESS); |
| return NULL; |
| } |
| |
| |
| //=================================================== |
| //AMQConnection |
| //--------------------------------------------------- |
| ConnectionFactory * AMQConnection::createConnectionFactory(ServiceParameters & params) { |
| |
| //encode prefetch size in the broker url as there is no api to set it. |
| stringstream str; |
| str << params.getBrokerURL(); |
| if (str.str().find("?") == std::string::npos) |
| str << "?jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() ; |
| else |
| str << "&jms.prefetchPolicy.queuePrefetch=" << params.getPrefetchSize() ; |
| |
| //str << params.getBrokerURL() << "?consumer.prefetchSize=" << params.getPrefetchSize() << endl; |
| //str << "tcp://127.0.0.1:61616"; |
| ConnectionFactory * pConnFactory = ConnectionFactory::createCMSConnectionFactory(params.getBrokerURL()); |
| cout << "AMQConnection()::createConnectionFactory " << str.str() << endl; |
| return pConnFactory; |
| }; |
| |
| AMQConnection::AMQConnection( ConnectionFactory * connFact, |
| string brokerURL, Monitor * pMonitor, int id) : |
| iv_id(id), |
| iv_pConnFact(connFact), |
| //iv_pMonitor(0), |
| //iv_brokerURL(((ActiveMQConnectionFactory*)connFact)->getBrokerURL()), |
| iv_brokerURL(brokerURL), |
| iv_pConnection(0), |
| iv_pConsumerSession(0), |
| iv_pConsumer(0), |
| iv_inputQueueName(), |
| iv_pInputQueue(0), |
| iv_pListener(0), |
| iv_pProducerSession(0), |
| iv_pProducer(0), |
| iv_pReplyMessage(0), |
| iv_replyDestinations(), |
| iv_valid(false), |
| iv_started(false), |
| iv_reconnecting(false) { |
| int pos = iv_brokerURL.find_first_of("?"); |
| if (pos != string::npos) { |
| iv_brokerURL = iv_brokerURL.substr(0,pos); |
| } |
| iv_pMonitor = pMonitor; |
| initialize(); |
| } |
| void AMQConnection::initialize( ) { |
| |
| try { |
| |
| LOGINFO(INFO, "AMQConnection() connecting to " + iv_brokerURL); |
| |
| // Create a Connection |
| if (iv_pConnFact == NULL) { |
| LOGERROR("AMQConnection() invalid connection factory"); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnection() invalid create connection factory"); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| |
| bool retrying = false; |
| while (iv_pConnection == NULL) { |
| try { |
| iv_pConnection = iv_pConnFact->createConnection(); |
| if (iv_pConnection == NULL ) { |
| if (!retrying) { |
| stringstream str; |
| str << " AMQConnection::initialize() Connection object is null. Failed to connect to " << iv_brokerURL |
| << ". Retrying..." << endl; |
| LOGWARN(str.str()); |
| retrying = true; |
| apr_sleep(30000000); //wait 30 seconds to reconnect |
| } |
| } |
| } catch (cms::CMSException& e) { |
| if (!retrying) { |
| stringstream str; |
| str << "AMQConnection::initialize() Failed to connect to " << iv_brokerURL |
| << e.getMessage() << ". Retrying..." << endl; |
| cout << e.getMessage() << endl; |
| LOGWARN(str.str()); |
| retrying = true; |
| apr_sleep(30000000); //wait 30 seconds to reconnect |
| } |
| } |
| } |
| |
| if (retrying) { |
| LOGWARN("AMQConnection::initialize() Connected to " + iv_brokerURL); |
| } |
| |
| //default exception listener |
| this->iv_pConnection->setExceptionListener(this); |
| ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( this->iv_pConnection ); |
| if( amqConnection != NULL ) { |
| amqConnection->addTransportListener( this ); |
| } |
| // Create a Producer Session |
| LOGINFO(FINEST,"AMQConnection() create Producer Session " + iv_brokerURL); |
| this->iv_pProducerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE ); |
| |
| if (this->iv_pProducerSession == NULL) { |
| LOGERROR("AMQConnection() createSession() failed."); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnection() createSession failed." ); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| this->iv_pProducer = this->iv_pProducerSession->createProducer(NULL); |
| if (this->iv_pProducer == NULL) { |
| LOGERROR("AMQConnection() could not create MessageProducer "); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnection() create MessageProducer failed."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| this->iv_pProducer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); |
| |
| //TODO set sendTimeout ? |
| ////((ActiveMQProducer*)this->iv_pProducer)->setSendTimeout(3000); |
| |
| //create TextMessage |
| this->iv_pReplyMessage = this->iv_pProducerSession->createTextMessage(); |
| if (this->iv_pReplyMessage == NULL) { |
| LOGERROR("AMQConnection() create textMessage failed. "); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnection() failed to create message."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| this->iv_valid = true; |
| LOGINFO(0, "AMQConnection() connected successfully to " + iv_brokerURL); |
| } catch (cms::CMSException& e) { |
| LOGERROR("AMQConnection(): " + e.getMessage()); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam(e.getMessage()); |
| ErrorInfo errInfo; |
| errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE); |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } catch (...) { |
| cout << "... " << endl; |
| LOGERROR("AMQConnection() failed to create a connection"); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnection() create connection failed"); |
| ErrorInfo errInfo; |
| errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE); |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| } |
| |
| /* destructor */ |
| AMQConnection::~AMQConnection() { |
| if (this->iv_pConsumer != NULL) { |
| delete this->iv_pConsumer; |
| this->iv_pConsumer = NULL; |
| } |
| if (this->iv_pProducer != NULL) { |
| delete this->iv_pProducer; |
| this->iv_pProducer = NULL; |
| } |
| if (this->iv_pConsumerSession != NULL) { |
| delete this->iv_pConsumerSession; |
| this->iv_pConsumerSession = NULL; |
| } |
| if (this->iv_pProducerSession != NULL) { |
| delete this->iv_pProducerSession; |
| this->iv_pProducerSession = NULL; |
| } |
| if (this->iv_pInputQueue != NULL) { |
| delete this->iv_pInputQueue; |
| this->iv_pInputQueue=NULL; |
| } |
| if (this->iv_pConnection != NULL) { |
| this->iv_pConnection->close(); |
| delete this->iv_pConnection; |
| this->iv_pConnection = NULL; |
| } |
| if (this->iv_pReplyMessage != NULL) { |
| delete this->iv_pReplyMessage; |
| this->iv_pReplyMessage=NULL; |
| } |
| |
| //destinations |
| map<string, cms::Destination*>::iterator ite; |
| for (ite= iv_replyDestinations.begin();ite != iv_replyDestinations.end();ite++) { |
| delete ite->second; |
| } |
| } |
| |
| /* create a MessageConsumer session and register a MessageListener |
| to receive messages from the input queue. */ |
| void AMQConnection::createMessageConsumer(string queueName, string selector, int prefetchSize) { |
| LOGINFO(FINEST, "AMQConnection::createMessageConsumer() consumer start " + queueName); |
| this->iv_inputQueueName = queueName; |
| stringstream str; |
| str << queueName; |
| |
| if (this->iv_pConsumerSession != NULL || iv_pConsumer != NULL) { |
| LOGERROR("AMQConnection::createMessageConsumer() A session already exists. "); |
| ErrorInfo errInfo; |
| errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "A session already exists.")); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| |
| this->iv_pConsumerSession = this->iv_pConnection->createSession( Session::AUTO_ACKNOWLEDGE ); |
| if (this->iv_pConsumerSession == NULL) { |
| LOGERROR("AMQConnection() createSession failed."); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConneciton() createSession failed." ); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| |
| this->iv_pInputQueue = this->iv_pConsumerSession->createQueue(this->iv_inputQueueName); |
| |
| if (this->iv_pInputQueue == NULL) { |
| LOGERROR("AMQConnection::createMessageConsumer() createQueue failed. " + queueName); |
| stringstream str; |
| str << "AMQConnection::createMessageConsumer() createQueue failed. " << queueName << endl; |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam(str.str()); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| |
| iv_selector = selector; |
| this->iv_prefetchSize = prefetchSize; |
| if (selector.length() > 0) { |
| this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue, iv_selector); |
| } else { |
| this->iv_pConsumer = this->iv_pConsumerSession->createConsumer(this->iv_pInputQueue); |
| } |
| |
| if (this->iv_pConsumer == NULL) { |
| LOGERROR("AMQConnection::createMessageConsumer() createConsumer failed. " + queueName); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnection::createMessageConsumer() createConsumer() failed."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| //set prefetch size |
| //no api to set prefetchsize so encode in broker url; |
| |
| LOGINFO(FINEST, "AMQConnection::createMessageConsumer() " + queueName + " successful."); |
| } |
| |
| |
| void AMQConnection::onException(const CMSException & ex) { |
| //mark endpoint as broken. |
| this->iv_valid = false; |
| //log that connection is invalid. |
| stringstream str; |
| str << "AMQConnection()::onException() Connection to " |
| << iv_brokerURL << " is broken. Reconnecting ... " << ex.getMessage() << endl; |
| LOGWARN(str.str()); |
| } |
| void AMQConnection::transportInterrupted() { |
| std::cout << "The Connection's Transport has been Interrupted." << std::endl; |
| stringstream str; |
| str << "AMQConnection()::transportInterrupted() Connection to " |
| << iv_brokerURL << " has been interrupted. Reconnecting ... " << endl; |
| LOGWARN(str.str()); |
| } |
| |
| void AMQConnection::transportResumed() { |
| std::cout << "The Connection's Transport has been Restored." << std::endl; |
| stringstream str; |
| str << "AMQConnection()::transportResumed() Reconnected to " |
| << iv_brokerURL << endl; |
| LOGWARN(str.str()); |
| } |
| |
| TextMessage * AMQConnection::getTextMessage() { |
| if (this->iv_pReplyMessage == NULL) { |
| LOGERROR("AMQConnection::getTextMessage() failed. "); |
| ErrorInfo errInfo; |
| errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "TextMessage could not be created.")); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| iv_pReplyMessage->clearProperties(); |
| iv_pReplyMessage->clearBody(); |
| return this->iv_pReplyMessage; |
| } |
| |
| void AMQConnection::sendMessage(string queuename) { |
| LOGINFO(FINEST, "AMQConnection::sendMessage() to " + queuename); |
| if (this->iv_pProducer == NULL) { |
| LOGERROR("AMQConnection::sendMessage() Invalid Message producer. "); |
| ErrorInfo errInfo; |
| errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "No message producer.")); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| //look up destination queue |
| cms::Destination * pDest; |
| map<string,Destination*>::iterator ite = this->iv_replyDestinations.find(queuename); |
| if (ite != this->iv_replyDestinations.end()) { |
| pDest = ite->second; |
| } else { |
| pDest = this->iv_pProducerSession->createQueue(queuename); |
| if (pDest != NULL) { |
| this->iv_replyDestinations[queuename] = pDest; |
| } |
| } |
| |
| if (pDest == NULL) { |
| LOGERROR("AMQConnection::sendMessage() invalid destination " + queuename); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnection::sendMessage() invalid destination."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| |
| this->iv_pProducer->send(pDest,this->iv_pReplyMessage); |
| |
| //cout << "producer->send elapsed time " << (apr_time_now() - stime) << endl; |
| this->iv_pReplyMessage->clearBody(); |
| this->iv_pReplyMessage->clearProperties(); |
| LOGINFO(FINEST, "AMQConnection::sendMessage() successful to " + queuename); |
| } |
| |
| void AMQConnection::sendMessage(const Destination * cmsReplyTo) { |
| LOGINFO(FINEST, "AMQConnection::sendMessage() to " + |
| ((Queue*)cmsReplyTo)->getQueueName() ); |
| //LOGINFO(FINEST, "AMQConnection::sendMessage() to " + cmsReplyTo->toProviderString()); |
| if (this->iv_pProducer == NULL) { |
| LOGERROR("AMQConnection::getTextMessage Invalid message producer. "); |
| ErrorInfo errInfo; |
| errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "A MessageProducer does not exist.")); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| this->iv_pProducer->send(cmsReplyTo,this->iv_pReplyMessage); |
| //cout << "producer->send elapsed time " << (apr_time_now() - stime) << endl; |
| this->iv_pReplyMessage->clearBody(); |
| this->iv_pReplyMessage->clearProperties(); |
| LOGINFO(4, "AMQConnection::sendMessage() successful to " + ((Queue*)cmsReplyTo)->getQueueName()); |
| //LOGINFO(4, "AMQConnection::sendMessage() successful to " + cmsReplyTo->toProviderString()); |
| } |
| |
| // must be called to start receiving messages |
| void AMQConnection::start() { |
| if (this->iv_pConnection != NULL) { |
| this->iv_pConnection->start(); |
| LOGINFO(0,"AMQConnection::start() Start receiving messages."); |
| } else { |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnection::start() failed. A connection does not exist."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| } |
| |
| //stops receiving messages |
| void AMQConnection::stop() { |
| if (this->iv_pConnection != NULL) { |
| this->iv_pConnection->stop(); |
| } else { |
| LOGERROR("AMQConnection::stop() invalid connection. "); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("stop() invalid connection."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| } |
| |
| void AMQConnection::resetBeforeReconnect() { |
| this->iv_reconnecting = true; |
| |
| if (this->iv_pConsumer != NULL) { |
| delete this->iv_pConsumer; |
| this->iv_pConsumer = NULL; |
| } |
| if (this->iv_pProducer != NULL) { |
| delete this->iv_pProducer; |
| this->iv_pProducer = NULL; |
| } |
| if (this->iv_pConsumerSession != NULL) { |
| delete this->iv_pConsumerSession; |
| this->iv_pConsumerSession = NULL; |
| } |
| if (this->iv_pProducerSession != NULL) { |
| delete this->iv_pProducerSession; |
| this->iv_pProducerSession = NULL; |
| } |
| if (this->iv_pConnection != NULL) { |
| delete this->iv_pConnection; |
| this->iv_pConnection = NULL; |
| } |
| |
| this->iv_started = false; |
| this->iv_valid = false; |
| |
| if (this->iv_pReplyMessage != NULL) { |
| delete this->iv_pReplyMessage; |
| this->iv_pReplyMessage=NULL; |
| } |
| |
| //destinations |
| map<string, cms::Destination*>::iterator ite; |
| for (ite= iv_replyDestinations.begin();ite != iv_replyDestinations.end();ite++) { |
| delete ite->second; |
| } |
| } |
| |
| void AMQConnection::reconnect() { |
| try { |
| this->iv_reconnecting = true; |
| this->iv_pMonitor->reconnecting(iv_id); |
| apr_sleep(30000000); //wait 30 seconds to reconnect |
| //cout << "AMQConnection::reconnect() calling initialize >>>" << endl; |
| this->initialize(); |
| this->createMessageConsumer(this->iv_inputQueueName,this->iv_selector, this->iv_prefetchSize); |
| this->start(); |
| stringstream str; |
| str << "AMQConnection::reconnect() Successfully reconnected to >>> " << iv_brokerURL << endl; |
| LOGWARN(str.str()); |
| this->iv_pMonitor->reconnectionSuccess(iv_id); |
| this->iv_reconnecting = false; |
| } catch (uima::Exception e) { |
| cerr << "AMQConnection::reconnect() " << e << endl; |
| resetBeforeReconnect(); |
| } catch (...) { |
| cerr << "AMQConnection::reconnect() catch ... " << endl; |
| resetBeforeReconnect(); |
| } |
| |
| } |
| |
| |
| Message * AMQConnection::receive(const int delay) { |
| if (iv_valid) { |
| return iv_pConsumer->receive(delay); |
| } else { |
| this->resetBeforeReconnect(); |
| this->reconnect(); |
| this->iv_valid = true; |
| return NULL; |
| } |
| } |
| |
| //=================================================== |
| //AMQConnectionCache |
| //--------------------------------------------------- |
| AMQConnectionsCache::AMQConnectionsCache(ConnectionFactory * pConnFact,Monitor * stats) { |
| iv_pConnFact = pConnFact; |
| iv_pMonitor = stats; |
| } |
| |
| AMQConnectionsCache::~AMQConnectionsCache() { |
| map<string,AMQConnection*>::iterator ite; |
| for (ite = this->iv_connections.begin(); ite != this->iv_connections.end(); ite++) { |
| if (ite->second != NULL) { |
| delete ite->second; |
| } |
| } |
| } |
| |
| //Retrieves a Connection from the cache if it |
| //exists or establishes a connection to the |
| //the specified broker and adds to the cache |
| //and returns the new connection. |
| AMQConnection * AMQConnectionsCache::getConnection(string brokerURL) { |
| LOGINFO(FINE,"AMQConnectionCache::getConnection() looking up connection to " |
| + brokerURL ); |
| AMQConnection * connection = NULL; |
| |
| try { |
| map<string,AMQConnection*>::iterator ite; |
| ite = this->iv_connections.find(brokerURL); |
| if (ite == iv_connections.end()) { |
| LOGINFO(FINE,"AMQConnectionsCache::getConnection() create new connection to " + |
| brokerURL); |
| connection = new AMQConnection(iv_pConnFact, brokerURL, iv_pMonitor, iv_connections.size()); |
| if (connection == NULL) { |
| LOGERROR("AMQConnectionCache::getConnection Could not create a endpoint connection to " + |
| brokerURL); |
| } else { |
| this->iv_connections[brokerURL] = connection; |
| } |
| } else { |
| connection = ite->second; |
| //if not a valid connection, reconnect |
| if (connection == NULL) { |
| LOGERROR("AMQConnectionCache::getConnection() could not connect to " |
| + brokerURL); |
| } else { |
| if (!connection->isValid()) { |
| LOGWARN("AMQConnectionCache::getEndPoint() Existing connection invalid. Reconnecting to " + brokerURL ); |
| delete connection; |
| this->iv_connections.erase(brokerURL); |
| connection = new AMQConnection(iv_pConnFact, brokerURL, iv_pMonitor, iv_connections.size()); |
| if (connection == NULL) { |
| LOGERROR("AMQConnectionCache::getConnection() could not connect to " |
| + brokerURL ); |
| } else { |
| LOGWARN("AMQConnectionCache::getConnection() reconnected to " + |
| brokerURL); |
| this->iv_connections[brokerURL] = connection; |
| } |
| } |
| } |
| } |
| return connection; |
| } catch (cms::CMSException & e) { |
| LOGERROR("AMQConnectionCache::getConnection() " + e.getMessage()); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnectionCache::getConnection() " + e.getMessage()); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| |
| } catch (...) { |
| LOGERROR("AMQConnectionCache::getConnection() Unknown Exception. "); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQConnectionCache::getConnection Unknown Exception"); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| } |
| |
| |
| //=================================================== |
| //Listener that process UIMA requests. |
| //--------------------------------------------------- |
| |
| AMQListener::AMQListener(int id, |
| AMQConnection * connection, |
| AnalysisEngine * ae, |
| CAS * cas, |
| Monitor * stats) : |
| iv_id(id), |
| iv_pConnection(connection), |
| iv_pEngine(ae), |
| iv_pCas(cas), |
| //iv_pMonitor(stats), |
| iv_replyToConnections(connection->iv_pConnFact,stats), |
| iv_timeLastRequestCompleted(0), |
| iv_busy(false), |
| iv_stopProcessing(false), |
| iv_count(0), |
| iv_aeDescriptor(), |
| iv_brokerURL(connection->getBrokerURL()), |
| iv_inputQueueName(connection->getInputQueueName()) { |
| |
| iv_pMonitor = stats; |
| getMetaData(iv_pEngine); |
| |
| } |
| |
| AMQListener::AMQListener(int id, |
| AMQConnection * connection, |
| AnalysisEngine * ae, |
| Monitor * stats) : |
| iv_id(id), |
| iv_pConnection(connection), |
| iv_pEngine(ae), |
| //iv_pMonitor(stats), |
| iv_replyToConnections(connection->iv_pConnFact, stats), |
| iv_timeLastRequestCompleted(0), |
| iv_busy(false), |
| iv_count(0), |
| iv_aeDescriptor(), |
| iv_brokerURL(connection->getBrokerURL()), |
| iv_inputQueueName(connection->getInputQueueName()) { |
| |
| iv_pMonitor = stats; |
| getMetaData(iv_pEngine); |
| |
| } |
| |
| AMQListener::~AMQListener() { |
| |
| } |
| |
| //extract AE descriptor as XML to use when processing GETMETA requests. |
| void AMQListener::getMetaData(AnalysisEngine * pEngine) { |
| if (pEngine == NULL) { |
| LOGERROR("AMQListener::getMetaData() Invalid handle to AnalysisEngine."); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQListener::getMetaData() Invalid handle to AnalysisEngine."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| |
| CAS * mcas = pEngine->newCAS(); |
| delete mcas; |
| const AnalysisEngineMetaData & aeMetaData = pEngine->getAnalysisEngineMetaData(); |
| icu::UnicodeString xmlBuffer; |
| xmlBuffer.insert(0, "<?xml version=\"1.0\"?>"); |
| pEngine->getAnnotatorContext().getTaeSpecifier().toXMLBuffer(aeMetaData, |
| false, xmlBuffer); |
| UnicodeStringRef uref(xmlBuffer.getBuffer(), xmlBuffer.length()); |
| |
| this->iv_aeDescriptor = uref.asUTF8(); |
| //cout << this->iv_aeDescriptor << endl; |
| } |
| |
| |
| |
| bool AMQListener::validateRequest(const TextMessage * textMessage, string & errmsg) { |
| bool valid = true; |
| |
| if ( textMessage->getCMSReplyTo() == NULL) |
| LOGWARN("AMQListener::validateRequest() JMSReplyTo is not set. " ); |
| |
| if (textMessage->getCMSReplyTo() == NULL && |
| !textMessage->propertyExists("MessageFrom") ) { |
| errmsg = "Reply to destination not set."; |
| return false; |
| } |
| |
| if (!textMessage->propertyExists("Command") ) { |
| errmsg = "Required property 'Command' is not set."; |
| LOGERROR("AMQListener::validateRequest " + errmsg); |
| valid = false; |
| } else { |
| int command = textMessage->getIntProperty("Command"); |
| if (command != PROCESS_CAS_COMMAND && |
| command != GET_META_COMMAND && |
| command != CPC_COMMAND) { |
| stringstream str; |
| str << "Unexpected value for 'Command' " << command; |
| errmsg = str.str(); |
| LOGERROR("AMQListener::validateRequest " + errmsg); |
| valid=false; |
| } else if (command == CPC_COMMAND) { |
| if (iv_pEngine == NULL) { |
| errmsg = "CPC request received but AnalysisEngine not available."; |
| LOGERROR("AMQListener::validateRequest() " + errmsg); |
| valid = false; |
| } |
| } else if (command == PROCESS_CAS_COMMAND) { |
| if (iv_pCas == NULL || iv_pEngine == NULL) { |
| errmsg = "Process Cas request but an AnalysisEngine and CAS not available."; |
| LOGERROR("AMQListener::validateRequest() " + errmsg); |
| valid = false; |
| } |
| if (!textMessage->propertyExists("Payload") ) { |
| errmsg = "Required property 'Payload' is not set."; |
| LOGERROR("AMQListener::validateRequest " + errmsg); |
| valid = false; |
| } else { |
| int payload = textMessage->getIntProperty("Payload"); |
| if (payload != XCAS_PAYLOAD && payload != XMI_PAYLOAD) { |
| stringstream str; |
| str << "Unexpected value for 'Payload' " << payload; |
| errmsg = str.str(); |
| LOGERROR("AMQListener::validateRequest " + errmsg); |
| valid=false; |
| } |
| |
| try { |
| string text = textMessage->getText().c_str(); |
| if (text.length() == 0) { |
| errmsg = "There is no payload data. Nothing to process."; |
| LOGERROR("AMQListener::validateRequest " + errmsg); |
| valid = false; |
| } |
| } catch (CMSException& e) { |
| errmsg = e.getMessage(); |
| LOGERROR("AMQListener::validateRequest " + e.getMessage()); |
| valid = false; |
| } |
| } |
| } |
| } |
| return valid; |
| } |
| |
| |
| void AMQListener::receiveAndProcessMessages(apr_thread_t * thd) { |
| |
| try { |
| this->thd = thd; |
| cout << "Instance: " << iv_id << " ThreadId: " << apr_os_thread_current() << " started." << endl; |
| //start receiving messages |
| //this->iv_pConnection->start(); |
| |
| this->iv_stopProcessing = false; |
| apr_time_t lastStatsTime = apr_time_now(); |
| while (!iv_stopProcessing) { |
| |
| stringstream astr; |
| |
| //cout << "receiveAndProcess going to call recieve" << endl; |
| Message * msg = this->iv_pConnection->receive(2000); |
| |
| if (msg != NULL) { |
| iv_busy = true; |
| astr << this->iv_id; |
| astr << " *****Message#: "<< ++iv_count << "*****"; |
| //cerr << astr.str() << endl; |
| //LOGINFO(FINE,astr.str()); |
| this->handleRequest(msg); |
| ///this->testHandleRequest(msg); |
| delete msg; |
| msg = 0; |
| //cerr << iv_id << " ****Message#: " << iv_count << " deleted " << endl; |
| iv_busy = false; |
| this->iv_timeLastRequestCompleted = apr_time_now(); |
| //cout << iv_id << " processMessages done: getnext " << iv_count << endl; |
| } |
| |
| } |
| LOGWARN("AMQListener::receiveAndProcessMessage() stopped receiving messages."); |
| } catch (uima::Exception ex ) { |
| LOGERROR("AMQListener::receiveAndProcessMessages() " + |
| ex.getErrorInfo().getMessage().asString()); |
| this->iv_stopProcessing = true; |
| //tell the monitor that the thread has stopped processing |
| iv_pMonitor->listenerStopped(this->iv_id); |
| } catch (...) { |
| LOGERROR("AMQListener::receiveAndProcessMessages UnExpected error."); |
| iv_stopProcessing = true; |
| //tell the monitor that the thread has stopped processing |
| iv_pMonitor->listenerStopped(this->iv_id); |
| } |
| } |
| |
| |
| /* |
| * Receive a TextMessage and examine the header properties |
| * to determine the type of request and payload. Processes |
| * one request at a time and is blocked till the requestis |
| * handled and the response sent. |
| */ |
| void AMQListener::handleRequest( const Message* message ) { |
| |
| |
| apr_time_t startTime = apr_time_now(); |
| apr_time_t timeIdle = 0; |
| // Idle time is computed as the interval from time last request |
| // was processed. If this is the first request, idle time is |
| // computed from the time the service was started. |
| if (iv_timeLastRequestCompleted != 0) |
| timeIdle = startTime - iv_timeLastRequestCompleted; |
| else |
| timeIdle = startTime - iv_pMonitor->getServiceStartTime(); |
| |
| this->iv_pMonitor->processingStarted(this->iv_id, startTime, timeIdle ); |
| stringstream astr; |
| astr << this->iv_id; |
| astr << " ****handleRequest(): "<< iv_count << " start"<< endl; |
| LOGINFO(FINE,astr.str()); |
| ///cout << astr.str() << endl; |
| ///LOGWARN(astr.str()); |
| apr_time_t endTime = 0; |
| apr_time_t startSerialize = 0; |
| apr_time_t startDeserialize = 0; |
| apr_time_t startAnnotatorProcess = 0; |
| apr_time_t startSendResponse = 0; |
| apr_time_t timeToDeserializeCAS = 0; |
| apr_time_t timeToSerializeCAS = 0; |
| apr_time_t timeInAnalytic = 0; |
| |
| const TextMessage* textMessage=0; |
| int command = 0; |
| |
| try { |
| |
| textMessage = dynamic_cast< const TextMessage* >( message ); |
| if (textMessage==0) { |
| LOGERROR("AMQListener::handleRequest() invalid pointer to TextMessage"); |
| endTime = apr_time_now(); |
| iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime); |
| this->iv_timeLastRequestCompleted = apr_time_now(); |
| return; |
| } |
| if (textMessage->propertyExists("MessageFrom")) { |
| LOGINFO(FINER,"Received from " + textMessage->getStringProperty("MessageFrom")); |
| } |
| |
| //validate request properties |
| string errormessage; |
| if (!validateRequest(textMessage, errormessage)) { |
| LOGERROR("Listener::handleRequest() " + errormessage); |
| endTime = apr_time_now(); |
| sendResponse(textMessage, 0,0,0,timeIdle, endTime-startTime, |
| errormessage ,true); |
| endTime = apr_time_now(); |
| iv_pMonitor->processingComplete(iv_id, 0,false,endTime-startTime,0,0,0,endTime-startTime); |
| return; |
| } |
| |
| command = textMessage->getIntProperty("Command"); |
| astr.str(""); |
| astr << "Received request Command: " << command ; |
| if (textMessage->propertyExists("CasReference")) { |
| astr << " CasReference " << textMessage->getStringProperty("CasReference"); |
| } |
| |
| astr << "Received request Command: " << command ; |
| if (textMessage->propertyExists("CasReference")) { |
| astr << " CasReference " << textMessage->getStringProperty("CasReference"); |
| } |
| LOGINFO(INFO,astr.str()); |
| |
| if (command == PROCESS_CAS_COMMAND) { //process CAS |
| LOGINFO(FINE,"Process CAS request start."); |
| int payload = textMessage->getIntProperty("Payload"); |
| //get the text in the payload |
| string text = textMessage->getText().c_str(); |
| UnicodeString utext(text.c_str()); |
| text = UnicodeStringRef(utext).asUTF8(); |
| astr.str(""); |
| astr << "Payload: " << payload << " Content: " << text ; |
| LOGINFO(FINER, astr.str()); |
| |
| //InputSource |
| MemBufInputSource memIS((XMLByte const *)text.c_str(), |
| text.length(), |
| "sysID"); |
| |
| //reset the CAS |
| iv_pCas->reset(); |
| ios::openmode mode = ios::binary; |
| stringstream xmlstr; |
| //deserialize payload data into the CAS, |
| //call AE process method and serialize |
| //the CAS which will be sent with the |
| //response. |
| if (payload == XCAS_PAYLOAD) { |
| LOGINFO(FINEST, "AMQListener::handleRequest() XCAS serialization."); |
| startDeserialize = apr_time_now(); |
| XCASDeserializer deserializer; |
| deserializer.deserialize(memIS, *iv_pCas); |
| startAnnotatorProcess=apr_time_now(); |
| timeToDeserializeCAS = startAnnotatorProcess-startDeserialize; |
| |
| iv_pEngine->process(*iv_pCas); |
| startSerialize=apr_time_now(); |
| timeInAnalytic = startSerialize - startAnnotatorProcess; |
| |
| XCASWriter xcaswriter(*iv_pCas, true); |
| xcaswriter.write(xmlstr); |
| timeToSerializeCAS = apr_time_now() - startSerialize; |
| } else if (payload == XMI_PAYLOAD) { |
| //deserialize incoming xmi CAS data. |
| LOGINFO(FINEST, "AMQListener::handleRequest() XMI serialization."); |
| |
| startDeserialize = apr_time_now(); |
| XmiSerializationSharedData sharedData; |
| XmiDeserializer deserializer; |
| deserializer.deserialize(memIS,*iv_pCas,sharedData); |
| startAnnotatorProcess=apr_time_now(); |
| timeToDeserializeCAS = startAnnotatorProcess-startDeserialize; |
| LOGINFO(FINEST, "AMQListener::handleRequest() calling process."); |
| iv_pEngine->process(*iv_pCas); |
| startSerialize=apr_time_now(); |
| timeInAnalytic = startSerialize - startAnnotatorProcess; |
| |
| //serialize CAS |
| LOGINFO(FINEST, "AMQListener::handleRequest() calling serialize."); |
| XmiWriter xmiwriter(*iv_pCas, true, &sharedData); |
| xmiwriter.write(xmlstr); |
| //cout << "SERIALIZED CAS " << xmlstr.str() << endl; |
| timeToSerializeCAS = apr_time_now() - startSerialize; |
| LOGINFO(FINEST, "AMQListener::handleRequest() done processing CAS."); |
| } |
| //done with this CAS. |
| iv_pCas->reset(); |
| //record end time |
| endTime = apr_time_now(); |
| //send reply |
| LOGINFO(FINER,"AnalysisEngine::process() completed successfully. Sending reply."); |
| sendResponse(textMessage, timeToDeserializeCAS, |
| timeToSerializeCAS, timeInAnalytic, |
| timeIdle, endTime-startTime, |
| xmlstr.str(),false); |
| endTime=apr_time_now(); |
| iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime, |
| timeToDeserializeCAS, timeInAnalytic, timeToSerializeCAS, |
| endTime-startSendResponse); |
| LOGINFO(FINE,"Process CAS finished."); |
| } else if (command == GET_META_COMMAND ) { //get Meta |
| LOGINFO(FINE, "Process getMeta request start."); |
| endTime = apr_time_now(); |
| startSendResponse = apr_time_now(); |
| sendResponse(textMessage, timeToDeserializeCAS, |
| timeToSerializeCAS, timeInAnalytic, |
| timeIdle, endTime-startTime,this->iv_aeDescriptor,false); |
| endTime=apr_time_now(); |
| //record timing |
| iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse); |
| LOGINFO(FINE,"Process getMeta request finished."); |
| } else if (command == CPC_COMMAND ) { //CPC |
| LOGINFO(FINE, "Processing CollectionProcessComplete request start"); |
| iv_pEngine->collectionProcessComplete(); |
| endTime = apr_time_now(); |
| startSendResponse = apr_time_now(); |
| sendResponse(textMessage, 0, |
| 0, timeInAnalytic, |
| timeIdle,endTime-startTime,"CPC completed.",false); |
| endTime=apr_time_now(); |
| iv_pMonitor->processingComplete(iv_id, command,true,endTime-startTime,0,0,0,endTime-startSendResponse); |
| LOGINFO(FINE, "Processing CollectionProcessComplete request finished."); |
| } |
| } catch (XMLException& e) { |
| stringstream str; |
| str << "AMQListener::handleRequest XMLException." << e.getMessage(); |
| LOGERROR(str.str()); |
| endTime = apr_time_now(); |
| startSendResponse = apr_time_now(); |
| sendResponse(textMessage, timeToDeserializeCAS, |
| timeToSerializeCAS, timeInAnalytic, |
| timeIdle, endTime-startTime,str.str(),true); |
| endTime = apr_time_now(); |
| iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse); |
| } catch (uima::Exception e) { |
| LOGERROR("AMQListener::handleRequest UIMA Exception " + e.asString()); |
| endTime = apr_time_now(); |
| startSendResponse = apr_time_now(); |
| sendResponse(textMessage, timeToDeserializeCAS, |
| timeToSerializeCAS, timeInAnalytic, |
| timeIdle,endTime-startTime,e.asString(),true); |
| endTime = apr_time_now(); |
| iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startSendResponse); |
| } catch(...) { |
| LOGERROR("AMQListener::handleRequest Unknown exception "); |
| //TODO: log / shurdown ?} |
| endTime = apr_time_now(); |
| iv_pMonitor->processingComplete(iv_id, command,false,endTime-startTime,0,0,0,endTime-startTime); |
| |
| } |
| } |
| |
| void AMQListener::sendResponse(const TextMessage * request, |
| apr_time_t timeToDeserialize, |
| apr_time_t timeToSerialize, |
| apr_time_t timeInAnalytic, |
| apr_time_t idleTime, |
| apr_time_t elapsedTime, |
| string msgContent, bool isExceptionMsg) { |
| |
| //TODO retry |
| string serverURI; |
| AMQConnection * replyTo=NULL; |
| TextMessage * reply=NULL; |
| |
| try { |
| const Destination * cmsReplyTo = request->getCMSReplyTo(); |
| if (cmsReplyTo != NULL) { |
| reply = this->iv_pConnection->getTextMessage(); |
| } else { |
| LOGINFO(2, "AMQListener::sendResponse() start"); |
| if (!request->propertyExists("ServerURI") ) { |
| LOGERROR("AMQListener::sendResponse() ServerURI header property does not exist."); |
| return; |
| } |
| if (!request->propertyExists("MessageFrom") ) { |
| LOGERROR("AMQListener::sendResponse() MessageFrom header property does not exist."); |
| return; |
| } |
| |
| //get handle to a connection |
| string tmp = request->getStringProperty("ServerURI"); |
| LOGINFO(FINEST,"replyTo ServerURI: " + tmp); |
| |
| //special handling when protocol is http. |
| //HTTP protocol not supported by ActiveMQ C++ client. |
| //replace the http broker URL with the broker URL of |
| //the queue this listener is attached to. |
| if (tmp.find("http:") != string::npos) { |
| serverURI=this->iv_brokerURL; |
| LOGINFO(FINER,"HTTP reply address: " + tmp); |
| } else { //send reply via tcp |
| size_t begpos = tmp.find("tcp:",0); |
| tmp = tmp.substr(begpos); |
| size_t endpos = tmp.find_first_of(","); |
| |
| if (begpos == string::npos) { |
| LOGERROR("AMQListener::sendResponse() Could not find tcp URL in ServerURI header property."); |
| return; |
| } |
| if (endpos == string::npos) { |
| serverURI = tmp; |
| } else { |
| serverURI = tmp.substr(0, endpos); |
| } |
| } |
| LOGINFO(FINER,"ReplyTo BrokerURL " + serverURI); |
| |
| //look up the endpoint connection |
| replyTo = this->iv_replyToConnections.getConnection(serverURI); |
| if (replyTo == NULL) { |
| LOGERROR("Could not get a connection to " + serverURI); |
| return; |
| } |
| //get message object |
| reply = replyTo->getTextMessage(); |
| } |
| |
| if (reply == NULL) { |
| LOGERROR("AMQListener::sendResponse() invalid textMessage object " ); |
| return; |
| } |
| |
| //construct the reply message |
| reply->setStringProperty("MessageFrom", this->iv_pConnection->getInputQueueName() ); |
| reply->setStringProperty("ServerURI", this->iv_brokerURL); |
| |
| reply->setIntProperty("MessageType",RESPONSE); |
| reply->setLongProperty("TimeInService", elapsedTime*1000); |
| reply->setLongProperty("IdleTime", idleTime*1000); |
| if (request->propertyExists("CasReference") ) { |
| reply->setStringProperty("CasReference", request->getStringProperty("CasReference")); |
| } |
| if (request->propertyExists("Command") ) { |
| reply->setIntProperty("Command", request->getIntProperty("Command")); |
| if (request->getIntProperty("Command") == PROCESS_CAS_COMMAND) { |
| reply->setLongProperty("TimeToSerializeCAS", timeToSerialize*1000); |
| reply->setLongProperty("TimeToDeserializeCAS", timeToDeserialize*1000); |
| reply->setLongProperty("TimeInAnalytic", timeInAnalytic*1000); |
| reply->setLongProperty("TimeInProcessCAS", timeInAnalytic*1000); |
| } |
| } |
| if (isExceptionMsg) { |
| reply->setIntProperty("Payload",EXC_PAYLOAD); |
| } else { |
| if (request->propertyExists("Payload") ) { |
| reply->setIntProperty("Payload",request->getIntProperty("Payload")); |
| } else { |
| reply->setIntProperty("Payload",NO_PAYLOAD); |
| } |
| } |
| //cargo |
| reply->setText(msgContent); |
| //log the reply message content |
| stringstream str; |
| str << "Sending Reply Command: " << reply->getIntProperty("Command") << " MessageType: " << reply->getIntProperty("MessageType") << " "; |
| if (reply->propertyExists("CasReference") ) { |
| str << "CasReference: " << reply->getStringProperty("CasReference"); |
| } |
| LOGINFO(INFO,str.str()); |
| |
| if (cmsReplyTo != NULL) { |
| str << " to " << ((Queue*)cmsReplyTo)->getQueueName(); |
| } else { |
| str << " to " << request->getStringProperty("MessageFrom") |
| << " at " << serverURI; |
| } |
| str << " Message text: " << msgContent; |
| //std::cout << "****Reply" << msgContent << std::endl; |
| LOGINFO(FINEST,"PRINT Reply message:\n" + str.str()); |
| |
| //send |
| if (cmsReplyTo != NULL) { |
| //cout << "cmsReplyTo=" << cmsReplyTo->toProviderString() << endl; |
| iv_pConnection->sendMessage(cmsReplyTo); |
| } else { |
| replyTo->sendMessage(request->getStringProperty("MessageFrom")); |
| } |
| LOGINFO(FINER,"AMQListener::sendResponse DONE"); |
| |
| } catch (CMSException& ex ) { |
| LOGERROR("AMQListener::handleMessage()" + ex.getMessage()); |
| } catch (...) { |
| LOGERROR("AMQListener::handleRequest() UnExpected error sending reply."); |
| } |
| } |
| |
| //=================================================== |
| //AMQAnalysisEngineService |
| //--------------------------------------------------- |
| AMQAnalysisEngineService::~AMQAnalysisEngineService() { |
| //stop(); |
| cleanup(); |
| } |
| |
| AMQAnalysisEngineService::AMQAnalysisEngineService(ServiceParameters & desc, |
| Monitor * stats, apr_pool_t * pool) : |
| //iv_pMonitor(stats), |
| iv_pool(pool), |
| iv_numInstances(desc.getNumberOfInstances()), |
| iv_brokerURL(desc.getBrokerURL()), |
| iv_inputQueueName(desc.getQueueName()), |
| iv_aeDescriptor(desc.getAEDescriptor()), |
| iv_prefetchSize(desc.getPrefetchSize()), |
| iv_initialFSHeapSize(desc.getInitialFSHeapSize()), |
| iv_vecpConnections(), |
| iv_vecpAnalysisEngines(), |
| iv_vecpCas(), |
| iv_closed(false), |
| iv_listeners() { |
| try { |
| iv_pMonitor = stats; |
| initialize(desc); |
| } catch (CMSException & e) { |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam(e.getMessage()); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| } |
| |
| void AMQAnalysisEngineService::initialize(ServiceParameters & params) { |
| |
| try { |
| //create connection factory |
| LOGINFO(FINEST,"AMQAnalysisEngineService::initialize() Create connection factory"); |
| this->iv_pConnFact = AMQConnection::createConnectionFactory(params); |
| |
| if (iv_pConnFact == NULL) { |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQAnalysisEngineService::initialize() Failed to create connection factory."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| if (!uima::ResourceManager::hasInstance()) { |
| uima::ResourceManager::createInstance("ActiveMQAnalysisEngineService"); |
| } |
| ErrorInfo errInfo; |
| UnicodeString ustr(this->iv_aeDescriptor.c_str()); |
| UnicodeString ufn = ResourceManager::resolveFilename(ustr,ustr); |
| |
| //create a AnalysisEngine and CAS for each instance |
| for (int i=0; i < iv_numInstances; i++) { |
| //create the connection |
| AMQConnection * newConnection = new AMQConnection(this->iv_pConnFact, params.getBrokerURL(), this->iv_pMonitor, i); |
| if (newConnection == NULL) { |
| LOGERROR("AMQAnalysisEngineService::initialize() Could not create ActiveMQ endpoint connection."); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| /////newConnection->setExceptionListener(this); |
| this->iv_vecpConnections.push_back(newConnection); |
| //create AE |
| AnalysisEngine * pEngine = uima::TextAnalysisEngine::createTextAnalysisEngine((UnicodeStringRef(ufn).asUTF8().c_str()), |
| errInfo); |
| if (pEngine) { |
| LOGINFO(0,"AMQAnalysisEngineService::initialize() AnalysisEngine initialization successful."); |
| } else { |
| LOGERROR("AMQAnalysisEngineService::initializer() could not create AE" + errInfo.asString()); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQAnalysisEngineService::initialize() create AE failed. " + errInfo.getMessage().asString() ); |
| ErrorInfo errInfo; |
| errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE), |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| |
| this->iv_vecpAnalysisEngines.push_back(pEngine); |
| |
| //initial FSHeap size |
| if (this->iv_initialFSHeapSize > 0) { |
| pEngine->getAnnotatorContext(). |
| assignValue(UIMA_ENGINE_CONFIG_OPTION_FSHEAP_PAGESIZE,this->iv_initialFSHeapSize); |
| } |
| |
| //create CAS |
| CAS * cas = pEngine->newCAS(); |
| if (cas == NULL) { |
| LOGERROR("AMQAnalysisEngineService::initialize() Could not create CAS"); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQListener::initialize() create CAS failed."); |
| ErrorInfo errInfo; |
| errInfo.setErrorId(UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE), |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| this->iv_vecpCas.push_back(cas); |
| |
| //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create listener " << endl; |
| //create listeners and register these |
| AMQListener * newListener = new AMQListener(i,iv_vecpConnections.at(i), |
| iv_vecpAnalysisEngines.at(i), iv_vecpCas.at(i), |
| iv_pMonitor); |
| if (newListener == NULL) { |
| LOGERROR("AMQAnalysisEngineService::initialize() Could not create listener."); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AnalysisEngineService::initialize() Could not create listener."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| this->iv_listeners[i] = newListener; |
| //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() create message consumer " << endl; |
| //create MessageConsumer session and register Listener |
| this->iv_vecpConnections.at(i)->createMessageConsumer(iv_inputQueueName,annotator_selector, params.getPrefetchSize()); |
| } |
| |
| iv_pMonitor->setNumberOfInstances(iv_numInstances); |
| //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() setup getmeta " << endl; |
| //Fast GetMeta |
| //create connection |
| LOGINFO(FINEST, "AMQAnalysisEngineService::initialize() Setup GETMETA instance."); |
| iv_pgetMetaConnection = new AMQConnection(this->iv_pConnFact, params.getBrokerURL(), this->iv_pMonitor, iv_numInstances); |
| |
| if (iv_pgetMetaConnection == NULL) { |
| LOGERROR("AMQAnalysisEngineService::initialize() Could not create fast getmeta ActiveMQ endpoint connection."); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQAnalysisEngineService::initialize() Failed to connect to broker."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| |
| //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() setup getmeta listener" << endl; |
| //create a MessageListener MessageConsumer to handle getMeta requests only. |
| AMQListener * newListener = new AMQListener(iv_numInstances,iv_pgetMetaConnection, |
| iv_vecpAnalysisEngines.at(0), |
| this->iv_pMonitor); |
| if (newListener == NULL) { |
| LOGERROR("AMQAnalysisEngineService::initialize() Could not create Fast getMeta listener."); |
| ErrorMessage msg(UIMA_MSG_ID_LOG_ERROR); |
| msg.addParam("AMQAnalysisEngineService::initialize() Could not create listener."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(msg); |
| UIMA_EXC_THROW_NEW(Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| this->iv_listeners[this->iv_numInstances] = newListener; |
| iv_pgetMetaConnection->createMessageConsumer(iv_inputQueueName, |
| getmeta_selector,0); |
| this->iv_pMonitor->setGetMetaListenerId(iv_numInstances); |
| //cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() done" << endl; |
| } catch (uima::Exception e) { |
| cout << __FILE__ << __LINE__ << "AMQAnalysisEngineService::initialize() failed " << e.getErrorInfo().asString() << endl; |
| LOGERROR("AMQAnalysisEngineService::initialize() " + e.asString()); |
| throw e; |
| } |
| } |
| |
| void AMQAnalysisEngineService::setTraceLevel(int level) { |
| if (level < 0) { |
| uimacpp_ee_tracelevel=NONE; |
| } else if (level == 0) { |
| uimacpp_ee_tracelevel = INFO; |
| } else if (level == 1) { |
| uimacpp_ee_tracelevel = FINE; |
| } else if (level == 2) { |
| uimacpp_ee_tracelevel = FINER; |
| } else if (level > 2) { |
| uimacpp_ee_tracelevel = FINEST; |
| } else { |
| uimacpp_ee_tracelevel = INFO; |
| } |
| cout << "tracelevel=" << uimacpp_ee_tracelevel << endl; |
| } |
| |
| void AMQAnalysisEngineService::startProcessingThreads() { |
| LOGINFO(FINER,"AMQAnalysisEngineService::start() create listener threads."); |
| //create the listener threads |
| thd_attr=0; |
| apr_status_t rv; |
| rv = apr_threadattr_create(&thd_attr, iv_pool); |
| assert(rv == APR_SUCCESS); |
| map<int, AMQListener*>::iterator ite; |
| int i=0; |
| for (ite= iv_listeners.begin();ite != iv_listeners.end();ite++) { |
| apr_thread_t *thread=0; |
| rv = apr_thread_create(&thread, thd_attr, handleMessages, ite->second, iv_pool); |
| assert(rv == APR_SUCCESS); |
| iv_listenerThreads.push_back(thread); |
| } |
| this->iv_pMonitor->setStartTime(); |
| } |
| |
| |
| void AMQAnalysisEngineService::start() { |
| |
| if (iv_pgetMetaConnection != NULL) { |
| cerr << "Startinging GetMetaData instance" << endl; |
| this->iv_pgetMetaConnection->start(); |
| } |
| for (size_t i=0; i < iv_vecpConnections.size(); i++) { |
| cerr << "Starting Annotator instance " << i << endl; |
| AMQConnection * connection = iv_vecpConnections.at(i); |
| if (connection != NULL) { |
| connection->start(); |
| } else { |
| LOGERROR("AMQAnalysisEngineService::start() Connection object is NULL."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "connection object is NULL.")); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| } |
| } |
| |
| void AMQAnalysisEngineService::shutdown() { |
| //LOGWARN("AMQAnalysisEngineService::shutdown()"); |
| |
| stop(); |
| |
| cout << "AMQAnalysisEngineService::shutdown() going to terminate threads " << endl;; |
| //terminate the threads |
| apr_status_t rv; |
| for (size_t i=0; i < this->iv_listenerThreads.size(); i++) { |
| //cout << "wait for thread " << i << " to end " << endl; |
| this->iv_listeners[i]->stopProcessing(); |
| if (!iv_listeners[i]->isReconnecting()) { |
| apr_thread_join(&rv, this->iv_listenerThreads.at(i)); |
| } |
| } |
| |
| cout << "AMQAnalysisEngineService::shutdown stopped all connection" << endl; |
| cleanup(); |
| cout << "AMQAnalysisEngineService::shutdown shutdown done" << endl; |
| } |
| |
| int AMQAnalysisEngineService::stop() { |
| |
| //TODO: let listeners finish processing first |
| //stop messages notification |
| |
| if (iv_pgetMetaConnection != NULL) { |
| cerr << "Stopping GetMetaData instance" << endl; |
| this->iv_pgetMetaConnection->stop(); |
| } |
| for (size_t i=0; i < iv_vecpConnections.size(); i++) { |
| cerr << "Stopping Annotator instance " << i << endl; |
| AMQConnection * connection = iv_vecpConnections.at(i); |
| if (connection != NULL) { |
| connection->stop(); |
| } else { |
| LOGERROR("AMQAnalysisEngineService::stop() Connection object is NULL."); |
| ErrorInfo errInfo; |
| errInfo.setMessage(ErrorMessage(UIMA_MSG_ID_LOG_ERROR, "connection object is NULL.")); |
| UIMA_EXC_THROW_NEW(uima::Uima_runtime_error, |
| UIMA_ERR_RESMGR_COULD_NOT_INITIALIZE_RESOURCE, |
| errInfo.getMessage(), |
| errInfo.getMessage().getMessageID(), |
| ErrorInfo::unrecoverable); |
| } |
| } |
| return 0; |
| } |
| |
| void AMQAnalysisEngineService::quiesceAndStop() { |
| |
| quiesce(); |
| |
| //shutdown worker threads. |
| cout << "AMQAnalysisEngineService::quiesceAndStop() going to terminate threads " << endl;; |
| if (this->iv_pMonitor->getQuiesceAndStop() ) { |
| for (size_t i=0; i < this->iv_listenerThreads.size(); i++) { |
| //cout << "wait for thread " << i << " to end " << endl; |
| this->iv_listeners[i]->stopProcessing(); |
| apr_thread_join(&rv, this->iv_listenerThreads.at(i)); |
| } |
| } |
| |
| cleanup(); |
| } |
| |
| void AMQAnalysisEngineService::quiesce() { |
| //stop connections - does not work |
| stop(); |
| |
| //check whether processing threads are finished. |
| bool allfinished = false; |
| while (!allfinished) { |
| allfinished = true; |
| map<int, AMQListener*>::iterator ite; |
| for (ite= iv_listeners.begin();ite != iv_listeners.end();ite++) { |
| if (ite->second->isBusy()) { |
| allfinished = false; |
| break; |
| } |
| } |
| //Thread::sleep(1000); |
| apr_sleep(1000000); |
| } |
| } |
| |
| void AMQAnalysisEngineService::cleanup() { |
| // Destroy resources. |
| try { |
| if (iv_closed) { |
| return; |
| } |
| if (iv_pgetMetaConnection != NULL) { |
| delete this->iv_pgetMetaConnection; |
| this->iv_pgetMetaConnection = 0; |
| } |
| |
| for (size_t i=0; i < iv_vecpConnections.size(); i++) { |
| //cout << "deleting consumerConnection " << i << endl; |
| if (iv_vecpConnections.at(i) != NULL) { |
| iv_vecpConnections.at(i)->stop(); |
| delete iv_vecpConnections.at(i); |
| } |
| } |
| |
| for (size_t i=0; i < iv_vecpAnalysisEngines.size(); i++) { |
| delete iv_vecpAnalysisEngines.at(i); |
| } |
| |
| for (size_t i=0; i < iv_vecpCas.size(); i++) { |
| delete iv_vecpCas.at(i); |
| } |
| |
| iv_vecpAnalysisEngines.clear(); |
| iv_vecpCas.clear(); |
| iv_vecpConnections.clear(); |
| |
| map<int, AMQListener*>::iterator ite; |
| for (ite= iv_listeners.begin();ite != iv_listeners.end();ite++) { |
| delete ite->second; |
| } |
| |
| iv_listeners.clear(); |
| |
| if (iv_pConnFact != NULL) { |
| delete iv_pConnFact; |
| iv_pConnFact = NULL; |
| } |
| iv_closed = true; |
| |
| } catch (CMSException& e) { |
| cerr << "AMQAnalysisEngineService::cleanup() " << e.getMessage() << endl;; |
| } |
| } |
| |
| |
| //=================================================== |
| //CommonUtils |
| //--------------------------------------------------- |
| void CommonUtils::logError(string msg) { |
| this->iv_pMonitor->logError(msg); |
| //cerr << "ERROR: " << msg << endl; |
| } |
| void CommonUtils::logWarning(string msg) { |
| this->iv_pMonitor->logWarning(msg); |
| //cout << "WARN: " << msg << endl; |
| } |
| void CommonUtils::logMessage(string msg) { |
| cout << "INFO: " << msg << endl; |
| this->iv_pMonitor->logMessage(msg); |
| //cout << "INFO: " << msg << endl; |
| } |
| |