blob: 84cee70680cafea39e4e551c04d8d988846bb1a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_
#define _ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>
#include <cms/Message.h>
#include <cms/CMSException.h>
#include <activemq/connector/ConsumerInfo.h>
#include <activemq/connector/ConnectorResourceListener.h>
#include <activemq/util/Queue.h>
#include <activemq/core/ActiveMQAckHandler.h>
#include <activemq/core/Dispatcher.h>
#include <activemq/concurrent/Mutex.h>
namespace activemq{
namespace core{
class ActiveMQSession;
class ActiveMQConsumer :
public cms::MessageConsumer,
public ActiveMQAckHandler,
public Dispatcher,
public connector::ConnectorResourceListener
{
private:
/**
* The session that owns this Consumer
*/
ActiveMQSession* session;
/**
* The Consumer info for this Consumer
*/
connector::ConsumerInfo* consumerInfo;
/**
* The Message Listener for this Consumer
*/
cms::MessageListener* listener;
/**
* Queue of unconsumed messages.
*/
util::Queue<DispatchData> unconsumedMessages;
/**
* Boolean that indicates if the consumer has been closed
*/
bool closed;
public:
/**
* Constructor
*/
ActiveMQConsumer( connector::ConsumerInfo* consumerInfo,
ActiveMQSession* session );
virtual ~ActiveMQConsumer();
public: // Interface Implementation
/**
* Closes the Consumer. This will return all allocated resources
* and purge any outstanding messages. This method will block if
* there is a call to receive in progress, or a dispatch to a
* MessageListener in place
* @throws CMSException
*/
virtual void close() throw ( cms::CMSException );
/**
* Synchronously Receive a Message
* @return new message
* @throws CMSException
*/
virtual cms::Message* receive() throw ( cms::CMSException );
/**
* Synchronously Receive a Message, time out after defined interval.
* Returns null if nothing read.
* @param millisecs the time in milliseconds to wait before returning
* @return new message or null on timeout
* @throws CMSException
*/
virtual cms::Message* receive( int millisecs ) throw ( cms::CMSException );
/**
* Receive a Message, does not wait if there isn't a new message
* to read, returns NULL if nothing read.
* @return new message
* @throws CMSException
*/
virtual cms::Message* receiveNoWait() throw ( cms::CMSException );
/**
* Sets the MessageListener that this class will send notifs on
* @param listener MessageListener interface pointer
*/
virtual void setMessageListener( cms::MessageListener* listener );
/**
* Gets the MessageListener that this class will send notifs on
* @param MessageListener interface pointer
*/
virtual cms::MessageListener* getMessageListener() const {
return this->listener;
}
/**
* Gets this message consumer's message selector expression.
* @return This Consumer's selector expression or "".
* @throws cms::CMSException
*/
virtual std::string getMessageSelector() const
throw ( cms::CMSException );
/**
* Method called to acknowledge the message passed
* @param message the Message to Acknowlegde
* @throw CMSException
*/
virtual void acknowledgeMessage( const ActiveMQMessage* message )
throw ( cms::CMSException );
public: // Dispatcher Methods
/**
* Called asynchronously by the session to dispatch a message.
* @param message object pointer
*/
virtual void dispatch( DispatchData& message );
public: // ActiveMQConsumer Methods
/**
* Get the Consumer information for this consumer
* @return Pointer to a Consumer Info Object
*/
virtual connector::ConsumerInfo* getConsumerInfo() {
return consumerInfo;
}
protected: // ConnectorResourceListener
/**
* When a Connector Resouce is closed it will notify any registered
* Listeners of its close so that they can take the appropriate
* action.
* @param resource - The ConnectorResource that was closed.
*/
virtual void onConnectorResourceClosed(
const connector::ConnectorResource* resource ) throw ( cms::CMSException );
protected:
/**
* Purges all messages currently in the queue. This can be as a
* result of a rollback, or of the consumer being shutdown.
*/
virtual void purgeMessages() throw (exceptions::ActiveMQException);
/**
* Destroys the message if the session is transacted, otherwise
* does nothing.
* @param message the message to destroy
*/
virtual void destroyMessage( ActiveMQMessage* message )
throw (exceptions::ActiveMQException);
/**
* Used by synchronous receive methods to wait for messages to come in.
* @param timeout - The maximum number of milliseconds to wait before
* returning.
* If -1, it will block until a messages is received or this consumer
* is closed.
* If 0, will not block at all. If > 0, will wait at a maximum the
* specified number of milliseconds before returning.
* @return the message, if received within the allotted time.
* Otherwise NULL.
* @throws InvalidStateException if this consumer is closed upon
* entering this method.
*/
ActiveMQMessage* dequeue(int timeout) throw ( cms::CMSException );
/**
* Pre-consume processing
* @param message - the message being consumed.
*/
virtual void beforeMessageIsConsumed( ActiveMQMessage* message );
/**
* Post-consume processing
* @param message - the consumed message
* @param messageExpired - flag indicating if the message has expired.
*/
virtual void afterMessageIsConsumed( ActiveMQMessage* message, bool messageExpired );
};
}}
#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_*/