blob: 920cdfd86266fe427a4ed34a4a908fdd4c546c94 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQConsumer.h"
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/InvalidStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <decaf/lang/Math.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <activemq/util/Config.h>
#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/util/ActiveMQProperties.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/Message.h>
#include <activemq/commands/MessageAck.h>
#include <activemq/commands/MessagePull.h>
#include <activemq/commands/RemoveInfo.h>
#include <activemq/commands/TransactionInfo.h>
#include <activemq/commands/TransactionId.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQTransactionContext.h>
#include <activemq/core/ActiveMQAckHandler.h>
#include <activemq/core/FifoMessageDispatchChannel.h>
#include <activemq/core/SimplePriorityMessageDispatchChannel.h>
#include <activemq/core/RedeliveryPolicy.h>
#include <activemq/threads/Scheduler.h>
#include <cms/ExceptionListener.h>
#include <memory>
using namespace std;
using namespace activemq;
using namespace activemq::util;
using namespace activemq::core;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace activemq::threads;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
namespace activemq{
namespace core {
class ActiveMQConsumerMembers {
private:
ActiveMQConsumerMembers( const ActiveMQConsumerMembers& );
ActiveMQConsumerMembers& operator= ( const ActiveMQConsumerMembers& );
public:
cms::MessageListener* listener;
decaf::util::concurrent::Mutex listenerMutex;
AtomicBoolean deliveringAcks;
AtomicBoolean started;
Pointer<MessageDispatchChannel> unconsumedMessages;
decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
long long lastDeliveredSequenceId;
Pointer<commands::MessageAck> pendingAck;
int deliveredCounter;
int additionalWindowSize;
volatile bool synchronizationRegistered;
bool clearDispatchList;
bool inProgressClearRequiredFlag;
long long redeliveryDelay;
Pointer<RedeliveryPolicy> redeliveryPolicy;
Pointer<Exception> failureError;
Pointer<Scheduler> scheduler;
ActiveMQConsumerMembers() : listener(NULL),
listenerMutex(),
deliveringAcks(),
started(),
unconsumedMessages(),
dispatchedMessages(),
lastDeliveredSequenceId(0),
pendingAck(),
deliveredCounter(0),
additionalWindowSize(0),
synchronizationRegistered(false),
clearDispatchList(false),
inProgressClearRequiredFlag(false),
redeliveryDelay(0),
redeliveryPolicy(),
failureError(),
scheduler() {
}
};
/**
* Class used to deal with consumers in an active transaction. This
* class calls back into the consumer when the transaction is Committed or
* Rolled Back to process that event.
*/
class TransactionSynhcronization : public Synchronization {
private:
ActiveMQConsumer* consumer;
private:
TransactionSynhcronization( const TransactionSynhcronization& );
TransactionSynhcronization& operator= ( const TransactionSynhcronization& );
public:
TransactionSynhcronization( ActiveMQConsumer* consumer ) : consumer(consumer) {
if( consumer == NULL ) {
throw NullPointerException(
__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
}
virtual ~TransactionSynhcronization() {}
virtual void beforeEnd() {
consumer->acknowledge();
consumer->setSynchronizationRegistered( false );
}
virtual void afterCommit() {
consumer->commit();
consumer->setSynchronizationRegistered( false );
}
virtual void afterRollback() {
consumer->rollback();
consumer->setSynchronizationRegistered( false );
}
};
/**
* Class used to Hook a consumer that has been closed into the Transaction
* it is currently a part of. Once the Transaction has been Committed or
* Rolled back this Synchronization can finish the Close of the consumer.
*/
class CloseSynhcronization : public Synchronization {
private:
ActiveMQConsumer* consumer;
private:
CloseSynhcronization( const CloseSynhcronization& );
CloseSynhcronization& operator= ( const CloseSynhcronization& );
public:
CloseSynhcronization( ActiveMQConsumer* consumer ) : consumer(consumer) {
if( consumer == NULL ) {
throw NullPointerException(
__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
}
virtual ~CloseSynhcronization() {}
virtual void beforeEnd() {
}
virtual void afterCommit() {
consumer->doClose();
}
virtual void afterRollback() {
consumer->doClose();
}
};
/**
* ActiveMQAckHandler used to support Client Acknowledge mode.
*/
class ClientAckHandler : public ActiveMQAckHandler {
private:
ActiveMQSession* session;
private:
ClientAckHandler( const ClientAckHandler& );
ClientAckHandler& operator= ( const ClientAckHandler& );
public:
ClientAckHandler( ActiveMQSession* session ) : session(session) {
if( session == NULL ) {
throw NullPointerException(
__FILE__, __LINE__, "Ack Handler Created with NULL Session.");
}
}
void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) {
try {
this->session->acknowledge();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
};
/**
* ActiveMQAckHandler used to enable the Individual Acknowledge mode.
*/
class IndividualAckHandler : public ActiveMQAckHandler {
private:
ActiveMQConsumer* consumer;
Pointer<commands::MessageDispatch> dispatch;
private:
IndividualAckHandler( const IndividualAckHandler& );
IndividualAckHandler& operator= ( const IndividualAckHandler& );
public:
IndividualAckHandler( ActiveMQConsumer* consumer, const Pointer<MessageDispatch>& dispatch ) :
consumer(consumer), dispatch(dispatch) {
if( consumer == NULL ) {
throw NullPointerException(
__FILE__, __LINE__, "Ack Handler Created with NULL consumer.");
}
}
void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) {
try {
if( this->dispatch != NULL ) {
this->consumer->acknowledge( this->dispatch );
this->dispatch.reset( NULL );
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
};
/**
* Class used to Start a Consumer's dispatch queue asynchronously from the
* configured Scheduler.
*/
class StartConsumerTask : public Runnable {
private:
ActiveMQConsumer* consumer;
private:
StartConsumerTask( const StartConsumerTask& );
StartConsumerTask& operator= ( const StartConsumerTask& );
public:
StartConsumerTask( ActiveMQConsumer* consumer ) : Runnable(), consumer(NULL) {
if( consumer == NULL ) {
throw NullPointerException(
__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
this->consumer = consumer;
}
virtual ~StartConsumerTask() {}
virtual void run() {
try{
if(!this->consumer->isClosed()) {
this->consumer->start();
}
} catch(cms::CMSException& ex) {
// TODO - Need Connection onAsyncException method.
}
}
};
}}
////////////////////////////////////////////////////////////////////////////////
ActiveMQConsumer::ActiveMQConsumer( ActiveMQSession* session,
const Pointer<ConsumerId>& id,
const Pointer<ActiveMQDestination>& destination,
const std::string& name,
const std::string& selector,
int prefetch,
int maxPendingMessageCount,
bool noLocal,
bool browser,
bool dispatchAsync,
cms::MessageListener* listener ) : internal(NULL), session(NULL), consumerInfo() {
if( session == NULL ) {
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session" );
}
if( destination == NULL ) {
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConsumer::ActiveMQConsumer - Init with NULL Destination" );
}
if( destination->getPhysicalName() == "" ) {
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConsumer::ActiveMQConsumer - Destination given has no Physical Name." );
}
this->internal = new ActiveMQConsumerMembers();
Pointer<ConsumerInfo> consumerInfo( new ConsumerInfo() );
consumerInfo->setConsumerId( id );
consumerInfo->setDestination( destination );
consumerInfo->setSubscriptionName( name );
consumerInfo->setSelector( selector );
consumerInfo->setPrefetchSize( prefetch );
consumerInfo->setMaximumPendingMessageLimit( maxPendingMessageCount );
consumerInfo->setBrowser( browser );
consumerInfo->setDispatchAsync( dispatchAsync );
consumerInfo->setNoLocal( noLocal );
// Initialize Consumer Data
this->session = session;
this->consumerInfo = consumerInfo;
this->internal->lastDeliveredSequenceId = -1;
this->internal->synchronizationRegistered = false;
this->internal->additionalWindowSize = 0;
this->internal->deliveredCounter = 0;
this->internal->clearDispatchList = false;
this->internal->inProgressClearRequiredFlag = false;
this->internal->listener = NULL;
this->internal->redeliveryDelay = 0;
this->internal->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() );
this->internal->scheduler = this->session->getScheduler();
if( this->session->getConnection()->isMessagePrioritySupported() ) {
this->internal->unconsumedMessages.reset( new SimplePriorityMessageDispatchChannel() );
} else {
this->internal->unconsumedMessages.reset( new FifoMessageDispatchChannel() );
}
if( listener != NULL ) {
this->setMessageListener( listener );
}
applyDestinationOptions(this->consumerInfo);
if (this->consumerInfo->getPrefetchSize() < 0) {
throw IllegalArgumentException(__FILE__, __LINE__, "Cannot create a consumer with a negative prefetch");
}
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQConsumer::~ActiveMQConsumer() throw() {
try {
try{
this->close();
} catch(...) {}
delete this->internal;
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::start() {
if( this->internal->unconsumedMessages->isClosed() ) {
return;
}
this->internal->started.set( true );
this->internal->unconsumedMessages->start();
this->session->wakeup();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::stop() {
this->internal->started.set( false );
this->internal->unconsumedMessages->stop();
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::isClosed() const {
return this->internal->unconsumedMessages->isClosed();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::close() {
try{
if( !this->isClosed() ) {
if( this->session->getTransactionContext() != NULL &&
this->session->getTransactionContext()->isInTransaction() ) {
// TODO - Currently we can do this since the consumer could be
// deleted right after the close call so it won't stick around
// long enough to clean up the transaction data. For now we
// just have to close badly.
//
//Pointer<Synchronization> sync( new CloseSynhcronization( this ) );
//this->transaction->addSynchronization( sync );
doClose();
throw UnsupportedOperationException(
__FILE__, __LINE__,
"The Consumer is still in an Active Transaction, commit it first." );
} else {
doClose();
}
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::doClose() {
try {
dispose();
// Remove at the Broker Side, consumer has been removed from the local
// Session and Connection objects so if the remote call to remove throws
// it is okay to propagate to the client.
Pointer<RemoveInfo> info( new RemoveInfo );
info->setObjectId( this->consumerInfo->getConsumerId() );
info->setLastDeliveredSequenceId( this->internal->lastDeliveredSequenceId );
this->session->oneway( info );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::dispose() {
try{
if( !this->isClosed() ) {
if( !session->isTransacted() ) {
deliverAcks();
}
this->internal->started.set( false );
// Identifies any errors encountered during shutdown.
bool haveException = false;
ActiveMQException error;
// Purge all the pending messages
try{
this->internal->unconsumedMessages->clear();
} catch ( ActiveMQException& ex ){
if( !haveException ){
ex.setMark( __FILE__, __LINE__ );
error = ex;
haveException = true;
}
}
// Stop and Wakeup all sync consumers.
this->internal->unconsumedMessages->close();
if( this->session->isIndividualAcknowledge() ) {
// For IndividualAck Mode we need to unlink the ack handler to remove a
// cyclic reference to the MessageDispatch that brought the message to us.
synchronized( &internal->dispatchedMessages ) {
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
iter->next()->getMessage()->setAckHandler( Pointer<ActiveMQAckHandler>() );
}
this->internal->dispatchedMessages.clear();
}
}
// Remove this Consumer from the Connections set of Dispatchers
this->session->removeConsumer( this->consumerInfo->getConsumerId() );
// If we encountered an error, propagate it.
if( haveException ){
error.setMark( __FILE__, __LINE__ );
throw error;
}
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
std::string ActiveMQConsumer::getMessageSelector() const {
try {
// Fetch the Selector
return this->consumerInfo->getSelector();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
decaf::lang::Pointer<MessageDispatch> ActiveMQConsumer::dequeue( long long timeout ) {
try {
this->checkClosed();
// Calculate the deadline
long long deadline = 0;
if( timeout > 0 ) {
deadline = System::currentTimeMillis() + timeout;
}
// Loop until the time is up or we get a non-expired message
while( true ) {
Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue( timeout );
if( dispatch == NULL ) {
if( timeout > 0 && !this->internal->unconsumedMessages->isClosed() ) {
timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
} else {
if( this->internal->failureError != NULL ) {
throw CMSExceptionSupport::create(*this->internal->failureError);
} else {
return Pointer<MessageDispatch>();
}
}
} else if( dispatch->getMessage() == NULL ) {
return Pointer<MessageDispatch>();
} else if( dispatch->getMessage()->isExpired() ) {
beforeMessageIsConsumed( dispatch );
afterMessageIsConsumed( dispatch, true );
if( timeout > 0 ) {
timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
}
continue;
}
// Return the message.
return dispatch;
}
return Pointer<MessageDispatch>();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* ActiveMQConsumer::receive() {
try{
this->checkClosed();
// Send a request for a new message if needed
this->sendPullRequest( 0 );
// Wait for the next message.
Pointer<MessageDispatch> message = dequeue( -1 );
if( message == NULL ) {
return NULL;
}
// Message pre-processing
beforeMessageIsConsumed( message );
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
cms::Message* clonedMessage =
dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( message, false );
// Return the cloned message.
return clonedMessage;
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* ActiveMQConsumer::receive( int millisecs ) {
try {
this->checkClosed();
// Send a request for a new message if needed
this->sendPullRequest( millisecs );
// Wait for the next message.
Pointer<MessageDispatch> message = dequeue( millisecs );
if( message == NULL ) {
return NULL;
}
// Message preprocessing
beforeMessageIsConsumed( message );
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
cms::Message* clonedMessage =
dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( message, false );
// Return the cloned message.
return clonedMessage;
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* ActiveMQConsumer::receiveNoWait() {
try {
this->checkClosed();
// Send a request for a new message if needed
this->sendPullRequest( -1 );
// Get the next available message, if there is one.
Pointer<MessageDispatch> message = dequeue( 0 );
if( message == NULL ) {
return NULL;
}
// Message preprocessing
beforeMessageIsConsumed( message );
// Need to clone the message because the user is responsible for freeing
// its copy of the message.
cms::Message* clonedMessage =
dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
// Post processing (may result in the message being deleted)
afterMessageIsConsumed( message, false );
// Return the cloned message.
return clonedMessage;
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::setMessageListener( cms::MessageListener* listener ) {
try{
this->checkClosed();
if( this->consumerInfo->getPrefetchSize() == 0 && listener != NULL ) {
throw ActiveMQException(
__FILE__, __LINE__,
"Cannot deliver async when Prefetch is Zero, set Prefecth to at least One.");
}
if( listener != NULL ) {
// Now that we have a valid message listener,
// redispatch all the messages that it missed.
bool wasStarted = session->isStarted();
if( wasStarted ) {
session->stop();
}
synchronized( &(this->internal->listenerMutex) ) {
this->internal->listener = listener;
}
this->session->redispatch( *(this->internal->unconsumedMessages) );
if( wasStarted ) {
this->session->start();
}
} else {
synchronized( &(this->internal->listenerMutex) ) {
this->internal->listener = NULL;
}
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<MessageDispatch>& dispatch ) {
// If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
// we set the handler in the message to this object and send it out.
if( session->isClientAcknowledge() ) {
Pointer<ActiveMQAckHandler> ackHandler( new ClientAckHandler( this->session ) );
dispatch->getMessage()->setAckHandler( ackHandler );
} else if( session->isIndividualAcknowledge() ) {
Pointer<ActiveMQAckHandler> ackHandler( new IndividualAckHandler( this, dispatch ) );
dispatch->getMessage()->setAckHandler( ackHandler );
}
this->internal->lastDeliveredSequenceId =
dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
if( !isAutoAcknowledgeBatch() ) {
// When not in an Auto
synchronized( &this->internal->dispatchedMessages ) {
this->internal->dispatchedMessages.addFirst( dispatch );
}
if( this->session->isTransacted() ) {
ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
}
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<MessageDispatch>& message,
bool messageExpired ) {
try{
if( this->internal->unconsumedMessages->isClosed() ) {
return;
}
if( messageExpired == true ) {
synchronized(&this->internal->dispatchedMessages) {
this->internal->dispatchedMessages.remove(message);
}
ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
return;
}
if( session->isTransacted() ) {
return;
} else if( isAutoAcknowledgeEach() ) {
if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
synchronized( &this->internal->dispatchedMessages ) {
if( !this->internal->dispatchedMessages.isEmpty() ) {
Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
ActiveMQConstants::ACK_TYPE_CONSUMED );
if( ack != NULL ) {
this->internal->dispatchedMessages.clear();
session->oneway( ack );
}
}
}
this->internal->deliveringAcks.set( false );
}
} else if( isAutoAcknowledgeBatch() ) {
ackLater( message, ActiveMQConstants::ACK_TYPE_CONSUMED );
} else if( session->isClientAcknowledge() || session->isIndividualAcknowledge() ) {
bool messageUnackedByConsumer = false;
synchronized( &this->internal->dispatchedMessages ) {
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
if( iter->next() == message ) {
messageUnackedByConsumer = true;
break;
}
}
}
if( messageUnackedByConsumer ) {
this->ackLater( message, ActiveMQConstants::ACK_TYPE_DELIVERED );
}
} else {
throw IllegalStateException( __FILE__, __LINE__, "Invalid Session State" );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::deliverAcks() {
try{
Pointer<MessageAck> ack;
if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
if( isAutoAcknowledgeEach() ) {
synchronized( &this->internal->dispatchedMessages ) {
ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
if( ack != NULL ) {
this->internal->dispatchedMessages.clear();
} else {
ack.swap( internal->pendingAck );
}
}
} else if( this->internal->pendingAck != NULL &&
this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED ) {
ack.swap( this->internal->pendingAck );
}
if( ack != NULL ) {
try{
this->session->oneway( ack );
} catch(...) {}
} else {
this->internal->deliveringAcks.set( false );
}
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::ackLater( const Pointer<MessageDispatch>& dispatch, int ackType ) {
// Don't acknowledge now, but we may need to let the broker know the
// consumer got the message to expand the pre-fetch window
if( session->isTransacted() ) {
session->doStartTransaction();
if( !this->internal->synchronizationRegistered ) {
this->internal->synchronizationRegistered = true;
Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
this->session->getTransactionContext()->addSynchronization( sync );
}
}
// The delivered message list is only needed for the recover method
// which is only used with client ack.
this->internal->deliveredCounter++;
Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
this->internal->pendingAck.reset( new MessageAck() );
this->internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
this->internal->pendingAck->setAckType( (unsigned char)ackType );
this->internal->pendingAck->setDestination( dispatch->getDestination() );
this->internal->pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId() );
this->internal->pendingAck->setMessageCount( internal->deliveredCounter );
if( oldPendingAck == NULL ) {
this->internal->pendingAck->setFirstMessageId( this->internal->pendingAck->getLastMessageId() );
} else if ( oldPendingAck->getAckType() == this->internal->pendingAck->getAckType() ) {
this->internal->pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() );
} else {
// old pending ack being superseded by ack of another type, if is is not a delivered
// ack and hence important, send it now so it is not lost.
if( oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED ) {
session->oneway( oldPendingAck );
}
}
if( session->isTransacted() ) {
this->internal->pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId() );
}
if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( internal->deliveredCounter - internal->additionalWindowSize ) ) {
session->oneway( this->internal->pendingAck );
this->internal->pendingAck.reset( NULL );
this->internal->deliveredCounter = 0;
this->internal->additionalWindowSize = 0;
}
}
////////////////////////////////////////////////////////////////////////////////
Pointer<MessageAck> ActiveMQConsumer::makeAckForAllDeliveredMessages( int type ) {
synchronized( &this->internal->dispatchedMessages ) {
if( !this->internal->dispatchedMessages.isEmpty() ) {
Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
Pointer<MessageAck> ack( new MessageAck() );
ack->setAckType( (unsigned char)type );
ack->setConsumerId( dispatched->getConsumerId() );
ack->setDestination( dispatched->getDestination() );
ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
ack->setFirstMessageId( this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId() );
return ack;
}
}
return Pointer<MessageAck>();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::acknowledge( const Pointer<commands::MessageDispatch>& dispatch ) {
try{
this->checkClosed();
if( this->session->isIndividualAcknowledge() ) {
Pointer<MessageAck> ack( new MessageAck() );
ack->setAckType( ActiveMQConstants::ACK_TYPE_CONSUMED );
ack->setConsumerId( this->consumerInfo->getConsumerId() );
ack->setDestination( this->consumerInfo->getDestination() );
ack->setMessageCount( 1 );
ack->setLastMessageId( dispatch->getMessage()->getMessageId() );
ack->setFirstMessageId( dispatch->getMessage()->getMessageId() );
session->oneway( ack );
synchronized( &this->internal->dispatchedMessages ) {
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
if( iter->next() == dispatch ) {
iter->remove();
break;
}
}
}
} else {
throw IllegalStateException(
__FILE__, __LINE__,
"Session is not in IndividualAcknowledge mode." );
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::acknowledge() {
try{
synchronized( &this->internal->dispatchedMessages ) {
// Acknowledge all messages so far.
Pointer<MessageAck> ack =
makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED );
if( ack == NULL ) {
return;
}
if( session->isTransacted() ) {
session->doStartTransaction();
ack->setTransactionId( session->getTransactionContext()->getTransactionId() );
}
session->oneway( ack );
this->internal->pendingAck.reset( NULL );
// Adjust the counters
this->internal->deliveredCounter =
Math::max( 0, this->internal->deliveredCounter - (int)this->internal->dispatchedMessages.size());
this->internal->additionalWindowSize =
Math::max(0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size());
if( !session->isTransacted() ) {
this->internal->dispatchedMessages.clear();
}
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::commit() {
synchronized( &(this->internal->dispatchedMessages) ) {
this->internal->dispatchedMessages.clear();
}
this->internal->redeliveryDelay = 0;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::rollback() {
synchronized( this->internal->unconsumedMessages.get() ) {
synchronized( &this->internal->dispatchedMessages ) {
if( this->internal->dispatchedMessages.isEmpty() ) {
return;
}
// Only increase the redelivery delay after the first redelivery..
Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.getFirst();
const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
if( currentRedeliveryCount > 0 ) {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay( internal->redeliveryDelay );
} else {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
}
Pointer<MessageId> firstMsgId =
this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
Pointer<Message> message = iter->next()->getMessage();
message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 );
}
if( this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries() ) {
// We need to NACK the messages so that they get sent to the DLQ.
// Acknowledge the last message.
Pointer<MessageAck> ack( new MessageAck() );
ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
ack->setConsumerId( this->consumerInfo->getConsumerId() );
ack->setDestination( lastMsg->getDestination() );
ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
ack->setFirstMessageId( firstMsgId );
session->oneway( ack );
// Adjust the window size.
this->internal->additionalWindowSize =
Math::max( 0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size() );
this->internal->redeliveryDelay = 0;
} else {
// only redelivery_ack after first delivery
if( currentRedeliveryCount > 0 ) {
Pointer<MessageAck> ack( new MessageAck() );
ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
ack->setConsumerId( this->consumerInfo->getConsumerId() );
ack->setDestination( lastMsg->getDestination() );
ack->setMessageCount( (int)this->internal->dispatchedMessages.size() );
ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
ack->setFirstMessageId( firstMsgId );
session->oneway( ack );
}
// stop the delivery of messages.
this->internal->unconsumedMessages->stop();
std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
this->internal->unconsumedMessages->enqueueFirst( iter->next() );
}
if( internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed() ) {
// TODO - Can't do this until we can control object lifetime.
// Start up the delivery again a little later.
// this->internal->scheduler->executeAfterDelay(
// new StartConsumerTask(this), internal->redeliveryDelay);
start();
} else {
start();
}
}
this->internal->deliveredCounter -= (int)internal->dispatchedMessages.size();
this->internal->dispatchedMessages.clear();
}
}
if( this->internal->listener != NULL ) {
session->redispatch( *this->internal->unconsumedMessages );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::dispatch( const Pointer<MessageDispatch>& dispatch ) {
try {
synchronized( this->internal->unconsumedMessages.get() ) {
clearMessagesInProgress();
if( this->internal->clearDispatchList ) {
// we are reconnecting so lets flush the in progress
// messages
this->internal->clearDispatchList = false;
this->internal->unconsumedMessages->clear();
}
if( !this->internal->unconsumedMessages->isClosed() ) {
// Don't dispatch expired messages, ack it and then destroy it
if( dispatch->getMessage() != NULL && dispatch->getMessage()->isExpired() ) {
this->ackLater( dispatch, ActiveMQConstants::ACK_TYPE_CONSUMED );
// stop now, don't queue
return;
}
synchronized( &this->internal->listenerMutex ) {
// If we have a listener, send the message.
if( this->internal->listener != NULL && internal->unconsumedMessages->isRunning() ) {
// Preprocessing.
beforeMessageIsConsumed( dispatch );
// Notify the listener
this->internal->listener->onMessage(
dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
// Postprocessing
afterMessageIsConsumed( dispatch, false );
} else {
// No listener, add it to the unconsumed messages list
this->internal->unconsumedMessages->enqueue( dispatch );
}
}
}
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::sendPullRequest( long long timeout ) {
try {
this->checkClosed();
// There are still local message, consume them first.
if( !this->internal->unconsumedMessages->isEmpty() ) {
return;
}
if( this->consumerInfo->getPrefetchSize() == 0 ) {
Pointer<MessagePull> messagePull( new MessagePull() );
messagePull->setConsumerId( this->consumerInfo->getConsumerId() );
messagePull->setDestination( this->consumerInfo->getDestination() );
messagePull->setTimeout( timeout );
this->session->oneway( messagePull );
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::checkClosed() const {
if( this->isClosed() ) {
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConsumer - Consumer Already Closed" );
}
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::iterate() {
synchronized( &this->internal->listenerMutex ) {
if( this->internal->listener != NULL ) {
Pointer<MessageDispatch> dispatch = internal->unconsumedMessages->dequeueNoWait();
if( dispatch != NULL ) {
try {
beforeMessageIsConsumed( dispatch );
this->internal->listener->onMessage(
dynamic_cast<cms::Message*>( dispatch->getMessage().get() ) );
afterMessageIsConsumed( dispatch, false );
} catch( ActiveMQException& ex ) {
this->session->fire( ex );
}
return true;
}
}
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::inProgressClearRequired() {
this->internal->inProgressClearRequiredFlag = true;
// Clears dispatched messages async to avoid lock contention with inprogress acks.
this->internal->clearDispatchList = true;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::clearMessagesInProgress() {
if( this->internal->inProgressClearRequiredFlag ) {
synchronized( this->internal->unconsumedMessages.get() ) {
if( this->internal->inProgressClearRequiredFlag ) {
// TODO - Rollback duplicates.
// allow dispatch on this connection to resume
this->session->getConnection()->setTransportInterruptionProcessingComplete();
this->internal->inProgressClearRequiredFlag = false;
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::isAutoAcknowledgeEach() const {
return this->session->isAutoAcknowledge() ||
( this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue() );
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::isAutoAcknowledgeBatch() const {
return this->session->isDupsOkAcknowledge() && !this->consumerInfo->getDestination()->isQueue();
}
////////////////////////////////////////////////////////////////////////////////
int ActiveMQConsumer::getMessageAvailableCount() const {
return this->internal->unconsumedMessages->size();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::applyDestinationOptions( const Pointer<ConsumerInfo>& info ) {
decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
// Get any options specified in the destination and apply them to the
// ConsumerInfo object.
const ActiveMQProperties& options = amqDestination->getOptions();
std::string noLocalStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_NOLOCAL);
if (options.hasProperty(noLocalStr)) {
info->setNoLocal(Boolean::parseBoolean(options.getProperty(noLocalStr)));
}
std::string selectorStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_SELECTOR);
if (options.hasProperty(selectorStr)) {
info->setSelector(options.getProperty(selectorStr));
}
std::string priorityStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PRIORITY);
if (options.hasProperty(priorityStr)) {
info->setPriority((unsigned char) Integer::parseInt(options.getProperty(priorityStr)));
}
std::string dispatchAsyncStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_DISPATCHASYNC);
if (options.hasProperty(dispatchAsyncStr)) {
info->setDispatchAsync(Boolean::parseBoolean(options.getProperty(dispatchAsyncStr)));
}
std::string exclusiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_EXCLUSIVE);
if (options.hasProperty(exclusiveStr)) {
info->setExclusive(Boolean::parseBoolean(options.getProperty(exclusiveStr)));
}
std::string maxPendingMsgLimitStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT);
if (options.hasProperty(maxPendingMsgLimitStr)) {
info->setMaximumPendingMessageLimit(Integer::parseInt(options.getProperty(maxPendingMsgLimitStr)));
}
std::string prefetchSizeStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE);
if (options.hasProperty(prefetchSizeStr)) {
info->setPrefetchSize(Integer::parseInt(options.getProperty(prefetchSizeStr, "1000")));
}
std::string retroactiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_RETROACTIVE);
if (options.hasProperty(retroactiveStr)) {
info->setRetroactive(Boolean::parseBoolean(options.getProperty(retroactiveStr)));
}
std::string networkSubscriptionStr = "consumer.networkSubscription";
if (options.hasProperty(networkSubscriptionStr)) {
info->setNetworkSubscription(Boolean::parseBoolean(options.getProperty(networkSubscriptionStr)));
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
if( policy != NULL ) {
this->internal->redeliveryPolicy.reset( policy );
}
}
////////////////////////////////////////////////////////////////////////////////
RedeliveryPolicy* ActiveMQConsumer::getRedeliveryPolicy() const {
return this->internal->redeliveryPolicy.get();
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageListener* ActiveMQConsumer::getMessageListener() const {
return this->internal->listener;
}
////////////////////////////////////////////////////////////////////////////////
const Pointer<commands::ConsumerInfo>& ActiveMQConsumer::getConsumerInfo() const {
this->checkClosed();
return this->consumerInfo;
}
////////////////////////////////////////////////////////////////////////////////
const Pointer<commands::ConsumerId>& ActiveMQConsumer::getConsumerId() const {
this->checkClosed();
return this->consumerInfo->getConsumerId();
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::isSynchronizationRegistered() const {
return this->internal->synchronizationRegistered;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::setSynchronizationRegistered( bool value ) {
this->internal->synchronizationRegistered = value;
}
////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConsumer::getLastDeliveredSequenceId() const {
return this->internal->lastDeliveredSequenceId;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::setLastDeliveredSequenceId( long long value ) {
this->internal->lastDeliveredSequenceId = value;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::setFailureError( decaf::lang::Exception* error ) {
if( error != NULL ) {
this->internal->failureError.reset( error->clone() );
}
}
////////////////////////////////////////////////////////////////////////////////
decaf::lang::Exception* ActiveMQConsumer::getFailureError() const {
if( this->internal->failureError == NULL ) {
return NULL;
}
return this->internal->failureError.get();
}