blob: 70fad6a9c998e2e1bf427a3a18804b1a8dd8fdd8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQConnectionFactory.h"
#include <decaf/net/URI.h>
#include <decaf/util/Properties.h>
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Pointer.h>
#include <decaf/lang/Math.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <activemq/exceptions/ExceptionDefines.h>
#include <activemq/transport/TransportRegistry.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
#include <activemq/util/URISupport.h>
#include <memory>
using namespace std;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::core::policies;
using namespace activemq::exceptions;
using namespace activemq::transport;
using namespace decaf;
using namespace decaf::net;
using namespace decaf::util;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
const std::string ActiveMQConnectionFactory::DEFAULT_URI = "failover:(tcp://localhost:61616)";
////////////////////////////////////////////////////////////////////////////////
namespace activemq{
namespace core{
class FactorySettings {
public:
Pointer<Properties> properties;
std::string username;
std::string password;
std::string clientId;
URI brokerURI;
bool dispatchAsync;
bool alwaysSyncSend;
bool useAsyncSend;
bool useCompression;
int compressionLevel;
unsigned int sendTimeout;
unsigned int closeTimeout;
unsigned int producerWindowSize;
cms::ExceptionListener* defaultListener;
std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
FactorySettings() : brokerURI( ActiveMQConnectionFactory::DEFAULT_URI ),
dispatchAsync( true ),
alwaysSyncSend( false ),
useAsyncSend( false ),
useCompression( false ),
compressionLevel( -1 ),
sendTimeout( 0 ),
closeTimeout( 15000 ),
producerWindowSize( 0 ),
defaultListener( NULL ),
defaultPrefetchPolicy( NULL ),
defaultRedeliveryPolicy( NULL ) {
this->properties.reset( new Properties() );
this->defaultPrefetchPolicy.reset( new DefaultPrefetchPolicy() );
this->defaultRedeliveryPolicy.reset( new DefaultRedeliveryPolicy() );
}
void updateConfiguration( const URI& uri ) {
this->brokerURI = uri;
this->properties->clear();
activemq::util::URISupport::parseQuery( uri.getQuery(), properties.get() );
// Check the connection options
this->alwaysSyncSend = Boolean::parseBoolean(
properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_ALWAYSSYNCSEND ), "false" ) );
this->useAsyncSend = Boolean::parseBoolean(
properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_USEASYNCSEND ), "false" ) );
this->useCompression = Boolean::parseBoolean(
properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_USECOMPRESSION ), "false" ) );
this->compressionLevel = decaf::lang::Integer::parseInt(
properties->getProperty( "connection.compressionLevel", "-1" ) );
this->dispatchAsync = Boolean::parseBoolean(
properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_DISPATCHASYNC ), "true" ) );
this->producerWindowSize = decaf::lang::Integer::parseInt(
properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE ), "0" ) );
this->sendTimeout = decaf::lang::Integer::parseInt(
properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_SENDTIMEOUT ), "0" ) );
this->closeTimeout = decaf::lang::Integer::parseInt(
properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT ), "15000" ) );
this->clientId = properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::PARAM_CLIENTID ), "" );
this->username = properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::PARAM_USERNAME ), "" );
this->password = properties->getProperty(
core::ActiveMQConstants::toString(
core::ActiveMQConstants::PARAM_PASSWORD ), "" );
this->defaultPrefetchPolicy->configure( *properties );
this->defaultRedeliveryPolicy->configure( *properties );
}
};
}}
////////////////////////////////////////////////////////////////////////////////
cms::ConnectionFactory* cms::ConnectionFactory::createCMSConnectionFactory( const std::string& brokerURI ) {
return new ActiveMQConnectionFactory( brokerURI );
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQConnectionFactory::ActiveMQConnectionFactory() : settings( new FactorySettings() ) {
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQConnectionFactory::ActiveMQConnectionFactory( const std::string& url,
const std::string& username,
const std::string& password ) : settings( new FactorySettings() ) {
this->setBrokerURI( URI( url ) );
// Store login data in the properties
if( !username.empty() ) {
this->settings->username = username;
}
if( !password.empty() ) {
this->settings->password = password;
}
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQConnectionFactory::ActiveMQConnectionFactory( const decaf::net::URI& uri,
const std::string& username,
const std::string& password ) : settings( new FactorySettings() ) {
this->setBrokerURI( uri );
// Store login data in the properties
if( !username.empty() ) {
this->settings->username = username;
}
if( !password.empty() ) {
this->settings->password = password;
}
}
////////////////////////////////////////////////////////////////////////////////
ActiveMQConnectionFactory::~ActiveMQConnectionFactory() throw() {
try{
delete this->settings;
}
DECAF_CATCH_NOTHROW( Exception )
DECAF_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
cms::Connection* ActiveMQConnectionFactory::createConnection() {
return doCreateConnection( settings->brokerURI, settings->username, settings->password, settings->clientId );
}
////////////////////////////////////////////////////////////////////////////////
cms::Connection* ActiveMQConnectionFactory::createConnection( const std::string& username,
const std::string& password ) {
return doCreateConnection( settings->brokerURI, username, password, settings->clientId );
}
////////////////////////////////////////////////////////////////////////////////
cms::Connection* ActiveMQConnectionFactory::createConnection( const std::string& username,
const std::string& password,
const std::string& clientId ) {
return doCreateConnection( settings->brokerURI, username, password, clientId );
}
////////////////////////////////////////////////////////////////////////////////
cms::Connection* ActiveMQConnectionFactory::doCreateConnection( const decaf::net::URI& uri,
const std::string& username,
const std::string& password,
const std::string& clientId ) {
Pointer<Transport> transport;
auto_ptr<ActiveMQConnection> connection;
try{
this->setBrokerURI( uri );
// Store login data in the properties
if( !username.empty() ) {
this->settings->username = username;
}
if( !password.empty() ) {
this->settings->password = password;
}
if( !clientId.empty() ) {
this->settings->clientId = clientId;
}
// Use the TransportBuilder to get our Transport
transport =
TransportRegistry::getInstance().findFactory( uri.getScheme() )->create( uri );
if( transport == NULL ){
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConnectionFactory::createConnection - "
"failed creating new Transport" );
}
Pointer<Properties> properties( this->settings->properties->clone() );
// Create and Return the new connection object.
connection.reset( new ActiveMQConnection( transport, properties ) );
// Set all options parsed from the URI.
configureConnection( connection.get() );
// Now start the connection since all other configuration is done.
transport->start();
if( !this->settings->clientId.empty() ) {
connection->setDefaultClientId( this->settings->clientId );
}
return connection.release();
}
catch( cms::CMSException& ex ){
ex.setMark( __FILE__, __LINE__ );
throw ex;
} catch( activemq::exceptions::ActiveMQException& ex ){
ex.setMark( __FILE__, __LINE__ );
throw ex.convertToCMSException();
} catch( decaf::lang::Exception& ex ){
ex.setMark( __FILE__, __LINE__ );
activemq::exceptions::ActiveMQException amqEx( ex );
throw amqEx.convertToCMSException();
} catch( std::exception& ex ) {
throw cms::CMSException( ex.what(), NULL );
} catch(...) {
throw cms::CMSException( "Caught Unknown Exception", NULL );
}
}
////////////////////////////////////////////////////////////////////////////////
cms::Connection* ActiveMQConnectionFactory::createConnection(
const std::string& uri,
const std::string& username,
const std::string& password,
const std::string& clientId ) {
ActiveMQConnectionFactory factory;
return factory.doCreateConnection( URI( uri ), username, password, clientId );
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::configureConnection( ActiveMQConnection* connection ) {
connection->setUsername( this->settings->username );
connection->setPassword( this->settings->password );
connection->setDispatchAsync( this->settings->dispatchAsync );
connection->setAlwaysSyncSend( this->settings->alwaysSyncSend );
connection->setUseAsyncSend( this->settings->useAsyncSend );
connection->setUseCompression( this->settings->useCompression );
connection->setCompressionLevel( this->settings->compressionLevel );
connection->setSendTimeout( this->settings->sendTimeout );
connection->setCloseTimeout( this->settings->closeTimeout );
connection->setProducerWindowSize( this->settings->producerWindowSize );
connection->setPrefetchPolicy( this->settings->defaultPrefetchPolicy->clone() );
connection->setRedeliveryPolicy( this->settings->defaultRedeliveryPolicy->clone() );
if( this->settings->defaultListener ) {
connection->setExceptionListener( this->settings->defaultListener );
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setUsername( const std::string& username ) {
settings->username = username;
}
////////////////////////////////////////////////////////////////////////////////
const std::string& ActiveMQConnectionFactory::getUsername() const {
return settings->username;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setPassword( const std::string& password ){
settings->password = password;
}
////////////////////////////////////////////////////////////////////////////////
const std::string& ActiveMQConnectionFactory::getPassword() const {
return settings->password;
}
////////////////////////////////////////////////////////////////////////////////
std::string ActiveMQConnectionFactory::getClientId() const {
return this->settings->clientId;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setClientId( const std::string& clientId ) {
this->settings->clientId = clientId;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setBrokerURI( const std::string& uri ) {
this->setBrokerURI( URI( uri ) );
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setBrokerURI( const decaf::net::URI& uri ) {
// Update configuration with new authentication info if any was provided.
this->settings->updateConfiguration( uri );
}
////////////////////////////////////////////////////////////////////////////////
const decaf::net::URI& ActiveMQConnectionFactory::getBrokerURI() const {
return settings->brokerURI;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setExceptionListener( cms::ExceptionListener* listener ) {
this->settings->defaultListener = listener;
}
////////////////////////////////////////////////////////////////////////////////
cms::ExceptionListener* ActiveMQConnectionFactory::getExceptionListener() const {
return this->settings->defaultListener;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setPrefetchPolicy( PrefetchPolicy* policy ) {
this->settings->defaultPrefetchPolicy.reset( policy );
}
////////////////////////////////////////////////////////////////////////////////
PrefetchPolicy* ActiveMQConnectionFactory::getPrefetchPolicy() const {
return this->settings->defaultPrefetchPolicy.get();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
this->settings->defaultRedeliveryPolicy.reset( policy );
}
////////////////////////////////////////////////////////////////////////////////
RedeliveryPolicy* ActiveMQConnectionFactory::getRedeliveryPolicy() const {
return this->settings->defaultRedeliveryPolicy.get();
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnectionFactory::isDispatchAsync() const {
return this->settings->dispatchAsync;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setDispatchAsync( bool value ) {
this->settings->dispatchAsync = value;
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnectionFactory::isAlwaysSyncSend() const {
return this->settings->alwaysSyncSend;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setAlwaysSyncSend( bool value ) {
this->settings->alwaysSyncSend = value;
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnectionFactory::isUseAsyncSend() const {
return this->settings->useAsyncSend;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setUseAsyncSend( bool value ) {
this->settings->useAsyncSend = value;
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnectionFactory::isUseCompression() const {
return this->settings->useCompression;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setUseCompression( bool value ) {
this->settings->useCompression = value;
}
////////////////////////////////////////////////////////////////////////////////
int ActiveMQConnectionFactory::getCompressionLevel() const {
return this->settings->compressionLevel;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setCompressionLevel( int value ) {
if( value < 0 ) {
this->settings->compressionLevel = -1;
}
this->settings->compressionLevel = Math::min( value, 9 );
}
////////////////////////////////////////////////////////////////////////////////
unsigned int ActiveMQConnectionFactory::getSendTimeout() const {
return this->settings->sendTimeout;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setSendTimeout( unsigned int timeout ) {
this->settings->sendTimeout = timeout;
}
////////////////////////////////////////////////////////////////////////////////
unsigned int ActiveMQConnectionFactory::getCloseTimeout() const {
return this->settings->closeTimeout;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setCloseTimeout( unsigned int timeout ) {
this->settings->closeTimeout = timeout;
}
////////////////////////////////////////////////////////////////////////////////
unsigned int ActiveMQConnectionFactory::getProducerWindowSize() const {
return this->settings->producerWindowSize;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnectionFactory::setProducerWindowSize( unsigned int windowSize ) {
this->settings->producerWindowSize = windowSize;
}