| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "ActiveMQConnectionFactory.h" |
| |
| #include <cms/MessageTransformer.h> |
| #include <decaf/net/URI.h> |
| #include <decaf/util/Properties.h> |
| #include <decaf/util/concurrent/Mutex.h> |
| #include <decaf/lang/Boolean.h> |
| #include <decaf/lang/Integer.h> |
| #include <decaf/lang/Long.h> |
| #include <decaf/lang/Pointer.h> |
| #include <decaf/lang/Math.h> |
| #include <decaf/lang/exceptions/NullPointerException.h> |
| #include <activemq/exceptions/ExceptionDefines.h> |
| #include <activemq/transport/TransportRegistry.h> |
| #include <activemq/core/ActiveMQConnection.h> |
| #include <activemq/core/ActiveMQConstants.h> |
| #include <activemq/core/ActiveMQMessageAudit.h> |
| #include <activemq/core/policies/DefaultPrefetchPolicy.h> |
| #include <activemq/core/policies/DefaultRedeliveryPolicy.h> |
| #include <activemq/util/URISupport.h> |
| #include <activemq/util/CompositeData.h> |
| #include <memory> |
| |
| using namespace std; |
| using namespace activemq; |
| using namespace activemq::util; |
| using namespace activemq::core; |
| using namespace activemq::core::policies; |
| using namespace activemq::exceptions; |
| using namespace activemq::transport; |
| using namespace decaf; |
| using namespace decaf::net; |
| using namespace decaf::util; |
| using namespace decaf::util::concurrent; |
| using namespace decaf::lang; |
| using namespace decaf::lang::exceptions; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const std::string ActiveMQConnectionFactory::DEFAULT_URI = "failover:(tcp://localhost:61616)"; |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| namespace activemq{ |
| namespace core{ |
| |
| class FactorySettings { |
| private: |
| |
| FactorySettings(const FactorySettings&); |
| FactorySettings& operator=(const FactorySettings&); |
| |
| public: |
| |
| Mutex configLock; |
| |
| Pointer<Properties> properties; |
| |
| std::string username; |
| std::string password; |
| std::string clientId; |
| |
| URI brokerURI; |
| |
| bool dispatchAsync; |
| bool alwaysSyncSend; |
| bool useAsyncSend; |
| bool sendAcksAsync; |
| bool messagePrioritySupported; |
| bool useCompression; |
| bool useRetroactiveConsumer; |
| bool watchTopicAdvisories; |
| bool checkForDuplicates; |
| bool optimizeAcknowledge; |
| bool exclusiveConsumer; |
| bool transactedIndividualAck; |
| bool nonBlockingRedelivery; |
| bool alwaysSessionAsync; |
| int compressionLevel; |
| unsigned int sendTimeout; |
| unsigned int closeTimeout; |
| unsigned int producerWindowSize; |
| int auditDepth; |
| int auditMaximumProducerNumber; |
| long long optimizeAcknowledgeTimeOut; |
| long long optimizedAckScheduledAckInterval; |
| long long consumerFailoverRedeliveryWaitPeriod; |
| bool consumerExpiryCheckEnabled; |
| |
| cms::ExceptionListener* defaultListener; |
| cms::MessageTransformer* defaultTransformer; |
| std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy; |
| std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy; |
| |
| FactorySettings() : configLock(), |
| properties(new Properties()), |
| username(), |
| password(), |
| clientId(), |
| brokerURI(ActiveMQConnectionFactory::DEFAULT_URI), |
| dispatchAsync(true), |
| alwaysSyncSend(false), |
| useAsyncSend(false), |
| sendAcksAsync(true), |
| messagePrioritySupported(false), |
| useCompression(false), |
| useRetroactiveConsumer(false), |
| watchTopicAdvisories(true), |
| checkForDuplicates(true), |
| optimizeAcknowledge(false), |
| exclusiveConsumer(false), |
| transactedIndividualAck(false), |
| nonBlockingRedelivery(false), |
| alwaysSessionAsync(true), |
| compressionLevel(-1), |
| sendTimeout(0), |
| closeTimeout(15000), |
| producerWindowSize(0), |
| auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE), |
| auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT), |
| optimizeAcknowledgeTimeOut(300), |
| optimizedAckScheduledAckInterval(0), |
| consumerFailoverRedeliveryWaitPeriod(0), |
| consumerExpiryCheckEnabled(true), |
| defaultListener(NULL), |
| defaultTransformer(NULL), |
| defaultPrefetchPolicy(new DefaultPrefetchPolicy()), |
| defaultRedeliveryPolicy(new DefaultRedeliveryPolicy()) { |
| } |
| |
| void updateConfiguration(const URI& uri) { |
| |
| this->brokerURI = uri; |
| this->properties->clear(); |
| |
| if (uri.getQuery() != "") { |
| // Not a composite URI so this works fine. |
| try { |
| URISupport::parseQuery(uri.getQuery(), properties.get()); |
| } catch (URISyntaxException& ex) { |
| } |
| } else { |
| // Composite URI won't indicate it has a query even if it does. |
| try { |
| CompositeData composite = URISupport::parseComposite(uri); |
| *this->properties = composite.getParameters(); |
| } catch (URISyntaxException& ex) { |
| } |
| } |
| |
| // Check the connection options |
| this->alwaysSyncSend = Boolean::parseBoolean( |
| properties->getProperty(core::ActiveMQConstants::toString( |
| core::ActiveMQConstants::CONNECTION_ALWAYSSYNCSEND), Boolean::toString(alwaysSyncSend))); |
| this->useAsyncSend = Boolean::parseBoolean( |
| properties->getProperty(core::ActiveMQConstants::toString( |
| core::ActiveMQConstants::CONNECTION_USEASYNCSEND), Boolean::toString(useAsyncSend))); |
| this->useCompression = Boolean::parseBoolean( |
| properties->getProperty(core::ActiveMQConstants::toString( |
| core::ActiveMQConstants::CONNECTION_USECOMPRESSION), Boolean::toString(useCompression))); |
| this->compressionLevel = Integer::parseInt( |
| properties->getProperty("connection.compressionLevel", Integer::toString(compressionLevel))); |
| this->messagePrioritySupported = Boolean::parseBoolean( |
| properties->getProperty("connection.messagePrioritySupported", Boolean::toString(messagePrioritySupported))); |
| this->checkForDuplicates = Boolean::parseBoolean( |
| properties->getProperty("connection.checkForDuplicates", Boolean::toString(checkForDuplicates))); |
| this->auditDepth = Integer::parseInt( |
| properties->getProperty("connection.auditDepth", Integer::toString(auditDepth))); |
| this->auditMaximumProducerNumber = Integer::parseInt( |
| properties->getProperty("connection.auditMaximumProducerNumber", Integer::toString(auditMaximumProducerNumber))); |
| this->dispatchAsync = Boolean::parseBoolean( |
| properties->getProperty(core::ActiveMQConstants::toString( |
| core::ActiveMQConstants::CONNECTION_DISPATCHASYNC), Boolean::toString(dispatchAsync))); |
| this->producerWindowSize = Integer::parseInt( |
| properties->getProperty(core::ActiveMQConstants::toString( |
| core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE), Integer::toString(producerWindowSize))); |
| this->sendTimeout = decaf::lang::Integer::parseInt( |
| properties->getProperty(core::ActiveMQConstants::toString( |
| core::ActiveMQConstants::CONNECTION_SENDTIMEOUT), Integer::toString(sendTimeout))); |
| this->closeTimeout = decaf::lang::Integer::parseInt( |
| properties->getProperty(core::ActiveMQConstants::toString( |
| core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT), Integer::toString(closeTimeout))); |
| this->clientId = properties->getProperty( |
| core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_CLIENTID), clientId); |
| this->username = properties->getProperty( |
| core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_USERNAME), username); |
| this->password = properties->getProperty( |
| core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_PASSWORD), password); |
| this->optimizeAcknowledge = Boolean::parseBoolean( |
| properties->getProperty("connection.optimizeAcknowledge", Boolean::toString(optimizeAcknowledge))); |
| this->exclusiveConsumer = Boolean::parseBoolean( |
| properties->getProperty("connection.exclusiveConsumer", Boolean::toString(exclusiveConsumer))); |
| this->transactedIndividualAck = Boolean::parseBoolean( |
| properties->getProperty("connection.transactedIndividualAck", Boolean::toString(transactedIndividualAck))); |
| this->useRetroactiveConsumer = Boolean::parseBoolean( |
| properties->getProperty("connection.useRetroactiveConsumer", Boolean::toString(useRetroactiveConsumer))); |
| this->sendAcksAsync = Boolean::parseBoolean( |
| properties->getProperty("connection.sendAcksAsync", Boolean::toString(sendAcksAsync))); |
| this->optimizeAcknowledgeTimeOut = Long::parseLong( |
| properties->getProperty("connection.optimizeAcknowledgeTimeOut", Long::toString(optimizeAcknowledgeTimeOut))); |
| this->optimizedAckScheduledAckInterval = Long::parseLong( |
| properties->getProperty("connection.optimizedAckScheduledAckInterval", Long::toString(optimizedAckScheduledAckInterval))); |
| this->consumerFailoverRedeliveryWaitPeriod = Long::parseLong( |
| properties->getProperty("connection.consumerFailoverRedeliveryWaitPeriod", Long::toString(consumerFailoverRedeliveryWaitPeriod))); |
| this->nonBlockingRedelivery = Boolean::parseBoolean( |
| properties->getProperty("connection.nonBlockingRedelivery", Boolean::toString(nonBlockingRedelivery))); |
| this->watchTopicAdvisories = Boolean::parseBoolean( |
| properties->getProperty("connection.watchTopicAdvisories", Boolean::toString(watchTopicAdvisories))); |
| this->alwaysSessionAsync = Boolean::parseBoolean( |
| properties->getProperty("connection.alwaysSessionAsync", Boolean::toString(alwaysSessionAsync))); |
| this->consumerExpiryCheckEnabled = Boolean::parseBoolean( |
| properties->getProperty("connection.consumerExpiryCheckEnabled", Boolean::toString(consumerExpiryCheckEnabled))); |
| |
| this->defaultPrefetchPolicy->configure(*properties); |
| this->defaultRedeliveryPolicy->configure(*properties); |
| } |
| |
| static URI createURI(const std::string& uriString) { |
| try { |
| return URI(uriString); |
| } catch (URISyntaxException& ex) { |
| throw cms::CMSException("Invalid Connection Uri detected."); |
| } |
| } |
| }; |
| |
| }} |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::ConnectionFactory* cms::ConnectionFactory::createCMSConnectionFactory(const std::string& brokerURI) { |
| return new ActiveMQConnectionFactory( brokerURI ); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnectionFactory::ActiveMQConnectionFactory() : settings(new FactorySettings()) { |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnectionFactory::ActiveMQConnectionFactory(const std::string& uri, |
| const std::string& username, |
| const std::string& password) : settings( new FactorySettings() ) { |
| |
| this->setBrokerURI(FactorySettings::createURI(uri)); |
| |
| // Store login data in the properties |
| if (!username.empty()) { |
| this->settings->username = username; |
| } |
| if (!password.empty()) { |
| this->settings->password = password; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnectionFactory::ActiveMQConnectionFactory(const decaf::net::URI& uri, |
| const std::string& username, |
| const std::string& password) : settings( new FactorySettings() ) { |
| |
| this->setBrokerURI(uri); |
| |
| // Store login data in the properties |
| if (!username.empty()) { |
| this->settings->username = username; |
| } |
| if (!password.empty()) { |
| this->settings->password = password; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnectionFactory::~ActiveMQConnectionFactory() { |
| try { |
| delete this->settings; |
| } |
| DECAF_CATCH_NOTHROW(Exception) |
| DECAF_CATCHALL_NOTHROW() |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Connection* ActiveMQConnectionFactory::createConnection() { |
| return doCreateConnection(settings->brokerURI, settings->username, settings->password, settings->clientId); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Connection* ActiveMQConnectionFactory::createConnection(const std::string& username, const std::string& password) { |
| return doCreateConnection(settings->brokerURI, username, password, settings->clientId); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Connection* ActiveMQConnectionFactory::createConnection(const std::string& username, const std::string& password, const std::string& clientId) { |
| return doCreateConnection(settings->brokerURI, username, password, clientId); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Connection* ActiveMQConnectionFactory::doCreateConnection(const decaf::net::URI& uri, |
| const std::string& username, |
| const std::string& password, |
| const std::string& clientId) { |
| |
| Pointer<Transport> transport; |
| auto_ptr<ActiveMQConnection> connection; |
| |
| try { |
| |
| synchronized(&this->settings->configLock) { |
| |
| this->setBrokerURI(uri); |
| |
| // Store login data in the properties |
| if (!username.empty()) { |
| this->settings->username = username; |
| } |
| if (!password.empty()) { |
| this->settings->password = password; |
| } |
| if (!clientId.empty()) { |
| this->settings->clientId = clientId; |
| } |
| |
| // Use the TransportBuilder to get our Transport |
| transport = TransportRegistry::getInstance().findFactory(uri.getScheme())->create(uri); |
| |
| if (transport == NULL) { |
| throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConnectionFactory::createConnection - " |
| "failed creating new Transport"); |
| } |
| |
| Pointer<Properties> properties(this->settings->properties->clone()); |
| |
| // Create and Return the new connection object. |
| connection.reset(createActiveMQConnection(transport, properties)); |
| |
| // Set all options parsed from the URI. |
| configureConnection(connection.get()); |
| |
| // Now start the connection since all other configuration is done. |
| transport->start(); |
| |
| if (!this->settings->clientId.empty()) { |
| connection->setDefaultClientId(this->settings->clientId); |
| } |
| } |
| |
| return connection.release(); |
| } catch (cms::CMSException& ex) { |
| ex.setMark(__FILE__, __LINE__); |
| throw ex; |
| } catch (activemq::exceptions::ActiveMQException& ex) { |
| ex.setMark(__FILE__, __LINE__); |
| throw ex.convertToCMSException(); |
| } catch (decaf::lang::Exception& ex) { |
| ex.setMark(__FILE__, __LINE__); |
| activemq::exceptions::ActiveMQException amqEx(ex); |
| throw amqEx.convertToCMSException(); |
| } catch (std::exception& ex) { |
| throw cms::CMSException(ex.what(), NULL); |
| } catch (...) { |
| throw cms::CMSException("Caught Unknown Exception", NULL); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| ActiveMQConnection* ActiveMQConnectionFactory::createActiveMQConnection( |
| const Pointer<transport::Transport>& transport, |
| const Pointer<decaf::util::Properties>& properties) { |
| |
| return new ActiveMQConnection(transport, properties); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::Connection* ActiveMQConnectionFactory::createConnection(const std::string& uri, |
| const std::string& username, |
| const std::string& password, |
| const std::string& clientId) { |
| ActiveMQConnectionFactory factory; |
| return factory.doCreateConnection(URI(uri), username, password, clientId); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::configureConnection(ActiveMQConnection* connection) { |
| |
| connection->setUsername(this->settings->username); |
| connection->setPassword(this->settings->password); |
| connection->setDispatchAsync(this->settings->dispatchAsync); |
| connection->setAlwaysSyncSend(this->settings->alwaysSyncSend); |
| connection->setUseAsyncSend(this->settings->useAsyncSend); |
| connection->setUseCompression(this->settings->useCompression); |
| connection->setCompressionLevel(this->settings->compressionLevel); |
| connection->setSendTimeout(this->settings->sendTimeout); |
| connection->setCloseTimeout(this->settings->closeTimeout); |
| connection->setProducerWindowSize(this->settings->producerWindowSize); |
| connection->setPrefetchPolicy(this->settings->defaultPrefetchPolicy->clone()); |
| connection->setRedeliveryPolicy(this->settings->defaultRedeliveryPolicy->clone()); |
| connection->setMessagePrioritySupported(this->settings->messagePrioritySupported); |
| connection->setWatchTopicAdvisories(this->settings->watchTopicAdvisories); |
| connection->setCheckForDuplicates(this->settings->checkForDuplicates); |
| connection->setAuditDepth(this->settings->auditDepth); |
| connection->setAuditMaximumProducerNumber(this->settings->auditMaximumProducerNumber); |
| connection->setOptimizeAcknowledge(this->settings->optimizeAcknowledge); |
| connection->setOptimizeAcknowledgeTimeOut(this->settings->optimizeAcknowledgeTimeOut); |
| connection->setOptimizedAckScheduledAckInterval(this->settings->optimizedAckScheduledAckInterval); |
| connection->setSendAcksAsync(this->settings->sendAcksAsync); |
| connection->setExclusiveConsumer(this->settings->exclusiveConsumer); |
| connection->setTransactedIndividualAck(this->settings->transactedIndividualAck); |
| connection->setUseRetroactiveConsumer(this->settings->useRetroactiveConsumer); |
| connection->setNonBlockingRedelivery(this->settings->nonBlockingRedelivery); |
| connection->setConsumerFailoverRedeliveryWaitPeriod(this->settings->consumerFailoverRedeliveryWaitPeriod); |
| connection->setAlwaysSessionAsync(this->settings->alwaysSessionAsync); |
| connection->setConsumerExpiryCheckEnabled(this->settings->consumerExpiryCheckEnabled); |
| |
| if (this->settings->defaultListener) { |
| connection->setExceptionListener(this->settings->defaultListener); |
| } |
| |
| if (this->settings->defaultTransformer) { |
| connection->setMessageTransformer(this->settings->defaultTransformer); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setUsername(const std::string& username) { |
| settings->username = username; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const std::string& ActiveMQConnectionFactory::getUsername() const { |
| return settings->username; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setPassword(const std::string& password) { |
| settings->password = password; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const std::string& ActiveMQConnectionFactory::getPassword() const { |
| return settings->password; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| std::string ActiveMQConnectionFactory::getClientId() const { |
| return this->settings->clientId; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setClientId(const std::string& clientId) { |
| this->settings->clientId = clientId; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setBrokerURI(const std::string& uri) { |
| this->setBrokerURI(FactorySettings::createURI(uri)); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setBrokerURI(const decaf::net::URI& uri) { |
| synchronized(&this->settings->configLock) { |
| this->settings->updateConfiguration(uri); |
| } |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| const decaf::net::URI& ActiveMQConnectionFactory::getBrokerURI() const { |
| return settings->brokerURI; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setExceptionListener(cms::ExceptionListener* listener) { |
| this->settings->defaultListener = listener; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::ExceptionListener* ActiveMQConnectionFactory::getExceptionListener() const { |
| return this->settings->defaultListener; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setMessageTransformer(cms::MessageTransformer* transformer) { |
| this->settings->defaultTransformer = transformer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| cms::MessageTransformer* ActiveMQConnectionFactory::getMessageTransformer() const { |
| return this->settings->defaultTransformer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setPrefetchPolicy(PrefetchPolicy* policy) { |
| this->settings->defaultPrefetchPolicy.reset(policy); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| PrefetchPolicy* ActiveMQConnectionFactory::getPrefetchPolicy() const { |
| return this->settings->defaultPrefetchPolicy.get(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setRedeliveryPolicy(RedeliveryPolicy* policy) { |
| this->settings->defaultRedeliveryPolicy.reset(policy); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| RedeliveryPolicy* ActiveMQConnectionFactory::getRedeliveryPolicy() const { |
| return this->settings->defaultRedeliveryPolicy.get(); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isDispatchAsync() const { |
| return this->settings->dispatchAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setDispatchAsync(bool value) { |
| this->settings->dispatchAsync = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isAlwaysSyncSend() const { |
| return this->settings->alwaysSyncSend; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setAlwaysSyncSend(bool value) { |
| this->settings->alwaysSyncSend = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isUseAsyncSend() const { |
| return this->settings->useAsyncSend; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setUseAsyncSend(bool value) { |
| this->settings->useAsyncSend = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isSendAcksAsync() const { |
| return this->settings->sendAcksAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setSendAcksAsync(bool sendAcksAsync) { |
| this->settings->sendAcksAsync = sendAcksAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isUseCompression() const { |
| return this->settings->useCompression; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setUseCompression(bool value) { |
| this->settings->useCompression = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int ActiveMQConnectionFactory::getCompressionLevel() const { |
| return this->settings->compressionLevel; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setCompressionLevel(int value) { |
| |
| if (value < 0) { |
| this->settings->compressionLevel = -1; |
| } |
| |
| this->settings->compressionLevel = Math::min(value, 9); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| unsigned int ActiveMQConnectionFactory::getSendTimeout() const { |
| return this->settings->sendTimeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setSendTimeout(unsigned int timeout) { |
| this->settings->sendTimeout = timeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| unsigned int ActiveMQConnectionFactory::getCloseTimeout() const { |
| return this->settings->closeTimeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setCloseTimeout(unsigned int timeout) { |
| this->settings->closeTimeout = timeout; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| unsigned int ActiveMQConnectionFactory::getProducerWindowSize() const { |
| return this->settings->producerWindowSize; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setProducerWindowSize(unsigned int windowSize) { |
| this->settings->producerWindowSize = windowSize; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isMessagePrioritySupported() const { |
| return this->settings->messagePrioritySupported; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setMessagePrioritySupported(bool value) { |
| this->settings->messagePrioritySupported = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isWatchTopicAdvisories() const { |
| return this->settings->watchTopicAdvisories; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setWatchTopicAdvisories(bool value) { |
| this->settings->watchTopicAdvisories = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int ActiveMQConnectionFactory::getAuditDepth() const { |
| return this->settings->auditDepth; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setAuditDepth(int auditDepth) { |
| this->settings->auditDepth = auditDepth; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| int ActiveMQConnectionFactory::getAuditMaximumProducerNumber() const { |
| return this->settings->auditMaximumProducerNumber; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { |
| this->settings->auditMaximumProducerNumber = auditMaximumProducerNumber; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isCheckForDuplicates() const { |
| return this->settings->checkForDuplicates; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setCheckForDuplicates(bool checkForDuplicates) { |
| this->settings->checkForDuplicates = checkForDuplicates; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isTransactedIndividualAck() const { |
| return this->settings->transactedIndividualAck; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setTransactedIndividualAck(bool transactedIndividualAck) { |
| this->settings->transactedIndividualAck = transactedIndividualAck; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isNonBlockingRedelivery() const { |
| return this->settings->nonBlockingRedelivery; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setNonBlockingRedelivery(bool nonBlockingRedelivery) { |
| this->settings->nonBlockingRedelivery = nonBlockingRedelivery; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isOptimizeAcknowledge() const { |
| return this->settings->optimizeAcknowledge; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setOptimizeAcknowledge(bool optimizeAcknowledge) { |
| this->settings->optimizeAcknowledge = optimizeAcknowledge; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long ActiveMQConnectionFactory::getOptimizeAcknowledgeTimeOut() const { |
| return this->settings->optimizeAcknowledgeTimeOut; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut) { |
| this->settings->optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long ActiveMQConnectionFactory::getOptimizedAckScheduledAckInterval() const { |
| return this->settings->optimizedAckScheduledAckInterval; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval) { |
| this->settings->optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| long long ActiveMQConnectionFactory::getConsumerFailoverRedeliveryWaitPeriod() const { |
| return this->settings->consumerFailoverRedeliveryWaitPeriod; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setConsumerFailoverRedeliveryWaitPeriod(long long value) { |
| this->settings->consumerFailoverRedeliveryWaitPeriod = value; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isUseRetroactiveConsumer() const { |
| return this->settings->useRetroactiveConsumer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setUseRetroactiveConsumer(bool useRetroactiveConsumer) { |
| this->settings->useRetroactiveConsumer = useRetroactiveConsumer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isExclusiveConsumer() const { |
| return this->settings->exclusiveConsumer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setExclusiveConsumer(bool exclusiveConsumer) { |
| this->settings->exclusiveConsumer = exclusiveConsumer; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isAlwaysSessionAsync() const { |
| return this->settings->alwaysSessionAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setAlwaysSessionAsync(bool alwaysSessionAsync) { |
| this->settings->alwaysSessionAsync = alwaysSessionAsync; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| bool ActiveMQConnectionFactory::isConsumerExpiryCheckEnabled() { |
| return this->settings->consumerExpiryCheckEnabled; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| void ActiveMQConnectionFactory::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) { |
| this->settings->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled; |
| } |