blob: 6d3f26a2b06532f82c31f0acf622ad5b03ae373f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQConsumer.h"
#include <activemq/exceptions/NullPointerException.h>
#include <activemq/exceptions/InvalidStateException.h>
#include <activemq/exceptions/IllegalArgumentException.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQMessage.h>
#include <activemq/util/Date.h>
#include <cms/ExceptionListener.h>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::connector;
using namespace activemq::exceptions;
using namespace activemq::concurrent;
////////////////////////////////////////////////////////////////////////////////
// Builds a consumer attached to the given session and connector resource.
//
// @param consumerInfo connector-level resource describing this consumer;
//        must not be NULL.  This object deletes it in its destructor.
// @param session the session this consumer belongs to; must not be NULL.
// @throws NullPointerException if either argument is NULL.  The message
//         names the argument that was actually NULL.
ActiveMQConsumer::ActiveMQConsumer( connector::ConsumerInfo* consumerInfo,
                                    ActiveMQSession* session )
{
    if( session == NULL )
    {
        throw NullPointerException(
            __FILE__, __LINE__,
            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session");
    }

    if( consumerInfo == NULL )
    {
        throw NullPointerException(
            __FILE__, __LINE__,
            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL ConsumerInfo");
    }

    // Init Consumer Data
    this->session = session;
    this->consumerInfo = consumerInfo;
    this->listener = NULL;
    this->closed = false;

    // Listen for our resource to close
    this->consumerInfo->addListener( this );
}
////////////////////////////////////////////////////////////////////////////////
// Closes the consumer and releases the ConsumerInfo it was constructed with.
// Never throws: any failure during shutdown is swallowed by the NOTHROW
// catch macros.
ActiveMQConsumer::~ActiveMQConsumer()
{
    // Shut down first.  This is guarded on its own so that a failure in
    // close() cannot skip the delete below and leak the ConsumerInfo.
    try
    {
        close();
    }
    AMQ_CATCH_NOTHROW( ActiveMQException )
    AMQ_CATCHALL_NOTHROW( )

    // Always release the connector resource, even if close() failed.
    try
    {
        delete consumerInfo;
    }
    AMQ_CATCH_NOTHROW( ActiveMQException )
    AMQ_CATCHALL_NOTHROW( )
}
////////////////////////////////////////////////////////////////////////////////
// Closes this consumer: detaches from and closes the ConsumerInfo, marks the
// consumer closed, wakes any thread waiting for a message and purges all
// messages that were never consumed.
//
// Calling close() on an already closed consumer does nothing.  Every shutdown
// step is attempted even if an earlier one fails; the first error seen is
// re-thrown once all steps have run.
//
// @throws cms::CMSException the first error encountered while shutting down.
void ActiveMQConsumer::close()
    throw ( cms::CMSException )
{
    try
    {
        if( !closed ) {

            // Identifies any errors encountered during shutdown.
            bool haveException = false;
            ActiveMQException error;

            // Close the ConsumerInfo
            if( !consumerInfo->isClosed() ) {
                try{
                    // We don't want a callback now
                    this->consumerInfo->removeListener( this );
                    this->consumerInfo->close();
                } catch( ActiveMQException& ex ){
                    if( !haveException ){
                        ex.setMark( __FILE__, __LINE__ );
                        error = ex;
                        haveException = true;
                    }
                }
            }

            closed = true;

            // Wake up every thread blocked in dequeue().  Its wait loop
            // re-checks the closed flag, but only after being notified, so
            // without this an untimed receive() would stay blocked after
            // the consumer has been closed.
            try{
                synchronized( &unconsumedMessages ) {
                    unconsumedMessages.notifyAll();
                }
            } catch( ActiveMQException& ex ){
                if( !haveException ){
                    ex.setMark( __FILE__, __LINE__ );
                    error = ex;
                    haveException = true;
                }
            }

            // Purge all the pending messages
            try{
                purgeMessages();
            } catch ( ActiveMQException& ex ){
                if( !haveException ){
                    ex.setMark( __FILE__, __LINE__ );
                    error = ex;
                    haveException = true;
                }
            }

            // If we encountered an error, propagate it.
            if( haveException ){
                error.setMark( __FILE__, __LINE__ );
                throw error;
            }
        }
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Returns the message selector held by this consumer's ConsumerInfo.
//
// @throws cms::CMSException if the underlying lookup fails.
std::string ActiveMQConsumer::getMessageSelector() const
    throw ( cms::CMSException )
{
    try
    {
        // The selector lives on the connector resource; hand back a copy.
        const std::string selector = consumerInfo->getMessageSelector();
        return selector;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQMessage* ActiveMQConsumer::dequeue(int timeout)
throw ( cms::CMSException )
{
try
{
if( closed )
{
throw InvalidStateException(
__FILE__, __LINE__,
"ActiveMQConsumer::receive - This Consumer is closed" );
}
synchronized( &unconsumedMessages )
{
// Calculate the deadline
long long deadline = 0;
if (timeout > 0) {
deadline = Date::getCurrentTimeMilliseconds() + timeout;
}
// Loop until the time is up or we get a non-expired message
while( true ) {
// Wait until either the deadline is met, a message arrives, or
// we've closed.
while( !closed && unconsumedMessages.empty() && timeout != 0 )
{
if( timeout < 0 ) {
unconsumedMessages.wait();
} else if( timeout > 0 ) {
unconsumedMessages.wait(timeout);
timeout = std::max((int)(deadline - Date::getCurrentTimeMilliseconds()), 0);
}
}
if( unconsumedMessages.empty() ) {
return NULL;
}
// Fetch the Message then copy it so it can be handed off
// to the user.
DispatchData data = unconsumedMessages.pop();
// Get the message.
ActiveMQMessage* message = data.getMessage();
// If it's expired, process the message and then go back to waiting.
if( message->isExpired() ) {
beforeMessageIsConsumed(message);
afterMessageIsConsumed(message, true);
if (timeout > 0) {
timeout = std::max((int)(deadline - Date::getCurrentTimeMilliseconds()), 0);
}
// Go back to waiting for a non-expired message.
continue;
}
// Return the message.
return message;
} // while( true )
} // synchronized( &unconsumedMessages )
return NULL;
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Waits without a time limit for the next message and returns a copy of it.
//
// @return a clone of the received message, owned by the caller, or NULL if
//         dequeue() produced no message.
// @throws cms::CMSException if the consumer is closed or receiving fails.
cms::Message* ActiveMQConsumer::receive() throw ( cms::CMSException )
{
    try
    {
        if( closed )
        {
            throw InvalidStateException(
                __FILE__, __LINE__,
                "ActiveMQConsumer::receive - This Consumer is closed" );
        }

        // A negative timeout asks dequeue() to wait indefinitely.
        ActiveMQMessage* delivered = dequeue( -1 );

        cms::Message* userCopy = NULL;
        if( delivered != NULL ) {

            // Pre-consume hook.
            beforeMessageIsConsumed( delivered );

            // The caller frees what it is given, so it gets its own clone.
            cms::Message* asCmsMessage = dynamic_cast<cms::Message*>( delivered );
            userCopy = asCmsMessage->clone();

            // Post-consume hook; this may delete the internal message.
            afterMessageIsConsumed( delivered, false );
        }

        return userCopy;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Waits up to the given time for the next message and returns a copy of it.
//
// @param millisecs timeout handed unchanged to dequeue().
// @return a clone of the received message, owned by the caller, or NULL if
//         dequeue() produced no message.
// @throws cms::CMSException if the consumer is closed or receiving fails.
cms::Message* ActiveMQConsumer::receive( int millisecs )
    throw ( cms::CMSException )
{
    try
    {
        if( closed )
        {
            throw InvalidStateException(
                __FILE__, __LINE__,
                "ActiveMQConsumer::receive - This Consumer is closed" );
        }

        ActiveMQMessage* incoming = dequeue( millisecs );

        // No message within the allotted time.
        if( incoming == NULL ) {
            return NULL;
        }

        // Pre-consume hook.
        beforeMessageIsConsumed( incoming );

        // The caller owns and frees the returned object, so clone it before
        // the internal instance is handed to the post-consume hook.
        cms::Message* cmsView = dynamic_cast<cms::Message*>( incoming );
        cms::Message* result = cmsView->clone();

        // Post-consume hook; this may delete the internal message.
        afterMessageIsConsumed( incoming, false );

        return result;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Returns a copy of the next queued message without waiting.
//
// @return a clone of the received message, owned by the caller, or NULL if
//         no message is currently queued.
// @throws cms::CMSException if the consumer is closed or receiving fails.
cms::Message* ActiveMQConsumer::receiveNoWait()
    throw ( cms::CMSException )
{
    try
    {
        if( closed )
        {
            throw InvalidStateException(
                __FILE__, __LINE__,
                "ActiveMQConsumer::receive - This Consumer is closed" );
        }

        // A zero timeout makes dequeue() return immediately.
        ActiveMQMessage* const pending = dequeue( 0 );

        if( pending != NULL ) {

            // Pre-consume hook.
            beforeMessageIsConsumed( pending );

            // Give the caller its own copy; it is responsible for freeing it.
            cms::Message* const copyForCaller =
                dynamic_cast<cms::Message*>( pending )->clone();

            // Post-consume hook; this may delete the internal message.
            afterMessageIsConsumed( pending, false );

            return copyForCaller;
        }

        return NULL;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Installs (or clears, when NULL) the listener that receives messages
// dispatched to this consumer.
//
// When a non-NULL listener is set, the messages already queued in
// unconsumedMessages are handed to the session for redispatch.  A session
// that was started is stopped around the redispatch and started again
// afterwards, including when the redispatch fails.
//
// @param listener the new listener, or NULL to remove the current one.
// @throws ActiveMQException if the consumer is closed or redispatch fails.
void ActiveMQConsumer::setMessageListener( cms::MessageListener* listener )
{
    try
    {
        if( closed )
        {
            throw InvalidStateException(
                __FILE__, __LINE__,
                "ActiveMQConsumer::setMessageListener - This Consumer is closed" );
        }

        this->listener = listener;

        if( listener != NULL && session != NULL ) {

            // Now that we have a valid message listener,
            // redispatch all the messages that it missed.
            bool wasStarted = session->isStarted();
            if( wasStarted ) {
                session->stop();
            }

            try {
                session->redispatch( unconsumedMessages );
            } catch( ... ) {
                // Don't leave the session stopped because redispatch failed.
                // A failure while restarting is ignored so that the original
                // error is the one reported to the caller.
                if( wasStarted ) {
                    try {
                        session->start();
                    } catch( ... ) {
                    }
                }
                throw;
            }

            if( wasStarted ) {
                session->start();
            }
        }
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Pre-consume hook, run on a message before it is handed to the user.
//
// In a client-acknowledge session the message is given this consumer as its
// ack handler.  For every other mode nothing is done here.
void ActiveMQConsumer::beforeMessageIsConsumed( ActiveMQMessage* message ) {

    // Only client-acknowledge sessions need the ack handler wired up.
    if( !session->isClientAcknowledge() ) {
        return;
    }

    // Route the message's acknowledge call back to this consumer.
    message->setAckHandler( this );
}
////////////////////////////////////////////////////////////////////////////////
// Post-consume hook, run after a message has been handed to the user (or
// found to be expired).
//
// Unless the session is in client-acknowledge mode the message is
// acknowledged through the session.  It is then passed to destroyMessage(),
// which deletes it only for non-transacted sessions.
//
// @param message the message that was consumed.
// @param messageExpired unused.
void ActiveMQConsumer::afterMessageIsConsumed( ActiveMQMessage* message,
                                               bool messageExpired AMQCPP_UNUSED )
{
    try
    {
        // In client-acknowledge mode the ack is left to the user.
        const bool userAcknowledges = session->isClientAcknowledge();
        if( userAcknowledges == false ) {
            session->acknowledge( this, message );
        }

        // Release the message here unless the session is transacted; in that
        // case destroyMessage() leaves it alone.
        destroyMessage( message );
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Acknowledges the given message by delegating to the owning session.
//
// @param message the message being acknowledged.
// @throws cms::CMSException if the consumer is closed or the session fails
//         to acknowledge the message.
void ActiveMQConsumer::acknowledgeMessage( const ActiveMQMessage* message )
    throw ( cms::CMSException )
{
    try
    {
        if( closed )
        {
            throw InvalidStateException(
                __FILE__, __LINE__,
                "ActiveMQConsumer::acknowledgeMessage - This Consumer is closed" );
        }

        // Delegate the Ack to the Session, we cast away constness since
        // in a transactional session we might need to redeliver this
        // message and update its data.
        session->acknowledge( this, const_cast<ActiveMQMessage*>(message) );
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Delivers a message that was dispatched to this consumer.
//
// Expired messages are acknowledged and released without being delivered.
// Otherwise, if a listener is registered the message is passed to it
// (wrapped in the pre/post consume hooks); with no listener the dispatch
// data is queued in unconsumedMessages and waiting receivers are notified.
//
// @param data the dispatch data carrying the message.
// @throws ActiveMQException if acknowledging, delivering or queueing fails.
void ActiveMQConsumer::dispatch( DispatchData& data )
{
    try
    {
        ActiveMQMessage* message = data.getMessage();

        // Don't dispatch expired messages, ack it and then release it.
        if( message->isExpired() ) {
            session->acknowledge( this, message );

            // Release through destroyMessage() rather than a raw delete:
            // it skips the delete for transacted sessions, matching how
            // every other path in this class disposes of messages.
            destroyMessage( message );

            // stop now, don't queue
            return;
        }

        // If we have a listener, send the message.
        if( listener != NULL ) {

            // Preprocessing.
            beforeMessageIsConsumed( message );

            // Notify the listener
            listener->onMessage( dynamic_cast<cms::Message*>(message) );

            // Postprocessing
            afterMessageIsConsumed( message, false );

        } else {

            // No listener, add it to the unconsumed messages list
            synchronized( &unconsumedMessages ) {
                unconsumedMessages.push( data );
                unconsumedMessages.notifyAll();
            }
        }
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Empties the unconsumed message queue, passing each queued message to
// destroyMessage().
//
// @throws ActiveMQException if releasing a message fails.
void ActiveMQConsumer::purgeMessages() throw ( ActiveMQException )
{
    try
    {
        synchronized( &unconsumedMessages )
        {
            for( ; !unconsumedMessages.empty(); )
            {
                // destroyMessage() deletes the message only when the session
                // is not transacted; otherwise cleanup is left to the
                // transaction.
                ActiveMQMessage* leftover = unconsumedMessages.pop().getMessage();
                destroyMessage( leftover );
            }
        }
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Deletes the given message unless the session is transacted.
//
// @param message the message to release; NULL is ignored.
// @throws ActiveMQException if an error occurs while releasing the message.
void ActiveMQConsumer::destroyMessage( ActiveMQMessage* message )
    throw ( ActiveMQException )
{
    try
    {
        // Nothing to release.
        if( message == NULL )
        {
            return;
        }

        // In a transacted session the message is not deleted here; it is
        // left for the session to deal with.
        if( session->isTransacted() )
        {
            return;
        }

        delete message;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
// Callback invoked when a connector resource this consumer listens to has
// been closed; closes this consumer in response.
//
// @param resource the resource that was closed; must be this consumer's
//        ConsumerInfo.
// @throws cms::CMSException if this consumer is already closed, if the
//         resource is not the one this consumer owns, or if closing fails.
void ActiveMQConsumer::onConnectorResourceClosed(
    const ConnectorResource* resource ) throw ( cms::CMSException ) {

    try{

        if( closed )
        {
            throw InvalidStateException(
                __FILE__, __LINE__,
                "ActiveMQConsumer::onConnectorResourceClosed - "
                "Consumer Already Closed");
        }

        if( resource != consumerInfo ) {
            throw IllegalArgumentException(
                __FILE__, __LINE__,
                "ActiveMQConsumer::onConnectorResourceClosed - "
                "Unknown object passed to this callback");
        }

        // Our resource has gone away, so close this consumer as well.
        this->close();
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}