Merge pull request #3168 from dandsager1/STORM-3539

STORM-3539 Add metric for worker start time out
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index da84979..496e1d8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -24,6 +24,7 @@
 import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.metric.api.IMetric;
 import org.slf4j.Logger;
@@ -76,8 +77,17 @@
         Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
         Set<TopicPartition> topicPartitions = offsetManagers.keySet();
 
-        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
-        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+        Map<TopicPartition, Long> beginningOffsets;
+        Map<TopicPartition, Long> endOffsets;
+
+        try {
+            beginningOffsets = consumer.beginningOffsets(topicPartitions);
+            endOffsets = consumer.endOffsets(topicPartitions);
+        } catch (RetriableException e) {
+            LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
+            return null;
+        }
+
         //map to hold partition level and topic level metrics
         Map<String, Long> result = new HashMap<>();
 
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 512d274..d7f563f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -21,17 +21,20 @@
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyListOf;
 import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.util.HashSet;
 import java.util.List;
@@ -39,8 +42,9 @@
 import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Time;
@@ -428,4 +432,17 @@
         assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10);
         assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0);
     }
+
+    @Test
+    public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        // TimeoutException is a RetriableException: the metric should skip this tick and report null.
+        // doThrow().when() avoids invoking the real method while stubbing, which matters if the consumer is a spy.
+        doThrow(TimeoutException.class).when(getKafkaConsumer()).beginningOffsets(anyCollection());
+
+        Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertNull(offsetMetric);
+    }
 }
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 57af8d1..7a1c518 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -55,5 +55,7 @@
     public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
     public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
     public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
+
+    public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
 }
     
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index a3b7575..b56a623 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -268,6 +268,7 @@
     private final Meter shutdownCalls;
     private final Meter processWorkerMetricsCalls;
     private final Meter mkAssignmentsErrors;
+    private final Meter sendAssignmentExceptions;   // registered only; marked by name in AssignmentDistributionService
 
     //Timer
     private final Timer fileUploadDuration;
@@ -305,7 +306,7 @@
         IStormClusterState state = nimbus.getStormClusterState();
         Assignment oldAssignment = state.assignmentInfo(topoId, null);
         state.removeStorm(topoId);
-        notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer());
+        notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer(), nimbus.getMetricsRegistry());
         nimbus.heartbeatsCache.removeTopo(topoId);
         nimbus.getIdToExecutors().getAndUpdate(new Dissoc<>(topoId));
         return null;
@@ -517,6 +518,7 @@
         this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
         this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
         this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
+        this.sendAssignmentExceptions = metricsRegistry.registerMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS);
         this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
         this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
         this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
@@ -1578,7 +1580,8 @@
      */
     private static void notifySupervisorsAssignments(Map<String, Assignment> assignments,
                                                      AssignmentDistributionService service, Map<String, String> nodeHost,
-                                                     Map<String, SupervisorDetails> supervisorDetails) {
+                                                     Map<String, SupervisorDetails> supervisorDetails,
+                                                     StormMetricsRegistry metricsRegistry) {
         for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
             try {
                 String nodeId = nodeEntry.getKey();
@@ -1586,7 +1589,7 @@
                 supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey()));
                 SupervisorDetails details = supervisorDetails.get(nodeId);
                 Integer serverPort = details != null ? details.getServerPort() : null;
-                service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments);
+                service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments, metricsRegistry);
             } catch (Throwable tr1) {
                 //just skip when any error happens wait for next round assignments reassign
                 LOG.error("Exception when add assignments distribution task for node {}", nodeEntry.getKey());
@@ -1595,10 +1598,10 @@
     }
 
     private static void notifySupervisorsAsKilled(IStormClusterState clusterState, Assignment oldAss,
-                                                  AssignmentDistributionService service) {
+                                                  AssignmentDistributionService service, StormMetricsRegistry metricsRegistry) {
         Map<String, String> nodeHost = assignmentChangedNodes(oldAss, null);
         notifySupervisorsAssignments(clusterState.assignmentsInfo(), service, nodeHost,
-                                     basicSupervisorDetailsMap(clusterState));
+                                     basicSupervisorDetailsMap(clusterState), metricsRegistry);
     }
 
     @VisibleForTesting
@@ -1654,6 +1657,10 @@
         return assignmentsDistributer;
     }
 
+    private StormMetricsRegistry getMetricsRegistry() {
+        return metricsRegistry;
+    }
+
     @VisibleForTesting
     public HeartbeatCache getHeartbeatsCache() {
         return heartbeatsCache;
@@ -2520,7 +2527,7 @@
                 totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
             }
             notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
-                    basicSupervisorDetailsMap);
+                                        basicSupervisorDetailsMap, getMetricsRegistry());
 
             Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
             for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index cc98804..5db347e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -68,6 +68,18 @@
         registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
     }
 
+    /**
+     * Looks up a previously registered meter by name.
+     *
+     * @param meterName the name the meter was registered under
+     * @return the meter, or {@code null} if no meter is registered under that name
+     */
+    public Meter getMeter(String meterName) {
+        // Direct map lookup; getMeters() would copy every meter into a new sorted map on each call.
+        Object metric = registry.getMetrics().get(meterName);
+        return metric instanceof Meter ? (Meter) metric : null;
+    }
+
     public void startMetricsReporters(Map<String, Object> daemonConf) {
         reporters = MetricsUtils.getPreparableReporters(daemonConf);
         for (PreparableReporter reporter : reporters) {
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 4f84997..4eb1bb4 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -21,9 +21,12 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.supervisor.Supervisor;
 import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.SupervisorClient;
@@ -146,7 +149,8 @@
      * @param serverPort node thrift server port.
      * @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
      */
-    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+    public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+                                      StormMetricsRegistry metricsRegistry) {
         try {
             //For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
             //Just skip for this scheduling round.
@@ -155,7 +159,8 @@
                 return;
             }
 
-            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
+            boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort,
+                                                assignments, metricsRegistry), 5L, TimeUnit.SECONDS);
             if (!success) {
                 LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
             }
@@ -211,17 +216,20 @@
         private String host;
         private Integer serverPort;
         private SupervisorAssignments assignments;
+        private StormMetricsRegistry metricsRegistry;
 
-        private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+        private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+                                StormMetricsRegistry metricsRegistry) {
             this.node = node;
             this.host = host;
             this.serverPort = serverPort;
             this.assignments = assignments;
+            this.metricsRegistry = metricsRegistry;
         }
 
         public static NodeAssignments getInstance(String node, String host, Integer serverPort,
-                                                  SupervisorAssignments assignments) {
-            return new NodeAssignments(node, host, serverPort, assignments);
+                                                  SupervisorAssignments assignments, StormMetricsRegistry metricsRegistry) {
+            return new NodeAssignments(node, host, serverPort, assignments, metricsRegistry);
         }
 
         //supervisor assignment id/supervisor id
@@ -241,6 +249,9 @@
             return this.assignments;
         }
 
+        public StormMetricsRegistry getMetricsRegistry() {
+            return metricsRegistry;
+        }
     }
 
     /**
@@ -289,14 +300,14 @@
                     try {
                         client.getIface().sendSupervisorAssignments(assignments.getAssignments());
                     } catch (Exception e) {
-                        //just ignore the exception.
                         LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
+                        // Mark only after logging so a failure in the metrics lookup cannot hide the send error.
+                        assignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
                     }
                 } catch (Throwable e) {
                     //just ignore any error/exception.
                     LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
                 }
-
             }
         }
     }