[ISSUE #301] Adding GET_CONSUMER_RUNNING_INFO (#311)

* Adding GET_CONSUMER_RUNNING_INFO impl

* add Marshal ConsumerRunningInfo Test Case
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 13cef4e..070cd19 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -236,12 +236,13 @@
 	 *
 	 * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
 	 */
-	consumerGroup  string
-	model          MessageModel
-	allocate       func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
-	unitMode       bool
-	consumeOrderly bool
-	fromWhere      ConsumeFromWhere
+	consumerGroup          string
+	model                  MessageModel
+	allocate               func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
+	unitMode               bool
+	consumeOrderly         bool
+	fromWhere              ConsumeFromWhere
+	consumerStartTimestamp int64
 
 	cType     ConsumeType
 	client    internal.RMQClient
@@ -250,7 +251,7 @@
 	pause     bool
 	once      sync.Once
 	option    consumerOptions
-	// key: int, hash(*primitive.MessageQueue)
+	// key: primitive.MessageQueue
 	// value: *processQueue
 	processQueueTable sync.Map
 
@@ -287,7 +288,7 @@
 	dc.client.UpdateTopicRouteInfo()
 	dc.client.Start()
 	dc.state = internal.StateRunning
-
+	dc.consumerStartTimestamp = time.Now().UnixNano() / int64(time.Millisecond)
 	return nil
 }
 
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 13b2d25..4f4dc0c 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -28,6 +28,7 @@
 	gods_util "github.com/emirpasic/gods/utils"
 	uatomic "go.uber.org/atomic"
 
+	"github.com/apache/rocketmq-client-go/internal"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 )
@@ -277,6 +278,28 @@
 	return -1
 }
 
+// MinOrderlyCache returns the key reported by consumingMsgOrderlyTreeMap.Min(), or -1 when the map is empty or that key is nil.
+func (pq *processQueue) MinOrderlyCache() int64 {
+	if pq.consumingMsgOrderlyTreeMap.Empty() {
+		return -1
+	}
+	if first, _ := pq.consumingMsgOrderlyTreeMap.Min(); first != nil {
+		return first.(int64)
+	}
+	return -1
+}
+
+// MaxOrderlyCache returns the key reported by consumingMsgOrderlyTreeMap.Max(), or -1 when the map is empty or that key is nil.
+func (pq *processQueue) MaxOrderlyCache() int64 {
+	if pq.consumingMsgOrderlyTreeMap.Empty() {
+		return -1
+	}
+	if last, _ := pq.consumingMsgOrderlyTreeMap.Max(); last != nil {
+		return last.(int64)
+	}
+	return -1
+}
+
 func (pq *processQueue) clear() {
 	pq.mutex.Lock()
 	pq.msgCache.Clear()
@@ -302,3 +325,31 @@
 	pq.consumingMsgOrderlyTreeMap.Clear()
 	return offset + 1
 }
+
+// currentInfo snapshots the queue's lock, pull, consume and cache state under the read lock.
+// The three timestamps are reported in Unix milliseconds (nanoseconds divided by 1e6).
+func (pq *processQueue) currentInfo() internal.ProcessQueueInfo {
+	pq.mutex.RLock()
+	defer pq.mutex.RUnlock()
+	info := internal.ProcessQueueInfo{
+		Locked:               pq.locked.Load(),
+		TryUnlockTimes:       pq.tryUnlockTimes,
+		LastLockTimestamp:    pq.lastLockTime.UnixNano() / 1e6,
+		Dropped:              pq.dropped.Load(),
+		LastPullTimestamp:    pq.lastPullTime.UnixNano() / 1e6,
+		LastConsumeTimestamp: pq.lastConsumeTime.UnixNano() / 1e6,
+	}
+
+	if !pq.msgCache.Empty() {
+		info.CachedMsgMinOffset = pq.Min()
+		info.CachedMsgMaxOffset = pq.Max()
+		info.CachedMsgCount = pq.msgCache.Size()
+		info.CachedMsgSizeInMiB = pq.cachedMsgSize / int64(1024*1024)
+	}
+	if !pq.consumingMsgOrderlyTreeMap.Empty() {
+		info.TransactionMsgMinOffset = pq.MinOrderlyCache()
+		info.TransactionMsgMaxOffset = pq.MaxOrderlyCache()
+		info.TransactionMsgCount = pq.consumingMsgOrderlyTreeMap.Size()
+	}
+	return info
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3c5b79a..98b0033 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -273,6 +273,45 @@
 	return pc.unitMode
 }
 
