Merge pull request #3196 from RuiLi8080/STORM-3568
[STORM-3568] prevent user from changing log level with empty logger name from topo UI
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 6ab3af5..121dbdb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -4169,21 +4169,10 @@
}
Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
- Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
- for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
- CommonAggregateStats commonStats = entry.getValue().get_common_stats();
- setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
- commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
- }
- maybeAddPlaceholderSpoutAggStats(topoPageInfo, topology);
- Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
- for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
- CommonAggregateStats commonStats = entry.getValue().get_common_stats();
- setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
- commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
- }
- maybeAddPlaceholderBoltAggStats(topoPageInfo, topology, includeSys);
+
+ addSpoutAggStats(topoPageInfo, topology, topoConf);
+ addBoltAggStats(topoPageInfo, topology, topoConf, includeSys);
if (workerSummaries != null) {
topoPageInfo.set_workers(workerSummaries);
@@ -4242,20 +4231,31 @@
}
/**
- * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+ * If the spout aggStats are not populated yet, create a placeholder stat (common and spout aggregates) for each spout.
+ * This allows the topology page to show the component specs even if the topology is not scheduled.
+ * Otherwise, just set the resources map on the aggStats already present in topoPageInfo.
*
* @param topoPageInfo topology page info holding spout AggStats
* @param topology storm topology used to get spout names
+ * @param topoConf storm topology config
*/
- private void maybeAddPlaceholderSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology) {
+ private void addSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, Map<String, Object> topoConf) {
+ Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
+
+ // if agg stats were not populated yet, create placeholder
if (topoPageInfo.get_id_to_spout_agg_stats().isEmpty()) {
for (Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+ String spoutName = entry.getKey();
+ SpoutSpec spoutSpec = entry.getValue();
+
// component
ComponentAggregateStats placeholderComponentStats = new ComponentAggregateStats();
placeholderComponentStats.set_type(ComponentType.SPOUT);
// common aggregate
- CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+ CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(spoutSpec);
+ commonStats.set_resources_map(spoutResources.getOrDefault(spoutName, new NormalizedResourceRequest())
+ .toNormalizedMap());
placeholderComponentStats.set_common_stats(commonStats);
// spout aggregate
@@ -4264,23 +4264,36 @@
SpecificAggregateStats specificStats = new SpecificAggregateStats();
specificStats.set_spout(spoutAggStats);
placeholderComponentStats.set_specific_stats(specificStats);
-
- topoPageInfo.get_id_to_spout_agg_stats().put(entry.getKey(), placeholderComponentStats);
+ topoPageInfo.get_id_to_spout_agg_stats().put(spoutName, placeholderComponentStats);
+ }
+ } else {
+ for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
+ CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+ setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
+ commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
}
}
}
/**
- * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+ * If the bolt aggStats are not populated yet, create a placeholder stat (common and bolt aggregates) for each bolt.
+ * This allows the topology page to show the component specs even if the topology is not scheduled.
+ * Otherwise, just set the resources map on the aggStats already present in topoPageInfo.
*
* @param topoPageInfo topology page info holding bolt AggStats
* @param topology storm topology used to get bolt names
+ * @param topoConf storm topology config
* @param includeSys whether to show system bolts
*/
- private void maybeAddPlaceholderBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, boolean includeSys) {
+ private void addBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology,
+ Map<String, Object> topoConf, boolean includeSys) {
+ Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
+
+ // if agg stats were not populated yet, create placeholder
if (topoPageInfo.get_id_to_bolt_agg_stats().isEmpty()) {
for (Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
String boltName = entry.getKey();
+ Bolt bolt = entry.getValue();
if ((!includeSys && Utils.isSystemId(boltName)) || boltName.equals(Constants.SYSTEM_COMPONENT_ID)) {
continue;
}
@@ -4290,7 +4303,9 @@
placeholderComponentStats.set_type(ComponentType.BOLT);
// common aggregate
- CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+ CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(bolt);
+ commonStats.set_resources_map(boltResources.getOrDefault(boltName, new NormalizedResourceRequest())
+ .toNormalizedMap());
placeholderComponentStats.set_common_stats(commonStats);
// bolt aggregate
@@ -4305,6 +4320,12 @@
topoPageInfo.get_id_to_bolt_agg_stats().put(boltName, placeholderComponentStats);
}
+ } else {
+ for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
+ CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+ setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
+ commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
+ }
}
}