Merge pull request #3294 from agresch/agresch_storm_3648

STORM-3648 add meter to track worker heartbeat rate
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 1843755..2cbee19 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -47,8 +47,8 @@
     private static void ackingProducerSimulation() {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
-        JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
         final Acker acker = new Acker(ackQ, spoutQ);
@@ -59,9 +59,9 @@
     private static void producerFwdConsumer(int prodBatchSz) {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",
+        JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test",
                 Collections.singletonList(1000), 1000, registry);
-        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final Producer prod = new Producer(q1);
         final Forwarder fwd = new Forwarder(q1, q2);
@@ -72,7 +72,7 @@
 
 
     private static void oneProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+        JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
                 Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
@@ -82,7 +82,7 @@
     }
 
     private static void twoProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+        JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
                 Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
@@ -93,7 +93,7 @@
     }
 
     private static void threeProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+        JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
                 Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
@@ -108,8 +108,8 @@
     private static void oneProducer2Consumers(int prodBatchSz) {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
-        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final Producer2 prod1 = new Producer2(q1, q2);
         final Consumer cons1 = new Consumer(q1);
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index c27b468..93c5499 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -385,12 +385,22 @@
     /**
      * A list of host names that this topology would prefer to be scheduled on (no guarantee is given though). This is intended for
      * debugging only.
+     *
+     * <p>Favored nodes are moved to the front of the node selection list.
+     * If the same node is also present in {@link #TOPOLOGY_SCHEDULER_UNFAVORED_NODES}
+     * then the node is considered only as a favored node and is removed from the unfavored list.
+     * </p>
      */
     @IsStringList
     public static final String TOPOLOGY_SCHEDULER_FAVORED_NODES = "topology.scheduler.favored.nodes";
     /**
      * A list of host names that this topology would prefer to NOT be scheduled on (no guarantee is given though). This is intended for
      * debugging only.
+     *
+     * <p>Unfavored nodes are moved to the end of the node selection list.
+     * If the same node is also present in {@link #TOPOLOGY_SCHEDULER_FAVORED_NODES}
+     * then the node is considered only as a favored node and is removed from the unfavored list.
+     * </p>
      */
     @IsStringList
     public static final String TOPOLOGY_SCHEDULER_UNFAVORED_NODES = "topology.scheduler.unfavored.nodes";
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index ee8e005..989d464 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -148,7 +148,7 @@
         String portStr = args[3];
         String workerId = args[4];
         Map<String, Object> conf = ConfigUtils.readStormConfig();
-        Utils.setupDefaultUncaughtExceptionHandler();
+        Utils.setupWorkerUncaughtExceptionHandler();
         StormCommon.validateDistributedMode(conf);
         int supervisorPortInt = Integer.parseInt(supervisorPort);
         Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 70840ac..db0c0e5 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -36,7 +36,6 @@
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.StormTimer;
-import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.cluster.VersionedData;
@@ -70,14 +69,12 @@
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.shade.com.google.common.collect.Sets;
-import org.apache.storm.shade.org.apache.commons.lang.Validate;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.SupervisorClient;
 import org.apache.storm.utils.SupervisorIfaceFactory;
 import org.apache.storm.utils.ThriftTopologyUtils;
 import org.apache.storm.utils.Utils;
@@ -569,7 +566,7 @@
         queue.recordMsgDrop();
         LOG.warn(
             "Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}",
-            queue.getName(), queue.getOverflowCount(), dropCount, tuple);
+            queue.getQueueName(), queue.getOverflowCount(), dropCount, tuple);
     }
 
     public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple tuple) {
@@ -679,7 +676,7 @@
         Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
         for (List<Long> executor : executors) {
             List<Integer> taskIds = StormCommon.executorIdToTasks(executor);
-            receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
+            receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue",
                                                       recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
                 this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, taskIds, this.getPort(), metricRegistry));
 
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index 68f4d82..f2da7bc 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -66,7 +66,8 @@
                                                + Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + xferQueueSz);
         }
 
-        this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
+        this.transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue",
+            xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
             workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-1), workerState.getPort(),
             workerState.getMetricRegistry());
     }
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 6d5b9f2..d45042b 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -275,7 +275,7 @@
 
         TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
         if (isDebug) {
-            LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+            LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, taskId);
         }
 
         try {
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 542dd9c..5cc5c05 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -63,7 +63,7 @@
             // Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it
         }
         try {
-            Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS);
+            Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS, false);
             ctx.close();
         } catch (Error error) {
             LOG.info("Received error in netty thread.. terminating server...");
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index f907a23..f7f866a 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -13,9 +13,7 @@
 package org.apache.storm.security.auth.kerberos;
 
 import com.codahale.metrics.Gauge;
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
-import java.security.Principal;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -210,35 +208,16 @@
                 return;
             }
 
-            // We are just trying to do the following:
-            //
-            // Configuration conf = new Configuration();
-            // HadoopKerberosName.setConfiguration(conf);
-            // subject.getPrincipals().add(new User(tgt.getClient().toString(), AuthenticationMethod.KERBEROS, null));
+            LOG.info("Invoking Hadoop UserGroupInformation.loginUserFromSubject.");
+            Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
+            login.invoke(null, subject);
 
