QPID-8621 - [Broker-J] Add operation "resetStatistics" to Producer (#174)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
index 7235e6a..316d124 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/Producer.java
@@ -47,11 +47,14 @@
     @DerivedAttribute(description = "Destination type (exchange or queue)")
     DestinationType getDestinationType();
 
-    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES)
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.MESSAGES, resettable = true)
     int getMessagesOut();
 
-    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES)
+    @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = StatisticUnit.BYTES, resettable = true)
     long getBytesOut();
 
+    @ManagedOperation(description = "Resets producer statistics", changesConfiguredObjectState = true)
+    void resetStatistics();
+
     ListenableFuture<Void> deleteNoChecks();
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
index 431b4e3..52e350f 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java
@@ -149,4 +149,11 @@
     {
         return _bytesOut.get();
     }
+
+    @Override
+    public void resetStatistics()
+    {
+        _bytesOut.set(0);
+        _messagesOut.set(0);
+    }
 }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/session/AbstractAMQPSessionTest.java b/broker-core/src/test/java/org/apache/qpid/server/session/AbstractAMQPSessionTest.java
index fb5f6a8..5e9c9e3 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/session/AbstractAMQPSessionTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/session/AbstractAMQPSessionTest.java
@@ -41,6 +41,8 @@
 import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Connection;
 import org.apache.qpid.server.model.Model;
+import org.apache.qpid.server.model.Producer;
+import org.apache.qpid.server.model.PublishingLink;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.security.auth.TestPrincipalUtils;
@@ -74,7 +76,6 @@
         when(_connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(_connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
         mockAMQPSession = new MockAMQPSession(_connection, 123);
-
     }
 
     @AfterEach
@@ -154,6 +155,36 @@
         assertEquals(0L, statisticsAfterReset.get("transactedMessagesOut"));
     }
 
+    @Test
+    public void producer() throws Exception
+    {
+        final PublishingLink link = mock(PublishingLink.class);
+        when(link.getName()).thenReturn("test-link");
+        when(link.getType()).thenReturn(PublishingLink.TYPE_LINK);
+
+        final Queue<?> queue = mock(Queue.class);
+
+        final Producer<?> producer = mockAMQPSession.addProducer(link, queue);
+
+        Map<String, Object> statistics = producer.getStatistics();
+
+        assertEquals(0, statistics.get("messagesOut"));
+        assertEquals(0L, statistics.get("bytesOut"));
+
+        producer.registerMessageDelivered(100L);
+
+        statistics = producer.getStatistics();
+
+        assertEquals(1, statistics.get("messagesOut"));
+        assertEquals(100L, statistics.get("bytesOut"));
+
+        producer.resetStatistics();
+        statistics = producer.getStatistics();
+
+        assertEquals(0, statistics.get("messagesOut"));
+        assertEquals(0L, statistics.get("bytesOut"));
+    }
+
     private static class MockAMQPSession extends AbstractAMQPSession
     {
 
diff --git a/broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java b/broker-core/src/test/java/org/apache/qpid/server/session/ProducerTest.java
similarity index 97%
rename from broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java
rename to broker-core/src/test/java/org/apache/qpid/server/session/ProducerTest.java
index 0635bda..adc5829 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/exchange/ProducerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/session/ProducerTest.java
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.exchange;
+package org.apache.qpid.server.session;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
@@ -31,6 +31,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.message.MessageSender;
 import org.apache.qpid.server.model.BrokerTestHelper;
 import org.apache.qpid.server.model.Exchange;