Merge pull request #3277 from agresch/agresch_storm_3641
STORM-3641 upgrade metrics API for JCQueue
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 13d880b..1843755 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -18,11 +18,10 @@
package org.apache.storm.perf.queuetest;
-import java.util.concurrent.locks.LockSupport;
+import java.util.Collections;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.utils.JCQueue;
-import org.apache.storm.utils.MutableLong;
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class JCQueuePerfTest {
@@ -48,8 +47,8 @@
private static void ackingProducerSimulation() {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", 1000, 1000, registry);
- JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", 1000, 1000, registry);
+ JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
final Acker acker = new Acker(ackQ, spoutQ);
@@ -60,8 +59,9 @@
private static void producerFwdConsumer(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
- JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
+ JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",
+ Collections.singletonList(1000), 1000, registry);
+ JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final Producer prod = new Producer(q1);
final Forwarder fwd = new Forwarder(q1, q2);
@@ -72,8 +72,8 @@
private static void oneProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
- new StormMetricRegistry());
+ JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
final Consumer cons1 = new Consumer(q1);
@@ -82,8 +82,8 @@
}
private static void twoProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
- new StormMetricRegistry());
+ JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
final Producer prod2 = new Producer(q1);
@@ -93,8 +93,8 @@
}
private static void threeProducer1Consumer(int prodBatchSz) {
- JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test", 1000, 1000,
- new StormMetricRegistry());
+ JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+ Collections.singletonList(1000), 1000, new StormMetricRegistry());
final Producer prod1 = new Producer(q1);
final Producer prod2 = new Producer(q1);
@@ -108,8 +108,8 @@
private static void oneProducer2Consumers(int prodBatchSz) {
WaitStrategyPark ws = new WaitStrategyPark(100);
StormMetricRegistry registry = new StormMetricRegistry();
- JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
- JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", 1000, 1000, registry);
+ JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+ JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
final Producer2 prod1 = new Producer2(q1, q2);
final Consumer cons1 = new Consumer(q1);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
index 2d72e78..72d660f 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java
@@ -49,14 +49,6 @@
registerMetric("__send-iconnection", metric, topoConf, context);
}
- public static void registerQueueMetrics(Map<String, JCQueue> queues, Map<String, Object> topoConf, TopologyContext context) {
- for (Map.Entry<String, JCQueue> entry : queues.entrySet()) {
- String name = "__" + entry.getKey();
- IMetric metric = new StateMetric(entry.getValue());
- registerMetric(name, metric, topoConf, context);
- }
- }
-
public static void registerMetric(String name, IMetric metric, Map<String, Object> topoConf, TopologyContext context) {
int bucketSize = ((Number) topoConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)).intValue();
context.registerMetric(name, metric, bucketSize);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index eaab4e9..70840ac 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -240,11 +240,6 @@
this.receiver = this.mqContext.bind(topologyId, port, cb, newConnectionResponse);
}
- private static double getQueueLoad(JCQueue queue) {
- JCQueue.QueueMetrics queueMetrics = queue.getMetrics();
- return ((double) queueMetrics.population()) / queueMetrics.capacity();
- }
-
public static boolean isConnectionReady(IConnection connection) {
return !(connection instanceof ConnectionWithStatus)
|| ((ConnectionWithStatus) connection).status() == ConnectionWithStatus.Status.Ready;
@@ -475,7 +470,7 @@
Set<Integer> remoteTasks = Sets.difference(new HashSet<>(outboundTasks), new HashSet<>(localTaskIds));
Map<Integer, Double> localLoad = new HashMap<>();
for (IRunningExecutor exec : execs) {
- double receiveLoad = getQueueLoad(exec.getReceiveQueue());
+ double receiveLoad = exec.getReceiveQueue().getQueueLoad();
localLoad.put(exec.getExecutorId().get(0).intValue(), receiveLoad);
}
@@ -683,10 +678,10 @@
IWaitStrategy backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
for (List<Long> executor : executors) {
- int port = this.getPort();
+ List<Integer> taskIds = StormCommon.executorIdToTasks(executor);
receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
- this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, this.getPort(), metricRegistry));
+ this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, taskIds, this.getPort(), metricRegistry));
}
return receiveQueueMap;
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index 88f5921..916850e 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -18,6 +18,7 @@
package org.apache.storm.daemon.worker;
+import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +67,7 @@
}
this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
- workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, -1, workerState.getPort(),
+ workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-1), workerState.getPort(),
workerState.getMetricRegistry());
}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index dd62e6a..68ea9da 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -122,9 +122,6 @@
((ICredentialsListener) boltObject).setCredentials(credentials);
}
if (Constants.SYSTEM_COMPONENT_ID.equals(componentId)) {
- Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue, "transfer", workerData.getTransferQueue());
- BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
-
Map<NodeInfo, IConnection> cachedNodePortToSocket = workerData.getCachedNodeToPortSocket().get();
BuiltinMetricsUtil.registerIconnectionClientMetrics(cachedNodePortToSocket, topoConf, userContext);
BuiltinMetricsUtil.registerIconnectionServerMetric(workerData.getReceiver(), topoConf, userContext);
@@ -138,9 +135,6 @@
}
}
}
- } else {
- Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
- BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, userContext);
}
this.outputCollector = new BoltOutputCollectorImpl(this, taskData, rand, hasEventLoggers, ackingEnabled, isDebug);
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index cb2af7f..6f4d7dd 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -140,8 +140,6 @@
this.outputCollectors.add(outputCollector);
builtInMetrics.registerAll(topoConf, taskData.getUserContext());
- Map<String, JCQueue> map = ImmutableMap.of("receive", receiveQueue);
- BuiltinMetricsUtil.registerQueueMetrics(map, topoConf, taskData.getUserContext());
if (spoutObject instanceof ICredentialsListener) {
((ICredentialsListener) spoutObject).setCredentials(credentials);
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java
deleted file mode 100644
index 82b9d13..0000000
--- a/storm-client/src/jvm/org/apache/storm/metrics2/JcMetrics.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.metrics2;
-
-import org.apache.storm.utils.JCQueue;
-
-public class JcMetrics {
- private final SimpleGauge<Long> capacity;
- private final SimpleGauge<Long> population;
-
- JcMetrics(SimpleGauge<Long> capacity,
- SimpleGauge<Long> population) {
- this.capacity = capacity;
- this.population = population;
- }
-
- public void setCapacity(Long capacity) {
- this.capacity.set(capacity);
- }
-
- public void setPopulation(Long population) {
- this.population.set(population);
- }
-
- public void set(JCQueue.QueueMetrics metrics) {
- this.capacity.set(metrics.capacity());
- this.population.set(metrics.population());
- }
-}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 3401706..4d62cf4 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -67,10 +67,11 @@
return gauge;
}
- public JcMetrics jcMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port) {
- SimpleGauge<Long> capacityGauge = gauge(0L, name + "-capacity", topologyId, componentId, taskId, port);
- SimpleGauge<Long> populationGauge = gauge(0L, name + "-population", topologyId, componentId, taskId, port);
- return new JcMetrics(capacityGauge, populationGauge);
+ public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId, Integer taskId, Integer port) {
+ MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
+ gauge = registry.register(metricNames.getLongName(), gauge);
+ saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
+ return gauge;
}
public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index b0f2d9f..4a40f82 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -20,17 +20,9 @@
import java.io.Closeable;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.storm.metric.api.IStatefulObject;
-import org.apache.storm.metric.internal.RateTracker;
-import org.apache.storm.metrics2.JcMetrics;
+import java.util.List;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
-import org.apache.storm.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.storm.shade.org.jctools.queues.MessagePassingQueue;
import org.apache.storm.shade.org.jctools.queues.MpscArrayQueue;
import org.apache.storm.shade.org.jctools.queues.MpscUnboundedArrayQueue;
@@ -38,17 +30,10 @@
import org.slf4j.LoggerFactory;
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
-public class JCQueue implements IStatefulObject, Closeable {
+public class JCQueue implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(JCQueue.class);
- private static final String PREFIX = "jc-";
- private static final ScheduledThreadPoolExecutor METRICS_REPORTER_EXECUTOR =
- new ScheduledThreadPoolExecutor(1,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setNameFormat(PREFIX + "metrics-reporter")
- .build());
private final ExitCondition continueRunning = () -> true;
- private final JcMetrics jcMetrics;
+ private final List<JCQueueMetrics> jcqMetrics = new ArrayList<>();
private final MpscArrayQueue<Object> recvQueue;
// only holds msgs from other workers (via WorkerTransfer), when recvQueue is full
private final MpscUnboundedArrayQueue<Object> overflowQ;
@@ -56,32 +41,24 @@
private final int producerBatchSz;
private final DirectInserter directInserter = new DirectInserter(this);
private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
- private final JCQueue.QueueMetrics metrics;
private final IWaitStrategy backPressureWaitStrategy;
private final String queueName;
public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy,
- String topologyId, String componentId, Integer taskId, int port, StormMetricRegistry metricRegistry) {
+ String topologyId, String componentId, List<Integer> taskIds, int port, StormMetricRegistry metricRegistry) {
this.queueName = queueName;
this.overflowLimit = overflowLimit;
this.recvQueue = new MpscArrayQueue<>(size);
this.overflowQ = new MpscUnboundedArrayQueue<>(size);
- this.metrics = new JCQueue.QueueMetrics();
- this.jcMetrics = metricRegistry.jcMetrics(queueName, topologyId, componentId, taskId, port);
+ for (Integer taskId : taskIds) {
+ this.jcqMetrics.add(new JCQueueMetrics(queueName, topologyId, componentId, taskId, port,
+ metricRegistry, recvQueue, overflowQ));
+ }
//The batch size can be no larger than half the full recvQueue size, to avoid contention issues.
this.producerBatchSz = Math.max(1, Math.min(producerBatchSz, size / 2));
this.backPressureWaitStrategy = backPressureWaitStrategy;
-
- if (!METRICS_REPORTER_EXECUTOR.isShutdown()) {
- METRICS_REPORTER_EXECUTOR.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- jcMetrics.set(metrics);
- }
- }, 15, 15, TimeUnit.SECONDS);
- }
}
public String getName() {
@@ -90,9 +67,9 @@
@Override
public void close() {
- //No need to block, the task run by the executor is safe to run even after metrics are closed
- METRICS_REPORTER_EXECUTOR.shutdown();
- metrics.close();
+ for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
+ jcQueueMetric.close();
+ }
}
/**
@@ -118,6 +95,10 @@
return recvQueue.size() + overflowQ.size();
}
+ public double getQueueLoad() {
+ return ((double) recvQueue.size()) / recvQueue.capacity();
+ }
+
/**
* Non blocking. Returns immediately if Q is empty. Returns number of elements consumed from Q.
*/
@@ -149,7 +130,9 @@
// Non Blocking. returns true/false indicating success/failure. Fails if full.
private boolean tryPublishInternal(Object obj) {
if (recvQueue.offer(obj)) {
- metrics.notifyArrivals(1);
+ for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
+ jcQueueMetric.notifyArrivals(1);
+ }
return true;
}
return false;
@@ -167,7 +150,9 @@
}
};
int count = recvQueue.fill(supplier, objs.size());
- metrics.notifyArrivals(count);
+ for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
+ jcQueueMetric.notifyArrivals(count);
+ }
return count;
}
@@ -221,7 +206,9 @@
}
public void recordMsgDrop() {
- getMetrics().notifyDroppedMsg();
+ for (JCQueueMetrics jcQueueMetric : jcqMetrics) {
+ jcQueueMetric.notifyDroppedMsg();
+ }
}
public boolean isEmptyOverflow() {
@@ -254,16 +241,6 @@
return inserter.tryFlush();
}
- @Override
- public Object getState() {
- return metrics.getState();
- }
-
- //This method enables the metrics to be accessed from outside of the JCQueue class
- public JCQueue.QueueMetrics getMetrics() {
- return metrics;
- }
-
private interface Inserter {
// blocking call that can be interrupted using Thread.interrupt()
void publish(Object obj) throws InterruptedException;
@@ -301,7 +278,9 @@
boolean inserted = queue.tryPublishInternal(obj);
int idleCount = 0;
while (!inserted) {
- queue.metrics.notifyInsertFailure();
+ for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
+ jcQueueMetric.notifyInsertFailure();
+ }
if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", queue.getName());
}
@@ -322,7 +301,9 @@
public boolean tryPublish(Object obj) {
boolean inserted = queue.tryPublishInternal(obj);
if (!inserted) {
- queue.metrics.notifyInsertFailure();
+ for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
+ jcQueueMetric.notifyInsertFailure();
+ }
return false;
}
return true;
@@ -387,7 +368,9 @@
int publishCount = queue.tryPublishInternal(currentBatch);
int retryCount = 0;
while (publishCount == 0) { // retry till at least 1 element is drained
- queue.metrics.notifyInsertFailure();
+ for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
+ jcQueueMetric.notifyInsertFailure();
+ }
if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", queue.getName());
}
@@ -411,7 +394,9 @@
}
int publishCount = queue.tryPublishInternal(currentBatch);
if (publishCount == 0) {
- queue.metrics.notifyInsertFailure();
+ for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
+ jcQueueMetric.notifyInsertFailure();
+ }
return false;
} else {
currentBatch.subList(0, publishCount).clear();
@@ -419,67 +404,4 @@
}
}
} // class BatchInserter
-
- /**
- * This inner class provides methods to access the metrics of the JCQueue.
- */
- public class QueueMetrics implements Closeable {
- private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
- private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
- private final AtomicLong droppedMessages = new AtomicLong(0);
-
- public long population() {
- return recvQueue.size();
- }
-
- public long capacity() {
- return recvQueue.capacity();
- }
-
- public Object getState() {
- Map<String, Object> state = new HashMap<>();
-
- final double arrivalRateInSecs = arrivalsTracker.reportRate();
-
- long tuplePop = population();
-
- // Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
- // If this assumption does not hold, the calculation of sojourn time should also consider
- // departure rate according to Queuing Theory.
- final double sojournTime = tuplePop / Math.max(arrivalRateInSecs, 0.00001) * 1000.0;
-
- long cap = capacity();
- float pctFull = (1.0F * tuplePop / cap);
-
- state.put("capacity", cap);
- state.put("pct_full", pctFull);
- state.put("population", tuplePop);
-
- state.put("arrival_rate_secs", arrivalRateInSecs);
- state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds
- state.put("insert_failures", insertFailuresTracker.reportRate());
- state.put("dropped_messages", droppedMessages);
- state.put("overflow", overflowQ.size());
- return state;
- }
-
- public void notifyArrivals(long counts) {
- arrivalsTracker.notify(counts);
- }
-
- public void notifyInsertFailure() {
- insertFailuresTracker.notify(1);
- }
-
- public void notifyDroppedMsg() {
- droppedMessages.incrementAndGet();
- }
-
- @Override
- public void close() {
- arrivalsTracker.close();
- insertFailuresTracker.close();
- }
-
- }
}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
new file mode 100644
index 0000000..eec5b33
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.apache.storm.utils;
+
+import com.codahale.metrics.Gauge;
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.storm.metric.internal.RateTracker;
+import org.apache.storm.metrics2.StormMetricRegistry;
+import org.apache.storm.shade.org.jctools.queues.MpscArrayQueue;
+import org.apache.storm.shade.org.jctools.queues.MpscUnboundedArrayQueue;
+
+@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
+public class JCQueueMetrics implements Closeable {
+ private final RateTracker arrivalsTracker = new RateTracker(10000, 10);
+ private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
+ private final AtomicLong droppedMessages = new AtomicLong(0);
+
+ public JCQueueMetrics(String queueName, String topologyId, String componentId, int taskId, int port,
+ StormMetricRegistry metricRegistry, MpscArrayQueue<Object> receiveQ,
+ MpscUnboundedArrayQueue<Object> overflowQ) {
+
+ Gauge<Integer> cap = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return receiveQ.capacity();
+ }
+ };
+
+ Gauge<Float> pctFull = new Gauge<Float>() {
+ @Override
+ public Float getValue() {
+ return (1.0F * receiveQ.size() / receiveQ.capacity());
+ }
+ };
+
+ Gauge<Integer> pop = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return receiveQ.size();
+ }
+ };
+
+ Gauge<Double> arrivalRate = new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ return arrivalsTracker.reportRate();
+ }
+ };
+
+ Gauge<Double> sojourn = new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ // Assume the recvQueue is stable, in which the arrival rate is equal to the consumption rate.
+ // If this assumption does not hold, the calculation of sojourn time should also consider
+ // departure rate according to Queuing Theory.
+ return receiveQ.size() / Math.max(arrivalsTracker.reportRate(), 0.00001) * 1000.0;
+ }
+ };
+
+ Gauge<Double> insertFailures = new Gauge<Double>() {
+ @Override
+ public Double getValue() {
+ return insertFailuresTracker.reportRate();
+ }
+ };
+
+ Gauge<Long> dropped = new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return droppedMessages.get();
+ }
+ };
+
+ Gauge<Integer> overflow = new Gauge<Integer>() {
+ @Override
+ public Integer getValue() {
+ return overflowQ.size();
+ }
+ };
+
+ metricRegistry.gauge(queueName + "-capacity", cap, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(queueName + "-pct_full", pctFull, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(queueName + "-population", pop, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(queueName + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(queueName + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(queueName + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(queueName + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
+ metricRegistry.gauge(queueName + "-overflow", overflow, topologyId, componentId, taskId, port);
+ }
+
+ public void notifyArrivals(long counts) {
+ arrivalsTracker.notify(counts);
+ }
+
+ public void notifyInsertFailure() {
+ insertFailuresTracker.notify(1);
+ }
+
+ public void notifyDroppedMsg() {
+ droppedMessages.incrementAndGet();
+ }
+
+ @Override
+ public void close() {
+ arrivalsTracker.close();
+ insertFailuresTracker.close();
+ }
+}
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
index 2cfe25d..07a9123 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -12,17 +12,17 @@
package org.apache.storm.utils;
+import java.util.Collections;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.WaitStrategyPark;
import org.apache.storm.utils.JCQueue.Consumer;
import org.junit.Assert;
import org.junit.Test;
-
public class JCQueueBackpressureTest {
private static JCQueue createQueue(String name, int queueSize) {
- return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", 1000, 1000, new StormMetricRegistry());
+ return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
}
@Test
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index 0d597fc..4a98937 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -14,6 +14,7 @@
import static org.junit.Assert.assertFalse;
import java.time.Duration;
+import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.metrics2.StormMetricRegistry;
@@ -21,7 +22,6 @@
import org.apache.storm.policy.WaitStrategyPark;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
public class JCQueueTest {
@@ -157,7 +157,7 @@
}
private JCQueue createQueue(String name, int batchSize, int queueSize) {
- return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test", 1000, 1000, new StormMetricRegistry());
+ return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
}
private static class IncProducer implements Runnable {