[AMQ-9437] AdvancedDestination statistics networkEnqueue and networkDequeue counters
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
index 12e9f77..c8af4a6 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java
@@ -190,9 +190,16 @@
                 dest.clearPendingMessages(opCount);
                 dest.getDestinationStatistics().getEnqueues().add(opCount);
                 dest.getDestinationStatistics().getMessages().add(opCount);
+
+                if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
+                    dest.getDestinationStatistics().getNetworkEnqueues().add(opCount);
+                }
                 LOG.debug("cleared pending from afterCommit: {}", destination);
             } else {
                 dest.getDestinationStatistics().getDequeues().add(opCount);
+                if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
+                    dest.getDestinationStatistics().getNetworkDequeues().add(opCount);
+                }
             }
         }
     }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index 8abcc67..02a7f65 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -599,4 +599,25 @@
     public long getMaxUncommittedExceededCount() {
         return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
     }
+
+    @Override
+    public boolean isAdvancedNetworkStatisticsEnabled() {
+        return destination.isAdvancedNetworkStatisticsEnabled();
+    }
+
+    @Override
+    public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+        destination.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
+    }
+
+    @Override
+    public long getNetworkEnqueues() {
+        return destination.getDestinationStatistics().getNetworkEnqueues().getCount();
+    }
+
+    @Override
+    public long getNetworkDequeues() {
+        return destination.getDestinationStatistics().getNetworkDequeues().getCount();
+    }
+
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
index 45ed51b..328ddb0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java
@@ -481,4 +481,16 @@
 
     @MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
     long getMaxUncommittedExceededCount();
+
+    @MBeanInfo("Query Advanced Network Statistics flag")
+    boolean isAdvancedNetworkStatisticsEnabled();
+
+    @MBeanInfo("Toggle Advanced Network Statistics flag")
+    void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
+
+    @MBeanInfo("Number of messages sent to the destination via network connection")
+    long getNetworkEnqueues();
+
+    @MBeanInfo("Number of messages acknowledged from the destination via network connection")
+    long getNetworkDequeues();
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 4ca3913..e34f23a 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -110,6 +110,8 @@
     protected final Scheduler scheduler;
     private boolean disposed = false;
     private boolean doOptimzeMessageStorage = true;
+    private boolean advancedNetworkStatisticsEnabled = false;
+
     /*
      * percentage of in-flight messages above which optimize message store is disabled
      */
@@ -868,6 +870,15 @@
         this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
     }
 
+    @Override
+    public boolean isAdvancedNetworkStatisticsEnabled() {
+        return this.advancedNetworkStatisticsEnabled;
+    }
+
+    @Override
+    public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+        this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
+    }
 
     @Override
     public abstract List<Subscription> getConsumers();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
index 70f807b..45e3de7 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java
@@ -258,4 +258,10 @@
     boolean isSendDuplicateFromStoreToDLQ();
 
     void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
+
+    // [AMQ-9437]
+    boolean isAdvancedNetworkStatisticsEnabled();
+
+    void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
+
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
index 6b288a2..85ef367 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
@@ -409,6 +409,16 @@
         next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
     }
 
+    @Override
+    public boolean isAdvancedNetworkStatisticsEnabled() {
+        return next.isAdvancedNetworkStatisticsEnabled();
+    }
+
+    @Override
+    public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+        next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
+    }
+
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
         if (next instanceof DestinationFilter) {
             DestinationFilter filter = (DestinationFilter) next;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
index 9d30c62..dc6b17d 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
@@ -46,6 +46,10 @@
     protected SizeStatisticImpl messageSize;
     protected CountStatisticImpl maxUncommittedExceededCount;
 
+    // [AMQ-9437] Advanced Statistics are optionally enabled
+    protected CountStatisticImpl networkEnqueues;
+    protected CountStatisticImpl networkDequeues;
+
     public DestinationStatistics() {
 
         enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
@@ -68,6 +72,10 @@
         blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
         messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
         maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");
+
+        networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
+        networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");
+
         addStatistic("enqueues", enqueues);
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
@@ -83,6 +91,9 @@
         addStatistic("blockedTime",blockedTime);
         addStatistic("messageSize",messageSize);
         addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);
+
+        addStatistic("networkEnqueues", networkEnqueues);
+        addStatistic("networkDequeues", networkDequeues);
     }
 
     public CountStatisticImpl getEnqueues() {
@@ -151,6 +162,14 @@
         return this.maxUncommittedExceededCount;
     }
 
