Doing release candidate 4
git-svn-id: https://svn.apache.org/repos/asf/activemq/tags/activemq-5.0.0@602137 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/activemq-core/pom.xml b/activemq-core/pom.xml
index d787adb..c031ed6 100755
--- a/activemq-core/pom.xml
+++ b/activemq-core/pom.xml
@@ -37,6 +37,7 @@
<activemq.osgi.import.pkg>
javax.jmdns*;resolution:=optional,
org.apache.activeio*;resolution:=optional,
+ org.apache.camel*;resolution:=optional,
org.apache.commons.pool*;resolution:=optional,
org.apache.derby*;resolution:=optional,
org.apache.tools.ant*;resolution:=optional,
@@ -403,6 +404,11 @@
<exclude>**/nio/**</exclude>
<!-- A test used for memory profiling only. -->
<exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
+
+ <exclude>**/NetworkConnectionsCleanedupTest.*/**</exclude>
+
+ <exclude>**/amq1490/*</exclude>
+ <exclude>**/archive/*</exclude>
<exclude>**/AMQDeadlockTest3.*</exclude>
</excludes>
diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
index faf39bc..eba6818 100755
--- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
+++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageAudit.java
@@ -33,8 +33,8 @@
*/
public class ActiveMQMessageAudit {
- private static final int DEFAULT_WINDOW_SIZE = 1024;
- private static final int MAXIMUM_PRODUCER_COUNT = 128;
+ private static final int DEFAULT_WINDOW_SIZE = 2048;
+ private static final int MAXIMUM_PRODUCER_COUNT = 64;
private int auditDepth;
private int maximumNumberOfProducersToTrack;
private LRUCache<Object, BitArrayBin> map;
@@ -220,23 +220,33 @@
/**
* Check the MessageId is in order
+ * @param message
+ * @return
+ */
+ public synchronized boolean isInOrder(final MessageReference message) {
+ return isInOrder(message.getMessageId());
+ }
+
+ /**
+ * Check the MessageId is in order
* @param id
* @return
*/
public synchronized boolean isInOrder(final MessageId id) {
- boolean answer = true;
-
+ boolean answer = false;
+
if (id != null) {
ProducerId pid = id.getProducerId();
if (pid != null) {
BitArrayBin bab = map.get(pid);
- if (bab != null) {
- answer = bab.isInOrder(id.getProducerSequenceId());
+ if (bab == null) {
+ bab = new BitArrayBin(auditDepth);
+ map.put(pid, bab);
}
-
+ answer = bab.isInOrder(id.getProducerSequenceId());
+
}
}
return answer;
}
-
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
index 0e4477a..83b5a65 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -113,7 +113,7 @@
private TaskRunnerFactory persistenceTaskRunnerFactory;
private SystemUsage systemUsage;
private SystemUsage producerSystemUsage;
- private SystemUsage consumerSystemUsage;
+ private SystemUsage consumerSystemUsaage;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
private DestinationFactory destinationFactory;
@@ -668,23 +668,23 @@
* @throws IOException
*/
public SystemUsage getConsumerSystemUsage() throws IOException {
- if (consumerSystemUsage == null) {
- consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer");
- consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
- addService(consumerSystemUsage);
+ if (this.consumerSystemUsaage == null) {
+ this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
+ this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(0.5f);
+ addService(this.consumerSystemUsaage);
}
- return consumerSystemUsage;
+ return this.consumerSystemUsaage;
}
/**
- * @param consumerUsageManager the consumerUsageManager to set
+ * @param consumerSystemUsaage the storeSystemUsage to set
*/
- public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
- if (this.consumerSystemUsage != null) {
- removeService(this.consumerSystemUsage);
+ public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
+ if (this.consumerSystemUsaage != null) {
+ removeService(this.consumerSystemUsaage);
}
- this.consumerSystemUsage = consumerUsageManager;
- addService(this.producerSystemUsage);
+ this.consumerSystemUsaage = consumerSystemUsaage;
+ addService(this.consumerSystemUsaage);
}
/**
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index 539d9b4..3917e47 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -56,7 +56,7 @@
protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
protected final DestinationMap destinationMap = new DestinationMap();
protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
- protected final SystemUsage memoryManager;
+ protected final SystemUsage usageManager;
protected final DestinationFactory destinationFactory;
protected final DestinationStatistics destinationStatistics;
protected final RegionBroker broker;
@@ -73,7 +73,7 @@
}
this.broker = broker;
this.destinationStatistics = destinationStatistics;
- this.memoryManager = memoryManager;
+ this.usageManager = memoryManager;
this.taskRunnerFactory = taskRunnerFactory;
if (broker == null) {
throw new IllegalArgumentException("null destinationFactory");
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 673aad1..a9ac0b7 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -17,7 +17,9 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
@@ -38,19 +40,18 @@
public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener {
- private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
+ private static final Log LOG = LogFactory.getLog(DurableTopicSubscription.class);
private final ConcurrentHashMap<MessageId, Integer> redeliveredMessages = new ConcurrentHashMap<MessageId, Integer>();
private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
- private final SystemUsage usageManager;
private boolean active;
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws InvalidSelectorException {
- super(broker, context, info);
+ super(broker,usageManager, context, info);
this.pending = new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize(), this);
- this.usageManager = usageManager;
+ this.pending.setSystemUsage(usageManager);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
}
@@ -119,9 +120,8 @@
topic.deactivate(context, this);
}
}
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
+ for (final MessageReference node : dispatched) {
// Mark the dispatched messages as redelivered for next time.
- MessageReference node = (MessageReference)iter.next();
Integer count = redeliveredMessages.get(node.getMessageId());
if (count != null) {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
@@ -135,8 +135,8 @@
} else {
node.decrementReferenceCount();
}
- iter.remove();
}
+ dispatched.clear();
if (!keepDurableSubsActive && pending.isTransient()) {
synchronized (pending) {
try {
@@ -191,7 +191,7 @@
return active;
}
- protected synchronized void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
+ protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
node.getRegionDestination().acknowledge(context, this, ack, node);
redeliveredMessages.remove(node.getMessageId());
node.decrementReferenceCount();
@@ -238,7 +238,7 @@
}
/**
- * @param memoryManager
+ * @param usageManager
* @param oldPercentUsage
* @param newPercentUsage
* @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 8cc2761..8fcd668 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -17,12 +17,15 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@@ -38,6 +41,7 @@
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -50,19 +54,25 @@
private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
protected PendingMessageCursor pending;
- protected final LinkedList<MessageReference> dispatched = new LinkedList<MessageReference>();
+ protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
protected int prefetchExtension;
protected long enqueueCounter;
protected long dispatchCounter;
protected long dequeueCounter;
+ protected boolean optimizedDispatch=true;
+ private int maxProducersToAudit=32;
+ private int maxAuditDepth=2048;
+ protected final SystemUsage usageManager;
+ protected ActiveMQMessageAudit audit = new ActiveMQMessageAudit();
- public PrefetchSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
+ public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker, context, info);
+ this.usageManager=usageManager;
pending = cursor;
}
- public PrefetchSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
- this(broker, context, info, new VMPendingMessageCursor());
+ public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+ this(broker,usageManager,context, info, new VMPendingMessageCursor());
}
/**
@@ -118,8 +128,7 @@
boolean pendingEmpty = false;
pendingEmpty = pending.isEmpty();
enqueueCounter++;
-
- if (!isFull() && pendingEmpty && !isSlave()) {
+ if (optimizedDispatch && !isFull() && pendingEmpty && !isSlave()) {
dispatch(node);
} else {
optimizePrefetch();
@@ -128,6 +137,7 @@
LOG.debug("Prefetch limit.");
}
pending.addMessageLast(node);
+ dispatchMatched();
}
}
}
@@ -140,7 +150,7 @@
if (node.getMessageId().equals(mdn.getMessageId())) {
pending.remove();
createMessageDispatch(node, node.getMessage());
- dispatched.addLast(node);
+ dispatched.add(node);
return;
}
}
@@ -150,7 +160,8 @@
throw new JMSException("Slave broker out of sync with master: Dispatched message (" + mdn.getMessageId() + ") was not in the pending list");
}
- public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
+ public synchronized void acknowledge(final ConnectionContext context,
+ final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
boolean callDispatchMatched = false;
if (ack.isStandardAck()) {
@@ -158,36 +169,42 @@
// acknowledgment.
int index = 0;
boolean inAckRange = false;
- for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
- final MessageReference node = iter.next();
+ List<MessageReference> removeList = new ArrayList<MessageReference>();
+ for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
- if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+ if (ack.getFirstMessageId() == null
+ || ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
// Don't remove the nodes until we are committed.
if (!context.isInTransaction()) {
dequeueCounter++;
- node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
- iter.remove();
+ node.getRegionDestination().getDestinationStatistics()
+ .getDequeues().increment();
+ removeList.add(node);
} else {
// setup a Synchronization to remove nodes from the
// dispatched list.
- context.getTransaction().addSynchronization(new Synchronization() {
+ context.getTransaction().addSynchronization(
+ new Synchronization() {
- public void afterCommit() throws Exception {
- synchronized (PrefetchSubscription.this) {
- dequeueCounter++;
- dispatched.remove(node);
- node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
- prefetchExtension--;
- }
- }
+ public void afterCommit() throws Exception {
+ synchronized (PrefetchSubscription.this) {
+ dequeueCounter++;
+ dispatched.remove(node);
+ node.getRegionDestination()
+ .getDestinationStatistics()
+ .getDequeues().increment();
+ prefetchExtension--;
+ }
+ }
- public void afterRollback() throws Exception {
- super.afterRollback();
- }
- });
+ public void afterRollback()
+ throws Exception {
+ super.afterRollback();
+ }
+ });
}
index++;
acknowledge(context, ack, node);
@@ -196,21 +213,28 @@
// extend prefetch window only if not a pulling
// consumer
if (getPrefetchSize() != 0) {
- prefetchExtension = Math.max(prefetchExtension, index + 1);
+ prefetchExtension = Math.max(prefetchExtension,
+ index + 1);
}
} else {
- prefetchExtension = Math.max(0, prefetchExtension - (index + 1));
+ prefetchExtension = Math.max(0, prefetchExtension
+ - (index + 1));
}
callDispatchMatched = true;
break;
}
}
}
+ for (final MessageReference node : removeList) {
+ dispatched.remove(node);
+ }
// this only happens after a reconnect - get an ack which is not
// valid
if (!callDispatchMatched) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Could not correlate acknowledgment with dispatched message: " + ack);
+ LOG
+ .debug("Could not correlate acknowledgment with dispatched message: "
+ + ack);
}
}
} else if (ack.isDeliveredAck()) {
@@ -219,7 +243,8 @@
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
int index = 0;
- for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
+ for (Iterator<MessageReference> iter = dispatched.iterator(); iter
+ .hasNext(); index++) {
final MessageReference node = iter.next();
if (ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension = Math.max(prefetchExtension, index + 1);
@@ -228,17 +253,20 @@
}
}
if (!callDispatchMatched) {
- throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
+ throw new JMSException(
+ "Could not correlate acknowledgment with dispatched message: "
+ + ack);
}
- } else if (ack.isRedeliveredAck() ) {
- // Message was re-delivered but it was not yet considered to be a DLQ message.
+ } else if (ack.isRedeliveredAck()) {
+ // Message was re-delivered but it was not yet considered to be a
+ // DLQ message.
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
boolean inAckRange = false;
- for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
- final MessageReference node = iter.next();
+ for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
- if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+ if (ack.getFirstMessageId() == null
+ || ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
@@ -250,49 +278,65 @@
}
}
if (!callDispatchMatched) {
- throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
+ throw new JMSException(
+ "Could not correlate acknowledgment with dispatched message: "
+ + ack);
}
} else if (ack.isPoisonAck()) {
// TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a DLQ
if (ack.isInTransaction()) {
- throw new JMSException("Poison ack cannot be transacted: " + ack);
+ throw new JMSException("Poison ack cannot be transacted: "
+ + ack);
}
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
int index = 0;
boolean inAckRange = false;
- for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
- final MessageReference node = iter.next();
+ List<MessageReference> removeList = new ArrayList<MessageReference>();
+ for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
- if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
+ if (ack.getFirstMessageId() == null
+ || ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
sendToDLQ(context, node);
- node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
- iter.remove();
+ node.getRegionDestination().getDestinationStatistics()
+ .getDequeues().increment();
+ removeList.add(node);
dequeueCounter++;
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
- prefetchExtension = Math.max(0, prefetchExtension - (index + 1));
+ prefetchExtension = Math.max(0, prefetchExtension
+ - (index + 1));
callDispatchMatched = true;
break;
}
}
}
+ for (final MessageReference node : removeList) {
+ dispatched.remove(node);
+ }
if (!callDispatchMatched) {
- throw new JMSException("Could not correlate acknowledgment with dispatched message: " + ack);
+ throw new JMSException(
+ "Could not correlate acknowledgment with dispatched message: "
+ + ack);
}
}
if (callDispatchMatched) {
dispatchMatched();
} else {
if (isSlave()) {
- throw new JMSException("Slave broker out of sync with master: Acknowledgment (" + ack + ") was not in the dispatch list: " + dispatched);
+ throw new JMSException(
+ "Slave broker out of sync with master: Acknowledgment ("
+ + ack + ") was not in the dispatch list: "
+ + dispatched);
} else {
- LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): " + ack);
+ LOG
+ .debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
+ + ack);
}
}
}
@@ -364,6 +408,9 @@
public synchronized void setPending(PendingMessageCursor pending) {
this.pending = pending;
+ if (this.pending!=null) {
+ this.pending.setSystemUsage(usageManager);
+ }
}
/**
@@ -439,7 +486,10 @@
// NULL messages don't count... they don't get Acked.
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
- dispatched.addLast(node);
+ dispatched.add(node);
+ if(pending != null) {
+ pending.dispatched(message);
+ }
} else {
prefetchExtension = Math.max(0, prefetchExtension - 1);
}
@@ -459,8 +509,6 @@
context.getConnection().dispatchSync(md);
onDispatch(node, message);
}
- // System.err.println(broker.getBrokerName() + " " + this + " (" +
- // enqueueCounter + ", " + dispatchCounter +") " + node);
return true;
} else {
return false;
@@ -536,4 +584,28 @@
protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException {
}
+ public boolean isOptimizedDispatch() {
+ return optimizedDispatch;
+ }
+
+ public void setOptimizedDispatch(boolean optimizedDispatch) {
+ this.optimizedDispatch = optimizedDispatch;
+ }
+
+ public int getMaxProducersToAudit() {
+ return maxProducersToAudit;
+ }
+
+ public void setMaxProducersToAudit(int maxProducersToAudit) {
+ this.maxProducersToAudit = maxProducersToAudit;
+ }
+
+ public int getMaxAuditDepth() {
+ return maxAuditDepth;
+ }
+
+ public void setMaxAuditDepth(int maxAuditDepth) {
+ this.maxAuditDepth = maxAuditDepth;
+ }
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
index 2d4e2c7..c7b3bbe 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -96,7 +96,7 @@
private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
private final MessageEvaluationContext queueMsgConext = new MessageEvaluationContext();
private final Object exclusiveLockMutex = new Object();
- private TaskRunner taskRunner;
+ private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@@ -449,7 +449,11 @@
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
if (store != null && message.isPersistent()) {
- systemUsage.getStoreUsage().waitForSpace();
+ while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+ }
store.addMessage(context, message);
}
if (context.isInTransaction()) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
index b0bf3d9..9dc76c5 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
@@ -25,14 +25,15 @@
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.usage.SystemUsage;
public class QueueBrowserSubscription extends QueueSubscription {
boolean browseDone;
- public QueueBrowserSubscription(Broker broker, ConnectionContext context, ConsumerInfo info)
+ public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
- super(broker, context, info);
+ super(broker,usageManager, context, info);
}
protected boolean canDispatch(MessageReference node) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
index a555f7b..41143b1 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
@@ -41,15 +41,15 @@
public String toString() {
return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
- + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+ + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
if (info.isBrowser()) {
- return new QueueBrowserSubscription(broker, context, info);
+ return new QueueBrowserSubscription(broker,usageManager, context, info);
} else {
- return new QueueSubscription(broker, context, info);
+ return new QueueSubscription(broker, usageManager,context, info);
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index 8f22505..425a8db 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -28,6 +28,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,8 +36,8 @@
private static final Log LOG = LogFactory.getLog(QueueSubscription.class);
- public QueueSubscription(Broker broker, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
- super(broker, context, info);
+ public QueueSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
+ super(broker,usageManager, context, info);
}
/**
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
index 8ff5a45..1374930 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
@@ -41,7 +41,7 @@
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination;
- return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
+ return new Queue(broker.getRoot(), destination, usageManager, null, destinationStatistics, taskRunnerFactory, null) {
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
@@ -58,14 +58,14 @@
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
if (info.isBrowser()) {
- return new QueueBrowserSubscription(broker, context, info);
+ return new QueueBrowserSubscription(broker,usageManager,context, info);
} else {
- return new QueueSubscription(broker, context, info);
+ return new QueueSubscription(broker, usageManager,context, info);
}
}
public String toString() {
- return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+ return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
index 3758233..b077ff4 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
@@ -47,13 +47,13 @@
throw new JMSException("A durable subscription cannot be created for a temporary topic.");
}
try {
- TopicSubscription answer = new TopicSubscription(broker, context, info, memoryManager);
+ TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
- entry.configure(broker, memoryManager, answer);
+ entry.configure(broker, usageManager, answer);
}
}
answer.init();
@@ -67,7 +67,7 @@
}
public String toString() {
- return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+ return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
index 1c7082f..f4b1611 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -50,6 +50,8 @@
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
@@ -65,7 +67,7 @@
*
* @version $Revision: 1.21 $
*/
-public class Topic extends BaseDestination {
+public class Topic extends BaseDestination implements Task{
private static final Log LOG = LogFactory.getLog(Topic.class);
protected final ActiveMQDestination destination;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
@@ -81,28 +83,20 @@
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
-
+ private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
public void run() {
-
- // We may need to do this in async thread since this is run for
- // within a synchronization
- // that the UsageManager is holding.
-
- synchronized (messagesWaitingForSpace) {
- while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
- Runnable op = messagesWaitingForSpace.removeFirst();
- op.run();
+ try {
+ Topic.this.taskRunner.wakeup();
+ } catch (InterruptedException e) {
}
- }
-
};
};
private final Broker broker;
public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
- TaskRunnerFactory taskFactory) {
+ TaskRunnerFactory taskFactory) throws Exception {
this.broker = broker;
this.destination = destination;
this.store = store; // this could be NULL! (If an advisory)
@@ -115,7 +109,8 @@
}else{
//set the default
subscriptionRecoveryPolicy= new FixedSizedSubscriptionRecoveryPolicy();
- }
+ }
+ this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
// Let the store know what usage manager we are using so that he can
// flush messages to disk
// when usage gets high.
@@ -388,41 +383,53 @@
* @throws IOException
* @throws Exception
*/
- synchronized void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, Exception {
- final ConnectionContext context = producerExchange.getConnectionContext();
+ synchronized void doMessageSend(
+ final ProducerBrokerExchange producerExchange, final Message message)
+ throws IOException, Exception {
+ final ConnectionContext context = producerExchange
+ .getConnectionContext();
message.setRegionDestination(this);
- if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
- systemUsage.getStoreUsage().waitForSpace();
+ if (store != null && message.isPersistent()
+ && !canOptimizeOutPersistence()) {
+ while (!systemUsage.getStoreUsage().waitForSpace(1000)) {
+ if (context.getStopping().get()) {
+ throw new IOException("Connection closed, send aborted.");
+ }
+ }
store.addMessage(context, message);
}
message.incrementReferenceCount();
- try {
- if (context.isInTransaction()) {
- context.getTransaction().addSynchronization(new Synchronization() {
- public void afterCommit() throws Exception {
- // It could take while before we receive the commit
- // operration.. by that time the message could have
- // expired..
- if (broker.isExpired(message)) {
- broker.messageExpired(context, message);
- message.decrementReferenceCount();
- destinationStatistics.getMessages().decrement();
- return;
- }
- dispatch(context, message);
+ if (context.isInTransaction()) {
+ context.getTransaction().addSynchronization(new Synchronization() {
+ public void afterCommit() throws Exception {
+ // It could take while before we receive the commit
+ // operration.. by that time the message could have
+ // expired..
+ if (broker.isExpired(message)) {
+ broker.messageExpired(context, message);
+ message.decrementReferenceCount();
+ destinationStatistics.getMessages().decrement();
+ return;
}
- });
+ try {
+ dispatch(context, message);
+ } finally {
+ message.decrementReferenceCount();
+ }
+ }
+ });
- } else {
+ } else {
+ try {
dispatch(context, message);
+ } finally {
+ message.decrementReferenceCount();
}
-
- } finally {
- message.decrementReferenceCount();
}
+
}
private boolean canOptimizeOutPersistence() {
@@ -463,6 +470,9 @@
}
public void stop() throws Exception {
+ if (taskRunner != null) {
+ taskRunner.shutdown();
+ }
this.subscriptionRecoveryPolicy.stop();
if (memoryUsage != null) {
memoryUsage.stop();
@@ -499,6 +509,15 @@
}
return result.toArray(new Message[result.size()]);
}
+
+ public boolean iterate() {
+ while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
+ Runnable op = messagesWaitingForSpace.removeFirst();
+ op.run();
+ }
+ return false;
+ }
+
// Properties
// -------------------------------------------------------------------------
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index f8f5cc0..59b8813 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -100,7 +100,7 @@
+ " subscriberName: " + key.getSubscriptionName());
}
}
- sub.activate(memoryManager, context, info);
+ sub.activate(usageManager, context, info);
return sub;
} else {
return super.addConsumer(context, info);
@@ -140,7 +140,7 @@
}
public String toString() {
- return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
+ return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + usageManager.getMemoryUsage().getPercentUsage() + "%";
}
@Override
@@ -230,12 +230,12 @@
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = durableSubscriptions.get(key);
if (sub == null) {
- sub = new DurableTopicSubscription(broker, memoryManager, context, info, keepDurableSubsActive);
+ sub = new DurableTopicSubscription(broker, usageManager, context, info, keepDurableSubsActive);
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
- entry.configure(broker, memoryManager, sub);
+ entry.configure(broker, usageManager, sub);
}
}
durableSubscriptions.put(key, sub);
@@ -245,13 +245,13 @@
return sub;
}
try {
- TopicSubscription answer = new TopicSubscription(broker, context, info, memoryManager);
+ TopicSubscription answer = new TopicSubscription(broker, context, info, usageManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
- entry.configure(broker, memoryManager, answer);
+ entry.configure(broker, usageManager, answer);
}
}
answer.init();
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
index 80c4976..7395439 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
@@ -50,6 +50,7 @@
public synchronized void stop() throws Exception {
started=false;
+ audit=null;
gc();
}
@@ -238,6 +239,13 @@
public boolean isTransient() {
return false;
}
+
+ /**
+ * Mark a message as already dispatched
+ * @param message
+ */
+ public void dispatched(MessageReference message) {
+ }
protected synchronized boolean isDuplicate(MessageId messageId) {
@@ -246,7 +254,12 @@
}
return this.audit.isDuplicate(messageId);
}
-
-
+
+ protected synchronized void rollback(MessageId id) {
+ if (this.audit != null) {
+ audit.rollback(id);
+ }
+ }
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
index 0921071..7bcb9c1 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
@@ -142,6 +142,7 @@
for (Iterator<MessageReference> i = getDiskList().iterator(); i.hasNext() && count < maxItems;) {
Message message = (Message)i.next();
message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
result.add(message);
count++;
@@ -210,6 +211,7 @@
if (!isDiskListEmpty()) {
// got from disk
message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
}
return message;
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
index 8ff3f84..13e3fad 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
@@ -247,6 +247,12 @@
* disappears when the broker shuts down
*/
public boolean isTransient();
+
+ /**
+ * Mark a message as already dispatched
+ * @param message
+ */
+ public void dispatched(MessageReference message);
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 4f66292..56f597b 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -119,6 +119,7 @@
Message result = batchList.removeFirst();
result.decrementReferenceCount();
result.setRegionDestination(regionDestination);
+ result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
return result;
}
@@ -133,6 +134,7 @@
throws Exception {
if (!isDuplicate(message.getMessageId())) {
message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
message.incrementReferenceCount();
batchList.addLast(message);
} else {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
index 13396ba..f5d38d6 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
@@ -288,6 +288,20 @@
nonPersistent.setEnableAudit(enableAudit);
}
}
+
+ /**
+ * Mark a message as already dispatched
+ * @param message
+ */
+ public void dispatched(MessageReference message) {
+ super.dispatched(message);
+ for (PendingMessageCursor cursor : storePrefetches) {
+ cursor.dispatched(message);
+ }
+ if (nonPersistent != null) {
+ nonPersistent.dispatched(message);
+ }
+ }
protected synchronized PendingMessageCursor getNextCursor() throws Exception {
if (currentCursor == null || currentCursor.isEmpty()) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index a281c12..495b8ae 100755
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -18,7 +18,9 @@
import java.io.IOException;
import java.util.Iterator;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
@@ -28,6 +30,9 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.Usage;
+import org.apache.activemq.usage.UsageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,21 +42,19 @@
*
* @version $Revision$
*/
-class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener {
+class TopicStorePrefetch extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
private static final Log LOG = LogFactory.getLog(TopicStorePrefetch.class);
private TopicMessageStore store;
- private final LinkedList<Message> batchList = new LinkedList<Message>();
+ private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
private String clientId;
private String subscriberName;
private Destination regionDestination;
- private MessageId firstMessageId;
- private MessageId lastMessageId;
private boolean batchResetNeeded = true;
private boolean storeMayHaveMoreMessages = true;
private boolean started;
private final Subscription subscription;
-
+
/**
* @param topic
* @param clientId
@@ -63,12 +66,15 @@
this.store = (TopicMessageStore)topic.getMessageStore();
this.clientId = clientId;
this.subscriberName = subscriberName;
+ this.maxProducersToAudit=32;
+ this.maxAuditDepth=10000;
}
public synchronized void start() throws Exception {
if (!started) {
started = true;
super.start();
+ getSystemUsage().getMemoryUsage().addUsageListener(this);
safeFillBatch();
}
}
@@ -76,6 +82,7 @@
public synchronized void stop() throws Exception {
if (started) {
started = false;
+ getSystemUsage().getMemoryUsage().removeUsageListener(this);
super.stop();
store.resetBatching(clientId, subscriberName);
gc();
@@ -97,22 +104,16 @@
public synchronized void addMessageLast(MessageReference node) throws Exception {
if (node != null) {
- if (isEmpty() && started) {
- firstMessageId = node.getMessageId();
- }
- lastMessageId = node.getMessageId();
- node.decrementReferenceCount();
storeMayHaveMoreMessages=true;
+ node.decrementReferenceCount();
}
}
public synchronized void addMessageFirst(MessageReference node) throws Exception {
if (node != null) {
- if (started) {
- firstMessageId = node.getMessageId();
- }
- node.decrementReferenceCount();
storeMayHaveMoreMessages=true;
+ node.decrementReferenceCount();
+ rollback(node.getMessageId());
}
}
@@ -127,7 +128,8 @@
}
public synchronized boolean hasNext() {
- return !isEmpty();
+ boolean result = !isEmpty();
+ return result;
}
public synchronized MessageReference next() {
@@ -136,13 +138,11 @@
if (batchList.isEmpty()) {
return null;
} else {
- result = batchList.removeFirst();
- if (lastMessageId != null) {
- if (result.getMessageId().equals(lastMessageId)) {
- // pendingCount=0;
- }
- }
+ Iterator i = batchList.entrySet().iterator();
+ result = (Message) ((Map.Entry)i.next()).getValue();
+ i.remove();
result.setRegionDestination(regionDestination);
+ result.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
}
return result;
}
@@ -154,16 +154,23 @@
public void finished() {
}
- public synchronized boolean recoverMessage(Message message) throws Exception {
+ public synchronized boolean recoverMessage(Message message)
+ throws Exception {
MessageEvaluationContext messageEvaluationContext = new MessageEvaluationContext();
messageEvaluationContext.setMessageReference(message);
- if( subscription.matches(message, messageEvaluationContext) ) {
+ if (subscription.matches(message, messageEvaluationContext)) {
message.setRegionDestination(regionDestination);
- // only increment if count is zero (could have been cached)
- if (message.getReferenceCount() == 0) {
- message.incrementReferenceCount();
+ if (!isDuplicate(message.getMessageId())) {
+ // only increment if count is zero (could have been cached)
+ if (message.getReferenceCount() == 0) {
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+ message.incrementReferenceCount();
+
+ }
+ batchList.put(message.getMessageId(), message);
+ }else {
+ this.storeMayHaveMoreMessages=true;
}
- batchList.addLast(message);
}
return true;
}
@@ -172,9 +179,23 @@
// shouldn't get called
throw new RuntimeException("Not supported");
}
+
+ /**
+ * Mark a message as already dispatched
+ * @param message
+ */
+ public synchronized void dispatched(MessageReference message) {
+ if (this.audit != null) {
+ isDuplicate(message.getMessageId());
+ Message removed = this.batchList.remove(message.getMessageId());
+ if (removed != null) {
+ removed.decrementReferenceCount();
+ }
+ }
+ }
// implementation
- protected void safeFillBatch() {
+ protected synchronized void safeFillBatch() {
try {
fillBatch();
} catch (Exception e) {
@@ -184,29 +205,17 @@
}
protected synchronized void fillBatch() throws Exception {
- if( batchResetNeeded ) {
- store.resetBatching(clientId, subscriberName);
- batchResetNeeded=false;
- storeMayHaveMoreMessages=true;
+ if (batchResetNeeded) {
+ this.store.resetBatching(clientId, subscriberName);
+ this.batchResetNeeded = false;
+ this.storeMayHaveMoreMessages = true;
}
-
- while( batchList.isEmpty() && storeMayHaveMoreMessages ) {
- store.recoverNextMessages(clientId, subscriberName, maxBatchSize, this);
- if( batchList.isEmpty() ) {
- storeMayHaveMoreMessages = false;
- } else {
- if (firstMessageId != null) {
- int pos = 0;
- for (Iterator<Message> iter = batchList.iterator(); iter.hasNext();) {
- Message msg = iter.next();
- if (msg.getMessageId().equals(firstMessageId)) {
- firstMessageId = null;
- break;
- } else {
- iter.remove();
- }
- }
- }
+ while (this.batchList.isEmpty() && this.storeMayHaveMoreMessages) {
+ this.storeMayHaveMoreMessages = false;
+ this.store.recoverNextMessages(clientId, subscriberName,
+ maxBatchSize, this);
+ if (!this.batchList.isEmpty()) {
+ this.storeMayHaveMoreMessages=true;
}
}
}
@@ -221,12 +230,23 @@
}
public synchronized void gc() {
- for (Message msg : batchList) {
+ for (Message msg : batchList.values()) {
msg.decrementReferenceCount();
}
batchList.clear();
batchResetNeeded = true;
}
+
+ public void onUsageChanged(Usage usage, int oldPercentUsage,int newPercentUsage) {
+ if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
+ storeMayHaveMoreMessages = true;
+ try {
+ fillBatch();
+ } catch (Exception e) {
+ LOG.error("Failed to fill batch ", e);
+ }
+ }
+ }
public String toString() {
return "TopicStorePrefetch" + System.identityHashCode(this) + "(" + clientId + "," + subscriberName + ")";
diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 0697c3a..c20cfaa 100644
--- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -51,11 +51,13 @@
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
- private int maxProducersToAudit=1024;
- private int maxAuditDepth=1;
+ private int maxProducersToAudit=32;
+ private int maxAuditDepth=1024;
+ private int maxQueueAuditDepth=1;
private boolean enableAudit=true;
private boolean producerFlowControl = true;
-
+ private boolean optimizedDispatch=false;
+
public void configure(Queue queue, Store tmpStore) {
if (dispatchPolicy != null) {
queue.setDispatchPolicy(dispatchPolicy);
@@ -73,7 +75,7 @@
}
queue.setProducerFlowControl(isProducerFlowControl());
queue.setEnableAudit(isEnableAudit());
- queue.setMaxAuditDepth(getMaxAuditDepth());
+ queue.setMaxAuditDepth(getMaxQueueAuditDepth());
queue.setMaxProducersToAudit(getMaxProducersToAudit());
}
@@ -132,6 +134,8 @@
cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}
+ sub.setMaxAuditDepth(getMaxAuditDepth());
+ sub.setMaxProducersToAudit(getMaxProducersToAudit());
}
// Properties
@@ -331,4 +335,20 @@
this.enableAudit = enableAudit;
}
+ public int getMaxQueueAuditDepth() {
+ return maxQueueAuditDepth;
+ }
+
+ public void setMaxQueueAuditDepth(int maxQueueAuditDepth) {
+ this.maxQueueAuditDepth = maxQueueAuditDepth;
+ }
+
+ public boolean isOptimizedDispatch() {
+ return optimizedDispatch;
+ }
+
+ public void setOptimizedDispatch(boolean optimizedDispatch) {
+ this.optimizedDispatch = optimizedDispatch;
+ }
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java
index ba61fc1..03150cd 100755
--- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java
+++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java
@@ -26,6 +26,7 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
@@ -81,6 +82,7 @@
private transient short referenceCount;
private transient ActiveMQConnection connection;
private transient org.apache.activemq.broker.region.Destination regionDestination;
+ private transient MemoryUsage memoryUsage;
private BrokerId[] brokerPath;
private BrokerId[] cluster;
@@ -127,6 +129,7 @@
copy.regionDestination = regionDestination;
copy.brokerInTime = brokerInTime;
copy.brokerOutTime = brokerOutTime;
+ copy.memoryUsage=this.memoryUsage;
// copying the broker path breaks networks - if a consumer re-uses a
// consumed
// message and forwards it on
@@ -567,6 +570,17 @@
public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
this.regionDestination = destination;
+ if(this.memoryUsage==null) {
+ this.memoryUsage=regionDestination.getBrokerMemoryUsage();
+ }
+ }
+
+ public MemoryUsage getMemoryUsage() {
+ return this.memoryUsage;
+ }
+
+ public void setMemoryUsage(MemoryUsage usage) {
+ this.memoryUsage=usage;
}
public boolean isMarshallAware() {
@@ -581,16 +595,15 @@
size = getSize();
}
- if (rc == 1 && regionDestination != null) {
- regionDestination.getBrokerMemoryUsage().increaseUsage(size);
+ if (rc == 1 && getMemoryUsage() != null) {
+ getMemoryUsage().increaseUsage(size);
}
- // System.out.println(" + "+getDestination()+" :::: "+getMessageId()+"
- // "+rc);
+ //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
return rc;
}
- public synchronized int decrementReferenceCount() {
+ public int decrementReferenceCount() {
int rc;
int size;
synchronized (this) {
@@ -598,11 +611,10 @@
size = getSize();
}
- if (rc == 0 && regionDestination != null) {
- regionDestination.getBrokerMemoryUsage().decreaseUsage(size);
+ if (rc == 0 && getMemoryUsage() != null) {
+ getMemoryUsage().decreaseUsage(size);
}
- // System.out.println(" - "+getDestination()+" :::: "+getMessageId()+"
- // "+rc);
+ //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
return rc;
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
index c073dc6..c7872f9 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
@@ -49,6 +49,11 @@
* Command Marshaller
*/
Marshaller COMMAND_MARSHALLER = new CommandMarshaller();
+
+ /**
+ * MessageId marshaller
+ */
+ Marshaller MESSAGEID_MARSHALLER = new MessageIdMarshaller();
/**
* close the store
@@ -270,4 +275,5 @@
public boolean isPersistentIndex();
public void setPersistentIndex(boolean persistentIndex);
+
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
index 5745417..d26bf52 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
@@ -178,15 +178,7 @@
}
}
if (directory != null && directory.isDirectory()) {
- File[] files = directory.listFiles();
- if (files != null) {
- for (int i = 0; i < files.length; i++) {
- File file = files[i];
- if (!file.isDirectory()) {
- result &= file.delete();
- }
- }
- }
+ result =IOHelper.deleteChildren(directory);
String str = result ? "successfully deleted" : "failed to delete";
LOG.info("Kaha Store " + str + " data directory " + directory);
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
index 1d68e2e..bebff6c 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/AsyncDataManager.java
@@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -39,10 +40,11 @@
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+
/**
* Manages DataFiles
*
@@ -87,6 +89,7 @@
private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
+ private Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
private DataFile currentWriteFile;
private Location mark;
@@ -131,7 +134,7 @@
return dir.equals(directory) && n.startsWith(filePrefix);
}
});
-
+
if (files != null) {
for (int i = 0; i < files.length; i++) {
try {
@@ -157,6 +160,7 @@
currentWriteFile.linkAfter(df);
}
currentWriteFile = df;
+ fileByFileMap.put(df.getFile(), df);
}
}
@@ -254,8 +258,10 @@
int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
String fileName = filePrefix + nextNum;
- DataFile nextWriteFile = new DataFile(new File(directory, fileName), nextNum, preferedFileLength);
+ File file = new File(directory, fileName);
+ DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+ fileByFileMap.put(file, nextWriteFile);
if (currentWriteFile != null) {
currentWriteFile.linkAfter(nextWriteFile);
if (currentWriteFile.isUnused()) {
@@ -289,6 +295,16 @@
}
return dataFile;
}
+
+ File getFile(Location item) throws IOException {
+ Integer key = Integer.valueOf(item.getDataFileId());
+ DataFile dataFile = fileMap.get(key);
+ if (dataFile == null) {
+ LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
+ throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId());
+ }
+ return dataFile.getFile();
+ }
private DataFile getNextDataFile(DataFile dataFile) {
return (DataFile)dataFile.getNext();
@@ -303,6 +319,7 @@
storeState(false);
appender.close();
fileMap.clear();
+ fileByFileMap.clear();
controlFile.unlock();
controlFile.dispose();
started = false;
@@ -327,6 +344,7 @@
result &= dataFile.delete();
}
fileMap.clear();
+ fileByFileMap.clear();
lastAppendLocation.set(null);
mark = null;
currentWriteFile = null;
@@ -415,6 +433,7 @@
private synchronized void forceRemoveDataFile(DataFile dataFile)
throws IOException {
accessorPool.disposeDataFileAccessors(dataFile);
+ fileByFileMap.remove(dataFile.getFile());
DataFile removed = fileMap.remove(dataFile.getDataFileId());
storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink();
@@ -461,16 +480,6 @@
cur = new Location();
cur.setDataFileId(head.getDataFileId());
cur.setOffset(0);
-
- // DataFileAccessor reader =
- // accessorPool.openDataFileAccessor(head);
- // try {
- // if( !reader.readLocationDetailsAndValidate(cur) ) {
- // return null;
- // }
- // } finally {
- // accessorPool.closeDataFileAccessor(reader);
- // }
} else {
// Set to the next offset..
cur = new Location(location);
@@ -509,6 +518,64 @@
}
}
}
+
+ public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
+ DataFile df = fileByFileMap.get(file);
+ return getNextLocation(df, lastLocation,thisFileOnly);
+ }
+
+ public synchronized Location getNextLocation(DataFile dataFile,
+ Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
+
+ Location cur = null;
+ while (true) {
+ if (cur == null) {
+ if (lastLocation == null) {
+ DataFile head = (DataFile)dataFile.getHeadNode();
+ cur = new Location();
+ cur.setDataFileId(head.getDataFileId());
+ cur.setOffset(0);
+ } else {
+ // Set to the next offset..
+ cur = new Location(lastLocation);
+ cur.setOffset(cur.getOffset() + cur.getSize());
+ }
+ } else {
+ cur.setOffset(cur.getOffset() + cur.getSize());
+ }
+
+
+ // Did it go into the next file??
+ if (dataFile.getLength() <= cur.getOffset()) {
+ if (thisFileOnly) {
+ return null;
+ }else {
+ dataFile = getNextDataFile(dataFile);
+ if (dataFile == null) {
+ return null;
+ } else {
+ cur.setDataFileId(dataFile.getDataFileId().intValue());
+ cur.setOffset(0);
+ }
+ }
+ }
+
+ // Load in location size and type.
+ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+ try {
+ reader.readLocationDetails(cur);
+ } finally {
+ accessorPool.closeDataFileAccessor(reader);
+ }
+
+ if (cur.getType() == 0) {
+ return null;
+ } else if (cur.getType() > 0) {
+ // Only return user records.
+ return cur;
+ }
+ }
+ }
public ByteSequence read(Location location) throws IOException, IllegalStateException {
DataFile dataFile = getDataFile(location);
@@ -611,4 +678,12 @@
return null;
return currentWriteFile.getDataFileId();
}
+
+ /**
+ * Get a set of files - only valid after start()
+ * @return files currently being used
+ */
+ public Set<File> getFiles(){
+ return fileByFileMap.keySet();
+ }
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
index e6672f9..43e9c83 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFile.java
@@ -43,6 +43,10 @@
this.dataFileId = Integer.valueOf(number);
length = (int)(file.exists() ? file.length() : 0);
}
+
+ File getFile() {
+ return file;
+ }
public Integer getDataFileId() {
return dataFileId;
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
index e1f1b76..1bbab91 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java
@@ -187,8 +187,9 @@
protected final void delete(final IndexItem keyItem, final IndexItem prevItem, final IndexItem nextItem) {
if (keyItem != null) {
try {
+ root = indexList.getRoot();
IndexItem prev = prevItem == null ? root : prevItem;
- IndexItem next = nextItem != root ? nextItem : null;
+ IndexItem next = (nextItem == null || !nextItem.equals(root)) ? nextItem : null;
dataManager.removeInterestInFile(keyItem.getKeyFile());
dataManager.removeInterestInFile(keyItem.getValueFile());
if (next != null) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
index 9aeda1f..fcaa268 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java
@@ -821,6 +821,9 @@
next = indexList.getNextEntry(root);
} else if (insertPos >= indexList.size()) {
prev = indexList.getLast();
+ if (prev==null) {
+ prev=root;
+ }
next = null;
} else {
prev = indexList.get(insertPos);
@@ -836,6 +839,7 @@
updateIndexes(next);
}
storeIndex(index);
+ indexList.setRoot(root);
}
} catch (IOException e) {
LOG.error("Failed to insert " + value, e);
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
index 626c84e..c34bf37 100755
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java
@@ -37,6 +37,7 @@
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.kaha.impl.index.VMIndex;
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
+import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -67,7 +68,7 @@
if (index == null) {
if (persistentIndex) {
String name = containerId.getDataContainerName() + "_" + containerId.getKey();
- name = name.replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
+ name=IOHelper.toFileSystemSafeName(name);
try {
HashIndex hashIndex = new HashIndex(directory, name, indexManager);
hashIndex.setNumberOfBins(getIndexBinSize());
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
index 57a9242..b6b601d 100755
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java
@@ -42,7 +42,7 @@
return root;
}
- void setRoot(IndexItem e) {
+ public void setRoot(IndexItem e) {
this.root = e;
}
@@ -186,7 +186,7 @@
* @throws IndexOutOfBoundsException if the specified index is out of range (<tt>index < 0 || index > size()</tt>).
*/
public synchronized void add(int index, IndexItem element) {
- if (index == size - 1) {
+ if (index == size) {
last = element;
}
size++;
@@ -297,7 +297,7 @@
}
// essential root get's updated consistently
if (result != null && root != null && root.equals(result)) {
- return root;
+ return null;
}
return result;
}
@@ -340,7 +340,7 @@
}
public synchronized void remove(IndexItem e) {
- if (e == root || e.equals(root)) {
+ if (e==null || e == root || e.equals(root)) {
return;
}
if (e == last || e.equals(last)) {
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java
index 8c58316..1ea1a89 100644
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexLinkedList.java
@@ -24,6 +24,12 @@
* @version $Revision$
*/
public interface IndexLinkedList {
+
+ /**
+ * Set the new Root
+ * @param newRoot
+ */
+ void setRoot(IndexItem newRoot);
/**
* @return the root used by the List
diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
index 8b93236..99c5062 100755
--- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
+++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedList.java
@@ -35,6 +35,10 @@
this.root = header;
this.root.next=this.root.prev=this.root;
}
+
+ public void setRoot(IndexItem newRoot) {
+ this.root=newRoot;
+ }
public synchronized IndexItem getRoot() {
return root;
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
index b14785a..2d40489 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
@@ -59,13 +59,15 @@
topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
}
- public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, final MessageRecoveryListener listener) throws Exception {
+ public void recoverNextMessages(String clientId, String subscriptionName,
+ int maxReturned, final MessageRecoveryListener listener)
+ throws Exception {
RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
- topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
- if (recoveryListener.size() == 0) {
- flush();
- topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
- }
+ topicReferenceStore.recoverNextMessages(clientId, subscriptionName,maxReturned, recoveryListener);
+ if (recoveryListener.size() == 0) {
+ flush();
+ topicReferenceStore.recoverNextMessages(clientId,subscriptionName, maxReturned, recoveryListener);
+ }
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
@@ -145,14 +147,18 @@
* @param key
* @throws IOException
*/
- protected void acknowledge(ConnectionContext context,MessageId messageId, Location location, String clientId,String subscriptionName) throws IOException {
+ protected void acknowledge(ConnectionContext context, MessageId messageId,
+ Location location, String clientId, String subscriptionName)
+ throws IOException {
synchronized (this) {
lastLocation = location;
- if (topicReferenceStore.acknowledgeReference(context, clientId, subscriptionName, messageId)){
- MessageAck ack = new MessageAck();
- ack.setLastMessageId(messageId);
- removeMessage(context, ack);
- }
+ }
+ if (topicReferenceStore.acknowledgeReference(context, clientId,
+ subscriptionName, messageId)) {
+ MessageAck ack = new MessageAck();
+ ack.setLastMessageId(messageId);
+ removeMessage(context, ack);
+
}
try {
asyncWriteTask.wakeup();
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
index 9fddf34..ef43ed2 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java
@@ -63,10 +63,14 @@
throw new RuntimeException("Use addMessageReference instead");
}
- protected final boolean recoverReference(MessageRecoveryListener listener, ReferenceRecord record)
- throws Exception {
- listener.recoverMessageReference(new MessageId(record.getMessageId()));
- return listener.hasSpace();
+ protected final boolean recoverReference(MessageRecoveryListener listener,
+ ReferenceRecord record) throws Exception {
+ MessageId id = new MessageId(record.getMessageId());
+ if (listener.hasSpace()) {
+ listener.recoverMessageReference(id);
+ return true;
+ }
+ return false;
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception {
@@ -90,14 +94,15 @@
entry = messageContainer.getNext(entry);
}
}
- if (entry != null) {
+ if (entry != null) {
int count = 0;
do {
ReferenceRecord msg = messageContainer.getValue(entry);
- if (msg != null) {
- recoverReference(listener, msg);
- count++;
- lastBatchId = msg.getMessageId();
+ if (msg != null ) {
+ if ( recoverReference(listener, msg)) {
+ count++;
+ lastBatchId = msg.getMessageId();
+ }
} else {
lastBatchId = null;
}
@@ -134,7 +139,7 @@
removeMessage(ack.getLastMessageId());
}
- public synchronized void removeMessage(MessageId msgId) throws IOException {
+ public synchronized void removeMessage(MessageId msgId) throws IOException {
StoreEntry entry = messageContainer.getEntry(msgId);
if (entry != null) {
ReferenceRecord rr = messageContainer.remove(msgId);
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
index 4744256..4eed8eb 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
@@ -56,7 +56,7 @@
private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
private static final String STORE_STATE = "store-state";
private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
- private static final Integer INDEX_VERSION = new Integer(2);
+ private static final Integer INDEX_VERSION = new Integer(3);
private static final String RECORD_REFERENCES = "record-references";
private static final String TRANSACTIONS = "transactions-state";
private MapContainer stateMap;
@@ -91,11 +91,13 @@
boolean empty = store.getMapContainerIds().isEmpty();
stateMap = store.getMapContainer("state", STORE_STATE);
stateMap.load();
+ storeValid=true;
if (!empty) {
AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
if (status != null) {
storeValid = status.get();
}
+
if (storeValid) {
//check what version the indexes are at
Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
@@ -236,7 +238,9 @@
* @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
*/
public void clearMessages() throws IOException {
- deleteAllMessages();
+ //don't delete messages as it will clear state - call base
+ //class method to clear out the data instead
+ super.deleteAllMessages();
}
/**
@@ -247,6 +251,7 @@
public void recoverState() throws IOException {
for (Iterator<SubscriptionInfo> i = durableSubscribers.iterator(); i.hasNext();) {
SubscriptionInfo info = i.next();
+ LOG.info("Recovering subscriber state for durable subscriber: " + info);
TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
ts.addSubsciption(info, false);
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
index 61a45af..f58689f 100755
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java
@@ -117,7 +117,7 @@
subscriberContainer.put(key, info);
}
// add the subscriber
- ListContainer container = addSubscriberMessageContainer(key);
+ addSubscriberMessageContainer(key);
/*
* if(retroactive){ for(StoreEntry
* entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
@@ -200,33 +200,39 @@
return result;
}
- protected ListContainer addSubscriberMessageContainer(Object key) throws IOException {
- ListContainer container = store.getListContainer(key, "topic-subs");
+ protected MapContainer addSubscriberMessageContainer(Object key) throws IOException {
+ MapContainer container = store.getMapContainer(key, "topic-subs");
+ container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
Marshaller marshaller = new ConsumerMessageRefMarshaller();
- container.setMarshaller(marshaller);
+ container.setValueMarshaller(marshaller);
TopicSubContainer tsc = new TopicSubContainer(container);
subscriberMessages.put(key, tsc);
return container;
}
- protected void removeSubscriberMessageContainer(Object key) throws IOException {
+ protected void removeSubscriberMessageContainer(Object key)
+ throws IOException {
subscriberContainer.remove(key);
TopicSubContainer container = subscriberMessages.remove(key);
- for (Iterator i = container.iterator(); i.hasNext();) {
- ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
- if (ref != null) {
- TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
- if (tsa != null) {
- if (tsa.decrementCount() <= 0) {
- ackContainer.remove(ref.getAckEntry());
- messageContainer.remove(tsa.getMessageEntry());
- } else {
- ackContainer.update(ref.getAckEntry(), tsa);
+ if (container != null) {
+ for (Iterator i = container.iterator(); i.hasNext();) {
+ ConsumerMessageRef ref = (ConsumerMessageRef) i.next();
+ if (ref != null) {
+ TopicSubAck tsa = ackContainer.get(ref.getAckEntry());
+ if (tsa != null) {
+ if (tsa.decrementCount() <= 0) {
+ ackContainer.remove(ref.getAckEntry());
+ messageContainer.remove(tsa.getMessageEntry());
+ } else {
+ ackContainer.update(ref.getAckEntry(), tsa);
+ }
}
}
}
+ container.clear();
}
store.deleteListContainer(key, "topic-subs");
+
}
public int getMessageCount(String clientId, String subscriberName) throws IOException {
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
index 2da1537..607a316 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java
@@ -33,6 +33,7 @@
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicReferenceStore;
+import org.apache.activemq.util.SubscriptionKey;
public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore {
@@ -40,6 +41,7 @@
protected Map<String, TopicSubContainer> subscriberMessages = new ConcurrentHashMap<String, TopicSubContainer>();
private Map<String, SubscriptionInfo> subscriberContainer;
private Store store;
+ private static final String TOPIC_SUB_NAME = "tsn";
public KahaTopicReferenceStore(Store store, KahaReferenceStoreAdapter adapter,
MapContainer<MessageId, ReferenceRecord> messageContainer, ListContainer<TopicSubAck> ackContainer,
@@ -108,10 +110,12 @@
}
}
- protected ListContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
- ListContainer container = store.getListContainer(clientId+":"+subscriptionName+":"+destination.getQualifiedName(), "topic-subs-references");
+
+ protected MapContainer addSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
+ MapContainer container = store.getMapContainer(getSubscriptionContainerName(getSubscriptionKey(clientId, subscriptionName)));
+ container.setKeyMarshaller(Store.MESSAGEID_MARSHALLER);
Marshaller marshaller = new ConsumerMessageRefMarshaller();
- container.setMarshaller(marshaller);
+ container.setValueMarshaller(marshaller);
TopicSubContainer tsc = new TopicSubContainer(container);
subscriberMessages.put(getSubscriptionKey(clientId, subscriptionName), tsc);
return container;
@@ -192,7 +196,7 @@
adapter.addSubscriberState(info);
}
// add the subscriber
- ListContainer container = addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
+ addSubscriberMessageContainer(info.getClientId(), info.getSubscriptionName());
if (retroactive) {
/*
* for(StoreEntry
@@ -210,13 +214,13 @@
if (info != null) {
adapter.removeSubscriberState(info);
}
- String key = getSubscriptionKey(clientId, subscriptionName);
- removeSubscriberMessageContainer(key);
+ removeSubscriberMessageContainer(clientId,subscriptionName);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
- return subscriberContainer.values()
+ SubscriptionInfo[] result = subscriberContainer.values()
.toArray(new SubscriptionInfo[subscriberContainer.size()]);
+ return result;
}
public int getMessageCount(String clientId, String subscriberName) throws IOException {
@@ -244,15 +248,19 @@
entry = container.getNextEntry(entry);
}
}
-
+
if (entry != null) {
do {
ConsumerMessageRef consumerRef = container.get(entry);
- ReferenceRecord msg = messageContainer.getValue(consumerRef.getMessageEntry());
+ ReferenceRecord msg = messageContainer.getValue(consumerRef
+ .getMessageEntry());
if (msg != null) {
- recoverReference(listener, msg);
- count++;
- container.setBatchEntry(msg.getMessageId(), entry);
+ if (recoverReference(listener, msg)) {
+ count++;
+ container.setBatchEntry(msg.getMessageId(), entry);
+ } else {
+ break;
+ }
} else {
container.reset();
}
@@ -288,9 +296,11 @@
}
}
- protected void removeSubscriberMessageContainer(String key) throws IOException {
- subscriberContainer.remove(key);
- TopicSubContainer container = subscriberMessages.remove(key);
+ protected void removeSubscriberMessageContainer(String clientId, String subscriptionName) throws IOException {
+ String subscriberKey = getSubscriptionKey(clientId, subscriptionName);
+ String containerName = getSubscriptionContainerName(subscriberKey);
+ subscriberContainer.remove(subscriberKey);
+ TopicSubContainer container = subscriberMessages.remove(subscriberKey);
for (Iterator i = container.iterator(); i.hasNext();) {
ConsumerMessageRef ref = (ConsumerMessageRef)i.next();
if (ref != null) {
@@ -305,12 +315,18 @@
}
}
}
- store.deleteListContainer(destination, "topic-subs-references-" + key);
+ store.deleteMapContainer(containerName);
}
protected String getSubscriptionKey(String clientId, String subscriberName) {
- String result = clientId + ":";
- result += subscriberName != null ? subscriberName : "NOT_SET";
- return result;
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(clientId).append(":");
+ String name = subscriberName != null ? subscriberName : "NOT_SET";
+ return buffer.append(name).toString();
+ }
+
+ private String getSubscriptionContainerName(String subscriptionKey) {
+ StringBuffer buffer = new StringBuffer(subscriptionKey);
+ return buffer.append(":").append(destination.getQualifiedName()).append(TOPIC_SUB_NAME).toString();
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
index 8497fea..5e4b576 100644
--- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
+++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/TopicSubContainer.java
@@ -19,7 +19,7 @@
import java.util.Iterator;
import org.apache.activemq.command.MessageId;
-import org.apache.activemq.kaha.ListContainer;
+import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
/**
@@ -28,11 +28,11 @@
* @version $Revision: 1.10 $
*/
public class TopicSubContainer {
- private transient ListContainer listContainer;
+ private transient MapContainer mapContainer;
private transient StoreEntry batchEntry;
- public TopicSubContainer(ListContainer container) {
- this.listContainer = container;
+ public TopicSubContainer(MapContainer container) {
+ this.mapContainer = container;
}
/**
@@ -55,75 +55,56 @@
}
public boolean isEmpty() {
- return listContainer.isEmpty();
+ return mapContainer.isEmpty();
}
public StoreEntry add(ConsumerMessageRef ref) {
- return listContainer.placeLast(ref);
+ return mapContainer.place(ref.getMessageId(),ref);
}
public ConsumerMessageRef remove(MessageId id) {
ConsumerMessageRef result = null;
- if (!listContainer.isEmpty()) {
- StoreEntry entry = listContainer.getFirst();
- while (entry != null) {
- ConsumerMessageRef ref = (ConsumerMessageRef)listContainer.get(entry);
- listContainer.remove(entry);
- if (listContainer != null && batchEntry != null && (listContainer.isEmpty() || batchEntry.equals(entry))) {
- reset();
- }
- if (ref != null && ref.getMessageId().equals(id)) {
- result = ref;
- break;
- }
- entry = listContainer.getFirst();
+ StoreEntry entry = mapContainer.getEntry(id);
+ if (entry != null) {
+ result = (ConsumerMessageRef) mapContainer.getValue(entry);
+ mapContainer.remove(entry);
+ if (batchEntry != null && batchEntry.equals(entry)) {
+ reset();
}
}
+ if(mapContainer.isEmpty()) {
+ reset();
+ }
return result;
}
- public ConsumerMessageRef removeFirst() {
- ConsumerMessageRef result = null;
- if (!listContainer.isEmpty()) {
- StoreEntry entry = listContainer.getFirst();
-
- result = (ConsumerMessageRef) listContainer.get(entry);
- listContainer.remove(entry);
- if (listContainer != null && batchEntry != null
- && (listContainer.isEmpty() || batchEntry.equals(entry))) {
- reset();
- }
-
- }
- return result;
- }
-
+
public ConsumerMessageRef get(StoreEntry entry) {
- return (ConsumerMessageRef)listContainer.get(entry);
+ return (ConsumerMessageRef)mapContainer.getValue(entry);
}
public StoreEntry getEntry() {
- return listContainer.getFirst();
+ return mapContainer.getFirst();
}
public StoreEntry refreshEntry(StoreEntry entry) {
- return listContainer.refresh(entry);
+ return mapContainer.refresh(entry);
}
public StoreEntry getNextEntry(StoreEntry entry) {
- return listContainer.getNext(entry);
+ return mapContainer.getNext(entry);
}
public Iterator iterator() {
- return listContainer.iterator();
+ return mapContainer.values().iterator();
}
public int size() {
- return listContainer.size();
+ return mapContainer.size();
}
public void clear() {
reset();
- listContainer.clear();
+ mapContainer.clear();
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
index dfb2f55..6953415 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
@@ -74,13 +74,18 @@
if (!commandSent.get()) {
LOG.trace("No message sent since last write check, sending a KeepAliveInfo");
- try {
- synchronized (writeChecker) {
- next.oneway(new KeepAliveInfo());
- }
- } catch (IOException e) {
- onException(e);
- }
+ // TODO: use a thread pool for this..
+ Thread thread = new Thread("ActiveMQ: Activity Generator: "+next.getRemoteAddress()) {
+ public void run() {
+ try {
+ oneway(new KeepAliveInfo());
+ } catch (IOException e) {
+ onException(e);
+ }
+ };
+ };
+ thread.setDaemon(true);
+ thread.start();
} else {
LOG.trace("Message sent since last write check, resetting flag");
}
@@ -96,9 +101,16 @@
if (!commandReceived.get()) {
LOG.debug("No message received since last read check for " + toString() + "! Throwing InactivityIOException.");
- synchronized (readChecker) {
- onException(new InactivityIOException("Channel was inactive for too long."));
- }
+
+ // TODO: use a thread pool for this..
+ Thread thread = new Thread("ActiveMQ: Inactivity Handler: "+next.getRemoteAddress()) {
+ public void run() {
+ onException(new InactivityIOException("Channel was inactive for too long: "+next.getRemoteAddress()));
+ };
+ };
+ thread.setDaemon(true);
+ thread.start();
+
} else {
LOG.trace("Message received since last read check, resetting flag: ");
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
index 281dce0..183d5fd 100755
--- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
+++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
@@ -104,6 +104,7 @@
private Map<String, Object> socketOptions;
private Boolean keepAlive;
private Boolean tcpNoDelay;
+ private Thread runnerThread;
/**
* Connect to a remote Node - e.g. a Broker
@@ -165,6 +166,7 @@
*/
public void run() {
LOG.trace("TCP consumer thread starting");
+ this.runnerThread=Thread.currentThread();
try {
while (!isStopped()) {
doRun();
@@ -436,7 +438,7 @@
public void stop() throws Exception {
super.stop();
CountDownLatch countDownLatch = stoppedLatch.get();
- if (countDownLatch != null) {
+ if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
countDownLatch.await();
}
}
diff --git a/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java b/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
index 9c9e2d7..8c2ce06 100755
--- a/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
+++ b/activemq-core/src/main/java/org/apache/activemq/usage/MemoryUsage.java
@@ -118,15 +118,15 @@
if (value == 0) {
return;
}
- if (parent != null) {
- ((MemoryUsage)parent).increaseUsage(value);
- }
int percentUsage;
synchronized (usageMutex) {
usage += value;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
+ if (parent != null) {
+ ((MemoryUsage)parent).increaseUsage(value);
+ }
}
/**
@@ -138,15 +138,15 @@
if (value == 0) {
return;
}
- if (parent != null) {
- parent.decreaseUsage(value);
- }
int percentUsage;
synchronized (usageMutex) {
usage -= value;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
+ if (parent != null) {
+ parent.decreaseUsage(value);
+ }
}
protected long retrieveUsage() {
diff --git a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
index 30e245f..9d88f35 100755
--- a/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
+++ b/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
@@ -30,6 +30,7 @@
private int firstIndex = -1;
private int firstBin = -1;
private long lastBitSet=-1;
+ private long lastInOrderBit=-1;
/**
* Create a BitArrayBin to a certain window size (number of messages to
@@ -76,10 +77,15 @@
* @return true if next message is in order
*/
public boolean isInOrder(long index) {
- if (lastBitSet== -1) {
- return true;
+ boolean result = false;
+ if (lastInOrderBit == -1) {
+ result = true;
+ } else {
+ result = lastInOrderBit + 1 == index;
}
- return lastBitSet+1==index;
+ lastInOrderBit = index;
+ return result;
+
}
/**
diff --git a/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java b/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java
index 374d356..c2257c1 100644
--- a/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java
+++ b/activemq-core/src/main/java/org/apache/activemq/util/IOHelper.java
@@ -23,7 +23,7 @@
* @version $Revision$
*/
public final class IOHelper {
-
+ protected static final int MAX_FILE_NAME_LENGTH;
private IOHelper() {
}
@@ -74,6 +74,10 @@
rc.append(HexSupport.toHexFromInt(c, true));
}
}
+ String result = rc.toString();
+ if (result.length() > MAX_FILE_NAME_LENGTH) {
+ result = result.substring(0,MAX_FILE_NAME_LENGTH);
+ }
return rc.toString();
}
@@ -120,6 +124,10 @@
throw new IOException("Failed to move " + src + " to " + targetDirectory);
}
}
+
+ static {
+ MAX_FILE_NAME_LENGTH = Integer.valueOf(System.getProperty("MaximumFileNameLength","200")).intValue();
+ }
}
diff --git a/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java b/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
index 1157d31..5ac7d0c 100755
--- a/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/ActiveMQMessageAuditTest.java
@@ -105,6 +105,7 @@
String id = idGen.generateId();
if (i==0) {
assertFalse(audit.isDuplicate(id));
+ assertTrue(audit.isInOrder(id));
}
if (i > 1 && i%2 != 0) {
list.add(id);
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
index 34f6f0d..2c4dbcf 100755
--- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
@@ -63,6 +63,8 @@
MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
consumerConnection.start();
consumerConnection.close();
+ broker.stop();
+ broker =createBroker(false);
Connection producerConnection = factory.createConnection();
@@ -79,7 +81,8 @@
}
}
producerConnection.close();
-
+ broker.stop();
+ broker =createBroker(false);
consumerConnection = factory.createConnection();
consumerConnection.setClientID(CONSUMER_NAME);
@@ -102,7 +105,7 @@
protected void setUp() throws Exception {
if (broker == null) {
- broker = createBroker();
+ broker = createBroker(true);
}
@@ -128,19 +131,19 @@
*
* @throws Exception
*/
- protected BrokerService createBroker() throws Exception {
+ protected BrokerService createBroker(boolean deleteStore) throws Exception {
BrokerService answer = new BrokerService();
- configureBroker(answer);
+ configureBroker(answer,deleteStore);
answer.start();
return answer;
}
- protected void configureBroker(BrokerService answer) throws Exception {
+ protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
+ answer.setDeleteAllMessagesOnStartup(deleteStore);
answer.addConnector(bindAddress);
- answer.setDeleteAllMessagesOnStartup(true);
- answer.setUseShutdownHook(false);
+ answer.setUseShutdownHook(true);
}
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
index 39a739e..99aa98a 100644
--- a/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java
@@ -123,10 +123,7 @@
WorkMessage work = (WorkMessage)((ObjectMessage)message).getObject();
long c = counter.incrementAndGet();
- if (c % 1 == 0) {
- System.out.println("Worker now has message count of: " + c);
- }
-
+
// Don't create a new work item for every BATCH_SIZE message. */
if (c % BATCH_SIZE != 0)
{
diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedListTest.java b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedListTest.java
new file mode 100644
index 0000000..033c35c
--- /dev/null
+++ b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedListTest.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.kaha.impl.index;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.activemq.kaha.impl.DataManager;
+import org.apache.activemq.kaha.impl.data.DataManagerImpl;
+import org.apache.activemq.kaha.impl.data.Item;
+import org.apache.activemq.util.IOHelper;
+
+
+public class DiskIndexLinkedListTest extends VMIndexLinkedListTest {
+
+ private IndexManager im;
+ protected IndexLinkedList createList(IndexItem root) throws IOException {
+ String dirName = System.getProperty("basedir", ".") + "/target/activemq-data/testIndex";
+ File file = new File(dirName);
+ file.mkdirs();
+ IOHelper.deleteChildren(file);
+ DataManager dm = new DataManagerImpl(file,"test",new AtomicLong());
+ im = new IndexManager(file,"test","rw",dm,new AtomicLong());
+ root = im.createNewIndex();
+ im.storeIndex(root);
+ return new DiskIndexLinkedList(im,root);
+ }
+
+ IndexItem createIndex(IndexLinkedList indexList,int offset) throws IOException {
+ IndexItem result = im.createNewIndex();
+ im.storeIndex(result);
+ return result;
+ }
+
+ protected void addToList(IndexLinkedList list,IndexItem item) throws IOException {
+ IndexItem root = list.getRoot();
+ IndexItem prev = list.getLast();
+ prev = prev != null ? prev : root;
+ IndexItem next = list.getNextEntry(prev);
+ prev.setNextItem(item.getOffset());
+ item.setPreviousItem(prev.getOffset());
+ im.updateIndexes(prev);
+ if (next != null) {
+ next.setPreviousItem(item.getOffset());
+ item.setNextItem(next.getOffset());
+ im.updateIndexes(next);
+ }
+ im.storeIndex(item);
+ list.add(item);
+ }
+
+ protected void insertToList(IndexLinkedList list,int pos,IndexItem item) throws IOException {
+ IndexItem root = list.getRoot();
+ IndexItem prev = null;
+ IndexItem next = null;
+ if (pos <= 0) {
+ prev = root;
+ next = list.getNextEntry(root);
+ } else if (pos >= list.size()) {
+ prev = list.getLast();
+ if (prev==null) {
+ prev=root;
+ }
+ next = null;
+ } else {
+ prev = list.get(pos);
+ prev = prev != null ? prev : root;
+ next = list.getNextEntry(prev);
+ }
+ prev.setNextItem(item.getOffset());
+ item.setPreviousItem(prev.getOffset());
+ im.updateIndexes(prev);
+ if (next != null) {
+ next.setPreviousItem(item.getOffset());
+ item.setNextItem(next.getOffset());
+ im.updateIndexes(next);
+ }
+ im.storeIndex(item);
+ list.setRoot(root);
+ list.add(pos,item);
+ }
+
+ protected void insertFirst(IndexLinkedList list,IndexItem item) throws IOException {
+ IndexItem root = list.getRoot();
+ IndexItem prev = root;
+ IndexItem next = list.getNextEntry(prev);
+ prev.setNextItem(item.getOffset());
+ item.setPreviousItem(prev.getOffset());
+ im.updateIndexes(prev);
+ if (next != null) {
+ next.setPreviousItem(item.getOffset());
+ item.setNextItem(next.getOffset());
+ im.updateIndexes(next);
+ }
+ im.storeIndex(item);
+ list.addFirst(item);
+ }
+
+ protected synchronized void remove(IndexLinkedList list,IndexItem item) throws IOException {
+ IndexItem root = list.getRoot();
+ IndexItem prev = list.getPrevEntry(item);
+ IndexItem next = list.getNextEntry(item);
+ list.remove(item);
+
+ prev = prev == null ? root : prev;
+ next = (next == null || !next.equals(root)) ? next : null;
+
+ if (next != null) {
+ prev.setNextItem(next.getOffset());
+ next.setPreviousItem(prev.getOffset());
+ im.updateIndexes(next);
+ } else {
+ prev.setNextItem(Item.POSITION_NOT_SET);
+ }
+ im.updateIndexes(prev);
+ }
+}
diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
index 2d0040b..63dff4b 100644
--- a/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
+++ b/activemq-core/src/test/java/org/apache/activemq/kaha/impl/index/VMIndexLinkedListTest.java
@@ -16,30 +16,34 @@
*/
package org.apache.activemq.kaha.impl.index;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.activemq.kaha.StoreEntry;
+import org.apache.activemq.kaha.impl.data.Item;
/**
* @version $Revision: 1.2 $
*/
public class VMIndexLinkedListTest extends TestCase {
- static final int NUMBER = 10;
+ static final int NUMBER = 30;
private IndexItem root;
- private List testData = new ArrayList();
+ private List<IndexItem> testData = new ArrayList<IndexItem>();
private IndexLinkedList list;
protected void setUp() throws Exception {
super.setUp();
+
+ IndexItem item = new IndexItem();
+ list = createList(item);
+ this.root = list.getRoot();
+
for (int i = 0; i < NUMBER; i++) {
- IndexItem item = new IndexItem();
- item.setOffset(i);
+ item = createIndex(list,i);
testData.add(item);
}
- root = new IndexItem();
- list = new VMIndexLinkedList(root);
}
protected void tearDown() throws Exception {
@@ -47,95 +51,125 @@
testData.clear();
list = null;
}
+
+ IndexItem createIndex(IndexLinkedList list,int offset) throws IOException {
+ IndexItem result = new IndexItem();
+ result.setOffset(offset);
+ return result;
+ }
+ protected IndexLinkedList createList(IndexItem root) throws IOException {
+ return new VMIndexLinkedList(root);
+ }
+
+ protected void addToList(IndexLinkedList list,IndexItem item) throws IOException {
+ list.add(item);
+ }
+
+ protected void insertToList(IndexLinkedList list,int pos,IndexItem item) throws IOException {
+ list.add(pos, item);
+ }
+
+ protected void insertFirst(IndexLinkedList list,IndexItem item) throws IOException {
+ list.addFirst(item);
+ }
+
+ protected synchronized void remove(IndexLinkedList list,IndexItem item) throws IOException {
+ IndexItem root = list.getRoot();
+ IndexItem prev = list.getPrevEntry(item);
+ IndexItem next = list.getNextEntry(item);
+ list.remove(item);
+ }
+
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.getFirst()'
*/
- public void testGetFirst() {
+ public void testGetFirst() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
- assertTrue(list.getFirst() == testData.get(0));
+ assertNotNull(list.getFirst());
+ assertTrue(list.getFirst().equals(testData.get(0)));
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.getLast()'
*/
- public void testGetLast() {
+ public void testGetLast() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
assertTrue(list.getLast() == testData.get(testData.size() - 1));
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.removeFirst()'
*/
- public void testRemoveFirst() {
+ public void testRemoveFirst() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
- assertTrue(list.removeFirst() == testData.get(0));
+ assertTrue(list.removeFirst().equals(testData.get(0)));
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.removeLast()'
*/
- public void testRemoveLast() {
+ public void testRemoveLast() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
- assertTrue(list.removeLast() == testData.get(testData.size() - 1));
+ assertTrue(list.removeLast().equals(testData.get(testData.size() - 1)));
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.addFirst(IndexItem)'
*/
- public void testAddFirst() {
+ public void testAddFirst() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.addFirst((IndexItem)testData.get(i));
+ insertFirst(list, testData.get(i));
}
int count = 0;
for (int i = testData.size() - 1; i >= 0; i--) {
- assertTrue(testData.get(i) == list.get(count++));
+ assertTrue(testData.get(i).equals(list.get(count++)));
}
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.addLast(IndexItem)'
*/
- public void testAddLast() {
+ public void testAddLast() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.addLast((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
for (int i = 0; i < testData.size(); i++) {
- assertTrue(testData.get(i) == list.get(i));
+ assertTrue(testData.get(i).equals(list.get(i)));
}
}
/*
- * Test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.size()'
+ * test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.size()'
*/
- public void testSize() {
+ public void testSize() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.addLast((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
assertTrue(list.size() == i + 1);
}
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.isEmpty()'
*/
- public void testIsEmpty() {
+ public void testIsEmpty() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.addLast((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
assertTrue(list.size() == i + 1);
}
list.clear();
@@ -143,24 +177,24 @@
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.add(IndexItem)'
*/
- public void testAddIndexItem() {
+ public void testAddIndexItem() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
for (int i = 0; i < testData.size(); i++) {
- assertTrue(testData.get(i) == list.get(i));
+ assertTrue(testData.get(i).equals(list.get(i)));
}
}
/*
- * Test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.clear()'
+ * test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.clear()'
*/
- public void testClear() {
+ public void testClear() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.addLast((IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
assertTrue(list.size() == i + 1);
}
list.clear();
@@ -168,32 +202,32 @@
}
/*
- * Test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.add(int,
+ * test method for 'org.apache.activemq.kaha.impl.VMIndexLinkedList.add(int,
* IndexItem)'
*/
- public void testAddIntIndexItem() {
- for (int i = 0; i < testData.size(); i++) {
- list.add(i, (IndexItem)testData.get(i));
+ public void testAddIntIndexItem() throws IOException {
+ for (int i = 0; i < this.testData.size(); i++) {
+ insertToList(list, i, testData.get(i));
}
for (int i = 0; i < testData.size(); i++) {
- assertTrue(testData.get(i) == list.get(i));
+ assertTrue(testData.get(i).equals(list.get(i)));
}
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.remove(int)'
*/
- public void testRemoveInt() {
+ public void testRemoveInt() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add(i, (IndexItem)testData.get(i));
+ insertToList(list, i, testData.get(i));
}
for (int i = 0; i < testData.size(); i++) {
list.remove(0);
}
assertTrue(list.isEmpty());
for (int i = 0; i < testData.size(); i++) {
- list.add(i, (IndexItem)testData.get(i));
+ insertToList(list, i, testData.get(i));
}
for (int i = 0; i < testData.size(); i++) {
list.remove(list.size() - 1);
@@ -202,63 +236,81 @@
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.indexOf(IndexItem)'
*/
- public void testIndexOf() {
+ public void testIndexOf() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add(i, (IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
for (int i = 0; i < testData.size(); i++) {
- assertTrue(list.indexOf((StoreEntry)testData.get(i)) == i);
+ assertTrue(list.indexOf(testData.get(i)) == i);
}
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.getNextEntry(IndexItem)'
*/
- public void testGetNextEntry() {
+ public void testGetNextEntry() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add(i, (IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
IndexItem next = list.getFirst();
int count = 0;
while (next != null) {
- assertTrue(next == testData.get(count++));
+ assertTrue(next.equals(testData.get(count++)));
next = list.getNextEntry(next);
- assertTrue(next != root);
+ assertTrue(next == null || !next.equals(root));
}
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.getPrevEntry(IndexItem)'
*/
- public void testGetPrevEntry() {
+ public void testGetPrevEntry() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add(i, (IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
IndexItem next = list.getLast();
int count = testData.size() - 1;
while (next != null) {
- assertTrue(next == testData.get(count--));
+ assertTrue(next.equals(testData.get(count--)));
next = list.getPrevEntry(next);
- assertTrue(next != root);
+ assertTrue(next == null || !next.equals(root));
}
}
/*
- * Test method for
+ * test method for
* 'org.apache.activemq.kaha.impl.VMIndexLinkedList.remove(IndexItem)'
*/
- public void testRemoveIndexItem() {
+ public void testRemoveIndexItem() throws IOException {
for (int i = 0; i < testData.size(); i++) {
- list.add(i, (IndexItem)testData.get(i));
+ addToList(list,testData.get(i));
}
for (int i = 0; i < testData.size(); i++) {
- list.remove((IndexItem)testData.get(i));
+ list.remove(testData.get(i));
assertTrue(list.size() == testData.size() - i - 1);
}
}
+
+ public void testAddRemove() throws IOException {
+ IndexItem a = createIndex(list,0);
+ addToList(list, a);
+ IndexItem b = createIndex(list,1);
+ addToList(list, b);
+ IndexItem c = createIndex(list,2);
+ addToList(list, c);
+ IndexItem d = createIndex(list,3);
+ addToList(list, d);
+ remove(list, d);
+ assertTrue(list.getLast().equals(c));
+ assertTrue(list.getNextEntry(b).equals(c));
+ remove(list, b);
+ assertTrue(list.getNextEntry(a).equals(c));
+ assertTrue(list.getLast().equals(c));
+
+ }
}
diff --git a/activemq-web-demo/src/main/webapp/index.html b/activemq-web-demo/src/main/webapp/index.html
index 1aa5e9a..726bbd7 100755
--- a/activemq-web-demo/src/main/webapp/index.html
+++ b/activemq-web-demo/src/main/webapp/index.html
@@ -38,7 +38,7 @@
</p>
<p>
-<a href="portfolio/portfolio.html">Porfolio</a> example shows how you could make an interactive trading portfolio which
+<a href="portfolio/portfolio.html">Portfolio</a> example shows how you could make an interactive trading portfolio which
updates in real time as the market prices change
</p>