Merge pull request #3206 from Ethanlm/STORM-3577
[STORM-3578] ClientAuthUtils.insertWorkerTokens removed tokens incorrectly
diff --git a/.travis.yml b/.travis.yml
index 6e46704..901c5a7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,6 +32,12 @@
jdk:
- oraclejdk8
- openjdk11
+
+matrix:
+ include:
+ - arch: s390x
+ jdk: openjdk11
+
before_install:
- rvm reload
- rvm use 2.4.2 --install
@@ -46,9 +52,12 @@
- export MVN_HOME=$HOME/apache-maven-3.6.3
- if [ ! -d $MVN_HOME/bin ]; then wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz -P $HOME; tar xzvf $HOME/apache-maven-3.6.3-bin.tar.gz -C $HOME; fi
- export PATH=$MVN_HOME/bin:$PATH
+
install: /bin/bash ./dev-tools/travis/travis-install.sh `pwd`
script:
- - /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES
+ - if [[ $(uname -m) != 's390x' ]]; then
+ /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES;
+ fi
cache:
directories:
- "$HOME/.m2/repository"
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index ccb5feb..734d660 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -138,7 +138,7 @@
try {
curator.delete().forPath(path);
} catch (KeeperException.NoNodeException nne) {
- LOG.warn("Path {} already deleted.");
+ LOG.warn("Path {} already deleted.", path);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index f5db47c..f52df59 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -901,7 +901,7 @@
LOG.warn("We loosen some constraints here to support topologies of older version running on the current version");
validateField(name, className.replace("backtype.storm", "org.apache.storm"));
} else {
- throw new RuntimeException(e);
+ throw new RuntimeException("Failed to validate config " + name + " with value " + className, e);
}
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 6ab3af5..121dbdb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -4169,21 +4169,10 @@
}
Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
- Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
- for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
- CommonAggregateStats commonStats = entry.getValue().get_common_stats();
- setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
- commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
- }
- maybeAddPlaceholderSpoutAggStats(topoPageInfo, topology);
- Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
- for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
- CommonAggregateStats commonStats = entry.getValue().get_common_stats();
- setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
- commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
- }
- maybeAddPlaceholderBoltAggStats(topoPageInfo, topology, includeSys);
+
+ addSpoutAggStats(topoPageInfo, topology, topoConf);
+ addBoltAggStats(topoPageInfo, topology, topoConf, includeSys);
if (workerSummaries != null) {
topoPageInfo.set_workers(workerSummaries);
@@ -4242,20 +4231,31 @@
}
/**
- * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+ * If aggStats are not populated, compute the common and component (spout) aggregate stats and create placeholder stats.
+ * This allows the topology page to show the component spec even if the topology is not scheduled.
+ * Otherwise, set the resources map on the aggStats already present in topoPageInfo.
*
* @param topoPageInfo topology page info holding spout AggStats
* @param topology storm topology used to get spout names
+ * @param topoConf storm topology config
*/
- private void maybeAddPlaceholderSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology) {
+ private void addSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, Map<String, Object> topoConf) {
+ Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
+
+ // if agg stats were not populated yet, create placeholder
if (topoPageInfo.get_id_to_spout_agg_stats().isEmpty()) {
for (Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+ String spoutName = entry.getKey();
+ SpoutSpec spoutSpec = entry.getValue();
+
// component
ComponentAggregateStats placeholderComponentStats = new ComponentAggregateStats();
placeholderComponentStats.set_type(ComponentType.SPOUT);
// common aggregate
- CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+ CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(spoutSpec);
+ commonStats.set_resources_map(spoutResources.getOrDefault(spoutName, new NormalizedResourceRequest())
+ .toNormalizedMap());
placeholderComponentStats.set_common_stats(commonStats);
// spout aggregate
@@ -4264,23 +4264,36 @@
SpecificAggregateStats specificStats = new SpecificAggregateStats();
specificStats.set_spout(spoutAggStats);
placeholderComponentStats.set_specific_stats(specificStats);
-
- topoPageInfo.get_id_to_spout_agg_stats().put(entry.getKey(), placeholderComponentStats);
+ topoPageInfo.get_id_to_spout_agg_stats().put(spoutName, placeholderComponentStats);
+ }
+ } else {
+ for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
+ CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+ setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
+ commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
}
}
}
/**
- * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+ * If aggStats are not populated, compute the common and component (bolt) aggregate stats and create placeholder stats.
+ * This allows the topology page to show the component spec even if the topology is not scheduled.
+ * Otherwise, set the resources map on the aggStats already present in topoPageInfo.
*
* @param topoPageInfo topology page info holding bolt AggStats
* @param topology storm topology used to get bolt names
+ * @param topoConf storm topology config
* @param includeSys whether to show system bolts
*/
- private void maybeAddPlaceholderBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, boolean includeSys) {
+ private void addBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology,
+ Map<String, Object> topoConf, boolean includeSys) {
+ Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
+
+ // if agg stats were not populated yet, create placeholder
if (topoPageInfo.get_id_to_bolt_agg_stats().isEmpty()) {
for (Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
String boltName = entry.getKey();
+ Bolt bolt = entry.getValue();
if ((!includeSys && Utils.isSystemId(boltName)) || boltName.equals(Constants.SYSTEM_COMPONENT_ID)) {
continue;
}
@@ -4290,7 +4303,9 @@
placeholderComponentStats.set_type(ComponentType.BOLT);
// common aggregate
- CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+ CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(bolt);
+ commonStats.set_resources_map(boltResources.getOrDefault(boltName, new NormalizedResourceRequest())
+ .toNormalizedMap());
placeholderComponentStats.set_common_stats(commonStats);
// bolt aggregate
@@ -4305,6 +4320,12 @@
topoPageInfo.get_id_to_bolt_agg_stats().put(boltName, placeholderComponentStats);
}
+ } else {
+ for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
+ CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+ setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
+ commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
+ }
}
}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 85e5f9a..de29ace 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -803,18 +803,21 @@
}
if (dynamicState.container.didMainProcessExit()) {
- LOG.warn("SLOT {}: main process has exited", staticState.port);
+ LOG.warn("SLOT {}: main process has exited for topology: {}",
+ staticState.port, dynamicState.currentAssignment.get_topology_id());
return killContainerFor(KillReason.PROCESS_EXIT, dynamicState, staticState);
}
if (dynamicState.container.isMemoryLimitViolated(dynamicState.currentAssignment)) {
- LOG.warn("SLOT {}: violated memory limits", staticState.port);
+ LOG.warn("SLOT {}: violated memory limits for topology: {}",
+ staticState.port, dynamicState.currentAssignment.get_topology_id());
return killContainerFor(KillReason.MEMORY_VIOLATION, dynamicState, staticState);
}
LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
if (hb == null) {
- LOG.warn("SLOT {}: HB returned as null", staticState.port);
+ LOG.warn("SLOT {}: HB returned as null for topology: {}",
+ staticState.port, dynamicState.currentAssignment.get_topology_id());
//This can happen if the supervisor crashed after launching a
// worker that never came up.
return killContainerFor(KillReason.HB_NULL, dynamicState, staticState);
@@ -823,7 +826,8 @@
long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
if (timeDiffMs > hbTimeoutMs) {
- LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, hbTimeoutMs);
+ LOG.warn("SLOT {}: HB is too old {} > {} for topology: {}",
+ staticState.port, timeDiffMs, hbTimeoutMs, dynamicState.currentAssignment.get_topology_id());
return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
}
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/NoOpMetricStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/NoOpMetricStore.java
new file mode 100644
index 0000000..c9f67c1
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/NoOpMetricStore.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.Map;
+import org.apache.storm.metric.StormMetricsRegistry;
+
+@SuppressWarnings("unused")
+public class NoOpMetricStore implements MetricStore {
+
+ @Override
+ public void prepare(Map<String, Object> config, StormMetricsRegistry metricsRegistry) {}
+
+ @Override
+ public void insert(Metric metric) { }
+
+ @Override
+ public boolean populateValue(Metric metric) {
+ return true;
+ }
+
+ @Override
+ public void close() { }
+
+ @Override
+ public void scan(FilterOptions filter, ScanCallback scanCallback) { }
+}
+
+
+
+
diff --git a/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java b/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
index 79d8046..acca1f9 100644
--- a/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
+++ b/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
@@ -830,6 +830,9 @@
Map<String, Map<K, Double>> ret = new HashMap<>();
Map<String, Map<K, List>> expands = expandAveragesSeq(avgSeq, countSeq);
+ if (expands == null) {
+ return ret;
+ }
for (Map.Entry<String, Map<K, List>> entry : expands.entrySet()) {
String k = entry.getKey();
@@ -2305,6 +2308,9 @@
initVal = mergeWithAddPair(initVal, expandAverages(avg, count));
}
}
+ if (initVal == null) {
+ initVal = new HashMap<>();
+ }
return initVal;
}
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/WEB-INF/topology.html b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/WEB-INF/topology.html
index 0dda533..59c7c52 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/WEB-INF/topology.html
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/WEB-INF/topology.html
@@ -142,6 +142,10 @@
return;
}
}
+ if (!loggerName) {
+ alert ("Logger name must be provided. Use \"ROOT\" if you want to change current default log behavior.");
+ return;
+ }
var data = {};
var loggerSetting;
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java
index 3cbff1c..c13a4c3 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java
@@ -214,7 +214,7 @@
}
/**
- * /api/v1/supervisor/summary -> topo history.
+ * /api/v1/supervisor/summary -> supervisor summary.
*/
@GET
@Path("/supervisor/summary")
@@ -402,6 +402,9 @@
UIHelpers.getVisualizationData(nimbusClient.getClient(), window, id, sys),
callback
);
+ } catch (RuntimeException e) {
+ LOG.error("Failure getting topology visualization", e);
+ throw e;
}
}