+    public CountStatisticImpl getNetworkEnqueues() {
+        return networkEnqueues;
+    }
+
+    public CountStatisticImpl getNetworkDequeues() {
+        return networkDequeues;
+    }
+
     public void reset() {
         if (this.isDoReset()) {
             super.reset();
@@ -165,6 +184,8 @@
             blockedTime.reset();
             messageSize.reset();
             maxUncommittedExceededCount.reset();
+            networkEnqueues.reset();
+            networkDequeues.reset();
         }
     }
 
@@ -187,6 +208,9 @@
         messageSize.setEnabled(enabled);
         maxUncommittedExceededCount.setEnabled(enabled);
 
+        // [AMQ-9437] Advanced Statistics
+        networkEnqueues.setEnabled(enabled);
+        networkDequeues.setEnabled(enabled);
     }
 
     public void setParent(DestinationStatistics parent) {
@@ -207,6 +231,8 @@
             blockedTime.setParent(parent.blockedTime);
             messageSize.setParent(parent.messageSize);
             maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
+            networkEnqueues.setParent(parent.networkEnqueues);
+            networkDequeues.setParent(parent.networkDequeues);
         } else {
             enqueues.setParent(null);
             dispatched.setParent(null);
@@ -224,6 +250,8 @@
             blockedTime.setParent(null);
             messageSize.setParent(null);
             maxUncommittedExceededCount.setParent(null);
+            networkEnqueues.setParent(null);
+            networkDequeues.setParent(null);
         }
     }
 
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index e0dc6d0..6946a33 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -371,6 +371,9 @@
         ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
         if (info.isNetworkSubscription()) {
             ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
+            if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+                ((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
+            }
         }
     }
 
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 6502a20..0ed6763 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1873,7 +1873,7 @@
         // This sends the ack the the journal..
         if (!ack.isInTransaction()) {
             acknowledge(context, sub, ack, reference);
-            dropMessage(reference);
+            dropMessage(context, reference);
         } else {
             try {
                 acknowledge(context, sub, ack, reference);
@@ -1882,7 +1882,7 @@
 
                     @Override
                     public void afterCommit() throws Exception {
-                        dropMessage(reference);
+                        dropMessage(context, reference);
                         wakeup();
                     }
 
@@ -1910,11 +1910,16 @@
         reference.setAcked(true);
     }
 
-    private void dropMessage(QueueMessageReference reference) {
+    private void dropMessage(ConnectionContext context, QueueMessageReference reference) {
         //use dropIfLive so we only process the statistics at most one time
         if (reference.dropIfLive()) {
             getDestinationStatistics().getDequeues().increment();
             getDestinationStatistics().getMessages().decrement();
+
+            if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
+                getDestinationStatistics().getNetworkDequeues().increment();
+            }
+
             pagedInMessagesLock.writeLock().lock();
             try {
                 pagedInMessages.remove(reference);
@@ -1969,6 +1974,11 @@
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         destinationStatistics.getMessageSize().addSize(msg.getSize());
+
+        if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
+            destinationStatistics.getNetworkEnqueues().increment();
+        }
+
         messageDelivered(context, msg);
         consumersLock.readLock().lock();
         try {
@@ -2115,7 +2125,7 @@
                             LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
                             if (store != null) {
                                 ConnectionContext connectionContext = createConnectionContext();
-                                dropMessage(ref);
+                                dropMessage(connectionContext, ref);
                                 if (gotToTheStore(ref.getMessage())) {
                                     LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
                                     store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index cad0d3b..a9e0787 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -778,6 +778,11 @@
         // misleading metrics.
         // destinationStatistics.getMessages().increment();
         destinationStatistics.getEnqueues().increment();
+
+        if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) {
+            destinationStatistics.getNetworkEnqueues().increment();
+        }
+
         destinationStatistics.getMessageSize().addSize(message.getSize());
         MessageEvaluationContext msgContext = null;
 
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index a5b9724..4403dea 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -449,6 +449,9 @@
         destination.getDestinationStatistics().getInflight().subtract(count);
         if (info.isNetworkSubscription()) {
             destination.getDestinationStatistics().getForwards().add(count);
+            if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+                destination.getDestinationStatistics().getNetworkDequeues().add(count);
+            }
         }
         if (ack.isExpiredAck()) {
             destination.getDestinationStatistics().getExpired().add(count);
@@ -746,6 +749,9 @@
             matched.remove(message);
             if (destination != null) {
                 destination.getDestinationStatistics().getDequeues().increment();
+                if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
+                    destination.getDestinationStatistics().getNetworkDequeues().increment();
+                }
             }
             Destination dest = (Destination) message.getRegionDestination();
             if (dest != null) {
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
index 7230957..e33f13b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
@@ -106,7 +106,7 @@
     private boolean doOptimzeMessageStorage = true;
     private int maxDestinations = -1;
     private boolean useTopicSubscriptionInflightStats = true;
-
+    private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
     /*
      * percentage of in-flight messages above which optimize message store is disabled
      */
@@ -306,6 +306,9 @@
         if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
             destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
         }
+        if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
+            destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
+        }
     }
 
     public void baseConfiguration(Broker broker, BaseDestination destination) {
@@ -1175,5 +1178,13 @@
 
     public MessageInterceptorStrategy getMessageInterceptorStrategy() {
         return this.messageInterceptorStrategy;
+    } 
+
+    public boolean isAdvancedNetworkStatisticsEnabled() {
+        return this.advancedNetworkStatisticsEnabled;
+    }
+
+    public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
+        this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
     }
 }
diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
index 0f46e08..53341c3 100644
--- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
+++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java
@@ -58,6 +58,18 @@
     }
 
     @Test
