// blob: 6585de3ebb4d4b29ef37ceac75158385b4a8a80a
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
rocketmq "github.com/apache/rocketmq-client-go/core"
)
// statiBenchmarkProducerSnapshot holds the producer benchmark counters.
//
// The same type is used for the live accumulator, whose int64 fields are
// read and updated through sync/atomic, and for the timestamped copies that
// are chained together through next inside a snapshots list.
type statiBenchmarkProducerSnapshot struct {
	// Cumulative request/response counters.
	sendRequestSuccessCount, sendRequestFailedCount         int64
	receiveResponseSuccessCount, receiveResponseFailedCount int64

	// Sum of the round-trip times of successful sends, in milliseconds.
	sendMessageSuccessTimeTotal int64
	// Largest round-trip time of a successful send, in milliseconds.
	sendMessageMaxRT int64

	// Time at which this copy of the counters was taken.
	createdAt time.Time
	// Next (newer) snapshot in the list, nil for the newest one.
	next *statiBenchmarkProducerSnapshot
}
// snapshots is a lock-protected, singly linked list of counter snapshots
// together with a pointer to the live counters they are copied from.
type snapshots struct {
	sync.RWMutex

	head *statiBenchmarkProducerSnapshot // oldest retained snapshot
	tail *statiBenchmarkProducerSnapshot // newest retained snapshot
	cur  *statiBenchmarkProducerSnapshot // live counters updated by the producers
	len  int                             // number of snapshots currently linked
}
// takeSnapshot copies the live counters in s.cur into a new timestamped
// snapshot and appends it to the list. Once more than ten snapshots are
// linked, the oldest one is dropped.
func (s *snapshots) takeSnapshot() {
	live := s.cur

	// The counters are read atomically, before the list lock is taken.
	snap := &statiBenchmarkProducerSnapshot{
		sendRequestSuccessCount:     atomic.LoadInt64(&live.sendRequestSuccessCount),
		sendRequestFailedCount:      atomic.LoadInt64(&live.sendRequestFailedCount),
		receiveResponseSuccessCount: atomic.LoadInt64(&live.receiveResponseSuccessCount),
		receiveResponseFailedCount:  atomic.LoadInt64(&live.receiveResponseFailedCount),
		sendMessageSuccessTimeTotal: atomic.LoadInt64(&live.sendMessageSuccessTimeTotal),
		sendMessageMaxRT:            atomic.LoadInt64(&live.sendMessageMaxRT),
		createdAt:                   time.Now(),
	}

	s.Lock()
	defer s.Unlock()

	// Link the new snapshot behind the current tail, if there is one.
	if prev := s.tail; prev != nil {
		prev.next = snap
	}
	s.tail = snap
	if s.head == nil {
		s.head = snap
	}

	s.len++
	if s.len <= 10 {
		return
	}
	// Over the limit: forget the oldest snapshot.
	s.head = s.head.next
	s.len--
}
// printStati prints one summary line computed from the oldest and the newest
// retained snapshots: send TPS and average round-trip time over that window,
// the maximum round-trip time read from the live counters, and the failure
// and success totals of the newest snapshot.
//
// Nothing is printed while fewer than ten snapshots are retained.
func (s *snapshots) printStati() {
	s.RLock()
	if s.len < 10 {
		s.RUnlock()
		return
	}
	first, last := s.head, s.tail
	maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
	s.RUnlock()

	succeeded := float64(last.receiveResponseSuccessCount - first.receiveResponseSuccessCount)

	// Guard both divisions: with no elapsed time or no successful send in the
	// window the plain quotients would be Inf/NaN instead of a printable zero.
	var sendTps, avgRT float64
	if elapsed := last.createdAt.Sub(first.createdAt).Seconds(); elapsed > 0 {
		sendTps = succeeded / elapsed
	}
	if succeeded > 0 {
		avgRT = float64(last.sendMessageSuccessTimeTotal-first.sendMessageSuccessTimeTotal) / succeeded
	}

	fmt.Printf(
		"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n",
		int64(sendTps), maxRT, avgRT, last.sendRequestFailedCount, last.receiveResponseFailedCount, last.receiveResponseSuccessCount,
	)
}
// Benchmark configuration, filled in from the command line by init.
var (
	topic         string // -t: topic name
	nameSrv       string // -n: nameserver address
	groupID       string // -g: group id
	instanceCount int    // -i: instance count
	testMinutes   int    // -m: test minutes
	bodySize      int    // -s: body size
)

