benchmark producer
diff --git a/benchmark/producer/main.go b/benchmark/producer/main.go
new file mode 100644
index 0000000..6585de3
--- /dev/null
+++ b/benchmark/producer/main.go
@@ -0,0 +1,253 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "os"
+ "os/signal"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type statiBenchmarkProducerSnapshot struct {
+ sendRequestSuccessCount int64
+ sendRequestFailedCount int64
+ receiveResponseSuccessCount int64
+ receiveResponseFailedCount int64
+ sendMessageSuccessTimeTotal int64
+ sendMessageMaxRT int64
+ createdAt time.Time
+ next *statiBenchmarkProducerSnapshot
+}
+
+type snapshots struct {
+ sync.RWMutex
+ head, tail, cur *statiBenchmarkProducerSnapshot
+ len int
+}
+
+func (s *snapshots) takeSnapshot() {
+ b := s.cur
+ sn := new(statiBenchmarkProducerSnapshot)
+ sn.sendRequestSuccessCount = atomic.LoadInt64(&b.sendRequestSuccessCount)
+ sn.sendRequestFailedCount = atomic.LoadInt64(&b.sendRequestFailedCount)
+ sn.receiveResponseSuccessCount = atomic.LoadInt64(&b.receiveResponseSuccessCount)
+ sn.receiveResponseFailedCount = atomic.LoadInt64(&b.receiveResponseFailedCount)
+ sn.sendMessageSuccessTimeTotal = atomic.LoadInt64(&b.sendMessageSuccessTimeTotal)
+ sn.sendMessageMaxRT = atomic.LoadInt64(&b.sendMessageMaxRT)
+ sn.createdAt = time.Now()
+
+ s.Lock()
+ if s.tail != nil {
+ s.tail.next = sn
+ }
+ s.tail = sn
+ if s.head == nil {
+ s.head = s.tail
+ }
+
+ s.len++
+ if s.len > 10 {
+ s.head = s.head.next
+ s.len--
+ }
+ s.Unlock()
+}
+
+func (s *snapshots) printStati() {
+ s.RLock()
+ if s.len < 10 {
+ s.RUnlock()
+ return
+ }
+
+ f, l := s.head, s.tail
+ respSucCount := float64(l.receiveResponseSuccessCount - f.receiveResponseSuccessCount)
+ sendTps := respSucCount / l.createdAt.Sub(f.createdAt).Seconds()
+ avgRT := float64(l.sendMessageSuccessTimeTotal-f.sendMessageSuccessTimeTotal) / respSucCount
+ maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
+ s.RUnlock()
+
+ fmt.Printf(
+ "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n",
+ int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, l.receiveResponseFailedCount, l.receiveResponseSuccessCount,
+ )
+}
+
+var (
+ topic string
+ nameSrv string
+ groupID string
+ instanceCount int
+ testMinutes int
+ bodySize int
+
+ longText = ""
+ longTextLen int
+)
+
+func init() {
+ flag.StringVar(&topic, "t", "", "topic name")
+ flag.StringVar(&nameSrv, "n", "", "nameserver address")
+ flag.StringVar(&groupID, "g", "", "group id")
+ flag.IntVar(&instanceCount, "i", 1, "instance count")
+ flag.IntVar(&testMinutes, "m", 10, "test minutes")
+ flag.IntVar(&bodySize, "s", 32, "body size")
+
+ longText = strings.Repeat("0123456789", 100)
+ longTextLen = len(longText)
+}
+
+func buildMsg() string {
+ return longText[:bodySize]
+}
+
+func q(flag int) string {
+ return fmt.Sprintf(queue+"_%d", flag)
+}
+
+func produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}, topics, tags []string) {
+ p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+ GroupID: "",
+ NameServer: nameSrv,
+ })
+ if err != nil {
+ fmt.Printf("new producer error:%s\n", err)
+ return
+ }
+
+ p.Start()
+ defer p.Shutdown()
+
+AGAIN:
+ select {
+ case <-exit:
+ return
+ default:
+ }
+
+ for i := len(topics) - 1; i >= 0; i-- {
+ topic := topics[i]
+ for _, tag := range tags {
+ req.Queue = topic
+ req.Tags = []string{tag}
+ req.Body = buildMsg()
+
+ now := time.Now()
+ ctx := context.Background()
+ _, err := p.SendMessageSync(&rocketmq.Message{
+ Topic: topic, Body: longText[:bodySize],
+ })
+
+ if err == nil {
+ atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
+ atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
+ currentRT := int64(time.Since(now) / time.Millisecond)
+ atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
+ prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
+ for currentRT > prevRT {
+ if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
+ break
+ }
+ prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
+ }
+ continue
+ }
+
+ fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
+ //if _, ok := err.(*rpc.ErrorInfo); ok { TODO
+ //atomic.AddInt64(&stati.receiveResponseFailedCount, 1)
+ //} else {
+ //atomic.AddInt64(&stati.sendRequestFailedCount, 1)
+ //}
+ }
+ }
+ goto AGAIN
+}
+
+func takeSnapshot(s *snapshots, exit chan struct{}) {
+ ticker := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-ticker.C:
+ s.takeSnapshot()
+ case <-exit:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func printStati(s *snapshots, exit chan struct{}) {
+ ticker := time.NewTicker(time.Second * 10)
+ for {
+ select {
+ case <-ticker.C:
+ s.printStati()
+ case <-exit:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func main() {
+ flag.Parse()
+
+ stati := statiBenchmarkProducerSnapshot{}
+ snapshots := snapshots{cur: &stati}
+ exitChan := make(chan struct{})
+ wg := sync.WaitGroup{}
+
+ topics := make([]string, topicCount)
+ for i := 0; i < topicCount; i++ {
+ topics[i] = q(i + startSuffix)
+ }
+
+ tags := make([]string, tagCount)
+ for i := 0; i < tagCount; i++ {
+ tags[i] = fmt.Sprintf("default_tag_%d", i)
+ }
+
+ for i := 0; i < instanceCount; i++ {
+ go func() {
+ wg.Add(1)
+ produceMsg(&stati, exitChan, topics, tags)
+ fmt.Println("exit of produce ms")
+ wg.Done()
+ }()
+ }
+
+ // snapshot
+ go func() {
+ wg.Add(1)
+ takeSnapshot(&snapshots, exitChan)
+ wg.Done()
+ }()
+
+ // print statistic
+ go func() {
+ wg.Add(1)
+ printStati(&snapshots, exitChan)
+ wg.Done()
+ }()
+
+ signalChan := make(chan os.Signal, 1)
+ signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+ select {
+ case <-time.Tick(time.Minute * time.Duration(testMinutes)):
+ case <-signalChan:
+ }
+
+ close(exitChan)
+ wg.Wait()
+ snapshots.takeSnapshot()
+ snapshots.printStati()
+ fmt.Println("TEST DONE")
+}