-            Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
-            Constructor confCons = confClass.getConstructor();
-            Object conf = confCons.newInstance();
-            Class<?> hknClass = Class.forName("org.apache.hadoop.security.HadoopKerberosName");
-            Method hknSetConf = hknClass.getMethod("setConfiguration", confClass);
-            hknSetConf.invoke(null, conf);
-
-            Class<?> authMethodClass = Class.forName("org.apache.hadoop.security.UserGroupInformation$AuthenticationMethod");
-            Object kerbAuthMethod = null;
-            for (Object authMethod : authMethodClass.getEnumConstants()) {
-                if ("KERBEROS".equals(authMethod.toString())) {
-                    kerbAuthMethod = authMethod;
-                    break;
-                }
-            }
-
-            Class<?> userClass = Class.forName("org.apache.hadoop.security.User");
-            Constructor userCons = userClass.getConstructor(String.class, authMethodClass, LoginContext.class);
-            userCons.setAccessible(true);
-            String name = getTGT(subject).getClient().toString();
-            Object user = userCons.newInstance(name, kerbAuthMethod, null);
-            subject.getPrincipals().add((Principal) user);
-
+            //Refer to STORM-3606 for details
+            LOG.warn("UserGroupInformation.loginUserFromSubject will spawn a TGT renewal thread (\"TGT Renewer for <username>\") "
+                    + "to execute \"kinit -R\" command some time before the current TGT expires. "
+                    + "It will fail because TGT is not in the local TGT cache and the thread will eventually abort. "
+                    + "Exceptions from this TGT renewal thread can be ignored. Note: TGT for the Worker is kept in memory. "
+                    + "Please refer to STORM-3606 for detailed explanations");
         } catch (Exception e) {
             LOG.error("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop "
                      + "may not be compatible.", e);
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 4a40f82..f330b42 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -44,15 +44,16 @@
     private final IWaitStrategy backPressureWaitStrategy;
     private final String queueName;
 
-    public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy,
-                   String topologyId, String componentId, List<Integer> taskIds, int port, StormMetricRegistry metricRegistry) {
+    public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
+                   IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
+                   int port, StormMetricRegistry metricRegistry) {
         this.queueName = queueName;
         this.overflowLimit = overflowLimit;
         this.recvQueue = new MpscArrayQueue<>(size);
         this.overflowQ = new MpscUnboundedArrayQueue<>(size);
 
         for (Integer taskId : taskIds) {
-            this.jcqMetrics.add(new JCQueueMetrics(queueName, topologyId, componentId, taskId, port,
+            this.jcqMetrics.add(new JCQueueMetrics(metricNamePrefix, topologyId, componentId, taskId, port,
                     metricRegistry, recvQueue, overflowQ));
         }
 
@@ -61,7 +62,7 @@
         this.backPressureWaitStrategy = backPressureWaitStrategy;
     }
 
-    public String getName() {
+    public String getQueueName() {
         return queueName;
     }
 
@@ -282,7 +283,8 @@
                     jcQueueMetric.notifyInsertFailure();
                 }
                 if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
-                    LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", queue.getName());
+                    LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait",
+                        queue.getQueueName());
                 }
 
                 idleCount = queue.backPressureWaitStrategy.idle(idleCount);
@@ -372,7 +374,8 @@
                     jcQueueMetric.notifyInsertFailure();
                 }
                 if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
-                    LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", queue.getName());
+                    LOG.debug("Experiencing Back Pressure when flushing batch to Q: '{}'. Entering BackPressure Wait.",
+                        queue.getQueueName());
                 }
                 retryCount = queue.backPressureWaitStrategy.idle(retryCount);
                 if (Thread.interrupted()) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
index eec5b33..6a6a56f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
@@ -32,7 +32,7 @@
     private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
     private final AtomicLong droppedMessages = new AtomicLong(0);
 
-    public JCQueueMetrics(String queueName, String topologyId, String componentId, int taskId, int port,
+    public JCQueueMetrics(String metricNamePrefix, String topologyId, String componentId, int taskId, int port,
                           StormMetricRegistry metricRegistry, MpscArrayQueue<Object> receiveQ,
                           MpscUnboundedArrayQueue<Object> overflowQ) {
 
@@ -95,14 +95,14 @@
             }
         };
 
