blob: d07b92b141a3d09c415f36368971a79f49d83612 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.LinkedHashMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.LRUCache;
/**
* An auditor class for a Connection that looks for duplicates
*/
class ConnectionAudit {
private boolean checkForDuplicates;
private LinkedHashMap<ActiveMQDestination, ActiveMQMessageAudit> destinations = new LRUCache<ActiveMQDestination, ActiveMQMessageAudit>(1000);
private LinkedHashMap<ActiveMQDispatcher, ActiveMQMessageAudit> dispatchers = new LRUCache<ActiveMQDispatcher, ActiveMQMessageAudit>(1000);
private int auditDepth = ActiveMQMessageAudit.DEFAULT_WINDOW_SIZE;
private int auditMaximumProducerNumber = ActiveMQMessageAudit.MAXIMUM_PRODUCER_COUNT;
synchronized void removeDispatcher(ActiveMQDispatcher dispatcher) {
dispatchers.remove(dispatcher);
}
synchronized boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
if (checkForDuplicates && message != null) {
ActiveMQDestination destination = message.getDestination();
if (destination != null) {
if (destination.isQueue()) {
ActiveMQMessageAudit audit = destinations.get(destination);
if (audit == null) {
audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
destinations.put(destination, audit);
}
boolean result = audit.isDuplicate(message);
return result;
}
ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
if (audit == null) {
audit = new ActiveMQMessageAudit(auditDepth, auditMaximumProducerNumber);
dispatchers.put(dispatcher, audit);
}
boolean result = audit.isDuplicate(message);
return result;
}
}
return false;
}
protected synchronized void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
if (checkForDuplicates && message != null) {
ActiveMQDestination destination = message.getDestination();
if (destination != null) {
if (destination.isQueue()) {
ActiveMQMessageAudit audit = destinations.get(destination);
if (audit != null) {
audit.rollback(message);
}
} else {
ActiveMQMessageAudit audit = dispatchers.get(dispatcher);
if (audit != null) {
audit.rollback(message);
}
}
}
}
}
/**
* @return the checkForDuplicates
*/
boolean isCheckForDuplicates() {
return this.checkForDuplicates;
}
/**
* @param checkForDuplicates the checkForDuplicates to set
*/
void setCheckForDuplicates(boolean checkForDuplicates) {
this.checkForDuplicates = checkForDuplicates;
}
public int getAuditDepth() {
return auditDepth;
}
public void setAuditDepth(int auditDepth) {
this.auditDepth = auditDepth;
}
public int getAuditMaximumProducerNumber() {
return auditMaximumProducerNumber;
}
public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
this.auditMaximumProducerNumber = auditMaximumProducerNumber;
}
}