Merge pull request #3208 from Ethanlm/STORM-3579

[STORM-3579]  use the topo conf for thrift client in Worker code
diff --git a/.travis.yml b/.travis.yml
index 6e46704..901c5a7 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -32,6 +32,12 @@
 jdk:
   - oraclejdk8
   - openjdk11
+  
+matrix:
+  include:
+    - arch: s390x
+      jdk: openjdk11
+      
 before_install:
   - rvm reload
   - rvm use 2.4.2 --install
@@ -46,9 +52,12 @@
   - export MVN_HOME=$HOME/apache-maven-3.6.3
   - if [ ! -d $MVN_HOME/bin ]; then wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz -P $HOME; tar xzvf $HOME/apache-maven-3.6.3-bin.tar.gz -C $HOME; fi
   - export PATH=$MVN_HOME/bin:$PATH
+
 install: /bin/bash ./dev-tools/travis/travis-install.sh `pwd`
 script:
-  - /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES
+  - if [[ $(uname -m) != 's390x' ]]; then 
+      /bin/bash ./dev-tools/travis/travis-script.sh `pwd` $MODULES;
+    fi
 cache:
   directories:
     - "$HOME/.m2/repository"
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
index 6a9b703..3db487d 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java
@@ -409,9 +409,17 @@
                 Set<Object> creds = subject.getPrivateCredentials();
                 synchronized (creds) {
                     WorkerToken previous = findWorkerToken(subject, type);
-                    creds.add(token);
-                    if (previous != null) {
-                        creds.remove(previous);
+                    boolean notAlreadyContained = creds.add(token);
+                    if (notAlreadyContained) {
+                        if (previous != null) {
+                            //this means token is not equal to previous so we should remove previous
+                            creds.remove(previous);
+                            LOG.info("Replaced WorkerToken for service type {}", type);
+                        } else {
+                            LOG.info("Added new WorkerToken for service type {}", type);
+                        }
+                    } else {
+                        LOG.info("The new WorkerToken for service type {} is the same as the previous token", type);
                     }
                 }
             }
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index 6936555..668d4b3 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -172,6 +172,7 @@
 
     @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
     private void populateSubjectWithTGT(Subject subject, Map<String, String> credentials) {
+        LOG.info("Populating TGT from credentials");
         KerberosTicket tgt = getTGT(credentials);
         if (tgt != null) {
             clearCredentials(subject, tgt);
diff --git a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index ccb5feb..734d660 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -138,7 +138,7 @@
         try {
             curator.delete().forPath(path);
         } catch (KeeperException.NoNodeException nne) {
-            LOG.warn("Path {} already deleted.");
+            LOG.warn("Path {} already deleted.", path);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
index f5db47c..f52df59 100644
--- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -901,7 +901,7 @@
                     LOG.warn("We loosen some constraints here to support topologies of older version running on the current version");
                     validateField(name, className.replace("backtype.storm", "org.apache.storm"));
                 } else {
-                    throw new RuntimeException(e);
+                    throw new RuntimeException("Failed to validate config " + name + " with value " + className, e);
                 }
             }
         }
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 6ab3af5..121dbdb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -4169,21 +4169,10 @@
             }
 
             Map<String, Object> topoConf = Utils.merge(conf, common.topoConf);
-            Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
-            for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
-                CommonAggregateStats commonStats = entry.getValue().get_common_stats();
-                setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
-                commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
-            }
-            maybeAddPlaceholderSpoutAggStats(topoPageInfo, topology);
 
-            Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
-            for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
-                CommonAggregateStats commonStats = entry.getValue().get_common_stats();
-                setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
-                commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
-            }
-            maybeAddPlaceholderBoltAggStats(topoPageInfo, topology, includeSys);
+
+            addSpoutAggStats(topoPageInfo, topology, topoConf);
+            addBoltAggStats(topoPageInfo, topology, topoConf, includeSys);
 
             if (workerSummaries != null) {
                 topoPageInfo.set_workers(workerSummaries);
@@ -4242,20 +4231,31 @@
     }
 
     /**
-     * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+     * If aggStats are not populated, compute common and component(spout) agg and create placeholder stat.
+     * This allow the topology page to show component spec even the topo is not scheduled.
+     * Otherwise, just fetch data from current topoPageInfo.
      *
      * @param topoPageInfo topology page info holding spout AggStats
      * @param topology     storm topology used to get spout names
+     * @param topoConf     storm topology config
      */
