[ISSUE #596] Fix command register conflict, add consumer and producer logic code (#597)

diff --git a/benchmark/consumer.go b/benchmark/consumer.go
index 941713c..cada933 100644
--- a/benchmark/consumer.go
+++ b/benchmark/consumer.go
@@ -18,8 +18,12 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
+	"github.com/apache/rocketmq-client-go/v2"
+	"github.com/apache/rocketmq-client-go/v2/consumer"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
 	"os"
 	"os/signal"
 	"sync"
@@ -91,7 +95,7 @@
 	)
 }
 
-type consumer struct {
+type consumerBenchmark struct {
 	topic          string
 	groupPrefix    string
 	nameSrv        string
@@ -107,7 +111,7 @@
 }
 
 func init() {
-	c := &consumer{}
+	c := &consumerBenchmark{}
 	flags := flag.NewFlagSet("consumer", flag.ExitOnError)
 	c.flags = flags
 
@@ -123,87 +127,87 @@
 	registerCommand("consumer", c)
 }
 
-func (c *consumer) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) {
-	//consumer, err := rocketmq.NewPushConsumer(&rocketmq.PushConsumerConfig{
-	//	ClientConfig: rocketmq.ClientConfig{
-	//		GroupID:    c.groupID,
-	//		NameServer: c.nameSrv,
-	//	},
-	//	ThreadCount:         c.instanceCount,
-	//	MessageBatchMaxSize: 16,
-	//})
-	//if err != nil {
-	//	panic("new push consumer error:" + err.Error())
-	//}
-	//
-	//consumer.Subscribe(c.topic, c.expression, func(m *rocketmq.MessageExt) rocketmq.ConsumeStatus {
-	//	atomic.AddInt64(&stati.receiveMessageTotal, 1)
-	//	now := time.Now().UnixNano() / int64(time.Millisecond)
-	//	b2cRT := now - m.BornTimestamp
-	//	atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT)
-	//	s2cRT := now - m.StoreTimestamp
-	//	atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT)
-	//
-	//	for {
-	//		old := atomic.LoadInt64(&stati.born2ConsumerMaxRT)
-	//		if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) {
-	//			break
-	//		}
-	//	}
-	//
-	//	for {
-	//		old := atomic.LoadInt64(&stati.store2ConsumerMaxRT)
-	//		if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) {
-	//			break
-	//		}
-	//	}
-	//
-	//	return rocketmq.ConsumeSuccess
-	//})
-	//println("Start")
-	//consumer.Start()
-	//select {
-	//case <-exit:
-	//	consumer.Shutdown()
-	//	return
-	//}
+func (bc *consumerBenchmark) consumeMsg(stati *statiBenchmarkConsumerSnapshot, exit chan struct{}) {
+	c, err := rocketmq.NewPushConsumer(
+		consumer.WithGroupName(bc.groupID),
+		consumer.WithNameServer([]string{bc.nameSrv}),
+	)
+	if err != nil {
+		panic("new push consumer error:" + err.Error())
+	}
+
+	selector := consumer.MessageSelector{}
+	err = c.Subscribe(bc.topic, selector, func(ctx context.Context,
+		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+		for _, msg := range msgs {
+			atomic.AddInt64(&stati.receiveMessageTotal, 1)
+			now := time.Now().UnixNano() / int64(time.Millisecond)
+			b2cRT := now - msg.BornTimestamp
+			atomic.AddInt64(&stati.born2ConsumerTotalRT, b2cRT)
+			s2cRT := now - msg.StoreTimestamp
+			atomic.AddInt64(&stati.store2ConsumerTotalRT, s2cRT)
+
+			for {
+				old := atomic.LoadInt64(&stati.born2ConsumerMaxRT)
+				if old >= b2cRT || atomic.CompareAndSwapInt64(&stati.born2ConsumerMaxRT, old, b2cRT) {
+					break
+				}
+			}
+
+			for {
+				old := atomic.LoadInt64(&stati.store2ConsumerMaxRT)
+				if old >= s2cRT || atomic.CompareAndSwapInt64(&stati.store2ConsumerMaxRT, old, s2cRT) {
+					break
+				}
+			}
+		}
+		return consumer.ConsumeSuccess, nil
+	})
+
+	println("Start")
+	c.Start()
+	select {
+	case <-exit:
+		c.Shutdown()
+		return
+	}
 }
 
-func (c *consumer) run(args []string) {
-	c.flags.Parse(args)
-	if c.topic == "" {
+func (bc *consumerBenchmark) run(args []string) {
+	bc.flags.Parse(args)
+	if bc.topic == "" {
 		println("empty topic")
-		c.usage()
+		bc.usage()
 		return
 	}
 
-	if c.groupPrefix == "" {
+	if bc.groupPrefix == "" {
 		println("empty group prefix")
-		c.usage()
+		bc.usage()
 		return
 	}
 
-	if c.nameSrv == "" {
+	if bc.nameSrv == "" {
 		println("empty name server")
-		c.usage()
+		bc.usage()
 		return
 	}
 
-	if c.testMinutes <= 0 {
+	if bc.testMinutes <= 0 {
 		println("test time must be positive integer")
-		c.usage()
+		bc.usage()
 		return
 	}
 
-	if c.instanceCount <= 0 {
+	if bc.instanceCount <= 0 {
 		println("thread count must be positive integer")
-		c.usage()
+		bc.usage()
 		return
 	}
 
-	c.groupID = c.groupPrefix
-	if c.isPrefixEnable {
-		c.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100)
+	bc.groupID = bc.groupPrefix
+	if bc.isPrefixEnable {
+		bc.groupID += fmt.Sprintf("_%d", time.Now().UnixNano()/int64(time.Millisecond)%100)
 	}
 
 	stati := statiBenchmarkConsumerSnapshot{}
@@ -214,7 +218,7 @@
 
 	wg.Add(1)
 	go func() {
-		c.consumeMsg(&stati, exitChan)
+		bc.consumeMsg(&stati, exitChan)
 		wg.Done()
 	}()
 
@@ -253,7 +257,7 @@
 	signalChan := make(chan os.Signal, 1)
 	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
 	select {
-	case <-time.Tick(time.Minute * time.Duration(c.testMinutes)):
+	case <-time.Tick(time.Minute * time.Duration(bc.testMinutes)):
 	case <-signalChan:
 	}
 
@@ -264,6 +268,6 @@
 	snapshots.printStati()
 }
 
-func (c *consumer) usage() {
-	c.flags.Usage()
+func (bc *consumerBenchmark) usage() {
+	bc.flags.Usage()
 }
diff --git a/benchmark/producer.go b/benchmark/producer.go
index f3c1c60..537ffbe 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -18,8 +18,12 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
+	"github.com/apache/rocketmq-client-go/v2"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/producer"
 	"os"
 	"os/signal"
 	"sync"
@@ -93,7 +97,7 @@
 	)
 }
 
-type producer struct {
+type producerBenchmark struct {
 	topic         string
 	nameSrv       string
 	groupID       string
@@ -105,8 +109,8 @@
 }
 
 func init() {
-	p := &producer{}
-	flags := flag.NewFlagSet("consumer", flag.ExitOnError)
+	p := &producerBenchmark{}
+	flags := flag.NewFlagSet("producer", flag.ExitOnError)
 	p.flags = flags
 
 	flags.StringVar(&p.topic, "t", "", "topic name")
@@ -116,60 +120,62 @@
 	flags.IntVar(&p.testMinutes, "m", 10, "test minutes")
 	flags.IntVar(&p.bodySize, "s", 32, "body size")
 
-	registerCommand("consumer", p)
+	registerCommand("producer", p)
 }
 
-func (bp *producer) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
-	//p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
-	//	ClientConfig: rocketmq.ClientConfig{GroupID: bp.groupID, NameServer: bp.nameSrv},
-	//})
-	//if err != nil {
-	//	fmt.Printf("new consumer error:%s\n", err)
-	//	return
-	//}
-	//
-	//p.Start()
-	//defer p.Shutdown()
+func (bp *producerBenchmark) produceMsg(stati *statiBenchmarkProducerSnapshot, exit chan struct{}) {
+	p, err := rocketmq.NewProducer(
+		producer.WithNameServer([]string{bp.nameSrv}),
+		producer.WithRetry(2),
+	)
 
-	//topic, tag := bp.topic, "benchmark-consumer"
-	//
-	//AGAIN:
-	//	select {
-	//	case <-exit:
-	//		return
-	//	default:
-	//	}
+	if err != nil {
+		fmt.Printf("new producer error: %s\n", err)
+		return
+	}
 
-	//now := time.Now()
-	//r, err := p.SendMessageSync(&rocketmq.Message{
-	//	Topic: bp.topic, Body: buildMsg(bp.bodySize),
-	//})
-	//
-	//if err != nil {
-	//	fmt.Printf("send message sync error:%s", err)
-	//	goto AGAIN
-	//}
-	//
-	//if r.Status == rocketmq.SendOK {
-	//	atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
-	//	atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
-	//	currentRT := int64(time.Since(now) / time.Millisecond)
-	//	atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
-	//	prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
-	//	for currentRT > prevRT {
-	//		if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
-	//			break
-	//		}
-	//		prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
-	//	}
-	//	goto AGAIN
-	//}
-	//
-	//fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
-	//goto AGAIN
+	err = p.Start()
+
+	defer p.Shutdown()
+
+	topic, tag := bp.topic, "benchmark-producer"
+	msgStr := buildMsg(bp.bodySize)
+
+AGAIN:
+	select {
+	case <-exit:
+		return
+	default:
+	}
+
+	now := time.Now()
+	r, err := p.SendSync(context.Background(), primitive.NewMessage(topic, []byte(msgStr)))
+
+	if err != nil {
+		fmt.Printf("send message sync error:%s", err)
+		goto AGAIN
+	}
+
+	if r.Status == primitive.SendOK {
+		atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
+		atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
+		currentRT := int64(time.Since(now) / time.Millisecond)
+		atomic.AddInt64(&stati.sendMessageSuccessTimeTotal, currentRT)
+		prevRT := atomic.LoadInt64(&stati.sendMessageMaxRT)
+		for currentRT > prevRT {
+			if atomic.CompareAndSwapInt64(&stati.sendMessageMaxRT, prevRT, currentRT) {
+				break
+			}
+			prevRT = atomic.LoadInt64(&stati.sendMessageMaxRT)
+		}
+		goto AGAIN
+	}
+
+	fmt.Printf("%v send message %s:%s error:%s\n", time.Now(), topic, tag, err.Error())
+	goto AGAIN
 }
 
-func (bp *producer) run(args []string) {
+func (bp *producerBenchmark) run(args []string) {
 	bp.flags.Parse(args)
 
 	if bp.topic == "" {
@@ -266,6 +272,6 @@
 	fmt.Println("TEST DONE")
 }
 
-func (bp *producer) usage() {
+func (bp *producerBenchmark) usage() {
 	bp.flags.Usage()
 }