blob: c54cac079cfd303a45226f723e9ad5df0c0c68b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQConsumerKernel.h"
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/InvalidStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <decaf/lang/Math.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/util/HashMap.h>
#include <decaf/util/Collections.h>
#include <decaf/util/concurrent/ExecutorService.h>
#include <decaf/util/concurrent/Executors.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <activemq/util/Config.h>
#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/util/ActiveMQProperties.h>
#include <activemq/util/ActiveMQMessageTransformation.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/commands/Message.h>
#include <activemq/commands/MessageAck.h>
#include <activemq/commands/MessagePull.h>
#include <activemq/commands/RemoveInfo.h>
#include <activemq/commands/TransactionInfo.h>
#include <activemq/commands/TransactionId.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQTransactionContext.h>
#include <activemq/core/ActiveMQAckHandler.h>
#include <activemq/core/FifoMessageDispatchChannel.h>
#include <activemq/core/SimplePriorityMessageDispatchChannel.h>
#include <activemq/core/RedeliveryPolicy.h>
#include <activemq/core/kernels/ActiveMQSessionKernel.h>
#include <activemq/threads/Scheduler.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageTransformer.h>
#include <memory>
using namespace std;
using namespace activemq;
using namespace activemq::util;
using namespace activemq::core;
using namespace activemq::core::kernels;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace activemq::threads;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace core {
namespace kernels {
class PreviouslyDeliveredMap : public HashMap<Pointer<MessageId>, bool> {
public:
Pointer<TransactionId> transactionId;
PreviouslyDeliveredMap(Pointer<TransactionId> transactionId) :
transactionId(transactionId) {
}
virtual ~PreviouslyDeliveredMap() {}
};
class ActiveMQConsumerKernelConfig {
private:
ActiveMQConsumerKernelConfig(const ActiveMQConsumerKernelConfig&);
ActiveMQConsumerKernelConfig& operator=(const ActiveMQConsumerKernelConfig&);
public:
cms::MessageListener* listener;
cms::MessageAvailableListener* messageAvailableListener;
cms::MessageTransformer* transformer;
decaf::util::concurrent::Mutex listenerMutex;
AtomicBoolean deliveringAcks;
AtomicBoolean started;
AtomicBoolean closeSyncRegistered;
Pointer<MessageDispatchChannel> unconsumedMessages;
decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > deliveredMessages;
long long lastDeliveredSequenceId;
Pointer<commands::MessageAck> pendingAck;
int deliveredCounter;
int additionalWindowSize;
volatile bool synchronizationRegistered;
volatile bool isClearDeliveredList;
AtomicInteger inProgressClearRequiredFlag;
long long redeliveryDelay;
Pointer<RedeliveryPolicy> redeliveryPolicy;
Pointer<Exception> failureError;
Pointer<Scheduler> scheduler;
int hashCode;
Pointer<PreviouslyDeliveredMap> previouslyDeliveredMessages;
long long failoverRedeliveryWaitPeriod;
bool transactedIndividualAck;
bool nonBlockingRedelivery;
bool consumerExpiryCheckEnabled;
bool optimizeAcknowledge;
long long optimizeAckTimestamp;
long long optimizeAcknowledgeTimeOut;
long long optimizedAckScheduledAckInterval;
Runnable* optimizedAckTask;
int ackCounter;
int dispatchedCount;
Pointer<ExecutorService> executor;
ActiveMQSessionKernel* session;
ActiveMQConsumerKernel* parent;
Pointer<ConsumerInfo> info;
ActiveMQConsumerKernelConfig() : listener(NULL),
messageAvailableListener(NULL),
transformer(NULL),
listenerMutex(),
deliveringAcks(),
started(),
closeSyncRegistered(),
unconsumedMessages(),
deliveredMessages(),
lastDeliveredSequenceId(-1),
pendingAck(),
deliveredCounter(0),
additionalWindowSize(0),
synchronizationRegistered(false),
isClearDeliveredList(false),
inProgressClearRequiredFlag(0),
redeliveryDelay(0),
redeliveryPolicy(),
failureError(),
scheduler(),
hashCode(),
previouslyDeliveredMessages(),
failoverRedeliveryWaitPeriod(0),
transactedIndividualAck(false),
nonBlockingRedelivery(false),
consumerExpiryCheckEnabled(true),
optimizeAcknowledge(false),
optimizeAckTimestamp(System::currentTimeMillis()),
optimizeAcknowledgeTimeOut(),
optimizedAckScheduledAckInterval(),
optimizedAckTask(),
ackCounter(),
dispatchedCount(),
executor(),
session(),
parent(),
info() {
}
bool isTimeForOptimizedAck(int prefetchSize) const {
if (ackCounter + deliveredCounter >= (prefetchSize * 0.65)) {
return true;
}
long long nextAckTime = optimizeAckTimestamp + optimizeAcknowledgeTimeOut;
if (optimizeAcknowledgeTimeOut > 0 && System::currentTimeMillis() >= nextAckTime) {
return true;
}
return false;
}
void clearDeliveredList() {
if (isClearDeliveredList) {
synchronized (&this->deliveredMessages) {
if (isClearDeliveredList) {
if (!deliveredMessages.isEmpty()) {
if (session->isTransacted()) {
if (previouslyDeliveredMessages == NULL) {
previouslyDeliveredMessages.reset(new PreviouslyDeliveredMap(
session->getTransactionContext()->getTransactionId()));
}
Pointer<Iterator<Pointer<MessageDispatch> > > iter(deliveredMessages.iterator());
while (iter->hasNext()) {
Pointer<MessageDispatch> dispatch = iter->next();
previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), false);
}
} else {
if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
if (!info->isBrowser()) {
Pointer<Iterator<Pointer<MessageDispatch> > > iter(deliveredMessages.iterator());
// allow redelivery
while (iter->hasNext()) {
Pointer<MessageDispatch> dispatch = iter->next();
session->getConnection()->rollbackDuplicate(parent, dispatch->getMessage());
}
}
}
deliveredMessages.clear();
pendingAck.reset(NULL);
}
}
isClearDeliveredList = false;
}
}
}
}
void clearPreviouslyDelivered() {
if (previouslyDeliveredMessages != NULL) {
previouslyDeliveredMessages->clear();
previouslyDeliveredMessages.reset(NULL);
}
}
// called with deliveredMessages locked
void removeFromDeliveredMessages(Pointer<MessageId> key) {
Pointer< Iterator< Pointer<MessageDispatch> > > iter(this->deliveredMessages.iterator());
while (iter->hasNext()) {
Pointer<MessageDispatch> candidate = iter->next();
if (key->equals(candidate->getMessage()->getMessageId().get())) {
session->getConnection()->rollbackDuplicate(this->parent, candidate->getMessage());
iter->remove();
break;
}
}
}
// called with unconsumedMessages && deliveredMessages locked remove any message
// not re-delivered as they can't be replayed to this consumer on rollback
void rollbackPreviouslyDeliveredAndNotRedelivered() {
if (previouslyDeliveredMessages != NULL) {
Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
while (iter->hasNext()) {
MapEntry<Pointer<MessageId>, bool> entry = iter->next();
if (!entry.getValue()) {
removeFromDeliveredMessages(entry.getKey());
}
}
clearPreviouslyDelivered();
}
}
void rollbackOnFailedRecoveryRedelivery() {
if (previouslyDeliveredMessages != NULL) {
// if any previously delivered messages was not re-delivered, transaction is invalid
// and must roll back as messages have been dispatched elsewhere.
int numberNotReplayed = 0;
Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
while (iter->hasNext()) {
MapEntry<Pointer<MessageId>, bool> entry = iter->next();
if (!entry.getValue()) {
numberNotReplayed++;
}
}
if (numberNotReplayed > 0) {
std::string txId = previouslyDeliveredMessages->transactionId != NULL ?
previouslyDeliveredMessages->transactionId->toString() : "<None>";
std::string message = std::string("rolling back transaction (") + txId +
") post failover recovery. " + Integer::toString(numberNotReplayed) +
" previously delivered message(s) not replayed to consumer: " +
info->getConsumerId()->toString();
throw cms::TransactionRolledBackException(message);
}
}
}
void waitForRedeliveries() {
if (failoverRedeliveryWaitPeriod > 0 && previouslyDeliveredMessages != NULL) {
long long expiry = System::currentTimeMillis() + failoverRedeliveryWaitPeriod;
int numberNotReplayed;
do {
numberNotReplayed = 0;
synchronized (&this->deliveredMessages) {
if (previouslyDeliveredMessages != NULL) {
Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
while (iter->hasNext()) {
MapEntry<Pointer<MessageId>, bool> entry = iter->next();
if (!entry.getValue()) {
numberNotReplayed++;
}
}
}
}
if (numberNotReplayed > 0) {
try {
Thread::sleep(Math::max(500LL, failoverRedeliveryWaitPeriod/4));
} catch (InterruptedException& ex) {
break;
}
}
} while (numberNotReplayed > 0 && expiry < System::currentTimeMillis());
}
}
bool redeliveryExpectedInCurrentTransaction(Pointer<MessageDispatch> dispatch, bool markReceipt) {
if (session->isTransacted()) {
synchronized (&this->deliveredMessages) {
if (previouslyDeliveredMessages != NULL) {
if (previouslyDeliveredMessages->containsKey(dispatch->getMessage()->getMessageId())) {
if (markReceipt) {
previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), true);
}
return true;
}
}
}
}
return false;
}
bool redeliveryPendingInCompetingTransaction(Pointer<MessageDispatch> dispatch) {
ArrayList< Pointer<ActiveMQSessionKernel> > sessions = session->getConnection()->getSessions();
Pointer<Iterator<Pointer<ActiveMQSessionKernel> > > sessionIter(sessions.iterator());
while (sessionIter->hasNext()) {
Pointer<ActiveMQSessionKernel> session = sessionIter->next();
ArrayList< Pointer<ActiveMQConsumerKernel> > consumers = session->getConsumers();
Pointer<Iterator<Pointer<ActiveMQConsumerKernel> > > consumersIter(consumers.iterator());
while (consumersIter->hasNext()) {
Pointer<ActiveMQConsumerKernel> consumer = consumersIter->next();
return consumer->isRedeliveryExpectedInCurrentTransaction(dispatch);
}
}
return false;
}
bool consumeExpiredMessage(const Pointer<MessageDispatch> dispatch) {
if (dispatch->getMessage()->isExpired()) {
return !info->isBrowser() && consumerExpiryCheckEnabled;
}
return false;
}
bool redeliveryExceeded(Pointer<MessageDispatch> dispatch) {
try {
return session->isTransacted() && redeliveryPolicy != NULL &&
redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
dispatch->getRedeliveryCounter() > redeliveryPolicy->getMaximumRedeliveries() &&
// redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
!dispatch->getMessage()->getMessageProperties().containsKey("redeliveryDelay");
} catch (Exception& ignored) {
return false;
}
}
void posionAck(Pointer<MessageDispatch> dispatch, const std::string& cause) {
Pointer<MessageAck> poisonAck(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_POISON, 1));
poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
poisonAck->setPoisonCause(createBrokerError(cause));
session->sendAck(poisonAck);
}
Pointer<BrokerError> createBrokerError(const std::string& message) {
Pointer<BrokerError> cause(new BrokerError());
cause->setExceptionClass("javax.jms.JMSException");
cause->setMessage(message);
return cause;
}
};
}}}
////////////////////////////////////////////////////////////////////////////////
namespace {
/**
* Class used to deal with consumers in an active transaction. This
* class calls back into the consumer when the transaction is Committed or
* Rolled Back to process that event.
*/
class TransactionSynhcronization : public Synchronization {
private:
Pointer<ActiveMQConsumerKernel> consumer;
ActiveMQConsumerKernelConfig* impl;
private:
TransactionSynhcronization(const TransactionSynhcronization&);
TransactionSynhcronization& operator=(const TransactionSynhcronization&);
public:
TransactionSynhcronization(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
Synchronization(), consumer(consumer), impl(impl) {
if (consumer == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
}
virtual ~TransactionSynhcronization() {}
virtual void beforeEnd() {
if (impl->transactedIndividualAck) {
impl->clearDeliveredList();
impl->waitForRedeliveries();
synchronized(&impl->deliveredMessages) {
impl->rollbackOnFailedRecoveryRedelivery();
}
} else {
consumer->acknowledge();
}
consumer->setSynchronizationRegistered(false);
}
virtual void afterCommit() {
consumer->commit();
consumer->setSynchronizationRegistered(false);
consumer.reset(NULL);
}
virtual void afterRollback() {
consumer->rollback();
consumer->setSynchronizationRegistered(false);
consumer.reset(NULL);
}
};
/**
* Class used to Hook a consumer that has been closed into the Transaction
* it is currently a part of. Once the Transaction has been Committed or
* Rolled back this Synchronization can finish the Close of the consumer.
*/
class CloseSynhcronization : public Synchronization {
private:
Pointer<ActiveMQConsumerKernel> consumer;
private:
CloseSynhcronization(const CloseSynhcronization&);
CloseSynhcronization& operator=(const CloseSynhcronization&);
public:
CloseSynhcronization(Pointer<ActiveMQConsumerKernel> consumer) : consumer(consumer) {
if (consumer == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
}
virtual ~CloseSynhcronization() {}
virtual void beforeEnd() {
}
virtual void afterCommit() {
consumer->doClose();
consumer.reset(NULL);
}
virtual void afterRollback() {
consumer->doClose();
consumer.reset(NULL);
}
};
/**
* ActiveMQAckHandler used to support Managed Acknowledge modes.
*/
class NoOpAckHandler : public ActiveMQAckHandler {
private:
NoOpAckHandler(const NoOpAckHandler&);
NoOpAckHandler& operator=(const NoOpAckHandler&);
public:
NoOpAckHandler() {
}
void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED) {
}
};
/**
* ActiveMQAckHandler used to support Client Acknowledge mode.
*/
class ClientAckHandler : public ActiveMQAckHandler {
private:
ActiveMQSessionKernel* session;
private:
ClientAckHandler(const ClientAckHandler&);
ClientAckHandler& operator=(const ClientAckHandler&);
public:
ClientAckHandler(ActiveMQSessionKernel* session) : session(session) {
if (session == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Ack Handler Created with NULL Session.");
}
}
void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED) {
try {
this->session->acknowledge();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
};
/**
* ActiveMQAckHandler used to enable the Individual Acknowledge mode.
*/
class IndividualAckHandler : public ActiveMQAckHandler {
private:
ActiveMQConsumerKernel* consumer;
Pointer<commands::MessageDispatch> dispatch;
private:
IndividualAckHandler(const IndividualAckHandler&);
IndividualAckHandler& operator=(const IndividualAckHandler&);
public:
IndividualAckHandler(ActiveMQConsumerKernel* consumer, const Pointer<MessageDispatch>& dispatch) :
consumer(consumer), dispatch(dispatch) {
if (consumer == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Ack Handler Created with NULL consumer.");
}
}
void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED) {
try {
if (this->dispatch != NULL) {
this->consumer->acknowledge(this->dispatch);
this->dispatch.reset(NULL);
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
};
/**
* Class used to Start a Consumer's dispatch queue asynchronously from the
* configured Scheduler.
*/
class StartConsumerTask : public Runnable {
private:
Pointer<ActiveMQConsumerKernel> consumer;
ActiveMQSessionKernel* session;
private:
StartConsumerTask(const StartConsumerTask&);
StartConsumerTask& operator=(const StartConsumerTask&);
public:
StartConsumerTask(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQSessionKernel* session) :
Runnable(), consumer(consumer), session(session) {
if (consumer == NULL) {
throw NullPointerException(
__FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
}
}
virtual ~StartConsumerTask() {}
virtual void run() {
try {
if (!this->consumer->isClosed()) {
this->consumer->start();
}
} catch(cms::CMSException& ex) {
Exception wrapper(ex.clone());
this->session->getConnection()->onAsyncException(wrapper);
}
this->consumer.reset(NULL);
}
};
class AsyncMessageAckTask : public Runnable {
private:
Pointer<MessageAck> ack;
ActiveMQSessionKernel* session;
ActiveMQConsumerKernelConfig* impl;
private:
AsyncMessageAckTask(const AsyncMessageAckTask&);
AsyncMessageAckTask& operator=(const AsyncMessageAckTask&);
public:
AsyncMessageAckTask(Pointer<MessageAck> ack, ActiveMQSessionKernel* session, ActiveMQConsumerKernelConfig* impl) :
Runnable(), ack(ack), session(session), impl(impl) {}
virtual ~AsyncMessageAckTask() {}
virtual void run() {
try {
this->session->sendAck(ack, true);
this->impl->deliveringAcks.set(false);
} catch(Exception& ex) {
this->impl->deliveringAcks.set(false);
} catch(cms::CMSException& ex) {
this->impl->deliveringAcks.set(false);
}
}
};
class OptimizedAckTask : public Runnable {
private:
Pointer<ActiveMQConsumerKernel> consumer;
ActiveMQConsumerKernelConfig* impl;
private:
OptimizedAckTask(const OptimizedAckTask&);
OptimizedAckTask& operator=(const OptimizedAckTask&);
public:
OptimizedAckTask(Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
Runnable(), consumer(consumer), impl(impl) {}
virtual ~OptimizedAckTask() {}
virtual void run() {
try {
if (impl->optimizeAcknowledge && !impl->unconsumedMessages->isClosed()) {
this->consumer->deliverAcks();
}
} catch(Exception& ex) {
impl->session->getConnection()->onAsyncException(ex);
}
this->consumer.reset(NULL);
}
};
class NonBlockingRedeliveryTask : public Runnable {
private:
ActiveMQSessionKernel* session;
Pointer<ActiveMQConsumerKernel> consumer;
ActiveMQConsumerKernelConfig* impl;
ArrayList<Pointer<MessageDispatch> > redeliveries;
private:
NonBlockingRedeliveryTask(const NonBlockingRedeliveryTask&);
NonBlockingRedeliveryTask& operator=(const NonBlockingRedeliveryTask&);
public:
NonBlockingRedeliveryTask(ActiveMQSessionKernel* session, Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
Runnable(), session(session), consumer(consumer), impl(impl), redeliveries() {
this->redeliveries.copy(impl->deliveredMessages);
Collections::reverse(this->redeliveries);
}
virtual ~NonBlockingRedeliveryTask() {}
virtual void run() {
try {
if (!impl->unconsumedMessages->isClosed()) {
Pointer<Iterator<Pointer<MessageDispatch> > > iter(redeliveries.iterator());
while (iter->hasNext() && !impl->unconsumedMessages->isClosed()) {
Pointer<MessageDispatch> dispatch = iter->next();
session->dispatch(dispatch);
}
}
} catch (Exception& e) {
session->getConnection()->onAsyncException(e);
}
this->consumer.reset(NULL);
}
};
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
const Pointer<ConsumerId>& id,
const Pointer<ActiveMQDestination>& destination,
const std::string& name,
const std::string& selector,
int prefetch,
int maxPendingMessageCount,
bool noLocal,
bool browser,
bool dispatchAsync,
cms::MessageListener* listener) : internal(NULL),
session(NULL),
consumerInfo() {
if (session == NULL) {
throw IllegalArgumentException(__FILE__, __LINE__, "Consumer created with NULL Session");
}
if (destination == NULL) {
throw cms::InvalidDestinationException("Consumer created with NULL Destination");
} else if (destination->getPhysicalName() == "") {
throw cms::InvalidDestinationException("Destination given has no Physical Name.");
} else if (destination->isTemporary()) {
std::string physicalName = destination->getPhysicalName();
std::string connectionId = session->getConnection()->getConnectionInfo().getConnectionId()->getValue();
if (physicalName.find(connectionId) == std::string::npos) {
throw cms::InvalidDestinationException(
"Cannot use a Temporary destination from another Connection");
}
Pointer<ActiveMQTempDestination> tempDest = destination.dynamicCast<ActiveMQTempDestination>();
if (session->getConnection()->isDeleted(tempDest)) {
throw cms::InvalidDestinationException(
"Cannot use a Temporary destination that has been deleted");
}
}
if (prefetch < 0) {
throw cms::CMSException("Cannot have a prefetch size less than zero");
}
this->internal = new ActiveMQConsumerKernelConfig();
Pointer<ConsumerInfo> consumerInfo(new ConsumerInfo());
consumerInfo->setConsumerId(id);
consumerInfo->setClientId(session->getConnection()->getClientID());
consumerInfo->setDestination(destination);
consumerInfo->setSubscriptionName(name);
consumerInfo->setSelector(selector);
consumerInfo->setPrefetchSize(prefetch);
consumerInfo->setCurrentPrefetchSize(prefetch);
consumerInfo->setMaximumPendingMessageLimit(maxPendingMessageCount);
consumerInfo->setBrowser(browser);
consumerInfo->setDispatchAsync(dispatchAsync);
consumerInfo->setNoLocal(noLocal);
consumerInfo->setExclusive(session->getConnection()->isExclusiveConsumer());
consumerInfo->setRetroactive(session->getConnection()->isUseRetroactiveConsumer());
// Initialize Consumer Data
this->session = session;
this->consumerInfo = consumerInfo;
this->internal->session = session;
this->internal->parent = this;
this->internal->info = consumerInfo;
this->internal->redeliveryPolicy.reset(this->session->getConnection()->getRedeliveryPolicy()->clone());
this->internal->scheduler = this->session->getScheduler();
if (this->session->getConnection()->isMessagePrioritySupported()) {
this->internal->unconsumedMessages.reset(new SimplePriorityMessageDispatchChannel());
} else {
this->internal->unconsumedMessages.reset(new FifoMessageDispatchChannel());
}
if (listener != NULL) {
this->setMessageListener(listener);
}
applyDestinationOptions(this->consumerInfo);
if (session->getConnection()->isOptimizeAcknowledge() && session->isAutoAcknowledge() && !consumerInfo->isBrowser()) {
this->internal->optimizeAcknowledge = true;
}
if (this->internal->optimizeAcknowledge) {
this->internal->optimizeAcknowledgeTimeOut = session->getConnection()->getOptimizeAcknowledgeTimeOut();
this->setOptimizedAckScheduledAckInterval(
session->getConnection()->getOptimizedAckScheduledAckInterval());
}
consumerInfo->setOptimizedAcknowledge(this->internal->optimizeAcknowledge);
this->internal->failoverRedeliveryWaitPeriod =
session->getConnection()->getConsumerFailoverRedeliveryWaitPeriod();
this->internal->nonBlockingRedelivery = session->getConnection()->isNonBlockingRedelivery();
this->internal->transactedIndividualAck =
session->getConnection()->isTransactedIndividualAck() ||
this->internal->nonBlockingRedelivery ||
this->session->getConnection()->isMessagePrioritySupported();
this->internal->consumerExpiryCheckEnabled =
this->session->getConnection()->isConsumerExpiryCheckEnabled();
if (this->consumerInfo->getPrefetchSize() < 0) {
delete this->internal;
throw IllegalArgumentException(
__FILE__, __LINE__, "Cannot create a consumer with a negative prefetch");
}
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQConsumerKernel::~ActiveMQConsumerKernel() {
try {
this->close();
}
AMQ_CATCHALL_NOTHROW()
try {
delete this->internal;
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::start() {
if (this->internal->unconsumedMessages->isClosed()) {
return;
}
this->internal->started.set(true);
this->internal->unconsumedMessages->start();
this->session->wakeup();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::stop() {
this->internal->started.set(false);
this->internal->unconsumedMessages->stop();
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isClosed() const {
return this->internal->unconsumedMessages->isClosed();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::close() {
try {
if (!this->isClosed()) {
if (!this->internal->deliveredMessages.isEmpty() &&
this->session->getTransactionContext() != NULL &&
this->session->getTransactionContext()->isInTransaction() &&
this->internal->closeSyncRegistered.compareAndSet(false, true)) {
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
Pointer<Synchronization> sync(new CloseSynhcronization(self));
this->session->getTransactionContext()->addSynchronization(sync);
} else {
doClose();
}
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::doClose() {
try {
// Store interrupted state and clear so that Transport operations don't
// throw InterruptedException and we ensure that resources are clened up.
bool interrupted = Thread::interrupted();
dispose();
// Remove at the Broker Side, consumer has been removed from the local
// Session and Connection objects so if the remote call to remove throws
// it is okay to propagate to the client.
Pointer<RemoveInfo> info(new RemoveInfo);
info->setObjectId(this->consumerInfo->getConsumerId());
info->setLastDeliveredSequenceId(this->internal->lastDeliveredSequenceId);
this->session->oneway(info);
if (interrupted) {
Thread::currentThread()->interrupt();
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::dispose() {
try {
if (!this->isClosed()) {
if (!session->isTransacted()) {
deliverAcks();
if (isAutoAcknowledgeBatch()) {
acknowledge();
}
}
this->internal->started.set(false);
if (this->internal->executor != NULL) {
this->internal->executor->shutdown();
this->internal->executor->awaitTermination(60, TimeUnit::SECONDS);
this->internal->executor.reset(NULL);
}
if (this->internal->optimizedAckTask != NULL) {
this->session->getScheduler()->cancel(this->internal->optimizedAckTask);
this->internal->optimizedAckTask = NULL;
}
if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
if (!this->consumerInfo->isBrowser()) {
// roll back duplicates that aren't acknowledged
ArrayList< Pointer<MessageDispatch> > tmp;
synchronized(&this->internal->deliveredMessages) {
tmp.copy(this->internal->deliveredMessages);
}
Pointer< Iterator<Pointer<MessageDispatch> > > iter(tmp.iterator());
while (iter->hasNext()) {
Pointer<MessageDispatch> msg = iter->next();
this->session->getConnection()->rollbackDuplicate(this, msg->getMessage());
}
tmp.clear();
}
}
// Identifies any errors encountered during shutdown.
bool haveException = false;
ActiveMQException error;
if (!this->internal->session->isTransacted()) {
// For IndividualAck Mode we need to unlink the ack handler to remove a
// cyclic reference to the MessageDispatch that brought the message to us.
synchronized(&internal->deliveredMessages) {
if (this->session->isIndividualAcknowledge()) {
std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(this->internal->deliveredMessages.iterator());
while (iter->hasNext()) {
iter->next()->getMessage()->setAckHandler(Pointer<ActiveMQAckHandler>());
}
}
this->internal->deliveredMessages.clear();
}
}
// Stop and Wakeup all sync consumers.
this->internal->unconsumedMessages->close();
// Remove this Consumer from the Connections set of Dispatchers
Pointer<ActiveMQConsumerKernel> consumer(this);
try {
this->session->removeConsumer(consumer);
} catch (Exception& e) {
consumer.release();
throw;
}
consumer.release();
// Ensure these are filtered as duplicates.
std::vector< Pointer<MessageDispatch> > list = this->internal->unconsumedMessages->removeAll();
if (!this->consumerInfo->isBrowser()) {
std::vector< Pointer<MessageDispatch> >::const_iterator iter = list.begin();
for (; iter != list.end(); ++iter) {
Pointer<MessageDispatch> md = *iter;
this->session->getConnection()->rollbackDuplicate(this, md->getMessage());
}
}
// If we encountered an error, propagate it.
if (haveException) {
error.setMark(__FILE__, __LINE__);
throw error;
}
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
std::string ActiveMQConsumerKernel::getMessageSelector() const {
try {
checkClosed();
return this->consumerInfo->getSelector();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long timeout) {
try {
// Calculate the deadline
long long deadline = 0;
if (timeout > 0) {
deadline = System::currentTimeMillis() + timeout;
}
// Loop until the time is up or we get a non-expired message
while (true) {
Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue(timeout);
if (dispatch == NULL) {
if (timeout > 0 && !this->internal->unconsumedMessages->isClosed()) {
timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
} else {
if (this->internal->failureError != NULL) {
throw CMSExceptionSupport::create(*this->internal->failureError);
} else {
return Pointer<MessageDispatch>();
}
}
} else if (dispatch->getMessage() == NULL) {
return Pointer<MessageDispatch> ();
} else if (internal->consumeExpiredMessage(dispatch)) {
beforeMessageIsConsumed(dispatch);
afterMessageIsConsumed(dispatch, true);
if (timeout > 0) {
timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
}
sendPullRequest(timeout);
} else if (internal->redeliveryExceeded(dispatch)) {
internal->posionAck(dispatch,
"dispatch to " + getConsumerId()->toString() +
" exceeds RedeliveryPolicy limit: " +
Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
if (timeout > 0) {
timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
}
sendPullRequest(timeout);
} else {
return dispatch;
}
}
return Pointer<MessageDispatch>();
} catch (InterruptedException& ex) {
Thread::currentThread()->interrupt();
throw CMSExceptionSupport::create(ex);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* ActiveMQConsumerKernel::receive() {
try {
this->checkClosed();
this->checkMessageListener();
// Send a request for a new message if needed
this->sendPullRequest(0);
// Wait for the next message.
Pointer<MessageDispatch> message = dequeue(-1);
if (message == NULL) {
return NULL;
}
beforeMessageIsConsumed(message);
afterMessageIsConsumed(message, false);
// Need to clone the message because the user is responsible for freeing
// its copy of the message, createCMSMessage will do this for us.
return createCMSMessage(message).release();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* ActiveMQConsumerKernel::receive(int timeout) {
try {
this->checkClosed();
this->checkMessageListener();
if (timeout == 0) {
return this->receive();
}
// Send a request for a new message if needed
this->sendPullRequest(timeout);
// Get the next available message, if there is one.
Pointer<MessageDispatch> message;
if (internal->info->getPrefetchSize() == 0) {
message = dequeue(-1); // Broker will signal if no message.
} else {
message = dequeue(timeout);
}
if (message == NULL) {
return NULL;
}
beforeMessageIsConsumed(message);
afterMessageIsConsumed(message, false);
// Need to clone the message because the user is responsible for freeing
// its copy of the message, createCMSMessage will do this for us.
return createCMSMessage(message).release();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
cms::Message* ActiveMQConsumerKernel::receiveNoWait() {
try {
this->checkClosed();
this->checkMessageListener();
// Send a request for a new message if needed
this->sendPullRequest(-1);
// Get the next available message, if there is one.
Pointer<MessageDispatch> message;
if (internal->info->getPrefetchSize() == 0) {
message = dequeue(-1); // Broker will signal if no message.
} else {
message = dequeue(0);
}
if (message == NULL) {
return NULL;
}
beforeMessageIsConsumed(message);
afterMessageIsConsumed(message, false);
// Need to clone the message because the user is responsible for freeing
// its copy of the message, createCMSMessage will do this for us.
return createCMSMessage(message).release();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setMessageListener(cms::MessageListener* listener) {
try {
this->checkClosed();
if (this->consumerInfo->getPrefetchSize() == 0 && listener != NULL) {
throw ActiveMQException(__FILE__, __LINE__,
"Cannot deliver async when Prefetch is Zero, set Prefecth to at least One.");
}
if (listener != NULL) {
// Now that we have a valid message listener, redispatch all the messages that it missed.
bool wasStarted = session->isStarted();
if (wasStarted) {
session->stop();
}
synchronized(&(this->internal->listenerMutex)) {
this->internal->listener = listener;
}
this->session->redispatch(*(this->internal->unconsumedMessages));
if (wasStarted) {
this->session->start();
}
} else {
synchronized(&(this->internal->listenerMutex)) {
this->internal->listener = NULL;
}
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::beforeMessageIsConsumed(Pointer<MessageDispatch> dispatch) {
this->internal->lastDeliveredSequenceId = dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
if (!isAutoAcknowledgeBatch()) {
// When not in an Auto
synchronized(&this->internal->deliveredMessages) {
this->internal->deliveredMessages.addFirst(dispatch);
}
if (this->session->isTransacted()) {
if (this->internal->transactedIndividualAck) {
immediateIndividualTransactedAck(dispatch);
} else {
ackLater(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED);
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::immediateIndividualTransactedAck(Pointer<MessageDispatch> dispatch) {
// acks accumulate on the broker pending transaction completion to indicate delivery status
registerSync();
Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL, 1));
ack->setTransactionId(this->session->getTransactionContext()->getTransactionId());
this->session->syncRequest(ack);
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::registerSync() {
this->session->doStartTransaction();
if (!this->internal->synchronizationRegistered) {
this->internal->synchronizationRegistered = true;
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
Pointer<Synchronization> sync(new TransactionSynhcronization(self, this->internal));
this->session->getTransactionContext()->addSynchronization(sync);
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> message, bool messageExpired ) {
try {
if (this->internal->unconsumedMessages->isClosed()) {
return;
} else if (messageExpired) {
acknowledge(message, ActiveMQConstants::ACK_TYPE_EXPIRED);
return;
} else if (session->isTransacted()) {
return;
}
if (isAutoAcknowledgeEach()) {
if (this->internal->deliveringAcks.compareAndSet(false, true)) {
synchronized(&this->internal->deliveredMessages) {
if (!this->internal->deliveredMessages.isEmpty()) {
if (this->internal->optimizeAcknowledge) {
this->internal->ackCounter++;
if (this->internal->isTimeForOptimizedAck(this->consumerInfo->getPrefetchSize())) {
Pointer<MessageAck> ack =
makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
if (ack != NULL) {
this->internal->deliveredMessages.clear();
this->internal->ackCounter = 0;
this->session->sendAck(ack);
this->internal->optimizeAckTimestamp = System::currentTimeMillis();
}
// As further optimization send ack for expired messages when there
// are any. This resets the deliveredCounter to 0 so that we won't
// send standard acks with every message just because the deliveredCounter
// just below 0.5 * prefetch as used in ackLater()
if (this->internal->pendingAck != NULL && this->internal->deliveredCounter > 0) {
this->session->sendAck(this->internal->pendingAck);
this->internal->pendingAck.reset(NULL);
this->internal->deliveredCounter = 0;
}
}
} else {
Pointer<MessageAck> ack =
makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
if (ack != NULL) {
this->internal->deliveredMessages.clear();
session->sendAck(ack);
}
}
}
}
this->internal->deliveringAcks.set(false);
}
} else if (isAutoAcknowledgeBatch()) {
ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED);
} else if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
bool messageUnackedByConsumer = false;
synchronized(&this->internal->deliveredMessages) {
messageUnackedByConsumer = this->internal->deliveredMessages.contains(message);
}
if (messageUnackedByConsumer) {
this->ackLater(message, ActiveMQConstants::ACK_TYPE_DELIVERED);
}
} else {
throw IllegalStateException(__FILE__, __LINE__, "Invalid Session State");
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::deliverAcks() {
try {
Pointer<MessageAck> ack;
if (this->internal->deliveringAcks.compareAndSet(false, true)) {
if (isAutoAcknowledgeEach()) {
synchronized(&this->internal->deliveredMessages) {
ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
if (ack != NULL) {
this->internal->deliveredMessages.clear();
this->internal->ackCounter = 0;
} else {
ack.swap(internal->pendingAck);
}
}
} else if (this->internal->pendingAck != NULL && this->internal->pendingAck->isStandardAck()) {
ack.swap(this->internal->pendingAck);
}
if (ack != NULL) {
if (this->internal->executor == NULL) {
this->internal->executor.reset(Executors::newSingleThreadExecutor());
}
Pointer< Future<bool> >( this->internal->executor->submit(
new AsyncMessageAckTask(ack, this->session, this->internal), true));
} else {
this->internal->deliveringAcks.set(false);
}
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::ackLater(Pointer<MessageDispatch> dispatch, int ackType) {
// Don't acknowledge now, but we may need to let the broker know the
// consumer got the message to expand the pre-fetch window
if (session->isTransacted()) {
registerSync();
}
// The delivered message list is only needed for the recover method
// which is only used with client ack.
this->internal->deliveredCounter++;
Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
this->internal->pendingAck.reset(new MessageAck(dispatch, ackType, internal->deliveredCounter));
if (oldPendingAck == NULL) {
this->internal->pendingAck->setFirstMessageId(this->internal->pendingAck->getLastMessageId());
} else if (oldPendingAck->getAckType() == this->internal->pendingAck->getAckType()) {
this->internal->pendingAck->setFirstMessageId(oldPendingAck->getFirstMessageId());
} else {
// old pending ack being superseded by ack of another type, if is is not a delivered
// ack and hence important, send it now so it is not lost.
if (!oldPendingAck->isDeliveredAck()) {
session->sendAck(oldPendingAck);
}
}
if (session->isTransacted()) {
this->internal->pendingAck->setTransactionId(this->session->getTransactionContext()->getTransactionId());
}
// Need to evaluate both expired and normal messages as otherwise consumer may get stalled
int pendingAcks = (internal->deliveredCounter + internal->ackCounter) - internal->additionalWindowSize;
if ((0.5 * this->consumerInfo->getPrefetchSize()) <= pendingAcks) {
session->sendAck(this->internal->pendingAck);
this->internal->pendingAck.reset(NULL);
this->internal->deliveredCounter = 0;
this->internal->additionalWindowSize = 0;
}
}
////////////////////////////////////////////////////////////////////////////////
Pointer<MessageAck> ActiveMQConsumerKernel::makeAckForAllDeliveredMessages(int type) {
synchronized( &this->internal->deliveredMessages ) {
if (!this->internal->deliveredMessages.isEmpty()) {
Pointer<MessageDispatch> dispatched = this->internal->deliveredMessages.getFirst();
Pointer<MessageAck> ack(new MessageAck(dispatched, type, this->internal->deliveredMessages.size()));
ack->setFirstMessageId(this->internal->deliveredMessages.getLast()->getMessage()->getMessageId());
return ack;
}
}
return Pointer<MessageAck>();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> dispatch) {
this->acknowledge(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL);
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> dispatch, int ackType) {
try {
Pointer<MessageAck> ack(new MessageAck(dispatch, ackType, 1));
if (ack->isExpiredAck()) {
ack->setFirstMessageId(ack->getLastMessageId());
}
session->sendAck(ack);
synchronized(&this->internal->deliveredMessages) {
this->internal->deliveredMessages.remove(dispatch);
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::acknowledge() {
try {
this->internal->clearDeliveredList();
this->internal->waitForRedeliveries();
synchronized(&this->internal->deliveredMessages) {
// Acknowledge all messages so far.
Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
if (ack == NULL) {
return;
}
if (session->isTransacted()) {
this->internal->rollbackOnFailedRecoveryRedelivery();
session->doStartTransaction();
ack->setTransactionId(session->getTransactionContext()->getTransactionId());
}
this->internal->pendingAck.reset(NULL);
session->sendAck(ack);
// Adjust the counters
this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter - (int) this->internal->deliveredMessages.size());
this->internal->additionalWindowSize = Math::max(0, this->internal->additionalWindowSize - (int) this->internal->deliveredMessages.size());
if (!session->isTransacted()) {
this->internal->deliveredMessages.clear();
}
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::commit() {
synchronized(&(this->internal->deliveredMessages)) {
this->internal->deliveredMessages.clear();
this->internal->clearPreviouslyDelivered();
}
this->internal->redeliveryDelay = 0;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::rollback() {
clearDeliveredList();
synchronized(this->internal->unconsumedMessages.get()) {
if (this->internal->optimizeAcknowledge) {
// remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
if (!this->consumerInfo->isBrowser()) {
synchronized(&this->internal->deliveredMessages) {
for (int i = 0; (i < this->internal->deliveredMessages.size()) &&
(i < this->internal->ackCounter); i++) {
// ensure we don't filter this as a duplicate
Pointer<MessageDispatch> md = this->internal->deliveredMessages.removeLast();
session->getConnection()->rollbackDuplicate(this, md->getMessage());
}
}
}
}
synchronized(&this->internal->deliveredMessages) {
this->internal->rollbackPreviouslyDeliveredAndNotRedelivered();
if (this->internal->deliveredMessages.isEmpty()) {
return;
}
// Only increase the redelivery delay after the first redelivery..
Pointer<MessageDispatch> lastMsg = this->internal->deliveredMessages.getFirst();
const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
if (currentRedeliveryCount > 0) {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(internal->redeliveryDelay);
} else {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
}
Pointer<MessageId> firstMsgId = this->internal->deliveredMessages.getLast()->getMessage()->getMessageId();
Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->deliveredMessages.iterator());
while (iter->hasNext()) {
Pointer<Message> message = iter->next()->getMessage();
message->setRedeliveryCounter(message->getRedeliveryCounter() + 1);
// ensure we don't filter this as a duplicate
session->getConnection()->rollbackDuplicate(this, message);
}
if (this->internal->redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
lastMsg->getMessage()->getRedeliveryCounter() > this->internal->redeliveryPolicy->getMaximumRedeliveries()) {
// We need to NACK the messages so that they get sent to the DLQ.
// Acknowledge the last message.
Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
this->internal->deliveredMessages.size()));
ack->setFirstMessageId(firstMsgId);
std::string message = "Exceeded RedeliveryPolicy max redelivery limit: " +
Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries());
if (!lastMsg->getRollbackCause().getMessage().empty()) {
message.append(" cause: Exception -> ");
message.append(lastMsg->getRollbackCause().getMessage());
}
ack->setPoisonCause(internal->createBrokerError(message));
session->sendAck(ack, true);
// Adjust the window size.
this->internal->additionalWindowSize = Math::max(0,
this->internal->additionalWindowSize - (int) this->internal->deliveredMessages.size());
this->internal->redeliveryDelay = 0;
this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
this->internal->deliveredMessages.clear();
} else {
// only redelivery_ack after first delivery
if (currentRedeliveryCount > 0) {
Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_REDELIVERED,
this->internal->deliveredMessages.size()));
ack->setFirstMessageId(firstMsgId);
session->sendAck(ack);
}
if (this->internal->nonBlockingRedelivery) {
if (!this->internal->unconsumedMessages->isClosed()) {
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
NonBlockingRedeliveryTask* redeliveryTask =
new NonBlockingRedeliveryTask(session, self, this->internal);
this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
this->internal->deliveredMessages.clear();
this->session->getScheduler()->executeAfterDelay(
redeliveryTask, this->internal->redeliveryDelay);
}
} else {
// stop the delivery of messages.
this->internal->unconsumedMessages->stop();
std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
this->internal->deliveredMessages.iterator());
while (iter->hasNext()) {
this->internal->unconsumedMessages->enqueueFirst(iter->next());
}
this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
this->internal->deliveredMessages.clear();
if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
this->internal->scheduler->executeAfterDelay(
new StartConsumerTask(self, session), internal->redeliveryDelay);
} else {
start();
}
}
}
}
}
if (this->internal->listener != NULL) {
session->redispatch(*this->internal->unconsumedMessages);
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch) {
try {
clearMessagesInProgress();
clearDeliveredList();
synchronized(this->internal->unconsumedMessages.get()) {
if (!this->internal->unconsumedMessages->isClosed()) {
if (this->consumerInfo->isBrowser() || !session->getConnection()->isDuplicate(this, dispatch->getMessage())) {
synchronized(&this->internal->listenerMutex) {
if (this->internal->listener != NULL && this->internal->unconsumedMessages->isRunning()) {
if (this->internal->redeliveryExceeded(dispatch)) {
internal->posionAck(dispatch,
"dispatch to " + getConsumerId()->toString() +
" exceeds redelivery policy limit:" +
Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
return;
}
Pointer<cms::Message> message = createCMSMessage(dispatch);
beforeMessageIsConsumed(dispatch);
try {
bool expired = isConsumerExpiryCheckEnabled() && dispatch->getMessage()->isExpired();
if (!expired) {
this->internal->listener->onMessage(message.get());
}
afterMessageIsConsumed(dispatch, expired);
} catch (RuntimeException& e) {
dispatch->setRollbackCause(e);
if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session->isIndividualAcknowledge()) {
// Schedule redelivery and possible DLQ processing
rollback();
} else {
// Transacted or Client ack: Deliver the next message.
afterMessageIsConsumed(dispatch, false);
}
}
} else {
if (!this->internal->unconsumedMessages->isRunning()) {
// delayed redelivery, ensure it can be re delivered
session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
}
this->internal->unconsumedMessages->enqueue(dispatch);
if (this->internal->messageAvailableListener != NULL) {
this->internal->messageAvailableListener->onMessageAvailable(this);
}
}
}
} else {
// deal with duplicate delivery
if (this->internal->redeliveryExpectedInCurrentTransaction(dispatch, true)) {
if (this->internal->transactedIndividualAck) {
immediateIndividualTransactedAck(dispatch);
} else {
Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
internal->session->sendAck(ack);
}
} else if ((internal->redeliveryPendingInCompetingTransaction(dispatch))) {
internal->session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
this->dispatch(dispatch);
} else {
internal->posionAck(dispatch,
std::string("Suppressing duplicate delivery on connection, consumer ") + getConsumerId()->toString());
}
}
}
if (++internal->dispatchedCount % 1000 == 0) {
internal->dispatchedCount = 0;
Thread::yield();
}
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
Pointer<cms::Message> ActiveMQConsumerKernel::createCMSMessage(Pointer<MessageDispatch> dispatch) {
try {
Pointer<Message> message = dispatch->getMessage()->copy();
if (this->internal->transformer != NULL) {
cms::Message* source = dynamic_cast<cms::Message*>(message.get());
cms::Message* transformed = NULL;
if (this->internal->transformer->consumerTransform(
(cms::Session*)this->session, (cms::MessageConsumer*)this, source, &transformed)) {
if (transformed == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Client MessageTransformer returned a NULL message");
}
Message* amqMessage = NULL;
// If the transform create a new ActiveMQ Message command then we can discard the transformed
// cms::Message here, otherwise the transformed message was already an ActiveMQ Message
// command of some sort so we just place casted amqMessage in our Pointer and let it get
// cleaned up after its been dispatched.
if (ActiveMQMessageTransformation::transformMessage(transformed, this->session->getConnection(), &amqMessage)){
delete transformed;
}
message.reset(amqMessage);
}
}
// If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
// we set the handler in the message to this object and send it out.
if (session->isClientAcknowledge()) {
Pointer<ActiveMQAckHandler> ackHandler(new ClientAckHandler(this->session));
message->setAckHandler(ackHandler);
} else if (session->isIndividualAcknowledge()) {
Pointer<ActiveMQAckHandler> ackHandler(new IndividualAckHandler(this, dispatch));
message->setAckHandler(ackHandler);
} else {
Pointer<ActiveMQAckHandler> ackHandler(new NoOpAckHandler());
message->setAckHandler(ackHandler);
}
return message.dynamicCast<cms::Message>();
}
AMQ_CATCH_RETHROW(cms::CMSException)
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::sendPullRequest(long long timeout) {
try {
this->internal->clearDeliveredList();
// There are still local message, consume them first.
if (!this->internal->unconsumedMessages->isEmpty()) {
return;
}
if (this->consumerInfo->getPrefetchSize() == 0) {
Pointer<MessagePull> messagePull(new MessagePull());
messagePull->setConsumerId(this->consumerInfo->getConsumerId());
messagePull->setDestination(this->consumerInfo->getDestination());
messagePull->setTimeout(timeout);
this->session->oneway(messagePull);
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::checkClosed() const {
if (this->isClosed()) {
throw cms::IllegalStateException("Consumer Already Closed" );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::checkMessageListener() const {
if (this->internal->listener != NULL) {
throw cms::IllegalStateException(
"Cannot synchronously receive a message when a MessageListener is set");
}
this->session->checkMessageListener();
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::iterate() {
synchronized(&this->internal->listenerMutex) {
if (this->internal->listener != NULL) {
Pointer<MessageDispatch> dispatch = internal->unconsumedMessages->dequeueNoWait();
if (dispatch != NULL) {
this->dispatch(dispatch);
return true;
}
}
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::inProgressClearRequired() {
this->internal->inProgressClearRequiredFlag.incrementAndGet();
// Clears dispatched messages async to avoid lock contention with inprogress acks.
this->internal->isClearDeliveredList = true;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::clearDeliveredList() {
this->internal->clearDeliveredList();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::clearMessagesInProgress() {
if (this->internal->inProgressClearRequiredFlag.get() > 0) {
synchronized(this->internal->unconsumedMessages.get()) {
if (this->internal->inProgressClearRequiredFlag.get() > 0) {
// ensure messages that were not yet consumed are rolled back up front as they
// may get redelivered to another consumer by the Broker.
std::vector< Pointer<MessageDispatch> > list = this->internal->unconsumedMessages->removeAll();
if (!this->consumerInfo->isBrowser()) {
std::vector< Pointer<MessageDispatch> >::const_iterator iter = list.begin();
for (; iter != list.end(); ++iter) {
Pointer<MessageDispatch> md = *iter;
this->session->getConnection()->rollbackDuplicate(this, md->getMessage());
}
}
// allow dispatch on this connection to resume
this->session->getConnection()->setTransportInterruptionProcessingComplete();
this->internal->inProgressClearRequiredFlag.decrementAndGet();
// Wake up any blockers and allow them to recheck state.
this->internal->unconsumedMessages->notifyAll();
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isAutoAcknowledgeEach() const {
return this->session->isAutoAcknowledge() ||
(this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue());
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isAutoAcknowledgeBatch() const {
return this->session->isDupsOkAcknowledge() && !this->consumerInfo->getDestination()->isQueue();
}
////////////////////////////////////////////////////////////////////////////////
int ActiveMQConsumerKernel::getMessageAvailableCount() const {
return this->internal->unconsumedMessages->size();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::applyDestinationOptions(Pointer<ConsumerInfo> info) {
decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
// Get any options specified in the destination and apply them to the ConsumerInfo object.
const ActiveMQProperties& options = amqDestination->getOptions();
std::string noLocalStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_NOLOCAL);
if (options.hasProperty(noLocalStr)) {
info->setNoLocal(Boolean::parseBoolean(options.getProperty(noLocalStr)));
}
std::string selectorStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_SELECTOR);
if (options.hasProperty(selectorStr)) {
info->setSelector(options.getProperty(selectorStr));
}
std::string priorityStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PRIORITY);
if (options.hasProperty(priorityStr)) {
info->setPriority((unsigned char) Integer::parseInt(options.getProperty(priorityStr)));
}
std::string dispatchAsyncStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_DISPATCHASYNC);
if (options.hasProperty(dispatchAsyncStr)) {
info->setDispatchAsync(Boolean::parseBoolean(options.getProperty(dispatchAsyncStr)));
}
std::string exclusiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_EXCLUSIVE);
if (options.hasProperty(exclusiveStr)) {
info->setExclusive(Boolean::parseBoolean(options.getProperty(exclusiveStr)));
}
std::string maxPendingMsgLimitStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT);
if (options.hasProperty(maxPendingMsgLimitStr)) {
info->setMaximumPendingMessageLimit(Integer::parseInt(options.getProperty(maxPendingMsgLimitStr)));
}
std::string prefetchSizeStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE);
if (options.hasProperty(prefetchSizeStr)) {
info->setPrefetchSize(Integer::parseInt(options.getProperty(prefetchSizeStr, "1000")));
}
std::string retroactiveStr = core::ActiveMQConstants::toString(core::ActiveMQConstants::CONSUMER_RETROACTIVE);
if (options.hasProperty(retroactiveStr)) {
info->setRetroactive(Boolean::parseBoolean(options.getProperty(retroactiveStr)));
}
this->internal->nonBlockingRedelivery = Boolean::parseBoolean(
options.getProperty("consumer.nonBlockingRedelivery", "false"));
this->internal->transactedIndividualAck = Boolean::parseBoolean(
options.getProperty("consumer.transactedIndividualAck", "false"));
this->internal->consumerExpiryCheckEnabled = Boolean::parseBoolean(
options.getProperty("consumer.consumerExpiryCheckEnabled", "true"));
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setRedeliveryPolicy(RedeliveryPolicy* policy) {
if (policy != NULL) {
this->internal->redeliveryPolicy.reset(policy);
}
}
////////////////////////////////////////////////////////////////////////////////
RedeliveryPolicy* ActiveMQConsumerKernel::getRedeliveryPolicy() const {
return this->internal->redeliveryPolicy.get();
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageListener* ActiveMQConsumerKernel::getMessageListener() const {
return this->internal->listener;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setMessageTransformer(cms::MessageTransformer* transformer) {
this->internal->transformer = transformer;
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageTransformer* ActiveMQConsumerKernel::getMessageTransformer() const {
return this->internal->transformer;
}
////////////////////////////////////////////////////////////////////////////////
const Pointer<commands::ConsumerInfo>& ActiveMQConsumerKernel::getConsumerInfo() const {
return this->consumerInfo;
}
////////////////////////////////////////////////////////////////////////////////
const Pointer<commands::ConsumerId>& ActiveMQConsumerKernel::getConsumerId() const {
return this->consumerInfo->getConsumerId();
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isSynchronizationRegistered() const {
return this->internal->synchronizationRegistered;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setSynchronizationRegistered(bool value) {
this->internal->synchronizationRegistered = value;
}
////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConsumerKernel::getLastDeliveredSequenceId() const {
return this->internal->lastDeliveredSequenceId;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setLastDeliveredSequenceId(long long value) {
this->internal->lastDeliveredSequenceId = value;
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isTransactedIndividualAck() const {
return this->internal->transactedIndividualAck;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setTransactedIndividualAck(bool value) {
this->internal->transactedIndividualAck = value;
}
////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConsumerKernel::setFailoverRedeliveryWaitPeriod() const {
return this->internal->failoverRedeliveryWaitPeriod;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setFailoverRedeliveryWaitPeriod(long long value) {
this->internal->failoverRedeliveryWaitPeriod = value;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setFailureError(decaf::lang::Exception* error) {
if (error != NULL) {
this->internal->failureError.reset(error->clone());
}
}
////////////////////////////////////////////////////////////////////////////////
decaf::lang::Exception* ActiveMQConsumerKernel::getFailureError() const {
if (this->internal->failureError == NULL) {
return NULL;
}
return this->internal->failureError.get();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setPrefetchSize(int prefetchSize) {
deliverAcks();
this->consumerInfo->setCurrentPrefetchSize(prefetchSize);
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isInUse(Pointer<ActiveMQDestination> destination) const {
return this->consumerInfo->getDestination()->equals(destination.get());
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setMessageAvailableListener(cms::MessageAvailableListener* listener) {
this->internal->messageAvailableListener = listener;
}
////////////////////////////////////////////////////////////////////////////////
cms::MessageAvailableListener* ActiveMQConsumerKernel::getMessageAvailableListener() const {
return this->internal->messageAvailableListener;
}
////////////////////////////////////////////////////////////////////////////////
int ActiveMQConsumerKernel::getHashCode() const {
return this->internal->hashCode;
}
////////////////////////////////////////////////////////////////////////////////
long long ActiveMQConsumerKernel::getOptimizedAckScheduledAckInterval() const {
return this->internal->optimizedAckScheduledAckInterval;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setOptimizedAckScheduledAckInterval(long long value) {
this->internal->optimizedAckScheduledAckInterval = value;
if (this->internal->optimizedAckTask != NULL) {
try {
this->session->getScheduler()->cancel(this->internal->optimizedAckTask);
this->internal->optimizedAckTask = NULL;
} catch (Exception& e) {
this->internal->optimizedAckTask = NULL;
throw CMSExceptionSupport::create(e);
}
}
// Should we periodically send out all outstanding acks.
if (this->internal->optimizeAcknowledge && this->internal->optimizedAckScheduledAckInterval > 0) {
Pointer<ActiveMQConsumerKernel> self =
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
this->internal->optimizedAckTask = new OptimizedAckTask(self, this->internal);
try {
this->session->getScheduler()->executePeriodically(
this->internal->optimizedAckTask, this->internal->optimizedAckScheduledAckInterval);
} catch (Exception& e) {
throw CMSExceptionSupport::create(e);
}
}
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isOptimizeAcknowledge() const {
return this->internal->optimizeAcknowledge;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setOptimizeAcknowledge(bool value) {
if (this->internal->optimizeAcknowledge && !value) {
deliverAcks();
}
this->internal->optimizeAcknowledge = value;
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isConsumerExpiryCheckEnabled() {
return this->internal->consumerExpiryCheckEnabled;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
this->internal->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isRedeliveryExpectedInCurrentTransaction(Pointer<MessageDispatch> dispatch) const {
return this->internal->redeliveryExpectedInCurrentTransaction(dispatch, false);
}