Merge pull request #3163 from dandsager1/STORM-3534
STORM-3534 Add generic resources to UI
diff --git a/SECURITY.md b/SECURITY.md
index 668ea8a..5e321e5 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -28,6 +28,7 @@
|--------------|--------------|------------------------|--------|
| 2181 | `storm.zookeeper.port` | Nimbus, Supervisors, and Worker processes | ZooKeeper |
| 6627 | `nimbus.thrift.port` | Storm clients, Supervisors, and UI | Nimbus |
+| 6628 | `supervisor.thrift.port` | Nimbus | Supervisors |
| 8080 | `ui.port` | Client Web Browsers | UI |
| 8000 | `logviewer.port` | Client Web Browsers | Logviewer |
| 3772 | `drpc.port` | External DRPC Clients | DRPC |
diff --git a/docs/ClusterMetrics.md b/docs/ClusterMetrics.md
index 4e4d0f1..f7f7b4f 100644
--- a/docs/ClusterMetrics.md
+++ b/docs/ClusterMetrics.md
@@ -185,6 +185,7 @@
| supervisor:num-launched | meter | number of times the supervisor is launched. |
| supervisor:num-shell-exceptions | meter | number of exceptions calling shell commands. |
| supervisor:num-slots-used-gauge | gauge | number of slots used on the supervisor. |
+| supervisor:num-worker-start-timed-out | meter | number of times worker start timed out. |
| supervisor:num-worker-transitions-into-empty | meter | number of transitions into empty state. |
| supervisor:num-worker-transitions-into-kill | meter | number of transitions into kill state. |
| supervisor:num-worker-transitions-into-kill-and-relaunch | meter | number of transitions into kill-and-relaunch state |
diff --git a/docs/Generic-resources.md b/docs/Generic-resources.md
new file mode 100644
index 0000000..f3bfe3e
--- /dev/null
+++ b/docs/Generic-resources.md
@@ -0,0 +1,39 @@
+---
+title: Generic Resources
+layout: documentation
+documentation: true
+---
+
+### Generic Resources
+Generic Resources allow Storm to reference arbitrary resource types. Generic Resources may be considered an extension of the resources enumerated by the [Resource Aware Scheduler](Resource_Aware_Scheduler_overview.html), which accounts for CPU and memory.
+
+### API Overview
+For a Storm Topology, the user can now specify the amount of generic resources a topology component (i.e. Spout or Bolt) requires to run a single instance of the component. The user can specify the resource requirement for a topology component by using the following API call.
+```
+ public T addResource(String resourceName, Number resourceValue)
+```
+Parameters:
+- resourceName – The name of the generic resource
+- resourceValue – The amount of the generic resource
+
+Example of Usage:
+```
+ SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
+ s1.addResource("gpu.count", 1.0);
+```
+
+### Specifying Generic Cluster Resources
+
+A Storm administrator can specify node resource availability by modifying the _conf/storm.yaml_ file located in the Storm home directory of that node.
+```
+ supervisor.resources.map: {[type<String>] : [amount<Double>]}
+```
+Example of Usage:
+```
+ supervisor.resources.map: {"gpu.count" : 2.0}
+```
+
+
+### Generic Resources in UI
+
+![Storm Cluster UI](images/storm_ui.png)
diff --git a/docs/SECURITY.md b/docs/SECURITY.md
index 74a67ef..6e4d7e1 100644
--- a/docs/SECURITY.md
+++ b/docs/SECURITY.md
@@ -36,6 +36,7 @@
|--------------|--------------|------------------------|--------|
| 2181 | `storm.zookeeper.port` | Nimbus, Supervisors, and Worker processes | Zookeeper |
| 6627 | `nimbus.thrift.port` | Storm clients, Supervisors, and UI | Nimbus |
+| 6628 | `supervisor.thrift.port` | Nimbus | Supervisors |
| 8080 | `ui.port` | Client Web Browsers | UI |
| 8000 | `logviewer.port` | Client Web Browsers | Logviewer |
| 3772 | `drpc.port` | External DRPC Clients | DRPC |
diff --git a/docs/images/storm_ui.png b/docs/images/storm_ui.png
new file mode 100644
index 0000000..45aae41
--- /dev/null
+++ b/docs/images/storm_ui.png
Binary files differ
diff --git a/docs/index.md b/docs/index.md
index df1df93..36cf63f 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -65,6 +65,7 @@
* [CGroup Enforcement](cgroups_in_storm.html)
* [Pacemaker reduces load on zookeeper for large clusters](Pacemaker.html)
* [Resource Aware Scheduler](Resource_Aware_Scheduler_overview.html)
+* [Generic Resources](Generic-resources.html)
* [Daemon Metrics/Monitoring](ClusterMetrics.html)
* [Windows users guide](windows-users-guide.html)
* [Classpath handling](Classpath-handling.html)
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index da84979..496e1d8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -24,6 +24,7 @@
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
@@ -76,8 +77,17 @@
Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
Set<TopicPartition> topicPartitions = offsetManagers.keySet();
- Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
- Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+ Map<TopicPartition, Long> beginningOffsets;
+ Map<TopicPartition, Long> endOffsets;
+
+ try {
+ beginningOffsets = consumer.beginningOffsets(topicPartitions);
+ endOffsets = consumer.endOffsets(topicPartitions);
+ } catch (RetriableException e) {
+ LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
+ return null;
+ }
+
//map to hold partition level and topic level metrics
Map<String, Long> result = new HashMap<>();
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 512d274..d7f563f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -21,17 +21,14 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyListOf;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
import java.util.HashSet;
import java.util.List;
@@ -39,8 +36,10 @@
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
@@ -428,4 +427,16 @@
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10);
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0);
}
+
+ @Test
+ public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception {
+ final int messageCount = 10;
+ prepareSpout(messageCount);
+
+ // Ensure a timeout exception results in the return value being null
+ when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class);
+
+ Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+ assertNull(offsetMetric);
+ }
}
diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 8299c14..50570e1 100644
--- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -18,12 +18,17 @@
package org.apache.storm.flux.parser;
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.storm.flux.model.BoltDef;
import org.apache.storm.flux.model.IncludeDef;
@@ -40,17 +45,20 @@
*/
public class FluxParser {
private static final Logger LOG = LoggerFactory.getLogger(FluxParser.class);
+ private static final Pattern propertyPattern =
+ Pattern.compile(".*\\$\\{(?<var>ENV-(?<envVar>.+)|(?<list>.+)\\[(?<listIndex>\\d+)]|.+)}.*");
private FluxParser() {
}
/**
* Parse a flux topology definition.
- * @param inputFile source YAML file
- * @param dumpYaml if true, dump the parsed YAML to stdout
+ *
+ * @param inputFile source YAML file
+ * @param dumpYaml if true, dump the parsed YAML to stdout
* @param processIncludes whether or not to process includes
- * @param properties properties file for variable substitution
- * @param envSub whether or not to perform environment variable substitution
+ * @param properties properties file for variable substitution
+ * @param envSub whether or not to perform environment variable substitution
* @return resulting topologuy definition
* @throws IOException if there is a problem reading file(s)
*/
@@ -65,11 +73,12 @@
/**
* Parse a flux topology definition from a classpath resource..
- * @param resource YAML resource
- * @param dumpYaml if true, dump the parsed YAML to stdout
+ *
+ * @param resource YAML resource
+ * @param dumpYaml if true, dump the parsed YAML to stdout
* @param processIncludes whether or not to process includes
- * @param properties properties file for variable substitution
- * @param envSub whether or not to perform environment variable substitution
+ * @param properties properties file for variable substitution
+ * @param envSub whether or not to perform environment variable substitution
* @return resulting topologuy definition
* @throws IOException if there is a problem reading file(s)
*/
@@ -84,11 +93,12 @@
/**
* Parse a flux topology definition.
- * @param inputStream InputStream representation of YAML file
- * @param dumpYaml if true, dump the parsed YAML to stdout
+ *
+ * @param inputStream InputStream representation of YAML file
+ * @param dumpYaml if true, dump the parsed YAML to stdout
* @param processIncludes whether or not to process includes
- * @param properties properties file for variable substitution
- * @param envSub whether or not to perform environment variable substitution
+ * @param properties properties file for variable substitution
+ * @param envSub whether or not to perform environment variable substitution
* @return resulting topology definition
* @throws IOException if there is a problem reading file(s)
*/
@@ -116,10 +126,11 @@
/**
* Parse filter properties file.
+ *
* @param propertiesFile properties file for variable substitution
- * @param resource whether or not to load properties file from classpath
+ * @param resource whether or not to load properties file from classpath
* @return resulting filter properties
- * @throws IOException if there is a problem reading file
+ * @throws IOException if there is a problem reading file
*/
public static Properties parseProperties(String propertiesFile, boolean resource) throws IOException {
Properties properties = null;
@@ -140,36 +151,43 @@
}
private static TopologyDef loadYaml(Yaml yaml, InputStream in, Properties properties, boolean envSubstitution) throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
LOG.info("loading YAML from input stream...");
- int b = -1;
- while ((b = in.read()) != -1) {
- bos.write(b);
- }
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
+ String conf = reader.lines().map(line -> {
+ Matcher m = propertyPattern.matcher(line);
+ return m.find()
+ ? getPropertyReplacement(properties, m, envSubstitution)
+ .map(propValue -> line.replace("${" + m.group("var") + "}", propValue))
+ .orElseGet(() -> {
+ LOG.warn("Could not find replacement for property: " + m.group("var"));
+ return line;
+ })
+ : line;
+ }).collect(Collectors.joining(System.lineSeparator()));
- // TODO substitution implementation is not exactly efficient or kind to memory...
- String str = bos.toString();
- // properties file substitution
- if (properties != null) {
- LOG.info("Performing property substitution.");
- for (Object key : properties.keySet()) {
- str = str.replace("${" + key + "}", properties.getProperty((String)key));
- }
- } else {
- LOG.info("Not performing property substitution.");
+ return (TopologyDef) yaml.load(conf);
}
+ }
- // environment variable substitution
- if (envSubstitution) {
- LOG.info("Performing environment variable substitution...");
- Map<String, String> envs = System.getenv();
- for (String key : envs.keySet()) {
- str = str.replace("${ENV-" + key + "}", envs.get(key));
- }
+ private static Optional<String> getPropertyReplacement(Properties properties, Matcher match, boolean envSubstitution) {
+ if (match.group("listIndex") != null) {
+ String prop = properties.getProperty(match.group("list"));
+ return Optional.of(parseListAndExtractElem(prop, match.group("listIndex")));
+ } else if (envSubstitution && match.group("envVar") != null) {
+ String envVar = System.getenv().get(match.group("envVar"));
+ return Optional.ofNullable(envVar);
} else {
- LOG.info("Not performing environment variable substitution.");
+ return Optional.ofNullable(properties.getProperty(match.group("var")));
}
- return (TopologyDef) yaml.load(str);
+ }
+
+ private static String parseListAndExtractElem(String strList, String index) {
+ String[] listProp = strList.substring(1, strList.length() - 1).split(",");
+ String listElem = listProp[Integer.parseInt(index)];
+
+ // remove whitespaces and double quotes from beginning and end of a given string
+ String trimmed = listElem.trim();
+ return trimmed.substring(1, trimmed.length() - 1);
}
private static void dumpYaml(TopologyDef topology, Yaml yaml) {
@@ -191,14 +209,15 @@
/**
* Process includes contained within a yaml file.
+ *
* @param yaml the yaml parser for parsing the include file(s)
* @param topologyDef the topology definition containing (possibly zero) includes
- * @param properties properties file for variable substitution
- * @param envSub whether or not to perform environment variable substitution
+ * @param properties properties file for variable substitution
+ * @param envSub whether or not to perform environment variable substitution
* @return The TopologyDef with includes resolved.
*/
private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub)
- throws IOException {
+ throws IOException {
//TODO support multiple levels of includes
if (topologyDef.getIncludes() != null) {
for (IncludeDef include : topologyDef.getIncludes()) {
diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
index 90613c9..275a720 100644
--- a/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
+++ b/flux/flux-core/src/test/java/org/apache/storm/flux/TCKTest.java
@@ -275,6 +275,11 @@
Collections.singletonList("A string list"),
is(context.getTopologyDef().getConfig().get("list.property.target")));
+ //Test substitution where the target type is a List element
+ assertThat("List element property is not replaced by the expected value",
+ "A string list",
+ is(context.getTopologyDef().getConfig().get("list.element.property.target")));
+
}
@Test
diff --git a/flux/flux-core/src/test/resources/configs/substitution-test.yaml b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
index 9707936..67ac92a 100644
--- a/flux/flux-core/src/test/resources/configs/substitution-test.yaml
+++ b/flux/flux-core/src/test/resources/configs/substitution-test.yaml
@@ -45,6 +45,8 @@
test.env.value: "${ENV-PATH}"
# test variable substitution for list type
list.property.target: ${a.list.property}
+ # test variable substitution for list element
+ list.element.property.target: ${a.list.property[0]}
# spout definitions
spouts:
diff --git a/pom.xml b/pom.xml
index 0601a84..4ff5218 100644
--- a/pom.xml
+++ b/pom.xml
@@ -236,6 +236,15 @@
</roles>
<timezone>-6</timezone>
</developer>
+ <developer>
+ <id>agresch</id>
+ <name>Aaron Gresch</name>
+ <email>agresch@gmail.com</email>
+ <roles>
+ <role>Committer</role>
+ </roles>
+ <timezone>-6</timezone>
+ </developer>
</developers>
diff --git a/storm-client/src/jvm/org/apache/storm/Constants.java b/storm-client/src/jvm/org/apache/storm/Constants.java
index 57af8d1..7a1c518 100644
--- a/storm-client/src/jvm/org/apache/storm/Constants.java
+++ b/storm-client/src/jvm/org/apache/storm/Constants.java
@@ -55,5 +55,7 @@
public static final String COMMON_ONHEAP_MEMORY_RESOURCE_NAME = "onheap.memory.mb";
public static final String COMMON_OFFHEAP_MEMORY_RESOURCE_NAME = "offheap.memory.mb";
public static final String COMMON_TOTAL_MEMORY_RESOURCE_NAME = "memory.mb";
+
+ public static final String NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS = "nimbus:num-send-assignment-exceptions";
}
diff --git a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
index e6a12d9..4713e95 100644
--- a/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/topology/WindowedBoltExecutor.java
@@ -16,6 +16,7 @@
import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -345,7 +346,7 @@
@Override
public Map<String, Object> getComponentConfiguration() {
- return bolt.getComponentConfiguration();
+ return bolt.getComponentConfiguration() != null ? bolt.getComponentConfiguration() : Collections.emptyMap();
}
protected WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
diff --git a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
index 5162cc6..0e38838 100644
--- a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
+++ b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java
@@ -39,6 +39,7 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
@@ -214,6 +215,14 @@
Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));
}
+ @Test
+ public void testEmptyConfigOnWrappedBolt() {
+ IWindowedBolt wrappedBolt = Mockito.mock(IWindowedBolt.class);
+ Mockito.when(wrappedBolt.getComponentConfiguration()).thenReturn(null);
+ executor = new WindowedBoltExecutor(wrappedBolt);
+ assertTrue("Configuration is not empty", executor.getComponentConfiguration().isEmpty());
+ }
+
private static class TestWindowedBolt extends BaseWindowedBolt {
List<TupleWindow> tupleWindows = new ArrayList<>();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 957d27b..6ab3af5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -268,6 +268,7 @@
private final Meter shutdownCalls;
private final Meter processWorkerMetricsCalls;
private final Meter mkAssignmentsErrors;
+ private final Meter sendAssignmentExceptions; // used in AssignmentDistributionService.java
//Timer
private final Timer fileUploadDuration;
@@ -305,7 +306,7 @@
IStormClusterState state = nimbus.getStormClusterState();
Assignment oldAssignment = state.assignmentInfo(topoId, null);
state.removeStorm(topoId);
- notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer());
+ notifySupervisorsAsKilled(state, oldAssignment, nimbus.getAssignmentsDistributer(), nimbus.getMetricsRegistry());
nimbus.heartbeatsCache.removeTopo(topoId);
nimbus.getIdToExecutors().getAndUpdate(new Dissoc<>(topoId));
return null;
@@ -517,6 +518,7 @@
this.shutdownCalls = metricsRegistry.registerMeter("nimbus:num-shutdown-calls");
this.processWorkerMetricsCalls = metricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
this.mkAssignmentsErrors = metricsRegistry.registerMeter("nimbus:mkAssignments-Errors");
+ this.sendAssignmentExceptions = metricsRegistry.registerMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS);
this.fileUploadDuration = metricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
this.schedulingDuration = metricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
this.numAddedExecPerScheduling = metricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
@@ -1578,7 +1580,8 @@
*/
private static void notifySupervisorsAssignments(Map<String, Assignment> assignments,
AssignmentDistributionService service, Map<String, String> nodeHost,
- Map<String, SupervisorDetails> supervisorDetails) {
+ Map<String, SupervisorDetails> supervisorDetails,
+ StormMetricsRegistry metricsRegistry) {
for (Map.Entry<String, String> nodeEntry : nodeHost.entrySet()) {
try {
String nodeId = nodeEntry.getKey();
@@ -1586,7 +1589,7 @@
supervisorAssignments.set_storm_assignment(assignmentsForNode(assignments, nodeEntry.getKey()));
SupervisorDetails details = supervisorDetails.get(nodeId);
Integer serverPort = details != null ? details.getServerPort() : null;
- service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments);
+ service.addAssignmentsForNode(nodeId, nodeEntry.getValue(), serverPort, supervisorAssignments, metricsRegistry);
} catch (Throwable tr1) {
//just skip when any error happens wait for next round assignments reassign
LOG.error("Exception when add assignments distribution task for node {}", nodeEntry.getKey());
@@ -1595,10 +1598,10 @@
}
private static void notifySupervisorsAsKilled(IStormClusterState clusterState, Assignment oldAss,
- AssignmentDistributionService service) {
+ AssignmentDistributionService service, StormMetricsRegistry metricsRegistry) {
Map<String, String> nodeHost = assignmentChangedNodes(oldAss, null);
notifySupervisorsAssignments(clusterState.assignmentsInfo(), service, nodeHost,
- basicSupervisorDetailsMap(clusterState));
+ basicSupervisorDetailsMap(clusterState), metricsRegistry);
}
@VisibleForTesting
@@ -1654,6 +1657,10 @@
return assignmentsDistributer;
}
+ private StormMetricsRegistry getMetricsRegistry() {
+ return metricsRegistry;
+ }
+
@VisibleForTesting
public HeartbeatCache getHeartbeatsCache() {
return heartbeatsCache;
@@ -2520,7 +2527,7 @@
totalAssignmentsChangedNodes.putAll(assignmentChangedNodes(existingAssignment, assignment));
}
notifySupervisorsAssignments(newAssignments, assignmentsDistributer, totalAssignmentsChangedNodes,
- basicSupervisorDetailsMap);
+ basicSupervisorDetailsMap, getMetricsRegistry());
Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 7575a91..df419b9 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -684,6 +684,7 @@
long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
long hbFirstTimeoutMs = getFirstHbTimeoutMs(staticState, dynamicState);
if (timeDiffms > hbFirstTimeoutMs) {
+ staticState.slotMetrics.numWorkerStartTimedOut.mark();
LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container,
hbFirstTimeoutMs);
return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SlotMetrics.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SlotMetrics.java
index f8e13fd..8b2f5f1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SlotMetrics.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SlotMetrics.java
@@ -26,6 +26,7 @@
class SlotMetrics {
final Meter numWorkersLaunched;
+ final Meter numWorkerStartTimedOut;
final Map<Slot.KillReason, Meter> numWorkersKilledFor;
final Timer workerLaunchDuration;
final Map<Slot.MachineState, Meter> transitionIntoState;
@@ -34,6 +35,7 @@
SlotMetrics(StormMetricsRegistry metricsRegistry) {
numWorkersLaunched = metricsRegistry.registerMeter("supervisor:num-workers-launched");
+ numWorkerStartTimedOut = metricsRegistry.registerMeter("supervisor:num-worker-start-timed-out");
numWorkersKilledFor = Collections.unmodifiableMap(EnumUtil.toEnumMap(Slot.KillReason.class,
killReason -> metricsRegistry.registerMeter("supervisor:num-workers-killed-" + killReason.toString())));
workerLaunchDuration = metricsRegistry.registerTimer("supervisor:worker-launch-duration");
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
index 578be2b..fdd130b 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java
@@ -21,7 +21,6 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@@ -95,7 +94,8 @@
private final ConcurrentHashMap<String, CompletableFuture<Void>> topologyBasicDownloaded = new ConcurrentHashMap<>();
private final Path localBaseDir;
private final int blobDownloadRetries;
- private final ScheduledExecutorService execService;
+ private final ScheduledExecutorService downloadExecService;
+ private final ScheduledExecutorService taskExecService;
private final long cacheCleanupPeriod;
private final StormMetricsRegistry metricsRegistry;
// cleanup
@@ -120,13 +120,14 @@
cacheCleanupPeriod = ObjectReader.getInt(conf.get(
DaemonConfig.SUPERVISOR_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS), 30 * 1000).longValue();
- // if we needed we could make config for update thread pool size
- int threadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
blobDownloadRetries = ObjectReader.getInt(conf.get(
DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_MAX_RETRIES), 3);
- execService = Executors.newScheduledThreadPool(threadPoolSize,
- new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Executor - %d").build());
+ int downloadThreadPoolSize = ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_BLOBSTORE_DOWNLOAD_THREAD_COUNT), 5);
+ downloadExecService = Executors.newScheduledThreadPool(downloadThreadPoolSize,
+ new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Download Executor - %d").build());
+ taskExecService = Executors.newScheduledThreadPool(3,
+ new ThreadFactoryBuilder().setNameFormat("AsyncLocalizer Task Executor - %d").build());
reconstructLocalizedResources();
symlinksDisabled = (boolean) conf.getOrDefault(Config.DISABLE_SYMLINKS, false);
@@ -213,7 +214,7 @@
blobPending.compute(topologyId, (tid, old) -> {
CompletableFuture<Void> ret = old;
if (ret == null) {
- ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), execService);
+ ret = CompletableFuture.supplyAsync(new DownloadBlobs(pna, cb), taskExecService);
} else {
try {
addReferencesToBlobs(pna, cb);
@@ -291,7 +292,7 @@
}
}
LOG.debug("FINISHED download of {}", blob);
- }, execService);
+ }, downloadExecService);
i++;
}
return CompletableFuture.allOf(all);
@@ -337,14 +338,15 @@
* Start any background threads needed. This includes updating blobs and cleaning up unused blobs over the configured size limit.
*/
public void start() {
- execService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
+ taskExecService.scheduleWithFixedDelay(this::updateBlobs, 30, 30, TimeUnit.SECONDS);
LOG.debug("Scheduling cleanup every {} millis", cacheCleanupPeriod);
- execService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
+ taskExecService.scheduleAtFixedRate(this::cleanup, cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
}
@Override
public void close() throws InterruptedException {
- execService.shutdown();
+ downloadExecService.shutdown();
+ taskExecService.shutdown();
}
private List<LocalResource> getLocalResources(PortAndAssignment pna) throws IOException {
@@ -450,15 +452,7 @@
topoConfBlob.removeReference(pna);
}
- List<LocalResource> localResources;
- try {
- localResources = getLocalResources(pna);
- } catch (FileNotFoundException e) {
- LOG.warn("Local resources for {} no longer available", pna, e);
- return;
- }
-
- for (LocalResource lr : localResources) {
+ for (LocalResource lr : getLocalResources(pna)) {
try {
removeBlobReference(lr.getBlobName(), pna, lr.shouldUncompress());
} catch (Exception e) {
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index cc98804..5db347e 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -68,6 +68,10 @@
registry.removeMatching((name, metric) -> nameToMetric.containsKey(name));
}
+ public Meter getMeter(String meterName) {
+ return registry.getMeters().get(meterName);
+ }
+
public void startMetricsReporters(Map<String, Object> daemonConf) {
reporters = MetricsUtils.getPreparableReporters(daemonConf);
for (PreparableReporter reporter : reporters) {
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
index 4f84997..4eb1bb4 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/AssignmentDistributionService.java
@@ -21,9 +21,12 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+
+import org.apache.storm.Constants;
import org.apache.storm.DaemonConfig;
import org.apache.storm.daemon.supervisor.Supervisor;
import org.apache.storm.generated.SupervisorAssignments;
+import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.SupervisorClient;
@@ -146,7 +149,8 @@
* @param serverPort node thrift server port.
* @param assignments the {@link org.apache.storm.generated.SupervisorAssignments}
*/
- public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+ public void addAssignmentsForNode(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+ StormMetricsRegistry metricsRegistry) {
try {
//For some reasons, we can not get supervisor port info, eg: supervisor shutdown,
//Just skip for this scheduling round.
@@ -155,7 +159,8 @@
return;
}
- boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort, assignments), 5L, TimeUnit.SECONDS);
+ boolean success = nextQueue().offer(NodeAssignments.getInstance(node, host, serverPort,
+ assignments, metricsRegistry), 5L, TimeUnit.SECONDS);
if (!success) {
LOG.warn("Discard an assignment distribution for node {} because the target sub queue is full.", node);
}
@@ -211,17 +216,20 @@
private String host;
private Integer serverPort;
private SupervisorAssignments assignments;
+ private StormMetricsRegistry metricsRegistry;
- private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments) {
+ private NodeAssignments(String node, String host, Integer serverPort, SupervisorAssignments assignments,
+ StormMetricsRegistry metricsRegistry) {
this.node = node;
this.host = host;
this.serverPort = serverPort;
this.assignments = assignments;
+ this.metricsRegistry = metricsRegistry;
}
public static NodeAssignments getInstance(String node, String host, Integer serverPort,
- SupervisorAssignments assignments) {
- return new NodeAssignments(node, host, serverPort, assignments);
+ SupervisorAssignments assignments, StormMetricsRegistry metricsRegistry) {
+ return new NodeAssignments(node, host, serverPort, assignments, metricsRegistry);
}
//supervisor assignment id/supervisor id
@@ -241,6 +249,9 @@
return this.assignments;
}
+ public StormMetricsRegistry getMetricsRegistry() { // the registry passed in when this NodeAssignments was constructed
+ return metricsRegistry;
+ }
}
/**
@@ -289,14 +300,13 @@
try {
client.getIface().sendSupervisorAssignments(assignments.getAssignments());
} catch (Exception e) {
- //just ignore the exception.
+ assignments.getMetricsRegistry().getMeter(Constants.NIMBUS_SEND_ASSIGNMENT_EXCEPTIONS).mark();
LOG.error("Exception when trying to send assignments to node {}: {}", assignments.getNode(), e.getMessage());
}
} catch (Throwable e) {
//just ignore any error/exception.
LOG.error("Exception to create supervisor client for node {}: {}", assignments.getNode(), e.getMessage());
}
-
}
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index cba27cd..75ae8c7 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -26,7 +26,6 @@
import java.util.stream.Collectors;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
-import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
@@ -36,7 +35,6 @@
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceOffer;
import org.apache.storm.scheduler.resource.normalization.NormalizedResourceRequest;
-import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
@@ -374,22 +372,29 @@
}
String getRemainingRequiredResourcesMessage() {
+ StringBuilder message = new StringBuilder();
+ message.append("After evicting lower priority topologies: ");
+ // Remaining capacity: start from clusterAvailableResources and take out topologyScheduledResources.
+ NormalizedResourceOffer clusterRemainingAvailableResources = new NormalizedResourceOffer();
+ clusterRemainingAvailableResources.add(clusterAvailableResources);
+ clusterRemainingAvailableResources.remove(topologyScheduledResources);
+ // Each shortage is reported as its own sentence ending in ". " so the clauses concatenate with uniform spacing.
double memoryNeeded = remainingRequiredTopologyMemory;
double cpuNeeded = remainingRequiredTopologyResources.getTotalCpu();
- StringBuilder message = new StringBuilder();
- if (memoryNeeded > 0 || cpuNeeded > 0) {
- if (memoryNeeded > 0) {
- message.append(memoryNeeded).append(" MB ");
- }
- if (cpuNeeded > 0) {
- message.append(cpuNeeded).append("% CPU ");
- }
- if (remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
- message.append(" ");
- message.append(remainingRequiredTopologyResources.getNormalizedResources().toString());
- message.append(" ");
- }
- message.append("needed even after evicting lower priority topologies. ");
+ if (memoryNeeded > 0) {
+ message.append("Additional Memory Required: ").append(memoryNeeded).append(" MB ");
+ message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalMemoryMb()).append(" MB). ");
+ }
+ if (cpuNeeded > 0) {
+ message.append("Additional CPU Required: ").append(cpuNeeded).append("% CPU ");
+ message.append("(Available: ").append(clusterRemainingAvailableResources.getTotalCpu()).append("% CPU). ");
+ }
+ if (remainingRequiredTopologyResources.getNormalizedResources().anyNonCpuOverZero()) {
+ message.append("Additional Topology Required Resources: ");
+ message.append(remainingRequiredTopologyResources.getNormalizedResources().toString());
+ message.append(" Cluster Available Resources: ");
+ message.append(clusterRemainingAvailableResources.getNormalizedResources().toString());
+ message.append(". ");
}
return message.toString();
}
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
index 4e5ec5c..2b5afe0 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
@@ -126,7 +126,7 @@
} else {
String comp = td.getExecutorToComponent().get(exec);
NormalizedResourceRequest requestedResources = td.getTotalResources(exec);
- LOG.error("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
+ LOG.warn("Not Enough Resources to schedule Task {} - {} {}", exec, comp, requestedResources);
return false;
}
}