-    private void maybeAddPlaceholderSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology) {
+    private void addSpoutAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, Map<String, Object> topoConf) {
+        Map<String, NormalizedResourceRequest> spoutResources = ResourceUtils.getSpoutsResources(topology, topoConf);
+
+        // if agg stats were not populated yet, create placeholder
         if (topoPageInfo.get_id_to_spout_agg_stats().isEmpty()) {
             for (Entry<String, SpoutSpec> entry : topology.get_spouts().entrySet()) {
+                String spoutName = entry.getKey();
+                SpoutSpec spoutSpec = entry.getValue();
+
                 // component
                 ComponentAggregateStats placeholderComponentStats = new ComponentAggregateStats();
                 placeholderComponentStats.set_type(ComponentType.SPOUT);
 
                 // common aggregate
-                CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+                CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(spoutSpec);
+                commonStats.set_resources_map(spoutResources.getOrDefault(spoutName, new NormalizedResourceRequest())
+                            .toNormalizedMap());
                 placeholderComponentStats.set_common_stats(commonStats);
 
                 // spout aggregate
@@ -4264,23 +4264,36 @@
                 SpecificAggregateStats specificStats = new SpecificAggregateStats();
                 specificStats.set_spout(spoutAggStats);
                 placeholderComponentStats.set_specific_stats(specificStats);
-
-                topoPageInfo.get_id_to_spout_agg_stats().put(entry.getKey(), placeholderComponentStats);
+                topoPageInfo.get_id_to_spout_agg_stats().put(spoutName, placeholderComponentStats);
+            }
+        } else {
+            for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_spout_agg_stats().entrySet()) {
+                CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+                setResourcesDefaultIfNotSet(spoutResources, entry.getKey(), topoConf);
+                commonStats.set_resources_map(spoutResources.get(entry.getKey()).toNormalizedMap());
             }
         }
     }
 
     /**
-     * Add placeholder AggStats allowing topology page to show components before AggStats are populated.
+     * If aggStats are not populated, compute common and component(bolt) agg and create placeholder stat.
+     * This allow the topology page to show component spec even the topo is not scheduled.
+     * Otherwise, just fetch data from current topoPageInfo.
      *
      * @param topoPageInfo  topology page info holding bolt AggStats
      * @param topology      storm topology used to get bolt names
+     * @param topoConf      storm topology config
      * @param includeSys    whether to show system bolts
      */
