Merge pull request #517 from murDDock/fix_PullOffsetIllegal
[ISSUE #520]fix(pushConsumer):fix PullOffsetIllegal delete processQueue error
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 3129b8d..0a2f6d0 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -754,7 +754,7 @@
case ConsumeFromLastOffset:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic, internal.RetryGroupTopicPrefix) {
- lastOffset = 0
+ result = 0
} else {
lastOffset, err := dc.queryMaxOffset(mq)
if err == nil {
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 0523f08..3a54194 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -81,6 +81,7 @@
return nil, errors.Wrap(err, "new Namesrv failed.")
}
+ defaultOpts.Namesrv = srvs
dc := &defaultConsumer{
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 6d8ff2b..371b431 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -120,7 +120,7 @@
s.lock.Lock()
defer s.lock.Unlock()
- addr := s.srvs[s.index]
+ addr := s.srvs[s.index%len(s.srvs)]
index := s.index + 1
if index < 0 {
index = -index