[STORM-3684] use real compId instead of constant _system for receive-queue V2 metrics (#3320)

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 3110696..ab4a8b6 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -186,7 +186,9 @@
         this.isWorkerActive = new CountDownLatch(1);
         this.isTopologyActive = new AtomicBoolean(false);
         this.stormComponentToDebug = new AtomicReference<>();
-        this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors);
+        this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
+        this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
+        this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors, taskToComponent);
         this.localTaskIds = new ArrayList<>();
         this.taskToExecutorQueue = new HashMap<>();
         this.blobToLastKnownVersion = new ConcurrentHashMap<>();
@@ -199,9 +201,7 @@
         }
         Collections.sort(localTaskIds);
         this.topologyConf = topologyConf;
-        this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
         this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
-        this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
         this.componentToStreamToFields = new HashMap<>();
         for (String c : ThriftTopologyUtils.getComponentIds(systemTopology)) {
             Map<String, Fields> streamToFields = new HashMap<>();
@@ -704,7 +704,8 @@
         }
     }
 
-    private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyConf, Set<List<Long>> executors) {
+    private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyConf,
+                                                       Set<List<Long>> executors, Map<Integer, String> taskToComponent) {
         Integer recvQueueSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE));
         Integer recvBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
         Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT));
@@ -717,11 +718,19 @@
 
         IWaitStrategy backPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(topologyConf);
         Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
+
         for (List<Long> executor : executors) {
             List<Integer> taskIds = StormCommon.executorIdToTasks(executor);
+            int taskId = taskIds.get(0);
+            String compId;
+            if (taskId == Constants.SYSTEM_TASK_ID) {
+                compId = Constants.SYSTEM_COMPONENT_ID;
+            } else {
+                compId = taskToComponent.get(taskId);
+            }
             receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue",
                                                       recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
-                this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, taskIds, this.getPort(), metricRegistry));
+                this.getTopologyId(), compId, taskIds, this.getPort(), metricRegistry));
 
         }
         return receiveQueueMap;