AMQCPP-643: Added connection.connectionResponseTimeout to CMS CPP client and updated testURIOptionsProcessing.
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index 5858385..0f20349 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -185,6 +185,7 @@
bool alwaysSessionAsync;
int compressionLevel;
unsigned int sendTimeout;
+ unsigned int connectResponseTimeout;
unsigned int closeTimeout;
unsigned int producerWindowSize;
int auditDepth;
@@ -255,6 +256,7 @@
alwaysSessionAsync(true),
compressionLevel(-1),
sendTimeout(0),
+ connectResponseTimeout(0),
closeTimeout(15000),
producerWindowSize(0),
auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
@@ -318,6 +320,7 @@
void waitForBrokerInfo() {
this->brokerInfoReceived->await();
}
+
};
// Static init.
@@ -1354,7 +1357,7 @@
}
// Now we ping the broker and see if we get an ack / nack
- syncRequest(this->config->connectionInfo);
+ syncRequest(this->config->connectionInfo, this->config->connectResponseTimeout);
this->config->isConnectionInfoSentToBroker = true;
@@ -1588,6 +1591,16 @@
}
////////////////////////////////////////////////////////////////////////////////
+unsigned int ActiveMQConnection::getConnectResponseTimeout() const {
+ return this->config->connectResponseTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setConnectResponseTimeout(unsigned int connectResponseTimeout) {
+ this->config->connectResponseTimeout = connectResponseTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
unsigned int ActiveMQConnection::getCloseTimeout() const {
return this->config->closeTimeout;
}
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 2544862..3f0aa51 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -507,6 +507,20 @@
void setSendTimeout(unsigned int timeout);
/**
+ * Gets the assigned connect response timeout for this Connector
+ * @return the connect response timeout configured in the connection uri
+ */
+ unsigned int getConnectResponseTimeout() const;
+
+ /**
+ * Sets the connect response timeout to use when sending Message objects, this will
+ * protect clients using a Synchronous request in the case of the broker not responding
+ * or missing the brokers response.
+ * @param timeout - The time to wait for a connect response.
+ */
+ void setConnectResponseTimeout(unsigned int connectResponseTimeout);
+
+ /**
* Gets the assigned close timeout for this Connector
* @return the close timeout configured in the connection uri
*/
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
index dda3439..b4282e5 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
@@ -92,6 +92,7 @@
bool alwaysSessionAsync;
int compressionLevel;
unsigned int sendTimeout;
+ unsigned int connectResponseTimeout;
unsigned int closeTimeout;
unsigned int producerWindowSize;
int auditDepth;
@@ -128,6 +129,7 @@
alwaysSessionAsync(true),
compressionLevel(-1),
sendTimeout(0),
+ connectResponseTimeout(0),
closeTimeout(15000),
producerWindowSize(0),
auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
@@ -191,6 +193,9 @@
this->sendTimeout = decaf::lang::Integer::parseInt(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_SENDTIMEOUT), Integer::toString(sendTimeout)));
+ this->connectResponseTimeout = decaf::lang::Integer::parseInt(
+ properties->getProperty(core::ActiveMQConstants::toString(
+ core::ActiveMQConstants::CONNECTION_CONNECTRESPONSETIMEOUT), Integer::toString(connectResponseTimeout)));
this->closeTimeout = decaf::lang::Integer::parseInt(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT), Integer::toString(closeTimeout)));
@@ -401,6 +406,7 @@
connection->setUseCompression(this->settings->useCompression);
connection->setCompressionLevel(this->settings->compressionLevel);
connection->setSendTimeout(this->settings->sendTimeout);
+ connection->setConnectResponseTimeout(this->settings->connectResponseTimeout);
connection->setCloseTimeout(this->settings->closeTimeout);
connection->setProducerWindowSize(this->settings->producerWindowSize);
connection->setPrefetchPolicy(this->settings->defaultPrefetchPolicy->clone());
@@ -594,6 +600,16 @@
}
////////////////////////////////////////////////////////////////////////////////
+unsigned int ActiveMQConnectionFactory::getConnectResponseTimeout() const {
+ return this->settings->connectResponseTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setConnectResponseTimeout(unsigned int connectResponseTimeout) {
+ this->settings->connectResponseTimeout = connectResponseTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
unsigned int ActiveMQConnectionFactory::getCloseTimeout() const {
return this->settings->closeTimeout;
}
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
index 6f4e21c..0848619 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
@@ -381,6 +381,20 @@
void setSendTimeout(unsigned int timeout);
/**
+ * Gets the assigned connect response timeout for this Connector
+ * @return the connect response timeout configured in the connection uri
+ */
+ unsigned int getConnectResponseTimeout() const;
+
+ /**
+ * Sets the connect response timeout to use when sending Message objects, this will
+ * protect clients using a Synchronous request in the case of the broker not responding
+ * or missing the brokers response.
+ * @param timeout - The time to wait for a connect response.
+ */
+ void setConnectResponseTimeout(unsigned int connectResponseTimeout);
+
+ /**
* Gets the assigned close timeout for this Connector
* @return the close timeout configured in the connection uri
*/
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp
index 44da232..27e57e5 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.cpp
@@ -55,6 +55,7 @@
uriParams[PARAM_USERNAME] = "username";
uriParams[PARAM_PASSWORD] = "password";
uriParams[PARAM_CLIENTID] = "client-id";
+ uriParams[CONNECTION_CONNECTRESPONSETIMEOUT] = "connection.connectResponseTimeout";
for (int ix = 0; ix < NUM_OPTIONS; ++ix) {
destOptionMap[destOptions[ix]] = (DestinationOption) ix;
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
index f83c2d7..2aa2a73 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConstants.h
@@ -94,6 +94,7 @@
PARAM_USERNAME,
PARAM_PASSWORD,
PARAM_CLIENTID,
+ CONNECTION_CONNECTRESPONSETIMEOUT,
NUM_PARAMS
};
diff --git a/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp b/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
index e9fd8e8..ae1f901 100644
--- a/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
+++ b/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
@@ -306,7 +306,8 @@
"mock://127.0.0.1:23232?connection.dispatchAsync=true&"
"connection.alwaysSyncSend=true&connection.useAsyncSend=true&"
"connection.useCompression=true&connection.compressionLevel=7&"
- "connection.closeTimeout=10000";
+ "connection.closeTimeout=10000&"
+ "connection.connectResponseTimeout=2000";
ActiveMQConnectionFactory connectionFactory( URI );
@@ -316,6 +317,7 @@
CPPUNIT_ASSERT( connectionFactory.isUseCompression() == true );
CPPUNIT_ASSERT( connectionFactory.getCloseTimeout() == 10000 );
CPPUNIT_ASSERT( connectionFactory.getCompressionLevel() == 7 );
+ CPPUNIT_ASSERT( connectionFactory.getConnectResponseTimeout() == 2000 );
cms::Connection* connection =
connectionFactory.createConnection();
@@ -330,6 +332,7 @@
CPPUNIT_ASSERT( amqConnection->isUseCompression() == true );
CPPUNIT_ASSERT( amqConnection->getCloseTimeout() == 10000 );
CPPUNIT_ASSERT( amqConnection->getCompressionLevel() == 7 );
+ CPPUNIT_ASSERT( amqConnection->getConnectResponseTimeout() == 2000 );
delete connection;