[AMQ-9437] AdvancedDestination statistics networkEnqueue and networkDequeue counters
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
index 12e9f77..c8af4a6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
@@ -190,9 +190,16 @@
dest.clearPendingMessages(opCount);
dest.getDestinationStatistics().getEnqueues().add(opCount);
dest.getDestinationStatistics().getMessages().add(opCount);
+
+ if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
+ dest.getDestinationStatistics().getNetworkEnqueues().add(opCount);
+ }
LOG.debug("cleared pending from afterCommit: {}", destination);
} else {
dest.getDestinationStatistics().getDequeues().add(opCount);
+ if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
+ dest.getDestinationStatistics().getNetworkDequeues().add(opCount);
+ }
}
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 8abcc67..02a7f65 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -599,4 +599,25 @@
public long getMaxUncommittedExceededCount() {
return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
}
+
+ @Override
+ public boolean isAdvancedNetworkStatisticsEnabled() {
+ return destination.isAdvancedNetworkStatisticsEnabled();
+ }
+
+ @Override
+ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+ destination.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
+ }
+
+ @Override
+ public long getNetworkEnqueues() {
+ return destination.getDestinationStatistics().getNetworkEnqueues().getCount();
+ }
+
+ @Override
+ public long getNetworkDequeues() {
+ return destination.getDestinationStatistics().getNetworkDequeues().getCount();
+ }
+
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index 45ed51b..328ddb0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -481,4 +481,16 @@
@MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
long getMaxUncommittedExceededCount();
+
+ @MBeanInfo("Query Advanced Network Statistics flag")
+ boolean isAdvancedNetworkStatisticsEnabled();
+
+ @MBeanInfo("Toggle Advanced Network Statistics flag")
+ void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
+
+ @MBeanInfo("Number of messages sent to the destination via network connection")
+ long getNetworkEnqueues();
+
+ @MBeanInfo("Number of messages acknowledged from the destination via network connection")
+ long getNetworkDequeues();
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 4ca3913..e34f23a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -110,6 +110,8 @@
protected final Scheduler scheduler;
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
+ private boolean advancedNetworkStatisticsEnabled = false;
+
/*
* percentage of in-flight messages above which optimize message store is disabled
*/
@@ -868,6 +870,15 @@
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
}
+ @Override
+ public boolean isAdvancedNetworkStatisticsEnabled() {
+ return this.advancedNetworkStatisticsEnabled;
+ }
+
+ @Override
+ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+ this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
+ }
@Override
public abstract List<Subscription> getConsumers();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 70f807b..45e3de7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -258,4 +258,10 @@
boolean isSendDuplicateFromStoreToDLQ();
void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
+
+ // [AMQ-9437]
+ boolean isAdvancedNetworkStatisticsEnabled();
+
+ void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
+
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 6b288a2..85ef367 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -409,6 +409,16 @@
next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
}
+ @Override
+ public boolean isAdvancedNetworkStatisticsEnabled() {
+ return next.isAdvancedNetworkStatisticsEnabled();
+ }
+
+ @Override
+ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+ next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
+ }
+
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index 9d30c62..dc6b17d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -46,6 +46,10 @@
protected SizeStatisticImpl messageSize;
protected CountStatisticImpl maxUncommittedExceededCount;
+ // [AMQ-9437] Advanced Statistics are optionally enabled
+ protected CountStatisticImpl networkEnqueues;
+ protected CountStatisticImpl networkDequeues;
+
public DestinationStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
@@ -68,6 +72,10 @@
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");
+
+ networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
+ networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");
+
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
@@ -83,6 +91,9 @@
addStatistic("blockedTime",blockedTime);
addStatistic("messageSize",messageSize);
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);
+
+ addStatistic("networkEnqueues", networkEnqueues);
+ addStatistic("networkDequeues", networkDequeues);
}
public CountStatisticImpl getEnqueues() {
@@ -151,6 +162,14 @@
return this.maxUncommittedExceededCount;
}
+ public CountStatisticImpl getNetworkEnqueues() {
+ return networkEnqueues;
+ }
+
+ public CountStatisticImpl getNetworkDequeues() {
+ return networkDequeues;
+ }
+
public void reset() {
if (this.isDoReset()) {
super.reset();
@@ -165,6 +184,8 @@
blockedTime.reset();
messageSize.reset();
maxUncommittedExceededCount.reset();
+ networkEnqueues.reset();
+ networkDequeues.reset();
}
}
@@ -187,6 +208,9 @@
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);
+ // [AMQ-9437] Advanced Statistics
+ networkEnqueues.setEnabled(enabled);
+ networkDequeues.setEnabled(enabled);
}
public void setParent(DestinationStatistics parent) {
@@ -207,6 +231,8 @@
blockedTime.setParent(parent.blockedTime);
messageSize.setParent(parent.messageSize);
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
+ networkEnqueues.setParent(parent.networkEnqueues);
+ networkDequeues.setParent(parent.networkDequeues);
} else {
enqueues.setParent(null);
dispatched.setParent(null);
@@ -224,6 +250,8 @@
blockedTime.setParent(null);
messageSize.setParent(null);
maxUncommittedExceededCount.setParent(null);
+ networkEnqueues.setParent(null);
+ networkDequeues.setParent(null);
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index e0dc6d0..6946a33 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -371,6 +371,9 @@
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
if (info.isNetworkSubscription()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
+ if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+ ((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
+ }
}
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6502a20..0ed6763 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1873,7 +1873,7 @@
// This sends the ack the the journal..
if (!ack.isInTransaction()) {
acknowledge(context, sub, ack, reference);
- dropMessage(reference);
+ dropMessage(context, reference);
} else {
try {
acknowledge(context, sub, ack, reference);
@@ -1882,7 +1882,7 @@
@Override
public void afterCommit() throws Exception {
- dropMessage(reference);
+ dropMessage(context, reference);
wakeup();
}
@@ -1910,11 +1910,16 @@
reference.setAcked(true);
}
- private void dropMessage(QueueMessageReference reference) {
+ private void dropMessage(ConnectionContext context, QueueMessageReference reference) {
//use dropIfLive so we only process the statistics at most one time
if (reference.dropIfLive()) {
getDestinationStatistics().getDequeues().increment();
getDestinationStatistics().getMessages().decrement();
+
+ if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
+ getDestinationStatistics().getNetworkDequeues().increment();
+ }
+
pagedInMessagesLock.writeLock().lock();
try {
pagedInMessages.remove(reference);
@@ -1969,6 +1974,11 @@
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());
+
+ if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
+ destinationStatistics.getNetworkEnqueues().increment();
+ }
+
messageDelivered(context, msg);
consumersLock.readLock().lock();
try {
@@ -2115,7 +2125,7 @@
LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
if (store != null) {
ConnectionContext connectionContext = createConnectionContext();
- dropMessage(ref);
+ dropMessage(connectionContext, ref);
if (gotToTheStore(ref.getMessage())) {
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index cad0d3b..a9e0787 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -778,6 +778,11 @@
// misleading metrics.
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
+
+ if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) {
+ destinationStatistics.getNetworkEnqueues().increment();
+ }
+
destinationStatistics.getMessageSize().addSize(message.getSize());
MessageEvaluationContext msgContext = null;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index a5b9724..4403dea 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -449,6 +449,9 @@
destination.getDestinationStatistics().getInflight().subtract(count);
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(count);
+ if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+ destination.getDestinationStatistics().getNetworkDequeues().add(count);
+ }
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
@@ -746,6 +749,9 @@
matched.remove(message);
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
+ if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+ destination.getDestinationStatistics().getNetworkDequeues().increment();
+ }
}
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 7230957..e33f13b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -106,7 +106,7 @@
private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;
-
+ private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
/*
* percentage of in-flight messages above which optimize message store is disabled
*/
@@ -306,6 +306,9 @@
if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
}
+ if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
+ destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
+ }
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -1175,5 +1178,13 @@
public MessageInterceptorStrategy getMessageInterceptorStrategy() {
return this.messageInterceptorStrategy;
+ }
+
+ public boolean isAdvancedNetworkStatisticsEnabled() {
+ return this.advancedNetworkStatisticsEnabled;
+ }
+
+ public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+ this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}
}
diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
index 0f46e08..53341c3 100644
--- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
+++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
@@ -58,6 +58,18 @@
}
@Test
+ public void testModAdvancedNetworkStatistics() throws Exception {
+ final String brokerConfig = configurationSeed + "-policy-ml-broker";
+ applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics");
+ startBroker(brokerConfig);
+ assertTrue("broker alive", brokerService.isStarted());
+
+ verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", false);
+ applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics-mod", SLEEP);
+ verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", true);
+ }
+
+ @Test
public void testAddNdMod() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker";
applyNewConfig(brokerConfig, configurationSeed + "-policy-ml");
@@ -121,6 +133,9 @@
session.createConsumer(session.createQueue(dest));
switch(fieldName) {
+ case "advancedNetworkStatisticsEnabled":
+ assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled());
+ break;
case "sendDuplicateFromStoreToDLQ":
assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ());
break;
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml
new file mode 100644
index 0000000..534f884
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
+ <plugins>
+ <runtimeConfigurationPlugin checkPeriod="1000" />
+ </plugins>
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue="AMQ.9437" advancedNetworkStatisticsEnabled="true"/>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+ </broker>
+</beans>
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml
new file mode 100644
index 0000000..a6c710e
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
+ <plugins>
+ <runtimeConfigurationPlugin checkPeriod="1000" />
+ </plugins>
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue="AMQ.9437"/>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+ </broker>
+</beans>
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
new file mode 100644
index 0000000..df99bab
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.springframework.context.support.AbstractApplicationContext;
+
+@RunWith(value = Parameterized.class)
+public class NetworkAdvancedStatisticsTest extends BaseNetworkTest {
+
+ @Parameterized.Parameters(name="includedDestination={0}, excludedDestination={1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ { new ActiveMQTopic("include.test.bar"), new ActiveMQTopic("exclude.test.bar")},
+ { new ActiveMQQueue("include.test.foo"), new ActiveMQQueue("exclude.test.foo")}});
+ }
+
+ protected static final int MESSAGE_COUNT = 10;
+
+ protected AbstractApplicationContext context;
+ protected String consumerName = "durableSubs";
+
+ private final ActiveMQDestination includedDestination;
+ private final ActiveMQDestination excludedDestination;
+
+ public NetworkAdvancedStatisticsTest(ActiveMQDestination includedDestionation, ActiveMQDestination excludedDestination) {
+ this.includedDestination = includedDestionation;
+ this.excludedDestination = excludedDestination;
+ }
+
+ @Override
+ protected void doSetUp(boolean deleteAllMessages) throws Exception {
+ super.doSetUp(deleteAllMessages);
+ }
+
+ @Override
+ protected String getRemoteBrokerURI() {
+ return "org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml";
+ }
+
+ @Override
+ protected String getLocalBrokerURI() {
+ return "org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml";
+ }
+
+ //Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue
+ @Test(timeout = 60 * 1000)
+ public void testNetworkAdvancedStatistics() throws Exception {
+
+ // create a remote durable consumer to create demand
+ MessageConsumer remoteConsumer;
+ if(includedDestination.isTopic()) {
+ remoteConsumer = remoteSession.createDurableSubscriber(ActiveMQTopic.class.cast(includedDestination), consumerName);
+ } else {
+ remoteConsumer = remoteSession.createConsumer(includedDestination);
+ remoteConsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ }
+ });
+ }
+ Thread.sleep(1000);
+
+ MessageProducer producer = localSession.createProducer(includedDestination);
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message test = localSession.createTextMessage("test-" + i);
+ producer.send(test);
+ }
+ Thread.sleep(1000);
+
+ MessageProducer producerExcluded = localSession.createProducer(excludedDestination);
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ Message test = localSession.createTextMessage("test-" + i);
+ producerExcluded.send(test);
+ }
+ Thread.sleep(1000);
+
+ //Make sure stats are correct for local -> remote
+ assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
+ assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeues().getCount());
+ assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount());
+ assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+ assertEquals(0, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
+ assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
+ assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount());
+ assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
+ assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+
+ // Make sure stats do not increment for local-only
+ assertEquals(MESSAGE_COUNT, localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
+ assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
+ assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+ assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
+ assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
+ assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeues().getCount());
+ assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
+ assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
+ assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+
+ if(includedDestination.isTopic()) {
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+ }
+ }, 10000, 500));
+ } else {
+ assertTrue(Wait.waitFor(new Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ // The number of message that remain is due to the exclude queue
+ return localBroker.getAdminView().getTotalMessageCount() == MESSAGE_COUNT;
+ }
+ }, 10000, 500));
+ }
+ remoteConsumer.close();
+ }
+
+ protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
+
+ final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+ final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+ 0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
+ expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+ 0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
+ }
+ }));
+ }
+}
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
new file mode 100644
index 0000000..b543169
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+<broker brokerName="localBroker" start="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="true" xmlns="http://activemq.apache.org/schema/core">
+
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue="exclude.>" advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry queue="include.>" advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry topic="ActiveMQ.Advisory.>" />
+ <policyEntry topic="exclude.>" advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry topic="include.>" advancedNetworkStatisticsEnabled="true"/>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+ <networkConnectors>
+ <networkConnector uri="static:(tcp://localhost:61617)"
+ dynamicOnly = "false"
+ conduitSubscriptions = "true"
+ decreaseNetworkConsumerPriority = "false"
+ name="networkConnector">
+ <dynamicallyIncludedDestinations>
+ <queue physicalName="include.test.foo"/>
+ <topic physicalName="include.test.bar"/>
+ </dynamicallyIncludedDestinations>
+ <excludedDestinations>
+ <queue physicalName="exclude.test.foo"/>
+ <topic physicalName="exclude.test.bar"/>
+ </excludedDestinations>
+ </networkConnector>
+ </networkConnectors>
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61616"/>
+ </transportConnectors>
+
+ </broker>
+</beans>
+
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
new file mode 100644
index 0000000..a9cf93f
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:amq="http://activemq.apache.org/schema/core"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+ <broker brokerName="remoteBroker" start="false" useJmx="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="false" xmlns="http://activemq.apache.org/schema/core">
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue="exclude.>" advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry queue="include.>" advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry topic="ActiveMQ.Advisory.>" />
+ <policyEntry topic="exclude.>" advancedNetworkStatisticsEnabled="true"/>
+ <policyEntry topic="include.>" advancedNetworkStatisticsEnabled="true"/>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+ <networkConnectors>
+ <networkConnector uri="static:(tcp://localhost:61616)" />
+ </networkConnectors>
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61617"/>
+ </transportConnectors>
+ </broker>
+
+</beans>
+