+// GetConsumerRunningInfo builds a report of this consumer: its subscriptions, per-topic
+// statistics, the state of every process queue and a handful of client properties.
+func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
+	info := internal.NewConsumerRunningInfo()
+	pc.subscriptionDataTable.Range(func(k, v interface{}) bool {
+		topic := k.(string)
+		info.SubscriptionData[v.(*internal.SubscriptionData)] = true
+		info.StatusTable[topic] = internal.ConsumeStatus{
+			PullRT:            getPullRT(topic, pc.consumerGroup).avgpt,
+			PullTPS:           getPullTPS(topic, pc.consumerGroup).tps,
+			ConsumeRT:         getConsumeRT(topic, pc.consumerGroup).avgpt,
+			ConsumeOKTPS:      getConsumeOKTPS(topic, pc.consumerGroup).tps,
+			ConsumeFailedTPS:  getConsumeFailedTPS(topic, pc.consumerGroup).tps,
+			ConsumeFailedMsgs: topicAndGroupConsumeFailedTPS.getStatsDataInHour(topic + "@" + pc.consumerGroup).sum,
+		}
+		return true
+	})
+
+	pc.processQueueTable.Range(func(k, v interface{}) bool {
+		mq := k.(primitive.MessageQueue)
+		pInfo := v.(*processQueue).currentInfo()
+		pInfo.CommitOffset = pc.storage.read(&mq, _ReadMemoryThenStore)
+		info.MQTable[mq] = pInfo
+		return true
+	})
+
+	// Name server addresses are concatenated, each one terminated by ';'.
+	var nsAddr string
+	for _, addr := range pc.namesrv.AddrList() {
+		nsAddr += addr + ";"
+	}
+	info.Properties[internal.PropNameServerAddr] = nsAddr
+	info.Properties[internal.PropConsumeType] = string(pc.cType)
+	info.Properties[internal.PropConsumeOrderly] = strconv.FormatBool(pc.consumeOrderly)
+	info.Properties[internal.PropThreadPoolCoreSize] = "-1"
+	info.Properties[internal.PropConsumerStartTimestamp] = strconv.FormatInt(pc.consumerStartTimestamp, 10)
+	return info
+}
+
 func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
 	v, exit := pc.subscriptionDataTable.Load(topic)
 	if !exit {
diff --git a/internal/client.go b/internal/client.go
index 3336944..d9c5f6a 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -18,7 +18,6 @@
 package internal
 
 import (
-	"bytes"
 	"context"
 	"errors"
 	"fmt"
@@ -36,6 +35,7 @@
 )
 
 const (
+	clientVersion        = "v2.0.0-alpha3"
 	defaultTraceRegionID = "DefaultRegion"
 
 	// tracing message switch
@@ -83,6 +83,7 @@
 	SubscriptionDataList() []*SubscriptionData
 	Rebalance()
 	IsUnitMode() bool
+	GetConsumerRunningInfo() *ConsumerRunningInfo
 }
 
 func DefaultClientOptions() ClientOptions {
@@ -220,8 +221,30 @@
 
 		client.remoteClient.RegisterRequestFunc(ReqGetConsumerRunningInfo, func(req *remote.RemotingCommand, addr net.Addr) *remote.RemotingCommand {
 			rlog.Info("receive get consumer running info request...", nil)
+			header := new(GetConsumerRunningInfoHeader)
+			header.Decode(req.ExtFields)
+			val, exist := clientMap.Load(header.clientID)
 			res := remote.NewRemotingCommand(ResError, nil, nil)
-			res.Remark = "the go client has not supported consumer running info"
+			if !exist {
+				res.Remark = fmt.Sprintf("Can't find specified client instance of: %s", header.clientID)
+			} else {
+				cli, ok := val.(*rmqClient)
+				var runningInfo *ConsumerRunningInfo
+				if ok {
+					runningInfo = cli.getConsumerRunningInfo(header.consumerGroup)
+				}
+				if runningInfo != nil {
+					data, err := runningInfo.Encode()
+					if err != nil {
+						res.Remark = fmt.Sprintf("json marshal error: %s", err.Error())
+					} else {
+						res.Code = ResSuccess // success is reported only once the body has been encoded
+						res.Body = data
+					}
+				} else {
+					res.Remark = "there is unexpected error when get running info, please check log"
+				}
+			}
 			return res
 		})
 	}
@@ -659,6 +682,18 @@
 	return result
 }
 
+// getConsumerRunningInfo asks the consumer registered under group for its running info and
+// stamps the client version on it; nil is returned when no consumer or no info is available.
+func (c *rmqClient) getConsumerRunningInfo(group string) *ConsumerRunningInfo {
+	if v, ok := c.consumerMap.Load(group); ok {
+		if info := v.(InnerConsumer).GetConsumerRunningInfo(); info != nil {
+			info.Properties[PropClientVersion] = clientVersion
+			return info
+		}
+	}
+	return nil
+}
+
 func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
 	list := make([]*primitive.MessageQueue, 0)
 	for idx := range data.QueueDataList {
@@ -676,16 +711,6 @@
 	return list
 }
 
