Stable test (#20)
Add a stable test;
Add license in the benchmark file.
diff --git a/benchmark/consumer.go b/benchmark/consumer.go
index 9e1cd77..1a893c9 100644
--- a/benchmark/consumer.go
+++ b/benchmark/consumer.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package main
import (
diff --git a/benchmark/main.go b/benchmark/main.go
index d268d72..080a948 100644
--- a/benchmark/main.go
+++ b/benchmark/main.go
@@ -1,9 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package main
import (
"fmt"
"os"
- "strings"
)
type command interface {
@@ -12,16 +28,9 @@
}
var (
- cmds = map[string]command{}
- longText = ""
- longTextLen = 0
+ cmds = map[string]command{}
)
-func init() {
- longText = strings.Repeat("0123456789", 100)
- longTextLen = len(longText)
-}
-
func registerCommand(name string, cmd command) {
if cmd == nil {
panic("empty command")
diff --git a/benchmark/message.go b/benchmark/message.go
new file mode 100644
index 0000000..d5690fe
--- /dev/null
+++ b/benchmark/message.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import "strings"
+
+var (
+ longText = ""
+ longTextLen = 0
+)
+
+func init() {
+ longText = strings.Repeat("0123456789", 100)
+ longTextLen = len(longText)
+}
+
+func buildMsg(size int) string {
+ return longText[:size]
+}
diff --git a/benchmark/producer.go b/benchmark/producer.go
index 7b21356..e183269 100644
--- a/benchmark/producer.go
+++ b/benchmark/producer.go
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package main
import (
@@ -126,10 +143,15 @@
}
now := time.Now()
- r := p.SendMessageSync(&rocketmq.Message{
- Topic: bp.topic, Body: longText[:bp.bodySize],
+ r, err := p.SendMessageSync(&rocketmq.Message{
+ Topic: bp.topic, Body: buildMsg(bp.bodySize),
})
+ if err != nil {
+ fmt.Printf("send message sync error:%s", err)
+ goto AGAIN
+ }
+
if r.Status == rocketmq.SendOK {
atomic.AddInt64(&stati.receiveResponseSuccessCount, 1)
atomic.AddInt64(&stati.sendRequestSuccessCount, 1)
@@ -249,7 +271,3 @@
func (bp *producer) usage() {
bp.flags.Usage()
}
-
-func (bp *producer) buildMsg() string {
- return longText[:bp.bodySize]
-}
diff --git a/benchmark/stable.go b/benchmark/stable.go
new file mode 100644
index 0000000..6c7a12c
--- /dev/null
+++ b/benchmark/stable.go
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "errors"
+ "flag"
+ "fmt"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ rocketmq "github.com/apache/rocketmq-client-go/core"
+)
+
+type stableTest struct {
+ nameSrv string
+ topic string
+ groupID string
+ opIntervalSec int
+ testMin int
+
+ op func()
+
+ flags *flag.FlagSet
+}
+
+func (st *stableTest) buildFlags(name string) {
+ flags := flag.NewFlagSet(name, flag.ExitOnError)
+ flags.StringVar(&st.topic, "t", "stable-test", "topic name")
+ flags.StringVar(&st.nameSrv, "n", "", "nameserver address")
+ flags.StringVar(&st.groupID, "g", "stable-test", "group id")
+ flags.IntVar(&st.testMin, "m", 10, "test minutes")
+ flags.IntVar(&st.opIntervalSec, "s", 1, "operation interval[produce/consume]")
+
+ st.flags = flags
+}
+
+func (st *stableTest) checkFlag() error {
+ if st.topic == "" {
+ return errors.New("empty topic")
+ }
+
+ if st.nameSrv == "" {
+ return errors.New("empty namesrv")
+ }
+
+ if st.groupID == "" {
+ return errors.New("empty group id")
+ }
+
+ if st.testMin <= 0 {
+ return errors.New("test miniutes must be positive integer")
+ }
+
+ if st.opIntervalSec <= 0 {
+ return errors.New("operation interval must be positive integer")
+ }
+
+ return nil
+}
+
+func (st *stableTest) run() {
+ opTicker := time.NewTicker(time.Duration(st.opIntervalSec) * time.Second)
+ closeChan := time.Tick(time.Duration(st.testMin) * time.Minute)
+
+ signalChan := make(chan os.Signal, 1)
+ signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
+ for {
+ select {
+ case <-signalChan:
+ opTicker.Stop()
+ fmt.Println("test over")
+ return
+ case <-closeChan:
+ opTicker.Stop()
+ fmt.Println("test over")
+ return
+ case <-opTicker.C:
+ st.op()
+ }
+ }
+}
+
+type stableTestProducer struct {
+ *stableTest
+ bodySize int
+
+ p rocketmq.Producer
+}
+
+func (stp *stableTestProducer) buildFlags(name string) {
+ stp.stableTest.buildFlags(name)
+ stp.flags.IntVar(&stp.bodySize, "b", 32, "body size")
+}
+
+func (stp *stableTestProducer) checkFlag() error {
+ err := stp.stableTest.checkFlag()
+ if err != nil {
+ return err
+ }
+ if stp.bodySize <= 0 {
+ return errors.New("message body size must be positive integer")
+ }
+
+ return nil
+}
+
+func (stp *stableTestProducer) usage() {
+ stp.flags.Usage()
+}
+
+func (stp *stableTestProducer) run(args []string) {
+ err := stp.flags.Parse(args)
+ if err != nil {
+ fmt.Printf("parse args:%v, error:%s\n", args, err)
+ stp.usage()
+ return
+ }
+
+ err = stp.checkFlag()
+ if err != nil {
+ fmt.Println(err)
+ stp.usage()
+ return
+ }
+
+ p, err := rocketmq.NewProducer(&rocketmq.ProducerConfig{
+ ClientConfig: rocketmq.ClientConfig{GroupID: stp.groupID, NameServer: stp.nameSrv},
+ })
+ if err != nil {
+ fmt.Printf("new producer error:%s\n", err)
+ return
+ }
+
+ err = p.Start()
+ if err != nil {
+ fmt.Printf("start producer error:%s\n", err)
+ return
+ }
+ defer p.Shutdown()
+
+ stp.p = p
+ stp.stableTest.run()
+}
+
+func (stp *stableTestProducer) sendMessage() {
+ r, err := stp.p.SendMessageSync(&rocketmq.Message{Topic: stp.topic, Body: buildMsg(stp.bodySize)})
+ if err == nil {
+ fmt.Printf("send result:%+v\n", r)
+ return
+ }
+ fmt.Printf("send message error:%s", err)
+}
+
+type stableTestConsumer struct {
+ *stableTest
+ expression string
+
+ c rocketmq.PullConsumer
+ offsets map[int]int64
+}
+
+func (stc *stableTestConsumer) buildFlags(name string) {
+ stc.stableTest.buildFlags(name)
+ stc.flags.StringVar(&stc.expression, "e", "*", "expression")
+}
+
+func (stc *stableTestConsumer) checkFlag() error {
+ err := stc.stableTest.checkFlag()
+ if err != nil {
+ return err
+ }
+
+ if stc.expression == "" {
+ return errors.New("empty expression")
+ }
+ return nil
+}
+
+func (stc *stableTestConsumer) usage() {
+ stc.flags.Usage()
+}
+
+func (stc *stableTestConsumer) run(args []string) {
+ err := stc.flags.Parse(args)
+ if err != nil {
+ fmt.Printf("parse args:%v, error:%s\n", args, err)
+ stc.usage()
+ return
+ }
+
+ err = stc.checkFlag()
+ if err != nil {
+ stc.usage()
+ fmt.Printf("%s\n", err)
+ return
+ }
+
+ c, err := rocketmq.NewPullConsumer(&rocketmq.PullConsumerConfig{
+ ClientConfig: rocketmq.ClientConfig{GroupID: stc.groupID, NameServer: stc.nameSrv},
+ })
+ if err != nil {
+ fmt.Printf("new pull consumer error:%s\n", err)
+ return
+ }
+
+ err = c.Start()
+ if err != nil {
+ fmt.Printf("start consumer error:%s\n", err)
+ return
+ }
+ defer c.Shutdown()
+
+ stc.c = c
+ stc.stableTest.run()
+}
+
+func (stc *stableTestConsumer) pullMessage() {
+ mqs := stc.c.FetchSubscriptionMessageQueues(stc.topic)
+
+ for _, mq := range mqs {
+ offset := stc.offsets[mq.ID]
+ pr := stc.c.Pull(mq, stc.expression, offset, 32)
+ fmt.Printf("pull from %s, offset:%d, count:%+v\n", mq.String(), offset, len(pr.Messages))
+
+ switch pr.Status {
+ case rocketmq.PullNoNewMsg:
+ stc.offsets[mq.ID] = 0 // pull from the begin
+ case rocketmq.PullFound:
+ fallthrough
+ case rocketmq.PullNoMatchedMsg:
+ fallthrough
+ case rocketmq.PullOffsetIllegal:
+ stc.offsets[mq.ID] = pr.NextBeginOffset
+ case rocketmq.PullBrokerTimeout:
+ fmt.Println("broker timeout occur")
+ }
+ }
+}
+
+func init() {
+ // producer
+ name := "stableTestProducer"
+ p := &stableTestProducer{stableTest: &stableTest{}}
+ p.buildFlags(name)
+ p.op = p.sendMessage
+ registerCommand(name, p)
+
+ // consumer
+ name = "stableTestConsumer"
+ c := &stableTestConsumer{stableTest: &stableTest{}, offsets: map[int]int64{}}
+ c.buildFlags(name)
+ c.op = c.pullMessage
+ registerCommand(name, c)
+}