Merge pull request #3167 from dandsager1/STORM-3538

STORM-3538 Add Meter for sendSupervisorAssignments exception
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index da84979..496e1d8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -24,6 +24,7 @@
 import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.metric.api.IMetric;
 import org.slf4j.Logger;
@@ -76,8 +77,17 @@
         Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
         Set<TopicPartition> topicPartitions = offsetManagers.keySet();
 
-        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
-        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+        Map<TopicPartition, Long> beginningOffsets;
+        Map<TopicPartition, Long> endOffsets;
+
+        try {
+            beginningOffsets = consumer.beginningOffsets(topicPartitions);
+            endOffsets = consumer.endOffsets(topicPartitions);
+        } catch (RetriableException e) {
+            LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
+            return null;
+        }
+
         //map to hold partition level and topic level metrics
         Map<String, Long> result = new HashMap<>();
 
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 512d274..d7f563f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -21,17 +21,14 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyListOf;
 import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 
 import java.util.HashSet;
 import java.util.List;
@@ -39,8 +36,10 @@
 import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Time;
@@ -428,4 +427,16 @@
         assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10);
         assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0);
     }
+
+    @Test // Checks that reading the Kafka offset metric yields null when the consumer's offset lookup throws.
+    public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount); // helper defined outside this hunk; presumably readies the spout with messageCount messages - confirm
+
+        // Stub the mocked consumer so beginningOffsets() throws TimeoutException (a RetriableException in the Kafka client API)
+        when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class);
+
+        Map<String, Long> offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset(); // the stubbed call is expected to fire in here
+        assertNull(offsetMetric); // null, rather than a propagated exception, is the outcome being asserted
+    }
 }