https://issues.apache.org/jira/browse/AMQCPP-546
Make the method in this class thread safe
diff --git a/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp b/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
index b8e3c5e..40b8cde 100644
--- a/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
+++ b/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
@@ -90,30 +90,31 @@
////////////////////////////////////////////////////////////////////////////////
bool ConnectionAudit::isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
-
- if (checkForDuplicates && message != NULL) {
- Pointer<ActiveMQDestination> destination = message->getDestination();
- if (destination != NULL) {
- if (destination->isQueue()) {
+ synchronized(&this->impl->mutex) {
+ if (checkForDuplicates && message != NULL) {
+ Pointer<ActiveMQDestination> destination = message->getDestination();
+ if (destination != NULL) {
+ if (destination->isQueue()) {
+ Pointer<ActiveMQMessageAudit> audit;
+ try {
+ audit = this->impl->destinations.get(destination);
+ } catch (NoSuchElementException& ex) {
+ audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
+ this->impl->destinations.put(destination, audit);
+ }
+ bool result = audit->isDuplicate(message->getMessageId());
+ return result;
+ }
Pointer<ActiveMQMessageAudit> audit;
try {
- audit = this->impl->destinations.get(destination);
+ audit = this->impl->dispatchers.get(dispatcher);
} catch (NoSuchElementException& ex) {
audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
- this->impl->destinations.put(destination, audit);
+ this->impl->dispatchers.put(dispatcher, audit);
}
bool result = audit->isDuplicate(message->getMessageId());
return result;
}
- Pointer<ActiveMQMessageAudit> audit;
- try {
- audit = this->impl->dispatchers.get(dispatcher);
- } catch (NoSuchElementException& ex) {
- audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
- this->impl->dispatchers.put(dispatcher, audit);
- }
- bool result = audit->isDuplicate(message->getMessageId());
- return result;
}
}
return false;
@@ -121,19 +122,21 @@
////////////////////////////////////////////////////////////////////////////////
void ConnectionAudit::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
- if (checkForDuplicates && message != NULL) {
- Pointer<ActiveMQDestination> destination = message->getDestination();
- if (destination != NULL) {
- if (destination->isQueue()) {
- try {
- Pointer<ActiveMQMessageAudit> audit = this->impl->destinations.get(destination);
- audit->rollback(message->getMessageId());
- } catch (NoSuchElementException& ex) {}
- } else {
- try {
- Pointer<ActiveMQMessageAudit> audit = this->impl->dispatchers.get(dispatcher);
- audit->rollback(message->getMessageId());
- } catch (NoSuchElementException& ex) {}
+ synchronized(&this->impl->mutex) {
+ if (checkForDuplicates && message != NULL) {
+ Pointer<ActiveMQDestination> destination = message->getDestination();
+ if (destination != NULL) {
+ if (destination->isQueue()) {
+ try {
+ Pointer<ActiveMQMessageAudit> audit = this->impl->destinations.get(destination);
+ audit->rollback(message->getMessageId());
+ } catch (NoSuchElementException& ex) {}
+ } else {
+ try {
+ Pointer<ActiveMQMessageAudit> audit = this->impl->dispatchers.get(dispatcher);
+ audit->rollback(message->getMessageId());
+ } catch (NoSuchElementException& ex) {}
+ }
}
}
}