[STORM-3654] remove executor id from JCQueue receive-queue name (#3289)
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 1843755..2cbee19 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -47,8 +47,8 @@
private static void ackingProducerSimulation() {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
- JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
final Acker acker = new Acker(ackQ, spoutQ);
@@ -59,9 +59,9 @@
private static void producerFwdConsumer(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",
+ JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test",
Collections.singletonList(1000), 1000, registry);
- JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final Producer prod = new Producer(q1);
final Forwarder fwd = new Forwarder(q1, q2);
@@ -72,7 +72,7 @@
private static void oneProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
@@ -82,7 +82,7 @@
}
private static void twoProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
@@ -93,7 +93,7 @@
}
private static void threeProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
@@ -108,8 +108,8 @@
private static void oneProducer2Consumers(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
- JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final Producer2 prod1 = new Producer2(q1, q2);
final Consumer cons1 = new Consumer(q1);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 70840ac..db0c0e5 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -36,7 +36,6 @@
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.StormTimer;
-import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
@@ -70,14 +69,12 @@
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.shade.com.google.common.collect.Sets;
-import org.apache.storm.shade.org.apache.commons.lang.Validate;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.SupervisorClient;
import org.apache.storm.utils.SupervisorIfaceFactory;
import org.apache.storm.utils.ThriftTopologyUtils;
import org.apache.storm.utils.Utils;
@@ -569,7 +566,7 @@
queue.recordMsgDrop();
LOG.warn(
"Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}",
- queue.getName(), queue.getOverflowCount(), dropCount, tuple);
+ queue.getQueueName(), queue.getOverflowCount(), dropCount, tuple);
}
public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple tuple) {
@@ -679,7 +676,7 @@
Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
for (List<Long> executor : executors) {
List<Integer> taskIds = StormCommon.executorIdToTasks(executor);
- receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
+ receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue",
recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, taskIds, this.getPort(), metricRegistry));
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index 68f4d82..f2da7bc 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -66,7 +66,8 @@
+ Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + xferQueueSz);
}
- this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
+ this.transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue",
+ xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-1), workerState.getPort(),
workerState.getMetricRegistry());
}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 6d5b9f2..d45042b 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -275,7 +275,7 @@
TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
if (isDebug) {
- LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+ LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, taskId);
}
try {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 4a40f82..f330b42 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -44,15 +44,16 @@
private final IWaitStrategy backPressureWaitStrategy;
private final String queueName;
- public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy,
- String topologyId, String componentId, List<Integer> taskIds, int port, StormMetricRegistry metricRegistry) {
+ public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
+ IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
+ int port, StormMetricRegistry metricRegistry) {
this.queueName = queueName;
this.overflowLimit = overflowLimit;
this.recvQueue = new MpscArrayQueue<>(size);
this.overflowQ = new MpscUnboundedArrayQueue<>(size);
for (Integer taskId : taskIds) {
- this.jcqMetrics.add(new JCQueueMetrics(queueName, topologyId, componentId, taskId, port,
+ this.jcqMetrics.add(new JCQueueMetrics(metricNamePrefix, topologyId, componentId, taskId, port,
metricRegistry, recvQueue, overflowQ));
}
@@ -61,7 +62,7 @@
this.backPressureWaitStrategy = backPressureWaitStrategy;
}
- public String getName() {
+ public String getQueueName() {
return queueName;
}
@@ -282,7 +283,8 @@
jcQueueMetric.notifyInsertFailure();
}
if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
- LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", queue.getName());
+ LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait",
+ queue.getQueueName());
}
idleCount = queue.backPressureWaitStrategy.idle(idleCount);
@@ -372,7 +374,8 @@
jcQueueMetric.notifyInsertFailure();
}
if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
- LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", queue.getName());
+ LOG.debug("Experiencing Back Pressure when flushing batch to Q: '{}'. Entering BackPressure Wait.",
+ queue.getQueueName());
}
retryCount = queue.backPressureWaitStrategy.idle(retryCount);
if (Thread.interrupted()) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
index eec5b33..6a6a56f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
@@ -32,7 +32,7 @@
private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
private final AtomicLong droppedMessages = new AtomicLong(0);
- public JCQueueMetrics(String queueName, String topologyId, String componentId, int taskId, int port,
+ public JCQueueMetrics(String metricNamePrefix, String topologyId, String componentId, int taskId, int port,
StormMetricRegistry metricRegistry, MpscArrayQueue<Object> receiveQ,
MpscUnboundedArrayQueue<Object> overflowQ) {
@@ -95,14 +95,14 @@
}
};
- metricRegistry.gauge(queueName + "-capacity", cap, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-pct_full", pctFull, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-population", pop, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
- metricRegistry.gauge(queueName + "-overflow", overflow, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-capacity", cap, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-pct_full", pctFull, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-population", pop, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(metricNamePrefix + "-overflow", overflow, topologyId, componentId, taskId, port);
}
public void notifyArrivals(long counts) {
diff --git a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
index 03240c8..8e493ef 100644
--- a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
@@ -95,7 +95,7 @@
//taskId for worker transfer queue should be -1.
//But there is already one worker transfer queue initialized by WorkerTransfer class (taskId=-1).
//However the taskId is only used for metrics and it is not important here. Making it -100 to avoid collision.
- transferQueue = new JCQueue("worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
+ transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-100), workerState.getPort(),
workerState.getMetricRegistry());
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
index 07a9123..91e26c9 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -22,7 +22,7 @@
public class JCQueueBackpressureTest {
private static JCQueue createQueue(String name, int queueSize) {
- return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
+ return new JCQueue(name, name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
}
@Test
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index 4a98937..1d093af 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -157,7 +157,7 @@
}
private JCQueue createQueue(String name, int batchSize, int queueSize) {
- return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
+ return new JCQueue(name, name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
}
private static class IncProducer implements Runnable {