[ISSUE #2088] Optimize rocketmq client's stats of RT to make sense
Fix issue #2088 , make the log output of RT stat makes sense.
diff --git a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
index cf347b4..ba4773a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java
@@ -62,7 +62,7 @@
}
public void incPullRT(final String group, final String topic, final long rt) {
- this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1);
+ this.topicAndGroupPullRT.addRTValue(topic + "@" + group, (int) rt, 1);
}
public void incPullTPS(final String group, final String topic, final long msgs) {
@@ -70,7 +70,7 @@
}
public void incConsumeRT(final String group, final String topic, final long rt) {
- this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1);
+ this.topicAndGroupConsumeRT.addRTValue(topic + "@" + group, (int) rt, 1);
}
public void incConsumeOKTPS(final String group, final String topic, final long msgs) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/RTStatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/RTStatsItem.java
new file mode 100644
index 0000000..102148c
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/RTStatsItem.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.stats;
+
+import org.apache.rocketmq.logging.InternalLogger;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * A StatItem for response time, the only difference between from StatsItem is it has a different log output.
+ */
+public class RTStatsItem extends StatsItem {
+
+ public RTStatsItem(String statsName, String statsKey, ScheduledExecutorService scheduledExecutorService, InternalLogger log) {
+ super(statsName, statsKey, scheduledExecutorService, log);
+ }
+
+ /**
+ * For Response Time stat Item, the print detail should be a little different, TPS and SUM makes no sense.
+ * And we give a name "AVGRT" rather than AVGPT for value getAvgpt()
+ */
+ @Override
+ protected String statPrintDetail(StatsSnapshot ss) {
+ return String.format("TIMES: %d AVGRT: %.2f", ss.getTimes(), ss.getAvgpt());
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
index 6304ea2..b078551 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -55,13 +55,14 @@
double tps = 0;
double avgpt = 0;
long sum = 0;
+ long timesDiff = 0;
if (!csList.isEmpty()) {
CallSnapshot first = csList.getFirst();
CallSnapshot last = csList.getLast();
sum = last.getValue() - first.getValue();
tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
- long timesDiff = last.getTimes() - first.getTimes();
+ timesDiff = last.getTimes() - first.getTimes();
if (timesDiff > 0) {
avgpt = (sum * 1.0d) / timesDiff;
}
@@ -70,6 +71,7 @@
statsSnapshot.setSum(sum);
statsSnapshot.setTps(tps);
statsSnapshot.setAvgpt(avgpt);
+ statsSnapshot.setTimes(timesDiff);
}
return statsSnapshot;
@@ -191,32 +193,25 @@
public void printAtMinutes() {
StatsSnapshot ss = computeStatsData(this.csListMinute);
- log.info(String.format("[%s] [%s] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2f",
- this.statsName,
- this.statsKey,
- ss.getSum(),
- ss.getTps(),
- ss.getAvgpt()));
+ log.info(String.format("[%s] [%s] Stats In One Minute, ", this.statsName, this.statsKey) + statPrintDetail(ss));
}
public void printAtHour() {
StatsSnapshot ss = computeStatsData(this.csListHour);
- log.info(String.format("[%s] [%s] Stats In One Hour, SUM: %d TPS: %.2f AVGPT: %.2f",
- this.statsName,
- this.statsKey,
- ss.getSum(),
- ss.getTps(),
- ss.getAvgpt()));
+ log.info(String.format("[%s] [%s] Stats In One Hour, ", this.statsName, this.statsKey) + statPrintDetail(ss));
+
}
public void printAtDay() {
StatsSnapshot ss = computeStatsData(this.csListDay);
- log.info(String.format("[%s] [%s] Stats In One Day, SUM: %d TPS: %.2f AVGPT: %.2f",
- this.statsName,
- this.statsKey,
- ss.getSum(),
- ss.getTps(),
- ss.getAvgpt()));
+ log.info(String.format("[%s] [%s] Stats In One Day, ", this.statsName, this.statsKey) + statPrintDetail(ss));
+ }
+
+ protected String statPrintDetail(StatsSnapshot ss) {
+ return String.format("SUM: %d TPS: %.2f AVGPT: %.2f",
+ ss.getSum(),
+ ss.getTps(),
+ ss.getAvgpt());
}
public AtomicLong getValue() {
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index bcf9665..a28d008 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -158,6 +158,12 @@
statsItem.getTimes().addAndGet(incTimes);
}
+ public void addRTValue(final String statsKey, final int incValue, final int incTimes) {
+ StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
+ statsItem.getValue().addAndGet(incValue);
+ statsItem.getTimes().addAndGet(incTimes);
+ }
+
public void delValue(final String statsKey) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null != statsItem) {
@@ -196,9 +202,21 @@
}
public StatsItem getAndCreateStatsItem(final String statsKey) {
+ return getAndCreateItem(statsKey, false);
+ }
+
+ public StatsItem getAndCreateRTStatsItem(final String statsKey) {
+ return getAndCreateItem(statsKey, true);
+ }
+
+ public StatsItem getAndCreateItem(final String statsKey, boolean rtItem) {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) {
- statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+ if (rtItem) {
+ statsItem = new RTStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+ } else {
+ statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
+ }
StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
if (null != prev) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
index 136f21a..0cecce9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsSnapshot.java
@@ -20,6 +20,8 @@
public class StatsSnapshot {
private long sum;
private double tps;
+
+ private long times;
private double avgpt;
public long getSum() {
@@ -45,4 +47,12 @@
public void setAvgpt(double avgpt) {
this.avgpt = avgpt;
}
+
+ public long getTimes() {
+ return times;
+ }
+
+ public void setTimes(long times) {
+ this.times = times;
+ }
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
index 4b4a867..5b4c5d8 100644
--- a/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/stats/StatsItemSetTest.java
@@ -46,14 +46,17 @@
@Test
public void test_statsOfFirstStatisticsCycle() throws InterruptedException {
- final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null);
+ final String tpsStatKey = "tpsTest";
+ final String rtStatKey = "rtTest";
+ final StatsItemSet statsItemSet = new StatsItemSet(tpsStatKey, scheduler, null);
executor = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(100), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
- statsItemSet.addValue("topicTest", 2, 1);
+ statsItemSet.addValue(tpsStatKey, 2, 1);
+ statsItemSet.addRTValue(rtStatKey, 2, 1);
}
});
}
@@ -63,14 +66,33 @@
}
Thread.sleep(1000);
}
- // simulate schedule task execution
- statsItemSet.getStatsItem("topicTest").samplingInSeconds();
- statsItemSet.getStatsItem("topicTest").samplingInMinutes();
- statsItemSet.getStatsItem("topicTest").samplingInHour();
+ // simulate schedule task execution , tps stat
+ {
+ statsItemSet.getStatsItem(tpsStatKey).samplingInSeconds();
+ statsItemSet.getStatsItem(tpsStatKey).samplingInMinutes();
+ statsItemSet.getStatsItem(tpsStatKey).samplingInHour();
- assertEquals(20L, statsItemSet.getStatsDataInMinute("topicTest").getSum());
- assertEquals(20L, statsItemSet.getStatsDataInHour("topicTest").getSum());
- assertEquals(20L, statsItemSet.getStatsDataInDay("topicTest").getSum());
+ assertEquals(20L, statsItemSet.getStatsDataInMinute(tpsStatKey).getSum());
+ assertEquals(20L, statsItemSet.getStatsDataInHour(tpsStatKey).getSum());
+ assertEquals(20L, statsItemSet.getStatsDataInDay(tpsStatKey).getSum());
+ assertEquals(10L, statsItemSet.getStatsDataInDay(tpsStatKey).getTimes());
+ assertEquals(10L, statsItemSet.getStatsDataInHour(tpsStatKey).getTimes());
+ assertEquals(10L, statsItemSet.getStatsDataInDay(tpsStatKey).getTimes());
+ }
+
+ // simulate schedule task execution , rt stat
+ {
+ statsItemSet.getStatsItem(rtStatKey).samplingInSeconds();
+ statsItemSet.getStatsItem(rtStatKey).samplingInMinutes();
+ statsItemSet.getStatsItem(rtStatKey).samplingInHour();
+
+ assertEquals(20L, statsItemSet.getStatsDataInMinute(rtStatKey).getSum());
+ assertEquals(20L, statsItemSet.getStatsDataInHour(rtStatKey).getSum());
+ assertEquals(20L, statsItemSet.getStatsDataInDay(rtStatKey).getSum());
+ assertEquals(10L, statsItemSet.getStatsDataInDay(rtStatKey).getTimes());
+ assertEquals(10L, statsItemSet.getStatsDataInHour(rtStatKey).getTimes());
+ assertEquals(10L, statsItemSet.getStatsDataInDay(rtStatKey).getTimes());
+ }
}
private AtomicLong test_unit() throws InterruptedException {