-    private void maybeAddPlaceholderBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology, boolean includeSys) {
+    private void addBoltAggStats(TopologyPageInfo topoPageInfo, StormTopology topology,
+                                                 Map<String, Object> topoConf, boolean includeSys) {
+        Map<String, NormalizedResourceRequest> boltResources = ResourceUtils.getBoltsResources(topology, topoConf);
+
+        // if agg stats were not populated yet, create placeholder
         if (topoPageInfo.get_id_to_bolt_agg_stats().isEmpty()) {
             for (Entry<String, Bolt> entry : topology.get_bolts().entrySet()) {
                 String boltName = entry.getKey();
+                Bolt bolt = entry.getValue();
                 if ((!includeSys && Utils.isSystemId(boltName)) || boltName.equals(Constants.SYSTEM_COMPONENT_ID)) {
                     continue;
                 }
@@ -4290,7 +4303,9 @@
                 placeholderComponentStats.set_type(ComponentType.BOLT);
 
                 // common aggregate
-                CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(entry.getValue());
+                CommonAggregateStats commonStats = getPlaceholderCommonAggregateStats(bolt);
+                commonStats.set_resources_map(boltResources.getOrDefault(boltName, new NormalizedResourceRequest())
+                            .toNormalizedMap());
                 placeholderComponentStats.set_common_stats(commonStats);
 
                 // bolt aggregate
@@ -4305,6 +4320,12 @@
 
                 topoPageInfo.get_id_to_bolt_agg_stats().put(boltName, placeholderComponentStats);
             }
+        } else {
+            for (Entry<String, ComponentAggregateStats> entry : topoPageInfo.get_id_to_bolt_agg_stats().entrySet()) {
+                CommonAggregateStats commonStats = entry.getValue().get_common_stats();
+                setResourcesDefaultIfNotSet(boltResources, entry.getKey(), topoConf);
+                commonStats.set_resources_map(boltResources.get(entry.getKey()).toNormalizedMap());
+            }
         }
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index 85e5f9a..de29ace 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -803,18 +803,21 @@
         }
 
         if (dynamicState.container.didMainProcessExit()) {
-            LOG.warn("SLOT {}: main process has exited", staticState.port);
+            LOG.warn("SLOT {}: main process has exited for topology: {}",
+                    staticState.port, dynamicState.currentAssignment.get_topology_id());
             return killContainerFor(KillReason.PROCESS_EXIT, dynamicState, staticState);
         }
 
         if (dynamicState.container.isMemoryLimitViolated(dynamicState.currentAssignment)) {
-            LOG.warn("SLOT {}: violated memory limits", staticState.port);
+            LOG.warn("SLOT {}: violated memory limits for topology: {}",
+                    staticState.port, dynamicState.currentAssignment.get_topology_id());
             return killContainerFor(KillReason.MEMORY_VIOLATION, dynamicState, staticState);
         }
 
         LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
         if (hb == null) {
-            LOG.warn("SLOT {}: HB returned as null", staticState.port);
+            LOG.warn("SLOT {}: HB returned as null for topology: {}",
+                    staticState.port, dynamicState.currentAssignment.get_topology_id());
             //This can happen if the supervisor crashed after launching a
             // worker that never came up.
             return killContainerFor(KillReason.HB_NULL, dynamicState, staticState);
@@ -823,7 +826,8 @@
         long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
         long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
         if (timeDiffMs > hbTimeoutMs) {
-            LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, hbTimeoutMs);
+            LOG.warn("SLOT {}: HB is too old {} > {} for topology: {}",
+                    staticState.port, timeDiffMs, hbTimeoutMs, dynamicState.currentAssignment.get_topology_id());
             return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
         }
 
diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/NoOpMetricStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/NoOpMetricStore.java
new file mode 100644
index 0000000..c9f67c1
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/metricstore/NoOpMetricStore.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metricstore;
+
+import java.util.Map;
+import org.apache.storm.metric.StormMetricsRegistry;
+
+@SuppressWarnings("unused")
+public class NoOpMetricStore implements MetricStore {
+
+    @Override
+    public void prepare(Map<String, Object> config, StormMetricsRegistry metricsRegistry) {}
+
+    @Override
+    public void insert(Metric metric) { }
+
+    @Override
+    public boolean populateValue(Metric metric) {
+        return true;
+    }
+
+    @Override
+    public void close() { }
+
+    @Override
+    public void scan(FilterOptions filter, ScanCallback scanCallback) { }
+}
+
+
+
+
diff --git a/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java b/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
index 79d8046..acca1f9 100644
--- a/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
+++ b/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java
@@ -830,6 +830,9 @@
         Map<String, Map<K, Double>> ret = new HashMap<>();
 
         Map<String, Map<K, List>> expands = expandAveragesSeq(avgSeq, countSeq);
+        if (expands == null) {
+            return ret;
+        }
         for (Map.Entry<String, Map<K, List>> entry : expands.entrySet()) {
             String k = entry.getKey();
 
@@ -2305,6 +2308,9 @@
                 initVal = mergeWithAddPair(initVal, expandAverages(avg, count));
             }
         }
+        if (initVal == null) {
+            initVal = new HashMap<>();
+        }
         return initVal;
     }
 
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/WEB-INF/topology.html b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/WEB-INF/topology.html
index 0dda533..59c7c52 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/WEB-INF/topology.html
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/WEB-INF/topology.html
@@ -142,6 +142,10 @@
             return;
         }
     }
+    if (!loggerName) {
+        alert ("Logger name must be provided. Use \"ROOT\" if you want to change current default log behavior.");
+        return;
+    }
     var data = {};
     var loggerSetting;
 
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java
index 3cbff1c..c13a4c3 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/resources/StormApiResource.java
@@ -214,7 +214,7 @@
     }
 
     /**
-     * /api/v1/supervisor/summary -> topo history.
+     * /api/v1/supervisor/summary -> supervisor summary.
      */
     @GET
     @Path("/supervisor/summary")
@@ -402,6 +402,9 @@
                     UIHelpers.getVisualizationData(nimbusClient.getClient(), window, id, sys),
                     callback
             );
+        } catch (RuntimeException e) {
+            LOG.error("Failure getting topology visualization", e);
+            throw e;
         }
     }