[STORM-3684] use real compId instead of constant _system for receive-queue V2 metrics (#3320)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 3110696..ab4a8b6 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -186,7 +186,9 @@
this.isWorkerActive = new CountDownLatch(1);
this.isTopologyActive = new AtomicBoolean(false);
this.stormComponentToDebug = new AtomicReference<>();
- this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors);
+ this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
+ this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
+ this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors, taskToComponent);
this.localTaskIds = new ArrayList<>();
this.taskToExecutorQueue = new HashMap<>();
this.blobToLastKnownVersion = new ConcurrentHashMap<>();
@@ -199,9 +201,7 @@
}
Collections.sort(localTaskIds);
this.topologyConf = topologyConf;
- this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
- this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
this.componentToStreamToFields = new HashMap<>();
for (String c : ThriftTopologyUtils.getComponentIds(systemTopology)) {
Map<String, Fields> streamToFields = new HashMap<>();
@@ -704,7 +704,8 @@
}
}
- private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyConf, Set<List<Long>> executors) {
+ private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyConf,
+ Set<List<Long>> executors, Map<Integer, String> taskToComponent) {
Integer recvQueueSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE));
Integer recvBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT));
@@ -717,11 +718,19 @@
IWaitStrategy backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
+
for (List<Long> executor : executors) {
List<Integer> taskIds = StormCommon.executorIdToTasks(executor);
+ int taskId = taskIds.get(0);
+ String compId;
+ if (taskId == Constants.SYSTEM_TASK_ID) {
+ compId = Constants.SYSTEM_COMPONENT_ID;
+ } else {
+ compId = taskToComponent.get(taskId);
+ }
receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue",
recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
- this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, taskIds, this.getPort(), metricRegistry));
+ this.getTopologyId(), compId, taskIds, this.getPort(), metricRegistry));
}
return receiveQueueMap;