Merge pull request #677 from wenfengwang/fix-init
[ISSUE #678] Optimize goroutine creation for Stat
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 3504786..584a4b5 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -271,6 +271,8 @@
namesrv internal.Namesrvs
pullFromWhichNodeTable sync.Map
+
+ stat *StatsManager
}
func (dc *defaultConsumer) start() error {
@@ -291,6 +293,7 @@
dc.client.Start()
atomic.StoreInt32(&dc.state, int32(internal.StateRunning))
dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
+ dc.stat = NewStatsManager()
return nil
}
@@ -305,6 +308,7 @@
mqs = append(mqs, &k)
return true
})
+ dc.stat.ShutDownStat()
dc.storage.persist(mqs)
dc.client.Shutdown()
return nil
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 49342a2..519ba36 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -350,7 +350,7 @@
res.ConsumeResult = internal.ConsumeRetryLater
}
- increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
+ pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
return res
}
@@ -362,12 +362,12 @@
topic := key.(string)
info.SubscriptionData[value.(*internal.SubscriptionData)] = true
status := internal.ConsumeStatus{
- PullRT: getPullRT(topic, pc.consumerGroup).avgpt,
- PullTPS: getPullTPS(topic, pc.consumerGroup).tps,
- ConsumeRT: getConsumeRT(topic, pc.consumerGroup).avgpt,
- ConsumeOKTPS: getConsumeOKTPS(topic, pc.consumerGroup).tps,
- ConsumeFailedTPS: getConsumeFailedTPS(topic, pc.consumerGroup).tps,
- ConsumeFailedMsgs: topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
+ // The StatsManager getters are declared as (group, topic) and build the
+ // key "topic@group"; pass the consumer group first so the lookups use the
+ // same key the increase* calls record under.
+ PullRT: pc.stat.getPullRT(pc.consumerGroup, topic).avgpt,
+ PullTPS: pc.stat.getPullTPS(pc.consumerGroup, topic).tps,
+ ConsumeRT: pc.stat.getConsumeRT(pc.consumerGroup, topic).avgpt,
+ ConsumeOKTPS: pc.stat.getConsumeOKTPS(pc.consumerGroup, topic).tps,
+ ConsumeFailedTPS: pc.stat.getConsumeFailedTPS(pc.consumerGroup, topic).tps,
+ ConsumeFailedMsgs: pc.stat.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
}
info.StatusTable[topic] = status
return true
@@ -743,7 +743,7 @@
request.nextOffset = result.NextBeginOffset
rt := time.Now().Sub(beginTime) / time.Millisecond
- increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
+ pc.stat.increasePullRT(pc.consumerGroup, request.mq.Topic, int64(rt))
pc.processPullResult(request.mq, result, sd)
@@ -751,7 +751,7 @@
firstMsgOffset := int64(math.MaxInt64)
if msgFounded != nil && len(msgFounded) != 0 {
firstMsgOffset = msgFounded[0].QueueOffset
- increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
+ pc.stat.increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFounded))
pq.putMessage(msgFounded...)
}
if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
@@ -1007,14 +1007,14 @@
msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
}
- increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
+ pc.stat.increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
if !pq.IsDroppd() {
msgBackFailed := make([]*primitive.MessageExt, 0)
if result == ConsumeSuccess {
- increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+ pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
} else {
- increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+ pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
for i := 0; i < len(subMsgs); i++ {
rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
diff --git a/consumer/statistics.go b/consumer/statistics.go
index aae5f89..e9d5d79 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -28,23 +28,24 @@
"github.com/apache/rocketmq-client-go/v2/rlog"
)
-var (
- csListLock sync.Mutex
- closeOnce sync.Once
-
+// StatsManager groups the statistics item sets (consume OK TPS, consume RT,
+// consume failed TPS, pull TPS and pull RT) that used to be package-level
+// variables. Instances are created with NewStatsManager.
+type StatsManager struct {
+ startOnce sync.Once // NOTE(review): not referenced in the visible code - confirm it is still needed
+ closeOnce sync.Once // makes ShutDownStat close the item sets only once
topicAndGroupConsumeOKTPS *statsItemSet
topicAndGroupConsumeRT *statsItemSet
topicAndGroupConsumeFailedTPS *statsItemSet
topicAndGroupPullTPS *statsItemSet
topicAndGroupPullRT *statsItemSet
-)
+}
-func init() {
- topicAndGroupConsumeOKTPS = newStatsItemSet("CONSUME_OK_TPS")
- topicAndGroupConsumeRT = newStatsItemSet("CONSUME_RT")
- topicAndGroupConsumeFailedTPS = newStatsItemSet("CONSUME_FAILED_TPS")
- topicAndGroupPullTPS = newStatsItemSet("PULL_TPS")
- topicAndGroupPullRT = newStatsItemSet("PULL_RT")
+// NewStatsManager returns a StatsManager whose five item sets are freshly
+// created with newStatsItemSet, in the same order as the old package init.
+func NewStatsManager() *StatsManager {
+ return &StatsManager{
+ topicAndGroupConsumeOKTPS: newStatsItemSet("CONSUME_OK_TPS"),
+ topicAndGroupConsumeRT: newStatsItemSet("CONSUME_RT"),
+ topicAndGroupConsumeFailedTPS: newStatsItemSet("CONSUME_FAILED_TPS"),
+ topicAndGroupPullTPS: newStatsItemSet("PULL_TPS"),
+ topicAndGroupPullRT: newStatsItemSet("PULL_RT"),
+ }
}
type ConsumeStatus struct {
@@ -56,81 +57,104 @@
ConsumeFailedMsgs int64
}
-func increasePullRT(group, topic string, rt int64) {
- topicAndGroupPullRT.addValue(topic+"@"+group, rt, 1)
+// increasePullRT adds rt to the PULL_RT item set under the key "topic@group".
+func (mgr *StatsManager) increasePullRT(group, topic string, rt int64) {
+ statsKey := topic + "@" + group
+ mgr.topicAndGroupPullRT.addValue(statsKey, rt, 1)
}
-func increasePullTPS(group, topic string, msgs int) {
- topicAndGroupPullTPS.addValue(topic+"@"+group, int64(msgs), 1)
+// increasePullTPS adds msgs to the PULL_TPS item set under the key "topic@group".
+func (mgr *StatsManager) increasePullTPS(group, topic string, msgs int) {
+ statsKey := topic + "@" + group
+ pulled := int64(msgs)
+ mgr.topicAndGroupPullTPS.addValue(statsKey, pulled, 1)
}
-func increaseConsumeRT(group, topic string, rt int64) {
- topicAndGroupConsumeRT.addValue(topic+"@"+group, rt, 1)
+// increaseConsumeRT adds rt to the CONSUME_RT item set under the key "topic@group".
+func (mgr *StatsManager) increaseConsumeRT(group, topic string, rt int64) {
+ statsKey := topic + "@" + group
+ mgr.topicAndGroupConsumeRT.addValue(statsKey, rt, 1)
}
-func increaseConsumeOKTPS(group, topic string, msgs int) {
- topicAndGroupConsumeOKTPS.addValue(topic+"@"+group, int64(msgs), 1)
+// increaseConsumeOKTPS adds msgs to the CONSUME_OK_TPS item set under the key
+// "topic@group".
+func (mgr *StatsManager) increaseConsumeOKTPS(group, topic string, msgs int) {
+ statsKey := topic + "@" + group
+ consumed := int64(msgs)
+ mgr.topicAndGroupConsumeOKTPS.addValue(statsKey, consumed, 1)
}
-func increaseConsumeFailedTPS(group, topic string, msgs int) {
- topicAndGroupConsumeFailedTPS.addValue(topic+"@"+group, int64(msgs), 1)
+// increaseConsumeFailedTPS adds msgs to the CONSUME_FAILED_TPS item set under
+// the key "topic@group".
+func (mgr *StatsManager) increaseConsumeFailedTPS(group, topic string, msgs int) {
+ statsKey := topic + "@" + group
+ failed := int64(msgs)
+ mgr.topicAndGroupConsumeFailedTPS.addValue(statsKey, failed, 1)
}
-func GetConsumeStatus(group, topic string) ConsumeStatus {
+// GetConsumeStatus collects the current statistics for the given group and
+// topic into a ConsumeStatus: pull RT, pull TPS, consume RT, consume OK TPS,
+// consume failed TPS, and the failed-message sum taken from
+// getStatsDataInHour on the CONSUME_FAILED_TPS item set.
+func (mgr *StatsManager) GetConsumeStatus(group, topic string) ConsumeStatus {
cs := ConsumeStatus{}
- ss := getPullRT(group, topic)
- cs.PullTPS = ss.tps
+ ss := mgr.getPullRT(group, topic)
+ // The pull-RT snapshot belongs in PullRT; it was previously written to
+ // PullTPS and immediately overwritten, leaving PullRT unset.
+ cs.PullRT = ss.avgpt
- ss = getPullTPS(group, topic)
+ ss = mgr.getPullTPS(group, topic)
cs.PullTPS = ss.tps
- ss = getConsumeRT(group, topic)
+ ss = mgr.getConsumeRT(group, topic)
cs.ConsumeRT = ss.avgpt
- ss = getConsumeOKTPS(group, topic)
+ ss = mgr.getConsumeOKTPS(group, topic)
cs.ConsumeOKTPS = ss.tps
- ss = getConsumeFailedTPS(group, topic)
+ ss = mgr.getConsumeFailedTPS(group, topic)
cs.ConsumeFailedTPS = ss.tps
- ss = topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group)
+ ss = mgr.topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + group)
cs.ConsumeFailedMsgs = ss.sum
return cs
}
-func ShutDownStatis() {
- closeOnce.Do(func() {
- close(topicAndGroupConsumeOKTPS.closed)
- close(topicAndGroupConsumeRT.closed)
- close(topicAndGroupConsumeFailedTPS.closed)
- close(topicAndGroupPullTPS.closed)
- close(topicAndGroupPullRT.closed)
+// ShutDownStat closes the `closed` channel of each of the manager's item sets.
+// The work runs inside closeOnce, so repeated calls close the channels only once.
+func (mgr *StatsManager) ShutDownStat() {
+ mgr.closeOnce.Do(func() {
+ itemSets := []*statsItemSet{
+ mgr.topicAndGroupConsumeOKTPS,
+ mgr.topicAndGroupConsumeRT,
+ mgr.topicAndGroupConsumeFailedTPS,
+ mgr.topicAndGroupPullTPS,
+ mgr.topicAndGroupPullRT,
+ }
+ for _, itemSet := range itemSets {
+ close(itemSet.closed)
+ }
})
}
-func getPullRT(group, topic string) statsSnapshot {
- return topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
+// getPullRT returns the getStatsDataInMinute snapshot of the PULL_RT item set
+// for the key "topic@group".
+func (mgr *StatsManager) getPullRT(group, topic string) statsSnapshot {
+ statsKey := topic + "@" + group
+ return mgr.topicAndGroupPullRT.getStatsDataInMinute(statsKey)
}
-func getPullTPS(group, topic string) statsSnapshot {
- return topicAndGroupPullTPS.getStatsDataInMinute(topic + "@" + group)
+// getPullTPS returns the getStatsDataInMinute snapshot of the PULL_TPS item set
+// for the key "topic@group".
+func (mgr *StatsManager) getPullTPS(group, topic string) statsSnapshot {
+ statsKey := topic + "@" + group
+ return mgr.topicAndGroupPullTPS.getStatsDataInMinute(statsKey)
}
-func getConsumeRT(group, topic string) statsSnapshot {
- ss := topicAndGroupPullRT.getStatsDataInMinute(topic + "@" + group)
+// getConsumeRT returns the getStatsDataInMinute snapshot of the CONSUME_RT item
+// set for the key "topic@group". When that snapshot's sum is 0 it returns the
+// getStatsDataInHour snapshot of the same set instead.
+func (mgr *StatsManager) getConsumeRT(group, topic string) statsSnapshot {
+ // Read CONSUME_RT here; the previous code queried the PULL_RT set, so the
+ // reported consume RT was actually the pull RT.
+ ss := mgr.topicAndGroupConsumeRT.getStatsDataInMinute(topic + "@" + group)
if ss.sum == 0 {
- return topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group)
+ return mgr.topicAndGroupConsumeRT.getStatsDataInHour(topic + "@" + group)
}
return ss
}
-func getConsumeOKTPS(group, topic string) statsSnapshot {
- return topicAndGroupConsumeOKTPS.getStatsDataInMinute(topic + "@" + group)
+// getConsumeOKTPS returns the getStatsDataInMinute snapshot of the
+// CONSUME_OK_TPS item set for the key "topic@group".
+func (mgr *StatsManager) getConsumeOKTPS(group, topic string) statsSnapshot {
+ statsKey := topic + "@" + group
+ return mgr.topicAndGroupConsumeOKTPS.getStatsDataInMinute(statsKey)
}
-func getConsumeFailedTPS(group, topic string) statsSnapshot {
- return topicAndGroupConsumeFailedTPS.getStatsDataInMinute(topic + "@" + group)
+// getConsumeFailedTPS returns the getStatsDataInMinute snapshot of the
+// CONSUME_FAILED_TPS item set for the key "topic@group".
+func (mgr *StatsManager) getConsumeFailedTPS(group, topic string) statsSnapshot {
+ statsKey := topic + "@" + group
+ return mgr.topicAndGroupConsumeFailedTPS.getStatsDataInMinute(statsKey)
+}
+
+// csListLock is held by computeStatsData for the whole computation.
+// NOTE(review): it is a single package-level lock, so it is shared by every
+// list passed to computeStatsData - confirm that is intended.
+var csListLock sync.Mutex
+
+// computeStatsData derives a statsSnapshot from the first and last callSnapshot
+// stored in csList:
+//   - sum is last.value - first.value;
+//   - tps is sum*1000 divided by the timestamp difference (presumably
+//     milliseconds - TODO confirm);
+//   - avgpt is sum divided by the difference of the `time` fields.
+//
+// An empty list yields the zero snapshot. tps and avgpt stay 0 when their
+// divisor is not positive.
+func computeStatsData(csList *list.List) statsSnapshot {
+ csListLock.Lock()
+ defer csListLock.Unlock()
+ tps, avgpt, sum := 0.0, 0.0, int64(0)
+ if csList.Len() > 0 {
+ first := csList.Front().Value.(callSnapshot)
+ last := csList.Back().Value.(callSnapshot)
+ sum = last.value - first.value
+ // With a single sample (or two samples sharing a timestamp) the elapsed
+ // time is 0; dividing by it would make tps NaN or Inf, so keep tps at 0.
+ if elapsed := last.timestamp - first.timestamp; elapsed > 0 {
+ tps = float64(sum*1000.0) / float64(elapsed)
+ }
+ timesDiff := last.time - first.time
+ if timesDiff > 0 {
+ avgpt = float64(sum*1.0) / float64(timesDiff)
+ }
+ }
+ return statsSnapshot{
+ tps: tps,
+ avgpt: avgpt,
+ sum: sum,
+ }
+}
type statsItemSet struct {
@@ -158,7 +182,6 @@
return
case <-ticker.C:
sis.samplingInSeconds()
-
}
}
})
@@ -449,27 +472,6 @@
return now.AddDate(0, 1, 0)
}
-func computeStatsData(csList *list.List) statsSnapshot {
- csListLock.Lock()
- defer csListLock.Unlock()
- tps, avgpt, sum := 0.0, 0.0, int64(0)
- if csList.Len() > 0 {
- first := csList.Front().Value.(callSnapshot)
- last := csList.Back().Value.(callSnapshot)
- sum = last.value - first.value
- tps = float64(sum*1000.0) / float64(last.timestamp-first.timestamp)
- timesDiff := last.time - first.time
- if timesDiff > 0 {
- avgpt = float64(sum*1.0) / float64(timesDiff)
- }
- }
- return statsSnapshot{
- tps: tps,
- avgpt: avgpt,
- sum: sum,
- }
-}
-
type callSnapshot struct {
timestamp int64
time int64
diff --git a/consumer/statistics_test.go b/consumer/statistics_test.go
index f70ed0b..4836141 100644
--- a/consumer/statistics_test.go
+++ b/consumer/statistics_test.go
@@ -51,7 +51,8 @@
}
func TestIncreasePullRTGetPullRT(t *testing.T) {
- ShutDownStatis()
+ mgr := NewStatsManager()
+ mgr.ShutDownStat()
tests := []struct {
RT int64
@@ -67,9 +68,9 @@
{1, 6},
}
for _, tt := range tests {
- increasePullRT("rocketmq", "default", tt.RT)
- topicAndGroupPullRT.samplingInSeconds()
- snapshot := getPullRT("rocketmq", "default")
+ mgr.increasePullRT("rocketmq", "default", tt.RT)
+ mgr.topicAndGroupPullRT.samplingInSeconds()
+ snapshot := mgr.getPullRT("rocketmq", "default")
if snapshot.sum != tt.ExpectSum {
t.Errorf("wrong Pull RT sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
}
@@ -77,7 +78,7 @@
}
//func TestIncreaseConsumeRTGetConsumeRT(t *testing.T) {
-// ShutDownStatis()
+// ShutDownStat()
// tests := []struct {
// RT int64
// ExpectSum int64
@@ -102,7 +103,8 @@
//}
func TestIncreasePullTPSGetPullTPS(t *testing.T) {
- ShutDownStatis()
+ mgr := NewStatsManager()
+ mgr.ShutDownStat()
tests := []struct {
RT int
ExpectSum int64
@@ -117,9 +119,9 @@
{1, 6},
}
for _, tt := range tests {
- increasePullTPS("rocketmq", "default", tt.RT)
- topicAndGroupPullTPS.samplingInSeconds()
- snapshot := getPullTPS("rocketmq", "default")
+ mgr.increasePullTPS("rocketmq", "default", tt.RT)
+ mgr.topicAndGroupPullTPS.samplingInSeconds()
+ snapshot := mgr.getPullTPS("rocketmq", "default")
if snapshot.sum != tt.ExpectSum {
t.Errorf("wrong Pull TPS sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
}
@@ -127,7 +129,8 @@
}
func TestIncreaseConsumeOKTPSGetConsumeOKTPS(t *testing.T) {
- ShutDownStatis()
+ mgr := NewStatsManager()
+ mgr.ShutDownStat()
tests := []struct {
RT int
ExpectSum int64
@@ -142,9 +145,9 @@
{1, 6},
}
for _, tt := range tests {
- increaseConsumeOKTPS("rocketmq", "default", tt.RT)
- topicAndGroupConsumeOKTPS.samplingInSeconds()
- snapshot := getConsumeOKTPS("rocketmq", "default")
+ mgr.increaseConsumeOKTPS("rocketmq", "default", tt.RT)
+ mgr.topicAndGroupConsumeOKTPS.samplingInSeconds()
+ snapshot := mgr.getConsumeOKTPS("rocketmq", "default")
if snapshot.sum != tt.ExpectSum {
t.Errorf("wrong Consume OK TPS sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
}
@@ -152,7 +155,8 @@
}
func TestIncreaseConsumeFailedTPSGetConsumeFailedTPS(t *testing.T) {
- ShutDownStatis()
+ mgr := NewStatsManager()
+ mgr.ShutDownStat()
tests := []struct {
RT int
ExpectSum int64
@@ -167,9 +171,9 @@
{1, 6},
}
for _, tt := range tests {
- increaseConsumeFailedTPS("rocketmq", "default", tt.RT)
- topicAndGroupConsumeFailedTPS.samplingInSeconds()
- snapshot := getConsumeFailedTPS("rocketmq", "default")
+ mgr.increaseConsumeFailedTPS("rocketmq", "default", tt.RT)
+ mgr.topicAndGroupConsumeFailedTPS.samplingInSeconds()
+ snapshot := mgr.getConsumeFailedTPS("rocketmq", "default")
if snapshot.sum != tt.ExpectSum {
t.Errorf("wrong Consume Failed TPS sum. want=%d, got=%d", tt.ExpectSum, snapshot.sum)
}
@@ -177,7 +181,8 @@
}
func TestGetConsumeStatus(t *testing.T) {
- ShutDownStatis()
+ mgr := NewStatsManager()
+ mgr.ShutDownStat()
group, topic := "rocketmq", "default"
tests := []struct {
@@ -191,17 +196,17 @@
{1, 4},
}
for _, tt := range tests {
- increasePullRT(group, topic, int64(tt.RT))
- increasePullTPS(group, topic, tt.RT)
- increaseConsumeRT(group, topic, int64(tt.RT))
- increaseConsumeOKTPS(group, topic, tt.RT)
- increaseConsumeFailedTPS(group, topic, tt.RT)
- topicAndGroupPullRT.samplingInSeconds()
- topicAndGroupPullTPS.samplingInSeconds()
- topicAndGroupConsumeRT.samplingInMinutes()
- topicAndGroupConsumeOKTPS.samplingInSeconds()
- topicAndGroupConsumeFailedTPS.samplingInMinutes()
- status := GetConsumeStatus(group, topic)
+ mgr.increasePullRT(group, topic, int64(tt.RT))
+ mgr.increasePullTPS(group, topic, tt.RT)
+ mgr.increaseConsumeRT(group, topic, int64(tt.RT))
+ mgr.increaseConsumeOKTPS(group, topic, tt.RT)
+ mgr.increaseConsumeFailedTPS(group, topic, tt.RT)
+ mgr.topicAndGroupPullRT.samplingInSeconds()
+ mgr.topicAndGroupPullTPS.samplingInSeconds()
+ mgr.topicAndGroupConsumeRT.samplingInMinutes()
+ mgr.topicAndGroupConsumeOKTPS.samplingInSeconds()
+ mgr.topicAndGroupConsumeFailedTPS.samplingInMinutes()
+ status := mgr.GetConsumeStatus(group, topic)
if status.ConsumeFailedMsgs != tt.ExpectFailMessage {
t.Errorf("wrong ConsumeFailedMsg. want=0, got=%d", status.ConsumeFailedMsgs)
}