Stable test (#20)

Add a stable test;
Add license in the benchmark file.
diff --git a/benchmark/consumer.go b/benchmark/consumer.go
index 9e1cd77..1a893c9 100644
--- a/benchmark/consumer.go
+++ b/benchmark/consumer.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package main
 
 import (
diff --git a/benchmark/main.go b/benchmark/main.go
index d268d72..080a948 100644
--- a/benchmark/main.go
+++ b/benchmark/main.go
@@ -1,9 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package main
 
 import (
 	"fmt"
 	"os"
-	"strings"
 )
 
 type command interface {
@@ -12,16 +28,9 @@
 }
 
 var (
-	cmds        = map[string]command{}
-	longText    = ""
-	longTextLen = 0
+	cmds = map[string]command{}
 )
 
-func init() {
-	longText = strings.Repeat("0123456789", 100)
-	longTextLen = len(longText)
-}
-
 func registerCommand(name string, cmd command) {
 	if cmd == nil {
 		panic("empty command")
diff --git a/benchmark/message.go b/benchmark/message.go
new file mode 100644
index 0000000..d5690fe
--- /dev/null
+++ b/benchmark/message.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import "strings"
+
+var (
+	longText    = ""
+	longTextLen = 0
+)
+
+func init() {
+	longText = strings.Repeat("0123456789", 100)
+	longTextLen = len(longText)
+}
+
+func buildMsg(size int) string {
+	return longText[:size]
+}
diff --git a/benchmark/producer.go b/benchmark/producer.go
index 7b21356..e183269 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package main
 
 import (
@@ -126,10 +143,15 @@
 	}
 
 	now := time.Now()
-	r := p.SendMessageSync(&rocketmq.Message{
-		Topic: bp.topic, Body: longText[:bp.bodySize],
+	r, err := p.SendMessageSync(&rocketmq.Message{
+		Topic: bp.topic, Body: buildMsg(bp.bodySize),
 	})
 
+	if err != nil {
+		fmt.Printf("send message sync error:%s", err)
+		goto AGAIN
+	}
+
 	if r.Status == rocketmq.SendOK {
 		atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
 		atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
@@ -249,7 +271,3 @@
 func (bp *producer) usage() {
 	bp.flags.Usage()
 }
-
-func (bp *producer) buildMsg() string {
-	return longText[:bp.bodySize]
-}
diff --git a/benchmark/stable.go b/benchmark/stable.go
new file mode 100644
index 0000000..6c7a12c
--- /dev/null
+++ b/benchmark/stable.go
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package main
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type stableTest struct {
+	nameSrv       string
+	topic         string
+	groupID       string
+	opIntervalSec int
+	testMin       int
+
+	op func()
+
+	flags *flag.FlagSet
+}
+
+func (st *stableTest) buildFlags(name string) {
+	flags := flag.NewFlagSet(name, flag.ExitOnError)
+	flags.StringVar(&st.topic, "t", "stable-test", "topic name")
+	flags.StringVar(&st.nameSrv, "n", "", "nameserver address")
+	flags.StringVar(&st.groupID, "g", "stable-test", "group id")
+	flags.IntVar(&st.testMin, "m", 10, "test minutes")
+	flags.IntVar(&st.opIntervalSec, "s", 1, "operation interval[produce/consume]")
+
+	st.flags = flags
+}
+
+func (st *stableTest) checkFlag() error {
+	if st.topic == "" {
+		return errors.New("empty topic")
+	}
+
+	if st.nameSrv == "" {
+		return errors.New("empty namesrv")
+	}
+
+	if st.groupID == "" {
+		return errors.New("empty group id")
+	}
+
+	if st.testMin <= 0 {
+		return errors.New("test miniutes must be positive integer")
+	}
+
+	if st.opIntervalSec <= 0 {
+		return errors.New("operation interval must be positive integer")
+	}
+
+	return nil
+}
+
+func (st *stableTest) run() {
+	opTicker := time.NewTicker(time.Duration(st.opIntervalSec) * time.Second)
+	closeChan := time.Tick(time.Duration(st.testMin) * time.Minute)
+
+	signalChan := make(chan os.Signal, 1)
+	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+	for {
+		select {
+		case <-signalChan:
+			opTicker.Stop()
+			fmt.Println("test over")
+			return
+		case <-closeChan:
+			opTicker.Stop()
+			fmt.Println("test over")
+			return
+		case <-opTicker.C:
+			st.op()
+		}
+	}
+}
+
+type stableTestProducer struct {
+	*stableTest
+	bodySize int
+
+	p rocketmq.Producer
+}
+
+func (stp *stableTestProducer) buildFlags(name string) {
+	stp.stableTest.buildFlags(name)
+	stp.flags.IntVar(&stp.bodySize, "b", 32, "body size")
+}
+
+func (stp *stableTestProducer) checkFlag() error {
+	err := stp.stableTest.checkFlag()
+	if err != nil {
+		return err
+	}
+	if stp.bodySize <= 0 {
+		return errors.New("message body size must be positive integer")
+	}
+
+	return nil
+}
+
+func (stp *stableTestProducer) usage() {
+	stp.flags.Usage()
+}
+
+func (stp *stableTestProducer) run(args []string) {
+	err := stp.flags.Parse(args)
+	if err != nil {
+		fmt.Printf("parse args:%v, error:%s\n", args, err)
+		stp.usage()
+		return
+	}
+
+	err = stp.checkFlag()
+	if err != nil {
+		fmt.Println(err)
+		stp.usage()
+		return
+	}
+
+	p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+		ClientConfig: rocketmq.ClientConfig{GroupID: stp.groupID, NameServer: stp.nameSrv},
+	})
+	if err != nil {
+		fmt.Printf("new producer error:%s\n", err)
+		return
+	}
+
+	err = p.Start()
+	if err != nil {
+		fmt.Printf("start producer error:%s\n", err)
+		return
+	}
+	defer p.Shutdown()
+
+	stp.p = p
+	stp.stableTest.run()
+}
+
+func (stp *stableTestProducer) sendMessage() {
+	r, err := stp.p.SendMessageSync(&rocketmq.Message{Topic: stp.topic, Body: buildMsg(stp.bodySize)})
+	if err == nil {
+		fmt.Printf("send result:%+v\n", r)
+		return
+	}
+	fmt.Printf("send message error:%s", err)
+}
+
+type stableTestConsumer struct {
+	*stableTest
+	expression string
+
+	c       rocketmq.PullConsumer
+	offsets map[int]int64
+}
+
+func (stc *stableTestConsumer) buildFlags(name string) {
+	stc.stableTest.buildFlags(name)
+	stc.flags.StringVar(&stc.expression, "e", "*", "expression")
+}
+
+func (stc *stableTestConsumer) checkFlag() error {
+	err := stc.stableTest.checkFlag()
+	if err != nil {
+		return err
+	}
+
+	if stc.expression == "" {
+		return errors.New("empty expression")
+	}
+	return nil
+}
+
+func (stc *stableTestConsumer) usage() {
+	stc.flags.Usage()
+}
+
+func (stc *stableTestConsumer) run(args []string) {
+	err := stc.flags.Parse(args)
+	if err != nil {
+		fmt.Printf("parse args:%v, error:%s\n", args, err)
+		stc.usage()
+		return
+	}
+
+	err = stc.checkFlag()
+	if err != nil {
+		stc.usage()
+		fmt.Printf("%s\n", err)
+		return
+	}
+
+	c, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
+		ClientConfig: rocketmq.ClientConfig{GroupID: stc.groupID, NameServer: stc.nameSrv},
+	})
+	if err != nil {
+		fmt.Printf("new pull consumer error:%s\n", err)
+		return
+	}
+
+	err = c.Start()
+	if err != nil {
+		fmt.Printf("start consumer error:%s\n", err)
+		return
+	}
+	defer c.Shutdown()
+
+	stc.c = c
+	stc.stableTest.run()
+}
+
+func (stc *stableTestConsumer) pullMessage() {
+	mqs := stc.c.FetchSubscriptionMessageQueues(stc.topic)
+
+	for _, mq := range mqs {
+		offset := stc.offsets[mq.ID]
+		pr := stc.c.Pull(mq, stc.expression, offset, 32)
+		fmt.Printf("pull from %s, offset:%d, count:%+v\n", mq.String(), offset, len(pr.Messages))
+
+		switch pr.Status {
+		case rocketmq.PullNoNewMsg:
+			stc.offsets[mq.ID] = 0 // pull from the begin
+		case rocketmq.PullFound:
+			fallthrough
+		case rocketmq.PullNoMatchedMsg:
+			fallthrough
+		case rocketmq.PullOffsetIllegal:
+			stc.offsets[mq.ID] = pr.NextBeginOffset
+		case rocketmq.PullBrokerTimeout:
+			fmt.Println("broker timeout occur")
+		}
+	}
+}
+
+func init() {
+	// producer
+	name := "stableTestProducer"
+	p := &stableTestProducer{stableTest: &stableTest{}}
+	p.buildFlags(name)
+	p.op = p.sendMessage
+	registerCommand(name, p)
+
+	// consumer
+	name = "stableTestConsumer"
+	c := &stableTestConsumer{stableTest: &stableTest{}, offsets: map[int]int64{}}
+	c.buildFlags(name)
+	c.op = c.pullMessage
+	registerCommand(name, c)
+}