-        metricRegistry.gauge(queueName + "-capacity", cap, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-pct_full", pctFull, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-population", pop, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-overflow", overflow, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-capacity", cap, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-pct_full", pctFull, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-population", pop, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-overflow", overflow, topologyId, componentId, taskId, port);
     }
 
     public void notifyArrivals(long counts) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index ff656fa..fec30cc 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -630,11 +630,12 @@
                 && !((String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
     }
 
-    public static void handleUncaughtException(Throwable t) {
-        handleUncaughtException(t, defaultAllowedExceptions);
-    }
-
-    public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions) {
+    /**
+     * Handles uncaught exceptions.
+     *
+     * @param worker true if this is for handling worker exceptions
+     */
+    public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker) {
         if (t != null) {
             if (t instanceof OutOfMemoryError) {
                 try {
@@ -651,10 +652,38 @@
             return;
         }
 
+        if (worker && isAllowedWorkerException(t)) {
+            LOG.info("Swallowing {} {}", t.getClass(), t);
+            return;
+        }
+
         //Running in daemon mode, we would pass Error to calling thread.
         throw new Error(t);
     }
 
+    public static void handleUncaughtException(Throwable t) {
+        handleUncaughtException(t, defaultAllowedExceptions, false);
+    }
+
+    public static void handleWorkerUncaughtException(Throwable t) {
+        handleUncaughtException(t, defaultAllowedExceptions, true);
+    }
+
+    // Hadoop UserGroupInformation can launch an autorenewal thread that can cause a NullPointerException
+    // for workers.  See STORM-3606 for an explanation.
+    private static boolean isAllowedWorkerException(Throwable t) {
+        if (t instanceof NullPointerException) {
+            StackTraceElement[] stackTrace = t.getStackTrace();
+            for (StackTraceElement trace : stackTrace) {
+                if (trace.getClassName().startsWith("org.apache.hadoop.security.UserGroupInformation")
+                        && trace.getMethodName().equals("run")) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     public static byte[] thriftSerialize(TBase t) {
         try {
             TSerializer ser = threadSer.get();
@@ -1020,11 +1049,26 @@
             }
         };
     }
-    
+
+    public static UncaughtExceptionHandler createWorkerUncaughtExceptionHandler() {
+        return (thread, thrown) -> {
+            try {
+                handleWorkerUncaughtException(thrown);
+            } catch (Error err) {
+                LOG.error("Received error in thread {}.. terminating worker...", thread.getName(), err);
+                Runtime.getRuntime().exit(-2);
+            }
+        };
+    }
+
     public static void setupDefaultUncaughtExceptionHandler() {
         Thread.setDefaultUncaughtExceptionHandler(createDefaultUncaughtExceptionHandler());
     }
 
+    public static void setupWorkerUncaughtExceptionHandler() {
+        Thread.setDefaultUncaughtExceptionHandler(createWorkerUncaughtExceptionHandler());
+    }
+
     /**
      * parses the arguments to extract jvm heap memory size in MB.
      *
diff --git a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
index 03240c8..8e493ef 100644
--- a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
@@ -95,7 +95,7 @@
         //taskId for worker transfer queue should be -1.
         //But there is already one worker transfer queue initialized by WorkerTransfer class (taskId=-1).
         //However the taskId is only used for metrics and it is not important here. Making it -100 to avoid collision.
-        transferQueue = new JCQueue("worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
+        transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
             workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-100), workerState.getPort(),
             workerState.getMetricRegistry());
 
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
index 07a9123..91e26c9 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -22,7 +22,7 @@
 public class JCQueueBackpressureTest {
     
     private static JCQueue createQueue(String name, int queueSize) {
-        return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
+        return new JCQueue(name, name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
     }
 
     @Test
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index 4a98937..1d093af 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -157,7 +157,7 @@
     }
 
     private JCQueue createQueue(String name, int batchSize, int queueSize) {
-        return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
+        return new JCQueue(name, name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
     }
 
     private static class IncProducer implements Runnable {
diff --git a/storm-client/src/resources/mockito-extensions/org.mockito.plugins.MockMaker b/storm-client/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
similarity index 100%
rename from storm-client/src/resources/mockito-extensions/org.mockito.plugins.MockMaker
rename to storm-client/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index db548bc..e14fb29 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -57,8 +57,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.UnaryOperator;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.security.auth.Subject;
 
@@ -1123,6 +1121,21 @@
         ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
         ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
 
+        // storm.messaging.netty.authentication is about inter-worker communication
+        // enforce netty authentication when either topo or daemon set it to true
+        boolean enforceNettyAuth = false;
+        if (!topoConf.containsKey(Config.STORM_MESSAGING_NETTY_AUTHENTICATION)) {
+            enforceNettyAuth = (Boolean) conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        } else {
+            enforceNettyAuth = (Boolean) topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION)
+                                || (Boolean) conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+        }
+        LOG.debug("For netty authentication, topo conf is: {}, cluster conf is: {}, Enforce netty auth: {}",
+            topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION),
+            conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION),
+            enforceNettyAuth);
+        ret.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, enforceNettyAuth);
+
         // Don't allow topoConf to override various cluster-specific properties.
         // Specifically adding the cluster settings to the topoConf here will make sure these settings
         // also override the subsequently generated conf picked up locally on the classpath.
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index ce597de..c72b4cb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -181,11 +181,11 @@
             throw Utils.wrapInRuntime(e);
         }
 
-        this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+        this.heartbeatTimer = new StormTimer("HBTimer", new DefaultUncaughtExceptionHandler());
 
-        this.workerHeartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+        this.workerHeartbeatTimer = new StormTimer("WorkerHBTimer", new DefaultUncaughtExceptionHandler());
 
-        this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+        this.eventTimer = new StormTimer("EventTimer", new DefaultUncaughtExceptionHandler());
         
         this.supervisorThriftInterface = createSupervisorIface();
     }