-func encodeMessages(message []*primitive.Message) []byte {
-	var buffer bytes.Buffer
-	index := 0
-	for index < len(message) {
-		buffer.Write(message[index].Body)
-		index++
-	}
-	return buffer.Bytes()
-}
-
 func brokerVIPChannel(brokerAddr string) string {
 	if !_VIPChannelEnable {
 		return brokerAddr
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 630d4cc..19a2181 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -52,26 +52,31 @@
 
 // AddBroker mocks base method
 func (m *MockNamesrvs) AddBroker(routeData *TopicRouteData) {
+	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "AddBroker", routeData)
 }
 
 // AddBroker indicates an expected call of AddBroker
 func (mr *MockNamesrvsMockRecorder) AddBroker(routeData interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddBroker", reflect.TypeOf((*MockNamesrvs)(nil).AddBroker), routeData)
 }
 
 // cleanOfflineBroker mocks base method
 func (m *MockNamesrvs) cleanOfflineBroker() {
+	m.ctrl.T.Helper()
 	m.ctrl.Call(m, "cleanOfflineBroker")
 }
 
 // cleanOfflineBroker indicates an expected call of cleanOfflineBroker
 func (mr *MockNamesrvsMockRecorder) cleanOfflineBroker() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "cleanOfflineBroker", reflect.TypeOf((*MockNamesrvs)(nil).cleanOfflineBroker))
 }
 
 // UpdateTopicRouteInfo mocks base method
 func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
 	ret0, _ := ret[0].(*TopicRouteData)
 	return ret0
@@ -79,11 +84,13 @@
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
 func (mr *MockNamesrvsMockRecorder) UpdateTopicRouteInfo(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockNamesrvs)(nil).UpdateTopicRouteInfo), topic)
 }
 
 // FetchPublishMessageQueues mocks base method
 func (m *MockNamesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FetchPublishMessageQueues", topic)
 	ret0, _ := ret[0].([]*primitive.MessageQueue)
 	ret1, _ := ret[1].(error)
@@ -92,11 +99,13 @@
 
 // FetchPublishMessageQueues indicates an expected call of FetchPublishMessageQueues
 func (mr *MockNamesrvsMockRecorder) FetchPublishMessageQueues(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchPublishMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchPublishMessageQueues), topic)
 }
 
 // FindBrokerAddrByTopic mocks base method
 func (m *MockNamesrvs) FindBrokerAddrByTopic(topic string) string {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddrByTopic", topic)
 	ret0, _ := ret[0].(string)
 	return ret0
@@ -104,11 +113,13 @@
 
 // FindBrokerAddrByTopic indicates an expected call of FindBrokerAddrByTopic
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByTopic(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByTopic", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByTopic), topic)
 }
 
 // FindBrokerAddrByName mocks base method
 func (m *MockNamesrvs) FindBrokerAddrByName(brokerName string) string {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddrByName", brokerName)
 	ret0, _ := ret[0].(string)
 	return ret0
@@ -116,11 +127,13 @@
 
 // FindBrokerAddrByName indicates an expected call of FindBrokerAddrByName
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddrByName(brokerName interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddrByName", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddrByName), brokerName)
 }
 
 // FindBrokerAddressInSubscribe mocks base method
 func (m *MockNamesrvs) FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FindBrokerAddressInSubscribe", brokerName, brokerId, onlyThisBroker)
 	ret0, _ := ret[0].(*FindBrokerResult)
 	return ret0
@@ -128,11 +141,13 @@
 
 // FindBrokerAddressInSubscribe indicates an expected call of FindBrokerAddressInSubscribe
 func (mr *MockNamesrvsMockRecorder) FindBrokerAddressInSubscribe(brokerName, brokerId, onlyThisBroker interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindBrokerAddressInSubscribe", reflect.TypeOf((*MockNamesrvs)(nil).FindBrokerAddressInSubscribe), brokerName, brokerId, onlyThisBroker)
 }
 
 // FetchSubscribeMessageQueues mocks base method
 func (m *MockNamesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
+	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "FetchSubscribeMessageQueues", topic)
 	ret0, _ := ret[0].([]*primitive.MessageQueue)
 	ret1, _ := ret[1].(error)
@@ -141,5 +156,20 @@
 
 // FetchSubscribeMessageQueues indicates an expected call of FetchSubscribeMessageQueues
 func (mr *MockNamesrvsMockRecorder) FetchSubscribeMessageQueues(topic interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSubscribeMessageQueues", reflect.TypeOf((*MockNamesrvs)(nil).FetchSubscribeMessageQueues), topic)
 }
+
+// AddrList mocks base method
+func (m *MockNamesrvs) AddrList() []string {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "AddrList")
+	ret0, _ := ret[0].([]string)
+	return ret0
+}
+
+// AddrList indicates an expected call of AddrList
+func (mr *MockNamesrvsMockRecorder) AddrList() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrList", reflect.TypeOf((*MockNamesrvs)(nil).AddrList))
+}
diff --git a/internal/model.go b/internal/model.go
index 8c95458..ea1fdac 100644
--- a/internal/model.go
+++ b/internal/model.go
@@ -18,9 +18,14 @@
 package internal
 
 import (
+	"bytes"
 	"encoding/json"
+	"fmt"
+	"sort"
+	"strings"
 
 	"github.com/apache/rocketmq-client-go/internal/utils"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 )
 
@@ -100,3 +105,158 @@
 	rlog.Debug("heartbeat: "+string(d), nil)
 	return d
 }
