[ISSUE #356] feat(consumer): redesign stat (#357)
* feat(consumer): redesign stat
diff --git a/consumer/statistics.go b/consumer/statistics.go
index 2448c74..a58ff9e 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -29,6 +29,7 @@
var (
csListLock sync.Mutex
+ closeOnce sync.Once
topicAndGroupConsumeOKTPS *statsItemSet
topicAndGroupConsumeRT *statsItemSet
@@ -98,11 +99,13 @@
}
func ShutDownStatis() {
- topicAndGroupConsumeOKTPS.closed = true
- topicAndGroupConsumeRT.closed = true
- topicAndGroupConsumeFailedTPS.closed = true
- topicAndGroupPullTPS.closed = true
- topicAndGroupPullRT.closed = true
+ closeOnce.Do(func() {
+ close(topicAndGroupConsumeOKTPS.closed)
+ close(topicAndGroupConsumeRT.closed)
+ close(topicAndGroupConsumeFailedTPS.closed)
+ close(topicAndGroupPullTPS.closed)
+ close(topicAndGroupPullRT.closed)
+ })
}
func getPullRT(group, topic string) statsSnapshot {
@@ -132,12 +135,13 @@
type statsItemSet struct {
statsName string
statsItemTable sync.Map
- closed bool
+ closed chan struct{}
}
func newStatsItemSet(statsName string) *statsItemSet {
sis := &statsItemSet{
statsName: statsName,
+ closed: make(chan struct{}),
}
sis.init()
return sis
@@ -145,47 +149,84 @@
func (sis *statsItemSet) init() {
go func() {
- for !sis.closed {
- sis.samplingInSeconds()
- time.Sleep(10 * time.Second)
+ ticker := time.NewTicker(10 * time.Second)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.samplingInSeconds()
+
+ }
}
}()
go func() {
- for !sis.closed {
- sis.samplingInMinutes()
- time.Sleep(10 * time.Minute)
+ ticker := time.NewTicker(10 * time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.samplingInMinutes()
+ }
}
}()
go func() {
- for !sis.closed {
- sis.samplingInHour()
- time.Sleep(time.Hour)
+ ticker := time.NewTicker(time.Hour)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.samplingInHour()
+ }
}
}()
go func() {
time.Sleep(nextMinutesTime().Sub(time.Now()))
- for !sis.closed {
- sis.printAtMinutes()
- time.Sleep(time.Minute)
+ ticker := time.NewTicker(time.Minute)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.printAtMinutes()
+ }
}
}()
go func() {
time.Sleep(nextHourTime().Sub(time.Now()))
- for !sis.closed {
- sis.printAtHour()
- time.Sleep(time.Hour)
+ ticker := time.NewTicker(time.Hour)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.printAtHour()
+ }
}
}()
go func() {
time.Sleep(nextMonthTime().Sub(time.Now()))
- for !sis.closed {
- sis.printAtDay()
- time.Sleep(24 * time.Hour)
+ ticker := time.NewTicker(24 * time.Hour)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-sis.closed:
+ return
+ case <-ticker.C:
+ sis.printAtDay()
+ }
}
}()
}