Modify examples
diff --git a/core/cfuns.go b/core/consumer_callback.go
similarity index 100%
rename from core/cfuns.go
rename to core/consumer_callback.go
diff --git a/core/error.go b/core/error.go
index 8986660..f5871f7 100644
--- a/core/error.go
+++ b/core/error.go
@@ -34,10 +34,12 @@
ErrSendSyncFailed = rmqError(C.PRODUCER_SEND_SYNC_FAILED)
ErrSendOnewayFailed = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED)
ErrSendOrderlyFailed = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED)
- ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START)
+ ErrSendTransactionFailed = rmqError(C.PRODUCER_SEND_TRANSACTION_FAILED)
+ ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_START_FAILED)
ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED)
ErrFetchMQFailed = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED)
ErrFetchMessageFailed = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED)
+ ErrNotSupportNow = rmqError(C.NOT_SUPPORT_NOW)
)
func (e rmqError) Error() string {
@@ -54,6 +56,8 @@
return "send message with orderly failed"
case ErrSendOnewayFailed:
return "send message with oneway failed"
+ case ErrSendTransactionFailed:
+ return "send transaction message failed"
case ErrPushConsumerStartFailed:
return "start push-consumer failed"
case ErrPullConsumerStartFailed:
@@ -62,6 +66,8 @@
return "fetch MessageQueue failed"
case ErrFetchMessageFailed:
return "fetch Message failed"
+ case ErrNotSupportNow:
+ return "this function is not support"
default:
return fmt.Sprintf("unknow error: %v", int(e))
}
diff --git a/core/transaction_funcs.go b/core/transaction_callback.go
similarity index 100%
rename from core/transaction_funcs.go
rename to core/transaction_callback.go
diff --git a/demos/main.go b/demos/main.go
deleted file mode 100644
index 1e325e9..0000000
--- a/demos/main.go
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-func main() {
- //run producer
- main0()
- //run consumer
- main1()
- //run orderly producer
- main2()
- //run orderly consumer
- main3()
-}
diff --git a/demos/producer.go b/demos/producer.go
deleted file mode 100644
index ad08c12..0000000
--- a/demos/producer.go
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "fmt"
- "github.com/apache/rocketmq-client-go/core"
-)
-
-// Change to main if you want to run it directly
-func main0() {
- pConfig := &rocketmq.ProducerConfig{
- ClientConfig: rocketmq.ClientConfig{
- GroupID: "GID_XXXXXXXXXXXX",
- NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
- Credentials: &rocketmq.SessionCredentials{
- AccessKey: "Your Access Key",
- SecretKey: "Your Secret Key",
- Channel: "ALIYUN/OtherChannel",
- },
- },
- //Set to Common Producer as default.
- ProducerModel: rocketmq.CommonProducer,
- }
- sendMessage(pConfig)
-}
-func sendMessage(config *rocketmq.ProducerConfig) {
- producer, err := rocketmq.NewProducer(config)
-
- if err != nil {
- fmt.Println("create common producer failed, error:", err)
- return
- }
-
- err = producer.Start()
- if err != nil {
- fmt.Println("start common producer error", err)
- return
- }
- defer producer.Shutdown()
-
- fmt.Printf("Common producer: %s started... \n", producer)
- for i := 0; i < 10; i++ {
- msg := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i)
- result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "YourTopicXXXXXXXX", Body: msg})
- if err != nil {
- fmt.Println("Error:", err)
- }
- fmt.Printf("send message: %s result: %s\n", msg, result)
- }
- fmt.Println("shutdown common producer.")
-}
diff --git a/demos/producer_orderly.go b/demos/producer_orderly.go
deleted file mode 100644
index e379a93..0000000
--- a/demos/producer_orderly.go
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "fmt"
- "github.com/apache/rocketmq-client-go/core"
- "time"
-)
-
-// Change to main if you want to run it directly
-func main2() {
- pConfig := &rocketmq.ProducerConfig{
- ClientConfig: rocketmq.ClientConfig{
- GroupID: "GID_XXXXXXXXXXXX",
- NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
- Credentials: &rocketmq.SessionCredentials{
- AccessKey: "Your Access Key",
- SecretKey: "Your Secret Key",
- Channel: "ALIYUN/OtherChannel",
- },
- },
- ProducerModel: rocketmq.OrderlyProducer,
- }
- sendMessageOrderlyByShardingKey(pConfig)
-}
-func sendMessageOrderlyByShardingKey(config *rocketmq.ProducerConfig) {
- producer, err := rocketmq.NewProducer(config)
- if err != nil {
- fmt.Println("create Producer failed, error:", err)
- return
- }
-
- producer.Start()
- defer producer.Shutdown()
- for i := 0; i < 1000; i++ {
- msg := fmt.Sprintf("%s-%d", "Hello Lite Orderly Message", i)
- r, err := producer.SendMessageOrderlyByShardingKey(
- &rocketmq.Message{Topic: "YourOrderLyTopicXXXXXXXX", Body: msg}, "ShardingKey" /*orderID*/)
- if err != nil {
- println("Send Orderly Message Error:", err)
- }
- fmt.Printf("send orderly message result:%+v\n", r)
- time.Sleep(time.Duration(1) * time.Second)
- }
-
-}
diff --git a/demos/push_consumer.go b/demos/push_consumer.go
deleted file mode 100644
index 98a1ddf..0000000
--- a/demos/push_consumer.go
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "fmt"
- "github.com/apache/rocketmq-client-go/core"
- "sync/atomic"
-)
-
-// Change to main if you want to run it directly
-func main1() {
- pConfig := &rocketmq.PushConsumerConfig{
- ClientConfig: rocketmq.ClientConfig{
- GroupID: "GID_XXXXXXXXXXXX",
- NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
- Credentials: &rocketmq.SessionCredentials{
- AccessKey: "Your Access Key",
- SecretKey: "Your Secret Key",
- Channel: "ALIYUN/OtherChannel",
- },
- },
- Model: rocketmq.Clustering,
- ConsumerModel: rocketmq.CoCurrently,
- }
- consumeWithPush(pConfig)
-}
-func consumeWithPush(config *rocketmq.PushConsumerConfig) {
-
- consumer, err := rocketmq.NewPushConsumer(config)
- if err != nil {
- println("create Consumer failed, error:", err)
- return
- }
-
- ch := make(chan interface{})
- var count = (int64)(1000000)
- // ********************************************
- // MUST subscribe topic before consumer started.
- // *********************************************
- consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
- fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
- if atomic.AddInt64(&count, -1) <= 0 {
- ch <- "quit"
- }
- return rocketmq.ConsumeSuccess
- })
-
- err = consumer.Start()
- if err != nil {
- println("consumer start failed,", err)
- return
- }
-
- fmt.Printf("consumer: %s started...\n", consumer)
- <-ch
- err = consumer.Shutdown()
- if err != nil {
- println("consumer shutdown failed")
- return
- }
- println("consumer has shutdown.")
-}
diff --git a/examples/main.go b/examples/main.go
index b3018f0..1e325e9 100644
--- a/examples/main.go
+++ b/examples/main.go
@@ -17,62 +17,13 @@
package main
-import (
- "github.com/apache/rocketmq-client-go/core"
- "gopkg.in/alecthomas/kingpin.v2"
- "os"
-)
-
-var (
- rmq = kingpin.New("rocketmq", "RocketMQ cmd tools")
- namesrv = rmq.Flag("namesrv", "NameServer address.").Default("localhost:9876").Short('n').String()
- topic = rmq.Flag("topic", "topic name.").Short('t').Required().String()
- gid = rmq.Flag("groupId", "group Id").Short('g').Default("testGroup").String()
- amount = rmq.Flag("amount", "how many message to produce or consume").Default("64").Short('a').Int()
-
- produce = rmq.Command("produce", "send messages to RocketMQ")
- body = produce.Flag("body", "message body").Short('b').Required().String()
- workerCount = produce.Flag("workerCount", "works of send message with orderly").Default("1").Short('w').Int()
- orderly = produce.Flag("orderly", "send msg orderly").Short('o').Bool()
-
- consume = rmq.Command("consume", "consumes message from RocketMQ")
-)
-
func main() {
- switch kingpin.MustParse(rmq.Parse(os.Args[1:])) {
- case produce.FullCommand():
- pConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
- GroupID: "MQ_INST_xxxxxxx%GID",
- NameServer: "http://xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx:80",
- Credentials: &rocketmq.SessionCredentials{
- AccessKey: "xxxxxx",
- SecretKey: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
- Channel: "mq-channel",
- },
- LogC: &rocketmq.LogConfig{
- Path: "example",
- FileSize: 64 * 1 << 10,
- FileNum: 1,
- Level: rocketmq.LogLevelDebug,
- },
- }}
- if *orderly {
- sendMessageOrderly(pConfig)
- } else {
- sendMessage(pConfig)
- }
- case consume.FullCommand():
- cConfig := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
- GroupID: *gid,
- NameServer: *namesrv,
- LogC: &rocketmq.LogConfig{
- Path: "example",
- FileSize: 64 * 1 << 10,
- FileNum: 1,
- Level: rocketmq.LogLevelInfo,
- },
- }, Model: rocketmq.Clustering}
-
- consumeWithPush(cConfig)
- }
+ //run producer
+ main0()
+ //run consumer
+ main1()
+ //run orderly producer
+ main2()
+ //run orderly consumer
+ main3()
}
diff --git a/demos/orderly_push_consumer.go b/examples/orderly_push_consumer.go
similarity index 100%
rename from demos/orderly_push_consumer.go
rename to examples/orderly_push_consumer.go
diff --git a/examples/producer.go b/examples/producer.go
index e1c4d2f..ad08c12 100644
--- a/examples/producer.go
+++ b/examples/producer.go
@@ -22,29 +22,46 @@
"github.com/apache/rocketmq-client-go/core"
)
+// Change to main if you want to run it directly
+func main0() {
+ pConfig := &rocketmq.ProducerConfig{
+ ClientConfig: rocketmq.ClientConfig{
+ GroupID: "GID_XXXXXXXXXXXX",
+ NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
+ Credentials: &rocketmq.SessionCredentials{
+ AccessKey: "Your Access Key",
+ SecretKey: "Your Secret Key",
+ Channel: "ALIYUN/OtherChannel",
+ },
+ },
+ //Set to Common Producer as default.
+ ProducerModel: rocketmq.CommonProducer,
+ }
+ sendMessage(pConfig)
+}
func sendMessage(config *rocketmq.ProducerConfig) {
producer, err := rocketmq.NewProducer(config)
if err != nil {
- fmt.Println("create Producer failed, error:", err)
+ fmt.Println("create common producer failed, error:", err)
return
}
err = producer.Start()
if err != nil {
- fmt.Println("start producer error", err)
+ fmt.Println("start common producer error", err)
return
}
defer producer.Shutdown()
- fmt.Printf("Producer: %s started... \n", producer)
- for i := 0; i < *amount; i++ {
- msg := fmt.Sprintf("%s-%d", *body, i)
- result, err := producer.SendMessageSync(&rocketmq.Message{Topic: *topic, Body: msg})
+ fmt.Printf("Common producer: %s started... \n", producer)
+ for i := 0; i < 10; i++ {
+ msg := fmt.Sprintf("%s-%d", "Hello,Common MQ Message-", i)
+ result, err := producer.SendMessageSync(&rocketmq.Message{Topic: "YourTopicXXXXXXXX", Body: msg})
if err != nil {
fmt.Println("Error:", err)
}
fmt.Printf("send message: %s result: %s\n", msg, result)
}
- fmt.Println("shutdown producer.")
+ fmt.Println("shutdown common producer.")
}
diff --git a/examples/producer_orderly.go b/examples/producer_orderly.go
index f88c3d5..e379a93 100644
--- a/examples/producer_orderly.go
+++ b/examples/producer_orderly.go
@@ -19,37 +19,27 @@
import (
"fmt"
- "sync"
- "sync/atomic"
-
"github.com/apache/rocketmq-client-go/core"
+ "time"
)
-type queueSelectorByOrderID struct{}
-
-func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
- return arg.(int) % size
-}
-
-type worker struct {
- p rocketmq.Producer
- leftMsgCount int64
-}
-
-func (w *worker) run() {
- selector := queueSelectorByOrderID{}
- for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
- r, err := w.p.SendMessageOrderly(
- &rocketmq.Message{Topic: *topic, Body: *body}, selector, 7 /*orderID*/, 3,
- )
- if err != nil {
- println("Send Orderly Error:", err)
- }
- fmt.Printf("send orderly result:%+v\n", r)
+// Change to main if you want to run it directly
+func main2() {
+ pConfig := &rocketmq.ProducerConfig{
+ ClientConfig: rocketmq.ClientConfig{
+ GroupID: "GID_XXXXXXXXXXXX",
+ NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
+ Credentials: &rocketmq.SessionCredentials{
+ AccessKey: "Your Access Key",
+ SecretKey: "Your Secret Key",
+ Channel: "ALIYUN/OtherChannel",
+ },
+ },
+ ProducerModel: rocketmq.OrderlyProducer,
}
+ sendMessageOrderlyByShardingKey(pConfig)
}
-
-func sendMessageOrderly(config *rocketmq.ProducerConfig) {
+func sendMessageOrderlyByShardingKey(config *rocketmq.ProducerConfig) {
producer, err := rocketmq.NewProducer(config)
if err != nil {
fmt.Println("create Producer failed, error:", err)
@@ -58,19 +48,15 @@
producer.Start()
defer producer.Shutdown()
-
- wg := sync.WaitGroup{}
- wg.Add(*workerCount)
-
- workers := make([]worker, *workerCount)
- for i := range workers {
- workers[i].p = producer
- workers[i].leftMsgCount = (int64)(*amount)
+ for i := 0; i < 1000; i++ {
+ msg := fmt.Sprintf("%s-%d", "Hello Lite Orderly Message", i)
+ r, err := producer.SendMessageOrderlyByShardingKey(
+ &rocketmq.Message{Topic: "YourOrderLyTopicXXXXXXXX", Body: msg}, "ShardingKey" /*orderID*/)
+ if err != nil {
+ println("Send Orderly Message Error:", err)
+ }
+ fmt.Printf("send orderly message result:%+v\n", r)
+ time.Sleep(time.Duration(1) * time.Second)
}
- for i := range workers {
- go func(w *worker) { w.run(); wg.Done() }(&workers[i])
- }
-
- wg.Wait()
}
diff --git a/examples/pull_consumer.go b/examples/pull_consumer.go
deleted file mode 100644
index de38048..0000000
--- a/examples/pull_consumer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "fmt"
- "time"
-
- "github.com/apache/rocketmq-client-go/core"
-)
-
-func consumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
-
- consumer, err := rocketmq.NewPullConsumer(config)
- if err != nil {
- fmt.Printf("new pull consumer error:%s\n", err)
- return
- }
-
- err = consumer.Start()
- if err != nil {
- fmt.Printf("start consumer error:%s\n", err)
- return
- }
- defer consumer.Shutdown()
-
- mqs := consumer.FetchSubscriptionMessageQueues(topic)
- fmt.Printf("fetch subscription mqs:%+v\n", mqs)
-
- total, offsets, now := 0, map[int]int64{}, time.Now()
-
-PULL:
- for {
- for _, mq := range mqs {
- pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
- total += len(pr.Messages)
- fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
-
- switch pr.Status {
- case rocketmq.PullNoNewMsg:
- break PULL
- case rocketmq.PullFound:
- fallthrough
- case rocketmq.PullNoMatchedMsg:
- fallthrough
- case rocketmq.PullOffsetIllegal:
- offsets[mq.ID] = pr.NextBeginOffset
- case rocketmq.PullBrokerTimeout:
- fmt.Println("broker timeout occur")
- }
- }
- }
-
- var timePerMessage time.Duration
- if total > 0 {
- timePerMessage = time.Since(now) / time.Duration(total)
- }
- fmt.Printf("total message:%d, per message time:%d\n", total, timePerMessage)
-}
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
index 3f0e34a..98a1ddf 100644
--- a/examples/push_consumer.go
+++ b/examples/push_consumer.go
@@ -23,6 +23,23 @@
"sync/atomic"
)
+// Change to main if you want to run it directly
+func main1() {
+ pConfig := &rocketmq.PushConsumerConfig{
+ ClientConfig: rocketmq.ClientConfig{
+ GroupID: "GID_XXXXXXXXXXXX",
+ NameServer: "http://XXXXXXXXXXXXXXXXXX:80",
+ Credentials: &rocketmq.SessionCredentials{
+ AccessKey: "Your Access Key",
+ SecretKey: "Your Secret Key",
+ Channel: "ALIYUN/OtherChannel",
+ },
+ },
+ Model: rocketmq.Clustering,
+ ConsumerModel: rocketmq.CoCurrently,
+ }
+ consumeWithPush(pConfig)
+}
func consumeWithPush(config *rocketmq.PushConsumerConfig) {
consumer, err := rocketmq.NewPushConsumer(config)
@@ -32,10 +49,12 @@
}
ch := make(chan interface{})
- var count = (int64)(*amount)
+ var count = (int64)(1000000)
+ // ********************************************
// MUST subscribe topic before consumer started.
- consumer.Subscribe(*topic, "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
- fmt.Printf("A message received: \"%s\" \n", msg.Body)
+ // *********************************************
+ consumer.Subscribe("YourTopicXXXXXXXX", "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
+ fmt.Printf("A message received, MessageID:%s, Body:%s \n", msg.MessageID, msg.Body)
if atomic.AddInt64(&count, -1) <= 0 {
ch <- "quit"
}
diff --git a/demos/transaction_producer.go b/examples/transaction_producer.go
similarity index 100%
rename from demos/transaction_producer.go
rename to examples/transaction_producer.go