Merge pull request #17 from zjykzk/benchmark
Producer benchmark
diff --git a/benchmark/main.go b/benchmark/main.go
new file mode 100644
index 0000000..d268d72
--- /dev/null
+++ b/benchmark/main.go
@@ -0,0 +1,62 @@
+package main
+
+import (
+ "fmt"
+ "os"
+ "strings"
+)
+
+type command interface {
+ usage()
+ run(args []string)
+}
+
+var (
+ cmds = map[string]command{}
+ longText = ""
+ longTextLen = 0
+)
+
+func init() {
+ longText = strings.Repeat("0123456789", 100)
+ longTextLen = len(longText)
+}
+
+func registerCommand(name string, cmd command) {
+ if cmd == nil {
+ panic("empty command")
+ }
+
+ _, ok := cmds[name]
+ if ok {
+ panic(fmt.Sprintf("%s command existed", name))
+ }
+
+ cmds[name] = cmd
+}
+
+func usage() {
+ println(os.Args[0] + " commandName [...]")
+ for _, cmd := range cmds {
+ cmd.usage()
+ }
+}
+
+// go run *.go [command name] [command args]
+func main() {
+ if len(os.Args) < 2 {
+ println("error:lack cmd name\n")
+ usage()
+ return
+ }
+
+ name := os.Args[1]
+ cmd, ok := cmds[name]
+ if !ok {
+ fmt.Printf("command %s is not supported\n", name)
+ usage()
+ return
+ }
+
+ cmd.run(os.Args[2:])
+}
diff --git a/benchmark/producer.go b/benchmark/producer.go
new file mode 100644
index 0000000..b51df0a
--- /dev/null
+++ b/benchmark/producer.go
@@ -0,0 +1,267 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+ "os"
+ "os/signal"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type statiBenchmarkProducerSnapshot struct {
+ sendRequestSuccessCount int64
+ sendRequestFailedCount int64
+ receiveResponseSuccessCount int64
+ receiveResponseFailedCount int64
+ sendMessageSuccessTimeTotal int64
+ sendMessageMaxRT int64
+ createdAt time.Time
+ next *statiBenchmarkProducerSnapshot
+}
+
+type snapshots struct {
+ sync.RWMutex
+ head, tail, cur *statiBenchmarkProducerSnapshot
+ len int
+}
+
+func (s *snapshots) takeSnapshot() {
+ b := s.cur
+ sn := new(statiBenchmarkProducerSnapshot)
+ sn.sendRequestSuccessCount = atomic.LoadInt64(&b.sendRequestSuccessCount)
+ sn.sendRequestFailedCount = atomic.LoadInt64(&b.sendRequestFailedCount)
+ sn.receiveResponseSuccessCount = atomic.LoadInt64(&b.receiveResponseSuccessCount)
+ sn.receiveResponseFailedCount = atomic.LoadInt64(&b.receiveResponseFailedCount)
+ sn.sendMessageSuccessTimeTotal = atomic.LoadInt64(&b.sendMessageSuccessTimeTotal)
+ sn.sendMessageMaxRT = atomic.LoadInt64(&b.sendMessageMaxRT)
+ sn.createdAt = time.Now()
+
+ s.Lock()
+ if s.tail != nil {
+ s.tail.next = sn
+ }
+ s.tail = sn
+ if s.head == nil {
+ s.head = s.tail
+ }
+
+ s.len++
+ if s.len > 10 {
+ s.head = s.head.next
+ s.len--
+ }
+ s.Unlock()
+}
+
+func (s *snapshots) printStati() {
+ s.RLock()
+ if s.len < 10 {
+ s.RUnlock()
+ return
+ }
+
+ f, l := s.head, s.tail
+ respSucCount := float64(l.receiveResponseSuccessCount - f.receiveResponseSuccessCount)
+ sendTps := respSucCount / l.createdAt.Sub(f.createdAt).Seconds()
+ avgRT := float64(l.sendMessageSuccessTimeTotal-f.sendMessageSuccessTimeTotal) / respSucCount
+ maxRT := atomic.LoadInt64(&s.cur.sendMessageMaxRT)
+ s.RUnlock()
+
+ fmt.Printf(
+ "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d Total:%d\n",
+ int64(sendTps), maxRT, avgRT, l.sendRequestFailedCount, l.receiveResponseFailedCount, l.receiveResponseSuccessCount,
+ )
+}
+func takeSnapshot(s *snapshots, exit chan struct{}) {
+ ticker := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-ticker.C:
+ s.takeSnapshot()
+ case <-exit:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func printStati(s *snapshots, exit chan struct{}) {
+ ticker := time.NewTicker(time.Second * 10)
+ for {
+ select {
+ case <-ticker.C:
+ s.printStati()
+ case <-exit:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+type producer struct {
+ topic string
+ nameSrv string
+ groupID string
+ instanceCount int
+ testMinutes int
+ bodySize int
+
+ flags *flag.FlagSet
+}
+
+func init() {
+ p := &producer{}
+ flags := flag.NewFlagSet("producer", flag.ExitOnError)
+ p.flags = flags
+
+ flags.StringVar(&p.topic, "t", "", "topic name")
+ flags.StringVar(&p.nameSrv, "n", "", "nameserver address")
+ flags.StringVar(&p.groupID, "g", "", "group id")
+ flags.IntVar(&p.instanceCount, "i", 1, "instance count")
+ flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
+ flags.IntVar(&p.bodySize, "s", 32, "body size")
+
+ registerCommand("producer", p)
+}
+
+func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
+ p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+ ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv},
+ })
+ if err != nil {
+ fmt.Printf("new producer error:%s\n", err)
+ return
+ }
+
+ p.Start()
+ defer p.Shutdown()
+
+ topic, tag := bp.topic, "benchmark-producer"
+
+AGAIN:
+ select {
+ case <-exit:
+ return
+ default:
+ }
+
+ now := time.Now()
+ r := p.SendMessageSync(&rocketmq.Message{
+ Topic: bp.topic, Body: longText[:bp.bodySize],
+ })
+
+ if r.Status == rocketmq.SendOK {
+ atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
+ atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
+ currentRT := int64(time.Since(now) / time.Millisecond)
+ atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
+ prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
+ for currentRT > prevRT {
+ if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
+ break
+ }
+ prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
+ }
+ goto AGAIN
+ }
+
+ fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
+ //if _, ok := err.(*rpc.ErrorInfo); ok { TODO
+ //atomic.AddInt64(&stati.receiveResponseFailedCount, 1)
+ //} else {
+ //atomic.AddInt64(&stati.sendRequestFailedCount, 1)
+ //}
+ goto AGAIN
+}
+
+func (bp *producer) run(args []string) {
+ bp.flags.Parse(args)
+
+ if bp.topic == "" {
+ println("empty topic")
+ bp.flags.Usage()
+ return
+ }
+
+ if bp.groupID == "" {
+ println("empty group id")
+ bp.flags.Usage()
+ return
+ }
+
+ if bp.nameSrv == "" {
+ println("empty namesrv")
+ bp.flags.Usage()
+ return
+ }
+ if bp.instanceCount <= 0 {
+ println("instance count must be positive integer")
+ bp.flags.Usage()
+ return
+ }
+ if bp.testMinutes <= 0 {
+ println("test time must be positive integer")
+ bp.flags.Usage()
+ return
+ }
+ if bp.bodySize <= 0 {
+ println("body size must be positive integer")
+ bp.flags.Usage()
+ return
+ }
+
+ stati := statiBenchmarkProducerSnapshot{}
+ snapshots := snapshots{cur: &stati}
+ exitChan := make(chan struct{})
+ wg := sync.WaitGroup{}
+
+ for i := 0; i < bp.instanceCount; i++ {
+ i := i
+ go func() {
+ wg.Add(1)
+ bp.produceMsg(&stati, exitChan)
+ fmt.Printf("exit of produce %d\n", i)
+ wg.Done()
+ }()
+ }
+
+ // snapshot
+ go func() {
+ wg.Add(1)
+ takeSnapshot(&snapshots, exitChan)
+ wg.Done()
+ }()
+
+ // print statistic
+ go func() {
+ wg.Add(1)
+ printStati(&snapshots, exitChan)
+ wg.Done()
+ }()
+
+ signalChan := make(chan os.Signal, 1)
+ signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+ select {
+ case <-time.Tick(time.Minute * time.Duration(bp.testMinutes)):
+ case <-signalChan:
+ }
+
+ close(exitChan)
+ wg.Wait()
+ snapshots.takeSnapshot()
+ snapshots.printStati()
+ fmt.Println("TEST DONE")
+}
+
+func (bp *producer) usage() {
+ bp.flags.Usage()
+}
+
+func (bp *producer) buildMsg() string {
+ return longText[:bp.bodySize]
+}
diff --git a/core/api.go b/core/api.go
index 58c1465..1ed6c82 100644
--- a/core/api.go
+++ b/core/api.go
@@ -22,7 +22,7 @@
return GetVersion()
}
-type clientConfig struct {
+type ClientConfig struct {
GroupID string
NameServer string
NameServerDomain string
@@ -32,7 +32,7 @@
LogC *LogConfig
}
-func (config *clientConfig) string() string {
+func (config *ClientConfig) string() string {
// For security, don't print Credentials.
str := ""
str = strJoin(str, "GroupId", config.GroupID)
@@ -55,14 +55,14 @@
// ProducerConfig define a producer
type ProducerConfig struct {
- clientConfig
+ ClientConfig
SendMsgTimeout int
CompressLevel int
MaxMessageSize int
}
func (config *ProducerConfig) String() string {
- str := "ProducerConfig=[" + config.clientConfig.string()
+ str := "ProducerConfig=[" + config.ClientConfig.string()
if config.SendMsgTimeout > 0 {
str = strJoin(str, "SendMsgTimeout", config.SendMsgTimeout)
@@ -116,7 +116,7 @@
// PushConsumerConfig define a new consumer.
type PushConsumerConfig struct {
- clientConfig
+ ClientConfig
ThreadCount int
MessageBatchMaxSize int
Model MessageModel
@@ -124,7 +124,7 @@
func (config *PushConsumerConfig) String() string {
// For security, don't print Credentials.
- str := "PushConsumerConfig=[" + config.clientConfig.string()
+ str := "PushConsumerConfig=[" + config.ClientConfig.string()
if config.ThreadCount > 0 {
str = strJoin(str, "ThreadCount", config.ThreadCount)
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index 6dcac43..fdf8d76 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -65,7 +65,7 @@
// PullConsumerConfig the configuration for the pull consumer
type PullConsumerConfig struct {
- clientConfig
+ ClientConfig
}
// DefaultPullConsumer default consumer pulling the message