[ISSUE #596] Fix command register conflict, add consumer and producer logic code (#597)
diff --git a/benchmark/consumer.go b/benchmark/consumer.go
index 941713c..cada933 100644
--- a/benchmark/consumer.go
+++ b/benchmark/consumer.go
@@ -18,8 +18,12 @@
package main
import (
+ "context"
"flag"
"fmt"
+ "github.com/apache/rocketmq-client-go/v2"
+ "github.com/apache/rocketmq-client-go/v2/consumer"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
"os"
"os/signal"
"sync"
@@ -91,7 +95,7 @@
)
}
-type consumer struct {
+type consumerBenchmark struct {
topic string
groupPrefix string
nameSrv string
@@ -107,7 +111,7 @@
}
func init() {
- c := &consumer{}
+ c := &consumerBenchmark{}
flags := flag.NewFlagSet("consumer", flag.ExitOnError)
c.flags = flags
@@ -123,87 +127,87 @@
registerCommand("consumer", c)
}
-func (c *consumer) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) {
- //consumer, err := rocketmq.NewPushConsumer(&rocketmq.PushConsumerConfig{
- // ClientConfig: rocketmq.ClientConfig{
- // GroupID: c.groupID,
- // NameServer: c.nameSrv,
- // },
- // ThreadCount: c.instanceCount,
- // MessageBatchMaxSize: 16,
- //})
- //if err != nil {
- // panic("new push consumer error:" + err.Error())
- //}
- //
- //consumer.Subscribe(c.topic, c.expression, func(m *rocketmq.MessageExt) rocketmq.ConsumeStatus {
- // atomic.AddInt64(&stati.receiveMessageTotal, 1)
- // now := time.Now().UnixNano() / int64(time.Millisecond)
- // b2cRT := now - m.BornTimestamp
- // atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT)
- // s2cRT := now - m.StoreTimestamp
- // atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT)
- //
- // for {
- // old := atomic.LoadInt64(&stati.born2ConsumerMaxRT)
- // if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) {
- // break
- // }
- // }
- //
- // for {
- // old := atomic.LoadInt64(&stati.store2ConsumerMaxRT)
- // if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) {
- // break
- // }
- // }
- //
- // return rocketmq.ConsumeSuccess
- //})
- //println("Start")
- //consumer.Start()
- //select {
- //case <-exit:
- // consumer.Shutdown()
- // return
- //}
+func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) {
+ c, err := rocketmq.NewPushConsumer(
+ consumer.WithGroupName(bc.groupID),
+ consumer.WithNameServer([]string{bc.nameSrv}),
+ )
+ if err != nil {
+ panic("new push consumer error:" + err.Error())
+ }
+
+ selector := consumer.MessageSelector{}
+ err = c.Subscribe(bc.topic, selector, func(ctx context.Context,
+ msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+ for _, msg := range msgs {
+ atomic.AddInt64(&stati.receiveMessageTotal, 1)
+ now := time.Now().UnixNano() / int64(time.Millisecond)
+ b2cRT := now - msg.BornTimestamp
+ atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT)
+ s2cRT := now - msg.StoreTimestamp
+ atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT)
+
+ for {
+ old := atomic.LoadInt64(&stati.born2ConsumerMaxRT)
+ if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) {
+ break
+ }
+ }
+
+ for {
+ old := atomic.LoadInt64(&stati.store2ConsumerMaxRT)
+ if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) {
+ break
+ }
+ }
+ }
+ return consumer.ConsumeSuccess, nil
+ })
+
+ println("Start")
+ c.Start()
+ select {
+ case <-exit:
+ c.Shutdown()
+ return
+ }
}
-func (c *consumer) run(args []string) {
- c.flags.Parse(args)
- if c.topic == "" {
+func (bc *consumerBenchmark) run(args []string) {
+ bc.flags.Parse(args)
+ if bc.topic == "" {
println("empty topic")
- c.usage()
+ bc.usage()
return
}
- if c.groupPrefix == "" {
+ if bc.groupPrefix == "" {
println("empty group prefix")
- c.usage()
+ bc.usage()
return
}
- if c.nameSrv == "" {
+ if bc.nameSrv == "" {
println("empty name server")
- c.usage()
+ bc.usage()
return
}
- if c.testMinutes <= 0 {
+ if bc.testMinutes <= 0 {
println("test time must be positive integer")
- c.usage()
+ bc.usage()
return
}
- if c.instanceCount <= 0 {
+ if bc.instanceCount <= 0 {
println("thread count must be positive integer")
- c.usage()
+ bc.usage()
return
}
- c.groupID = c.groupPrefix
- if c.isPrefixEnable {
- c.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100)
+ bc.groupID = bc.groupPrefix
+ if bc.isPrefixEnable {
+ bc.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100)
}
stati := statiBenchmarkConsumerSnapshot{}
@@ -214,7 +218,7 @@
wg.Add(1)
go func() {
- c.consumeMsg(&stati, exitChan)
+ bc.consumeMsg(&stati, exitChan)
wg.Done()
}()
@@ -253,7 +257,7 @@
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
select {
- case <-time.Tick(time.Minute * time.Duration(c.testMinutes)):
+ case <-time.Tick(time.Minute * time.Duration(bc.testMinutes)):
case <-signalChan:
}
@@ -264,6 +268,6 @@
snapshots.printStati()
}
-func (c *consumer) usage() {
- c.flags.Usage()
+func (bc *consumerBenchmark) usage() {
+ bc.flags.Usage()
}
diff --git a/benchmark/producer.go b/benchmark/producer.go
index f3c1c60..537ffbe 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -18,8 +18,12 @@
package main
import (
+ "context"
"flag"
"fmt"
+ "github.com/apache/rocketmq-client-go/v2"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/apache/rocketmq-client-go/v2/producer"
"os"
"os/signal"
"sync"
@@ -93,7 +97,7 @@
)
}
-type producer struct {
+type producerBenchmark struct {
topic string
nameSrv string
groupID string
@@ -105,8 +109,8 @@
}
func init() {
- p := &producer{}
- flags := flag.NewFlagSet("consumer", flag.ExitOnError)
+ p := &producerBenchmark{}
+ flags := flag.NewFlagSet("producer", flag.ExitOnError)
p.flags = flags
flags.StringVar(&p.topic, "t", "", "topic name")
@@ -116,60 +120,62 @@
flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
flags.IntVar(&p.bodySize, "s", 32, "body size")
- registerCommand("consumer", p)
+ registerCommand("producer", p)
}
-func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
- //p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
- // ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv},
- //})
- //if err != nil {
- // fmt.Printf("new consumer error:%s\n", err)
- // return
- //}
- //
- //p.Start()
- //defer p.Shutdown()
+func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
+ p, err := rocketmq.NewProducer(
+ producer.WithNameServer([]string{bp.nameSrv}),
+ producer.WithRetry(2),
+ )
- //topic, tag := bp.topic, "benchmark-consumer"
- //
- //AGAIN:
- // select {
- // case <-exit:
- // return
- // default:
- // }
+ if err != nil {
+ fmt.Printf("new producer error: %s\n", err)
+ return
+ }
- //now := time.Now()
- //r, err := p.SendMessageSync(&rocketmq.Message{
- // Topic: bp.topic, Body: buildMsg(bp.bodySize),
- //})
- //
- //if err != nil {
- // fmt.Printf("send message sync error:%s", err)
- // goto AGAIN
- //}
- //
- //if r.Status == rocketmq.SendOK {
- // atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
- // atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
- // currentRT := int64(time.Since(now) / time.Millisecond)
- // atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
- // prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
- // for currentRT > prevRT {
- // if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
- // break
- // }
- // prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
- // }
- // goto AGAIN
- //}
- //
- //fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
- //goto AGAIN
+ err = p.Start()
+
+ defer p.Shutdown()
+
+ topic, tag := bp.topic, "benchmark-producer"
+ msgStr := buildMsg(bp.bodySize)
+
+AGAIN:
+ select {
+ case <-exit:
+ return
+ default:
+ }
+
+ now := time.Now()
+ r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr)))
+
+ if err != nil {
+ fmt.Printf("send message sync error:%s", err)
+ goto AGAIN
+ }
+
+ if r.Status == primitive.SendOK {
+ atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
+ atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
+ currentRT := int64(time.Since(now) / time.Millisecond)
+ atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
+ prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
+ for currentRT > prevRT {
+ if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
+ break
+ }
+ prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
+ }
+ goto AGAIN
+ }
+
+ fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
+ goto AGAIN
}
-func (bp *producer) run(args []string) {
+func (bp *producerBenchmark) run(args []string) {
bp.flags.Parse(args)
if bp.topic == "" {
@@ -266,6 +272,6 @@
fmt.Println("TEST DONE")
}
-func (bp *producer) usage() {
+func (bp *producerBenchmark) usage() {
bp.flags.Usage()
}