benchmark producer
diff --git a/benchmark/producer/main.go b/benchmark/producer/main.go
new file mode 100644
index 0000000..6585de3
--- /dev/null
+++ b/benchmark/producer/main.go
@@ -0,0 +1,253 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"os"
+	"os/signal"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"syscall"
+	"time"
+
+	rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type statiBenchmarkProducerSnapshot struct {
+	sendRequestSuccessCount     int64
+	sendRequestFailedCount      int64
+	receiveResponseSuccessCount int64
+	receiveResponseFailedCount  int64
+	sendMessageSuccessTimeTotal int64
+	sendMessageMaxRT            int64
+	createdAt                   time.Time
+	next                        *statiBenchmarkProducerSnapshot
+}
+
+type snapshots struct {
+	sync.RWMutex
+	head, tail, cur *statiBenchmarkProducerSnapshot
+	len             int
+}
+
+func (s *snapshots) takeSnapshot() {
+	b := s.cur
+	sn := new(statiBenchmarkProducerSnapshot)
+	sn.sendRequestSuccessCount = atomic.LoadInt64(&b.sendRequestSuccessCount)
+	sn.sendRequestFailedCount = atomic.LoadInt64(&b.sendRequestFailedCount)
+	sn.receiveResponseSuccessCount = atomic.LoadInt64(&b.receiveResponseSuccessCount)
+	sn.receiveResponseFailedCount = atomic.LoadInt64(&b.receiveResponseFailedCount)
+	sn.sendMessageSuccessTimeTotal = atomic.LoadInt64(&b.sendMessageSuccessTimeTotal)
+	sn.sendMessageMaxRT = atomic.LoadInt64(&b.sendMessageMaxRT)
+	sn.createdAt = time.Now()
+
+	s.Lock()
+	if s.tail != nil {
+		s.tail.next = sn
+	}
+	s.tail = sn
+	if s.head == nil {
+		s.head = s.tail
+	}
+
+	s.len++
+	if s.len > 10 {
+		s.head = s.head.next
+		s.len--
+	}
+	s.Unlock()
+}
+
+func (s *snapshots) printStati() {
+	s.RLock()
+	if s.len < 10 {
+		s.RUnlock()
+		return
+	}
+
+	f, l := s.head, s.tail
+	respSucCount := float64(l.receiveResponseSuccessCount - f.receiveResponseSuccessCount)
+	sendTps := respSucCount / l.createdAt.Sub(f.createdAt).Seconds()
+	avgRT := float64(l.sendMessageSuccessTimeTotal-f.sendMessageSuccessTimeTotal) / respSucCount
+	maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
+	s.RUnlock()
+
+	fmt.Printf(
+		"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n",
+		int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, l.receiveResponseFailedCount, l.receiveResponseSuccessCount,
+	)
+}
+
+var (
+	topic         string
+	nameSrv       string
+	groupID       string
+	instanceCount int
+	testMinutes   int
+	bodySize      int
+
+	longText    = ""
+	longTextLen int
+)
+
+func init() {
+	flag.StringVar(&topic, "t", "", "topic name")
+	flag.StringVar(&nameSrv, "n", "", "nameserver address")
+	flag.StringVar(&groupID, "g", "", "group id")
+	flag.IntVar(&instanceCount, "i", 1, "instance count")
+	flag.IntVar(&testMinutes, "m", 10, "test minutes")
+	flag.IntVar(&bodySize, "s", 32, "body size")
+
+	longText = strings.Repeat("0123456789", 100)
+	longTextLen = len(longText)
+}
+
+func buildMsg() string {
+	return longText[:bodySize]
+}
+
+func q(flag int) string {
+	return fmt.Sprintf(queue+"_%d", flag)
+}
+
+func produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}, topics, tags []string) {
+	p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+		GroupID:    "",
+		NameServer: nameSrv,
+	})
+	if err != nil {
+		fmt.Printf("new producer error:%s\n", err)
+		return
+	}
+
+	p.Start()
+	defer p.Shutdown()
+
+AGAIN:
+	select {
+	case <-exit:
+		return
+	default:
+	}
+
+	for i := len(topics) - 1; i >= 0; i-- {
+		topic := topics[i]
+		for _, tag := range tags {
+			req.Queue = topic
+			req.Tags = []string{tag}
+			req.Body = buildMsg()
+
+			now := time.Now()
+			ctx := context.Background()
+			_, err := p.SendMessageSync(&rocketmq.Message{
+				Topic: topic, Body: longText[:bodySize],
+			})
+
+			if err == nil {
+				atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
+				atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
+				currentRT := int64(time.Since(now) / time.Millisecond)
+				atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
+				prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
+				for currentRT > prevRT {
+					if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
+						break
+					}
+					prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
+				}
+				continue
+			}
+
+			fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
+			//if _, ok := err.(*rpc.ErrorInfo); ok { TODO
+			//atomic.AddInt64(&stati.receiveResponseFailedCount, 1)
+			//} else {
+			//atomic.AddInt64(&stati.sendRequestFailedCount, 1)
+			//}
+		}
+	}
+	goto AGAIN
+}
+
+func takeSnapshot(s *snapshots, exit chan struct{}) {
+	ticker := time.NewTicker(time.Second)
+	for {
+		select {
+		case <-ticker.C:
+			s.takeSnapshot()
+		case <-exit:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func printStati(s *snapshots, exit chan struct{}) {
+	ticker := time.NewTicker(time.Second * 10)
+	for {
+		select {
+		case <-ticker.C:
+			s.printStati()
+		case <-exit:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+func main() {
+	flag.Parse()
+
+	stati := statiBenchmarkProducerSnapshot{}
+	snapshots := snapshots{cur: &stati}
+	exitChan := make(chan struct{})
+	wg := sync.WaitGroup{}
+
+	topics := make([]string, topicCount)
+	for i := 0; i < topicCount; i++ {
+		topics[i] = q(i + startSuffix)
+	}
+
+	tags := make([]string, tagCount)
+	for i := 0; i < tagCount; i++ {
+		tags[i] = fmt.Sprintf("default_tag_%d", i)
+	}
+
+	for i := 0; i < instanceCount; i++ {
+		go func() {
+			wg.Add(1)
+			produceMsg(&stati, exitChan, topics, tags)
+			fmt.Println("exit of produce ms")
+			wg.Done()
+		}()
+	}
+
+	// snapshot
+	go func() {
+		wg.Add(1)
+		takeSnapshot(&snapshots, exitChan)
+		wg.Done()
+	}()
+
+	// print statistic
+	go func() {
+		wg.Add(1)
+		printStati(&snapshots, exitChan)
+		wg.Done()
+	}()
+
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+	select {
+	case <-time.Tick(time.Minute * time.Duration(testMinutes)):
+	case <-signalChan:
+	}
+
+	close(exitChan)
+	wg.Wait()
+	snapshots.takeSnapshot()
+	snapshots.printStati()
+	fmt.Println("TEST DONE")
+}