blob: c8bd0f16c15e094e5098def7d27c0f6313833a38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
#define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_
#include <cms/Connection.h>
#include <cms/ExceptionListener.h>
#include <activemq/core/ActiveMQConnectionData.h>
#include <activemq/core/ActiveMQMessage.h>
#include <activemq/core/Dispatcher.h>
#include <activemq/connector/ConsumerMessageListener.h>
#include <activemq/util/Properties.h>
#include <activemq/util/Map.h>
#include <activemq/util/Set.h>
#include <string>
namespace activemq{
namespace connector {
// Forward declaration only: this header refers to ConsumerInfo solely
// through pointers, so the full definition is not required here.
class ConsumerInfo;
}
namespace core{
// Forward declarations of sibling core types that this header only
// refers to by pointer.
//
// Note: cms::Session is intentionally NOT forward declared here. A
// qualified name may not be used in a standalone forward declaration,
// and the complete type must already be supplied by the headers
// included above, since createSession() uses
// cms::Session::AcknowledgeMode.
class ActiveMQSession;
class ActiveMQConsumer;
/**
* Concrete connection used for all connectors to the
* ActiveMQ broker.
*/
class ActiveMQConnection :
    public cms::Connection,
    public connector::ConsumerMessageListener,
    public cms::ExceptionListener
{
private:

    /**
     * The registered exception listener, or NULL when none is set.
     */
    cms::ExceptionListener* exceptionListener;

    /**
     * All the data that is used to connect this Connection.
     */
    ActiveMQConnectionData* connectionData;

    /**
     * Indicates if this Connection is started.
     */
    bool started;

    /**
     * Indicates that this connection has been closed; it is no longer
     * usable after this becomes true.
     */
    bool closed;

    /**
     * Map of message dispatchers indexed by consumer id.
     */
    util::Map< long long, Dispatcher* > dispatchers;

    /**
     * Maintains the set of all active sessions.
     */
    util::Set<ActiveMQSession*> activeSessions;

private:

    // Copying is disabled (declared private, never defined). The
    // constructor documentation states that the connection data pointer
    // is owned by this object; a member-wise copy would leave two
    // connections sharing that same pointer.
    ActiveMQConnection( const ActiveMQConnection& );
    ActiveMQConnection& operator= ( const ActiveMQConnection& );

public:

    /**
     * Constructor
     * @param connectionData Pointer to an ActiveMQConnectionData object,
     * owned here.
     */
    ActiveMQConnection( ActiveMQConnectionData* connectionData );

    virtual ~ActiveMQConnection();

    /**
     * Removes the session resources for the given session
     * instance.
     * @param session The session to be unregistered from this connection.
     * @throws CMSException
     */
    virtual void removeSession( ActiveMQSession* session ) throw ( cms::CMSException );

    /**
     * Adds a dispatcher for a consumer.
     * @param consumer - The consumer for which to register a dispatcher.
     * @param dispatcher - The dispatcher to handle incoming messages for the consumer.
     */
    virtual void addDispatcher( connector::ConsumerInfo* consumer, Dispatcher* dispatcher );

    /**
     * Removes the dispatcher for a consumer.
     * @param consumer - The consumer for which to remove the dispatcher.
     */
    virtual void removeDispatcher( const connector::ConsumerInfo* consumer );

public: // Connection Interface Methods

    /**
     * Creates a new Session to work for this Connection.
     * @return pointer to the newly created session.
     * @throws CMSException
     */
    virtual cms::Session* createSession() throw ( cms::CMSException );

    /**
     * Creates a new Session to work for this Connection using the
     * specified acknowledgment mode.
     * @param ackMode the Acknowledgement Mode to use.
     * @return pointer to the newly created session.
     * @throws CMSException
     */
    virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode )
        throw ( cms::CMSException );

    /**
     * Gets the Client Id for this connection.
     * @return string version of Client Id
     */
    virtual std::string getClientID() const;

    /**
     * Retrieves the Connection Data object for this object.
     * @return pointer to a connection data object.
     */
    virtual ActiveMQConnectionData* getConnectionData() {
        return connectionData;
    }

    /**
     * Gets the registered Exception Listener for this connection.
     * @return pointer to an exception listener or NULL
     */
    virtual cms::ExceptionListener* getExceptionListener() const {
        return exceptionListener;
    }

    /**
     * Sets the registered Exception Listener for this connection.
     * @param listener pointer to an <code>ExceptionListener</code>
     */
    virtual void setExceptionListener( cms::ExceptionListener* listener ) {
        exceptionListener = listener;
    }

    /**
     * Closes this connection as well as any Sessions
     * created from it (and those Sessions' consumers and
     * producers).
     * @throws CMSException
     */
    virtual void close() throw ( cms::CMSException );

    /**
     * Starts (or restarts) a connection's delivery of incoming messages.
     * @throws CMSException
     */
    virtual void start() throw ( cms::CMSException );

    /**
     * Stops the flow of incoming messages.
     * @throws CMSException
     */
    virtual void stop() throw ( cms::CMSException );

public: // ExceptionListener interface methods

    /**
     * Called when an exception occurs. Once notified of an exception
     * the caller should no longer use the resource that generated the
     * exception.
     * @param ex Exception Object that occurred.
     */
    virtual void onException( const cms::CMSException& ex );

public: // ConsumerMessageListener interface methods

    /**
     * Called to dispatch a message to a particular consumer.
     * @param consumer the target consumer of the dispatch.
     * @param message the message to be dispatched.
     *
     * NOTE(review): ownership of the message object is not expressed by
     * this declaration - confirm against the implementation.
     */
    virtual void onConsumerMessage( connector::ConsumerInfo* consumer,
                                    core::ActiveMQMessage* message );

public:

    /**
     * Notifies the registered exception listener, if there is one.
     * Anything thrown by the listener is swallowed so that it does not
     * propagate to the caller of this method.
     * @param ex the exception to fire
     */
    void fire( exceptions::ActiveMQException& ex )
    {
        // Nothing to notify when no listener has been registered.
        if( exceptionListener == NULL ) {
            return;
        }

        try {
            exceptionListener->onException( ex );
        }
        catch( ... ) {
            // Intentionally ignored: see method documentation.
        }
    }

};
}}
#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_*/