| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "ActiveMQConnection.h" |
| |
| #include <cms/Session.h> |
| #include <activemq/core/ActiveMQSession.h> |
| #include <activemq/core/ActiveMQConsumer.h> |
| #include <activemq/exceptions/NullPointerException.h> |
| #include <activemq/util/Boolean.h> |
| #include <activemq/util/Iterator.h> |
| |
| using namespace cms; |
| using namespace activemq; |
| using namespace activemq::core; |
| using namespace activemq::util; |
| using namespace activemq::connector; |
| using namespace activemq::exceptions; |
| using namespace std; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnection::ActiveMQConnection(ActiveMQConnectionData* connectionData) |
| { |
| this->connectionData = connectionData; |
| this->started = false; |
| this->closed = false; |
| this->exceptionListener = NULL; |
| |
| // Register for messages and exceptions from the connector. |
| Connector* connector = connectionData->getConnector(); |
| connector->setConsumerMessageListener( this ); |
| connector->setExceptionListener( this ); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnection::~ActiveMQConnection() |
| { |
| try { |
| close(); |
| } |
| AMQ_CATCH_NOTHROW( ActiveMQException ) |
| AMQ_CATCHALL_NOTHROW( ) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::addDispatcher( connector::ConsumerInfo* consumer, |
| Dispatcher* dispatcher ) |
| { |
| // Add the consumer to the map. |
| synchronized( &dispatchers ) { |
| dispatchers.setValue( consumer->getConsumerId(), dispatcher ); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::removeDispatcher( const connector::ConsumerInfo* consumer ) { |
| |
| // Remove the consumer from the map. |
| synchronized( &dispatchers ) { |
| dispatchers.remove( consumer->getConsumerId() ); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Session* ActiveMQConnection::createSession() throw ( cms::CMSException ) |
| { |
| try { |
| return createSession( Session::AUTO_ACKNOWLEDGE ); |
| } |
| AMQ_CATCH_RETHROW( ActiveMQException ) |
| AMQ_CATCHALL_THROW( ActiveMQException ) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Session* ActiveMQConnection::createSession( |
| cms::Session::AcknowledgeMode ackMode ) throw ( cms::CMSException ) |
| { |
| try { |
| |
| // Create the session instance. |
| ActiveMQSession* session = new ActiveMQSession( |
| connectionData->getConnector()->createSession( ackMode ), |
| connectionData->getProperties(), |
| this ); |
| |
| // Add the session to the set of active sessions. |
| synchronized( &activeSessions ) { |
| activeSessions.add( session ); |
| } |
| |
| // If we're already started, start the session. |
| if( started ) { |
| session->start(); |
| } |
| |
| return session; |
| } |
| AMQ_CATCH_RETHROW( ActiveMQException ) |
| AMQ_CATCHALL_THROW( ActiveMQException ) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| std::string ActiveMQConnection::getClientID() const |
| { |
| if( closed ) { |
| return ""; |
| } |
| |
| return connectionData->getConnector()->getClientId(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::close() throw ( cms::CMSException ) |
| { |
| try { |
| |
| if( closed ) { |
| return; |
| } |
| |
| // Get the complete list of active sessions. |
| std::vector<ActiveMQSession*> allSessions; |
| synchronized( &activeSessions ) { |
| allSessions = activeSessions.toArray(); |
| } |
| |
| // Close all of the resources. |
| for( unsigned int ix=0; ix<allSessions.size(); ++ix ){ |
| cms::Session* session = allSessions[ix]; |
| try{ |
| session->close(); |
| } catch( cms::CMSException& ex ){ |
| /* Absorb */ |
| } |
| } |
| |
| // Once current deliveries are done this stops the delivery |
| // of any new messages. |
| started = false; |
| closed = true; |
| |
| // Destroy the connection data. This will close the connector |
| // and transports. |
| if( connectionData != NULL ){ |
| delete connectionData; |
| connectionData = NULL; |
| } |
| } |
| AMQ_CATCH_RETHROW( ActiveMQException ) |
| AMQ_CATCHALL_THROW( ActiveMQException ) |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::start() throw ( cms::CMSException ) |
| { |
| // This starts or restarts the delivery of all incomming messages |
| // messages delivered while this connection is stopped are dropped |
| // and not acknowledged. |
| started = true; |
| |
| // Start all the sessions. |
| std::vector<ActiveMQSession*> sessions = activeSessions.toArray(); |
| for( unsigned int ix=0; ix<sessions.size(); ++ix ) { |
| sessions[ix]->start(); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::stop() throw ( cms::CMSException ) |
| { |
| // Once current deliveries are done this stops the delivery of any |
| // new messages. |
| started = false; |
| |
| Iterator<ActiveMQSession*>* iter = activeSessions.iterator(); |
| while( iter->hasNext() ){ |
| iter->next()->stop(); |
| } |
| delete iter; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onConsumerMessage( connector::ConsumerInfo* consumer, |
| core::ActiveMQMessage* message ) |
| { |
| try { |
| |
| if( connectionData == NULL) { |
| NullPointerException ex( |
| __FILE__, __LINE__, |
| "ActiveMQConnection::onConsumerMessage - " |
| "Connection Data Null, could be closed." ); |
| |
| fire( ex ); |
| |
| return; |
| } |
| |
| // Look up the dispatcher. |
| Dispatcher* dispatcher = NULL; |
| synchronized( &dispatchers ) { |
| |
| dispatcher = dispatchers.getValue(consumer->getConsumerId()); |
| |
| // If we have no registered dispatcher, the consumer was probably |
| // just closed. Just delete the message. |
| if( dispatcher == NULL ) { |
| delete message; |
| } else { |
| |
| // Dispatch the message. |
| DispatchData data( consumer, message ); |
| dispatcher->dispatch( data ); |
| } |
| } |
| } |
| catch( exceptions::ActiveMQException& ex ) { |
| ex.setMark( __FILE__, __LINE__ ); |
| fire( ex ); |
| } |
| catch( ... ) { |
| exceptions::ActiveMQException ex( |
| __FILE__, __LINE__, |
| "IOTransport::run - caught unknown exception" ); |
| fire( ex ); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::onException( const CMSException& ex ){ |
| |
| if( exceptionListener != NULL ){ |
| exceptionListener->onException( ex ); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnection::removeSession( ActiveMQSession* session ) |
| throw ( cms::CMSException ) |
| { |
| try { |
| |
| // Remove this session from the set of active sessions. |
| synchronized( &activeSessions ) { |
| activeSessions.remove( session ); |
| } |
| } |
| AMQ_CATCH_RETHROW( ActiveMQException ) |
| AMQ_CATCHALL_THROW( ActiveMQException ) |
| } |