blob: 40b8cdef6ea54127d619e886f6f0e1756e629034 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConnectionAudit.h"
#include <decaf/util/LinkedHashMap.h>
#include <decaf/util/StlMap.h>
#include <activemq/core/Dispatcher.h>
#include <activemq/core/ActiveMQMessageAudit.h>
#include <activemq/commands/ActiveMQDestination.h>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace core {
class ConnectionAuditImpl {
private:
ConnectionAuditImpl(const ConnectionAuditImpl&);
ConnectionAuditImpl& operator= (const ConnectionAuditImpl&);
public:
Mutex mutex;
StlMap<Pointer<ActiveMQDestination>, Pointer<ActiveMQMessageAudit>, ActiveMQDestination::COMPARATOR> destinations;
LinkedHashMap<Dispatcher*, Pointer<ActiveMQMessageAudit> > dispatchers;
ConnectionAuditImpl() : mutex(), destinations(), dispatchers(1000) {
}
};
}}
////////////////////////////////////////////////////////////////////////////////
ConnectionAudit::ConnectionAudit() : impl(new ConnectionAuditImpl),
checkForDuplicates(true),
auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT) {
}
////////////////////////////////////////////////////////////////////////////////
ConnectionAudit::ConnectionAudit(int auditDepth, int maxProducers) :
impl(new ConnectionAuditImpl),
checkForDuplicates(true),
auditDepth(auditDepth),
auditMaximumProducerNumber(maxProducers) {
}
////////////////////////////////////////////////////////////////////////////////
ConnectionAudit::~ConnectionAudit() {
try {
delete this->impl;
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void ConnectionAudit::removeDispatcher(Dispatcher* dispatcher) {
synchronized(&this->impl->mutex) {
this->impl->dispatchers.remove(dispatcher);
}
}
////////////////////////////////////////////////////////////////////////////////
bool ConnectionAudit::isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
synchronized(&this->impl->mutex) {
if (checkForDuplicates && message != NULL) {
Pointer<ActiveMQDestination> destination = message->getDestination();
if (destination != NULL) {
if (destination->isQueue()) {
Pointer<ActiveMQMessageAudit> audit;
try {
audit = this->impl->destinations.get(destination);
} catch (NoSuchElementException& ex) {
audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
this->impl->destinations.put(destination, audit);
}
bool result = audit->isDuplicate(message->getMessageId());
return result;
}
Pointer<ActiveMQMessageAudit> audit;
try {
audit = this->impl->dispatchers.get(dispatcher);
} catch (NoSuchElementException& ex) {
audit.reset(new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber));
this->impl->dispatchers.put(dispatcher, audit);
}
bool result = audit->isDuplicate(message->getMessageId());
return result;
}
}
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
void ConnectionAudit::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
synchronized(&this->impl->mutex) {
if (checkForDuplicates && message != NULL) {
Pointer<ActiveMQDestination> destination = message->getDestination();
if (destination != NULL) {
if (destination->isQueue()) {
try {
Pointer<ActiveMQMessageAudit> audit = this->impl->destinations.get(destination);
audit->rollback(message->getMessageId());
} catch (NoSuchElementException& ex) {}
} else {
try {
Pointer<ActiveMQMessageAudit> audit = this->impl->dispatchers.get(dispatcher);
audit->rollback(message->getMessageId());
} catch (NoSuchElementException& ex) {}
}
}
}
}
}