https://issues.apache.org/jira/browse/AMQCPP-553
Fix a leak that occurs when consumers are closed but they leave some
audit data in the ConnectionAudit instance..
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index 914b2c2..6bfed72 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -1909,6 +1909,11 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeAuditedDispatcher(Dispatcher* dispatcher) {
+ this->config->connectionAudit.removeDispatcher(dispatcher);
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
this->config->connectionAudit.rollbackDuplicate(dispatcher, message);
}
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 12fd1d4..727ca69 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -260,6 +260,14 @@
*/
void rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message);
+ /**
+ * Removes the Audit information stored for a given MessageConsumer
+ *
+ * @param dispatcher
+ * The Dispatcher instance that has received the Message.
+ */
+ void removeAuditedDispatcher(Dispatcher* dispatcher);
+
public: // Connection Interface Methods
/**
diff --git a/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp b/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
index 40b8cde..d2af6dc 100644
--- a/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
+++ b/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
@@ -84,7 +84,10 @@
////////////////////////////////////////////////////////////////////////////////
void ConnectionAudit::removeDispatcher(Dispatcher* dispatcher) {
synchronized(&this->impl->mutex) {
- this->impl->dispatchers.remove(dispatcher);
+ try {
+ this->impl->dispatchers.remove(dispatcher);
+ } catch (NoSuchElementException& ex) {
+ }
}
}
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
index 949d51b..aaf0a38 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
@@ -656,7 +656,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::MessageProducer* ActiveMQSessionKernel::createProducer( const cms::Destination* destination ) {
+cms::MessageProducer* ActiveMQSessionKernel::createProducer(const cms::Destination* destination ) {
try {
@@ -702,7 +702,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-cms::QueueBrowser* ActiveMQSessionKernel::createBrowser( const cms::Queue* queue ) {
+cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue ) {
try {
return ActiveMQSessionKernel::createBrowser(queue, "");
@@ -1271,6 +1271,7 @@
this->config->consumerLock.writeLock().lock();
try {
this->config->consumers.remove(consumer);
+ this->connection->removeAuditedDispatcher(consumer.get());
this->config->consumerLock.writeLock().unlock();
} catch (Exception& ex) {
this->config->consumerLock.writeLock().unlock();