[ISSUE #271] Fix bugs that can't consume multiple topics in single consumer (#310)

* fix issue 271
diff --git a/.travis.yml b/.travis.yml
index e68fd31..2562e74 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,9 @@
 
 before_script:
   - cd ${TRAVIS_HOME}
-  - wget http://us.mirrors.quenda.co/apache/rocketmq/4.5.2/rocketmq-all-4.5.2-bin-release.zip
-  - unzip rocketmq-all-4.5.2-bin-release.zip
-  - cd rocketmq-all-4.5.2-bin-release
+  - wget http://us.mirrors.quenda.co/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-bin-release.zip
+  - unzip rocketmq-all-4.6.0-bin-release.zip
+  - cd rocketmq-all-4.6.0-bin-release
   - perl -i -pe's/-Xms8g -Xmx8g -Xmn4g/-Xms2g -Xmx2g -Xmn1g/g' bin/runbroker.sh
   - nohup sh bin/mqnamesrv &
   - nohup sh bin/mqbroker -n localhost:9876 &
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index aa999c5..3c5b79a 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -29,6 +29,7 @@
 
 	"github.com/apache/rocketmq-client-go/internal"
 	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 )
@@ -46,11 +47,20 @@
 	Mb = 1024 * 1024
 )
 
+type PushConsumerCallback struct {
+	topic string
+	f     func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)
+}
+
+func (callback PushConsumerCallback) UniqueID() string {
+	return callback.topic
+}
+
 type pushConsumer struct {
 	*defaultConsumer
 	queueFlowControlTimes        int
 	queueMaxSpanFlowControlTimes int
-	consume                      func(context.Context, ...*primitive.MessageExt) (ConsumeResult, error)
+	consumeFunc                  utils.Set
 	submitToConsume              func(*processQueue, *primitive.MessageQueue)
 	subscribedTopic              map[string]string
 	interceptor                  primitive.Interceptor
@@ -98,6 +108,7 @@
 		queueLock:       newQueueLock(),
 		lockTicker:      time.NewTicker(dc.option.RebalanceLockInterval),
 		done:            make(chan struct{}, 1),
+		consumeFunc:     utils.NewSet(),
 	}
 	dc.mqChanged = p.messageQueueChanged
 	if p.consumeOrderly {
@@ -111,7 +122,6 @@
 	return p, nil
 }
 
-// TODO: add shutdown on pushConsumer.
 func (pc *pushConsumer) Start() error {
 	var err error
 	pc.once.Do(func() {
@@ -123,13 +133,14 @@
 		pc.state = internal.StateStartFailed
 		pc.validate()
 
-		err := pc.client.RegisterConsumer(pc.consumerGroup, pc)
+		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
 		if err != nil {
 			pc.state = internal.StateStartFailed
 			rlog.Error("the consumer group has been created, specify another one", map[string]interface{}{
 				rlog.LogKeyConsumerGroup: pc.consumerGroup,
 			})
 			err = ErrCreated
+			return
 		}
 
 		err = pc.defaultConsumer.start()
@@ -160,7 +171,6 @@
 
 		go func() {
 			// todo start clean msg expired
-			// TODO quit
 			for {
 				select {
 				case pr := <-pc.prCh:
@@ -177,6 +187,10 @@
 		}()
 	})
 
+	if err != nil {
+		return err
+	}
+
 	pc.client.UpdateTopicRouteInfo()
 	for k := range pc.subscribedTopic {
 		_, exist := pc.topicSubscribeInfoTable.Load(k)
@@ -224,7 +238,10 @@
 		pc.subscribedTopic[retryTopic] = ""
 	}
 
-	pc.consume = f
+	pc.consumeFunc.Add(&PushConsumerCallback{
+		f:     f,
+		topic: topic,
+	})
 	return nil
 }
 
@@ -752,14 +769,26 @@
 }
 
 func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.MessageExt) (ConsumeResult, error) {
+	if len(subMsgs) == 0 {
+		return ConsumeRetryLater, errors.New("msg list empty")
+	}
 
+	f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
+	if !exist {
+		return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
+	}
+
+	callback, ok := f.(*PushConsumerCallback)
+	if !ok {
+		return ConsumeRetryLater, fmt.Errorf("the consume callback assert failed for topic: %s", subMsgs[0].Topic)
+	}
 	if pc.interceptor == nil {
-		return pc.consume(ctx, subMsgs...)
+		return callback.f(ctx, subMsgs...)
 	} else {
 		var container ConsumeResultHolder
 		err := pc.interceptor(ctx, subMsgs, &container, func(ctx context.Context, req, reply interface{}) error {
 			msgs := req.([]*primitive.MessageExt)
-			r, e := pc.consume(ctx, msgs...)
+			r, e := callback.f(ctx, msgs...)
 
 			realReply := reply.(*ConsumeResultHolder)
 			realReply.ConsumeResult = r
diff --git a/internal/client.go b/internal/client.go
index f66e8b0..3336944 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -565,6 +565,13 @@
 }
 
 func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error {
+	_, exist := c.consumerMap.Load(group)
+	if exist {
+		rlog.Warning("the consumer group exist already", map[string]interface{}{
+			rlog.LogKeyConsumerGroup: group,
+		})
+		return fmt.Errorf("the consumer group exist already")
+	}
 	c.consumerMap.Store(group, consumer)
 	return nil
 }