+
+const (
+	PropNameServerAddr         = "PROP_NAMESERVER_ADDR"
+	PropThreadPoolCoreSize     = "PROP_THREADPOOL_CORE_SIZE"
+	PropConsumeOrderly         = "PROP_CONSUMEORDERLY"
+	PropConsumeType            = "PROP_CONSUME_TYPE"
+	PropClientVersion          = "PROP_CLIENT_VERSION"
+	PropConsumerStartTimestamp = "PROP_CONSUMER_START_TIMESTAMP"
+)
+
+type ProcessQueueInfo struct {
+	CommitOffset            int64 `json:"commitOffset"`
+	CachedMsgMinOffset      int64 `json:"cachedMsgMinOffset"`
+	CachedMsgMaxOffset      int64 `json:"cachedMsgMaxOffset"`
+	CachedMsgCount          int   `json:"cachedMsgCount"`
+	CachedMsgSizeInMiB      int64 `json:"cachedMsgSizeInMiB"`
+	TransactionMsgMinOffset int64 `json:"transactionMsgMinOffset"`
+	TransactionMsgMaxOffset int64 `json:"transactionMsgMaxOffset"`
+	TransactionMsgCount     int   `json:"transactionMsgCount"`
+	Locked                  bool  `json:"locked"`
+	TryUnlockTimes          int64 `json:"tryUnlockTimes"`
+	LastLockTimestamp       int64 `json:"lastLockTimestamp"`
+	Dropped                 bool  `json:"dropped"`
+	LastPullTimestamp       int64 `json:"lastPullTimestamp"`
+	LastConsumeTimestamp    int64 `json:"lastConsumeTimestamp"`
+}
+
+type ConsumeStatus struct {
+	PullRT            float64 `json:"pullRT"`
+	PullTPS           float64 `json:"pullTPS"`
+	ConsumeRT         float64 `json:"consumeRT"`
+	ConsumeOKTPS      float64 `json:"consumeOKTPS"`
+	ConsumeFailedTPS  float64 `json:"consumeFailedTPS"`
+	ConsumeFailedMsgs int64   `json:"consumeFailedMsgs"`
+}
+
+type ConsumerRunningInfo struct {
+	Properties       map[string]string
+	SubscriptionData map[*SubscriptionData]bool
+	MQTable          map[primitive.MessageQueue]ProcessQueueInfo
+	StatusTable      map[string]ConsumeStatus
+}
+
+// Encode renders the running info as one document with the members "properties",
+// "statusTable", "subscriptionSet" and "mqTable", in that order.
+//
+// Every mqTable key is the JSON encoding of a MessageQueue rather than a JSON string, so
+// the table is assembled by hand and the result is not strict JSON.
+// NOTE(review): presumably this is the layout the requesting side parses - confirm.
+func (info ConsumerRunningInfo) Encode() ([]byte, error) {
+	data, err := json.Marshal(info.Properties)
+	if err != nil {
+		return nil, err
+	}
+	jsonData := fmt.Sprintf("{\"%s\":%s", "properties", string(data))
+
+	data, err = json.Marshal(info.StatusTable)
+	if err != nil {
+		return nil, err
+	}
+	jsonData = fmt.Sprintf("%s,\"%s\":%s", jsonData, "statusTable", string(data))
+
+	subs := make([]*SubscriptionData, 0, len(info.SubscriptionData))
+	for k := range info.SubscriptionData {
+		subs = append(subs, k)
+	}
+
+	// The subscriptions come out of a map, so sort them to keep the output stable.
+	// Fields are compared one after another and the first one that differs decides.
+	// ClassFilterMode=false sorts first; the remaining fields sort in descending order.
+	sort.Slice(subs, func(i, j int) bool {
+		sub1 := subs[i]
+		sub2 := subs[j]
+		if sub1.ClassFilterMode != sub2.ClassFilterMode {
+			return !sub1.ClassFilterMode
+		}
+		if com := strings.Compare(sub1.Topic, sub2.Topic); com != 0 {
+			return com > 0
+		}
+		if com := strings.Compare(sub1.SubString, sub2.SubString); com != 0 {
+			return com > 0
+		}
+		if sub1.SubVersion != sub2.SubVersion {
+			return sub1.SubVersion > sub2.SubVersion
+		}
+		if com := strings.Compare(sub1.ExpType, sub2.ExpType); com != 0 {
+			return com > 0
+		}
+
+		// MarshalJSON errors are dropped: here they could only blur a tie-break, and
+		// json.Marshal(subs) below is expected to report the same failure.
+		v1, _ := sub1.Tags.MarshalJSON()
+		v2, _ := sub2.Tags.MarshalJSON()
+		if com := bytes.Compare(v1, v2); com != 0 {
+			return com > 0
+		}
+
+		v1, _ = sub1.Codes.MarshalJSON()
+		v2, _ = sub2.Codes.MarshalJSON()
+		// Equal elements must report false: a less function that returns true for a
+		// tie is not a valid ordering for sort.Slice.
+		return bytes.Compare(v1, v2) > 0
+	})
+
+	data, err = json.Marshal(subs)
+	if err != nil {
+		return nil, err
+	}
+	jsonData = fmt.Sprintf("%s,\"%s\":%s", jsonData, "subscriptionSet", string(data))
+
+	// Queues are emitted ordered by topic, then broker name, then queue id.
+	keys := make([]primitive.MessageQueue, 0, len(info.MQTable))
+	for k := range info.MQTable {
+		keys = append(keys, k)
+	}
+
+	sort.Slice(keys, func(i, j int) bool {
+		q1 := keys[i]
+		q2 := keys[j]
+		if com := strings.Compare(q1.Topic, q2.Topic); com != 0 {
+			return com < 0
+		}
+		if com := strings.Compare(q1.BrokerName, q2.BrokerName); com != 0 {
+			return com < 0
+		}
+
+		return q1.QueueId < q2.QueueId
+	})
+
+	// Each entry is "<queue JSON>:<info JSON>"; entries are joined with commas.
+	entries := make([]string, 0, len(keys))
+	for _, mq := range keys {
+		dataK, err := json.Marshal(mq)
+		if err != nil {
+			return nil, err
+		}
+		dataV, err := json.Marshal(info.MQTable[mq])
+		if err != nil {
+			return nil, err
+		}
+		entries = append(entries, string(dataK)+":"+string(dataV))
+	}
+
+	jsonData = fmt.Sprintf("%s,\"%s\":{%s}}", jsonData, "mqTable", strings.Join(entries, ","))
+	return []byte(jsonData), nil
+}
+
+func NewConsumerRunningInfo() *ConsumerRunningInfo {
+	return &ConsumerRunningInfo{
+		Properties:       make(map[string]string),
+		SubscriptionData: make(map[*SubscriptionData]bool),
+		MQTable:          make(map[primitive.MessageQueue]ProcessQueueInfo),
+		StatusTable:      make(map[string]ConsumeStatus),
+	}
+}
diff --git a/internal/model_test.go b/internal/model_test.go
index 1dac4ec..56703e0 100644
--- a/internal/model_test.go
+++ b/internal/model_test.go
@@ -20,12 +20,14 @@
 import (
 	"encoding/json"
 	"fmt"
+	"strings"
 	"testing"
 
 	. "github.com/smartystreets/goconvey/convey"
-	"github.com/stretchr/testify/assert"
+	"github.com/tidwall/gjson"
 
 	"github.com/apache/rocketmq-client-go/internal/utils"
+	"github.com/apache/rocketmq-client-go/primitive"
 )
 
 func TestHeartbeatData(t *testing.T) {
@@ -43,7 +45,7 @@
 			set.Add(pData2)
 
 			v, err := json.Marshal(set)
-			assert.Nil(t, err)
+			So(err, ShouldBeNil)
 			fmt.Printf("json producer set: %s", string(v))
 		})
 
