blob: fb3f0deaa9d378349957c425422bc1b2a2291cd7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* Analysis Engine service wrapper based on
* Active MQ C++ client.
*/
#ifndef __ACTIVEMQ_AE_SERVICE__
#define __ACTIVEMQ_AE_SERVICE__
#include "uima/api.hpp"
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
using namespace cms;
using namespace uima;
//Forward declarations
class Monitor;
class ServiceParameters;
/** common base class */
class CommonUtils {
protected:
Monitor * iv_pMonitor;
void logError(string msg);
void logWarning(string msg);
void logMessage(string msg);
};
//==========================================================
// This class wraps a ActiveMQ JMS connection
// and provides utility methods to create a MessageProducer
// and MessageConsumer and to send and receive messages.
//----------------------------------------------------------
class AMQConnection : public ExceptionListener,
public CommonUtils {
private:
string iv_brokerURL;
Connection* iv_pConnection;
bool iv_valid;
//consumer session
Session* iv_pConsumerSession;
MessageConsumer * iv_pConsumer;
string iv_inputQueueName;
cms::Queue * iv_pInputQueue;
MessageListener * iv_pListener;
//producer session
Session * iv_pProducerSession;
MessageProducer * iv_pProducer;
TextMessage * iv_pTextMessage;
map<string, cms::Destination*> iv_replyDestinations; //queuename-destination
public:
/** Establish connection to the broker and create a Message Producer session.
*/
AMQConnection ( const char * aBrokerURL, Monitor * pStatistics);
/** Creates a MessageConsumer session and registers a listener.
Caller owns the listener. */
void createMessageConsumer(string aQueueName, MessageListener * pListener, int prefetch);
/** destructor */
~AMQConnection();
/** caller owns the ExceptionListener */
void setExceptionListener(ExceptionListener * el);
/** This also is a default Exception handler */
void onException( const CMSException& ex);
/** If this is consumer, must be called to start receiving messages */
void start();
/** If this is a consumer, stops receiving messages. */
void stop();
/** returns a TextMessage owned by this class. */
TextMessage * getTextMessage();
/** sends the message and clears it. */
void sendMessage(string queueName);
/** sends the message and clears it. */
void sendMessage(const Destination * cmsReplyTo);
bool isValid() {
return iv_valid;
}
/** get the brokerURL */
string getBrokerURL() {
return this->iv_brokerURL;
}
/** get the input queue name */
string getInputQueueName() {
return this->iv_inputQueueName;
}
};
//==================================================
// This class is used to cache and reuse connections
// used to send reply messages.
//--------------------------------------------------
class AMQConnectionsCache : public CommonUtils {
private:
map<string, AMQConnection *> iv_connections; //key is brokerurl
public:
AMQConnectionsCache(Monitor * pStatistics);
~AMQConnectionsCache();
/**
* Retrieves a Connection from the cache if it
* exists or establishes a new connection to the
* the specified broker and adds it to the cache.
*/
AMQConnection * getConnection(string brokerURL);
};
//======================================================
// A MessageListener that handles getMeta, processCAS
// and Collection Processing Complete requests.
//
// Records timing and error JMX statistics.
//
//------------------------------------------------------
class AMQListener : public MessageListener,
public CommonUtils {
private:
int iv_id; //Listener id
string iv_inputQueueName; //queue this listener gets messages from
string iv_brokerURL; //broker this listener is connected to
AMQConnection * iv_pConnection; //connection this Listener is registered with.
//Used to send replies to queues on the same broker.
AnalysisEngine * iv_pEngine; //AnalysisEngine
CAS * iv_pCas; //CAS
string iv_aeDescriptor; //AE descriptor XML
int iv_count; //num messages processed
bool iv_busy; //is processing a message
apr_time_t iv_timeLastRequestCompleted; //used to calculate idle time between requests
AMQConnectionsCache iv_replyToConnections; //maintain connections cache for
//sending reply to other brokers.
void sendResponse(const TextMessage * request, apr_time_t timeToDeserialize,
apr_time_t timeToSerialize, apr_time_t timeInAnalytic,
apr_time_t idleTime, apr_time_t elapsedTime,
string msgContent, bool isExceptionMsg);
public:
/** constructor */
AMQListener(int id,
AMQConnection * pConnection,
AnalysisEngine * pEngine,
CAS * pCas,
Monitor * pStatistics);
/** destructor */
~AMQListener();
bool isBusy() {
return this->iv_busy;
}
/*
* Process the message. Handles GETMETA, PROCESSCAS
* and CPC requests.
*/
virtual void onMessage( const Message* message );
};
//===================================================
// This class creates and configures a C++ service
// according to parameters passed in as arguments.
//
// This class creates connections to an ActiveMQ broker
// and registers one or more MessageConsumers to receive
// messages from a specified queue. The MessageConsumers can
// process requests for GETMETA, PROCESSCAS and Collection
// Process Complete.
//
// Each MessageConsumer registers a MessageListener that will
// receive and process messages. Each instance of the MessageListener
// is initialized with an instance of the AnalysisEngine and a CAS.
//
// The service wrapper sets acknowledgment mode to AUTO_ACKNOWLEDGE mode
// by default and lets the underlying middleware handle the message
// acknowledgements.
//
// The lifecycle of the service may be managed by the UIMA AS Java
// controller bean org.apache.uima.controller.UimacppServiceController.
// In this case, the C++ service is started by the controller bean.
// A socket connection is established between the controller bean and
// the C++ process and used to route logging messages and JMX statistics.
//
// See the UIMA-EE documentation for how to start and manage a C++
// servcice from Java using the UimacppServiceController bean.
//------------------------------------------------------------------------
class AMQAnalysisEngineService : public ExceptionListener,
public MessageListener,
public CommonUtils {
private:
string iv_brokerURL;
string iv_inputQueueName;
string iv_aeDescriptor;
int iv_numInstances;
int iv_prefetchSize;
size_t iv_initialFSHeapSize;
vector <AMQConnection*> iv_vecpConnections;
vector <AnalysisEngine *> iv_vecpAnalysisEngines;
vector <CAS *> iv_vecpCas;
map<int, AMQListener *> iv_listeners; //id - listener
void initialize();
public:
void cleanup();
~AMQAnalysisEngineService();
AMQAnalysisEngineService(ServiceParameters & desc, Monitor * pStatistics);
void setTraceLevel(int level);
void onException( const CMSException& ex );
virtual void onMessage( const Message* message );
void start();
int stop();
};
#endif