+    public void testModAdvancedNetworkStatistics() throws Exception {
+        final String brokerConfig = configurationSeed + "-policy-ml-broker";
+        applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics");
+        startBroker(brokerConfig);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", false);
+        applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics-mod", SLEEP);
+        verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", true);
+    }
+
+    @Test
     public void testAddNdMod() throws Exception {
         final String brokerConfig = configurationSeed + "-policy-ml-broker";
         applyNewConfig(brokerConfig, configurationSeed + "-policy-ml");
@@ -121,6 +133,9 @@
             session.createConsumer(session.createQueue(dest));
 
             switch(fieldName) {
+            case "advancedNetworkStatisticsEnabled":
+                assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled());
+                break;
             case "sendDuplicateFromStoreToDLQ":
                 assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ());
                 break;
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml
new file mode 100644
index 0000000..534f884
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
+    <plugins>
+      <runtimeConfigurationPlugin checkPeriod="1000" />
+    </plugins>
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue="AMQ.9437" advancedNetworkStatisticsEnabled="true"/>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+  </broker>
+</beans>
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml
new file mode 100644
index 0000000..a6c710e
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+        xmlns="http://www.springframework.org/schema/beans"
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
+    <plugins>
+      <runtimeConfigurationPlugin checkPeriod="1000" />
+    </plugins>
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue="AMQ.9437"/>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+  </broker>
+</beans>
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
new file mode 100644
index 0000000..df99bab
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.springframework.context.support.AbstractApplicationContext;
+
+@RunWith(value = Parameterized.class)
+public class NetworkAdvancedStatisticsTest extends BaseNetworkTest {
+
+    @Parameterized.Parameters(name="includedDestination={0}, excludedDestination={1}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { new ActiveMQTopic("include.test.bar"), new ActiveMQTopic("exclude.test.bar")},
+                { new ActiveMQQueue("include.test.foo"), new ActiveMQQueue("exclude.test.foo")}});
+    }
+
+    protected static final int MESSAGE_COUNT = 10;
+
+    protected AbstractApplicationContext context;
+    protected String consumerName = "durableSubs";
+
+    private final ActiveMQDestination includedDestination;
+    private final ActiveMQDestination excludedDestination;
+
+    public NetworkAdvancedStatisticsTest(ActiveMQDestination includedDestionation, ActiveMQDestination excludedDestination) {
+        this.includedDestination = includedDestionation;
+        this.excludedDestination = excludedDestination;
+    }
+
+    @Override
+    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+        super.doSetUp(deleteAllMessages);
+    }
+
+    @Override
+    protected String getRemoteBrokerURI() {
+        return "org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml";
+    }
+
+    @Override
+    protected String getLocalBrokerURI() {
+        return "org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml";
+    }
+
+    //Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue
+    @Test(timeout = 60 * 1000)
+    public void testNetworkAdvancedStatistics() throws Exception {
+
+        // create a remote durable consumer to create demand
+        MessageConsumer remoteConsumer;
+        if(includedDestination.isTopic()) {
+            remoteConsumer = remoteSession.createDurableSubscriber(ActiveMQTopic.class.cast(includedDestination), consumerName);
+        } else {
+            remoteConsumer = remoteSession.createConsumer(includedDestination);
+            remoteConsumer.setMessageListener(new MessageListener() {                
+                @Override
+                public void onMessage(Message message) {
+                }
+            });
+        }
+        Thread.sleep(1000);
+
+        MessageProducer producer = localSession.createProducer(includedDestination);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message test = localSession.createTextMessage("test-" + i);
+            producer.send(test);
+        }
+        Thread.sleep(1000);
+
+        MessageProducer producerExcluded = localSession.createProducer(excludedDestination);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            Message test = localSession.createTextMessage("test-" + i);
+            producerExcluded.send(test);
+        }
+        Thread.sleep(1000);
+
+        //Make sure stats are correct for local -> remote
+        assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeues().getCount());
+        assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount());
+        assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+        assertEquals(0, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
+        assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount());
+        assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
+        assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+
+        // Make sure stats do not increment for local-only
+        assertEquals(MESSAGE_COUNT, localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
+        assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+        assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
+        assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeues().getCount());
+        assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
+        assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
+        assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
+
+        if(includedDestination.isTopic()) {
+            assertTrue(Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
+                }
+            }, 10000, 500));
+        } else {
+            assertTrue(Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    // The number of message that remain is due to the exclude queue
+                    return localBroker.getAdminView().getTotalMessageCount() == MESSAGE_COUNT;
+                }
+            }, 10000, 500));
+        }
+        remoteConsumer.close();
+    }
+
+    protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
+
+        final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+        final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+                       0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
+                       expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+                       0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
+            }
+        }));
+    }
+}
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
new file mode 100644
index 0000000..b543169
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+<broker brokerName="localBroker" start="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="true" xmlns="http://activemq.apache.org/schema/core">
+   
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue="exclude.>" advancedNetworkStatisticsEnabled="true"/>
+          <policyEntry queue="include.>" advancedNetworkStatisticsEnabled="true"/> 
+          <policyEntry topic="ActiveMQ.Advisory.>" />
+          <policyEntry topic="exclude.>" advancedNetworkStatisticsEnabled="true"/>
+          <policyEntry topic="include.>" advancedNetworkStatisticsEnabled="true"/> 
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:61617)"
+         dynamicOnly = "false"
+         conduitSubscriptions = "true"
+         decreaseNetworkConsumerPriority = "false"
+         name="networkConnector">
+      	<dynamicallyIncludedDestinations>
+      		<queue physicalName="include.test.foo"/>
+      		<topic physicalName="include.test.bar"/>
+      	</dynamicallyIncludedDestinations>
+      	<excludedDestinations>
+      		<queue physicalName="exclude.test.foo"/>
+      		<topic physicalName="exclude.test.bar"/>
+      	</excludedDestinations>
+      </networkConnector>
+    </networkConnectors>
+    
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61616"/>
+    </transportConnectors>
+    
+  </broker>
+</beans>
+
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
new file mode 100644
index 0000000..a9cf93f
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans 
+  xmlns="http://www.springframework.org/schema/beans" 
+  xmlns:amq="http://activemq.apache.org/schema/core"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <broker brokerName="remoteBroker" start="false" useJmx="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="false" xmlns="http://activemq.apache.org/schema/core">
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue="exclude.>" advancedNetworkStatisticsEnabled="true"/>
+          <policyEntry queue="include.>" advancedNetworkStatisticsEnabled="true"/> 
+          <policyEntry topic="ActiveMQ.Advisory.>" />
+          <policyEntry topic="exclude.>" advancedNetworkStatisticsEnabled="true"/>
+          <policyEntry topic="include.>" advancedNetworkStatisticsEnabled="true"/> 
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:61616)" />
+      </networkConnectors>
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61617"/>
+    </transportConnectors>
+  </broker>
+
+</beans>
+