// Source text for message bodies and its length, both set by init.
var (
	longText    string
	longTextLen int
)
func init() {
flag.StringVar(&topic, "t", "", "topic name")
flag.StringVar(&nameSrv, "n", "", "nameserver address")
flag.StringVar(&groupID, "g", "", "group id")
flag.IntVar(&instanceCount, "i", 1, "instance count")
flag.IntVar(&testMinutes, "m", 10, "test minutes")
flag.IntVar(&bodySize, "s", 32, "body size")
longText = strings.Repeat("0123456789", 100)
longTextLen = len(longText)
}
func buildMsg() string {
return longText[:bodySize]
}
// q returns the queue name for the given numeric suffix, formatted as
// "<queue>_<flag>".
//
// queue is passed as a format argument rather than spliced into the format
// string, so a '%' inside it cannot be misread as a formatting verb.
//
// Note: the parameter name shadows the flag package inside this function.
func q(flag int) string {
	return fmt.Sprintf("%s_%d", queue, flag)
}
// produceMsg runs one benchmark producer until exit is closed.
//
// It creates and starts a RocketMQ producer for groupID/nameSrv and then
// repeatedly walks topics (last to first) and, for every topic, tags, sending
// one synchronous message per (topic, tag) pair. Outcomes are accumulated in
// stati with atomic operations:
//   - success: both success counters are incremented, the round-trip time in
//     milliseconds is added to sendMessageSuccessTimeTotal and
//     sendMessageMaxRT is raised if this send was slower;
//   - failure: the error is logged and sendRequestFailedCount is incremented.
//
// exit is only polled between full passes over topics and tags.
//
// NOTE(review): tag is only used in the failure log; the message itself is
// sent with Topic and Body only. Confirm whether it should carry the tag.
// NOTE(review): the unused ctx := context.Background() was removed; if nothing
// else in the file uses the context package, its import must be dropped too.
func produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}, topics, tags []string) {
	p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
		GroupID:    groupID, // was hard-coded to "", ignoring the -g flag
		NameServer: nameSrv,
	})
	if err != nil {
		fmt.Printf("new producer error:%s\n", err)
		return
	}
	if err := p.Start(); err != nil {
		fmt.Printf("start producer error:%s\n", err)
		return
	}
	defer p.Shutdown()

	// The body does not change between sends, so build it once.
	body := buildMsg()

	for {
		select {
		case <-exit:
			return
		default:
		}

		for i := len(topics) - 1; i >= 0; i-- {
			topicName := topics[i]
			for _, tag := range tags {
				now := time.Now()
				_, err := p.SendMessageSync(&rocketmq.Message{
					Topic: topicName, Body: body,
				})
				if err != nil {
					fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topicName, tag, err.Error())
					// The error is not classified further here, so every
					// failed send is counted as a failed request.
					atomic.AddInt64(&stati.sendRequestFailedCount, 1)
					continue
				}

				atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
				atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
				currentRT := int64(time.Since(now) / time.Millisecond)
				atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)

				// Raise the shared maximum with a CAS loop: retry while this
				// send is still slower than the value another producer stored.
				prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
				for currentRT > prevRT {
					if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
						break
					}
					prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
				}
			}
		}
	}
}
// takeSnapshot records a snapshot in s once per second until exit is closed.
func takeSnapshot(s *snapshots, exit chan struct{}) {
	everySecond := time.NewTicker(time.Second)
	defer everySecond.Stop()

	for {
		select {
		case <-exit:
			return
		case <-everySecond.C:
			s.takeSnapshot()
		}
	}
}
// printStati prints the statistics held in s every ten seconds until exit is
// closed.
func printStati(s *snapshots, exit chan struct{}) {
	everyTenSeconds := time.NewTicker(10 * time.Second)
	defer everyTenSeconds.Stop()

	for {
		select {
		case <-exit:
			return
		case <-everyTenSeconds.C:
			s.printStati()
		}
	}
}
// main parses the flags, starts instanceCount producer goroutines plus one
// snapshot goroutine and one reporting goroutine, and lets them run until
// testMinutes have elapsed or SIGINT/SIGTERM is received. It then closes the
// exit channel, waits for every goroutine to return, takes one last snapshot
// and prints the final statistics.
func main() {
	flag.Parse()

	stati := statiBenchmarkProducerSnapshot{}
	// Named snaps so that the local does not shadow the snapshots type.
	snaps := snapshots{cur: &stati}
	exitChan := make(chan struct{})

	topics := make([]string, topicCount)
	for i := range topics {
		topics[i] = q(i + startSuffix)
	}
	tags := make([]string, tagCount)
	for i := range tags {
		tags[i] = fmt.Sprintf("default_tag_%d", i)
	}

	// wg.Add must run before the goroutine is started: calling it from inside
	// the goroutine races with wg.Wait below, which could then return before
	// a late-starting worker has registered itself.
	var wg sync.WaitGroup
	for i := 0; i < instanceCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			produceMsg(&stati, exitChan, topics, tags)
			fmt.Println("exit of produce ms")
		}()
	}

	// snapshot
	wg.Add(1)
	go func() {
		defer wg.Done()
		takeSnapshot(&snaps, exitChan)
	}()

	// print statistic
	wg.Add(1)
	go func() {
		defer wg.Done()
		printStati(&snaps, exitChan)
	}()

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)

	// Run until the configured duration elapses or a signal arrives.
	select {
	case <-time.After(time.Duration(testMinutes) * time.Minute):
	case <-signalChan:
	}

	close(exitChan)
	wg.Wait()

	snaps.takeSnapshot()
	snaps.printStati()
	fmt.Println("TEST DONE")
}