@@ -61,8 +63,7 @@
 			hbt.ProducerDatas.Add(p2)
 
 			v, err := json.Marshal(hbt)
-			//ShouldBeNil(t, err)
-			assert.Nil(t, err)
+			So(err, ShouldBeNil)
 			fmt.Printf("json producer: %s\n", string(v))
 		})
 
@@ -79,8 +80,7 @@
 			hbt.ConsumerDatas.Add(c2)
 
 			v, err := json.Marshal(hbt)
-			//ShouldBeNil(t, err)
-			assert.Nil(t, err)
+			So(err, ShouldBeNil)
 			fmt.Printf("json consumer: %s\n", string(v))
 		})
 
@@ -108,10 +108,257 @@
 			hbt.ConsumerDatas.Add(c2)
 
 			v, err := json.Marshal(hbt)
-			//ShouldBeNil(t, err)
-			assert.Nil(t, err)
+			So(err, ShouldBeNil)
 			fmt.Printf("json producer & consumer: %s\n", string(v))
 		})
 	})
 
 }
+
+func TestConsumerRunningInfo_MarshalJSON(t *testing.T) {
+	Convey("test ConsumerRunningInfo MarshalJson", t, func() {
+		props := map[string]string{
+			"maxReconsumeTimes":             "-1",
+			"unitMode":                      "false",
+			"adjustThreadPoolNumsThreshold": "100000",
+			"consumerGroup":                 "mq-client-go-test%GID_GO_TEST",
+			"messageModel":                  "CLUSTERING",
+			"suspendCurrentQueueTimeMillis": "1000",
+			"pullThresholdSizeForTopic":     "-1",
+			"pullThresholdSizeForQueue":     "100",
+			"PROP_CLIENT_VERSION":           "V4_5_1",
+			"consumeConcurrentlyMaxSpan":    "2000",
+			"postSubscriptionWhenPull":      "false",
+			"consumeTimestamp":              "20191127013617",
+			"PROP_CONSUME_TYPE":             "CONSUME_PASSIVELY",
+			"consumeTimeout":                "15",
+			"consumeMessageBatchMaxSize":    "1",
+			"PROP_THREADPOOL_CORE_SIZE":     "20",
+			"pullInterval":                  "0",
+			"pullThresholdForQueue":         "1000",
+			"pullThresholdForTopic":         "-1",
+			"consumeFromWhere":              "CONSUME_FROM_FIRST_OFFSET",
+			"PROP_NAMESERVER_ADDR":          "mq-client-go-test.mq-internet-access.mq-internet.aliyuncs.com:80;",
+			"pullBatchSize":                 "32",
+			"consumeThreadMin":              "20",
+			"PROP_CONSUMER_START_TIMESTAMP": "1574791577504",
+			"consumeThreadMax":              "20",
+			"subscription":                  "{}",
+			"PROP_CONSUMEORDERLY":           "false",
+		}
+		subData := map[*SubscriptionData]bool{
+			&SubscriptionData{
+				ClassFilterMode: false,
+				Codes:           utils.NewSet(),
+				ExpType:         "TAG",
+				SubString:       "*",
+				SubVersion:      1574791579242,
+				Tags:            utils.NewSet(),
+				Topic:           "%RETRY%mq-client-go-test%GID_GO_TEST",
+			}: true,
+			&SubscriptionData{
+				ClassFilterMode: true,
+				Codes:           utils.NewSet(),
+				ExpType:         "TAG",
+				SubString:       "*",
+				SubVersion:      1574791577523,
+				Tags:            utils.NewSet(),
+				Topic:           "mq-client-go-test%go-test",
+			}: true,
+		}
+		statusTable := map[string]ConsumeStatus{
+			"%RETRY%mq-client-go-test%GID_GO_TEST": {
+				PullRT:            11.11,
+				PullTPS:           22.22,
+				ConsumeRT:         33.33,
+				ConsumeOKTPS:      44.44,
+				ConsumeFailedTPS:  55.55,
+				ConsumeFailedMsgs: 666,
+			},
+			"mq-client-go-test%go-test": {
+				PullRT:            123,
+				PullTPS:           123,
+				ConsumeRT:         123,
+				ConsumeOKTPS:      123,
+				ConsumeFailedTPS:  123,
+				ConsumeFailedMsgs: 1234,
+			},
+		}
+		mqTable := map[primitive.MessageQueue]ProcessQueueInfo{
+			{
+				Topic:      "%RETRY%mq-client-go-test%GID_GO_TEST",
+				BrokerName: "qd7internet-01",
+				QueueId:    0,
+			}: {
+				CommitOffset:            0,
+				CachedMsgMinOffset:      0,
+				CachedMsgMaxOffset:      0,
+				CachedMsgCount:          0,
+				CachedMsgSizeInMiB:      0,
+				TransactionMsgMinOffset: 0,
+				TransactionMsgMaxOffset: 0,
+				TransactionMsgCount:     0,
+				Locked:                  false,
+				TryUnlockTimes:          0,
+				LastLockTimestamp:       1574791579221,
+				Dropped:                 false,
+				LastPullTimestamp:       1574791579242,
+				LastConsumeTimestamp:    1574791579221,
+			},
+			{
+				Topic:      "%RETRY%mq-client-go-test%GID_GO_TEST",
+				BrokerName: "qd7internet-01",
+				QueueId:    1,
+			}: {
+				CommitOffset:            1,
+				CachedMsgMinOffset:      2,
+				CachedMsgMaxOffset:      3,
+				CachedMsgCount:          4,
+				CachedMsgSizeInMiB:      5,
+				TransactionMsgMinOffset: 6,
+				TransactionMsgMaxOffset: 7,
+				TransactionMsgCount:     8,
+				Locked:                  true,
+				TryUnlockTimes:          9,
+				LastLockTimestamp:       1574791579221,
+				Dropped:                 false,
+				LastPullTimestamp:       1574791579242,
+				LastConsumeTimestamp:    1574791579221,
+			},
+		}
+		info := ConsumerRunningInfo{
+			Properties:       props,
+			SubscriptionData: subData,
+			StatusTable:      statusTable,
+			MQTable:          mqTable,
+		}
+		data, err := info.Encode()
+		So(err, ShouldBeNil)
+		result := gjson.ParseBytes(data)
+		Convey("test Properties fields", func() {
+			r1 := result.Get("properties")
+			So(r1.Exists(), ShouldBeTrue)
+			m := r1.Map()
+			So(len(m), ShouldEqual, 27)
+
+			So(m["PROP_CLIENT_VERSION"], ShouldNotBeEmpty)
+			So(m["PROP_CLIENT_VERSION"].String(), ShouldEqual, "V4_5_1")
+
+			So(m["PROP_CONSUME_TYPE"], ShouldNotBeNil)
+			So(m["PROP_CONSUME_TYPE"].String(), ShouldEqual, "CONSUME_PASSIVELY")
+
+			So(m["PROP_THREADPOOL_CORE_SIZE"], ShouldNotBeNil)
+			So(m["PROP_THREADPOOL_CORE_SIZE"].String(), ShouldEqual, "20")
+
+			So(m["PROP_NAMESERVER_ADDR"], ShouldNotBeNil)
+			So(m["PROP_NAMESERVER_ADDR"].String(), ShouldEqual, "mq-client-go-test.mq-internet-access.mq-internet.aliyuncs.com:80;")
+
+			So(m["PROP_CONSUMER_START_TIMESTAMP"], ShouldNotBeNil)
+			So(m["PROP_CONSUMER_START_TIMESTAMP"].String(), ShouldEqual, "1574791577504")
+
+			So(m["PROP_CONSUMEORDERLY"], ShouldNotBeNil)
+			So(m["PROP_CONSUMEORDERLY"].String(), ShouldEqual, "false")
+		})
+		Convey("test SubscriptionData fields", func() {
+			r2 := result.Get("subscriptionSet")
+			So(r2.Exists(), ShouldBeTrue)
+			arr := r2.Array()
+			So(len(arr), ShouldEqual, 2)
+
+			m1 := arr[0].Map()
+			So(len(m1), ShouldEqual, 7)
+			So(m1["classFilterMode"].Bool(), ShouldEqual, false)
+			So(len(m1["codes"].Array()), ShouldEqual, 0)
+			So(m1["expressionType"].String(), ShouldEqual, "TAG")
+			So(m1["subString"].String(), ShouldEqual, "*")
+			So(m1["subVersion"].Int(), ShouldEqual, 1574791579242)
+			So(len(m1["tags"].Array()), ShouldEqual, 0)
+			So(m1["topic"].String(), ShouldEqual, "%RETRY%mq-client-go-test%GID_GO_TEST")
+
+			m2 := arr[1].Map()
+			So(len(m2), ShouldEqual, 7)
+			So(m2["classFilterMode"].Bool(), ShouldEqual, true)
+			So(len(m2["codes"].Array()), ShouldEqual, 0)
+			So(m2["expressionType"].String(), ShouldEqual, "TAG")
+			So(m2["subString"].String(), ShouldEqual, "*")
+			So(m2["subVersion"].Int(), ShouldEqual, 1574791577523)
+			So(len(m2["tags"].Array()), ShouldEqual, 0)
+			So(m2["topic"].String(), ShouldEqual, "mq-client-go-test%go-test")
+		})
+		Convey("test StatusTable fields", func() {
+			r3 := result.Get("statusTable")
+			So(r3.Exists(), ShouldBeTrue)
+			m := r3.Map()
+			So(len(m), ShouldEqual, 2)
+
+			status1 := m["mq-client-go-test%go-test"].Map()
+			So(len(status1), ShouldEqual, 6)
+			So(status1["pullRT"].Float(), ShouldEqual, 123)
+			So(status1["pullTPS"].Float(), ShouldEqual, 123)
+			So(status1["consumeRT"].Float(), ShouldEqual, 123)
+			So(status1["consumeOKTPS"].Float(), ShouldEqual, 123)
+			So(status1["consumeFailedTPS"].Float(), ShouldEqual, 123)
+			So(status1["consumeFailedMsgs"].Int(), ShouldEqual, 1234)
+
+			status2 := m["%RETRY%mq-client-go-test%GID_GO_TEST"].Map()
+			So(len(status2), ShouldEqual, 6)
+			So(status2["pullRT"].Float(), ShouldEqual, 11.11)
+			So(status2["pullTPS"].Float(), ShouldEqual, 22.22)
+			So(status2["consumeRT"].Float(), ShouldEqual, 33.33)
+			So(status2["consumeOKTPS"].Float(), ShouldEqual, 44.44)
+			So(status2["consumeFailedTPS"].Float(), ShouldEqual, 55.55)
+			So(status2["consumeFailedMsgs"].Int(), ShouldEqual, 666)
+		})
+		Convey("test MQTable fields", func() {
+			r4 := result.Get("mqTable")
+			So(r4.Exists(), ShouldBeTrue)
+			objNumbers := strings.Split(r4.String(), "},{")
+			So(len(objNumbers), ShouldEqual, 2)
+
+			obj1Str := objNumbers[0][1:len(objNumbers[0])] + "}"
+			obj1KV := strings.Split(obj1Str, "}:{")
+			So(len(obj1KV), ShouldEqual, 2)
+
+			obj1 := gjson.Parse("{" + obj1KV[1][0:len(obj1KV[1])])
+			So(obj1.Exists(), ShouldBeTrue)
+			obj1M := obj1.Map()
+			So(len(obj1M), ShouldEqual, 14)
+			So(obj1M["commitOffset"].Int(), ShouldEqual, 0)
+			So(obj1M["cachedMsgMinOffset"].Int(), ShouldEqual, 0)
+			So(obj1M["cachedMsgMaxOffset"].Int(), ShouldEqual, 0)
+			So(obj1M["cachedMsgCount"].Int(), ShouldEqual, 0)
+			So(obj1M["cachedMsgSizeInMiB"].Int(), ShouldEqual, 0)
+			So(obj1M["transactionMsgMinOffset"].Int(), ShouldEqual, 0)
+			So(obj1M["transactionMsgMaxOffset"].Int(), ShouldEqual, 0)
+			So(obj1M["transactionMsgCount"].Int(), ShouldEqual, 0)
+			So(obj1M["locked"].Bool(), ShouldEqual, false)
+			So(obj1M["tryUnlockTimes"].Int(), ShouldEqual, 0)
+			So(obj1M["lastLockTimestamp"].Int(), ShouldEqual, 1574791579221)
+			So(obj1M["dropped"].Bool(), ShouldEqual, false)
+			So(obj1M["lastPullTimestamp"].Int(), ShouldEqual, 1574791579242)
+			So(obj1M["lastConsumeTimestamp"].Int(), ShouldEqual, 1574791579221)
+
+			obj2Str := "{" + objNumbers[1][0:len(objNumbers[1])-1]
+			obj2KV := strings.Split(obj2Str, "}:{")
+			So(len(obj2KV), ShouldEqual, 2)
+			obj2 := gjson.Parse("{" + obj2KV[1][0:len(obj2KV[1])])
+			So(obj2.Exists(), ShouldBeTrue)
+			obj2M := obj2.Map()
+			So(len(obj2M), ShouldEqual, 14)
+			So(obj2M["commitOffset"].Int(), ShouldEqual, 1)
+			So(obj2M["cachedMsgMinOffset"].Int(), ShouldEqual, 2)
+			So(obj2M["cachedMsgMaxOffset"].Int(), ShouldEqual, 3)
+			So(obj2M["cachedMsgCount"].Int(), ShouldEqual, 4)
+			So(obj2M["cachedMsgSizeInMiB"].Int(), ShouldEqual, 5)
+			So(obj2M["transactionMsgMinOffset"].Int(), ShouldEqual, 6)
+			So(obj2M["transactionMsgMaxOffset"].Int(), ShouldEqual, 7)
+			So(obj2M["transactionMsgCount"].Int(), ShouldEqual, 8)
+			So(obj2M["locked"].Bool(), ShouldEqual, true)
+			So(obj2M["tryUnlockTimes"].Int(), ShouldEqual, 9)
+			So(obj2M["lastLockTimestamp"].Int(), ShouldEqual, 1574791579221)
+			So(obj2M["dropped"].Bool(), ShouldEqual, false)
+			So(obj2M["lastPullTimestamp"].Int(), ShouldEqual, 1574791579242)
+			So(obj2M["lastConsumeTimestamp"].Int(), ShouldEqual, 1574791579221)
+		})
+	})
+}
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 3c20968..b9f1744 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -52,6 +52,8 @@
 	FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult
 
 	FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error)
