Merge branch 'native' into v2.0.0-rc1
diff --git a/NOTICE b/NOTICE
index 85e2dc3..65ebdd0 100644
--- a/NOTICE
+++ b/NOTICE
@@ -1,5 +1,5 @@
Apache RocketMQ
-Copyright 2016-2019 The Apache Software Foundation
+Copyright 2016-2020 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
diff --git a/README.md b/README.md
index 2f7ba86..d4fdfa8 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,10 @@
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
[![TravisCI](https://travis-ci.org/apache/rocketmq-client-go.svg)](https://travis-ci.org/apache/rocketmq-client-go)
[![Coverage](https://codecov.io/gh/apache/rocketmq-client-go/branch/native/graph/badge.svg)](https://codecov.io/gh/apache/rocketmq-client-go/branch/native)
+[![GitHub release](https://img.shields.io/badge/release-download-default.svg)](https://github.com/apache/rocketmq-client-go/releases)
+[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-client-go.svg)](http://isitmaintained.com/project/apache/rocketmq-client-go "Average time to resolve an issue")
+[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-client-go.svg)](http://isitmaintained.com/project/apache/rocketmq-client-go "Percentage of issues still open")
+![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)
The RocketMQ Client in pure Go. The project is still under development, and **there is no guarantee for production environments**. In upcoming versions,
we will do our best to improve reliability, stability, usability and performance. The API may change, and more features will be added.
@@ -9,14 +13,15 @@
optimization, documents, etc. So, any contribution is very welcome. If you want to do something, please browse the issue list and select one,
or create a new issue.
-- [2.0.0 Production Ready Road map](https://github.com/apache/rocketmq-client-go/v2/issues/57)
-- [The alpha1 feature list](https://github.com/apache/rocketmq-client-go/v2/issues/54)
+- [2.0.0 Production Ready Road map](https://github.com/apache/rocketmq-client-go/issues/57)
----------
## Features
-in 2.0.0-alpha1, support:
+In 2.0.0-rc1, the following are supported:
* sending message in synchronous mode
+* sending message in asynchronous mode
* sending message in oneway mode
+* sending orderly messages
* consuming message using push model
----------
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 25f7060..5a5d7d6 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -61,8 +61,8 @@
type ConsumeType string
const (
- _PullConsume = ConsumeType("pull")
- _PushConsume = ConsumeType("push")
+ _PullConsume = ConsumeType("CONSUME_ACTIVELY")
+ _PushConsume = ConsumeType("CONSUME_PASSIVELY")
_SubAll = "*"
)
@@ -268,6 +268,8 @@
prCh chan PullRequest
namesrv internal.Namesrvs
+
+ pullFromWhichNodeTable sync.Map
}
func (dc *defaultConsumer) start() error {
@@ -855,7 +857,7 @@
func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData) {
- updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+ dc.updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
switch result.Status {
case primitive.PullFound:
@@ -1041,24 +1043,20 @@
}
func (dc *defaultConsumer) tryFindBroker(mq *primitive.MessageQueue) *internal.FindBrokerResult {
- result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+ result := dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
if result != nil {
return result
}
dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
- return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+ return dc.namesrv.FindBrokerAddressInSubscribe(mq.BrokerName, dc.recalculatePullFromWhichNode(mq), false)
}
-var (
- pullFromWhichNodeTable sync.Map
-)
-
-func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
- pullFromWhichNodeTable.Store(*mq, brokerId)
+func (dc *defaultConsumer) updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
+ dc.pullFromWhichNodeTable.Store(*mq, brokerId)
}
-func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
- v, exist := pullFromWhichNodeTable.Load(*mq)
+func (dc *defaultConsumer) recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
+ v, exist := dc.pullFromWhichNodeTable.Load(*mq)
if exist {
return v.(int64)
}