Merge pull request #3168 from dandsager1/STORM-3539
STORM-3539 Add metric for worker start time out
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index da84979..496e1d8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -24,6 +24,7 @@
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
@@ -76,8 +77,17 @@
Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
Set<TopicPartition> topicPartitions = offsetManagers.keySet();
- Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
- Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+ Map<TopicPartition, Long> beginningOffsets;
+ Map<TopicPartition, Long> endOffsets;
+
+ try {
+ beginningOffsets = consumer.beginningOffsets(topicPartitions);
+ endOffsets = consumer.endOffsets(topicPartitions);
+ } catch (RetriableException e) {
+ LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
+ return null;
+ }
+
//map to hold partition level and topic level metrics
Map<String, Long> result = new HashMap<>();
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 512d274..d7f563f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -21,17 +21,14 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyListOf;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
import java.util.HashSet;
import java.util.List;
@@ -39,8 +36,10 @@
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
@@ -428,4 +427,16 @@
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10);
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0);
}
+
+ @Test
+ public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception {
+ final int numMessages = 10;
+ prepareSpout(numMessages);
+
+ // Make the consumer's beginningOffsets lookup fail with a TimeoutException; the metric should then report null for this tick
+ when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class);
+
+ Object metricValue = spout.getKafkaOffsetMetric().getValueAndReset();
+ assertNull(metricValue);
+ }
}
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 57af8d1..7a1c518 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -55,5 +55,7 @@
public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
+
+ /** Name of the meter Nimbus registers and AssignmentDistributionService marks when sending supervisor assignments throws. */
+ public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index a3b7575..b56a623 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -268,6 +268,7 @@
private final Meter shutdownCalls;
private final Meter processWorkerMetricsCalls;
private final Meter mkAssignmentsErrors;
+ private final Meter sendAssignmentExceptions; // not read here; AssignmentDistributionService marks it by name via StormMetricsRegistry.getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS)
//Timer
private final Timer fileUploadDuration;
@@ -305,7 +306,7 @@
IStormClusterState state = nimbus.getStormClusterState();
Assignment oldAssignment = state.assignmentInfo(topoId, null);
state.removeStorm(topoId);
- notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer());
+ notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer(), nimbus.getMetricsRegistry());
nimbus.heartbeatsCache.removeTopo(topoId);
nimbus.getIdToExecutors().getAndUpdate(new Dissoc<>(topoId));
return null;
@@ -517,6 +518,7 @@
this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
+ this.sendAssignmentExceptions = metricsRegistry.registerMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS);
this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
@@ -1578,7 +1580,8 @@
*/
private static void notifySupervisorsAssignments(Map<String, Assignment> assignments,
AssignmentDistributionService service, Map<String, String> nodeHost,
- Map<String, SupervisorDetails> supervisorDetails) {
+ Map<String, SupervisorDetails> supervisorDetails,
+ StormMetricsRegistry metricsRegistry) {
for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
try {
String nodeId = nodeEntry.getKey();
@@ -1586,7 +1589,7 @@
supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey()));
SupervisorDetails details = supervisorDetails.get(nodeId);
Integer serverPort = details != null ? details.getServerPort() : null;
- service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments);
+ service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments, metricsRegistry);
} catch (Throwable tr1) {
//just skip when any error happens wait for next round assignments reassign
LOG.error("Exception when add assignments distribution task for node {}", nodeEntry.getKey());
@@ -1595,10 +1598,10 @@
}
+ /**
+ * Pushes the current cluster assignments to the nodes returned by assignmentChangedNodes(oldAss, null)
+ * (presumably every node oldAss used — confirm in assignmentChangedNodes).
+ * metricsRegistry is threaded through to the distribution service so failed sends can be counted.
+ */
private static void notifySupervisorsAsKilled(IStormClusterState clusterState, Assignment oldAss,
- AssignmentDistributionService service) {
+ AssignmentDistributionService service, StormMetricsRegistry metricsRegistry) {
Map<String, String> nodeHost = assignmentChangedNodes(oldAss, null);
notifySupervisorsAssignments(clusterState.assignmentsInfo(), service, nodeHost,
- basicSupervisorDetailsMap(clusterState));
+ basicSupervisorDetailsMap(clusterState), metricsRegistry);
}
@VisibleForTesting
@@ -1654,6 +1657,10 @@
return assignmentsDistributer;
}
+ /** Gives the assignment-notification helpers access to this Nimbus instance's metrics registry. */
+ private StormMetricsRegistry getMetricsRegistry() {
+ return this.metricsRegistry;
+ }
+
@VisibleForTesting
public HeartbeatCache getHeartbeatsCache() {
return heartbeatsCache;
@@ -2520,7 +2527,7 @@
totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
}
notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
- basicSupervisorDetailsMap);
+ basicSupervisorDetailsMap, getMetricsRegistry());
Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index cc98804..5db347e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -68,6 +68,10 @@
registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
}
+ /**
+ * Looks up a meter that was previously registered under {@code meterName}.
+ *
+ * @param meterName the name the meter was registered with
+ * @return the registered meter; never null
+ * @throws IllegalArgumentException if no meter is registered under that name
+ */
+ public Meter getMeter(String meterName) {
+ // Direct map lookup instead of getMeters(), which builds a sorted copy of every meter on each call
+ Object metric = registry.getMetrics().get(meterName);
+ if (metric instanceof Meter) {
+ return (Meter) metric;
+ }
+ // Fail loudly with the offending name rather than returning null and letting callers NPE on mark()
+ throw new IllegalArgumentException("No meter registered with name " + meterName);
+ }
+
public void startMetricsReporters(Map<String, Object> daemonConf) {
reporters = MetricsUtils.getPreparableReporters(daemonConf);
for (PreparableReporter reporter : reporters) {
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 4f84997..4eb1bb4 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -21,9 +21,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.SupervisorClient;
@@ -146,7 +149,8 @@
* @param serverPort node thrift server port.
* @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
*/
- public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+ public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+ StormMetricsRegistry metricsRegistry) {
try {
//For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
//Just skip for this scheduling round.
@@ -155,7 +159,8 @@
return;
}
- boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
+ boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort,
+ assignments, metricsRegistry), 5L, TimeUnit.SECONDS);
if (!success) {
LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
}
@@ -211,17 +216,20 @@
private String host;
private Integer serverPort;
private SupervisorAssignments assignments;
+ private StormMetricsRegistry metricsRegistry;
- private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+ private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+ StormMetricsRegistry metricsRegistry) {
this.node = node;
this.host = host;
this.serverPort = serverPort;
this.assignments = assignments;
+ // Carried with each queued task so the code that sends it can mark the send-failure meter
+ this.metricsRegistry = metricsRegistry;
}
+ /** Creates a distribution task for one node; metricsRegistry is used to record failures sending it. */
public static NodeAssignments getInstance(String node, String host, Integer serverPort,
- SupervisorAssignments assignments) {
- return new NodeAssignments(node, host, serverPort, assignments);
+ SupervisorAssignments assignments, StormMetricsRegistry metricsRegistry) {
+ return new NodeAssignments(node, host, serverPort, assignments, metricsRegistry);
}
//supervisor assignment id/supervisor id
@@ -241,6 +249,9 @@
return this.assignments;
}
+ /** Returns the registry whose send-failure meter is marked when delivering these assignments throws. */
+ public StormMetricsRegistry getMetricsRegistry() {
+ return this.metricsRegistry;
+ }
}
/**
@@ -289,14 +300,13 @@
try {
client.getIface().sendSupervisorAssignments(assignments.getAssignments());
} catch (Exception e) {
- //just ignore the exception.
+ assignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
}
} catch (Throwable e) {
//just ignore any error/exception.
LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
}
-
}
}
}