Merge pull request #3294 from agresch/agresch_storm_3648
STORM-3648 add meter to track worker heartbeat rate
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 1843755..2cbee19 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -47,8 +47,8 @@
private static void ackingProducerSimulation() {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
- JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
final Acker acker = new Acker(ackQ, spoutQ);
@@ -59,9 +59,9 @@
private static void producerFwdConsumer(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",
+ JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test",
Collections.singletonList(1000), 1000, registry);
- JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final Producer prod = new Producer(q1);
final Forwarder fwd = new Forwarder(q1, q2);
@@ -72,7 +72,7 @@
private static void oneProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
@@ -82,7 +82,7 @@
}
private static void twoProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
@@ -93,7 +93,7 @@
}
private static void threeProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
@@ -108,8 +108,8 @@
private static void oneProducer2Consumers(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
- JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final Producer2 prod1 = new Producer2(q1, q2);
final Consumer cons1 = new Consumer(q1);
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index c27b468..93c5499 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -385,12 +385,22 @@
/**
* A list of host names that this topology would prefer to be scheduled on (no guarantee is given though). This is intended for
* debugging only.
+ *
+ * <p>Favored nodes are moved to the front of the node selection list.
+ * If the same node is also present in {@link #TOPOLOGY_SCHEDULER_UNFAVORED_NODES}
+ * then the node is considered only as a favored node and is removed from the unfavored list.
+ * </p>
*/
@IsStringList
public static final String TOPOLOGY_SCHEDULER_FAVORED_NODES = "topology.scheduler.favored.nodes";
/**
* A list of host names that this topology would prefer to NOT be scheduled on (no guarantee is given though). This is intended for
* debugging only.
+ *
+ * <p>Unfavored nodes are moved to the end of the node selection list.
+ * If the same node is also present in {@link #TOPOLOGY_SCHEDULER_FAVORED_NODES}
+ * then the node is considered only as a favored node and is removed from the unfavored list.
+ * </p>
*/
@IsStringList
public static final String TOPOLOGY_SCHEDULER_UNFAVORED_NODES = "topology.scheduler.unfavored.nodes";
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index ee8e005..989d464 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -148,7 +148,7 @@
String portStr = args[3];
String workerId = args[4];
Map<String, Object> conf = ConfigUtils.readStormConfig();
- Utils.setupDefaultUncaughtExceptionHandler();
+ Utils.setupWorkerUncaughtExceptionHandler();
StormCommon.validateDistributedMode(conf);
int supervisorPortInt = Integer.parseInt(supervisorPort);
Worker worker = new Worker(conf, null, stormId, assignmentId, supervisorPortInt, Integer.parseInt(portStr), workerId);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 70840ac..db0c0e5 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -36,7 +36,6 @@
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.StormTimer;
-import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
@@ -70,14 +69,12 @@
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.shade.com.google.common.collect.Sets;
-import org.apache.storm.shade.org.apache.commons.lang.Validate;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.SupervisorClient;
import org.apache.storm.utils.SupervisorIfaceFactory;
import org.apache.storm.utils.ThriftTopologyUtils;
import org.apache.storm.utils.Utils;
@@ -569,7 +566,7 @@
queue.recordMsgDrop();
LOG.warn(
"Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}",
- queue.getName(), queue.getOverflowCount(), dropCount, tuple);
+ queue.getQueueName(), queue.getOverflowCount(), dropCount, tuple);
}
public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple tuple) {
@@ -679,7 +676,7 @@
Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
for (List<Long> executor : executors) {
List<Integer> taskIds = StormCommon.executorIdToTasks(executor);
- receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
+ receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue",
recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, taskIds, this.getPort(), metricRegistry));
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index 68f4d82..f2da7bc 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -66,7 +66,8 @@
+ Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + xferQueueSz);
}
- this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
+ this.transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue",
+ xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-1), workerState.getPort(),
workerState.getMetricRegistry());
}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 6d5b9f2..d45042b 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -275,7 +275,7 @@
TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
if (isDebug) {
- LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+ LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, taskId);
}
try {
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 542dd9c..5cc5c05 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -63,7 +63,7 @@
// Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it
}
try {
- Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS);
+ Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS, false);
ctx.close();
} catch (Error error) {
LOG.info("Received error in netty thread.. terminating server...");
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index f907a23..f7f866a 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -13,9 +13,7 @@
package org.apache.storm.security.auth.kerberos;
import com.codahale.metrics.Gauge;
-import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
-import java.security.Principal;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -210,35 +208,16 @@
return;
}
- // We are just trying to do the following:
- //
- // Configuration conf = new Configuration();
- // HadoopKerberosName.setConfiguration(conf);
- // subject.getPrincipals().add(new User(tgt.getClient().toString(), AuthenticationMethod.KERBEROS, null));
+ LOG.info("Invoking Hadoop UserGroupInformation.loginUserFromSubject.");
+ Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
+ login.invoke(null, subject);
- Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
- Constructor confCons = confClass.getConstructor();
- Object conf = confCons.newInstance();
- Class<?> hknClass = Class.forName("org.apache.hadoop.security.HadoopKerberosName");
- Method hknSetConf = hknClass.getMethod("setConfiguration", confClass);
- hknSetConf.invoke(null, conf);
-
- Class<?> authMethodClass = Class.forName("org.apache.hadoop.security.UserGroupInformation$AuthenticationMethod");
- Object kerbAuthMethod = null;
- for (Object authMethod : authMethodClass.getEnumConstants()) {
- if ("KERBEROS".equals(authMethod.toString())) {
- kerbAuthMethod = authMethod;
- break;
- }
- }
-
- Class<?> userClass = Class.forName("org.apache.hadoop.security.User");
- Constructor userCons = userClass.getConstructor(String.class, authMethodClass, LoginContext.class);
- userCons.setAccessible(true);
- String name = getTGT(subject).getClient().toString();
- Object user = userCons.newInstance(name, kerbAuthMethod, null);
- subject.getPrincipals().add((Principal) user);
-
+ //Refer to STORM-3606 for details
+ LOG.warn("UserGroupInformation.loginUserFromSubject will spawn a TGT renewal thread (\"TGT Renewer for <username>\") "
+ + "to execute \"kinit -R\" command some time before the current TGT expires. "
+ + "It will fail because TGT is not in the local TGT cache and the thread will eventually abort. "
+ + "Exceptions from this TGT renewal thread can be ignored. Note: TGT for the Worker is kept in memory. "
+ + "Please refer to STORM-3606 for detailed explanations");
} catch (Exception e) {
LOG.error("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop "
+ "may not be compatible.", e);
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 4a40f82..f330b42 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -44,15 +44,16 @@
private final IWaitStrategy backPressureWaitStrategy;
private final String queueName;
- public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy,
- String topologyId, String componentId, List<Integer> taskIds, int port, StormMetricRegistry metricRegistry) {
+ public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
+ IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
+ int port, StormMetricRegistry metricRegistry) {
this.queueName = queueName;
this.overflowLimit = overflowLimit;
this.recvQueue = new MpscArrayQueue<>(size);
this.overflowQ = new MpscUnboundedArrayQueue<>(size);
for (Integer taskId : taskIds) {
- this.jcqMetrics.add(new JCQueueMetrics(queueName, topologyId, componentId, taskId, port,
+ this.jcqMetrics.add(new JCQueueMetrics(metricNamePrefix, topologyId, componentId, taskId, port,
metricRegistry, recvQueue, overflowQ));
}
@@ -61,7 +62,7 @@
this.backPressureWaitStrategy = backPressureWaitStrategy;
}
- public String getName() {
+ public String getQueueName() {
return queueName;
}
@@ -282,7 +283,8 @@
jcQueueMetric.notifyInsertFailure();
}
if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
- LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", queue.getName());
+ LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait",
+ queue.getQueueName());
}
idleCount = queue.backPressureWaitStrategy.idle(idleCount);
@@ -372,7 +374,8 @@
jcQueueMetric.notifyInsertFailure();
}
if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
- LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", queue.getName());
+ LOG.debug("Experiencing Back Pressure when flushing batch to Q: '{}'. Entering BackPressure Wait.",
+ queue.getQueueName());
}
retryCount = queue.backPressureWaitStrategy.idle(retryCount);
if (Thread.interrupted()) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
index eec5b33..6a6a56f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
@@ -32,7 +32,7 @@
private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
private final AtomicLong droppedMessages = new AtomicLong(0);
- public JCQueueMetrics(String queueName, String topologyId, String componentId, int taskId, int port,
+ public JCQueueMetrics(String metricNamePrefix, String topologyId, String componentId, int taskId, int port,
StormMetricRegistry metricRegistry, MpscArrayQueue<Object> receiveQ,
MpscUnboundedArrayQueue<Object> overflowQ) {
@@ -95,14 +95,14 @@
}
};
- metricRegistry.gauge(queueName + "-capacity", cap, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-pct_full", pctFull, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-population", pop, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-overflow", overflow, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-capacity", cap, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-pct_full", pctFull, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-population", pop, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-overflow", overflow, topologyId, componentId, taskId, port);
}
public void notifyArrivals(long counts) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index ff656fa..fec30cc 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -630,11 +630,12 @@
&& !((String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME)).isEmpty());
}
- public static void handleUncaughtException(Throwable t) {
- handleUncaughtException(t, defaultAllowedExceptions);
- }
-
- public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions) {
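+ /**
+ * Handles uncaught exceptions.
+ *
+ * <p>A throwable that is not swallowed by this method is passed to the calling thread wrapped in an {@link Error}.
+ *
+ * @param worker true if this is for handling worker exceptions; when true, a throwable accepted by
+ *     {@code isAllowedWorkerException} is logged and swallowed instead of being rethrown
+ */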
+ public static void handleUncaughtException(Throwable t, Set<Class<?>> allowedExceptions, boolean worker) {
if (t != null) {
if (t instanceof OutOfMemoryError) {
try {
@@ -651,10 +652,38 @@
return;
}
+ if (worker && isAllowedWorkerException(t)) {
+ LOG.info("Swallowing {} {}", t.getClass(), t);
+ return;
+ }
+
//Running in daemon mode, we would pass Error to calling thread.
throw new Error(t);
}
+ /**
+ * Handles an uncaught exception with the default allowed exceptions and without worker-specific handling.
+ *
+ * <p>Delegates to {@code handleUncaughtException(t, defaultAllowedExceptions, false)}.
+ *
+ * @param t the uncaught throwable
+ */
+ public static void handleUncaughtException(Throwable t) {
+ handleUncaughtException(t, defaultAllowedExceptions, false);
+ }
+
+ /**
+ * Handles an uncaught exception raised in a worker.
+ *
+ * <p>Delegates to {@code handleUncaughtException(t, defaultAllowedExceptions, true)}, so the
+ * worker-only check in that method is enabled.
+ *
+ * @param t the uncaught throwable
+ */
+ public static void handleWorkerUncaughtException(Throwable t) {
+ handleUncaughtException(t, defaultAllowedExceptions, true);
+ }
+
+ /**
+ * Decides whether a throwable raised in a worker may be swallowed.
+ *
+ * <p>Hadoop UserGroupInformation can launch an autorenewal thread that can cause a NullPointerException
+ * for workers. See STORM-3606 for an explanation.
+ *
+ * @param t the throwable to inspect
+ * @return true only for a NullPointerException whose stack trace contains a {@code run} frame in a class
+ *     whose name starts with {@code org.apache.hadoop.security.UserGroupInformation}
+ */
+ private static boolean isAllowedWorkerException(Throwable t) {
+     // Only NullPointerExceptions are candidates; everything else is rejected up front.
+     if (!(t instanceof NullPointerException)) {
+         return false;
+     }
+     for (StackTraceElement frame : t.getStackTrace()) {
+         boolean raisedByUgi = frame.getClassName().startsWith("org.apache.hadoop.security.UserGroupInformation");
+         if (raisedByUgi && "run".equals(frame.getMethodName())) {
+             return true;
+         }
+     }
+     return false;
+ }
+
public static byte[] thriftSerialize(TBase t) {
try {
TSerializer ser = threadSer.get();
@@ -1020,11 +1049,26 @@
}
};
}
-
+
+ /**
+ * Creates the uncaught exception handler used by workers.
+ *
+ * <p>The handler passes the throwable to {@code handleWorkerUncaughtException}. If that call
+ * throws an {@link Error}, the error is logged and the JVM exits with status -2.
+ *
+ * @return the handler for uncaught worker exceptions
+ */
+ public static UncaughtExceptionHandler createWorkerUncaughtExceptionHandler() {
+     return (failedThread, cause) -> {
+         try {
+             handleWorkerUncaughtException(cause);
+         } catch (Error fatal) {
+             // Anything the worker handler did not swallow is fatal for this process.
+             LOG.error("Received error in thread {}.. terminating worker...", failedThread.getName(), fatal);
+             Runtime.getRuntime().exit(-2);
+         }
+     };
+ }
+
public static void setupDefaultUncaughtExceptionHandler() {
Thread.setDefaultUncaughtExceptionHandler(createDefaultUncaughtExceptionHandler());
}
+ /**
+ * Installs the handler from {@code createWorkerUncaughtExceptionHandler()} as the default
+ * uncaught exception handler for threads in this JVM.
+ */
+ public static void setupWorkerUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(createWorkerUncaughtExceptionHandler());
+ }
+
/**
* parses the arguments to extract jvm heap memory size in MB.
*
diff --git a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
index 03240c8..8e493ef 100644
--- a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
@@ -95,7 +95,7 @@
//taskId for worker transfer queue should be -1.
//But there is already one worker transfer queue initialized by WorkerTransfer class (taskId=-1).
//However the taskId is only used for metrics and it is not important here. Making it -100 to avoid collision.
- transferQueue = new JCQueue("worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
+ transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-100), workerState.getPort(),
workerState.getMetricRegistry());
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
index 07a9123..91e26c9 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -22,7 +22,7 @@
public class JCQueueBackpressureTest {
private static JCQueue createQueue(String name, int queueSize) {
- return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
+ return new JCQueue(name, name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
}
@Test
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index 4a98937..1d093af 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -157,7 +157,7 @@
}
private JCQueue createQueue(String name, int batchSize, int queueSize) {
- return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
+ return new JCQueue(name, name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
}
private static class IncProducer implements Runnable {
diff --git a/storm-client/src/resources/mockito-extensions/org.mockito.plugins.MockMaker b/storm-client/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
similarity index 100%
rename from storm-client/src/resources/mockito-extensions/org.mockito.plugins.MockMaker
rename to storm-client/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index db548bc..e14fb29 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -57,8 +57,6 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
@@ -1123,6 +1121,21 @@
ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+ // storm.messaging.netty.authentication is about inter-worker communication.
+ // Enforce netty authentication when either the topology conf or the daemon conf sets it to true.
+ boolean enforceNettyAuth = false;
+ if (!topoConf.containsKey(Config.STORM_MESSAGING_NETTY_AUTHENTICATION)) {
+ enforceNettyAuth = (Boolean) conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+ } else {
+ enforceNettyAuth = (Boolean) topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION)
+ || (Boolean) conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
+ }
+ LOG.debug("For netty authentication, topo conf is: {}, cluster conf is: {}, Enforce netty auth: {}",
+ topoConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION),
+ conf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION),
+ enforceNettyAuth);
+ ret.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, enforceNettyAuth);
+
// Don't allow topoConf to override various cluster-specific properties.
// Specifically adding the cluster settings to the topoConf here will make sure these settings
// also override the subsequently generated conf picked up locally on the classpath.
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
index ce597de..c72b4cb 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java
@@ -181,11 +181,11 @@
throw Utils.wrapInRuntime(e);
}
- this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+ this.heartbeatTimer = new StormTimer("HBTimer", new DefaultUncaughtExceptionHandler());
- this.workerHeartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+ this.workerHeartbeatTimer = new StormTimer("WorkerHBTimer", new DefaultUncaughtExceptionHandler());
- this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());
+ this.eventTimer = new StormTimer("EventTimer", new DefaultUncaughtExceptionHandler());
this.supervisorThriftInterface = createSupervisorIface();
}