Adding unit tests
(cherry picked from commit 781baa10586c4da09e48e4ecb953892efcb6c3b9)
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
index dd96f6d..e871277 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
@@ -38,6 +38,7 @@
private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
private final Supplier<Admin> adminSupplier;
private TopologyContext topologyContext;
+
private KafkaOffsetPartitionAndTopicMetrics kafkaOffsetPartitionAndTopicMetrics;
@@ -60,4 +61,8 @@
topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet);
}
+ public KafkaOffsetPartitionAndTopicMetrics getKafkaOffsetPartitionAndTopicMetrics() {
+ return kafkaOffsetPartitionAndTopicMetrics;
+ }
+
}
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
index cd0fbfc..900bb86 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
@@ -41,7 +41,7 @@
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetTopicMetrics.class);
- private String topic;
+ private final String topic;
long totalSpoutLag;
long totalEarliestTimeOffset;
long totalLatestTimeOffset;
@@ -115,25 +115,4 @@
metrics.put(topic + "/" + "totalRecordsInPartitions", totalRecordsInPartitionsGauge);
return metrics;
}
-
- private class TopicMetrics {
- long totalSpoutLag = 0L;
- long totalEarliestTimeOffset = 0L;
- long totalLatestTimeOffset = 0L;
- long totalLatestEmittedOffset = 0L;
- long totalLatestCompletedOffset = 0L;
- long totalRecordsInPartitions = 0L;
-
- public void incrementTotalSpoutLag(long offset) {
- totalSpoutLag += offset;
- }
-
- public void incrementTotalEarliestTimeOffset(long offset) {
- totalEarliestTimeOffset += offset;
- }
-
- public void incrementTotalLatestTimeOffset(long offset) {
- totalLatestTimeOffset += offset;
- }
- }
}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index b4cab2a..4e5702a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -29,9 +29,12 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
import java.util.HashMap;
import java.util.Map;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -65,6 +68,7 @@
private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
private final long commitOffsetPeriodMs = 2_000;
private Consumer<String, String> consumerSpy;
+ private Admin adminSpy;
private KafkaSpout<String, String> spout;
private final int maxPollRecords = 10;
@@ -78,9 +82,12 @@
.build();
ClientFactory<String, String> clientFactory = new ClientFactoryDefault<>();
this.consumerSpy = spy(clientFactory.createConsumer(spoutConfig.getKafkaProps()));
+ this.adminSpy = spy(clientFactory.createAdmin(spoutConfig.getKafkaProps()));
ClientFactory<String, String> clientFactoryMock = mock(ClientFactory.class);
when(clientFactoryMock.createConsumer(any()))
.thenReturn(consumerSpy);
+ when(clientFactoryMock.createAdmin(any()))
+ .thenReturn(adminSpy);
this.spout = new KafkaSpout<>(spoutConfig, clientFactoryMock, new TopicAssigner());
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
@@ -147,4 +154,23 @@
//With earliest, the spout should also resume where it left off, rather than restart at the earliest offset.
doReactivationTest(FirstPollOffsetStrategy.EARLIEST);
}
+
+ @Test
+ public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception {
+ //Storm will try to get metrics from the spout even while deactivated, the spout must be able to handle this
+ prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+
+ for (int i = 0; i < 5; i++) {
+ KafkaSpoutMessageId msgId = emitOne();
+ spout.ack(msgId);
+ }
+
+ spout.deactivate();
+
+ Map<String, Metric> offsetMetric = spout.getKafkaOffsetMetricManager().getKafkaOffsetPartitionAndTopicMetrics().getMetrics();
+ Long partitionLag = (Long) ((Gauge) offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/partition_0/spoutLag")).getValue();
+ Long spoutLag = (Long) ((Gauge) offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag")).getValue();
+ assertThat(partitionLag, is(5L));
+ assertThat(spoutLag, is(5L));
+ }
}