+
+	AddrList() []string
 }
 
 // namesrvs rocketmq namesrv instance.
@@ -119,3 +121,7 @@
 func (s *namesrvs) SetCredentials(credentials primitive.Credentials) {
 	s.nameSrvClient.RegisterInterceptor(remote.ACLInterceptor(credentials))
 }
+
+func (s *namesrvs) AddrList() []string {
+	return s.srvs
+}
diff --git a/internal/request.go b/internal/request.go
index 76d0d7d..470fd35 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -78,10 +78,6 @@
 	return maps
 }
 
-func (request *SendMessageRequestHeader) Decode(properties map[string]string) error {
-	return nil
-}
-
 type EndTransactionRequestHeader struct {
 	ProducerGroup        string
 	TranStateTableOffset int64
@@ -123,23 +119,23 @@
 	return maps
 }
 
-func (request *CheckTransactionStateRequestHeader) Decode(ext map[string]string) {
-	if len(ext) == 0 {
+func (request *CheckTransactionStateRequestHeader) Decode(properties map[string]string) {
+	if len(properties) == 0 {
 		return
 	}
-	if v, existed := ext["tranStateTableOffset"]; existed {
+	if v, existed := properties["tranStateTableOffset"]; existed {
 		request.TranStateTableOffset, _ = strconv.ParseInt(v, 10, 0)
 	}
-	if v, existed := ext["commitLogOffset"]; existed {
+	if v, existed := properties["commitLogOffset"]; existed {
 		request.CommitLogOffset, _ = strconv.ParseInt(v, 10, 0)
 	}
-	if v, existed := ext["msgId"]; existed {
+	if v, existed := properties["msgId"]; existed {
 		request.MsgId = v
 	}
-	if v, existed := ext["transactionId"]; existed {
+	if v, existed := properties["transactionId"]; existed {
 		request.MsgId = v
 	}
-	if v, existed := ext["offsetMsgId"]; existed {
+	if v, existed := properties["offsetMsgId"]; existed {
 		request.MsgId = v
 	}
 }
@@ -273,6 +269,27 @@
 	return maps
 }
 
-func (request *GetRouteInfoRequestHeader) Decode(properties map[string]string) error {
-	return nil
+type GetConsumerRunningInfoHeader struct {
+	consumerGroup string
+	clientID      string
+}
+
+func (request *GetConsumerRunningInfoHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["consumerGroup"] = request.consumerGroup
+	maps["clientId"] = request.clientID
+	return maps
+}
+
+// Decode copies "consumerGroup" and "clientId" from properties into the header.
+// Keys that are missing leave the corresponding field unchanged, so a nil or
+// empty map is a no-op.
+func (request *GetConsumerRunningInfoHeader) Decode(properties map[string]string) {
+	if group, ok := properties["consumerGroup"]; ok {
+		request.consumerGroup = group
+	}
+
+	if id, ok := properties["clientId"]; ok {
+		request.clientID = id
+	}
 }