blob: 23a9a00972a619f59333f1a6173d39729094ac97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_
#define _ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>
#include <cms/Message.h>
#include <cms/CMSException.h>
#include <activemq/util/Config.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/ConsumerInfo.h>
#include <activemq/commands/MessageAck.h>
#include <activemq/commands/MessageDispatch.h>
#include <activemq/core/Dispatcher.h>
#include <activemq/core/RedeliveryPolicy.h>
#include <activemq/core/MessageDispatchChannel.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/concurrent/Mutex.h>
namespace activemq{
namespace core{
using decaf::lang::Pointer;
using decaf::util::concurrent::atomic::AtomicBoolean;
class ActiveMQSession;
class ActiveMQConsumerMembers;
class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer,
public Dispatcher
{
private:
/**
* Internal Class that holds Members of this class, allows for changes without API breakage.
*/
ActiveMQConsumerMembers* internal;
/**
* The ActiveMQSession that owns this class instance.
*/
ActiveMQSession* session;
/**
* The ConsumerInfo object for this class instance.
*/
Pointer<commands::ConsumerInfo> consumerInfo;
private:
ActiveMQConsumer( const ActiveMQConsumer& );
ActiveMQConsumer& operator= ( const ActiveMQConsumer& );
public:
/**
* Constructor
*/
ActiveMQConsumer( ActiveMQSession* session,
const Pointer<commands::ConsumerId>& id,
const Pointer<commands::ActiveMQDestination>& destination,
const std::string& name,
const std::string& selector,
int prefetch,
int maxPendingMessageCount,
bool noLocal,
bool browser,
bool dispatchAsync,
cms::MessageListener* listener );
virtual ~ActiveMQConsumer() throw();
public: // Interface Implementation
virtual void start();
virtual void stop();
virtual void close();
virtual cms::Message* receive();
virtual cms::Message* receive( int millisecs );
virtual cms::Message* receiveNoWait();
virtual void setMessageListener( cms::MessageListener* listener );
virtual cms::MessageListener* getMessageListener() const;
virtual std::string getMessageSelector() const;
virtual void acknowledge( const Pointer<commands::MessageDispatch>& dispatch );
public: // Dispatcher Methods
virtual void dispatch( const Pointer<MessageDispatch>& message );
public: // ActiveMQConsumer Methods
/**
* Method called to acknowledge all messages that have been received so far.
*
* @throw CMSException if an error occurs while ack'ing the message.
*/
void acknowledge();
/**
* Called to Commit the current set of messages in this Transaction
*
* @throw ActiveMQException if an error occurs while performing the operation.
*/
void commit();
/**
* Called to Roll back the current set of messages in this Transaction
*
* @throw ActiveMQException if an error occurs while performing the operation.
*/
void rollback();
/**
* Performs the actual close operation on this consumer
*
* @throw ActiveMQException if an error occurs while performing the operation.
*/
void doClose();
/**
* Cleans up this objects internal resources.
*
* @throw ActiveMQException if an error occurs while performing the operation.
*/
void dispose();
/**
* Get the Consumer information for this consumer
* @return Reference to a Consumer Info Object
*/
const Pointer<commands::ConsumerInfo>& getConsumerInfo() const;
/**
* Get the Consumer Id for this consumer
* @return Reference to a Consumer Id Object
*/
const Pointer<commands::ConsumerId>& getConsumerId() const;
/**
* @returns if this Consumer has been closed.
*/
bool isClosed() const;
/**
* Has this Consumer Transaction Synchronization been added to the transaction
* @return true if the synchronization has been added.
*/
bool isSynchronizationRegistered() const ;
/**
* Sets the Synchronization Registered state of this consumer.
* @param value - true if registered false otherwise.
*/
void setSynchronizationRegistered( bool value );
/**
* Deliver any pending messages to the registered MessageListener if there
* is one, return true if not all dispatched, or false if no listener or all
* pending messages have been dispatched.
*/
bool iterate();
/**
* Forces this consumer to send all pending acks to the broker.
*
* @throw ActiveMQException if an error occurs while performing the operation.
*/
void deliverAcks();
/**
* Called on a Failover to clear any pending messages.
*/
void clearMessagesInProgress();
/**
* Signals that a Failure occurred and that anything in-progress in the
* consumer should be cleared.
*/
void inProgressClearRequired();
/**
* Gets the currently set Last Delivered Sequence Id
*
* @returns long long containing the sequence id of the last delivered Message.
*/
long long getLastDeliveredSequenceId() const;
/**
* Sets the value of the Last Delivered Sequence Id
*
* @param value
* The new value to assign to the Last Delivered Sequence Id property.
*/
void setLastDeliveredSequenceId( long long value );
/**
* @returns the number of Message's this consumer is waiting to Dispatch.
*/
int getMessageAvailableCount() const;
/**
* Sets the RedeliveryPolicy this Consumer should use when a rollback is
* performed on a transacted Consumer. The Consumer takes ownership of the
* passed pointer. The Consumer's redelivery policy can never be null, a
* call to this method with a NULL pointer is ignored.
*
* @param policy
* Pointer to a Redelivery Policy object that his Consumer will use.
*/
void setRedeliveryPolicy( RedeliveryPolicy* policy );
/**
* Gets a pointer to this Consumer's Redelivery Policy object, the Consumer
* retains ownership of this pointer so the caller should not delete it.
*
* @returns a Pointer to a RedeliveryPolicy that is in use by this Consumer.
*/
RedeliveryPolicy* getRedeliveryPolicy() const;
/**
* Sets the Exception that has caused this Consumer to be in a failed state.
*
* @param error
* The error that is to be thrown when a Receive call is made.
*/
void setFailureError( decaf::lang::Exception* error );
/**
* Gets the error that caused this Consumer to be in a Failed state, or NULL if
* there is no Error.
*
* @returns pointer to the error that faulted this Consumer or NULL.
*/
decaf::lang::Exception* getFailureError() const;
protected:
/**
* Used by synchronous receive methods to wait for messages to come in.
* @param timeout - The maximum number of milliseconds to wait before
* returning.
*
* If -1, it will block until a messages is received or this consumer
* is closed.
* If 0, will not block at all. If > 0, will wait at a maximum the
* specified number of milliseconds before returning.
* @return the message, if received within the allotted time.
* Otherwise NULL.
*
* @throws InvalidStateException if this consumer is closed upon
* entering this method.
*/
Pointer<MessageDispatch> dequeue( long long timeout );
/**
* Pre-consume processing
* @param dispatch - the message being consumed.
*/
void beforeMessageIsConsumed(
const Pointer<commands::MessageDispatch>& dispatch );
/**
* Post-consume processing
* @param dispatch - the consumed message
* @param messageExpired - flag indicating if the message has expired.
*/
void afterMessageIsConsumed(
const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired );
private:
// Using options from the Destination URI override any settings that are
// defined for this consumer.
void applyDestinationOptions( const Pointer<commands::ConsumerInfo>& info );
// If supported sends a message pull request to the service provider asking
// for the delivery of a new message. This is used in the case where the
// service provider has been configured with a zero prefetch or is only
// capable of delivering messages on a pull basis. No request is made if
// there are already messages in the unconsumed queue since there's no need
// for a server round-trip in that instance.
void sendPullRequest( long long timeout );
// Checks for the closed state and throws if so.
void checkClosed() const;
// Sends an ack as needed in order to keep them coming in if the current
// ack mode allows the consumer to receive up to the prefetch limit before
// an real ack is sent.
void ackLater( const Pointer<commands::MessageDispatch>& message, int ackType );
// Create an Ack Message that acks all messages that have been delivered so far.
Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
// Should Acks be sent on each dispatched message
bool isAutoAcknowledgeEach() const;
// Can Acks be batched for less network overhead.
bool isAutoAcknowledgeBatch() const;
};
}}
#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_*/