Add unit tests

(cherry picked from commit 781baa10586c4da09e48e4ecb953892efcb6c3b9)
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
index dd96f6d..e871277 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
@@ -38,6 +38,7 @@
     private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
     private final Supplier<Admin> adminSupplier;
     private TopologyContext topologyContext;
+
     private KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics;
 
 
@@ -60,4 +61,8 @@
         topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet);
     }
 
+    public KafkaOffsetPartitionAndTopicMetrics getKafkaOffsetPartitionAndTopicMetrics() {
+        return kafkaOffsetPartitionAndTopicMetrics;
+    }
+
 }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
index cd0fbfc..900bb86 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
@@ -41,7 +41,7 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetTopicMetrics.class);
 
-    private String topic;
+    private final String topic;
     long totalSpoutLag;
     long totalEarliestTimeOffset;
     long totalLatestTimeOffset;
@@ -115,25 +115,4 @@
         metrics.put(topic + "/" + "totalRecordsInPartitions", totalRecordsInPartitionsGauge);
         return metrics;
     }
-
-    private class TopicMetrics {
-        long totalSpoutLag = 0L;
-        long totalEarliestTimeOffset = 0L;
-        long totalLatestTimeOffset = 0L;
-        long totalLatestEmittedOffset = 0L;
-        long totalLatestCompletedOffset = 0L;
-        long totalRecordsInPartitions = 0L;
-
-        public void incrementTotalSpoutLag(long offset) {
-            totalSpoutLag += offset;
-        }
-
-        public void incrementTotalEarliestTimeOffset(long offset) {
-            totalEarliestTimeOffset += offset;
-        }
-
-        public void incrementTotalLatestTimeOffset(long offset) {
-            totalLatestTimeOffset += offset;
-        }
-    }
 }
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index b4cab2a..4e5702a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -29,9 +29,12 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -65,6 +68,7 @@
     private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
     private final long commitOffsetPeriodMs = 2_000;
     private Consumer<String, String> consumerSpy;
+    private Admin adminSpy;
     private KafkaSpout<String, String> spout;
     private final int maxPollRecords = 10;
 
@@ -78,9 +82,12 @@
                 .build();
         ClientFactory<String, String> clientFactory = new ClientFactoryDefault<>();
         this.consumerSpy = spy(clientFactory.createConsumer(spoutConfig.getKafkaProps()));
+        this.adminSpy = spy(clientFactory.createAdmin(spoutConfig.getKafkaProps()));
         ClientFactory<String, String> clientFactoryMock = mock(ClientFactory.class);
         when(clientFactoryMock.createConsumer(any()))
             .thenReturn(consumerSpy);
+        when(clientFactoryMock.createAdmin(any()))
+                .thenReturn(adminSpy);
         this.spout = new KafkaSpout<>(spoutConfig, clientFactoryMock, new TopicAssigner());
         SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
         SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
@@ -147,4 +154,23 @@
         //With earliest, the spout should also resume where it left off, rather than restart at the earliest offset.
         doReactivationTest(FirstPollOffsetStrategy.EARLIEST);
     }
+
+    @Test
+    public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception {
+        //Storm will try to get metrics from the spout even while deactivated, the spout must be able to handle this
+        prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+
+        for (int i = 0; i < 5; i++) {
+            KafkaSpoutMessageId msgId = emitOne();
+            spout.ack(msgId);
+        }
+
+        spout.deactivate();
+
+        Map<String, Metric> offsetMetric = spout.getKafkaOffsetMetricManager().getKafkaOffsetPartitionAndTopicMetrics().getMetrics();
+        Long partitionLag = (Long) ((Gauge) offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/partition_0/spoutLag")).getValue();
+        Long spoutLag = (Long) ((Gauge) offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag")).getValue();
+        assertThat(partitionLag, is(5L));
+        assertThat(spoutLag, is(5L));
+    }
 }