Merge pull request #3298 from RuiLi8080/STORM-3663

[STORM-3663] move org.mockito.plugins.MockMaker to test-level resource
diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
index 1843755..2cbee19 100644
--- a/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
+++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/queuetest/JCQueuePerfTest.java
@@ -47,8 +47,8 @@
     private static void ackingProducerSimulation() {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue spoutQ = new JCQueue("spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
-        JCQueue ackQ = new JCQueue("ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue spoutQ = new JCQueue("spoutQ", "spoutQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue ackQ = new JCQueue("ackQ", "ackQ", 1024, 0, 100, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final AckingProducer ackingProducer = new AckingProducer(spoutQ, ackQ);
         final Acker acker = new Acker(ackQ, spoutQ);
@@ -59,9 +59,9 @@
     private static void producerFwdConsumer(int prodBatchSz) {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test",
+        JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test",
                 Collections.singletonList(1000), 1000, registry);
-        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final Producer prod = new Producer(q1);
         final Forwarder fwd = new Forwarder(q1, q2);
@@ -72,7 +72,7 @@
 
 
     private static void oneProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+        JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
                 Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
@@ -82,7 +82,7 @@
     }
 
     private static void twoProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+        JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
                 Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
@@ -93,7 +93,7 @@
     }
 
     private static void threeProducer1Consumer(int prodBatchSz) {
-        JCQueue q1 = new JCQueue("q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
+        JCQueue q1 = new JCQueue("q1", "q1", 50_000, 0, prodBatchSz, new WaitStrategyPark(100), "test", "test",
                 Collections.singletonList(1000), 1000, new StormMetricRegistry());
 
         final Producer prod1 = new Producer(q1);
@@ -108,8 +108,8 @@
     private static void oneProducer2Consumers(int prodBatchSz) {
         WaitStrategyPark ws = new WaitStrategyPark(100);
         StormMetricRegistry registry = new StormMetricRegistry();
-        JCQueue q1 = new JCQueue("q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
-        JCQueue q2 = new JCQueue("q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue q1 = new JCQueue("q1", "q1", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
+        JCQueue q2 = new JCQueue("q2", "q2", 1024, 0, prodBatchSz, ws, "test", "test", Collections.singletonList(1000), 1000, registry);
 
         final Producer2 prod1 = new Producer2(q1, q2);
         final Consumer cons1 = new Consumer(q1);
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index 70840ac..db0c0e5 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -36,7 +36,6 @@
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.StormTimer;
-import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStateStorage;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.cluster.VersionedData;
@@ -70,14 +69,12 @@
 import org.apache.storm.serialization.KryoTupleSerializer;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.shade.com.google.common.collect.Sets;
-import org.apache.storm.shade.org.apache.commons.lang.Validate;
 import org.apache.storm.task.WorkerTopologyContext;
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.ObjectReader;
-import org.apache.storm.utils.SupervisorClient;
 import org.apache.storm.utils.SupervisorIfaceFactory;
 import org.apache.storm.utils.ThriftTopologyUtils;
 import org.apache.storm.utils.Utils;
@@ -569,7 +566,7 @@
         queue.recordMsgDrop();
         LOG.warn(
             "Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}",
-            queue.getName(), queue.getOverflowCount(), dropCount, tuple);
+            queue.getQueueName(), queue.getOverflowCount(), dropCount, tuple);
     }
 
     public void checkSerialize(KryoTupleSerializer serializer, AddressedTuple tuple) {
@@ -679,7 +676,7 @@
         Map<List<Long>, JCQueue> receiveQueueMap = new HashMap<>();
         for (List<Long> executor : executors) {
             List<Integer> taskIds = StormCommon.executorIdToTasks(executor);
-            receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(),
+            receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue",
                                                       recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
                 this.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, taskIds, this.getPort(), metricRegistry));
 
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
index 68f4d82..f2da7bc 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
@@ -66,7 +66,8 @@
                                                + Config.TOPOLOGY_TRANSFER_BUFFER_SIZE + ":" + xferQueueSz);
         }
 
-        this.transferQueue = new JCQueue("worker-transfer-queue", xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
+        this.transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue",
+            xferQueueSz, 0, xferBatchSz, backPressureWaitStrategy,
             workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-1), workerState.getPort(),
             workerState.getMetricRegistry());
     }
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 6d5b9f2..d45042b 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -275,7 +275,7 @@
 
         TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
         if (isDebug) {
-            LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
+            LOG.info("Processing received TUPLE: {} for TASK: {} ", tuple, taskId);
         }
 
         try {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
index 4a40f82..f330b42 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
@@ -44,15 +44,16 @@
     private final IWaitStrategy backPressureWaitStrategy;
     private final String queueName;
 
-    public JCQueue(String queueName, int size, int overflowLimit, int producerBatchSz, IWaitStrategy backPressureWaitStrategy,
-                   String topologyId, String componentId, List<Integer> taskIds, int port, StormMetricRegistry metricRegistry) {
+    public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
+                   IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
+                   int port, StormMetricRegistry metricRegistry) {
         this.queueName = queueName;
         this.overflowLimit = overflowLimit;
         this.recvQueue = new MpscArrayQueue<>(size);
         this.overflowQ = new MpscUnboundedArrayQueue<>(size);
 
         for (Integer taskId : taskIds) {
-            this.jcqMetrics.add(new JCQueueMetrics(queueName, topologyId, componentId, taskId, port,
+            this.jcqMetrics.add(new JCQueueMetrics(metricNamePrefix, topologyId, componentId, taskId, port,
                     metricRegistry, recvQueue, overflowQ));
         }
 
@@ -61,7 +62,7 @@
         this.backPressureWaitStrategy = backPressureWaitStrategy;
     }
 
-    public String getName() {
+    public String getQueueName() {
         return queueName;
     }
 
@@ -282,7 +283,8 @@
                     jcQueueMetric.notifyInsertFailure();
                 }
                 if (idleCount == 0) { // check avoids multiple log msgs when in a idle loop
-                    LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait", queue.getName());
+                    LOG.debug("Experiencing Back Pressure on recvQueue: '{}'. Entering BackPressure Wait",
+                        queue.getQueueName());
                 }
 
                 idleCount = queue.backPressureWaitStrategy.idle(idleCount);
@@ -372,7 +374,8 @@
                     jcQueueMetric.notifyInsertFailure();
                 }
                 if (retryCount == 0) { // check avoids multiple log msgs when in a idle loop
-                    LOG.debug("Experiencing Back Pressure when flushing batch to Q: {}. Entering BackPressure Wait.", queue.getName());
+                    LOG.debug("Experiencing Back Pressure when flushing batch to Q: '{}'. Entering BackPressure Wait.",
+                        queue.getQueueName());
                 }
                 retryCount = queue.backPressureWaitStrategy.idle(retryCount);
                 if (Thread.interrupted()) {
diff --git a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
index eec5b33..6a6a56f 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/JCQueueMetrics.java
@@ -32,7 +32,7 @@
     private final RateTracker insertFailuresTracker = new RateTracker(10000, 10);
     private final AtomicLong droppedMessages = new AtomicLong(0);
 
-    public JCQueueMetrics(String queueName, String topologyId, String componentId, int taskId, int port,
+    public JCQueueMetrics(String metricNamePrefix, String topologyId, String componentId, int taskId, int port,
                           StormMetricRegistry metricRegistry, MpscArrayQueue<Object> receiveQ,
                           MpscUnboundedArrayQueue<Object> overflowQ) {
 
@@ -95,14 +95,14 @@
             }
         };
 
-        metricRegistry.gauge(queueName + "-capacity", cap, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-pct_full", pctFull, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-population", pop, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
-        metricRegistry.gauge(queueName + "-overflow", overflow, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-capacity", cap, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-pct_full", pctFull, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-population", pop, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-arrival_rate_secs", arrivalRate, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-sojourn_time_ms", sojourn, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-insert_failures", insertFailures, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-dropped_messages", dropped, topologyId, componentId, taskId, port);
+        metricRegistry.gauge(metricNamePrefix + "-overflow", overflow, topologyId, componentId, taskId, port);
     }
 
     public void notifyArrivals(long counts) {
diff --git a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
index 03240c8..8e493ef 100644
--- a/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
+++ b/storm-client/test/jvm/org/apache/storm/executor/ExecutorTransferMultiThreadingTest.java
@@ -95,7 +95,7 @@
         //taskId for worker transfer queue should be -1.
         //But there is already one worker transfer queue initialized by WorkerTransfer class (taskId=-1).
         //However the taskId is only used for metrics and it is not important here. Making it -100 to avoid collision.
-        transferQueue = new JCQueue("worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
+        transferQueue = new JCQueue("worker-transfer-queue", "worker-transfer-queue", 1024, 0, 1, new WaitStrategyPark(100),
             workerState.getTopologyId(), Constants.SYSTEM_COMPONENT_ID, Collections.singletonList(-100), workerState.getPort(),
             workerState.getMetricRegistry());
 
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
index 07a9123..91e26c9 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueBackpressureTest.java
@@ -22,7 +22,7 @@
 public class JCQueueBackpressureTest {
     
     private static JCQueue createQueue(String name, int queueSize) {
-        return new JCQueue(name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
+        return new JCQueue(name, name, queueSize, 0, 1, new WaitStrategyPark(0), "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
     }
 
     @Test
diff --git a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
index 4a98937..1d093af 100644
--- a/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
+++ b/storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
@@ -157,7 +157,7 @@
     }
 
     private JCQueue createQueue(String name, int batchSize, int queueSize) {
-        return new JCQueue(name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
+        return new JCQueue(name, name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
     }
 
     private static class IncProducer implements Runnable {